The SingleStore DB Kafka Connector is a Kafka Connect connector that allows you to easily ingest AVRO, JSON, and CSV messages from Kafka topics into SingleStore DB. More specifically, the SingleStore DB Kafka Connector is a Sink (target) connector designed to read data from Kafka topics and write that data to SingleStore DB tables.
To understand Kafka’s core concepts and how it works, please read the Kafka documentation. This guide assumes that you understand Kafka’s basic concepts and terminology, and that you have a working Kafka environment up and running.
The SingleStore DB Kafka Connector is available from two sources: the Confluent Hub and as a direct download from SingleStore DB.
Note: After you have installed the version you want to use, you will need to configure the connector properties.
The rest of this page describes how the connector works.
- Installing the SingleStore DB Kafka Connector via Confluent Hub
- Installing the SingleStore DB Kafka Connector via Download
- Configuring the Connector Properties
Note: You can also use a pipeline to load data from Kafka into SingleStore DB.
Connector Behavior
The README file contains the latest information about the connector.
Auto-creation of tables
While loading data, if the table does not exist in SingleStore DB, it will be created using the information from the first record.
The table name is the name of the topic. The table schema is taken from the record's `valueSchema`. If `valueSchema` is not a struct, then a single column named `data` is created with the schema of the record. Table keys are taken from the `tableKey` property.
If the table already exists, all records will be loaded directly into it. Automatic schema changes are not supported, so all records should have the same schema.
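As a sketch of the non-struct case: suppose the first record arrives on a hypothetical topic named `plain-strings` and carries a plain STRING `valueSchema` rather than a struct. Based on the type mapping in the Data Types section below, the connector would create a table along these lines, with the single `data` column described above:

```sql
CREATE TABLE IF NOT EXISTS `plain-strings` (
  `data` TEXT NOT NULL
)
```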
Exactly once delivery
To achieve exactly-once delivery, set `memsql.metadata.allow` to `true`. The `kafka_connect_transaction_metadata` table will then be created.

This table contains an identifier, a count of records, and the time of each transaction. The identifier consists of `kafka-topic`, `kafka-partition`, and `kafka-offset`. This combination uniquely identifies each transaction and prevents duplication of data in the SingleStore DB database. Kafka saves offsets and advances them only if the `kafka-connect` job succeeds. If the job fails, Kafka restarts it with the same offset. This means that if the data was written to the database but the operation then failed, Kafka retries the write with the same offset; because a record with the same metadata identifier already exists, the existing data is not duplicated and the job simply completes successfully.

Data is written to the target table and to the `kafka_connect_transaction_metadata` table in a single transaction. Because of this, if an error occurs, no data is added to the database.

To override the name of this table, use the `memsql.metadata.table` property.
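In your properties configuration, this might look like the following sketch; `my_metadata_table` is a hypothetical name used here only to illustrate the override:

```json
{
  ...
  "memsql.metadata.allow" : "true",
  "memsql.metadata.table" : "my_metadata_table",
  ...
}
```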
Data Types
The connector converts Kafka data types to SingleStore DB data types:
| Kafka Type | SingleStore DB Type |
| --- | --- |
| STRUCT | JSON |
| MAP | JSON |
| ARRAY | JSON |
| INT8 | TINYINT |
| INT16 | SMALLINT |
| INT32 | INT |
| INT64 | BIGINT |
| FLOAT32 | FLOAT |
| FLOAT64 | DOUBLE |
| BOOLEAN | TINYINT |
| BYTES | VARBINARY(1024) |
| STRING | TEXT |
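To illustrate the complex-type rows of this table: a record whose struct `valueSchema` contains an INT64 field and an ARRAY field (the topic name `events` and the field names here are hypothetical) would map to a table roughly like:

```sql
CREATE TABLE IF NOT EXISTS `events` (
  `ts` BIGINT NOT NULL,
  `tags` JSON NOT NULL
)
```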
Table keys
To add a column as a key in SingleStore DB, use the `tableKey` property.
Suppose you have an entity:
```json
{
  "id" : 123,
  "name" : "Alice"
}
```
If you want to add the `id` column as a PRIMARY KEY to your SingleStore DB table, add `"tableKey.primary": "id"` to your properties configuration.
Doing so will generate the following query during table creation:
```sql
CREATE TABLE IF NOT EXISTS `table` (
  `id` INT NOT NULL,
  `name` TEXT NOT NULL,
  PRIMARY KEY (`id`)
)
```
You can also specify the name of a key by providing it like this: `"tableKey.primary.someName" : "id"`. This will create a key with that name:
```sql
CREATE TABLE IF NOT EXISTS `table` (
  `id` INT NOT NULL,
  `name` TEXT NOT NULL,
  PRIMARY KEY `someName`(`id`)
)
```
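The same `tableKey.<index_type>` pattern may apply to other index types, depending on the connector version you installed (for example, shard keys; check the connector's README for the supported list). A sketch, assuming shard keys are supported in your version:

```json
{
  ...
  "tableKey.primary" : "id",
  "tableKey.shard" : "name",
  ...
}
```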
Table Names
By default, the Kafka Connector maps data from topics into SingleStore DB tables by matching the topic name to the table name. For example, if the Kafka topic is called `kafka-example-topic`, then the connector will load it into the SingleStore DB table called `kafka-example-topic`. The table will be created if it does not already exist.

To specify a custom table name, use the `memsql.tableName.<topicName>` property.
```json
{
  ...
  "memsql.tableName.foo" : "bar",
  ...
}
```
In this example, data from the Kafka topic `foo` will be written to the SingleStore DB table called `bar`.
You can use this method to specify custom table names for multiple topics:
```json
{
  ...
  "memsql.tableName.kafka-example-topic-1" : "memsql-table-name-1",
  "memsql.tableName.kafka-example-topic-2" : "memsql-table-name-2",
  ...
}
```
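Putting these options together, a complete sink configuration might look like the following sketch. The endpoint, database, credentials, and table names are placeholders, and the exact connector class and connection properties depend on the connector version you installed, so verify them against the connector's README:

```json
{
  "connector.class" : "com.memsql.kafka.MemSQLSinkConnector",
  "tasks.max" : "1",
  "topics" : "kafka-example-topic",
  "connection.ddlEndpoint" : "singlestore-host:3306",
  "connection.database" : "example_db",
  "connection.user" : "root",
  "connection.password" : "",
  "memsql.metadata.allow" : "true",
  "tableKey.primary" : "id",
  "memsql.tableName.kafka-example-topic" : "example_table"
}
```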