You are viewing an older version of this section. View current production version.
Spark Connector
The MemSQL Spark Connector integrates with Apache Spark 2.0 and 2.1 and supports both data loading and extraction from database tables and Spark DataFrames.
You can download the Spark Connector from its GitHub repository.
This topic discusses how to configure and start using the Spark Connector 2.0. For information on version 1.0 of the Spark Connector, see the 1.33 branch on GitHub.
The 79 page guide covers how to design, build, and deploy Spark applications using the MemSQL Spark Connector. Inside, you will find code samples to help you get started and performance recommendations for your production-ready Apache Spark and MemSQL implementations. Download the free MemSQL Spark Connector Guide today.
Prerequisites
When using MemSQL 6.0 or later, you must use Spark Connector 2.0.5 or later. Also, the Spark Connector 2.0 library requires Apache Spark 2.0+ and has been primarily tested against Spark version 2.0.2. For support with Spark 1.x, please check the 1.x branch.
Installation
Inside your project definition, add a dependency for the MemSQL Connector using either sbt or Maven:
sbt Configuration:
libraryDependencies += "com.memsql" %% "memsql-connector" % "2.0.2"
Maven Configuration:
<dependency>
<groupId>com.memsql</groupId>
<artifactId>memsql-connector_2.11</artifactId>
<version>2.0.2</version>
</dependency>
Configuration Settings
The MemSQL Spark Connector leverages Spark SQL’s Data Sources API. The connection to MemSQL relies on the following Spark configuration settings:
Setting Name | Default Value |
---|---|
spark.memsql.host | localhost |
spark.memsql.port | 3306 |
spark.memsql.user | root |
spark.memsql.password | None |
spark.memsql.defaultDatabase | None |
spark.memsql.defaultSaveMode | “error” (see description below) |
spark.memsql.disablePartitionPushdown | false |
spark.memsql.defaultCreateMode | DatabaseAndTable |
When data is loaded to a MemSQL table, defaultCreateMode
specifies whether the connector will create the database and/or table if it doesn’t already exist. The possible values are DatabaseAndTable
, Table
, and Skip
. The user will need the corresponding create permissions if the value is not Skip
.
Note that all MemSQL credentials have to be the same on all nodes to take advantage of partition pushdown, which queries leaves directly.
Loading Data from MemSQL
The following example creates a DataFrame from the table illinois
in the database customers
. To use the library, pass in com.memsql.spark.connector
as the format
parameter to ensure that Spark calls the MemSQL Spark Connector code. The path
option is the full path of the table using the syntax $database_name
.$table_name
. If there is only a table name, the connector will look for the table in the default database set in the configuration.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
val conf = new SparkConf()
.setAppName("MemSQL Spark Connector Example")
.set("spark.memsql.host", "10.0.0.190")
.set("spark.memsql.password", "foobar")
.set("spark.memsql.defaultDatabase", "customers")
val spark = SparkSession.builder().config(conf).getOrCreate()
val customersFromIllinois = spark
.read
.format("com.memsql.spark.connector")
.options(Map("path" -> ("customers.illinois")))
.load()
// customersFromIllinois is now a Spark DataFrame which represents the specified MemSQL table
// and can be queried using Spark DataFrame query functions
// count the number of rows
println(s"The number of customers from Illinois is ${customersFromIllinois.count()}")
// print out the DataFrame
customersFromIllinois.show()
Instead of specifying a MemSQL table as a path
option, you can also create a DataFrame from a SQL query by using the query
option. This option minimizes the amount of data transferred from MemSQL to Spark, and pushes down distributed computations to MemSQL instead of Spark. For best performance, either specify the database name using the option database
, or ensure a default database is set in the Spark configuration. Either of these settings enables the connector to query the MemSQL leaf nodes directly, instead of going through the master aggregator.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
val conf = new SparkConf()
.setAppName("MemSQL Spark Connector Example")
.set("spark.memsql.host", "10.0.0.190")
.set("spark.memsql.password", "foobar")
val spark = SparkSession.builder().config(conf).getOrCreate()
val customersFromIllinois = spark
.read
.format("com.memsql.spark.connector")
.options(Map("query" -> ("select age_group, count(*) from customers.illinois where number_of_orders > 3 GROUP BY age_group"),
"database" -> "customers"))
.load()
customersFromIllinois.show()
// +-----------+---------+
// | age_group | count(*)|
// +-----------+---------+
// | 13-18 | 128 |
// | 19-25 | 150 |
// | 26+ | 140 |
// +-----------+---------+
Saving Data to MemSQL
Similarly, use Spark SQL’s Data Sources API to save a DataFrame to MemSQL. To save a DataFrame in the MemSQL table students
:
...
val rdd = sc.parallelize(Array(Row("John Smith", 12), Row("Jane Doe", 13)))
val schema = StructType(Seq(StructField("Name", StringType, false),
StructField("Age", IntegerType, false)))
val df = sqlContext.createDataFrame(rdd, schema)
df
.write
.format("com.memsql.spark.connector")
.mode("error")
.save("people.students")
The mode
specifies how to handle duplicate keys when the MemSQL table has a primary key. If unspecified, the default is error
, which means that if a row with the same primary key already exists in MemSQL’s people.students
table, an error will be thrown. Other save modes include:
Save Mode String | Description |
---|---|
“error” | MemSQL will error when encountering a record with duplicate keys |
“ignore” | MemSQL will ignore records with duplicate keys and, without rolling back, continue inserting records with unique keys. |
“overwrite” | MemSQL will replace the existing record with the new record |
The second interface to save data to MemSQL is via the saveToMemSQL
implicit function on a DataFrame you wish to save:
...
val rdd = sc.parallelize(Array(Row("John Smith", 12), Row("Jane Doe", 13)))
val schema = StructType(Seq(StructField("Name", StringType, false),
StructField("Age", IntegerType, false)))
val df = sqlContext.createDataFrame(rdd, schema)
df.saveToMemSQL("people.students")
// The database name can be omitted if "spark.memsql.defaultDatabase" is set
// in the Spark configuration df.sqlContext.sparkContext.getConf.getAll
Types
When saving a DataFrame from Spark to MemSQL, the SparkType of each DataFrame column will be converted to the following MemSQL type:
SparkType | MemSQL Type |
---|---|
ShortType | SMALLINT |
FloatType | FLOAT |
DoubleType | DOUBLE |
LongType | BIGINT |
IntegerType | INT |
BooleanType | BOOLEAN |
StringType | TEXT |
BinaryType | BLOB |
DecimalType | DECIMAL |
TimeStampType | TIMESTAMP |
DateType | DATE |
When reading a MemSQL table as a Spark DataFrame, the MemSQL column type will be converted to the following SparkType:
MemSQL Type | SparkType |
---|---|
TINYINT, SMALLINT | ShortType |
INTEGER | IntegerType |
BIGINT (signed) | LongType |
DOUBLE, NUMERIC | DoubleType |
REAL | FloatType |
DECIMAL | DecimalType |
TIMESTAMP | TimestampType |
DATE | DateType |
TIME | StringType |
CHAR, VARCHAR | StringType |
BIT, BLOG, BINARY | BinaryType |
MemSQL Spark 2.0 Connector does not support GeoSpatial or JSON MemSQL types since Spark 2.0 has currently disabled user defined types (see JIRA issue). These types, when read, will become BinaryType.
Changes from MemSQL Spark Connector 1.x
While the MemSQL Spark Connector 1.x relied on Spark SQL experimental developer APIs, the MemSQL Spark 2.0 Connector uses only the official and stable APIs for loading data from an external data source documented here. In certain cases, the Spark Connector 2.0 can “push down” distributed computations to MemSQL. This means that instead of having Spark perform a a transformation (e.g. filter, join, etc) on the data it retrieved from MemSQL, you can let MemSQL do the operation on the data and pass the result to Spark. The MemSQL Spark Connector 2.0 supports column and filter pushdown; if you would like to push down joins or aggregates, consider explicitly including it in the user-specified query
option. For example, instead of:
val people = spark.read.format("com.memsql.spark.connector").options(Map("path" -> ("db.people"))).load()
val department = spark.read.format("com.memsql.spark.connector").options(Map("path" -> ("db.department"))).load()
val result = people.join(department, people("deptId") === department("id"))
Use the following instead:
val result = spark
.read
.format("com.memsql.spark.connector")
.options(Map("query" -> ("select * from people join department on people.deptId = department.id")))
.load()