CREATE PIPELINE
Create a new Pipeline to continuously extract, transform, and then load data into a table or stored procedure.
Syntax
CREATE [OR REPLACE] [AGGREGATOR] PIPELINE [IF NOT EXISTS] pipeline_name AS
LOAD DATA { kafka_configuration | s3_configuration | filesystem_configuration
| azure_blob_configuration | hdfs_configuration }
[BATCH_INTERVAL milliseconds]
[MAX_PARTITIONS_PER_BATCH max_partitions_per_batch]
[(ENABLE|DISABLE) OUT_OF_ORDER OPTIMIZATION]
[WITH TRANSFORM ('uri', 'executable', 'arguments [...]') ]
[REPLACE | IGNORE | SKIP { ALL | CONSTRAINT | DUPLICATE KEY } ERRORS]
{ INTO TABLE table_name | INTO PROCEDURE proc_name }
{ json_format_options | avro_format_options | csv_format_options }
[ (column_name, ... ) ]
[SET col_name = expr,... | pipeline_source_file()]
[WHERE expr,...]
[ON DUPLICATE KEY UPDATE column_name = expression, [...]]
kafka_configuration:
KAFKA 'kafka_topic_endpoint'
s3_configuration:
S3 { 'bucket-name' | 'bucket-name/object-name' | 'bucket-name/prefix/object-name' }
[CONFIG 'configuration_json']
CREDENTIALS 'credentials_json'
filesystem_configuration:
FS 'path'
azure_blob_configuration:
AZURE { 'container-name' | 'container-name/object-name' | 'container-name/prefix/object-name' }
CREDENTIALS 'credentials_json'
[CONFIG 'configuration_json']
hdfs_configuration:
HDFS 'hdfs://<namenode DNS | IP address>:<port>/<directory>'
[CONFIG 'configuration_json']
json_format_options:
FORMAT JSON
( {col_name | @variable_name} <- subvalue_path [DEFAULT literal_expr], ...)
avro_format_options:
FORMAT AVRO
( {col_name | @variable_name} <- subvalue_path, ...)
[SCHEMA 'avro_schema']
subvalue_path:
{% | [%::]ident [::ident ...]}
csv_format_options:
[FORMAT CSV]
[{FIELDS | COLUMNS}
TERMINATED BY 'string'
[[OPTIONALLY] ENCLOSED BY 'char']
[ESCAPED BY 'char']
]
[LINES
[STARTING BY 'string']
[TERMINATED BY 'string']
]
[IGNORE number LINES]
[ ({col_name | @variable_name}, ...) ]
The REPLACE capability of pipelines maintains the cursor positions when replacing existing pipelines.
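As a rough sketch of how REPLACE might be used, the statement below redefines an existing pipeline with a different batch interval. The pipeline name, endpoint, table, and interval are placeholder values, and the statement assumes a pipeline named mypipeline already exists.

```sql
-- Illustrative only: redefine an existing pipeline in place.
-- Per the note above, the replaced pipeline keeps its cursor positions,
-- so it is expected to continue from where the old definition left off.
CREATE OR REPLACE PIPELINE mypipeline AS
LOAD DATA KAFKA '127.0.0.1/my-topic'
BATCH_INTERVAL 1000
INTO TABLE `my_table`;
```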
Kafka Pipeline Syntax
The following example statement demonstrates how to create a Kafka pipeline using the minimum required syntax:
Minimum Required Kafka Pipeline Syntax:
CREATE PIPELINE mypipeline AS
LOAD DATA KAFKA '127.0.0.1/my-topic'
INTO TABLE `my_table`;
START PIPELINE mypipeline;
This statement creates a new pipeline named mypipeline, uses a Kafka cluster as the data source, points to the location of the my-topic topic at the Kafka cluster’s endpoint, and will start ingesting data into my_table. For more information about Kafka Pipelines, see Kafka Pipelines Overview. For more information on how pipelines can be started and run, see START PIPELINE.
Unless otherwise specified, Kafka records may be processed out of order if the MemSQL cluster is 1.5 * pipelines_max_partitions_per_batch behind in a single Kafka partition; however, records will be committed in order in all cases. This is an optimization specific to Kafka pipelines and is enabled by default. If you require the records to be processed in order (e.g. in upsert scenarios), create your pipeline with DISABLE OUT_OF_ORDER OPTIMIZATION specified.
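A minimal sketch of a Kafka pipeline that turns the out-of-order optimization off, assuming the same placeholder endpoint and table as the example above. The pipeline name mypipeline_ordered is hypothetical.

```sql
-- Process Kafka records strictly in order, for example in upsert scenarios.
-- The clause placement follows the syntax block at the top of this topic.
CREATE PIPELINE mypipeline_ordered AS
LOAD DATA KAFKA '127.0.0.1/my-topic'
DISABLE OUT_OF_ORDER OPTIMIZATION
INTO TABLE `my_table`;
```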
Compressed topics are supported without requiring any additional configuration in MemSQL.
S3 Pipeline Syntax
The following example statement demonstrates how to create an S3 pipeline using the minimum required syntax:
Minimum Required S3 Pipeline Syntax:
CREATE PIPELINE mypipeline AS
LOAD DATA S3 'my-bucket-name'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key", ["role_arn":"replace_with_your_role_arn"]}'
INTO TABLE `my_table`;
START PIPELINE mypipeline;
This statement creates a new pipeline named mypipeline, uses an S3 bucket named my-bucket-name as the data source, and will start ingesting the bucket’s objects into my_table. Credentials for your S3 bucket can be in the form of either an AWS access key or an Amazon Resource Name (ARN).
No CONFIG clause is required to create an S3 pipeline. This clause is used to specify the Amazon S3 region where the source bucket is located. If no CONFIG clause is specified, MemSQL will automatically use the us-east-1 region, also known as US Standard in the Amazon S3 console. To specify a different region, such as us-west-1, include a CONFIG clause as shown in the example below.
For more information about S3 Pipelines, see S3 Pipelines Overview. For more information on how pipelines can be started and run, see START PIPELINE.
S3 Pipeline Using Specified Region
CREATE PIPELINE mypipeline AS
LOAD DATA S3 'my-bucket-name'
CONFIG '{"region": "us-west-1"}'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key", ["role_arn":"replace_with_your_role_arn"]}'
INTO TABLE `my_table`;
Azure Blob Pipeline Syntax
The following example statement demonstrates how to create an Azure Blob pipeline using the minimum required syntax. Note that Azure Blob Pipelines are only available in MemSQL 5.8.5 and above.
Minimum Required Azure Pipeline Syntax:
CREATE PIPELINE mypipeline AS
LOAD DATA AZURE 'my-container-name'
CREDENTIALS '{"account_name": "my_account_name", "account_key": "my_account_key"}'
INTO TABLE `my_table`;
START PIPELINE mypipeline;
This statement creates a new pipeline named mypipeline, uses an Azure Blob container named my-container-name as the data source, and will start ingesting the container’s objects into my_table. For more information about Azure Pipelines, see Azure Blob Pipelines Overview. For more information on how pipelines can be started and run, see START PIPELINE.
Note that no CONFIG clause is required to create an Azure pipeline.
Each of the clauses in a CREATE PIPELINE statement is described below.
Filesystem Pipeline Syntax
SingleStore Managed Service does not support Filesystem Pipelines.
The following example statement demonstrates how to create a Filesystem pipeline using the minimum required syntax:
CREATE PIPELINE my_pipeline
AS LOAD DATA FS '/path/to/files/*'
INTO TABLE `my_table`
FIELDS TERMINATED BY ',';
START PIPELINE my_pipeline;
This statement creates a new pipeline named my_pipeline, uses a directory as the data source, and will start ingesting data into my_table. For more information about Filesystem Pipelines, see Filesystem Pipelines Overview. For more information on how pipelines can be started and run, see START PIPELINE.
HDFS Pipeline Syntax
The following example statement demonstrates how to create an HDFS pipeline using the minimum required syntax:
CREATE PIPELINE my_pipeline
AS LOAD DATA HDFS 'hdfs://hadoop-namenode:8020/path/to/files'
INTO TABLE `my_table`
FIELDS TERMINATED BY '\t';
START PIPELINE my_pipeline;
This example creates a new pipeline named my_pipeline and references the HDFS path /path/to/files as the data source. Once the pipeline is started, the HDFS extractor recursively walks the directory tree in the HDFS path, looking for MapReduce output files. Any files with names matching the regular expression part-(m|r)-(.*) are imported into my_table. The import occurs only if there are additional files in the directory that don’t match the regular expression; this check confirms that MapReduce created a _SUCCESS file.
If you would like to create a pipeline that imports Hive output files, set the disable_partial_check attribute in your CREATE PIPELINE's CONFIG JSON to true. When the pipeline runs, the extractor will import files as described in the previous paragraph, but will not perform the check for additional files in the directory.
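A sketch of what such a pipeline could look like, reusing the placeholder HDFS path from the example above. The pipeline name my_hive_pipeline is hypothetical, and writing the attribute as a JSON boolean is an assumption based on the description above.

```sql
-- Import Hive output files: skip the check for additional (non-matching) files.
CREATE PIPELINE my_hive_pipeline
AS LOAD DATA HDFS 'hdfs://hadoop-namenode:8020/path/to/files'
CONFIG '{"disable_partial_check": true}'
INTO TABLE `my_table`
FIELDS TERMINATED BY '\t';
```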
You also specify attributes in CREATE PIPELINE's CONFIG clause when you use advanced HDFS pipelines mode. In this mode, you can encrypt your pipeline’s connection to HDFS and you can authenticate your pipeline using Kerberos.
For more information about HDFS Pipelines, see HDFS Pipelines Overview. For more information on how pipelines can be started and run, see START PIPELINE.
LOAD DATA
CREATE PIPELINE shares many options with LOAD DATA, including its error handling options. It processes extracted and transformed records as if it were a LOAD DATA with the same options.
Like LOAD DATA, pipelines natively load JSON, Avro, and CSV input data.
For more information, see the LOAD DATA page.
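As an illustration of these shared options, the sketch below shows one CSV pipeline that uses a LOAD DATA error handling option and one JSON pipeline that maps subvalues to columns, both following the syntax block at the top of this topic. The pipeline names, the columns id and name, and the JSON keys id and profile::name are hypothetical.

```sql
-- CSV: skip rows that would cause a duplicate key error, and ignore a header line.
CREATE PIPELINE my_csv_pipeline AS
LOAD DATA FS '/path/to/files/*'
SKIP DUPLICATE KEY ERRORS
INTO TABLE `my_table`
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
IGNORE 1 LINES;

-- JSON: map the top-level key "id" and the nested key "profile" -> "name" to columns.
-- DEFAULT is expected to supply a value when the subvalue path is missing from a record.
CREATE PIPELINE my_json_pipeline AS
LOAD DATA KAFKA '127.0.0.1/my-topic'
INTO TABLE `my_table`
FORMAT JSON
(id <- id, name <- profile::name DEFAULT 'unknown');
```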
AS LOAD DATA: You can load data by specifying the data source as a Kafka cluster and topic, an S3 bucket or object (with optional prefix), or a file.
- AS LOAD DATA KAFKA 'topic_endpoint': To use Kafka as a data source, you must specify the endpoint for the Kafka cluster and the path to a specific topic within the cluster. For example: LOAD DATA KAFKA '127.0.0.1/my-topic'
- AS LOAD DATA S3 'bucket-name': To use S3 as a data source, you must specify a bucket name or a bucket name and object name. For example: LOAD DATA S3 'my-bucket-name'
- [BATCH_INTERVAL milliseconds]: You can specify a batch interval in milliseconds, which is the time duration between the end of a batch operation and the start of the next one. If a batch interval is not specified, the default value is 2500. For example: LOAD DATA KAFKA '127.0.0.1/my-topic' BATCH_INTERVAL 500
- [MAX_PARTITIONS_PER_BATCH max_partitions_per_batch]: Specifies the maximum number of batch partitions that can be scheduled into a single batch. Useful for limiting the parallelism of a pipeline on large clusters to prevent the pipeline from throttling system resources.
- [AGGREGATOR]: Specifying CREATE AGGREGATOR PIPELINE tells MemSQL to pull data through the aggregator, instead of directly to the leaves. This option can be more efficient for low parallelism pipelines, like single file S3 loads or single partition Kafka topics, and is required for pipelines into reference tables and tables with auto increment columns.
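The options above can be combined in a single statement. The following is a sketch only: the pipeline names, the object name my-object-name, and the numeric values are placeholders, not recommendations.

```sql
-- Start the next batch 500 ms after the previous one ends, and schedule
-- at most 4 batch partitions into a single batch.
CREATE PIPELINE my_tuned_pipeline AS
LOAD DATA KAFKA '127.0.0.1/my-topic'
BATCH_INTERVAL 500
MAX_PARTITIONS_PER_BATCH 4
INTO TABLE `my_table`;

-- Pull data through the aggregator, for example for a single-object S3 load.
CREATE AGGREGATOR PIPELINE my_agg_pipeline AS
LOAD DATA S3 'my-bucket-name/my-object-name'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key"}'
INTO TABLE `my_table`;
```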
WITH TRANSFORM
Pipeline source data can optionally be transformed by specifying an executable program. The data is transformed after the extraction process and before it is loaded into the database.
For more information, see Transforms.
You must install any required dependencies for your transforms (such as Python) on each leaf node in your cluster. Test out your pipeline by running TEST PIPELINE before running START PIPELINE to make sure your nodes are set up properly.
WITH TRANSFORM ('uri', 'executable', 'arguments [...]'): Each of the transform’s parameters is required, and they are described below:
- uri: The transform’s URI is the location from where the executable program can be downloaded, which is specified as either an http:// or file:// endpoint. If the URI points to a tarball with a .tar.gz or .tgz extension, its contents will be automatically extracted. Additionally, the executable parameter must be specified if the uri is a tarball. If the URI specifies an executable file itself, the executable and arguments parameters can be empty.
- executable: The filename of the transform executable to run. This parameter is required if a tarball was specified as the endpoint for the transform’s uri. If the uri itself specifies an executable, this parameter can be empty.
- arguments: A series of arguments that are passed to the transform executable at runtime. Each argument must be delimited by a space.
WITH TRANSFORM('http://memsql.com/my-transform.py','','')
WITH TRANSFORM('file://localhost/root/path/to/my-transform.py','','')
WITH TRANSFORM('http://memsql.com/my-transform-tarball.tar.gz', 'my-executable.py','')
WITH TRANSFORM('http://memsql.com/my-transform-tarball.tar.gz', 'my-executable.py', '-arg1 -arg2')
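Putting the clause in context, a complete statement might look like the sketch below. It reuses the placeholder tarball URL and executable name from the examples above. The pipeline name my_transform_pipeline, the Kafka endpoint, and the table are hypothetical.

```sql
-- The tarball is downloaded and extracted, then my-executable.py is run
-- on the extracted data before it is loaded into my_table.
CREATE PIPELINE my_transform_pipeline AS
LOAD DATA KAFKA '127.0.0.1/my-topic'
WITH TRANSFORM ('http://memsql.com/my-transform-tarball.tar.gz', 'my-executable.py', '')
INTO TABLE `my_table`;

-- Check that the transform runs on your nodes before starting the pipeline.
TEST PIPELINE my_transform_pipeline;
START PIPELINE my_transform_pipeline;
```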
INTO PROCEDURE
This feature allows updating multiple tables from one pipeline by specifying a stored procedure.
MemSQL does not allow DDL commands in stored procedures that are called from pipelines.
CREATE OR REPLACE PROCEDURE procedure_name (query_name QUERY(field_name data_type, ...))
AS
BEGIN
procedure_body
END
CREATE PIPELINE pipeline_name
AS LOAD DATA load_data_options
INTO PROCEDURE procedure_name
The query type variable must be the only parameter in the stored procedure used with your pipeline.
Remarks
The following is a list of restrictions and recommendations that you should follow when using a stored procedure with your pipeline:
- The field list in your QUERY type variable must conform to the schema of the pipeline source (Kafka, S3, etc.).
- For more information on loading data, see the LOAD DATA topic.
- Use the pipeline_source_file() built-in function in the SET clause to set a table column to the pipeline data source file.
- The SET and WHERE clauses are executed before the stored procedure.
- IGNORE and SKIP ... ERRORS only recognize parsing errors. You must specify the desired behavior in the event of constraint errors in the body of the stored procedure.
- Transactions (e.g. BEGIN TRANSACTION, COMMIT, etc.) are not allowed because the pipeline manages the transaction state for you.
- Pipelines into stored procedures maintain the same exactly-once semantics as other pipelines, but this means certain read-write patterns (reads after writes in reshuffles or reference tables) are not allowed.
- Duplicate key behavior must be specified and enforced inside the stored procedure itself. Each INSERT statement can have its own duplicate key policy, or none.
- The stored procedure runs on an aggregator instead of on each partition; however, that doesn’t mean the data flows through the aggregator (unless you explicitly use COLLECT). SQL queries issued from the stored procedure are distributed to the leaves as normal. Data will only be processed on the leaf, unless the schema of one of the tables involved demands otherwise. Data is processed on a per-batch basis from the source pipeline.
- Unlike CREATE PIPELINE ... INTO TABLE, AUTO INCREMENT is allowed in the stored procedure; however, caution should be exercised as this may drastically reduce your pipeline ingest rate because the data will flow through an aggregator.
- OUT_OF_ORDER OPTIMIZATION cannot be disabled when using pipelines into stored procedures. If needed, you should enforce ordering in the stored procedure.
Examples
Example 1
The following example sets the table column source_file to the data source file of the pipeline using the pipeline_source_file() built-in function.
CREATE TABLE t (
col1 bigint(20) DEFAULT NULL,
col2 bigint(20) DEFAULT NULL,
source_file blob);
CREATE PIPELINE p
AS LOAD DATA FS '/tmp/test.csv'
INTO TABLE t
FIELDS TERMINATED BY ',' ENCLOSED BY '' ESCAPED BY '\\'
LINES TERMINATED BY '\n' STARTING BY ''
(
t.col1,
t.col2
)
SET
source_file = pipeline_source_file();
cat test.csv
****
1,2 \n
3,4 \n
4,5 \n
START PIPELINE p;
****
Query OK, 0 rows affected (0.01 sec)
select * from t;
****
+------+------+---------------+
| col1 | col2 | source_file |
+------+------+---------------+
| 1 | 2 | /tmp/test.csv |
| 3 | 4 | /tmp/test.csv |
| 4 | 5 | /tmp/test.csv |
+------+------+---------------+
3 rows in set (0.00 sec)
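The SET clause can also be combined with the @variable_name form of the column list shown in the syntax block. The sketch below is a hypothetical variation on this example: the pipeline name p_scaled, the variable @raw_col2, and the multiplication are illustrative assumptions, not part of the original example.

```sql
-- Read the second CSV field into a variable instead of a column,
-- then derive col2 from it while also recording the source file.
CREATE PIPELINE p_scaled
AS LOAD DATA FS '/tmp/test.csv'
INTO TABLE t
FIELDS TERMINATED BY ','
(col1, @raw_col2)
SET col2 = @raw_col2 * 10,
source_file = pipeline_source_file();
```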
Example 2
Suppose you have a stream of tweets arriving as CSV data in a Kafka pipeline and want to split it into two tables. You can do the following:
CREATE OR REPLACE PIPELINE pipeline_name
AS LOAD DATA kafka 'kafka-host/tweet_data'
INTO PROCEDURE proc
FIELDS TERMINATED BY ',';
DELIMITER //
CREATE OR REPLACE PROCEDURE proc(batch query(tweet json))
AS
BEGIN
INSERT INTO tweets(tweet_id, user_id, text)
SELECT tweet::tweet_id, tweet::user_id, tweet::text
FROM batch;
INSERT INTO retweets_counter(user_id, num_retweets)
SELECT tweet::retweeted_user_id, 1
FROM batch
WHERE tweet::retweeted_user_id is not null
ON DUPLICATE KEY UPDATE num_retweets = num_retweets + 1;
END //
DELIMITER ;
Example 3
Another example is loading into a table and updating some aggregate values.
CREATE PIPELINE pipeline_name
AS LOAD DATA kafka 'kafka-host/tweet_data'
INTO PROCEDURE proc
FIELDS TERMINATED BY ',';
DELIMITER //
CREATE OR REPLACE PROCEDURE proc(batch query(tweet json))
AS
BEGIN
INSERT INTO tweets(tweet_id, user_id, text)
SELECT tweet::tweet_id, tweet::user_id, tweet::text
FROM batch;
INSERT INTO retweets_counter(user_id, num_retweets)
SELECT tweet::retweeted_user_id, 1
FROM batch
WHERE tweet::retweeted_user_id is not null
ON DUPLICATE KEY UPDATE num_retweets = num_retweets + 1;
END //
DELIMITER ;
Example 4
This example shows how to maintain user-specified transactional boundaries.
DELIMITER //
CREATE OR REPLACE PROCEDURE proc(batch query(pos_id bigint, ..., commit bool))
AS
BEGIN
INSERT INTO positions_staging SELECT * FROM batch;
INSERT INTO positions_finalized
SELECT * FROM positions_staging
WHERE pos_id IN (SELECT pos_id FROM positions_staging where commit)
AND NOT commit;
DELETE FROM positions_staging
WHERE pos_id IN (SELECT pos_id FROM positions_staging where commit);
END //
DELIMITER ;
Example 5
This example shows how to implement change data capture (CDC).
DELIMITER //
CREATE OR REPLACE PROCEDURE proc(batch query(matchKey bigint, value bigint, doDelete bool))
AS
DECLARE
_matchKey bigint;
_value bigint;
BEGIN
FOR r IN collect(batch) LOOP
IF r.doDelete then
_matchKey = r.matchKey;
DELETE FROM t WHERE t.key = _matchKey;
ELSE
_matchKey = r.matchKey;
_value = r.value;
INSERT INTO t VALUES(_matchKey, _value);
END IF;
END LOOP;
END //
DELIMITER ;
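The procedure above still needs a pipeline that feeds it batches. A minimal sketch, assuming a hypothetical Kafka topic kafka-host/cdc_events that carries CSV records with the three fields matchKey, value, and doDelete in that order. The pipeline name cdc_pipeline is also hypothetical.

```sql
-- Each batch extracted from the topic is passed to proc as its QUERY argument.
CREATE PIPELINE cdc_pipeline
AS LOAD DATA KAFKA 'kafka-host/cdc_events'
INTO PROCEDURE proc
FIELDS TERMINATED BY ',';

START PIPELINE cdc_pipeline;
```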
Example 6
This example shows how to transform incoming data during the loading process.
DELIMITER //
CREATE OR REPLACE PROCEDURE proc(batch query(ip varchar, ...))
AS
BEGIN
INSERT INTO t
SELECT batch.*, ip_to_point_table.geopoint
FROM batch
JOIN ip_to_point_table
ON ip_prefix(ip) = ip_to_point_table.ip;
END //
DELIMITER ;
Example 7
This example shows how to call other stored procedures from within your stored procedure.
DELIMITER //
CREATE OR REPLACE PROCEDURE proc(batch query(...))
AS
BEGIN
CALL run_machine_learning_iteration(batch);
INSERT INTO t SELECT * FROM batch;
END //
DELIMITER ;
Example 8
The following example shows how to handle duplicate key errors in a Pipeline using the ON DUPLICATE KEY UPDATE clause.
cat /tmp/cst-list.csv
****
Sam,7214,7
Sasha,5296,8
DESC cust;
****
+--------+-------------+------+------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------+-------------+------+------+---------+-------+
| NAME | varchar(32) | YES | | NULL | |
| ID | int(11) | NO | PRI | 0 | |
| ORDERS | int(11) | YES | | NULL | |
+--------+-------------+------+------+---------+-------+
SELECT * FROM cust;
****
+-------+------+--------+
| NAME | ID | ORDERS |
+-------+------+--------+
| Chris | 7214 | 6 |
| Elen | 8301 | 4 |
| Adam | 3412 | 5 |
+-------+------+--------+
DELIMITER //
CREATE OR REPLACE PROCEDURE dupKeyUpdate (batch query(
Name VARCHAR(32),
ID INT,
Orders INT)) AS
BEGIN
INSERT INTO cust(Name, ID, Orders)
SELECT Name, ID, Orders
FROM batch
ON DUPLICATE KEY UPDATE
Name = VALUES(Name),
Orders = VALUES(Orders);
END //
DELIMITER ;
CREATE PIPELINE p1
AS LOAD DATA FS '/tmp/cst-list.csv'
INTO PROCEDURE dupKeyUpdate
FIELDS TERMINATED BY ',';
START PIPELINE p1;
SELECT * FROM cust;
****
+-------+------+--------+
| NAME | ID | ORDERS |
+-------+------+--------+
| Sam | 7214 | 7 |
| Elen | 8301 | 4 |
| Sasha | 5296 | 8 |
| Adam | 3412 | 5 |
+-------+------+--------+