Engine SDK vs. Platform SDK: Key Differences
With the release of the new Platform SDK, it’s a good time to highlight the key differences between the AYX Python SDK and the original Python Engine SDK. This article is also intended to help you understand how to recreate your Python Engine SDK custom tools using the new AYX Python SDK.
Overview
Packaging
The original Python Engine SDK was built around a Python package called `AlteryxPythonSDK`. This package is available at runtime inside of Alteryx Designer and can be accessed via import.
The new Platform SDK is a standalone Python package that you can install via `pip install`. It doesn't depend on any special libraries that only ship with Designer.
Getting Started
Getting started in the old SDK typically means you need to reference an example plugin, copy the code and file structure, and then modify it to suit your own needs.
The new Platform SDK provides a new command-line interface (CLI) that provides functionality to take care of all of this project setup. Go to the Getting Started Guide to learn more.
Development
In the original Python Engine SDK, the backend of a tool was developed via a class definition that satisfied the interface described in AyxPlugin Python Class. This class implements methods like `pi_init` and `pi_add_incoming_connection` (among others). This paradigm requires lots of boilerplate code and makes plugin development a burden: the "meat" of the plugin is typically only a few lines of Python, while the overall tool definition runs to hundreds of lines.
The new SDK alleviates this problem by simplifying the interface that must be satisfied to a bare minimum set of requirements.
Similar to the old SDK, in the new SDK you must write a `PluginV2` class. The new SDK provides a base `PluginV2` class definition to be used as a parent, which gives you confidence that you have implemented all of the methods needed for a valid Alteryx Designer plugin. At most, only 4 methods need to be implemented with the new `PluginV2` class.
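To make the shape of that small interface concrete, here is a minimal, standalone sketch. The `StubProvider` and `StubConnection` classes are hypothetical stand-ins for objects the SDK normally supplies; a real tool would subclass the SDK's `PluginV2` base class and receive real provider and connection objects from Designer.

```python
# Hypothetical stand-ins for the objects the SDK provides at runtime.
class StubProvider:
    tool_config = {"Prefix": "left"}

class StubConnection:
    def __init__(self, packets):
        self._packets = packets
    def read(self):
        # Returns the next record packet that is ready to process.
        return self._packets.pop(0)

class PassthroughPlugin:
    def __init__(self, provider):
        # Save the provider so the other callbacks can reach config and anchors.
        self.provider = provider
        self.rows = []

    def on_input_connection_opened(self, input_connection):
        # Real code would set input_connection.max_packet_size and open
        # the output anchor's metadata here.
        pass

    def on_record_packet(self, input_connection):
        # Called repeatedly, once per incoming packet.
        self.rows.extend(input_connection.read())

    def on_complete(self):
        # Last chance to emit any buffered output downstream.
        return self.rows

# Drive the lifecycle by hand to show the call order Designer uses.
plugin = PassthroughPlugin(StubProvider())
conn = StubConnection([[1, 2], [3]])
plugin.on_input_connection_opened(conn)
plugin.on_record_packet(conn)
plugin.on_record_packet(conn)
print(plugin.on_complete())  # -> [1, 2, 3]
```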
Additionally, in the old SDK, a class called `IncomingInterface` was required. This requirement has been removed in the new SDK: incoming interfaces and connections are handled behind the scenes by the SDK and made available via the new `ProviderV2` concept.
Deployment
Once plugin development is complete, it's often distributed via the YXI file format. You can find the instructions for packaging a YXI in the old SDK on the Package a Tool page. However, this packaging process has been significantly simplified by the new CLI, described in the Getting Started Guide.
AYX Python SDK Callback Definitions
Init
`__init__(self, provider)`: This is the plugin's constructor.
This callback is roughly the equivalent of the Python Engine SDK's `pi_init` function. While `pi_init` takes in a `str_xml` parameter (the tool config), in the new AYX Python SDK you can access this information via `provider.tool_config` (in Python dictionary format).
In general, as a best practice, you should:
- Save the provider parameter as a plugin attribute.
- Save the input and output anchors as plugin attributes. You can access these via `provider.get_input_anchor()` and `provider.get_output_anchor()`.
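The configuration difference is easy to see side by side. In this standalone comparison, the XML string and the dictionary are made-up stand-ins for what Designer would actually pass to the plugin:

```python
import xml.etree.ElementTree as Et

# Old Engine SDK: pi_init received the raw configuration XML as a string,
# and the plugin parsed it itself.
str_xml = "<Configuration><LeftPrefix>L</LeftPrefix></Configuration>"
left_prefix = Et.fromstring(str_xml).find("LeftPrefix").text
print(left_prefix)  # -> L

# New Platform SDK: the same information arrives pre-parsed;
# provider.tool_config is a plain Python dictionary.
tool_config = {"LeftPrefix": "L"}  # stand-in for provider.tool_config
print(tool_config.get("LeftPrefix"))  # -> L
```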
On Input Connection Opened
`on_input_connection_opened(self, input_connection)`: This is the code that runs when a data source is connected for the first time (or on updates). Once this code runs, the input connection is able to receive information.
This callback is roughly the equivalent of `pi_add_incoming_connection`, `ii_init`, and `pi_add_outgoing_connection` in the Python Engine SDK. `pi_add_incoming_connection` and `ii_init` are both handled under the hood, whereas `on_input_connection_opened` receives the relevant input connection as either an AMPInputConnection or an E1InputConnection.
This method is called exactly once for each input connection. In general, this method is used to:
- Set the `input_connection.max_packet_size` attribute, to restrict incoming packet sizes.
- Set output anchor metadata using `provider.get_output_anchor(anchor_name).open()`.
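Conceptually, `max_packet_size` caps how many records arrive per `on_record_packet` call: the incoming stream is chunked into packets of at most that size. This standalone sketch (not SDK code) illustrates the chunking behavior:

```python
def chunk_records(records, max_packet_size):
    # Illustrates what setting input_connection.max_packet_size does:
    # the stream is delivered in packets of at most this size, each
    # packet triggering one on_record_packet call.
    for start in range(0, len(records), max_packet_size):
        yield records[start:start + max_packet_size]

packets = list(chunk_records([1, 2, 3, 4, 5], 2))
print(packets)  # -> [[1, 2], [3, 4], [5]]
```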
On Record Packet
`on_record_packet(self, input_connection)`: The code that runs during the workflow, whenever a new record packet is ready to be processed.
This callback is roughly the equivalent of `pi_push_all_records`, `ii_push_record`, and `ii_close`. Note that `ii_close` (or closing the input connection after the last record is sent) isn't handled separately in the new AYX Python SDK.
This method is called for each input connection until the final record packet is sent. To retrieve record packets, call `input_connection.read()`.
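Because packets arrive incrementally, a common pattern is to buffer each packet per anchor and do the final work in `on_complete`. Sketched standalone below; the `FakeConnection` class is a hypothetical stand-in for the SDK's input connection object:

```python
# Standalone sketch of the buffering pattern: each on_record_packet call
# appends the newly read packet to a per-anchor buffer, so on_complete
# sees the full input. FakeConnection is a made-up stand-in.
class FakeConnection:
    def __init__(self, anchor_name, packet):
        self.anchor_name = anchor_name
        self._packet = packet
    def read(self):
        return self._packet

buffers = {"Input1": [], "Input2": []}

def on_record_packet(input_connection):
    buffers[input_connection.anchor_name].extend(input_connection.read())

on_record_packet(FakeConnection("Input1", ["a1", "a2"]))
on_record_packet(FakeConnection("Input2", ["b1"]))
on_record_packet(FakeConnection("Input1", ["a3"]))
print(buffers)  # -> {'Input1': ['a1', 'a2', 'a3'], 'Input2': ['b1']}
```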
On Complete
`on_complete(self)`: This is the code that runs after all of the record packets (for each of the input connections) have been processed. This callback is roughly the equivalent of `pi_close` in the Python Engine SDK.
Lifecycle Equivalents: Engine SDK vs Platform SDK
| Engine | Platform (AYX Python SDK) |
|---|---|
| pi_init | __init__(self, provider) |
| pi_add_incoming_connection | on_input_connection_opened(self, input_connection) |
| ii_init | on_input_connection_opened(self, input_connection) |
| pi_add_outgoing_connection | on_input_connection_opened(self, input_connection) |
| pi_push_all_records | on_record_packet(self, input_connection) |
| ii_push_record | on_record_packet(self, input_connection) |
| ii_close | on_record_packet(self, input_connection) |
| pi_close | on_complete(self) |
Other
| Engine | Platform (AYX Python SDK) |
|---|---|
| xmsg() | io_base.translate_msg() |
| update_progress() | io_base.update_progress() |
Unimplemented
| Engine | Platform (AYX Python SDK) |
|---|---|
| build_sort_info | |
Implementation Examples
Compare code examples for the Python Engine SDK and the new AYX Python SDK.
Python Engine SDK
```python
"""
AyxPlugin (required) has-a IncomingInterface (optional).
Although defining IncomingInterface is optional, the interface methods are needed if an upstream tool exists.
"""

import AlteryxPythonSDK as Sdk
import xml.etree.ElementTree as Et
import itertools as it


class AyxPlugin:
    """
    Implements the plugin interface methods, to be utilized by the Alteryx engine to communicate with a plugin.
    Prefixed with "pi", the Alteryx engine will expect the below five interface methods to be defined.
    """

    def __init__(self, n_tool_id: int, alteryx_engine: object, output_anchor_mgr: object):
        """
        Constructor is called whenever the Alteryx engine wants to instantiate an instance of this plugin.
        :param n_tool_id: The assigned unique identification for a tool instance.
        :param alteryx_engine: Provides an interface into the Alteryx engine.
        :param output_anchor_mgr: A helper that wraps the outgoing connections for a plugin.
        """

        # Default properties
        self.n_tool_id = n_tool_id
        self.alteryx_engine = alteryx_engine
        self.output_anchor_mgr = output_anchor_mgr

        # Custom properties
        self.left_input = None
        self.right_input = None
        self.left_prefix = ''
        self.right_prefix = ''
        self.output_anchor = None

    def pi_init(self, str_xml: str):
        """
        Getting the user-entered prefixes from the GUI, and the output anchor from the XML file.
        Called when the Alteryx engine is ready to provide the tool configuration from the GUI.
        :param str_xml: The raw XML from the GUI.
        """

        self.left_prefix = Et.fromstring(str_xml).find('LeftPrefix').text
        self.right_prefix = Et.fromstring(str_xml).find('RightPrefix').text
        self.output_anchor = self.output_anchor_mgr.get_output_anchor('Output')

    def pi_add_incoming_connection(self, str_type: str, str_name: str) -> object:
        """
        The IncomingInterface objects are instantiated here, one object per incoming connection.
        Called when the Alteryx engine is attempting to add an incoming data connection.
        :param str_type: The name of the input connection anchor, defined in the Config.xml file.
        :param str_name: The name of the wire, defined by the workflow author.
        :return: The IncomingInterface object(s).
        """

        if str_type == 'Left':
            self.left_input = IncomingInterface(self, self.left_prefix)
            return self.left_input
        elif str_type == 'Right':
            self.right_input = IncomingInterface(self, self.right_prefix)
            return self.right_input
        else:
            self.display_error_message('Invalid Input Connection')

    def pi_add_outgoing_connection(self, str_name: str) -> bool:
        """
        Called when the Alteryx engine is attempting to add an outgoing data connection.
        :param str_name: The name of the output connection anchor, defined in the Config.xml file.
        :return: True signifies that the connection is accepted.
        """

        return True

    def pi_push_all_records(self, n_record_limit: int) -> bool:
        """
        Called when a tool has no incoming data connection.
        :param n_record_limit: Set it to <0 for no limit, 0 for no records, and >0 to specify the number of records.
        :return: True for success, False for failure.
        """

        self.display_error_message('Missing Incoming Connection')
        return False

    def pi_close(self, b_has_errors: bool):
        """
        Called after all records have been processed.
        :param b_has_errors: Set to true to not do the final processing.
        """

        self.output_anchor.assert_close()  # Checks whether connections were properly closed.

    def check_input_complete(self):
        """
        A non-interface helper tasked to verify end of processing for both incoming connections.
        """

        if self.right_input is not None and self.left_input is not None:
            if self.right_input.input_complete and self.left_input.input_complete:
                self.process_output()
        else:
            self.display_error_message('Both left and right inputs must have connections')

    @staticmethod
    def init_record_info_out(child: object, record_info_out: object):
        """
        A non-interface helper for process_output() that handles building out the layout for record_info_out.
        :param child: An incoming connection.
        :param record_info_out: The outgoing record info object.
        :return: Updated initialization of record_info_out.
        """

        record_info_out.init_from_xml(
            child.record_info_in.get_record_xml_meta_data(True),
            child.rename_prefix + '_' if child.rename_prefix is not None else ''
        )
        return record_info_out

    @staticmethod
    def swap_outgoing_order(left_input: object, right_input: object):
        """
        A non-interface helper for process_output() that assigns the mapping order based on number of records.
        :param left_input: the object from the left incoming connection
        :param right_input: the object from the right incoming connection
        :return: New names for the incoming connections.
        """

        min_n_records = min(len(left_input.record_list), len(right_input.record_list))
        max_n_records = max(len(left_input.record_list), len(right_input.record_list))

        # Having the shortest list be the first to output, so set_dest_to_null is applied only for the first copy,
        # when dealing with an uneven record pair. This swap process will eventually be replaced in subsequent releases.
        if min_n_records != max_n_records:
            first_half_output = left_input if min_n_records == len(left_input.record_list) else right_input
            second_half_output = right_input if first_half_output == left_input else left_input
        else:
            first_half_output = left_input
            second_half_output = right_input
        return first_half_output, second_half_output

    @staticmethod
    def setup_record_copier(child: object, record_info_out: object, start_index: int):
        """
        A non-interface helper for process_output() that maps the appropriate fields to their designated positions.
        :param child: Incoming connection object.
        :param record_info_out: The outgoing record layout.
        :param start_index: The starting field position of an incoming connection object.
        :return: The starting field position for the next incoming connection object.
        """

        child.record_copier = Sdk.RecordCopier(record_info_out, child.record_info_in)
        for index in range(child.record_info_in.num_fields):
            child.record_copier.add(start_index + index, index)
        child.record_copier.done_adding()
        return child.record_info_in.num_fields

    def process_output(self):
        """
        A non-interface method responsible for pushing the records based on the joined record layout.
        """

        # Determining the mapping order based on length of the incoming data streams.
        first_half_output, second_half_output = self.swap_outgoing_order(self.left_input, self.right_input)

        # Having the helper initialize the RecordInfo object for the outgoing stream.
        record_info_out = self.init_record_info_out(first_half_output, Sdk.RecordInfo(self.alteryx_engine))
        record_info_out = self.init_record_info_out(second_half_output, record_info_out)

        self.output_anchor.init(record_info_out)  # Lets the downstream tools know of the outgoing record metadata.

        # Having the helper function handle the field index mapping from both incoming streams, into record_info_out.
        start_index = self.setup_record_copier(first_half_output, record_info_out, 0)
        self.setup_record_copier(second_half_output, record_info_out, start_index)

        record_creator = record_info_out.construct_record_creator()  # Creating a new record_creator for the joined records.

        for input_pair in it.zip_longest(first_half_output.record_list, second_half_output.record_list):
            # Copying the record into the record creator. NULL values will be used to fill for the difference.
            if input_pair[0] is not None:
                first_half_output.record_copier.copy(record_creator, input_pair[0].finalize_record())
            else:
                first_half_output.record_copier.set_dest_to_null(record_creator)
            second_half_output.record_copier.copy(record_creator, input_pair[1].finalize_record())

            # Asking for a record to push downstream, then resetting the record to prevent unexpected results.
            output_record = record_creator.finalize_record()
            self.output_anchor.push_record(output_record, False)
            record_creator.reset()

        # TODO: The progress update to the downstream tool, based on time elapsed, should go here.

        self.output_anchor.close()  # Close outgoing connections.

    def process_update_input_progress(self):
        """
        A non-interface helper to update the incoming progress based on records received from the input streams.
        """

        if self.right_input is not None and self.left_input is not None:
            # We're assuming receiving the input data accounts for half the progress.
            input_percent = (self.right_input.d_progress_percentage + self.left_input.d_progress_percentage) / 2
            self.alteryx_engine.output_tool_progress(self.n_tool_id, input_percent / 2)

    def display_error_message(self, msg_string: str):
        """
        A non-interface helper function, responsible for outputting error messages.
        :param msg_string: The error message string.
        """

        self.alteryx_engine.output_message(self.n_tool_id, Sdk.EngineMessageType.error, self.xmsg(msg_string))

    def xmsg(self, msg_string: str):
        """
        A non-interface, non-operational placeholder for the eventual localization of predefined user-facing strings.
        :param msg_string: The user-facing string.
        :return: msg_string
        """

        return msg_string


class IncomingInterface:
    """
    This optional class is returned by pi_add_incoming_connection, and it implements the incoming interface methods,
    to be utilized by the Alteryx engine to communicate with a plugin when processing an incoming connection.
    Prefixed with "ii", the Alteryx engine will expect the below four interface methods to be defined.
    """

    def __init__(self, parent: object, rename_prefix: str):
        """
        Constructor for IncomingInterface.
        :param parent: AyxPlugin
        :param rename_prefix: The prefix string entered by the user, if any.
        """

        # Default properties
        self.parent = parent
        self.rename_prefix = rename_prefix

        # Custom properties
        self.input_complete = False
        self.d_progress_percentage = 0
        self.record_info_in = None
        self.record_copier = None
        self.record_list = []

    def ii_init(self, record_info_in: object) -> bool:
        """
        Although no new records are being added, the prep work here will allow for data state preservation in ii_push_record.
        Called to report changes of the incoming connection's record metadata to the Alteryx engine.
        :param record_info_in: A RecordInfo object for the incoming connection's fields.
        :return: True for success, otherwise False.
        """

        self.record_copier = Sdk.RecordCopier(record_info_in, record_info_in)

        # Map each column of the input to where we want in the output.
        for index in range(record_info_in.num_fields):
            self.record_copier.add(index, index)

        self.record_copier.done_adding()  # A necessary step to let record copier know that field mappings are done.
        self.record_info_in = record_info_in  # For later reference.
        return True

    def ii_push_record(self, in_record: object) -> bool:
        """
        Preserving the state of the incoming record data, since the reference to a record dies beyond this point.
        Called when an input record is being sent to the plugin.
        :param in_record: The data for the incoming record.
        :return: False if method calling limit (record_cnt) is hit.
        """

        self.record_list.append(self.record_info_in.construct_record_creator())
        self.record_copier.copy(self.record_list[-1], in_record)
        return True

    def ii_update_progress(self, d_percent: float):
        """
        Called by the upstream tool to report what percentage of records have been pushed.
        :param d_percent: Value between 0.0 and 1.0.
        """

        self.d_progress_percentage = d_percent
        self.parent.process_update_input_progress()

    def ii_close(self):
        """
        Called when the incoming connection has finished passing all of its records.
        """

        self.input_complete = True
        self.parent.check_input_complete()
```
AYX Python SDK
```python
# Copyright (C) 2021 Alteryx, Inc. All rights reserved.
#
# Licensed under the ALTERYX SDK AND API LICENSE AGREEMENT;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.alteryx.com/alteryx-sdk-and-api-license-agreement
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Example multiple input anchor tool."""
import pandas as pd

from ayx_python_sdk.core import (
    InputConnectionBase,
    Metadata,
    Plugin,
    ProviderBase,
    RecordPacket,
    register_plugin,
)
from ayx_python_sdk.core.exceptions import WorkflowRuntimeError


class MultiInputTool(Plugin):
    """Concrete implementation of an AyxPlugin."""

    def __init__(self, provider: ProviderBase) -> None:
        """Construct a plugin."""
        self.provider = provider
        self.output_anchor = self.provider.get_output_anchor("Output")
        self.tool_config = self.provider.tool_config
        self.left_prefix = self.tool_config.get("leftField")
        self.right_prefix = self.tool_config.get("rightField")
        self.left_df = None
        self.right_df = None
        self.provider.io.info("Plugin initialized.")
        self.connections_opened = False
        self.output_df = None
        self.output_record_packet = None
        self.output_metadata = Metadata()

    def on_input_connection_opened(self, input_connection: InputConnectionBase) -> None:
        """Initialize the Input Connections of this plugin."""
        if input_connection.anchor.name == "Input1":
            for field in input_connection.metadata:
                col_name = self.left_prefix + "_" + field.name if self.left_prefix else field.name
                self.output_metadata.add_field(col_name, field.type)
        if input_connection.anchor.name == "Input2":
            self.connections_opened = True
            for field in input_connection.metadata:
                col_name = self.right_prefix + "_" + field.name if self.right_prefix else field.name
                self.output_metadata.add_field(col_name, field.type)
        self.provider.io.info(f"Connection {input_connection.name} Initialized!")
        if self.connections_opened:
            self.output_anchor.open(self.output_metadata)
        input_connection.max_packet_size = 1

    def on_record_packet(self, input_connection: InputConnectionBase) -> None:
        """Handle the record packet received through the input connection."""
        self.provider.io.info("Record packet received!")
        input_df = input_connection.read().to_dataframe()

        # Aggregate all left and right records.
        if input_connection.anchor.name == "Input1":
            if self.left_df is None:
                self.left_df = input_df
            else:
                self.left_df = self.left_df.append(input_df, ignore_index=True)
        if input_connection.anchor.name == "Input2":
            if self.right_df is None:
                self.right_df = input_df
            else:
                self.right_df = self.right_df.append(input_df, ignore_index=True)

    def on_complete(self) -> None:
        """Finalize the plugin."""
        self.left_df.columns = [self.left_prefix + "_" + col_name for col_name in self.left_df.columns]
        self.right_df.columns = [self.right_prefix + "_" + col_name for col_name in self.right_df.columns]
        res = pd.concat([self.left_df, self.right_df], axis=1, join="inner")
        self.output_record_packet = RecordPacket.from_dataframe(metadata=self.output_metadata, df=res)
        self.output_anchor.write(self.output_record_packet)
        self.provider.io.info("Completed processing records.")


AyxPlugin = register_plugin(MultiInputTool)
```
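One behavioral difference between the two examples is worth noting: the Engine SDK version pairs records with `itertools.zip_longest` and fills the shorter side with NULLs, while the AYX Python SDK version uses `pd.concat(..., axis=1, join="inner")`, which keeps only the row positions present in both inputs. This small standalone snippet demonstrates the `concat` behavior on made-up data:

```python
import pandas as pd

left = pd.DataFrame({"L_x": [1, 2, 3]})
right = pd.DataFrame({"R_y": ["a", "b"]})

# axis=1 pairs rows positionally by index; join="inner" keeps only the
# index values present in both frames, so the third left row is dropped
# (whereas zip_longest would have kept it and filled R_y with NULL).
res = pd.concat([left, right], axis=1, join="inner")
print(res.to_dict("list"))  # -> {'L_x': [1, 2], 'R_y': ['a', 'b']}
```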