Streamliner allows users to deploy custom extractors and transformers by writing Python directly in the MemSQL Ops web interface. While there are certain methods the user must implement, these components can otherwise contain arbitrary Python, including external libraries. For more information, see the Working with External Python Packages section below.
Python Custom Extractor
To implement a custom Python Extractor, import and extend the Extractor class.
from pystreamliner.api import Extractor
class MyCustomExtractor(Extractor):
# class body
The Extractor class contains three methods: initialize, cleanup, and next. Extractor requires the user to override next, but initialize and cleanup are optional. These methods’ signatures are as follows:
def initialize(self, spark_context, sql_context, batch_interval, logger):
# no return statement
def cleanup(self, spark_context, sql_context, batch_interval, logger):
# no return statement
def next(self, spark_context, sql_context, batch_interval, logger):
# method body
return MyDataFrame
next defines how Streamliner produces batches of records. The method must return a DataFrame. Visit the PySpark API documentation for more information on Spark DataFrames.
initialize is called when starting a pipeline and is meant for tasks like creating connections to external data sources. cleanup is called when stopping a pipeline and is meant for tasks like closing connections to external data sources.
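For example, the following extractor emits a small batch of synthetic rows on every interval. This is a minimal sketch: the class name, row contents, and column names are illustrative, not part of the Streamliner API.
from pystreamliner.api import Extractor

class CounterExtractor(Extractor):
    def initialize(self, spark_context, sql_context, batch_interval, logger):
        # illustrative state: track how many batches have been produced
        self.batch_number = 0
        logger.info("CounterExtractor initialized")

    def next(self, spark_context, sql_context, batch_interval, logger):
        # produce ten synthetic (batch, value) rows per interval
        self.batch_number += 1
        rows = [(self.batch_number, i) for i in range(10)]
        return sql_context.createDataFrame(rows, ["batch", "value"])

    def cleanup(self, spark_context, sql_context, batch_interval, logger):
        logger.info("CounterExtractor stopped")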
Python Custom Transformer
To implement a custom Python Transformer, import and extend the Transformer class.
from pystreamliner.api import Transformer
class MyCustomTransformer(Transformer):
# class body
The Transformer class contains three methods: initialize, cleanup, and transform. Transformer requires the user to override transform, but initialize and cleanup are optional. These methods’ signatures are as follows:
def initialize(self, sql_context, logger):
# no return statement
def cleanup(self, sql_context, logger):
# no return statement
def transform(self, sql_context, df, logger):
# method body
return TransformedDataFrame
transform defines how Streamliner operates on a batch of records. The method must return a DataFrame. Visit the PySpark API documentation for more information on Spark DataFrames.
initialize is called when starting a pipeline and is meant for tasks like creating connections to external data sources. cleanup is called when stopping a pipeline and is meant for tasks like closing connections to external data sources.
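For example, the following transformer keeps only the rows of each batch whose value column is even. This is a minimal sketch: the value column is an assumption about the upstream extractor's schema, not a Streamliner requirement.
from pystreamliner.api import Transformer

class EvenValueTransformer(Transformer):
    def transform(self, sql_context, df, logger):
        # filter each batch down to rows with an even "value" column
        logger.info("filtering batch to even values")
        return df.filter(df.value % 2 == 0)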
Normally transformers operate at the DataFrame level, but in advanced use cases you may need access to the underlying Spark context. This can be accessed with sql_context._sc.
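For example, a transformer might use the underlying Spark context to broadcast a small lookup table when the pipeline starts. This is a hypothetical sketch; the lookup table and its use are purely illustrative.
from pystreamliner.api import Transformer

class LookupTransformer(Transformer):
    def initialize(self, sql_context, logger):
        # sql_context._sc exposes the SparkContext used by the pipeline;
        # the country-code table below is illustrative
        self.codes = sql_context._sc.broadcast({"US": 1, "CA": 2})

    def transform(self, sql_context, df, logger):
        # a real pipeline would join or map df against self.codes.value
        logger.info("lookup table has %d entries" % len(self.codes.value))
        return df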
Working with External Python Packages
While Streamliner requires the user to implement particular classes and methods, these are the only restrictions on Extractors and Transformers. In fact, users can write arbitrary Python and even include external Python packages. Users can install Python packages to be used in an Extractor or Transformer through MemSQL Ops. MemSQL Ops provides a utility for installing Python packages cluster-wide, which wraps pip, the Python package manager.
memsql-ops pip install [package name]
To install Python packages that rely on C or FORTRAN libraries (numpy, for example), the user must ensure that dependencies are installed on each node in the cluster.
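For example, after running memsql-ops pip install requests, an extractor can poll an HTTP API on each batch. This is a sketch: the package choice, endpoint, and field names are illustrative assumptions.
import requests
from pystreamliner.api import Extractor

class HttpExtractor(Extractor):
    def next(self, spark_context, sql_context, batch_interval, logger):
        # poll a hypothetical JSON endpoint that returns a list of events
        events = requests.get("http://example.com/events").json()
        rows = [(e["id"], e["payload"]) for e in events]
        return sql_context.createDataFrame(rows, ["id", "payload"])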