A Streamliner pipeline consists of three phases: extract, transform, and load. For the extract and transform phases, the user may select from a series of pre-defined extractors and transformers, or may supply custom ones. For more information on custom extractors and transformers, see Using Python with Streamliner and Using Scala with Streamliner.
Extractors
Extractors define the point of ingest for a pipeline. They specify how batches are constructed and pass an RDD to the Transformer.
Test
The Test Extractor is a special Extractor that reads data input by the user into a text box in the MemSQL Ops web interface. It treats each line as a record, and the collection of all lines as a batch. At the end of the batch interval, the Test Extractor will emit the same batch over again.
Kafka
The Kafka Extractor subscribes to a Kafka topic and extracts messages as records. Note that this Extractor simply outputs the message data in the format it receives from Kafka. You may need to write a custom Extractor to, for instance, deserialize data from a given format.
Custom Extractor
Streamliner allows users to write custom Extractors and upload them to MemSQL Ops.
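As a rough illustration, a custom Extractor written in Scala might look like the sketch below. It assumes the SimpleByteArrayExtractor interface from the memsql-spark-connector etlLib; the exact class names and method signatures may differ by version, so see Using Scala with Streamliner for the authoritative interface.
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import com.memsql.spark.etl.api._
import com.memsql.spark.etl.utils._

// Hypothetical example: emit a small constant batch of records each interval.
class ConstantExtractor extends SimpleByteArrayExtractor {
  override def nextRDD(sparkContext: SparkContext, config: UserExtractConfig,
                       batchInterval: Long, logger: PhaseLogger): Option[RDD[Array[Byte]]] = {
    // Each element of the returned RDD becomes one record in the batch.
    Some(sparkContext.parallelize(List("1", "2", "3").map(_.getBytes("UTF-8"))))
  }
}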
Transformers
Transformers take an RDD (output by an Extractor), process it, and output a DataFrame. Transformers can serve many purposes including data enrichment, filtering, and aggregation.
JSON Transformer
The JSON Transformer takes an RDD of strings and produces a DataFrame where each record is a JSON object. Note that the incoming strings must be valid JSON, or Streamliner will produce an error.
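This is not the built-in Transformer's implementation, but a minimal sketch of the same idea in plain Spark: each JSON string becomes one row in a single-column DataFrame (the column name "json" is hypothetical), with parsing and validation left to the pipeline.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Sketch only: wrap each JSON string record as a one-column DataFrame row.
def jsonRecordsToDataFrame(sqlContext: SQLContext, rdd: RDD[String]): DataFrame = {
  val schema = StructType(Seq(StructField("json", StringType, nullable = false)))
  sqlContext.createDataFrame(rdd.map(Row(_)), schema)
}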
CSV Transformer
The CSV Transformer takes an RDD of strings in CSV format and produces a DataFrame which maps each item in the CSV to a column in the database. Users have the option to define the CSV delimiter that separates each column, the escape key, the character used as a quote for strings, and the string that represents NULL fields.
The CSV Transformer requires a Column Spec in the form of a JSON array, with a name and column_type pair defined for each value in the CSV. Each name, column_type pair is applied, in the order defined, to the CSV values in the order they appear. For instance, say the CSV entry is 1,"Stephen","Jones". The first name, column_type pair in the JSON Column Spec will be applied to 1, followed by Stephen, then Jones. A logical Column Spec in this case could be:
[
{ "name": "id", "column_type" : "integer" },
{ "name": "first_name", "column_type" : "string" },
{ "name": "last_name", "column_type" : "string" }
]
Users can also define "skip": true entries to skip over CSV values, not inserting them into MemSQL. For example, this Column Spec will make the Transformer skip over the last two items of the CSV:
[
{ "name":"id", "column_type": "integer"},
{ "skip" : true },
{ "skip" : true }
]
If no column_type is specified, it is assumed to be string. The full list of possible column_type values is listed in the memsql-spark-connector repository.
Custom Transformer
Streamliner allows users to write custom Transformers and upload them to MemSQL Ops.
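For illustration, a custom Transformer in Scala might resemble the sketch below. It assumes the SimpleByteArrayTransformer interface from the memsql-spark-connector etlLib (interface names and signatures may vary by version), and the column name "line" is hypothetical; see Using Scala with Streamliner for the authoritative API.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import com.memsql.spark.etl.api._
import com.memsql.spark.etl.utils._

// Hypothetical example: decode each record as UTF-8 text, upper-case it,
// and emit a single-column DataFrame for the load phase.
class UpperCaseTransformer extends SimpleByteArrayTransformer {
  override def transform(sqlContext: SQLContext, rdd: RDD[Array[Byte]],
                         config: UserTransformConfig, logger: PhaseLogger): DataFrame = {
    val schema = StructType(Seq(StructField("line", StringType, nullable = false)))
    val rows = rdd.map(bytes => Row(new String(bytes, "UTF-8").toUpperCase))
    sqlContext.createDataFrame(rows, schema)
  }
}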
Loaders
The Streamliner load phase provides configuration options for writing data to MemSQL. The user provides a database name and a table name as the destination for the load. If either the database or the table does not exist, Streamliner will create it automatically. If both the database and table exist, the DataFrame produced by the Transform phase must match the schema of the table or else Streamliner will produce an error.
The user can also make the load phase a “dry run” by selecting “Enabled” under the Dry Run dropdown. When Dry Run is enabled, Streamliner will perform the extract and transform phases, but will not actually load the data into MemSQL. After starting the pipeline and clicking on a batch, you will see that the Load phase wrote zero records. Using Dry Run in combination with the batch tracing feature allows users to test and debug Extractors and Transformers without actually loading any data.
Users have three options for Table Type: Row, Column, and Custom.
Row Loader
Streamliner allows the user to load data into a MemSQL in-memory rowstore table.
Column Loader
Streamliner allows the user to load data into a MemSQL disk-based columnstore table. For more information, see the Columnstore documentation.
Custom Loader
Streamliner also allows the user to specify a custom load configuration. The configuration is specified using JSON. The settings are the following (a sample configuration is sketched after the list):
- duplicate_key_behavior defines the behavior when Streamliner attempts to insert a duplicate unique key. If the value is empty or null, the server will throw an error on duplicate keys. Ignore will do nothing: the row that exists in MemSQL will remain the same. Replace will replace the entire existing row with the new row. Update will trigger the behavior specified in on_duplicate_key_sql.
- on_duplicate_key_sql defines the update behavior on duplicate key when duplicate_key_behavior is set to Update. The value for on_duplicate_key_sql will be inserted into an INSERT statement in the following way: INSERT INTO ... ON DUPLICATE KEY UPDATE <on_duplicate_key_sql>. For more information, see UPDATE.
- upsert_batch_size sets the batch size for when duplicate_key_behavior is set to Update.
- table_extra_columns takes a list of objects that define "extra" columns to include when creating a new table. The objects include the following parameters:
  - name is the column name.
  - col_type is the column type. For more information, see Data Types.
  - nullable defines whether the column is nullable (takes true or false).
  - default_value specifies a default value for the column, using the default value behavior in CREATE TABLE.
  - persisted is an optional parameter that takes an expression defining a computed column. The value given for persisted will be inserted into the CREATE TABLE statement as ...[column_name] AS <persisted> PERSISTED [type]....
- table_keys takes a list of objects that define keys for the table. For more information on types of keys, see CREATE TABLE and Distributed SQL. The objects contain the following parameters:
  - key_type specifies the key type. Possible values are Shard, Key, PrimaryKey, UniqueKey, and KeyUsingClusteredColumnStore.
  - column_names takes a list of column names to be included in the key. For example, "column_names": [ "a", "b" ] will add the key on columns a and b.
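As an illustration only, a custom load configuration combining these settings might look like the following sketch. The exact shape expected by your version of MemSQL Ops may differ, and the column and key names used here are hypothetical.
{
  "duplicate_key_behavior": "Update",
  "on_duplicate_key_sql": "counter = counter + 1",
  "upsert_batch_size": 1000,
  "table_extra_columns": [
    { "name": "created_at", "col_type": "timestamp", "nullable": false, "default_value": "CURRENT_TIMESTAMP" }
  ],
  "table_keys": [
    { "key_type": "PrimaryKey", "column_names": [ "id" ] },
    { "key_type": "Key", "column_names": [ "created_at" ] }
  ]
}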