The CREATE PIPELINE statement creates a new pipeline in a MemSQL database. The complete syntax with all possible clauses is shown below:
CREATE PIPELINE [IF NOT EXISTS] pipeline_name AS
  LOAD DATA KAFKA 'kafka_topic_endpoint'
  [BATCH_INTERVAL milliseconds]
  [WITH TRANSFORM ('uri', 'executable', 'arguments [...]')]
  [REPLACE | IGNORE]
  INTO TABLE table_name
  [FIELDS | COLUMNS]
    [TERMINATED BY 'string'
      [[OPTIONALLY] ENCLOSED BY 'char']
      [ESCAPED BY 'char']
    ]
  [LINES
    [STARTING BY 'string']
    [TERMINATED BY 'string']
  ]
  [IGNORE number LINES]
  [(column_name, ...)]
  [ON DUPLICATE KEY UPDATE column_name = expression, [...]]
The CREATE PIPELINE statement uses standard LOAD DATA syntax options. For more information, see the LOAD DATA page.
The following example shows the minimal statement required to create a pipeline:
CREATE PIPELINE mypipeline AS LOAD DATA KAFKA '127.0.0.1/my-topic' INTO TABLE `my_table`;
This statement creates a new pipeline named mypipeline, uses a Kafka cluster as the data source, points to the location of the my-topic topic at the Kafka cluster's endpoint, and will start ingesting data into my_table.
Each of the clauses in a CREATE PIPELINE statement is described below.
AS LOAD DATA: You can load data by specifying Kafka or a file as the pipeline’s data source.
AS LOAD DATA KAFKA ['topic_endpoint']: To use Kafka as a data source, you must specify the endpoint for the Kafka cluster and the path to a specific topic within the cluster.
[BATCH_INTERVAL milliseconds]: You can specify a batch interval in milliseconds, which is the amount of time the pipeline waits between the end of one batch operation and the start of the next. If a batch interval is not specified, the default value is 0, which causes the extractor to batch data as quickly as possible.
LOAD DATA KAFKA '127.0.0.1/my-topic'
LOAD DATA KAFKA '127.0.0.1/my-topic' BATCH_INTERVAL 500
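The FIELDS, LINES, and IGNORE clauses from the syntax summary above can be combined with these options in a single statement. As a sketch (the pipeline name, topic, table, and field options here are illustrative, assuming the topic carries comma-separated text records preceded by a header line):

CREATE PIPELINE my_csv_pipeline AS
  LOAD DATA KAFKA '127.0.0.1/my-topic'
  BATCH_INTERVAL 500
  INTO TABLE `my_table`
  FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
  LINES TERMINATED BY '\n'
  IGNORE 1 LINES;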
Transforms are currently a preview feature of Pipelines. Their syntax and functionality may change in future MemSQL versions.
Pipeline source data can be transformed by specifying an executable program. The data is transformed after the extraction process and before it is loaded into the database. For more information, see Transforms.
WITH TRANSFORM ('uri', 'executable', 'arguments [...]'): Each of the transform's parameters is required, and they are described below:
uri: The transform's URI is the location from which the executable program can be downloaded, specified as either an http:// or file:// endpoint. If the URI points to a tarball with a .tgz extension, its contents will be automatically extracted. Additionally, the executable parameter must be specified if the uri is a tarball. If the URI itself specifies an executable file, the executable parameter can be empty.
executable: The filename of the transform executable to run. This parameter is required if a tarball was specified as the endpoint for the transform's uri. If the uri itself specifies an executable, this parameter can be empty.
arguments: A series of arguments that are passed to the transform executable at runtime. Each argument must be delimited by a space.
WITH TRANSFORM('http://memsql.com/my-transform-tarball.tar.gz', 'my-executable.py', '')
WITH TRANSFORM('http://memsql.com/my-transform-tarball.tar.gz', 'my-executable.py', '-arg1 -arg2')
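A transform is an ordinary executable that receives extracted data on standard input and writes the transformed data to standard output, where the pipeline picks it up for loading. As a minimal sketch of what such an executable might look like (illustrative only, assuming the topic carries newline-delimited text records; the upper-casing logic stands in for a real transformation):

```python
#!/usr/bin/env python
# Illustrative pipeline transform: reads newline-delimited records from
# stdin, transforms each one, and writes the results to stdout.
import sys


def transform_record(record):
    """Transform a single record; upper-casing is a placeholder."""
    return record.upper()


def main(in_stream=sys.stdin, out_stream=sys.stdout):
    # Stream record by record so large batches are not held in memory.
    for line in in_stream:
        out_stream.write(transform_record(line.rstrip("\n")) + "\n")


if __name__ == "__main__":
    main()
```

A script like this could be referenced as the 'executable' parameter of WITH TRANSFORM, either directly or inside a tarball as shown in the examples above.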