Transforms
Transforms are currently a preview feature of Pipelines. Transform syntax and behavior may change in future versions.
A transform is a user-defined program that modifies ingested data and outputs the modified data in CSV format. Transforms can be written in any language, but the host Linux distribution of each MemSQL node must have the dependencies required to execute the transform. For example, if you write a transform in Python, Python must be installed and configured on the node’s Linux distribution before the transform can be executed.
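As an illustration of what "modifies ingested data" can mean, here is a hypothetical per-record function written for Python 3. It is not part of any MemSQL API, and it assumes UTF-8 input. It parses one CSV record, uppercases the second field, and returns the record as CSV bytes. In the Example Kafka Transform below, it could be applied as emit(uppercase_second_column(data)) instead of emitting the data unchanged.

```python
import csv
import io


def uppercase_second_column(record):
    """Hypothetical per-record modification: parse one CSV record (bytes),
    uppercase its second field, and return the result as CSV bytes with no
    trailing newline. Records with fewer than two fields keep their values."""
    rows = list(csv.reader(io.StringIO(record.decode("utf-8"))))
    if not rows:
        # Empty input: nothing to modify.
        return record
    row = rows[0]
    if len(row) > 1:
        row[1] = row[1].upper()
    out = io.StringIO()
    # lineterminator="" because the example transform's emit() adds the newline.
    csv.writer(out, lineterminator="").writerow(row)
    return out.getvalue().encode("utf-8")
```

For example, uppercase_second_column(b"1,alice") returns b"1,ALICE". Using the csv module rather than splitting on commas keeps quoted fields such as "smith, jo" intact.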
In a pipeline’s lifecycle, a transform is referenced by an http:// or file:// URI in a CREATE PIPELINE statement. During pipeline creation, the cluster’s master aggregator distributes the transform executable to each leaf node in the cluster. Each leaf node then executes the transform every time a batch partition is processed. For the complete transform syntax, see the CREATE PIPELINE reference documentation.
When the CREATE PIPELINE statement is executed, the transform must be accessible at the specified file system or network endpoint. If the transform is unavailable, pipeline creation will fail. An example CREATE PIPELINE statement with a transform is shown below:
CREATE PIPELINE mypipeline AS
LOAD DATA KAFKA '192.168.1.100:9092/my-topic'
WITH TRANSFORM ('http://www.memsql.com/my-transform.tar.gz', 'my-executable.py', '')
INTO TABLE t
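In the statement above, the transform URI points at a .tar.gz archive, and the second argument, 'my-executable.py', appears to name the executable inside it. The following is a minimal sketch of building such an archive with Python's standard tarfile module. It assumes the executable is stored at the top level of the archive under the name passed as the second argument; package_transform is a hypothetical helper, and the file names are placeholders.

```python
import os
import stat
import tarfile


def package_transform(script_path, archive_path):
    """Bundle a transform script into a gzip-compressed tar archive.

    Assumption: the script is stored at the archive's top level under its
    own file name (e.g. 'my-executable.py'), matching the executable name
    given as the second WITH TRANSFORM argument.
    """
    # Add execute permission before archiving, because tarfile records
    # each member's file mode.
    mode = os.stat(script_path).st_mode
    os.chmod(script_path, mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)

    with tarfile.open(archive_path, "w:gz") as archive:
        archive.add(script_path, arcname=os.path.basename(script_path))
    return archive_path
```

For example, package_transform("my-executable.py", "my-transform.tar.gz") writes an archive that can then be served from an http:// URL or referenced with a file:// URI.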
Creating a Transform
There are a few important things to consider when creating a transform. First, the host Linux distribution must have the dependencies required to execute the transform; if they aren’t installed on your node, the transform will fail to run. Second, because transforms must be executable, the transform file should begin with a shebang line so that the appropriate interpreter is invoked. The shebang depends on the programming language used in your transform (e.g. #!/usr/bin/env python3 for Python 3 or #!/usr/bin/env ruby for Ruby).
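The shebang and executable requirements can be checked locally before you create a pipeline. The following sketch uses a hypothetical helper, check_transform_file, which reads the first line of a file to look for a shebang and uses os.access to confirm the file is executable. It cannot confirm that the interpreter named in the shebang is installed on every leaf node.

```python
import os


def check_transform_file(path):
    """Return a list of problems that would keep `path` from running as a
    transform executable. An empty list means these basic checks passed.

    Hypothetical pre-flight helper: it does not verify that the interpreter
    named in the shebang exists on the leaf nodes.
    """
    problems = []
    with open(path, "rb") as f:
        first_line = f.readline()
    if not first_line.startswith(b"#!"):
        problems.append("missing shebang line (e.g. #!/usr/bin/env python3)")
    if not os.access(path, os.X_OK):
        problems.append("file is not executable (try chmod +x)")
    return problems
```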
Depending on your language and platform, virtual machine or interpreter overhead may greatly reduce a pipeline’s performance. As described in the section above, a transform is executed every time a batch partition is processed, which can happen many times per second. Any startup overhead is therefore paid on every execution, which slows the transform and degrades the performance of the entire pipeline.
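Because the transform is launched for every batch partition, the fixed cost of starting its runtime recurs on every batch. One rough way to gauge that cost on a node is to time a command that does no real work, as in this Python 3.5+ sketch. average_startup_seconds is a hypothetical helper, and the command to measure is whatever runtime you plan to use.

```python
import subprocess
import sys
import time


def average_startup_seconds(cmd, runs=10):
    """Run `cmd` (a command that does no real work) `runs` times and return
    the mean wall-clock time per launch, in seconds."""
    start = time.perf_counter()
    for _ in range(runs):
        subprocess.run(
            cmd,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            check=True,
        )
    return (time.perf_counter() - start) / runs
```

For example, average_startup_seconds([sys.executable, "-c", "pass"]) measures the current Python interpreter starting and exiting. Multiplying the per-launch figure by the number of batch partitions processed per second gives a rough sense of how much time goes to startup alone.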
One of the most important considerations when creating a transform is how data is passed from the pipeline’s extractor to the transform. When data is extracted from the source, it is sent to the transform via stdin. This data is byte-length encoded: each message is preceded by eight bytes that indicate the length of the message in bytes. Your transform should process the stdin stream in the following way:
- Read the first eight bytes to determine the byte length of the message data that follows.
- Read the number of bytes indicated by those eight bytes; this is the actual message data. Process it, for example by converting it to a string and storing it, or by writing the result to stdout right away.
- If the stream still contains more data, read eight more bytes to determine the byte length of the next message, then read that message as in the previous step.
- Continue repeating this process until all data from stdin has been read.
The example in the next section shows how to implement this process.
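The framing can also be exercised without a pipeline. The following sketch builds a length-prefixed stream from a list of messages and then parses it back by following the steps listed above. It assumes the prefix is an unsigned 64-bit integer in the host's native byte order, as the struct.unpack call in the example transform does. frame_messages and parse_frames are hypothetical helper names, mainly useful for producing test input for a transform.

```python
import struct

# Assumption: unsigned 64-bit length prefix in native byte order.
# "=Q" keeps native byte order but always uses the standard 8-byte size.
LENGTH_PREFIX = struct.Struct("=Q")


def frame_messages(messages):
    """Encode a list of byte strings as a length-prefixed stream."""
    return b"".join(LENGTH_PREFIX.pack(len(m)) + m for m in messages)


def parse_frames(stream):
    """Split a length-prefixed byte string back into its messages."""
    messages = []
    offset = 0
    while offset < len(stream):
        # Read eight bytes to learn the length of the next message
        # (struct.error is raised if fewer than eight bytes remain).
        (length,) = LENGTH_PREFIX.unpack_from(stream, offset)
        offset += LENGTH_PREFIX.size
        # Read exactly that many bytes of message data.
        message = stream[offset:offset + length]
        if len(message) != length:
            raise ValueError("truncated message at offset %d" % offset)
        messages.append(message)
        offset += length
    return messages
```

For example, parse_frames(frame_messages([b"1,alice", b"2,bob"])) returns [b"1,alice", b"2,bob"].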
Example Kafka Transform
The following transform is written in Python and reads CSV-formatted data from an extractor without modifying it. For each message, it reads the eight-byte length prefix from stdin to determine the length of the message, then simply writes the contents of the message to stdout.
The code example works for both Python 2 and Python 3.
#!/usr/bin/env python
import struct
import sys

# Python 2's standard streams already handle bytes; in Python 3 the raw
# byte streams are exposed through the .buffer attribute.
binary_stdin = sys.stdin if sys.version_info < (3, 0) else sys.stdin.buffer
binary_stderr = sys.stderr if sys.version_info < (3, 0) else sys.stderr.buffer
binary_stdout = sys.stdout if sys.version_info < (3, 0) else sys.stdout.buffer


def input_stream():
    """
    Consume STDIN and yield each record that is received from MemSQL
    """
    while True:
        # Every record is preceded by an 8-byte length prefix.
        byte_len = binary_stdin.read(8)
        if len(byte_len) == 8:
            # "=Q" is an unsigned 64-bit integer in native byte order. Unlike
            # the native "L" format, it is 8 bytes on every platform.
            byte_len = struct.unpack("=Q", byte_len)[0]
            result = binary_stdin.read(byte_len)
            yield result
        else:
            # A clean end of input leaves no partial length prefix behind.
            assert len(byte_len) == 0, byte_len
            return


def log(message):
    """
    Log an informational message to stderr which will show up in MemSQL in
    the event of transform failure.
    """
    binary_stderr.write(message + b"\n")


def emit(message):
    """
    Emit a record back to MemSQL by writing it to STDOUT. The record
    should be formatted as JSON, Avro, or CSV as it will be parsed by
    LOAD DATA.
    """
    binary_stdout.write(message + b"\n")


log(b"Begin transform")

# We start the transform here by reading from the input_stream() iterator.
for data in input_stream():
    # Since this is an identity transform we just emit what we receive.
    emit(data)

log(b"End transform")
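To try a transform outside a pipeline, you can feed it a length-prefixed stream yourself. This Python 3.5+ sketch runs a transform command with subprocess and returns what it wrote to stdout and stderr. run_transform is a hypothetical helper, and it assumes native-order unsigned 64-bit length prefixes, as the example transform expects.

```python
import struct
import subprocess


def run_transform(cmd, messages):
    """Feed `messages` (byte strings) to the transform command `cmd` as a
    length-prefixed stream on stdin and return its (stdout, stderr) bytes.

    Assumption: each prefix is an unsigned 64-bit integer in native byte
    order. Raises CalledProcessError if the transform exits non-zero.
    """
    stdin_bytes = b"".join(struct.pack("=Q", len(m)) + m for m in messages)
    result = subprocess.run(
        cmd,
        input=stdin_bytes,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=True,
    )
    return result.stdout, result.stderr
```

For example, if the identity transform above is saved locally as transform.py (a placeholder name), run_transform([sys.executable, "transform.py"], [b"1,alice", b"2,bob"]) should return b"1,alice\n2,bob\n" as stdout and b"Begin transform\nEnd transform\n" as stderr.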