Using Python with Streamliner


Warning

MemSQL Streamliner will be deprecated in MemSQL 6.0. For current Streamliner users, we recommend migrating to MemSQL Pipelines instead. MemSQL Pipelines provides increased stability, improved ingest performance, and exactly-once semantics. For more information about Streamliner deprecation, see the 5.8 Release Notes. For more information about Pipelines, see the MemSQL Pipelines documentation.

Streamliner allows users to deploy custom extractors and transformers by writing Python directly in the MemSQL Ops web interface. While there are certain methods the user must implement, you can otherwise write arbitrary Python for these components, including code that uses external libraries. For more information, see the Working with External Python Packages section below.

Python Custom Extractor

To implement a custom Python Extractor, import and extend the Extractor class.

from pystreamliner.api import Extractor

class MyCustomExtractor(Extractor):
    # class body

The Extractor class contains three methods: initialize, cleanup, and next. Extractor requires the user to override next; initialize and cleanup are optional. These methods’ signatures are as follows:

def initialize(self, spark_context, sql_context, batch_interval, logger):
    # no return statement

def cleanup(self, spark_context, sql_context, batch_interval, logger):
    # no return statement

def next(self, spark_context, sql_context, batch_interval, logger):
    # method body
    return MyDataFrame

next defines how Streamliner produces batches of records. The method must return a DataFrame. Visit the PySpark API documentation for more information on Spark DataFrames.

initialize is called when starting a pipeline and is meant for tasks like creating connections to external data sources. cleanup is called when stopping a pipeline and is meant for tasks like closing connections to external data sources.
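
For example, here is a minimal sketch of a complete Extractor that emits one row per batch containing an incrementing counter. The class name, column name, and log messages are illustrative rather than part of the API, and it assumes the provided logger exposes a standard info method:

from pystreamliner.api import Extractor

class CounterExtractor(Extractor):
    def initialize(self, spark_context, sql_context, batch_interval, logger):
        # Called once when the pipeline starts: set up state or connections.
        self.counter = 0
        logger.info("CounterExtractor initialized")

    def next(self, spark_context, sql_context, batch_interval, logger):
        # Called for each batch: must return a DataFrame.
        self.counter += 1
        rdd = spark_context.parallelize([(self.counter,)])
        return sql_context.createDataFrame(rdd, ["batch_number"])

    def cleanup(self, spark_context, sql_context, batch_interval, logger):
        # Called once when the pipeline stops: release any resources.
        logger.info("CounterExtractor produced %d batches" % self.counter)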

Python Custom Transformer

To implement a custom Python Transformer, import and extend the Transformer class.

from pystreamliner.api import Transformer

class MyCustomTransformer(Transformer):
    # class body

The Transformer class contains three methods: initialize, cleanup, and transform. Transformer requires the user to override transform; initialize and cleanup are optional. These methods’ signatures are as follows:

def initialize(self, sql_context, logger):
    # no return statement

def cleanup(self, sql_context, logger):
    # no return statement

def transform(self, sql_context, df, logger):
    # method body
    return TransformedDataFrame

transform defines how Streamliner operates on a batch of records. The method must return a DataFrame. Visit the PySpark API documentation for more information on Spark DataFrames.

initialize is called when starting a pipeline and is meant for tasks like creating connections to external data sources. cleanup is called when stopping a pipeline and is meant for tasks like closing connections to external data sources.
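
For example, here is a minimal sketch of a complete Transformer that upper-cases a string column using Spark SQL. It assumes the incoming batch has a string column named word, which is a hypothetical schema rather than part of the API:

from pystreamliner.api import Transformer

class UppercaseTransformer(Transformer):
    def transform(self, sql_context, df, logger):
        # Register the incoming batch as a temporary table so it can be
        # transformed with a SQL query; return the resulting DataFrame.
        df.registerTempTable("batch")
        return sql_context.sql("SELECT UPPER(word) AS word FROM batch")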

Info

Normally transformers operate at the DataFrame level, but in advanced use cases you may need access to the underlying Spark context, which is available as sql_context._sc.
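
As a sketch of one such advanced use case, the transform method below broadcasts a small lookup table to all executors via the underlying SparkContext. The status column and its code mapping are hypothetical:

def transform(self, sql_context, df, logger):
    # _sc is a private attribute, but it is the documented way to reach
    # the SparkContext from inside a Streamliner transformer.
    sc = sql_context._sc
    codes = sc.broadcast({"active": 1, "inactive": 0})
    mapped = df.rdd.map(lambda row: (row.status, codes.value.get(row.status, -1)))
    return sql_context.createDataFrame(mapped, ["status", "status_code"])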

Working with External Python Packages

While Streamliner requires the user to implement particular classes and methods, these are the only restrictions on Extractors and Transformers: users can write arbitrary Python and even include external Python packages. MemSQL Ops provides a utility for installing Python packages cluster-wide for use in an Extractor or Transformer, which wraps pip, the Python package manager.

memsql-ops pip install [package name]

To install Python packages that rely on C or Fortran libraries (numpy, for example), you must ensure that those underlying dependencies are installed on each node in the cluster.
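
Once a package is installed, it can be imported like any other module. As an illustrative sketch (the value column and the 95th-percentile computation are hypothetical, not part of the API), a Transformer could use numpy like this:

import numpy  # installed cluster-wide with: memsql-ops pip install numpy

from pystreamliner.api import Transformer

class PercentileTransformer(Transformer):
    def transform(self, sql_context, df, logger):
        # collect() pulls the batch to the driver, which is acceptable
        # for small streaming batches in a sketch like this.
        values = [row.value for row in df.collect()]
        p95 = float(numpy.percentile(values, 95)) if values else 0.0
        return sql_context.createDataFrame([(p95,)], ["p95_value"])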