Connecting to Spark

Apache Spark is an open-source data processing framework. Spark excels at iterative computation and includes numerous libraries for statistical analysis, graph computations, and machine learning. The MemSQL Spark Connector allows you to connect your Spark and MemSQL environments. You can use MemSQL and Spark together to accelerate workloads by taking advantage of the computational power of Spark in tandem with the fast ingest and persistent storage that MemSQL has to offer.

The MemSQL Spark Connector integrates with Apache Spark 2.3 and 2.4 and supports both data loading and extraction from database tables and Spark DataFrames.

The connector is implemented as a native Spark SQL plugin and supports Spark’s DataSource API. Spark SQL supports operating on a variety of data sources through the DataFrame interface, and the DataFrame API is the most widely used way for Spark to interact with other systems.

In addition, the connector is a true Spark data source; it integrates with the Catalyst query optimizer, supports robust SQL pushdown, and leverages MemSQL LOAD DATA to accelerate ingest from Spark via compression.

You can download the Spark Connector from its GitHub repository and from Maven Central. The group is com.memsql and the artifact is memsql-spark-connector_2.11.

This topic discusses how to configure and start using the MemSQL Spark Connector 3.0.

Note: We’ve made significant changes between the Spark Connector 3.0 and Spark Connector 2.0. Please see the Migrating between the Spark Connector 2.0 and the Spark Connector 3.0 section.

Getting Started

The Spark Connector 3.0 library requires Apache Spark 2.3 or 2.4.

  • You need a MemSQL cluster running version 6.8 or higher, and a running Apache Spark environment.

  • You can find the latest version of the connector on Maven Central and spark-packages.org. The group is com.memsql and the artifact is memsql-spark-connector_2.11.

  • We release two versions of the memsql-spark-connector, one per Spark version. An example version number is 3.0.0-spark-2.3.4, which is the 3.0.0 version of the connector, compiled and tested against Spark 2.3.4. Make sure you are using the most recent version of the connector.

  • In addition to adding the memsql-spark-connector, you will also need to have the MariaDB JDBC driver installed. The driver will be pulled in automatically as a dependency when you add the connector to your project from a package manager. If you plan to install the connector jar manually, you will need to install the dependency separately. This library is tested against the following MariaDB driver version:

    "org.mariadb.jdbc" % "mariadb-java-client"  % "2.+"
    

    Installation

    You can add the Spark Connector 3.0 to your Spark application using spark-shell, PySpark, or spark-submit by running the following command. Make sure to update the command with your connector and Spark versions.

    $SPARK_HOME/bin/spark-shell --packages com.memsql:memsql-spark-connector_2.11:3.0.<insert-connector-version>-spark-<insert-spark-version>
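
    For example, to use the 3.0.1 connector built against Spark 2.4.4 (the same version used in the Maven example below), the command would be:

    $SPARK_HOME/bin/spark-shell --packages com.memsql:memsql-spark-connector_2.11:3.0.1-spark-2.4.4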
    

    Alternatively, you may use Maven or SBT.

    MemSQL integration with Spark Using Maven

    This topic describes how to integrate and connect Spark to MemSQL using Maven.

    1. Log in to the machine where the Maven project is to be created.

    2. Create an empty Maven project (it will contain only a pom.xml file and the src directory):

      mvn archetype:generate -DgroupId=org.example \
        -DartifactId=SparkMemSQLConnection \
        -DarchetypeArtifactId=maven-archetype-quickstart \
        -DinteractiveMode=false
      

      Note: Maven uses a set of identifiers, also called coordinates, to uniquely identify a project and specify how the project artifact should be packaged:

      • groupId – a unique base name of the company or group that created the project
      • artifactId – a unique name of the project
      • archetypeArtifactId – a project template that contains only a pom.xml file and src directory
    3. Update the pom.xml file in your project to include the MemSQL Spark Connector dependency. The following is an example pom.xml file with the MemSQL Spark Connector 3.0 dependency. Your pom.xml file might be different based on your project’s required dependencies and your version of Spark.

      <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
          <modelVersion>4.0.0</modelVersion>
      
          <groupId>org.example</groupId>
          <artifactId>SparkMemSQLConnection</artifactId>
          <version>1.0-SNAPSHOT</version>
          <build>
              <plugins>
                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-compiler-plugin</artifactId>
                      <version>3.8.0</version>
                      <configuration>
                          <source>1.8</source>
                          <target>1.8</target>
                      </configuration>
                  </plugin>
                  <plugin>
                      <artifactId>maven-shade-plugin</artifactId>
                      <version>2.4.1</version>
                      <executions>
                          <execution>
                              <phase>package</phase>
                              <goals>
                                  <goal>shade</goal>
                              </goals>
                              <configuration>
                                  <filters>
                                      <filter>
                                          <artifact>*:*</artifact>
                                          <excludes>
                                              <exclude>META-INF/*.RSA</exclude>
                                              <exclude>META-INF/*.SF</exclude>
                                              <exclude>META-INF/*.inf</exclude>
                                          </excludes>
                                      </filter>
                                  </filters>
                                  <transformers>
                                      <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                      <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                          <resource>reference.conf</resource>
                                      </transformer>
                                      <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                          <mainClass>{main-class-name}</mainClass>
                                      </transformer>
                                  </transformers>
                              </configuration>
                          </execution>
                      </executions>
                  </plugin>
              </plugins>
          </build>
      
          <dependencies>
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-sql_2.11</artifactId>
                  <version>2.4.4</version>
              </dependency>
              <dependency>
                  <groupId>com.memsql</groupId>
                  <artifactId>memsql-spark-connector_2.11</artifactId>
                  <version>3.0.1-spark-2.4.4</version>
              </dependency>
          </dependencies>
      
      </project>
      
    4. Edit the pom.xml file (using names appropriate to your app/environment):

      • Change the project name to match your parent folder
      • Enter your target main class in place of "{main-class-name}" in the <mainClass> tag
      • Build the project from the parent directory using the following command: mvn clean package

    You are ready to run the executable.
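
    For example, a rough sketch of submitting the shaded jar with spark-submit, assuming the artifactId and version from the pom.xml above and substituting your own main class:

      $SPARK_HOME/bin/spark-submit \
        --class <your-main-class> \
        target/SparkMemSQLConnection-1.0-SNAPSHOT.jar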

    MemSQL integration with Spark Using SBT

    This topic describes how to integrate and connect Spark to MemSQL using SBT.

    1. Log in to the machine where the SBT project is to be created.

    2. Create the following directory structure to encompass the SBT project:

      SparkmemSQLSBT
      ├── build.sbt
      ├── project
      │   └── plugins.sbt
      └── src
          └── main
              └── scala
                  ├── Reader.scala
                  └── Writer.scala
      
    3. Add the following content to the file build.sbt, in addition to any other dependencies required for your project. This is an example dependency file with the MemSQL Spark Connector 3.0; your file may differ based on your version of Spark and other required project dependencies. Note that the assemblyMergeStrategy setting below comes from the sbt-assembly plugin; a sketch of the corresponding project/plugins.sbt file appears after these steps.

      name := ""SparkMemSQLConnector""
      
      version := ""0.1""
      
      scalaVersion := ""2.11.12""
      
      mainClass := Some(""Reader"")
      
      val sparkVersion = ""2.4.4""
      
      libraryDependencies += ""org.apache.spark"" %% ""spark-sql"" % sparkVersion
      libraryDependencies += ""com.memsql"" % ""memsql-spark-connector_2.11"" % ""3.0.0-spark-2.4.4""
      
      
      assemblyMergeStrategy in assembly := {
        case PathList(""META-INF"", xs @ _*) =>
          xs map {_.toLowerCase} match {
            case ""manifest.mf"" :: Nil | ""index.list"" :: Nil | ""dependencies"" :: Nil =>
              MergeStrategy.discard
            case ps @ x :: xs if ps.last.endsWith("".sf"") || ps.last.endsWith("".dsa"") =>
              MergeStrategy.discard
            case ""plexus"" :: xs =>
              MergeStrategy.discard
            case ""services"" :: xs =>
              MergeStrategy.filterDistinctLines
            case ""spring.schemas"" :: Nil | ""spring.handlers"" :: Nil =>
              MergeStrategy.filterDistinctLines
            case _ => MergeStrategy.first
          }
        case ""application.conf"" => MergeStrategy.concat
        case ""reference.conf"" => MergeStrategy.concat
        case _ => MergeStrategy.first
      }
      
    4. Develop your Spark application, using MemSQL as the datastore for loading and sinking data.

    5. Package your application by setting the target main class in the build.sbt file:

      • Choose the target main class in the mainClass := Some("target_main_class_name") setting
      • Build the project from the parent directory using the following command: sbt clean assembly
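
    The assemblyMergeStrategy setting in the build.sbt above is provided by the sbt-assembly plugin, so the project/plugins.sbt file from the directory structure needs to declare it. A minimal sketch follows; the plugin version shown is an assumption, so use whichever sbt-assembly release matches your sbt version.

      // project/plugins.sbt
      // sbt-assembly provides the `assembly` task and the assemblyMergeStrategy setting used in build.sbt
      // (the version below is an example; pick the release that matches your sbt version)
      addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")

    After sbt clean assembly completes, the assembled jar is typically written under target/scala-2.11/.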

    You are ready to run the executable.

Configuration Settings

The MemSQL Spark Connector leverages Spark SQL’s Data Sources API. The connection to MemSQL relies on the following Spark configuration settings:

The memsql-spark-connector is configurable globally via Spark options and locally when constructing a DataFrame. The global and local options use the same names; however, the global options have the prefix spark.datasource.memsql.:

ddlEndpoint (required): Hostname or IP address of the MemSQL Master Aggregator in the format host[:port] (port is optional). Example: master-agg.foo.internal:3308 or master-agg.foo.internal
dmlEndPoint: Hostname or IP address of MemSQL Aggregator nodes to run queries against, in the format host[:port],host[:port],... (:port is optional; multiple hosts are separated by commas). Example: child-agg:3308,child-agg2 (default: ddlEndpoint)
user (required): MemSQL username.
password (required): MemSQL password.
query: The query to run (mutually exclusive with dbtable).
dbtable: The table to query (mutually exclusive with query).
database: If set, all connections will default to using this database (default: empty).
overwriteBehavior: Specify the behavior during Overwrite; one of dropAndCreate, truncate, merge (default: dropAndCreate).
truncate: Deprecated option; please use overwriteBehavior instead. Truncate instead of drop an existing table during Overwrite (default: false).
loadDataCompression: Compress data on load; one of GZip, LZ4, Skip (default: GZip).
disablePushdown: Disable SQL Pushdown when running queries (default: false).
enableParallelRead: Enable reading data in parallel for some query shapes (default: false).
tableKey: Specify additional keys to add to tables created by the connector (see below for more details).
onDuplicateKeySQL: If this option is specified, and a row is to be inserted that would result in a duplicate value in a PRIMARY KEY or UNIQUE index, MemSQL will instead perform an UPDATE of the old row. See the examples below.
insertBatchSize: Size of the batch for row insertion (default: 10000).
loadDataFormat: Serialize data on load; one of Avro, CSV (default: CSV).
maxErrors: The maximum number of errors in a single LOAD DATA request. When this limit is reached, the load fails. If this property is set to 0, no error limit exists (default: 0).

Examples

Example 1: Configuring the memsql-spark-connector globally

spark.conf.set("spark.datasource.memsql.ddlEndpoint", "memsql-master.cluster.internal")
spark.conf.set("spark.datasource.memsql.dmlEndpoints", "memsql-master.cluster.internal,memsql-child-1.cluster.internal:3307")
spark.conf.set("spark.datasource.memsql.user", "admin")
spark.conf.set("spark.datasource.memsql.password", "s3cur3-pa$$word")
SparkConf conf = new SparkConf();
conf.set("spark.datasource.memsql.ddlEndpoint", "memsql-master.cluster.internal")
conf.set("spark.datasource.memsql.dmlEndpoints", "memsql-master.cluster.internal,memsql-child-1.cluster.internal:3307")
conf.set("spark.datasource.memsql.user", "admin")
conf.set("spark.datasource.memsql.password", "s3cur3-pa$$word")
spark.conf.set("spark.datasource.memsql.ddlEndpoint", "memsql-master.cluster.internal")
spark.conf.set("spark.datasource.memsql.dmlEndpoints", "memsql-master.cluster.internal,memsql-child-1.cluster.internal:3307")
spark.conf.set("spark.datasource.memsql.user", "admin")
spark.conf.set("spark.datasource.memsql.password", "s3cur3-pa$$word")

Example 2: Reading a MemSQL table (foo) into a DataFrame

Scala:

val df = spark.read
    .format("memsql")
    .option("ddlEndpoint", "memsql-master.cluster.internal")
    .option("user", "admin")
    .load("foo")

Java:

Dataset<Row> df = spark
  .read()
  .format("memsql")
  .option("ddlEndpoint", "memsql-master.cluster.internal")
  .option("user", "admin")
  .load("foo");

Python:

df = spark \
  .read \
  .format("memsql") \
  .option("ddlEndpoint", "memsql-master.cluster.internal") \
  .option("user", "admin") \
  .load("foo")

Example 3: Reading a MemSQL table (foo) into a DataFrame and applying Data Frame operations

val x = spark.read
    .format("memsql")
    .option("ddlEndpoint")
    .option("user", "admin")
    .load("foo")
    .withColumn("hello", lit(2))
    .filter(col("id") > 1)
    .limit(1000)
    .groupBy(col("id"))
    .agg(count("*"))

Example 4: Configuring the memsql-spark-connector using an external table in Spark SQL and selecting from it

spark.sql("CREATE TABLE bar USING memsql OPTIONS ('ddlEndpoint'='memsql-master.cluster.internal','dbtable'='foo.bar')")
spark.sql("select * from bar limit 10").show()

Example 5: Using the Spark write API to save a Data Frame to MemSQL

df.write
    .format("memsql")
    .option("loadDataCompression", "LZ4")
    .option("overwriteBehavior", "dropAndCreate")
    .mode(SaveMode.Overwrite)
    .save("foo.bar") // in format: database.table

Note: This is the same for Java and Python.

If the target table ("bar" in the database "foo" in the example above) does not exist in MemSQL, the memsql-spark-connector will automatically attempt to create it. If you specify SaveMode.Overwrite and the target table already exists, it will be recreated or truncated before the load. Specify overwriteBehavior = truncate to truncate rather than re-create it, as shown in the sketch below.
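
For example, a minimal sketch of the truncate variant, reusing the same DataFrame and table as the example above:

df.write
    .format("memsql")
    .option("overwriteBehavior", "truncate")
    .mode(SaveMode.Overwrite)
    .save("foo.bar")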

Save Modes

Save operations can optionally take a SaveMode setting (the above example uses Overwrite). Save Modes specify how to handle existing data if present.

It is important to realize that these save modes do not utilize any locking and are not atomic. Additionally, when performing an overwrite, the data will be deleted before writing out the new data.

  1. SaveMode.Append means that when saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.

  2. SaveMode.Overwrite means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.

    Overwrite mode depends on the overwriteBehavior option; for a better understanding, see the Merging on save section.

  3. SaveMode.ErrorIfExists means that when saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.

  4. SaveMode.Ignore means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data.

Example of SaveMode option

df.write
    .format("memsql")
    .mode(SaveMode.Append)
    .save("foo.bar")

Retrieving the number of written rows from taskMetrics

It is possible to add a Spark listener to retrieve the number of written rows.

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

spark.sparkContext.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    println("Task id: " + taskEnd.taskInfo.id.toString)
    println("Records written: " + taskEnd.taskMetrics.outputMetrics.recordsWritten.toString)
  }
})


df.write.format("memsql").save("example")

Specifying keys for tables created by the Spark Connector

When creating a table, the memsql-spark-connector will read options prefixed with tableKey. These options must be formatted in a specific way in order to correctly specify the keys.

Info

The default table type is a MemSQL columnstore. If you want to use a rowstore table, you will need to specify a primary key using the tableKey option.

To explain we will refer to the following example:

df.write
    .format("memsql")
    .option("tableKey.primary", "id")
    .option("tableKey.key.created_firstname", "created, firstname")
    .option("tableKey.unique", "username")
    .mode(SaveMode.Overwrite)
    .save("foo.bar") // in format: database.table

In this example, we are creating three keys:

  1. A primary key on the id column
  2. A regular key on the combination of the firstname and created columns, with the key name created_firstname
  3. A unique key on the username column

Note on (2): Any key can optionally specify a name; just put it after the key type. Key names must be unique.

To change the default ColumnStore sort key you can specify it explicitly:

df.write
    .option("tableKey.columnstore", "id")

You can also customize the shard key like so.

df.write
    .option("tableKey.shard", "id, lastname")

Inserting rows into the table with ON DUPLICATE KEY UPDATE

When writing to a rowstore table, it is possible to insert rows via the ON DUPLICATE KEY UPDATE option. See the SQL reference for more details.

df.write
    .format("memsql")
    .option("onDuplicateKeySQL", "age = age + 1")
    .option("insertBatchSize", 300)
    .mode(SaveMode.Append)
    .save("foo.bar")

As a result of this query, all new rows will be appended without changes. If a row with the same PRIMARY KEY or UNIQUE index already exists, then the corresponding age value will be increased.

When you use ON DUPLICATE KEY UPDATE, all rows of the data frame are split into batches, and every insert query will contain no more than the specified insertBatchSize rows setting.

Using the onDuplicateKeySQL setting to Perform a Partial Update

You can also use the onDuplicateKeySQL setting to do a partial update on rows matching a primary key. For example, consider a DataFrame df with a primary key id that we created and loaded into MemSQL:

df.write
      .format("memsql")
      .option("tableKey.primary", "id")
      .save("test")
spark.read.format("memsql").load("test").show()

Output:

+---+----+------------+
| id|data|dataToUpdate|
+---+----+------------+
|  2|   2|           2|
|  1|   1|           1|
|  3|   3|           3|
+---+----+------------+

In this case, the user only wants to update the column dataToUpdate, but not the column data. Note that if you would like to update both, you should instead use the overwriteBehavior setting and set it to merge.

First, we create a sample DataFrame that will hold the data which reflects our updates to the table. Note how both columns data and dataToUpdate in this updated DataFrame are different compared to the originally created table.

// Note: createDF is a convenience helper (e.g., from the spark-daria library), not part of core Spark.
val updateDf = spark.createDF(
      List(
        (1, -1, 20),
        (2, -1, 100)
      ),
      List(
        ("id", IntegerType, true),
        ("data", IntegerType, true),
        ("dataToUpdate", IntegerType, true)
      )
)

The following command will only update the dataToUpdate column in the MemSQL table, not the data column.

updateDf.write
      .format("memsql")
      .option("onDuplicateKeySQL", "dataToUpdate = VALUES(dataToUpdate)")
      .mode(SaveMode.Append)
      .save("test")
spark.read.format("memsql").load("test").show()

The final output of the partial update looks like this:

+---+----+------------+
| id|data|dataToUpdate|
+---+----+------------+
|  2|   2|         100|
|  1|   1|          20|
|  3|   3|           3|
+---+----+------------+

Merging on save

When saving dataframes or datasets to MemSQL, you can manage how SaveMode.Overwrite is interpreted by the connector via the option overwriteBehavior. This option can take one of the following values:

  1. dropAndCreate (default) - drop and create the table before writing new values.
  2. truncate - truncate the table before writing new values.
  3. merge - replace rows with new rows by matching on the primary key. (Use this option only if you need to fully rewrite existing rows with new ones. If you need to specify some rule for the update, use the onDuplicateKeySQL option instead.)

All these options are case-insensitive.

Example of merge option

Suppose you have the following table t, and the Id column is the primary key.

SELECT * FROM t;

Id  Name     Age
1   Alice    20
2   Bob      25
3   Charlie  30

If you save the following dataframe with overwriteBehavior = merge:

Id  Name      Age
2   Daniel    22
3   Eve       27
4   Franklin  35

df.write
    .format("memsql")
    .option("overwriteBehavior", "merge")
    .mode(SaveMode.Overwrite)
    .save("db.t")

After the save is complete, the table will look like this:

Note that the rows with Id=2 and Id=3 were overwritten with new rows, while the row with Id=1 was not touched and still exists in the result:

SELECT * FROM t;

Id  Name      Age
1   Alice     20
2   Daniel    22
3   Eve       27
4   Franklin  35

Supported Operations

For all supported operations in Spark, refer to Spark’s data source APIs.

SQL Pushdown

The memsql-spark-connector has extensive support for rewriting Spark SQL and dataframe operation query plans into standalone MemSQL queries. This allows most of the computation to be pushed into the MemSQL distributed system without any manual intervention. The SQL rewrites are enabled automatically but can be disabled using the disablePushdown option. We also support partial pushdown in the case where certain parts of a query can be evaluated in MemSQL and certain parts need to be evaluated in Spark.

SQL Pushdown is either enabled or disabled on the entire Spark Session. If you want to run multiple queries in parallel with different values of disablePushdown, make sure to run them on separate Spark Sessions.
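
For example, a sketch of disabling pushdown for the current Spark Session via the global option prefix described in Configuration Settings:

spark.conf.set("spark.datasource.memsql.disablePushdown", "true")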

We currently support most of the primary Logical Plan nodes in Spark SQL including: Project, Filter, Aggregate, Window, Join, Limit, Sort.

We also support most Spark SQL expressions. A full list of supported operators/functions can be found in the file ExpressionGen.scala.

The best place to look for examples of fully supported queries is in the tests. Check out this file as a starting point: SQLPushdownTest.scala.

SQL Pushdown Incompatibilities

  • ToUnixTimestamp and UnixTimestamp handle only timestamps earlier than 2038-01-19 03:14:08 if they receive a DateType or TimestampType value as the first argument
  • FromUnixTime with the default format (yyyy-MM-dd HH:mm:ss) handles only values less than 2147483648 (2^31)

Parallel Read Support

If you enable parallel reads via the enableParallelRead option, the memsql-spark-connector will attempt to read results directly from MemSQL leaf nodes. This can drastically improve performance in some cases.
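
For example, a sketch of opting in to parallel reads on a single read, reusing the placeholder endpoint and table from the earlier examples:

val df = spark.read
    .format("memsql")
    .option("ddlEndpoint", "memsql-master.cluster.internal")
    .option("user", "admin")
    .option("enableParallelRead", "true")
    .load("foo")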

Note: Parallel reads are not consistent

Parallel reads read directly from partitions on the leaf nodes, which skips our entire transaction layer. This means that the individual reads will see an independent version of the database's distributed state. Make sure to take this into account when enabling parallel read.

Note: Parallel reads transparently fallback to single stream reads

Parallel reads currently only work for query-shapes which do no work on the Aggregator and thus can be pushed entirely down to the leaf nodes. To determine if a particular query is being pushed down you can ask the dataframe how many partitions it has like so:

df.rdd.getNumPartitions

If this value is > 1 then we are reading in parallel from leaf nodes.

Note: Parallel reads require consistent authentication and connectible leaf nodes

In order to use parallel reads, the username and password provided to the memsql-spark-connector must be the same across all nodes in the cluster.

In addition, the hostnames and ports listed by SHOW LEAVES must be directly connectible from Spark.

Load Balancing Requests

DML and DDL endpoints in the configuration refer to your child aggregators and your master aggregator, respectively. Use the DDL endpoint (master aggregator) for running DDL operations (e.g. CREATE TABLE). Use DML endpoints (child aggregators) to read data and perform other DML operations. When you provide both endpoints to the connector, JDBC will automatically load balance non-DDL queries across your aggregators.

Data Type Conversions

When writing a Spark DataFrame to MemSQL, the Spark column type is converted to the following MemSQL type:

Spark Type MemSQL Type
LongType BIGINT
IntegerType INT
ShortType SMALLINT
FloatType FLOAT
DoubleType DOUBLE
ByteType TINYINT
StringType TEXT
BinaryType BLOB
DecimalType DECIMAL
BooleanType TINYINT
TimestampType TIMESTAMP(6)
DateType DATE

When reading a MemSQL table as a Spark DataFrame, the MemSQL column type will be converted to the following SparkType:

MemSQL Type SparkType
TINYINT ShortType
SMALLINT ShortType
INT IntegerType
BIGINT LongType
DOUBLE DoubleType
FLOAT FloatType
DECIMAL DecimalType
TIMESTAMP TimestampType
TIMESTAMP(6) TimestampType
DATE DateType
TEXT StringType
JSON StringType
TIME TimestampType
BIT BinaryType
BLOB BinaryType

Data Type Conversion Remarks

  • When using the onDuplicateKeySQL option, the connector will error when writing a null-terminated StringType (i.e., '\0').
  • DECIMAL in MemSQL and DecimalType in Spark have different maximum scales and precisions. An error will occur if you perform a read or write from a table or DataFrame with unsupported precision or scale. MemSQL’s maximum scale for the DECIMAL type is 30, while Spark’s maximum scale for DecimalType is 38. Similarly, MemSQL’s maximum precision is 65, while Spark’s maximum precision is 38.
  • TIMESTAMP in MemSQL supports values from 1000 to 2147483647999. MemSQL treats a null value in the TIMESTAMP column as the current time.
  • The Avro format does not support writing of TIMESTAMP and DATE types. As a result, the MemSQL Spark Connector currently does not support these types with Avro serialization (see the sketch below).
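
For example, a sketch of a write that uses Avro serialization instead of the default CSV (keeping the TIMESTAMP/DATE limitation above in mind; the table name is a placeholder):

df.write
    .format("memsql")
    .option("loadDataFormat", "Avro")
    .mode(SaveMode.Append)
    .save("foo.bar")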

Security and Permissions

SQL Permissions

The Spark user must have access to the Master Aggregator.

Additionally, MemSQL has a permission matrix which describes the permissions required to run each command.

To perform SQL operations through the Spark connector, you need different permissions for different types of operations. The matrix below describes the minimum permissions required to perform each operation. As an alternative to the minimum required permissions, ALL PRIVILEGES allows you to perform any operation.

Operation Min. Permission Alternative Permission
READ from collection SELECT ALL PRIVILEGES
WRITE to collection SELECT, INSERT ALL PRIVILEGES
DROP database or collection SELECT, INSERT, DROP ALL PRIVILEGES
CREATE database or collection SELECT, INSERT, CREATE ALL PRIVILEGES

SSL Support

The MemSQL Spark Connector uses the MariaDB JDBC Driver under the hood and thus supports SSL configuration out of the box. In order to configure SSL, first ensure that your MemSQL cluster has SSL configured. Documentation on how to set this up can be found here:

SSL Configuration

Once you have set up SSL on your server, you can enable SSL by setting the following options:

spark.conf.set("spark.datasource.memsql.useSSL", "true")
spark.conf.set("spark.datasource.memsql.serverSslCert", "PATH/TO/CERT")

Note: the serverSslCert option may be set to the server's certificate in DER form, or to the server's CA certificate. It can be used in one of three forms:

  • serverSslCert=/path/to/cert.pem (full path to certificate)
  • serverSslCert=classpath:relative/cert.pem (relative to current classpath)
  • or as a verbatim DER-encoded certificate string: -----BEGIN CERTIFICATE-----...

You may also want to set these additional options depending on your SSL configuration:

spark.conf.set("spark.datasource.memsql.trustServerCertificate", "true")
spark.conf.set("spark.datasource.memsql.disableSslHostnameVerification", "true")

More information on the above parameters can be found at MariaDB’s documentation for their JDBC driver here: https://mariadb.com/kb/en/about-mariadb-connector-j/#tls-parameters

Connecting with a Kerberos-authenticated User

You can use the MemSQL Spark Connector with a Kerberized user without any additional configuration. To use a Kerberized user, you need to configure the connector with the given MemSQL database user that is authenticated with Kerberos (via the user option). Please visit our documentation here to learn about how to configure MemSQL users with Kerberos.

Here is an example of configuring the Spark connector globally with a Kerberized MemSQL user called krb_user.

spark = SparkSession.builder()
    .config("spark.datasource.memsql.user", "krb_user")
    .getOrCreate()

You do not need to provide a password when configuring a Spark Connector user that is Kerberized. The connector driver (MariaDB) will be able to authenticate the Kerberos user from the cache by the provided username. Other than omitting a password with this configuration, using a Kerberized user with the Connector is no different than using a standard user. Note that if you do provide a password, it will be ignored.

Debugging

SQL Pushdown

If you want to know whether your queries are pushed down, df.explain() can show what parts of the query were pushed down and can also be used for debugging if you encounter an issue. Additionally, if you pass the argument true, you will get a lot more output that includes pre- and post-optimization passes.
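
For example, a sketch reusing the placeholder endpoint and table from the earlier examples:

import org.apache.spark.sql.functions.col

val df = spark.read
    .format("memsql")
    .option("ddlEndpoint", "memsql-master.cluster.internal")
    .option("user", "admin")
    .load("foo")
    .filter(col("id") > 1)

df.explain()     // shows the physical plan, including which parts of the query were pushed down
df.explain(true) // extended output, including the parsed, analyzed, and optimized logical plans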

Other

In addition, the memsql-spark-connector outputs a lot of helpful information when the TRACE log level is enabled for the com.memsql.spark package. You can do this in your log4j configuration by adding the following line: log4j.logger.com.memsql.spark=TRACE. Make sure not to leave it in place, since it generates a huge amount of tracing output.

Migrating between the Spark Connector 2.0 and the Spark Connector 3.0

You may have previously used the MemSQL Spark Connector 2.0. There are many enhancements between the two versions, and the sections below describe the differences in configuration and functionality between the MemSQL Spark Connector versions 3.0 and 2.0.

Configuration Differences

  • If you are only using the Spark reader API, using version 3.0 of the connector will only require a change in the configuration options. Please see below for details.
  • If you are using the previous Spark connector to write data to MemSQL, the existing saveToMemSQL function is deprecated and has been replaced with Spark’s df.write and .save() functions.

Configuration Comparisons

2.0 Option Related 3.0 Option (if applicable) Details
masterHost ddlEndpoint Master host to connect to.
N/A dmlEndpoint Spark Connector 3.0 allows you to specify DML endpoints for load balancing of non-DDL queries. Load balancing is supported through a different mechanism in the 2.0 connector.
masterPort N/A For the 3.0 version, the port is specified in the ddl/dml endpoint, respectively.
user user User to connect with.
password password Password for user.
defaultDatabase database Database to connect to.
N/A query The query to run (optional in 3.0).
N/A dbtable The table to query (optional in 3.0).
defaultSaveMode N/A Spark streaming is not available in 3.0. In version 2.0, this option applies to Spark streaming and allows a user to specify options for overriding duplicate keys.
disablePartitionPushdown enableParallelRead Spark Connector 3.0 provides an opt-in parallel read option.
defaultCreateMode N/A Controls whether databases and tables are created if they don’t exist. In 3.0, we will automatically create a table if it doesn’t exist, but we will not create a database.
CompressionType loadDataCompression Compression options (there are more compression options available in the 3.0 connector).
defaultInsertBatchSize insertBatchSize This is for limiting insert batches when using multi-insert statements.
N/A disablePushdown Controls whether SQL pushdown is Enabled or Disabled. The previous connector did not support robust SQL pushdown.

Functionality Differences

Version 2.0 of the MemSQL Spark Connector contains the following functionality that is not available in version 3.0:

  • The saveToMemSQL() function to write to MemSQL; this is replaced with using df.write directly
  • Adding indexes to automatically created tables; this can be done via a JDBC query
  • No formal Spark Streaming API integration.