Engine SDK Vs Platform SDK: Key Differences

Last modified: July 08, 2021

With the release of the new Platform SDK, it’s a good time to highlight the key differences between the AYX Python SDK and the original Python Engine SDK. This article is also intended to help you understand how to recreate your Python Engine SDK custom tools using the new AYX Python SDK.

AYX Python SDK Callback Definitions

Init

__init__(self, provider): This is the plugin’s constructor.

This callback is roughly the equivalent of the Python Engine SDK’s pi_init function. While pi_init takes a str_xml parameter (the tool configuration), in the new AYX Python SDK you can access this information via provider.tool_config (as a Python dictionary).

In general, as a best practice, you should do the following (see the sketch after this list):

  • Save the provider parameter as a plugin attribute.
  • Save the input and output anchors as plugin attributes. You can access these via provider.get_input_anchor() and provider.get_output_anchor().
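
For example, a minimal constructor that follows these practices might look like the sketch below. The "Input" and "Output" anchor names are placeholders, not part of the SDK; use the anchor names defined in your tool's configuration.

Python

from ayx_python_sdk.core import Plugin, ProviderBase


class ExampleTool(Plugin):
    def __init__(self, provider: ProviderBase) -> None:
        # Save the provider so later callbacks can reach engine services (io, anchors, and so on).
        self.provider = provider
        # The tool configuration is already available as a Python dictionary.
        self.tool_config = provider.tool_config
        # Save the anchors as plugin attributes ("Input" and "Output" are placeholder anchor names).
        self.input_anchor = provider.get_input_anchor("Input")
        self.output_anchor = provider.get_output_anchor("Output")
        self.provider.io.info("Plugin initialized.")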

On Input Connection Opened

on_input_connection_opened(self, input_connection): This is the code that runs when a data source is connected for the first time (or when it updates). After this code runs, the input connection is ready to receive records.

This callback is roughly the equivalent of pi_add_incoming_connection, ii_init, and pi_add_outgoing_connection in the Python Engine SDK. pi_add_incoming_connection and ii_init are both handled under the hood, whereas on_input_connection_opened receives the relevant input connection as either an AMPInputConnection or an E1InputConnection.

This method is called exactly once for each input connection. In general, use this method to do the following (see the sketch after this list):

  • Set the input_connection.max_packet_size attribute to restrict incoming packet sizes.
  • Set output anchor metadata using provider.get_output_anchor(anchor_name).open().
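
The sketch below shows the general shape of this callback, written as a method of the plugin class sketched above. It assumes a single input anchor and simply reuses the incoming metadata for the output anchor; a real tool typically builds its own Metadata object, as the full example later in this article does. The packet size of 100 is an arbitrary placeholder.

Python

    def on_input_connection_opened(self, input_connection: InputConnectionBase) -> None:
        # Restrict how many records arrive in each packet (100 is a placeholder value).
        input_connection.max_packet_size = 100
        # Open the output anchor with its outgoing metadata; here we reuse the incoming schema as-is.
        self.output_anchor.open(input_connection.metadata)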

On Record Packet

on_record_packet(self, input_connection): This is the code that runs during the workflow whenever a new record packet is ready to be processed.

This callback is roughly the equivalent of pi_push_all_records, ii_push_record, and ii_close. Note that ii_close (or closing the input connection after the last record is sent) isn’t handled separately in the new AYX Python SDK.

This method is called for each input connection until the final record packet is sent. To retrieve record packets, call input_connection.read().
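
For example, a pass-through version of this callback might read each packet, convert it to a DataFrame for any processing, and write it straight to the output. This sketch assumes self.output_metadata was built and the output anchor opened in on_input_connection_opened.

Python

    def on_record_packet(self, input_connection: InputConnectionBase) -> None:
        # Retrieve the packet that is ready on this connection.
        packet = input_connection.read()
        df = packet.to_dataframe()  # transform the records here as needed
        # Write the (possibly transformed) records downstream, reusing the metadata built earlier.
        self.output_anchor.write(RecordPacket.from_dataframe(metadata=self.output_metadata, df=df))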

On Complete

on_complete(self): This is the code that runs after all of the record packets (for each of the input connections) have been processed. This callback is roughly the equivalent of pi_close in the Python Engine SDK.
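
A sketch of this callback, assuming the plugin buffered its results in a hypothetical self.output_df DataFrame during on_record_packet, might look like this:

Python

    def on_complete(self) -> None:
        # All record packets have been processed; write any buffered results downstream.
        if self.output_df is not None:
            packet = RecordPacket.from_dataframe(metadata=self.output_metadata, df=self.output_df)
            self.output_anchor.write(packet)
        self.provider.io.info("Completed processing records.")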

Lifecycle Equivalents: Engine SDK vs Platform SDK

Engine                                                          | Platform (AYX Python SDK)
pi_init                                                         | __init__(self, provider)
pi_add_incoming_connection, ii_init, pi_add_outgoing_connection | on_input_connection_opened(self, input_connection)
pi_push_all_records, ii_push_record, ii_close                   | on_record_packet(self, input_connection)
pi_close                                                        | on_complete(self)

Other

Engine            | Platform (AYX Python SDK)
xmsg()            | io_base.translate_msg()
update_progress() | io_base.update_progress()
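
Both of these are reached through the provider's io object saved in __init__. The sketch below shows the general idea; the exact argument conventions are assumptions, so check the AYX Python SDK reference for the precise signatures.

Python

    # Inside any callback, with self.provider saved in __init__:
    message = self.provider.io.translate_msg("Processing records")  # look up a localized, user-facing string
    self.provider.io.info(message)
    self.provider.io.update_progress(50.0)  # report tool progress (the value convention is an assumption)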

Unimplemented

Engine          | Platform (AYX Python SDK)
build_sort_info | Not implemented

Implementation Examples

Compare code examples for the Python Engine SDK and the new AYX Python SDK.

Python Engine SDK

Python
  1. """
  2. AyxPlugin (required) has-a IncomingInterface (optional).
  3. Although defining IncomingInterface is optional, the interface methods are needed if an upstream tool exists.
  4. """
  5.  
  6. import AlteryxPythonSDK as Sdk
  7. import xml.etree.ElementTree as Et
  8. import itertools as it
  9.  
  10.  
  11. class AyxPlugin:
  12.     """
  13.     Implements the plugin interface methods, to be utilized by the Alteryx engine to communicate with a plugin.
  14.     Prefixed with "pi", the Alteryx engine will expect the below five interface methods to be defined.
  15.     """
  16.  
  17.     def __init__(self, n_tool_id: int, alteryx_engine: object, output_anchor_mgr: object):
  18.         """
  19.         Constructor is called whenever the Alteryx engine wants to instantiate an instance of this plugin.
  20.         :param n_tool_id: The assigned unique identification for a tool instance.
  21.         :param alteryx_engine: Provides an interface into the Alteryx engine.
  22.         :param output_anchor_mgr: A helper that wraps the outgoing connections for a plugin.
  23.         """
  24.  
  25.         # Default properties
  26.         self.n_tool_id = n_tool_id
  27.         self.alteryx_engine = alteryx_engine
  28.         self.output_anchor_mgr = output_anchor_mgr
  29.  
  30.         # Custom properties
  31.         self.left_input = None
  32.         self.right_input = None
  33.         self.left_prefix = ''
  34.         self.right_prefix = ''
  35.         self.output_anchor = None
  36.  
  37.     def pi_init(self, str_xml: str):
  38.         """
  39.         Getting the user-entered prefixes from the GUI, and the output anchor from the XML file.
  40.         Called when the Alteryx engine is ready to provide the tool configuration from the GUI.
  41.         :param str_xml: The raw XML from the GUI.
  42.         """
  43.  
  44.         self.left_prefix = Et.fromstring(str_xml).find('LeftPrefix').text
  45.         self.right_prefix = Et.fromstring(str_xml).find('RightPrefix').text
  46.         self.output_anchor = self.output_anchor_mgr.get_output_anchor('Output')
  47.  
  48.     def pi_add_incoming_connection(self, str_type: str, str_name: str) -> object:
  49.         """
  50.         The IncomingInterface objects are instantiated here, one object per incoming connection.
  51.         Called when the Alteryx engine is attempting to add an incoming data connection.
  52.         :param str_type: The name of the input connection anchor, defined in the Config.xml file.
  53.         :param str_name: The name of the wire, defined by the workflow author.
  54.         :return: The IncomingInterface object(s).
  55.         """
  56.  
  57.         if str_type == 'Left':
  58.             self.left_input = IncomingInterface(self, self.left_prefix)
  59.             return self.left_input
  60.         elif str_type == 'Right':
  61.             self.right_input = IncomingInterface(self, self.right_prefix)
  62.             return self.right_input
  63.         else:
  64.             self.display_error_message('Invalid Input Connection')
  65.  
  66.     def pi_add_outgoing_connection(self, str_name: str) -> bool:
  67.         """
  68.         Called when the Alteryx engine is attempting to add an outgoing data connection.
  69.         :param str_name: The name of the output connection anchor, defined in the Config.xml file.
  70.         :return: True signifies that the connection is accepted.
  71.         """
  72.  
  73.         return True
  74.  
  75.     def pi_push_all_records(self, n_record_limit: int) -> bool:
  76.         """
  77.         Called when a tool has no incoming data connection.
  78.         :param n_record_limit: Set it to <0 for no limit, 0 for no records, and >0 to specify the number of records.
  79.         :return: True for success, False for failure.
  80.         """
  81.  
  82.         self.display_error_message('Missing Incoming Connection')
  83.         return False
  84.  
  85.     def pi_close(self, b_has_errors: bool):
  86.         """
  87.         Called after all records have been processed.
  88.         :param b_has_errors: Set to true to not do the final processing.
  89.         """
  90.  
  91.         self.output_anchor.assert_close()  # Checks whether connections were properly closed.
  92.  
  93.     def check_input_complete(self):
  94.         """
  95.         A non-interface helper tasked to verify end of processing for both incoming connections.
  96.         """
  97.  
  98.         if self.right_input is not None and self.left_input is not None:
  99.             if self.right_input.input_complete and self.left_input.input_complete:
  100.                 self.process_output()
  101.         else:
  102.             self.display_error_message('Both left and right inputs must have connections')
  103.  
  104.     @staticmethod
  105.     def init_record_info_out(child: object, record_info_out: object):
  106.         """
  107.         A non-interface helper for process_output() that handles building out the layout for record_info_out.
  108.         :param child: An incoming connection.
  109.         :param record_info_out: The outgoing record info object.
  110.         :return: Updated initialization of record_info_out.
  111.         """
  112.  
  113.         record_info_out.init_from_xml(
  114.             child.record_info_in.get_record_xml_meta_data(True),
  115.             child.rename_prefix + '_' if child.rename_prefix is not None else ''
  116.         )
  117.         return record_info_out
  118.  
  119.     @staticmethod
  120.     def swap_outgoing_order(left_input: object, right_input: object):
  121.         """
  122.         A non-interface helper for process_output() that assigns the mapping order based on number of records.
  123.         :param left_input: the object from the left incoming connection
  124.         :param right_input: the object from the right incoming connection
  125.         :return: New names for the incoming connections.
  126.         """
  127.  
  128.         min_n_records = min(len(left_input.record_list), len(right_input.record_list))
  129.         max_n_records = max(len(left_input.record_list), len(right_input.record_list))
  130.  
  131.         # Having the shortest list be the first to output, so set_dest_to_null is applied only for the first copy,\
  132.         # when dealing with an uneven record pair. This swap process will eventually be replaced in subsequent releases.
  133.         if min_n_records != max_n_records:
  134.             first_half_output = left_input if min_n_records == len(left_input.record_list) else right_input
  135.             second_half_output = right_input if first_half_output == left_input else left_input
  136.         else:
  137.             first_half_output = left_input
  138.             second_half_output = right_input
  139.         return first_half_output, second_half_output
  140.  
  141.     @staticmethod
  142.     def setup_record_copier(child: object, record_info_out: object, start_index: int):
  143.         """
  144.         A non-interface helper for process_output() that maps the appropriate fields to their designated positions.
  145.         :param child: Incoming connection object.
  146.         :param record_info_out: The outgoing record layout.
  147.         :param start_index: The starting field position of an incoming connection object.
  148.         :return: The starting field position for the next incoming connection object.
  149.         """
  150.  
  151.         child.record_copier = Sdk.RecordCopier(record_info_out, child.record_info_in)
  152.         for index in range(child.record_info_in.num_fields):
  153.             child.record_copier.add(start_index + index, index)
  154.         child.record_copier.done_adding()
  155.         return child.record_info_in.num_fields
  156.  
  157.     def process_output(self):
  158.         """
  159.         A non-interface method responsible for pushing the records based on the joined record layout.
  160.         """
  161.  
  162.         # Determining the mapping order based on length of the incoming data streams.
  163.         first_half_output, second_half_output = self.swap_outgoing_order(self.left_input, self.right_input)
  164.  
  165.         # Having the helper initialize the RecordInfo object for the outgoing stream.
  166.         record_info_out = self.init_record_info_out(first_half_output, Sdk.RecordInfo(self.alteryx_engine))
  167.         record_info_out = self.init_record_info_out(second_half_output, record_info_out)
  168.  
  169.         self.output_anchor.init(record_info_out)  # Lets the downstream tools know of the outgoing record metadata.
  170.  
  171.         # Having the helper function handle the field index mapping from both incoming streams, into record_info_out.
  172.         start_index = self.setup_record_copier(first_half_output, record_info_out, 0)
  173.         self.setup_record_copier(second_half_output, record_info_out, start_index)
  174.  
  175.         record_creator = record_info_out.construct_record_creator()  # Creating a new record_creator for the joined records.
  176.  
  177.         for input_pair in it.zip_longest(first_half_output.record_list, second_half_output.record_list):
  178.  
  179.             # Copying the record into the record creator. NULL values will be used to fill for the difference.
  180.             if input_pair[0] is not None:
  181.                 first_half_output.record_copier.copy(record_creator, input_pair[0].finalize_record())
  182.             else:
  183.                 first_half_output.record_copier.set_dest_to_null(record_creator)
  184.             second_half_output.record_copier.copy(record_creator, input_pair[1].finalize_record())
  185.  
  186.             # Asking for a record to push downstream, then resetting the record to prevent unexpected results.
  187.             output_record = record_creator.finalize_record()
  188.             self.output_anchor.push_record(output_record, False)
  189.             record_creator.reset()
  190.  
  191.             #TODO: The progress update to the downstream tool, based on time elapsed, should go here.
  192.  
  193.         self.output_anchor.close()  # Close outgoing connections.
  194.  
  195.     def process_update_input_progress(self):
  196.         """
  197.         A non-interface helper to update the incoming progress based on records received from the input streams.
  198.         """
  199.  
  200.         if self.right_input is not None and self.left_input is not None:
  201.             # We're assuming receiving the input data accounts for half the progress.
  202.             input_percent = (self.right_input.d_progress_percentage + self.left_input.d_progress_percentage) / 2
  203.             self.alteryx_engine.output_tool_progress(self.n_tool_id, input_percent / 2)
  204.  
  205.     def display_error_message(self, msg_string: str):
  206.         """
  207.         A non-interface helper function, responsible for outputting error messages.
  208.         :param msg_string: The error message string.
  209.         """
  210.  
  211.         self.alteryx_engine.output_message(self.n_tool_id, Sdk.EngineMessageType.error, self.xmsg(msg_string))
  212.  
  213.     def xmsg(self, msg_string: str):
  214.         """
  215.         A non-interface, non-operational placeholder for the eventual localization of predefined user-facing strings.
  216.         :param msg_string: The user-facing string.
  217.         :return: msg_string
  218.         """
  219.  
  220.         return msg_string
  221.  
  222.  
  223. class IncomingInterface:
  224.     """
  225.     This optional class is returned by pi_add_incoming_connection, and it implements the incoming interface methods, to
  226.     be utilized by the Alteryx engine to communicate with a plugin when processing an incoming connection.
  227.     Prefixed with "ii", the Alteryx engine will expect the below four interface methods to be defined.
  228.     """
  229.  
  230.     def __init__(self, parent: object, rename_prefix: str):
  231.         """
  232.         Constructor for IncomingInterface.
  233.         :param parent: AyxPlugin
  234.         :param rename_prefix: The prefix string entered by the user, if any.
  235.         """
  236.  
  237.         # Default properties
  238.         self.parent = parent
  239.         self.rename_prefix = rename_prefix
  240.  
  241.         # Custom properties
  242.         self.input_complete = False
  243.         self.d_progress_percentage = 0
  244.         self.record_info_in = None
  245.         self.record_copier = None
  246.         self.record_list = []
  247.  
  248.     def ii_init(self, record_info_in: object) -> bool:
  249.         """
  250.         Although no new records are being added, the prep work here will allow for data state preservation in ii_push_record.
  251.         Called to report changes of the incoming connection's record metadata to the Alteryx engine.
  252.         :param record_info_in: A RecordInfo object for the incoming connection's fields.
  253.         :return: True for success, otherwise False.
  254.         """
  255.  
  256.         self.record_copier = Sdk.RecordCopier(record_info_in, record_info_in)
  257.  
  258.         # Map each column of the input to where we want in the output.
  259.         for index in range(record_info_in.num_fields):
  260.             self.record_copier.add(index, index)
  261.  
  262.         self.record_copier.done_adding()  # A necessary step to let record copier know that field mappings are done.
  263.         self.record_info_in = record_info_in  # For later reference.
  264.         return True
  265.  
  266.     def ii_push_record(self, in_record: object) -> bool:
  267.         """
  268.         Preserving the state of the incoming record data, since the reference to a record dies beyond this point.
  269.         Called when an input record is being sent to the plugin.
  270.         :param in_record: The data for the incoming record.
  271.         :return: False if method calling limit (record_cnt) is hit.
  272.         """
  273.  
  274.         self.record_list.append(self.record_info_in.construct_record_creator())
  275.         self.record_copier.copy(self.record_list[-1], in_record)
  276.         return True
  277.  
  278.     def ii_update_progress(self, d_percent: float):
  279.         """
  280.         Called by the upstream tool to report what percentage of records have been pushed.
  281.         :param d_percent: Value between 0.0 and 1.0.
  282.         """
  283.  
  284.         self.d_progress_percentage = d_percent
  285.         self.parent.process_update_input_progress()
  286.  
  287.     def ii_close(self):
  288.         """
  289.         Called when the incoming connection has finished passing all of its records.
  290.         """
  291.  
  292.         self.input_complete = True
  293.         self.parent.check_input_complete()

AYX Python SDK

Python
# Copyright (C) 2021 Alteryx, Inc. All rights reserved.
#
# Licensed under the ALTERYX SDK AND API LICENSE AGREEMENT;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    https://www.alteryx.com/alteryx-sdk-and-api-license-agreement
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Example multiple input anchor tool."""
import pandas as pd
from ayx_python_sdk.core import (
    InputConnectionBase,
    Metadata,
    Plugin,
    ProviderBase,
    RecordPacket,
    register_plugin,
)
from ayx_python_sdk.core.exceptions import WorkflowRuntimeError


class MultiInputTool(Plugin):
    """Concrete implementation of an AyxPlugin."""

    def __init__(self, provider: ProviderBase) -> None:
        """Construct a plugin."""
        self.provider = provider
        self.output_anchor = self.provider.get_output_anchor("Output")
        self.tool_config = self.provider.tool_config
        self.left_prefix = self.tool_config.get("leftField")
        self.right_prefix = self.tool_config.get("rightField")
        self.left_df = None
        self.right_df = None
        self.provider.io.info("Plugin initialized.")
        self.connections_opened = False
        self.output_df = None
        self.output_record_packet = None
        self.output_metadata = Metadata()

    def on_input_connection_opened(self, input_connection: InputConnectionBase) -> None:
        """Initialize the Input Connections of this plugin."""

        if input_connection.anchor.name == "Input1":
            for field in input_connection.metadata:
                col_name = self.left_prefix + "_" + field.name if self.left_prefix else field.name
                self.output_metadata.add_field(col_name, field.type)

        if input_connection.anchor.name == "Input2":
            self.connections_opened = True
            for field in input_connection.metadata:
                col_name = self.right_prefix + "_" + field.name if self.right_prefix else field.name
                self.output_metadata.add_field(col_name, field.type)

        self.provider.io.info(f"Connection {input_connection.name} Initialized!")
        if self.connections_opened:
            self.output_anchor.open(self.output_metadata)
        input_connection.max_packet_size = 1

    def on_record_packet(self, input_connection: InputConnectionBase) -> None:
        """Handle the record packet received through the input connection."""
        self.provider.io.info("Record packet received!")
        input_df = input_connection.read().to_dataframe()
        # aggregate all left and right records
        if input_connection.anchor.name == "Input1":
            if self.left_df is None:
                self.left_df = input_df
            else:
                self.left_df = self.left_df.append(input_df, ignore_index=True)
        if input_connection.anchor.name == "Input2":
            if self.right_df is None:
                self.right_df = input_df
            else:
                self.right_df = self.right_df.append(input_df, ignore_index=True)

    def on_complete(self) -> None:
        """Finalize the plugin."""
        self.left_df.columns = [self.left_prefix + "_" + col_name for col_name in self.left_df.columns]
        self.right_df.columns = [self.right_prefix + "_" + col_name for col_name in self.right_df.columns]

        res = pd.concat([self.left_df, self.right_df], axis=1, join="inner")
        self.output_record_packet = RecordPacket.from_dataframe(metadata=self.output_metadata, df=res)

        self.output_anchor.write(self.output_record_packet)
        self.provider.io.info("Completed processing records.")


AyxPlugin = register_plugin(MultiInputTool)