Engine SDK vs Platform SDK: Key Differences

With the release of the new Platform SDK, it’s a good time to highlight the key differences between the AYX Python SDK and the original Python Engine SDK. This article is also intended to help you understand how to recreate your Python Engine SDK custom tools using the new AYX Python SDK.

Overview

Packaging

The original Python Engine SDK was built around a Python package called AlteryxPythonSDK. This package is only available at runtime, inside Alteryx Designer, and is accessed via import.

The new Platform SDK is a standalone Python package that you install with pip. It doesn't depend on any special libraries that ship only with Designer.

Getting Started

Getting started with the old SDK typically means you reference an example plugin, copy its code and file structure, and then modify it to suit your needs.

The new Platform SDK includes a command-line interface (CLI) that takes care of all of this project setup for you. Go to the Getting Started Guide to learn more.

Development

In the original Python Engine SDK, the backend of a tool was developed via a class definition that satisfied the interface described in the AyxPlugin Python Class documentation.

This class implements methods such as pi_init and pi_add_incoming_connection (among others). This paradigm requires a lot of boilerplate code and makes plugin development a burden: the "meat" of the plugin is typically only a few lines of Python, while the overall tool definition runs to hundreds of lines.

The new SDK alleviates this problem by reducing the interface that must be satisfied to a bare minimum set of requirements.

As in the old SDK, you must write a plugin class. The new SDK provides a PluginV2 base class to use as a parent, which gives you confidence that you have implemented every method needed for a valid Alteryx Designer plugin. At most, only 4 methods need to be implemented with the new PluginV2 class.

Additionally, in the old SDK, a class called IncomingInterface was required. This requirement has been removed in the new SDK, as incoming interfaces and connections are handled behind the scenes by the SDK, and made available via the new ProviderV2 concept.
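
To make the comparison concrete, here is a minimal sketch of what a tool can look like in the new SDK. It mirrors the structure and imports of the full example at the end of this article; the class name, anchor name, and pass-through behavior are illustrative only, not part of the SDK.

from ayx_python_sdk.core import InputConnectionBase, Plugin, ProviderBase, register_plugin


class PassThroughTool(Plugin):
    """Hypothetical minimal plugin: only the callbacks the SDK requires."""

    def __init__(self, provider: ProviderBase) -> None:
        # Save the provider and the output anchor defined in the tool's configuration.
        self.provider = provider
        self.output_anchor = provider.get_output_anchor("Output")

    def on_input_connection_opened(self, input_connection: InputConnectionBase) -> None:
        # Reuse the incoming metadata as the outgoing metadata (a pass-through tool).
        self.output_anchor.open(input_connection.metadata)

    def on_record_packet(self, input_connection: InputConnectionBase) -> None:
        # Forward each record packet downstream as it arrives.
        self.output_anchor.write(input_connection.read())

    def on_complete(self) -> None:
        self.provider.io.info("Done processing records.")


AyxPlugin = register_plugin(PassThroughTool)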

Deployment

Once development is complete, a plugin is often distributed as a YXI file. You can find instructions for packaging a YXI with the old SDK on the Package a Tool page. This packaging process has been significantly simplified by the new CLI, described in the Getting Started Guide.

AYX Python SDK Callback Definitions

Init

__init__(self, provider): This is the plugin’s constructor.

This callback is roughly the equivalent of the Python Engine SDK’s pi_init function. While pi_init takes in a str_xml parameter (the tool config), in the new AYX Python SDK you can access this information via provider.tool_config (as a Python dictionary).

In general, as a best practice, you should...

  • Save the provider parameter as a plugin attribute.

  • Save the input and output anchors as plugin attributes. You can access these via provider.get_input_anchor(anchor_name) and provider.get_output_anchor(anchor_name), as shown in the sketch after this list.
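
For example, a constructor that follows these practices might look like this sketch (the anchor names "Input" and "Output" are assumptions and should match the anchors defined in your tool's configuration):

def __init__(self, provider):
    """Hypothetical constructor: save the provider and anchors as attributes."""
    self.provider = provider
    self.tool_config = provider.tool_config                    # tool configuration as a Python dictionary
    self.input_anchor = provider.get_input_anchor("Input")     # assumed anchor name
    self.output_anchor = provider.get_output_anchor("Output")  # assumed anchor name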

On Input Connection Opened

on_input_connection_opened(self, input_connection): This is the code that runs when a data source is connected for the first time (or on updates). Once this callback runs, the input connection is ready to receive data.

This callback is roughly the equivalent of pi_add_incoming_connection, ii_init, and pi_add_outgoing_connection in the Python Engine SDK. pi_add_incoming_connection and ii_init are both handled under the hood, whereas on_input_connection_opened receives the relevant input connection as either an AMPInputConnection or an E1InputConnection.

This method is called exactly once for each input connection. In general, this method is used to do the following (see the sketch after this list):

  • Set the input_connection.max_packet_size attribute, to restrict incoming packet sizes.

  • Set output anchor metadata using provider.get_output_anchor(anchor_name).open().
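
As a sketch, a callback that does both of these things might look like the following (the packet size of 100 is arbitrary, and the incoming metadata is simply reused for the output anchor; it assumes the output anchor was saved in the constructor):

def on_input_connection_opened(self, input_connection):
    """Hypothetical example: cap packet sizes and open the output anchor."""
    input_connection.max_packet_size = 100                  # restrict incoming packet sizes
    self.output_anchor.open(input_connection.metadata)      # declare the outgoing metadata (pass-through here)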

On Record Packet

on_record_packet(self, input_connection): This is the code that runs during the workflow whenever a new record packet is ready to be processed.

This callback is roughly the equivalent of pi_push_all_records, ii_push_record, and ii_close. Note that ii_close (or closing the input connection after the last record is sent) isn’t handled separately in the new AYX Python SDK.

This method is called for each input connection until the final record packet is sent. To retrieve record packets, call input_connection.read().
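
For instance, a pass-through version of this callback might look like this sketch (it assumes the output anchor was saved as a plugin attribute in the constructor):

def on_record_packet(self, input_connection):
    """Hypothetical example: read the waiting packet and push it downstream."""
    packet = input_connection.read()        # retrieve the current record packet
    # packet.to_dataframe() returns the data as a pandas DataFrame if you need to transform it
    self.output_anchor.write(packet)        # send the packet to the output anchor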

On Complete

on_complete(self): This is the code that runs after all of the record packets (for each of the input connections) have been processed. This callback is roughly the equivalent of pi_close in the Python Engine SDK.
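
A minimal sketch of this callback (the full example at the end of this article shows a more realistic on_complete that assembles and writes buffered output):

def on_complete(self):
    """Hypothetical example: report that all record packets have been processed."""
    self.provider.io.info("Completed processing records.")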

Lifecycle Equivalents: Engine SDK vs Platform SDK

Engine                        Platform (AYX Python SDK)
pi_init                       __init__(self, provider)
pi_add_incoming_connection    on_input_connection_opened(self, input_connection)
ii_init                       on_input_connection_opened(self, input_connection)
pi_add_outgoing_connection    on_input_connection_opened(self, input_connection)
pi_push_all_records           on_record_packet(self, input_connection)
ii_push_record                on_record_packet(self, input_connection)
ii_close                      on_record_packet(self, input_connection)
pi_close                      on_complete(self)

Other

Engine                        Platform (AYX Python SDK)
xmsg()                        io_base.translate_msg()
update_progress()             io_base.update_progress()
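
As a rough sketch of how these calls map (the Engine SDK lines come from the example below; the io_base argument types and the io.error method are assumptions):

# Engine SDK (from the example below)
self.alteryx_engine.output_message(self.n_tool_id, Sdk.EngineMessageType.error, self.xmsg("Invalid Input Connection"))
self.alteryx_engine.output_tool_progress(self.n_tool_id, 0.5)

# AYX Python SDK (assumed usage of the io_base methods listed above)
self.provider.io.error(self.provider.io.translate_msg("Invalid Input Connection"))
self.provider.io.update_progress(50.0)  # percent complete; exact signature is an assumption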

Unimplemented

Engine                        Platform (AYX Python SDK)
build_sort_info               Not implemented

Implementation Examples

Compare code examples for the Python Engine SDK and the new AYX Python SDK.

Python Engine SDK

"""
AyxPlugin (required) has-a IncomingInterface (optional).
Although defining IncomingInterface is optional, the interface methods are needed if an upstream tool exists.
"""

import AlteryxPythonSDK as Sdk
import xml.etree.ElementTree as Et
import itertools as it


class AyxPlugin:
    """
    Implements the plugin interface methods, to be utilized by the Alteryx engine to communicate with a plugin.
    Prefixed with "pi", the Alteryx engine will expect the below five interface methods to be defined.
    """

    def __init__(self, n_tool_id: int, alteryx_engine: object, output_anchor_mgr: object):
        """
        Constructor is called whenever the Alteryx engine wants to instantiate an instance of this plugin.
        :param n_tool_id: The assigned unique identification for a tool instance.
        :param alteryx_engine: Provides an interface into the Alteryx engine.
        :param output_anchor_mgr: A helper that wraps the outgoing connections for a plugin.
        """

        # Default properties
        self.n_tool_id = n_tool_id
        self.alteryx_engine = alteryx_engine
        self.output_anchor_mgr = output_anchor_mgr

        # Custom properties
        self.left_input = None
        self.right_input = None
        self.left_prefix = ''
        self.right_prefix = ''
        self.output_anchor = None

    def pi_init(self, str_xml: str):
        """
        Getting the user-entered prefixes from the GUI, and the output anchor from the XML file.
        Called when the Alteryx engine is ready to provide the tool configuration from the GUI.
        :param str_xml: The raw XML from the GUI.
        """

        self.left_prefix = Et.fromstring(str_xml).find('LeftPrefix').text
        self.right_prefix = Et.fromstring(str_xml).find('RightPrefix').text
        self.output_anchor = self.output_anchor_mgr.get_output_anchor('Output')

    def pi_add_incoming_connection(self, str_type: str, str_name: str) -> object:
        """
        The IncomingInterface objects are instantiated here, one object per incoming connection.
        Called when the Alteryx engine is attempting to add an incoming data connection.
        :param str_type: The name of the input connection anchor, defined in the Config.xml file.
        :param str_name: The name of the wire, defined by the workflow author.
        :return: The IncomingInterface object(s).
        """

        if str_type == 'Left':
            self.left_input = IncomingInterface(self, self.left_prefix)
            return self.left_input
        elif str_type == 'Right':
            self.right_input = IncomingInterface(self, self.right_prefix)
            return self.right_input
        else:
            self.display_error_message('Invalid Input Connection')

    def pi_add_outgoing_connection(self, str_name: str) -> bool:
        """
        Called when the Alteryx engine is attempting to add an outgoing data connection.
        :param str_name: The name of the output connection anchor, defined in the Config.xml file.
        :return: True signifies that the connection is accepted.
        """

        return True

    def pi_push_all_records(self, n_record_limit: int) -> bool:
        """
        Called when a tool has no incoming data connection.
        :param n_record_limit: Set it to <0 for no limit, 0 for no records, and >0 to specify the number of records.
        :return: True for success, False for failure.
        """

        self.display_error_message('Missing Incoming Connection')
        return False

    def pi_close(self, b_has_errors: bool):
        """
        Called after all records have been processed.
        :param b_has_errors: Set to True to skip the final processing.
        """

        self.output_anchor.assert_close()  # Checks whether connections were properly closed.

    def check_input_complete(self):
        """
        A non-interface helper tasked to verify end of processing for both incoming connections.
        """

        if self.right_input is not None and self.left_input is not None:
            if self.right_input.input_complete and self.left_input.input_complete:
                self.process_output()
        else:
            self.display_error_message('Both left and right inputs must have connections')

    @staticmethod
    def init_record_info_out(child: object, record_info_out: object):
        """
        A non-interface helper for process_output() that handles building out the layout for record_info_out.
        :param child: An incoming connection.
        :param record_info_out: The outgoing record info object.
        :return: Updated initialization of record_info_out.
        """

        record_info_out.init_from_xml(
            child.record_info_in.get_record_xml_meta_data(True),
            child.rename_prefix + '_' if child.rename_prefix is not None else ''
        )
        return record_info_out

    @staticmethod
    def swap_outgoing_order(left_input: object, right_input: object):
        """
        A non-interface helper for process_output() that assigns the mapping order based on number of records.
        :param left_input: the object from the left incoming connection
        :param right_input: the object from the right incoming connection
        :return: New names for the incoming connections.
        """

        min_n_records = min(len(left_input.record_list), len(right_input.record_list))
        max_n_records = max(len(left_input.record_list), len(right_input.record_list))

        # Having the shortest list be the first to output, so set_dest_to_null is applied only for the first copy
        # when dealing with an uneven record pair. This swap process will eventually be replaced in subsequent releases.
        if min_n_records != max_n_records:
            first_half_output = left_input if min_n_records == len(left_input.record_list) else right_input
            second_half_output = right_input if first_half_output == left_input else left_input
        else:
            first_half_output = left_input
            second_half_output = right_input
        return first_half_output, second_half_output

    @staticmethod
    def setup_record_copier(child: object, record_info_out: object, start_index: int):
        """
        A non-interface helper for process_output() that maps the appropriate fields to their designated positions.
        :param child: Incoming connection object.
        :param record_info_out: The outgoing record layout.
        :param start_index: The starting field position of an incoming connection object.
        :return: The starting field position for the next incoming connection object.
        """

        child.record_copier = Sdk.RecordCopier(record_info_out, child.record_info_in)
        for index in range(child.record_info_in.num_fields):
            child.record_copier.add(start_index + index, index)
        child.record_copier.done_adding()
        return child.record_info_in.num_fields

    def process_output(self):
        """
        A non-interface method responsible for pushing the records based on the joined record layout.
        """

        # Determining the mapping order based on length of the incoming data streams.
        first_half_output, second_half_output = self.swap_outgoing_order(self.left_input, self.right_input)

        # Having the helper initialize the RecordInfo object for the outgoing stream.
        record_info_out = self.init_record_info_out(first_half_output, Sdk.RecordInfo(self.alteryx_engine))
        record_info_out = self.init_record_info_out(second_half_output, record_info_out)

        self.output_anchor.init(record_info_out)  # Lets the downstream tools know of the outgoing record metadata.

        # Having the helper function handle the field index mapping from both incoming streams, into record_info_out.
        start_index = self.setup_record_copier(first_half_output, record_info_out, 0)
        self.setup_record_copier(second_half_output, record_info_out, start_index)

        record_creator = record_info_out.construct_record_creator()  # Creating a new record_creator for the joined records.

        for input_pair in it.zip_longest(first_half_output.record_list, second_half_output.record_list):

            # Copying the record into the record creator. NULL values will be used to fill for the difference.
            if input_pair[0] is not None:
                first_half_output.record_copier.copy(record_creator, input_pair[0].finalize_record())
            else:
                first_half_output.record_copier.set_dest_to_null(record_creator)
            second_half_output.record_copier.copy(record_creator, input_pair[1].finalize_record())

            # Asking for a record to push downstream, then resetting the record to prevent unexpected results.
            output_record = record_creator.finalize_record()
            self.output_anchor.push_record(output_record, False)
            record_creator.reset()

            #TODO: The progress update to the downstream tool, based on time elapsed, should go here.

        self.output_anchor.close()  # Close outgoing connections.

    def process_update_input_progress(self):
        """
        A non-interface helper to update the incoming progress based on records received from the input streams.
        """

        if self.right_input is not None and self.left_input is not None:
            # We're assuming receiving the input data accounts for half the progress.
            input_percent = (self.right_input.d_progress_percentage + self.left_input.d_progress_percentage) / 2
            self.alteryx_engine.output_tool_progress(self.n_tool_id, input_percent / 2)

    def display_error_message(self, msg_string: str):
        """
        A non-interface helper function, responsible for outputting error messages.
        :param msg_string: The error message string.
        """

        self.alteryx_engine.output_message(self.n_tool_id, Sdk.EngineMessageType.error, self.xmsg(msg_string))

    def xmsg(self, msg_string: str):
        """
        A non-interface, non-operational placeholder for the eventual localization of predefined user-facing strings.
        :param msg_string: The user-facing string.
        :return: msg_string
        """

        return msg_string


class IncomingInterface:
    """
    This optional class is returned by pi_add_incoming_connection, and it implements the incoming interface methods, to
    be utilized by the Alteryx engine to communicate with a plugin when processing an incoming connection.
    Prefixed with "ii", the Alteryx engine will expect the below four interface methods to be defined.
    """

    def __init__(self, parent: object, rename_prefix: str):
        """
        Constructor for IncomingInterface.
        :param parent: AyxPlugin
        :param rename_prefix: The prefix string entered by the user, if any.
        """

        # Default properties
        self.parent = parent
        self.rename_prefix = rename_prefix

        # Custom properties
        self.input_complete = False
        self.d_progress_percentage = 0
        self.record_info_in = None
        self.record_copier = None
        self.record_list = []

    def ii_init(self, record_info_in: object) -> bool:
        """
        Although no new records are being added, the prep work here will allow for data state preservation in ii_push_record.
        Called to report changes of the incoming connection's record metadata to the Alteryx engine.
        :param record_info_in: A RecordInfo object for the incoming connection's fields.
        :return: True for success, otherwise False.
        """

        self.record_copier = Sdk.RecordCopier(record_info_in, record_info_in)

        # Map each column of the input to where we want in the output.
        for index in range(record_info_in.num_fields):
            self.record_copier.add(index, index)

        self.record_copier.done_adding()  # A necessary step to let record copier know that field mappings are done.
        self.record_info_in = record_info_in  # For later reference.
        return True

    def ii_push_record(self, in_record: object) -> bool:
        """
        Preserving the state of the incoming record data, since the reference to a record dies beyond this point.
        Called when an input record is being sent to the plugin.
        :param in_record: The data for the incoming record.
        :return: True, so the upstream tool continues to send records.
        """

        self.record_list.append(self.record_info_in.construct_record_creator())
        self.record_copier.copy(self.record_list[-1], in_record)
        return True

    def ii_update_progress(self, d_percent: float):
        """
        Called by the upstream tool to report what percentage of records have been pushed.
        :param d_percent: Value between 0.0 and 1.0.
        """

        self.d_progress_percentage = d_percent
        self.parent.process_update_input_progress()

    def ii_close(self):
        """
        Called when the incoming connection has finished passing all of its records.
        """

        self.input_complete = True
        self.parent.check_input_complete()

AYX Python SDK

# Copyright (C) 2021 Alteryx, Inc. All rights reserved.
#
# Licensed under the ALTERYX SDK AND API LICENSE AGREEMENT;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    https://www.alteryx.com/alteryx-sdk-and-api-license-agreement
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Example multiple input anchor tool."""
import pandas as pd
from ayx_python_sdk.core import (
    InputConnectionBase,
    Metadata,
    Plugin,
    ProviderBase,
    RecordPacket,
    register_plugin,
)
from ayx_python_sdk.core.exceptions import WorkflowRuntimeError


class MultiInputTool(Plugin):
    """Concrete implementation of an AyxPlugin."""

    def __init__(self, provider: ProviderBase) -> None:
        """Construct a plugin."""
        self.provider = provider
        self.output_anchor = self.provider.get_output_anchor("Output")
        self.tool_config = self.provider.tool_config
        self.left_prefix = self.tool_config.get("leftField")
        self.right_prefix = self.tool_config.get("rightField")
        self.left_df = None
        self.right_df = None
        self.provider.io.info("Plugin initialized.")
        self.connections_opened = False
        self.output_df = None
        self.output_record_packet = None
        self.output_metadata = Metadata()

    def on_input_connection_opened(self, input_connection: InputConnectionBase) -> None:
        """Initialize the Input Connections of this plugin."""

        if input_connection.anchor.name == "Input1":
            for field in input_connection.metadata:
                col_name = self.left_prefix + "_" + field.name if self.left_prefix else field.name
                self.output_metadata.add_field(col_name, field.type)

        if input_connection.anchor.name == "Input2":
            self.connections_opened = True
            for field in input_connection.metadata:
                col_name = self.right_prefix + "_" + field.name if self.right_prefix else field.name
                self.output_metadata.add_field(col_name, field.type)

        self.provider.io.info(f"Connection {input_connection.name} Initialized!")
        if self.connections_opened:
            self.output_anchor.open(self.output_metadata)
        input_connection.max_packet_size = 1

    def on_record_packet(self, input_connection: InputConnectionBase) -> None:
        """Handle the record packet received through the input connection."""
        self.provider.io.info("Record packet received!")
        input_df = input_connection.read().to_dataframe()
        # aggregate all left and right records
        if input_connection.anchor.name == "Input1":
            if self.left_df is None:
                self.left_df = input_df
            else:
                self.left_df = pd.concat([self.left_df, input_df], ignore_index=True)
        if input_connection.anchor.name == "Input2":
            if self.right_df is None:
                self.right_df = input_df
            else:
                self.right_df = pd.concat([self.right_df, input_df], ignore_index=True)

    def on_complete(self) -> None:
        """Finalize the plugin."""
        if self.left_prefix:
            self.left_df.columns = [self.left_prefix + "_" + col_name for col_name in self.left_df.columns]
        if self.right_prefix:
            self.right_df.columns = [self.right_prefix + "_" + col_name for col_name in self.right_df.columns]

        res = pd.concat([self.left_df, self.right_df], axis=1, join="inner")
        self.output_record_packet = RecordPacket.from_dataframe(metadata=self.output_metadata, df=res)

        self.output_anchor.write(self.output_record_packet)
        self.provider.io.info("Completed processing records.")


AyxPlugin = register_plugin(MultiInputTool)