Transforms

Alert

Transforms are currently a preview feature of Pipelines. Transform syntax and behavior may change in future versions.

A transform is a user-defined program that modifies ingested data and outputs the modified data in CSV format. Transforms can be written in any language, but the MemSQL node’s host Linux distribution must have the required dependencies to execute the transform. For example, if you write a transform in Python, the node’s Linux distribution must have Python installed and configured before it can be executed.

In a pipeline’s lifecycle, a transform is referenced by using an http:// or file:// URI when executing a CREATE PIPELINE statement. During pipeline creation, a cluster’s master aggregator distributes the transform executable to each leaf node in the cluster. Each leaf node then executes the transform every time a batch partition is processed. For complete Transform syntax, see the CREATE PIPELINE reference documentation.

When the CREATE PIPELINE statement is executed, the transform must be accessible at the specified file system or network endpoint. If the transform is unavailable, pipeline creation will fail. An example CREATE PIPELINE statement with transform is shown below:

CREATE PIPELINE mypipeline AS
LOAD DATA KAFKA '192.168.1.100:9092/my-topic'
WITH TRANSFORM ('http://www.memsql.com/my-transform.tar.gz', 'my-executable.py', '')
INTO TABLE t

Creating a Transform

There are a few important things to consider when creating a transform. First, the host Linux distribution must have the required dependencies to execute the transform. If the necessary dependencies aren’t installed on your node, the transform will fail to run. Also, because transforms must be executable, a shebang should be at the top of your transform file (to ensure the appropriate interpreter is invoked). This will depend on the programming language used in your transform (e.g. #!/usr/bin/env python3 for Python 3 or #!/usr/bin/env ruby for Ruby).

Depending on your desired language and platform, any virtual machine overhead may greatly reduce a pipeline’s performance. As described in the section above, transforms are executed every time a batch partition is processed, which can be many times per second. Virtual machine overhead will reduce the execution speed of a transform, and thus degrade the performance of the entire pipeline.

One of the most important considerations about creating a transform is the way that data is exchanged from the pipeline’s extractor to the transform. When data is extracted from the source, it’s sent to the transform via the stdin communication channel. This data is also byte length encoded: the first eight bytes of the stream indicates the length of the message in bytes. Your transform should process the stdin stream in the following way:

  1. Read the first eight bytes to determine the byte length of the actual message data that follows.
  2. Read the number of bytes indicated by Step 1, which is the actual message data, then convert it to a string and store it until the entire stdin stream has been read.
  3. If the stream still contains more data, read eight more bytes to determine the byte length of the next message, then repeat Step 2 again.
  4. Continue repeating this process until all data from stdin has been read.

The example in the next section shows how to do this process.

Example Kafka Transform

The following transform is written in Python and reads CSV-formatted data from an extractor without modifying it. This transform reads the first eight bytes of stdin to determine the length of the message, and then simply writes the contents of the message to stdout.

Info

The code example works for both Python 2 and Python 3.

#!/usr/bin/python

import struct
import sys

binary_stdin = sys.stdin if sys.version_info < (3, 0) else sys.stdin.buffer
binary_stderr = sys.stderr if sys.version_info < (3, 0) else sys.stderr.buffer
binary_stdout = sys.stdout if sys.version_info < (3, 0) else sys.stdout.buffer

def input_stream():
    """
        Consume STDIN and yield each record that is received from MemSQL
    """
    while True:
        byte_len = binary_stdin.read(8)
        if len(byte_len) == 8:
            byte_len = struct.unpack("L", byte_len)[0]
            result = binary_stdin.read(byte_len)
            yield result
        else:
            assert len(byte_len) == 0, byte_len
            return


def log(message):
    """
        Log an informational message to stderr which will show up in MemSQL in
        the event of transform failure.
    """
    binary_stderr.write(message + b"\n")


def emit(message):
    """
        Emit a record back to MemSQL by writing it to STDOUT.  The record
        should be formatted as JSON, Avro, or CSV as it will be parsed by
        LOAD DATA.
    """
    binary_stdout.write(message + b"\n")

log(b"Begin transform")

# We start the transform here by reading from the input_stream() iterator.
for data in input_stream():
    # Since this is an identity transform we just emit what we receive.
    emit(data)

log(b"End transform")

See Also