Streamliner Pipelines


A Streamliner pipeline consists of three phases: extract, transform, and load. For the extract and transform phases, the user may select from a series of pre-defined extractors and transformers, or may supply custom ones. For more information on custom extractors and transformers, see Using Python with Streamliner and Using Scala with Streamliner.

Extractors

Extractors define the point of ingest for a pipeline. They specify how batches are constructed and pass an RDD to the Transformer.

Test

The Test Extractor is a special Extractor that reads data the user enters into a text box in the MemSQL Ops web interface. It treats each line as a record and the collection of all lines as a batch. At the end of each batch interval, the Test Extractor emits the same batch again.

Kafka

The Kafka Extractor subscribes to a Kafka topic and extracts messages as records. Note that this Extractor simply outputs the message data in the format it receives from Kafka. You may need to write a custom Extractor to, for instance, deserialize data from a given format.
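
For example, if messages arrive as raw bytes, a custom Extractor or Transformer would typically decode them before further processing. The sketch below is illustrative only; it assumes the messages are UTF-8 text and that the message data is handed off as an RDD[Array[Byte]], which may not match how your pipeline is wired.

   // Illustrative sketch: decoding raw Kafka message bytes into UTF-8 strings.
   // Assumes the message data arrives as an RDD[Array[Byte]].
   import org.apache.spark.rdd.RDD

   def decodeMessages(raw: RDD[Array[Byte]]): RDD[String] =
     raw.map(bytes => new String(bytes, "UTF-8"))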

Custom Extractor

Streamliner allows users to write custom Extractors and upload them to MemSQL Ops.
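
As a rough illustration of what an Extractor does each batch interval, the sketch below produces an RDD of records using plain Spark. It does not use the actual Streamliner Extractor interface; see Using Scala with Streamliner for the real class and method signatures.

   // Rough illustration only: each batch interval, an Extractor's job is to
   // produce an RDD of records for the Transformer. This uses plain Spark
   // rather than the Streamliner Extractor interface.
   import org.apache.spark.SparkContext
   import org.apache.spark.rdd.RDD

   def nextBatch(sc: SparkContext): RDD[Array[Byte]] = {
     val records = (1 to 5).map(i => s"record $i".getBytes("UTF-8"))
     sc.parallelize(records)
   }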

Transformers

Transformers take an RDD (output by an Extractor), process it, and output a DataFrame. Transformers can serve many purposes including data enrichment, filtering, and aggregation.

JSON Emitter

The JSON Emitter takes an RDD of strings and produces a DataFrame in which each record is a JSON object. Note that the incoming strings must be valid JSON, or Streamliner will produce an error.
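
As a rough sketch of this shape using plain Spark 1.x APIs, the function below turns an RDD of JSON strings into a single-column DataFrame. The data column name is invented for the example and is not necessarily what Streamliner uses internally.

   // Illustrative sketch: each input string becomes one row holding a JSON
   // document in a single column. The column name "data" is an assumption.
   import org.apache.spark.rdd.RDD
   import org.apache.spark.sql.{DataFrame, Row, SQLContext}
   import org.apache.spark.sql.types.{StringType, StructField, StructType}

   def jsonStringsToDataFrame(sqlContext: SQLContext, rdd: RDD[String]): DataFrame = {
     val rows = rdd.map(s => Row(s))
     val schema = StructType(Seq(StructField("data", StringType, nullable = false)))
     sqlContext.createDataFrame(rows, schema)
   }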

CSV Transformer

The CSV Transformer takes an RDD of strings in CSV format and produces a DataFrame that maps each value in the CSV to a column in the database. Users have the option to define the delimiter that separates each column, the escape character, the character used to quote strings, and the string that represents NULL fields.

The CSV Transformer requires a Column Spec in the form of a JSON array, with a name and column_type pair defined for each value in the CSV. The name/column_type pairs are applied, in the order they are defined, to the CSV values in the order they appear. For instance, say the CSV entry is 1,"Stephen","Jones". The first name/column_type pair in the Column Spec will be applied to 1, followed by Stephen, then Jones. A logical Column Spec in this case could be:

   [
     { "name": "id", "column_type" : "integer" },
     { "name": "first_name", "column_type" : "string" },
     { "name": "last_name", "column_type" : "string" }
   ]

Users can also define "skip" : true entries to skip over CSV values so that they are not inserted into MemSQL. For example, this Column Spec will make the Transformer skip over the last two items of the CSV:

   [
     { "name":"id", "column_type": "integer"},
     { "skip" : true },
     { "skip" : true }
   ]

If no column_type is specified, it is assumed to be string. The full list of possible column_type values is available in the memsql-spark-connector repository.

Custom Transformer

Streamliner allows users to write custom Transformers and upload them to MemSQL Ops.
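
As with custom Extractors, the actual interface is documented in Using Scala with Streamliner. The sketch below only illustrates the core task of a Transformer, turning an RDD of raw records into a DataFrame with an explicit schema, using plain Spark APIs; the word and length columns are invented for the example.

   // Illustrative sketch of a Transformer's core task: turn an RDD of raw
   // records into a DataFrame with an explicit schema. Plain Spark 1.x APIs;
   // the "word" and "length" columns are invented for this example.
   import org.apache.spark.rdd.RDD
   import org.apache.spark.sql.{DataFrame, Row, SQLContext}
   import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

   def transform(sqlContext: SQLContext, rdd: RDD[Array[Byte]]): DataFrame = {
     val rows = rdd.map { bytes =>
       val s = new String(bytes, "UTF-8")
       Row(s, s.length)
     }
     val schema = StructType(Seq(
       StructField("word", StringType, nullable = false),
       StructField("length", IntegerType, nullable = false)
     ))
     sqlContext.createDataFrame(rows, schema)
   }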

Loaders

The Streamliner load phase provides configuration options for writing data to MemSQL. The user provides a database name and a table name as the destination for the load. If either the database or the table does not exist, Streamliner will create it automatically. If both the database and table exist, the DataFrame produced by the Transform phase must match the schema of the table or else Streamliner will produce an error.

The user can also make the load phase a “dry run” by selecting “Enabled” under the Dry Run dropdown. When Dry Run is enabled, Streamliner will perform the extract and transform phases, but will not actually load the data into MemSQL. After starting the pipeline and clicking on a batch, you will see that the Load phase wrote zero records. Using Dry Run in combination with the batch tracing feature allows users to test and debug Extractors and Transformers without actually loading any data.

Users have three options for Table Type: Row, Column, and Custom.

Row Loader

Streamliner allows the user to load data into a MemSQL in-memory rowstore table.

Column Loader

Streamliner allows the user to load data into a MemSQL disk-based columnstore table. For more information, see the Columnstore documentation.

Custom Loader

Streamliner also allows the user to specify a custom load configuration. The configuration is specified using JSON. The settings are described below, followed by a combined example.

  • duplicate_key_behavior defines the behavior when Streamliner attempts to insert a row with a duplicate unique key. If the value is empty or null, the server will throw an error on duplicate keys. Ignore will do nothing: the row that exists in MemSQL will remain the same. Replace will replace the entire existing row with the new row. Update will trigger the behavior specified in on_duplicate_key_sql.

  • on_duplicate_key_sql defines the update behavior on duplicate key when duplicate_key_behavior is set to Update. The value for on_duplicate_key_sql will be inserted into an INSERT statement in the following way: INSERT INTO ... ON DUPLICATE KEY UPDATE <on_duplicate_key_sql>. For more information, see the INSERT ... ON DUPLICATE KEY UPDATE documentation.

  • upsert_batch_size sets the batch size used when duplicate_key_behavior is set to Update.

  • table_extra_columns takes a list of objects that define “extra” columns to include when creating a new table. The objects include the following parameters:

    • name is the column name.
    • col_type is the column type. For more information, see the Datatypes topic.
    • nullable defines whether the column is nullable (takes true or false).
    • default_value specifies a default value for the column, using the DEFAULT behavior in CREATE TABLE.
    • persisted is an optional parameter that takes an expression that defines a computed column. The value given for persisted will be inserted into the CREATE TABLE statement as ...[column_name] AS <persisted> PERSISTED [type]....
  • table_keys takes a list of objects that define keys for the table. For more information on types of keys, see CREATE TABLE and Distributed SQL. The objects contain the following parameters:

    • key_type specifies the key type. Possible values are: Shard, Key, PrimaryKey, UniqueKey, and KeyUsingClusteredColumnStore.
    • column_names takes a list of column names to be included in the key. For example, "column_names": [ "a", "b" ] will add the key on columns a and b.
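
Putting these settings together, a custom load configuration might look like the following. This is a minimal sketch that assumes the settings appear as top-level keys of a single JSON object; the column name, key columns, and values are invented for the example.

   {
     "duplicate_key_behavior": "Update",
     "on_duplicate_key_sql": "counter = counter + 1",
     "upsert_batch_size": 1000,
     "table_extra_columns": [
       { "name": "source", "col_type": "varchar(64)", "nullable": true, "default_value": "unknown" }
     ],
     "table_keys": [
       { "key_type": "PrimaryKey", "column_names": [ "id" ] },
       { "key_type": "Shard", "column_names": [ "id" ] }
     ]
   }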