CREATE PIPELINE

Create a new Pipeline to continuously extract, transform, and then load data into a table or stored procedure.

Syntax

CREATE [OR REPLACE] [AGGREGATOR] PIPELINE [IF NOT EXISTS] pipeline_name AS
  LOAD DATA { kafka_configuration | s3_configuration | filesystem_configuration
            | azure_blob_configuration | hdfs_configuration | gcs_configuration}
    [BATCH_INTERVAL milliseconds]
    [MAX_PARTITIONS_PER_BATCH max_partitions_per_batch]
    [RESOURCE POOL pool_name]
    [(ENABLE|DISABLE) OUT_OF_ORDER OPTIMIZATION]
    [WITH TRANSFORM ('uri', 'executable', 'arguments [...]') ]
  [REPLACE | IGNORE | SKIP { ALL | CONSTRAINT | DUPLICATE KEY | PARSER } ERRORS]
  { INTO TABLE table_name | INTO PROCEDURE proc_name }
  { json_format_options | avro_format_options | parquet_format_options | csv_format_options }
  [ (column_name, ... ) ]
  [SET  col_name = expr,... | pipeline_source_file()]
  [WHERE expr,...]
  [ON DUPLICATE KEY UPDATE column_name = expression, [...]]

  kafka_configuration:
    KAFKA 'kafka_topic_endpoint'

  s3_configuration:
    S3 { 'bucket-name' | 'bucket-name/object-name' | 'bucket-name/prefix/object-name' }
      [CONFIG 'configuration_json']
      CREDENTIALS 'credentials_json'

  filesystem_configuration:
    FS 'path'
      [CONFIG 'configuration_json']

  azure_blob_configuration:
    AZURE { 'container-name' | 'container-name/object-name' | 'container-name/prefix/object-name' }
      CREDENTIALS 'credentials_json'
      [CONFIG 'configuration_json']

  hdfs_configuration:
    HDFS 'hdfs://<namenode DNS | IP address>:<port>/<directory>'
      [CONFIG 'configuration_json']

  gcs_configuration:
   GCS { 'bucket-name' | 'bucket-name/object-name' | 'bucket-name/prefix/object-name' }
     CREDENTIALS 'credentials_json'
     [CONFIG 'configuration_json']

  json_format_options:
    FORMAT JSON
    ( {col_name | @variable_name} <- subvalue_path [DEFAULT literal_expr], ...)

  avro_format_options:
    FORMAT AVRO SCHEMA REGISTRY {<IP address> | <hostname>}:<port>
    ( {col_name | @variable_name} <- subvalue_path, ...)
    [SCHEMA 'avro_schema']

  subvalue_path:
    {% | [%::]ident [::ident ...]}

  parquet_format_options:
    FORMAT PARQUET parquet_subvalue_mapping [TIMEZONE 'time_zone_name']

  parquet_subvalue_mapping:
    ( {col_name | @variable_name} <- parquet_subvalue_path, ...)

  parquet_subvalue_path:
    {ident [::ident ...]}

  csv_format_options:
    [FORMAT CSV]
    [{FIELDS | COLUMNS}
     TERMINATED BY 'string'
       [[OPTIONALLY] ENCLOSED BY 'char']
       [ESCAPED BY 'char']
    ]
    [LINES
      [STARTING BY 'string']
      [TERMINATED BY 'string']
    ]
    [IGNORE number LINES]
    [ ({col_name | @variable_name}, ...) ]

Remarks

  • If the OR REPLACE clause is provided and a pipeline with pipeline_name already exists, then the CREATE query will alter that pipeline to match the new definition. Its state, including its cursor positions, will be preserved by the CREATE.
  • SKIP CONSTRAINT ERRORS and SKIP DUPLICATE KEY ERRORS are unsupported with pipelines into stored procedures.
  • IGNORE, SKIP PARSER ERRORS, and SKIP ALL ERRORS are unsupported with non-CSV pipelines.
  • REPLACE, SKIP CONSTRAINT ERRORS, and SKIP DUPLICATE KEY ERRORS are supported with non-CSV pipelines.
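
As an illustration of these options, the following sketch combines OR REPLACE with SKIP DUPLICATE KEY ERRORS on a CSV Kafka pipeline; the endpoint and table names are placeholders reused from the examples below:

CREATE OR REPLACE PIPELINE mypipeline AS
LOAD DATA KAFKA '127.0.0.1/my-topic'
SKIP DUPLICATE KEY ERRORS
INTO TABLE `my_table`
FIELDS TERMINATED BY ',';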

Kafka Pipeline Syntax

The following example statement demonstrates how to create a Kafka pipeline using the minimum required syntax:

Minimum Required Kafka Pipeline Syntax:

CREATE PIPELINE mypipeline AS
LOAD DATA KAFKA '127.0.0.1/my-topic'
INTO TABLE `my_table`;

START PIPELINE mypipeline;

This statement creates a new pipeline named mypipeline, uses a Kafka cluster as the data source, points to the location of the my-topic topic at the Kafka cluster’s endpoint, and, once started, ingests data into my_table. For more information about Kafka Pipelines, see Kafka Pipelines Overview. For more information on how pipelines can be started and run, see START PIPELINE.

Info

Unless otherwise specified, Kafka records may be processed out of order if the cluster is 1.5 * pipelines_max_offsets_per_batch_partition behind in a single Kafka partition; however, records will be committed in order in all cases. This is an optimization specific to Kafka pipelines and is enabled by default. If you require the records to be processed in order (e.g. in upsert scenarios), create your pipeline with DISABLE OUT_OF_ORDER OPTIMIZATION specified.

Compressed topics are supported without requiring any additional configuration in SingleStore DB.
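
If your workload requires in-order processing (see the note above), a minimal sketch that reuses the endpoint and table names from the previous example is:

CREATE PIPELINE mypipeline AS
LOAD DATA KAFKA '127.0.0.1/my-topic'
DISABLE OUT_OF_ORDER OPTIMIZATION
INTO TABLE `my_table`;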

S3 Pipeline Syntax

The following example statement demonstrates how to create an S3 pipeline using the minimum required syntax:

Minimum Required S3 Pipeline Syntax:

CREATE PIPELINE mypipeline AS
LOAD DATA S3 'my-bucket-name'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key"[, "aws_session_token": "your_temp_session_token"][, "role_arn":"your_role_arn"]}'
INTO TABLE `my_table`;

START PIPELINE mypipeline;

This statement creates a new pipeline named mypipeline, uses an S3 bucket named my-bucket-name as the data source, and, once started, ingests the bucket’s objects into my_table. Credentials for your S3 bucket can either be in the form of an AWS access key or an Amazon Resource Name (ARN).

No CONFIG clause is required to create an S3 pipeline. This clause is used to specify the Amazon S3 region where the source bucket is located. If no CONFIG clause is specified, SingleStore DB automatically uses the us-east-1 region, also known as US Standard in the Amazon S3 console. To specify a different region, such as us-west-1, include a CONFIG clause as shown in the example below. The CONFIG clause can also specify the suffixes of files to load or the disable_gunzip option. Suffixes are given as a JSON array of strings; when specified, CREATE PIPELINE loads only files that have a matching suffix. Suffixes can be written without a leading ., for example, CONFIG '{"suffixes": ["csv"]}'. When enabled, the disable_gunzip option disables the decompression of files with the .gz extension. If this option is disabled or missing, files with the .gz extension will be decompressed.

For more information about S3 Pipelines, see S3 Pipelines Overview. For more information on how pipelines can be started and run, see START PIPELINE.

S3 Pipeline Using Specified Region

CREATE PIPELINE mypipeline AS
LOAD DATA S3 'my-bucket-name'
CONFIG '{"region": "us-west-1"}'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key"[, "aws_session_token": "your_temp_session_token"][, "role_arn":"your_role_arn"]}'
INTO TABLE `my_table`;
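
The CONFIG clause may also combine the region, suffixes, and disable_gunzip options; the following is a sketch with placeholder values:

CREATE PIPELINE mypipeline AS
LOAD DATA S3 'my-bucket-name'
CONFIG '{"region": "us-west-1", "suffixes": ["csv"], "disable_gunzip": true}'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key"}'
INTO TABLE `my_table`;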

Azure Blob Pipeline Syntax

The following example statement demonstrates how to create an Azure Blob pipeline using the minimum required syntax. Note that Azure Blob Pipelines are only available in MemSQL 5.8.5 and above.

Minimum Required Azure Pipeline Syntax:

CREATE PIPELINE mypipeline AS
LOAD DATA AZURE 'my-container-name'
CREDENTIALS '{"account_name": "my_account_name", "account_key": "my_account_key"}'
INTO TABLE `my_table`;

START PIPELINE mypipeline;

This statement creates a new pipeline named mypipeline, uses an Azure Blob container named my-container-name as the data source, and, once started, ingests the container’s objects into my_table. For more information about Azure Pipelines, see Azure Blob Pipelines Overview. For more information on how pipelines can be started and run, see START PIPELINE.

Note that no CONFIG clause is required to create an Azure pipeline unless you need to specify the suffixes of files to load or the disable_gunzip option. Suffixes are given as a JSON array of strings; when specified, CREATE PIPELINE loads only files that have a matching suffix. Suffixes can be written without a leading ., for example, CONFIG '{"suffixes": ["csv"]}'. When enabled, the disable_gunzip option disables the decompression of files with the .gz extension. If this option is disabled or missing, files with the .gz extension will be decompressed.

Each of the clauses in a CREATE PIPELINE statement is described below.

Filesystem Pipeline Syntax

Info

SingleStore Managed Service does not support Filesystem Pipelines.

The following example statement demonstrates how to create a Filesystem pipeline using the minimum required syntax:

CREATE PIPELINE my_pipeline
AS LOAD DATA FS '/path/to/files/*'
INTO TABLE `my_table`
FIELDS TERMINATED BY ',';

START PIPELINE my_pipeline;

This statement creates a new pipeline named my_pipeline, uses a directory as the data source, and will start ingesting data into my_table.

To disable the decompression of files with the .gz extension, enable the disable_gunzip option in the CONFIG clause. If this option is disabled or missing, files with the .gz extension will be decompressed.

CONFIG '{"disable_gunzip": true}'

For more information about Filesystem Pipelines, see Filesystem Pipelines Overview. For more information on how pipelines can be started and run, see START PIPELINE.

HDFS Pipeline Syntax

The following example statement demonstrates how to create an HDFS pipeline using the minimum required syntax:

CREATE PIPELINE my_pipeline
AS LOAD DATA HDFS 'hdfs://hadoop-namenode:8020/path/to/files'
INTO TABLE `my_table`
FIELDS TERMINATED BY '\t';

START PIPELINE my_pipeline;

This example creates a new pipeline named my_pipeline and references the HDFS path /path/to/files as the data source. Once the pipeline is started, the HDFS extractor recursively walks the directory tree in the HDFS path, looking for MapReduce output files. Any files with names matching the regular expression part-(m|r)-(.*) are imported into my_table. The import occurs only if there are additional files in the directory that don’t match the regular expression; this check confirms that MapReduce completed and wrote its _SUCCESS file to the directory.

If you would like to create a pipeline that imports Hive output files, set the disable_partial_check attribute in your CREATE PIPELINE's CONFIG JSON to true. When the pipeline runs, the extractor will import files as described in the previous paragraph, but will not perform the check for additional files in the directory.
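
For example, a sketch of a Hive-oriented pipeline that sets disable_partial_check (the host, port, and path are placeholders):

CREATE PIPELINE my_pipeline
AS LOAD DATA HDFS 'hdfs://hadoop-namenode:8020/path/to/hive/output'
CONFIG '{"disable_partial_check": true}'
INTO TABLE `my_table`
FIELDS TERMINATED BY '\t';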

You also specify attributes in CREATE PIPELINE's CONFIG clause when you use advanced HDFS pipelines mode. In this mode, you can encrypt your pipeline’s connection to HDFS and you can authenticate your pipeline using Kerberos.

To disable the decompression of files with the .gz extension, enable the disable_gunzip option in the CONFIG clause. If this option is disabled or missing, files with the .gz extension will be decompressed.

CONFIG '{"disable_gunzip": true}'

For more information about HDFS Pipelines, see HDFS Pipelines Overview. For more information on how pipelines can be started and run, see START PIPELINE.

GCS Pipeline Syntax

The following example statement demonstrates how to create a GCS pipeline using the minimum required syntax:

CREATE PIPELINE mypipeline
AS LOAD DATA GCS 'my-bucket-name'
CREDENTIALS '{"access_id": "your_google_access_key", "secret_key": "your_google_secret_key"}'
INTO TABLE `my_table`
FIELDS TERMINATED BY ',';

START PIPELINE mypipeline;

This example creates a new pipeline named mypipeline, uses a GCS bucket named my-bucket-name as the data source, and ingests the bucket’s objects into my_table. A GCS pipeline requires authentication to Google before it can start reading objects from a bucket. It supports only HMAC keys and requires a GCS access key and secret key.

To disable the decompression of files with the .gz extension, enable the disable_gunzip option in the CONFIG clause. If this option is disabled or missing, files with the .gz extension will be decompressed.

CONFIG '{"disable_gunzip": true}'

For more information about GCS pipelines, see GCS Pipelines Overview.

Creating a Parquet Pipeline

The CREATE PIPELINE .. FORMAT PARQUET statement extracts specified fields from records in Parquet files. These files may be located in an Azure, HDFS, S3, or filesystem data source.

The statement assigns the extracted fields to the columns of a new row to be inserted into table_name or passed to proc_name. The fields can also be assigned to temporary values in the SET clause, for use in SQL transformations.

Rows that don’t match the WHERE clause are not included.

Parquet 2.0 encodings are supported.

Parquet Pipelines do not support Kafka.

When you write a CREATE PIPELINE .. FORMAT PARQUET statement, you include the LOAD DATA clause. This clause supports a subset of the error recovery options that are supported by CSV LOAD DATA.

Extracting Parquet Values

parquet_subvalue_mapping specifies the mapping between fields in an input Parquet file and columns in the destination table or temporary variables that are specified in the SET clause.

CREATE PIPELINE .. FORMAT PARQUET uses the ::-separated list of field names in a parquet_subvalue_path to perform successive field name lookups in nested Parquet group schemas. The last field in parquet_subvalue_path must not itself have group type. Additionally, there must be no fields with repeated type along parquet_subvalue_path. Unlike with JSON and Avro, extracting sub-records or sub-lists of Parquet records into SQL JSON columns is not supported.

If an optional field along a parquet_subvalue_path is omitted in a given record, the extracted value for that column will be NULL.

parquet_subvalue_path components containing whitespace or punctuation must be surrounded by backticks.

For example, consider a Parquet record with the following schema:

message m {
  required int64 f1;
  optional group g1 {
    optional int64 f2;
    optional int64 f3;
  }
}

In an instance of this schema whose JSON equivalent is {"f1":1, "g1":{"f2":2, "f3":null}}, the parquet_subvalue_paths f1, g1::f2, and g1::f3 will extract the values 1, 2, and NULL, respectively. If the JSON equivalent is instead {"f1":1, "g1":null}, the same paths will extract 1, NULL, and NULL, respectively.
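
For instance, a mapping over this schema might look like the following sketch, assuming a hypothetical destination table dest with columns c1, c2, and c3:

CREATE PIPELINE parquet_example
AS LOAD DATA FS '/path/to/data.parquet'
INTO TABLE dest
(c1 <- f1,
 c2 <- g1::f2,
 c3 <- g1::f3)
FORMAT PARQUET;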

If you are loading .parquet data from an S3 bucket, ensure that non-empty _SUCCESS, _committed, and _started files in the S3 folder are not deleted before the data in the files is loaded into the destination table.


Info

SingleStore supports shell globs in S3 paths, so you can use wildcards while specifying the S3 path, for example, s3://my_bucket/test_data.parquet/*.parquet.


Converting Parquet Values

After a parquet_subvalue_path is evaluated, its value is converted to an unspecified SQL type which may be further explicitly or implicitly converted as if from a SQL string, as shown in the following table. The converted value is written to a column in the destination table or is written to a temporary variable specified in the SET clause.

Parquet Type           Converted Value
boolean                "1"/"0"
int32                  The string representation of the integer.
int64                  The string representation of the integer.
int96                  See Converting Parquet Time Values.
float                  SQL NULL if not finite. Otherwise, a string convertible without loss of precision to FLOAT.
double                 SQL NULL if not finite. Otherwise, a string convertible without loss of precision to DOUBLE.
binary                 Verbatim, from input bytes.
fixed_len_byte_array   Verbatim, from input bytes.

Converting Parquet Logical Types

Unsigned integer logical type annotations on int32 and int64 types are respected.

The decimal annotation on binary and fixed_len_byte_array types will trigger conversion as if to a numeric literal compatible with a SQL DECIMAL type of the same scale and precision.

All other logical type annotations are ignored and have no effect on conversion.

Converting Parquet Time Values

Values of int96 type will be converted as if to datetime literals truncated to microsecond precision. The underlying value will be converted to the cluster’s time zone according to the TIMEZONE clause, with a default heuristic in place when the clause is omitted.

Time zone conversions may be necessary because some Parquet writers, including Hive, convert time values to UTC before encoding them as int96. Others, including Impala, perform no conversion and write values as they appear in the writer’s local time zone. Parquet files provide no definitive information about the time zone of int96 data.

When the TIMEZONE clause is omitted, SingleStore DB will attempt to automatically perform time zone conversions based on imperfect information in file metadata. This default heuristic may produce incorrect results. Providing the value of @@time_zone to the TIMEZONE clause will disable this behavior and guarantee no conversion.

When the TIMEZONE clause is provided, SingleStore DB will assume that encoded data is in the specified time zone, converting it to the SingleStore DB time zone.

Times outside years 0001 through 9999 will be converted to NULL and a warning will be emitted. In addition, if the pipeline is performing time zone conversions, then the valid range is further restricted to times between 1970-01-01 and 2038-01-19 03:14:07. The validity check is performed after converting to the SingleStore DB time zone.

No automatic integer-to-datetime conversion occurs for time values encoded as int64 or int32, even if the logical type is a time type like timestamp. Use the SET clause to explicitly convert such values to a DATETIME compatible form.
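
As a sketch under stated assumptions: suppose a table events(created DATETIME, event_time DATETIME), an int96 field created_at, and an int64 field ts holding milliseconds since the Unix epoch (the epoch-milliseconds encoding is an assumption; Parquet itself does not imply it). The TIMEZONE clause pins the int96 conversion, while the SET clause converts the int64 value explicitly:

CREATE PIPELINE time_example
AS LOAD DATA FS '/path/to/events.parquet'
INTO TABLE events
(created <- created_at,
 @ts_millis <- ts)
FORMAT PARQUET TIMEZONE 'UTC'
SET event_time = FROM_UNIXTIME(@ts_millis / 1000);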

Example

Consider a Parquet file with the following schema:

message m1 {
  required boolean f1;
  required fixed_len_byte_array(4) f2;
  optional group g1 {
    optional binary f3 (STRING);
    required int64 f4;
  }
}

example.parquet contains four records whose JSON representation is:

{"f1": false, "f2": "rec1", "g1": null}
{"f1": true, "f2": "rec2", "g1": null}
{"f1": true, "f2": "rec3", "g1": {"f3": null, "f4": 3}}
{"f1": true, "f2": "rec4", "g1": {"f3": "four", "f4": 4}}

Create a pipeline that ingests example.parquet into table t:

CREATE TABLE t(c2 BLOB, c3 BLOB, c4 BIGINT UNSIGNED);

CREATE PIPELINE p
  AS LOAD DATA FS "example.parquet"
  INTO TABLE t
  (@v1 <- f1,
  @v2 <- f2,
  c3 <- g1::f3,
  c4 <- g1::f4)
  FORMAT PARQUET
  SET c2 = UPPER(CONVERT(@v2, CHAR))
  WHERE @v1 = TRUE;

Test the pipeline:

TEST PIPELINE p;
****
+------+------+------+
| c3   | c4   | c2   |
+------+------+------+
| NULL | NULL | REC2 |
| NULL |    3 | REC3 |
| four |    4 | REC4 |
+------+------+------+

Note the following about the output of TEST PIPELINE p:

  • The first record in example.parquet is not included in the output because the WHERE clause, which references the temporary variable @v1, filters out rows where f1 is false.
  • The first record of the output contains NULL for columns c3 and c4 because the optional group g1 is null in the corresponding input record ("rec2").
  • The second record of the output contains NULL for column c3 because the optional field g1::f3 is null in the corresponding input record ("rec3").
  • The column c2 in the output contains uppercase values because c2 is set to UPPER(CONVERT(@v2, CHAR)), where the temporary variable @v2 holds the value of the field f2 from each record.

LOAD DATA

CREATE PIPELINE shares many options with LOAD DATA, including its error handling options. It processes extracted and transformed records as if it were a LOAD DATA with the same options.

Like LOAD DATA, pipelines natively load JSON, Avro, and CSV input data.

The SCHEMA REGISTRY {"IP" | "Hostname"} option allows LOAD DATA to pull an Avro schema from a schema registry. For more information, see the Avro Schema Evolution with Pipelines topic.
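
A hedged sketch, following the grammar shown above and assuming a registry reachable at schema-registry.example.com:8081 and Avro records with id and name fields (all placeholders):

CREATE PIPELINE avro_pipeline AS
LOAD DATA KAFKA '127.0.0.1/avro-topic'
INTO TABLE my_avro_table
FORMAT AVRO SCHEMA REGISTRY schema-registry.example.com:8081
(id <- %::id,
 name <- %::name);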

AS LOAD DATA: You can load data by specifying the data source as a Kafka cluster and topic, an S3 bucket or object (with optional prefix), or a file.

  • AS LOAD DATA KAFKA 'topic_endpoint': To use Kafka as a data source, you must specify the endpoint for the Kafka cluster and the path to a specific topic with the cluster. For example:

    LOAD DATA KAFKA '127.0.0.1/my-topic'
    
  • AS LOAD DATA S3 'bucket-name': To use S3 as a data source, you must specify a bucket name or a bucket name and object name. For example:

    LOAD DATA S3 'my-bucket-name'
    
  • [BATCH_INTERVAL milliseconds]: You can specify a batch interval in milliseconds, which is the amount of time that the pipeline waits before checking the data source for new data, once all of the existing data has been loaded from the data source. If a batch interval is not specified, the default value is 2500. For example:

    LOAD DATA KAFKA '127.0.0.1/my-topic'
    BATCH_INTERVAL 500
    
  • [MAX_PARTITIONS_PER_BATCH max_partitions_per_batch]: Specifies the maximum number of batch partitions that can be scheduled into a single batch. Useful for limiting the parallelism of a pipeline on large clusters to prevent the pipeline from throttling system resources.

  • [RESOURCE POOL pool_name]: Specifies the resource pool that is used to load pipeline data.

    • If the resource pool is not specified at the time of pipeline creation, then the pipeline uses the user’s default resource pool. If no default resource pool has been set for the user, then the pipeline uses the value of the resource_pool engine variable as its resource pool.
    • The background tasks of a pipeline run in its resource pool. Therefore, a resource pool used by a pipeline cannot be dropped.
    • The user who creates the pipeline must have permissions to use the resource pool. The pipeline will continue to use the pool even if the right to use it is later revoked for the user who created the pipeline.
    • For more information on resource pools, see Setting Resource Limits.
  • [AGGREGATOR]: Specifying CREATE AGGREGATOR PIPELINE tells SingleStore DB to pull data through the aggregator, instead of directly to the leaves. This option can be more efficient for low parallelism pipelines, like single file S3 loads or single partition Kafka topics, and is required for pipelines into reference tables and tables with auto increment columns.
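
As a combined sketch (the pool, bucket, and table names are placeholders), several of these clauses can appear together in one statement:

CREATE AGGREGATOR PIPELINE agg_pipeline AS
LOAD DATA S3 'my-bucket-name'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key"}'
BATCH_INTERVAL 500
MAX_PARTITIONS_PER_BATCH 4
RESOURCE POOL my_pool
INTO TABLE `my_table`;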

Info

SingleStore DB does not support the LOAD DATA ... [REPLACE | IGNORE | SKIP { ALL | CONSTRAINT | DUPLICATE KEY } ERRORS] semantics for ingesting data into columnstore tables with unique keys. As an alternative, you can write a stored procedure that handles duplicate key errors (see Example 7), and have the Pipeline call this procedure.

WITH TRANSFORM

Pipeline source data can optionally be transformed by specifying an executable program. The data is transformed after the extraction process and before it is loaded into the database.

For more information, see Transforms.

Warning

You must install any required dependencies for your transforms (such as Python) on each leaf node in your cluster. Test out your pipeline by running TEST PIPELINE before running START PIPELINE to make sure your nodes are set up properly.

  • WITH TRANSFORM ('uri', 'executable', 'arguments [...]'): The transform’s three parameters are described below:
  • uri: The location from which the executable program can be downloaded, specified as either an http:// or file:// endpoint. If the URI points to a tarball with a .tar.gz or .tgz extension, its contents will be automatically extracted, and the executable parameter must be specified. If the URI specifies an executable file itself, the executable and arguments parameters can be empty.
  • executable: The filename of the transform executable to run. This parameter is required if a tarball was specified as the endpoint for the transform’s uri. If the uri itself specifies an executable, this parameter can be empty.
  • arguments: A series of arguments that are passed to the transform executable at runtime. Each argument must be delimited by a space.

WITH TRANSFORM('http://singlestore.com/my-transform.py','','')
WITH TRANSFORM('file://localhost/root/path/to/my-transform.py','','')
WITH TRANSFORM('http://singlestore.com/my-transform-tarball.tar.gz', 'my-executable.py','')
WITH TRANSFORM('http://singlestore.com/my-transform-tarball.tar.gz', 'my-executable.py', '-arg1 -arg1')
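
For context, a sketch of a complete statement that applies a transform (reusing the placeholder URI, topic, and table names above):

CREATE PIPELINE transform_pipeline AS
LOAD DATA KAFKA '127.0.0.1/my-topic'
WITH TRANSFORM ('http://singlestore.com/my-transform-tarball.tar.gz', 'my-executable.py', '')
INTO TABLE `my_table`
FIELDS TERMINATED BY ',';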

INTO PROCEDURE

This feature allows updating multiple tables from one pipeline by specifying a stored procedure.

Info

SingleStore DB does not allow DDL commands in stored procedures that are called from pipelines.

CREATE OR REPLACE PROCEDURE procedure_name (query_name QUERY(field_name data_type, ...))
AS
BEGIN
	procedure_body
END

CREATE PIPELINE pipeline_name
AS LOAD DATA load_data_options
INTO PROCEDURE procedure_name
Info

The query type variable must be the only parameter in the stored procedure used with your pipeline.

Remarks

The following are a list of restrictions and recommendations that you should follow when using a stored procedure with your pipeline:

  • The field list in your QUERY type variable must conform to the schema of the pipeline source (Kafka, S3, etc.).
  • For more information on loading data, see the LOAD DATA topic.
  • Use the pipeline_source_file() built-in function in the SET clause to set a table column to the pipeline data source file.
  • The SET and WHERE clauses are executed before the stored procedure.
  • IGNORE and SKIP ... ERRORS only recognize parsing errors. You must specify the desired behavior in the event of constraint errors in the body of the stored procedure.
  • Transactions (e.g., BEGIN TRANSACTION, COMMIT, etc.) are not allowed because the pipeline manages the transaction state for you.
  • Pipelines into stored procedures maintain the same exactly-once semantics as other pipelines, but this means certain read-write patterns (reads after writes in reshuffles or reference tables) are not allowed.
  • Duplicate key behavior must be specified and enforced inside the stored procedure itself. Each INSERT statement can have a duplicate key policy (or none), if it wants.
  • The stored procedure runs on an aggregator instead of on each partition; however, that doesn’t mean the data flows through the aggregator (unless you explicitly use COLLECT). SQL queries issued from the stored procedure are distributed to the leaves as normal. Data will only be processed on the leaf, unless the schema of one of the tables involved demands otherwise. Data is processed on a per-batch basis from the source pipeline.
  • Unlike CREATE PIPELINE ... INTO TABLE, AUTO INCREMENT is allowed in the stored procedure; however, caution should be exercised as this may drastically reduce your pipeline ingest rate because the data will flow through an aggregator.
  • OUT_OF_ORDER OPTIMIZATION cannot be disabled when using pipelines into stored procedures. If needed, you should enforce ordering in the stored procedure.

Examples

Example 1

The following example sets the table column source_file to the data source file of the pipeline using the pipeline_source_file() built-in function.

CREATE TABLE t (
  col1 bigint(20) DEFAULT NULL,
  col2 bigint(20) DEFAULT NULL,
  source_file blob);
CREATE PIPELINE p
AS LOAD DATA FS '/tmp/test.csv'
INTO TABLE t
FIELDS TERMINATED BY ',' ENCLOSED BY '' ESCAPED BY '\\'
LINES TERMINATED BY '\n' STARTING BY ''
(
    t.col1,
    t.col2
)
SET
    source_file = pipeline_source_file();
cat test.csv
****
1,2 \n
3,4 \n
4,5 \n
START PIPELINE p;
****
Query OK, 0 rows affected (0.01 sec)
select * from t;
****
+------+------+---------------+
| col1 | col2 | source_file   |
+------+------+---------------+
|    1 |    2 | /tmp/test.csv |
|    3 |    4 | /tmp/test.csv |
|    4 |    5 | /tmp/test.csv |
+------+------+---------------+
3 rows in set (0.00 sec)

Example 2

Suppose you have a stream of tweets and want to split them into two tables. If your Kafka pipeline delivers the tweet data as CSV, you can do the following:

CREATE PIPELINE pipeline_name
AS LOAD DATA kafka 'kafka-host/tweet_data'
INTO PROCEDURE proc
FIELDS TERMINATED BY ',';

DELIMITER //

CREATE OR REPLACE PROCEDURE proc(batch query(tweet json))
AS
BEGIN
  INSERT INTO tweets(tweet_id, user_id, text)
  SELECT tweet::tweet_id, tweet::user_id, tweet::text
  FROM batch;

  INSERT INTO retweets_counter(user_id, num_retweets)
    SELECT tweet::retweeted_user_id, 1
    FROM batch
    ON DUPLICATE KEY UPDATE num_retweets = num_retweets + 1
    WHERE tweet::retweeted_user_id IS NOT NULL;
END //

DELIMITER ;

Example 3

Another example is loading into a table and updating some aggregate values.

CREATE OR REPLACE PIPELINE pipeline_name
AS LOAD DATA kafka 'kafka-host/tweet_data'
INTO PROCEDURE proc
FIELDS TERMINATED BY ',';

DELIMITER //

CREATE OR REPLACE PROCEDURE proc(batch query(tweet json))
AS
BEGIN
  INSERT INTO tweets(tweet_id, user_id, text)
    SELECT tweet::tweet_id, tweet::user_id, tweet::text
    FROM batch;

  INSERT INTO retweets_counter(user_id, num_retweets)
    SELECT tweet::retweeted_user_id, 1
    FROM batch
    ON DUPLICATE KEY UPDATE num_retweets = num_retweets + 1
    WHERE tweet::retweeted_user_id IS NOT NULL;
END //

DELIMITER ;

Example 4

This example shows how to maintain user-specified transactional boundaries.

DELIMITER //

CREATE OR REPLACE PROCEDURE proc(batch query(pos_id bigint, value bigint, commit bool))
AS
BEGIN
    INSERT INTO positions_staging SELECT * FROM batch;

    INSERT INTO positions_finalized
      SELECT * FROM positions_staging
      WHERE pos_id IN (SELECT pos_id FROM positions_staging where commit)
      AND NOT commit;

    DELETE FROM positions_staging
      WHERE pos_id IN (SELECT pos_id FROM positions_staging where commit);
END //

DELIMITER ;

Example 5

This example shows how to implement change data capture (CDC).

DELIMITER //

CREATE OR REPLACE PROCEDURE proc(batch query(matchKey bigint, value bigint, doDelete bool))
AS
DECLARE
    _matchKey bigint;
    _value bigint;
BEGIN
    FOR r IN collect(batch) LOOP
        IF r.doDelete then
            _matchKey = r.matchKey;
            DELETE FROM t WHERE t.key = _matchKey;
        ELSE
            _matchKey = r.matchKey;
            _value = r.value;
            INSERT INTO t VALUES(_matchKey, _value);
        END IF;
    END LOOP;
END //

DELIMITER ;

Example 6

This example shows how to transform incoming data during the loading process.

DELIMITER //

CREATE OR REPLACE PROCEDURE proc(
  batch query(ip varchar(64), value varchar(255)))
AS
BEGIN
    INSERT INTO t
      SELECT batch.*, ip_to_point_table.geopoint
      FROM batch
      JOIN ip_to_point_table
      ON ip_prefix(ip) = ip_to_point_table.ip;
END //

DELIMITER ;

Example 7

This example shows how to call other stored procedures from within your stored procedure.

DELIMITER //

CREATE OR REPLACE PROCEDURE proc(batch query(id bigint, payload json))
AS
BEGIN
    CALL run_machine_learning_iteration(batch);
    INSERT INTO t SELECT * FROM batch;
END //

DELIMITER ;

Example 8

The following example shows how to handle duplicate key errors in a Pipeline.

DELIMITER //

CREATE OR REPLACE PROCEDURE ondupkeyupdate_t (batch query(a INT, b INT)) AS
BEGIN
  FOR row in COLLECT(batch) LOOP
    BEGIN
      INSERT t VALUES(row.a, row.b);
    EXCEPTION
      WHEN ER_DUP_ENTRY THEN
        UPDATE t SET t.b = row.b WHERE t.a = row.a;
    END;
  END LOOP;
END //

DELIMITER ;

Example 9

This example shows how to handle duplicate key errors in a Pipeline using the ON DUPLICATE KEY UPDATE clause.

cat /tmp/cst-list.csv
****
Sam,7214,7
Sasha,5296,8
DESC cust;
****
+--------+-------------+------+------+---------+-------+
| Field  | Type        | Null | Key  | Default | Extra |
+--------+-------------+------+------+---------+-------+
| NAME   | varchar(32) | YES  |      | NULL    |       |
| ID     | int(11)     | NO   | PRI  | 0       |       |
| ORDERS | int(11)     | YES  |      | NULL    |       |
+--------+-------------+------+------+---------+-------+
SELECT * FROM cust;
****
+-------+------+--------+
| NAME  | ID   | ORDERS |
+-------+------+--------+
| Chris | 7214 |      6 |
| Elen  | 8301 |      4 |
| Adam  | 3412 |      5 |
+-------+------+--------+
DELIMITER //

CREATE or REPLACE PROCEDURE dupKeyUpdate (batch query(
   Name VARCHAR(32),
   ID INT,
   Orders INT)) AS
BEGIN
  INSERT INTO cust(Name, ID, Orders)
  SELECT Name, ID, Orders
  FROM batch
  ON DUPLICATE KEY UPDATE
    Name = VALUES(Name),
    Orders = VALUES(Orders);
END //

DELIMITER ;

CREATE PIPELINE p1
AS LOAD DATA FS '/tmp/cst-list.csv'
INTO PROCEDURE dupKeyUpdate
FIELDS TERMINATED BY ',';

START PIPELINE p1;

SELECT * FROM cust;
****
+-------+------+--------+
| NAME  | ID   | ORDERS |
+-------+------+--------+
| Sam   | 7214 |      7 |
| Elen  | 8301 |      4 |
| Sasha | 5296 |      8 |
| Adam  | 3412 |      5 |
+-------+------+--------+

Example 10

The following example shows how to use a SET clause with multiple assignments to ingest values into columns using variables. Consider the following file:

echo "1,NULL,2020-08-06 07:53:09" >> /pipeline_test_infile/infile.csv
echo "2,2020-08-06 07:53:09,2020-09-06 07:53:09" > /pipeline_test_infile/infile.csv
echo "3,2020-08-06 07:53:09,NULL" >> /pipeline_test_infile/infile.csv

Create a pipeline that ingests infile.csv into the following table orders.

CREATE TABLE orders (
  ID INT,
  del_t1 DATETIME,
  del_t2 DATETIME);
CREATE PIPELINE order_load AS
	LOAD DATA FS '/pipeline_test_infile/'
	INTO TABLE orders
	FIELDS TERMINATED BY ','
	(ID, @del_t1, @del_t2)
	SET del_t1 = IF(@del_t1='NULL',NULL,@del_t1),
		  del_t2 = IF(@del_t2='NULL',NULL,@del_t2);

Test the pipeline:

TEST PIPELINE order_load;
****
+------+---------------------+---------------------+
| ID   | del_t1              | del_t2              |
+------+---------------------+---------------------+
|    1 | NULL                | 2020-08-06 07:53:09 |
|    2 | 2020-08-06 07:53:09 | 2020-09-06 07:53:09 |
|    3 | 2020-08-06 07:53:09 | NULL                |
+------+---------------------+---------------------+