Python
Dependencies
Code
#!/usr/bin/python
import os
import sys
import time
import threading
import argparse
from memsql.common import database
parser = argparse.ArgumentParser()
parser.add_argument("--host", default=None, help="The hostname of the MemSQL node to connect to")
parser.add_argument("--port", default=None, type=int, help="The port of the MemSQL node to connect to")
parser.add_argument("--user", default="root", help="The user of the MemSQL node to connect to")
parser.add_argument("--password", default="", help="The password of the MemSQL node to connect to")
parser.add_argument("--database", default="simple_benchmark", help="The database to use - note: this database should not exist")
parser.add_argument("--num-workers", type=int, default=10, help="The number of insert threads")
parser.add_argument("--time", type=int, default=30, help="The number of seconds to run the benchmark for")
options = parser.parse_args()
HOST = None
PORT = None
TABLE = "tbl"
BATCH_SIZE = 5000
# Pre-generate the workload query
QUERY_TEXT = "INSERT INTO %s (val) VALUES %s" % (TABLE, ",".join(["(1)"] * BATCH_SIZE))
def get_connection(host=None, port=None, db=options.database):
""" Returns a new connection to the database. """
if host is None:
host = HOST
if port is None:
port = PORT
return database.connect(
host=host,
port=port,
user=options.user,
password=options.password,
database=db)
class InsertWorker(threading.Thread):
""" A simple thread which inserts empty rows in a loop. """
def __init__(self, stopping):
super(InsertWorker, self).__init__()
self.stopping = stopping
self.daemon = True
self.exception = None
def run(self):
with get_connection() as conn:
while not self.stopping.is_set():
conn.execute(QUERY_TEXT)
def test_connection():
try:
with get_connection(db="information_schema") as conn:
conn.ping()
except database.MySQLError:
print("Unable to connect to MemSQL with provided connection details.")
print("Please verify that MemSQL is running @ %s:%s" % (HOST, PORT))
sys.exit(1)
def setup_test_db():
""" Create a database and table for this benchmark to use. """
with get_connection(db="information_schema") as conn:
print('Creating database %s' % options.database)
try:
# note: the following query will fail if there is an existing database
conn.query('CREATE DATABASE %s' % options.database)
except database.MySQLError:
print("Database %s already exists - since we drop the database at" % options.database)
print("the end of this script, please specify an un-used database")
print("with the --database flag.")
sys.exit(1)
conn.query('USE %s' % options.database)
conn.query('CREATE TABLE IF NOT EXISTS %s (id INT AUTO_INCREMENT PRIMARY KEY, val INT)' % TABLE)
def warmup():
print('Warming up workload')
with get_connection() as conn:
conn.execute(QUERY_TEXT)
def run_benchmark():
""" Run a set of InsertWorkers and record their performance. """
stopping = threading.Event()
workers = [ InsertWorker(stopping) for _ in range(options.num_workers) ]
print('Launching %d workers' % options.num_workers)
print('Workload will take approximately %d seconds.' % options.time)
[ worker.start() for worker in workers ]
time.sleep(options.time)
print('Stopping workload')
stopping.set()
[ worker.join() for worker in workers ]
with get_connection() as conn:
count = conn.get("SELECT COUNT(*) AS count FROM %s" % TABLE).count
print("%d rows inserted using %d threads" % (count, options.num_workers))
print("%.1f rows per second" % (count / float(options.time)))
def cleanup():
""" Cleanup the database this benchmark is using. """
try:
with get_connection() as conn:
conn.query('DROP DATABASE IF EXISTS %s' % options.database)
except database.MySQLError:
pass
if __name__ == '__main__':
HOST = options.host or "127.0.0.1"
PORT = options.port or 3306
cleanup()
try:
test_connection()
setup_test_db()
warmup()
run_benchmark()
except KeyboardInterrupt:
print("Interrupted... exiting...")
Bash
Dependencies
mysql
client program- Bourne-Again Shell (
bash
)
Code
#!/bin/bash
MHOST="127.0.0.1"
MPORT="3306"
MUSER="root"
MDB=""
NUM_WORKERS=128
BATCH_SIZE=256
memsql_exec()
{
mysql -h $MHOST -P $MPORT -u $MUSER $MDB -e "$1"
}
memsql_exec_multi()
{
mysql -h $MHOST -P $MPORT -u $MUSER $MDB -e \
"$(for ((b = 0; b < $BATCH_SIZE; b++)); do
echo "$1;"
done)"
}
echo "Creating database test"
memsql_exec "CREATE DATABASE IF NOT EXISTS test"
MDB="test"
echo "Creating table tbl"
memsql_exec "CREATE TABLE IF NOT EXISTS tbl (id INT AUTO_INCREMENT PRIMARY KEY)"
echo "Launching $NUM_WORKERS workers"
sleep 1
declare -a WORKERS
for ((worker = 0; worker < $NUM_WORKERS; worker++)); do
(while [ 1 ]; do
echo "Worker $worker inserting"
memsql_exec_multi "INSERT INTO tbl VALUES (NULL)"
done) &
WORKERS[$worker]=$!
done
sleep 10
for ((worker = 0; worker < $NUM_WORKERS; worker++)); do
echo "Killing worker $worker"
kill ${WORKERS[$worker]}
wait ${WORKERS[$worker]} 2>/dev/null
done
echo "Cleaning up"
sleep 1
memsql_exec "DROP DATABASE test"
Java
Dependencies
- JDBC library (package
libmysql-java
on Debian-based distributions)
Code
import java.sql.*;
import java.util.Properties;
import java.util.concurrent.*;
public class Sample {
private static final String dbClassName = "com.mysql.jdbc.Driver";
private static final String CONNECTION = "jdbc:mysql://127.0.0.1:3306/";
private static final String USER = "root";
private static final String PASSWORD = "";
private static void executeSQL(Connection conn, String sql) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute(sql);
}
}
private static void ResetEnvironment() throws SQLException {
Properties p = new Properties();
p.put("user", USER);
p.put("password", PASSWORD);
try (Connection conn = DriverManager.getConnection(CONNECTION, p)) {
for (String query: new String[] {
"DROP DATABASE IF EXISTS test",
"CREATE DATABASE test",
"USE test",
"CREATE TABLE tbl (id INT AUTO_INCREMENT PRIMARY KEY)"
}) {
executeSQL(conn, query);
}
}
}
private static void worker() {
Properties properties = new Properties();
properties.put("user", USER);
properties.put("password", PASSWORD);
try (Connection conn = DriverManager.getConnection(CONNECTION, properties)) {
executeSQL(conn, "USE test");
while (!Thread.interrupted()) {
executeSQL(conn, "INSERT INTO tbl VALUES (NULL)");
}
} catch (SQLException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws ClassNotFoundException, SQLException, InterruptedException {
Class.forName(dbClassName);
ResetEnvironment();
ExecutorService executor = Executors.newFixedThreadPool(20);
for (int i = 0; i < 20; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
worker();
}
});
}
Thread.sleep(20000);
executor.shutdownNow();
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
}
C#
Dependencies
Code
using System;
using System.Data;
using System.Threading;
using System.Diagnostics;
using MySql.Data.MySqlClient;
namespace MemSQLTest {
//Thread creates a connection and inserts for 1000 milliseconds
public class WorkerThread {
public static void Thread() {
try {
IDbConnection conn;
conn = new MySql.Data.MySqlClient.MySqlConnection();
conn.ConnectionString = "Server=127.0.0.1;Uid=root;Pwd=;";
conn.Open();
IDbCommand dbcmd = conn.CreateCommand();
dbcmd.CommandText = "USE db";
dbcmd.ExecuteNonQuery();
dbcmd.CommandText = "INSERT INTO t VALUES (DEFAULT)";
Stopwatch stop = new Stopwatch();
stop.Start();
while(stop.ElapsedMilliseconds < 1000)
{
dbcmd.ExecuteNonQuery();
}
} catch (Exception ex) {
Console.WriteLine(ex.Message);
}
}
}
class MemSQLTest {
static void Main(string[] args) {
try {
IDbConnection conn;
conn = new MySql.Data.MySqlClient.MySqlConnection();
conn.ConnectionString = "Server=127.0.0.1;Uid=root;Pwd=;";
conn.Open();
//Initialize database with auto increment table
IDbCommand dbcmd = conn.CreateCommand();
dbcmd.CommandText = "DROP DATABASE IF EXISTS db";
dbcmd.ExecuteNonQuery();
dbcmd.CommandText = "CREATE DATABASE db";
dbcmd.ExecuteNonQuery();
dbcmd.CommandText = "USE db";
dbcmd.ExecuteNonQuery();
dbcmd.CommandText = "CREATE TABLE t (id int primary key auto_increment)";
dbcmd.ExecuteNonQuery();
//Initialize threads
Thread[] threads = new Thread[10];
for(int i = 0; i < 10; i++)
{
threads[i] = new Thread(new ThreadStart(WorkerThread.Thread));
threads[i].Start();
}
for(int i = 0; i < 10; i++)
{
threads[i].Join();
}
//Show select count(*) on auto increment table
dbcmd.CommandText = "SELECT COUNT(*) FROM t";
IDataReader reader = dbcmd.ExecuteReader();
while(reader.Read()) {
Console.WriteLine(reader["COUNT(*)"]);
}
} catch (Exception ex) {
Console.WriteLine(ex.Message);
}
}
}
}
C
Dependencies
- C compiler (e.g. gcc)
pthreads
library (present on most Linux distributions)mysqlclient
library, available from thelibmysqlclient-dev
package on Debian-based distributions.
Code
/* Compile with:
*
* cc multi_threaded_inserts.c -lmysqlclient -pthread -o mti
*/
#include <stdlib.h>
#include <stdio.h>
#include <mysql/mysql.h>
const static char *host = "127.0.0.1";
const static char *user = "root";
const static char *passwd = "";
const static size_t port = 3306;
#define NUM_WORKERS 20
static volatile int keep_going = 1;
void *insert_worker(void *worker_id);
int main()
{
my_init();
MYSQL conn;
mysql_init(&conn);
printf("Connecting to MemSQL...\n");
if (mysql_real_connect(&conn, host, user, passwd, NULL, port, NULL, 0) != &conn)
{
printf("Could not connect to the MemSQL database!\n");
goto failure;
}
printf("Creating database 'test'...\n");
if (mysql_query(&conn, "create database test") || mysql_query(&conn, "use test"))
{
printf("Could not create 'test' database!\n");
goto failure;
}
printf("Creating table 'tbl' in database 'test'...\n");
if (mysql_query(&conn, "create table tbl (id bigint auto_increment primary key)"))
{
printf("Could not create 'tbl' table in the 'test' database!\n");
goto failure;
}
printf("Launching %lu insert workers...\n", NUM_WORKERS);
pthread_t workers[NUM_WORKERS];
size_t i;
for (i = 0; i < NUM_WORKERS; ++i)
{
pthread_create(&workers[i], NULL, &insert_worker, (void *)i);
}
printf("Running inserts for %lu seconds...\n", 10);
sleep(10);
keep_going = 0;
size_t rows_inserted = 0;
for (i = 0; i < NUM_WORKERS; ++i)
{
size_t rows_i;
pthread_join(workers[i], &rows_i);
rows_inserted += rows_i;
}
printf("Inserted %lu rows. Cleaning up...\n", rows_inserted);
if (mysql_query(&conn, "drop database test"))
{
printf("Could not drop the testing database 'test'!\n");
}
mysql_close(&conn);
return 0;
failure:
mysql_close(&conn);
return 1;
}
void *insert_worker(void *worker_id)
{
size_t id = (size_t) worker_id;
MYSQL conn;
mysql_init(&conn);
if (mysql_real_connect(&conn, host, user, passwd, "test", port, NULL, 0) != &conn)
{
printf("Worker %lu could not connect to the MemSQL database! Aborting...\n", id);
exit(1);
}
size_t i;
for (i = 0; keep_going; i += 8)
{
if (mysql_query(&conn, "insert into tbl values (null), (null), (null),"
"(null), (null), (null), (null), (null)"))
{
printf("Worker %lu failed to insert data, aborting...\n", id);
exit(1);
}
}
mysql_close(&conn);
return (void *)i;
}
NodeJS
Dependencies
- NPM packages
mysql
andpromise
(runnpm install mysql
andnpm install promise
).
Code
var mysql = require("mysql");
var Promise = require("promise"); // jshint ignore:line
var util = require("util");
var timers = require("timers");
/**
* Tweak the following globals to fit your environment
* ###################################################
*/
var HOST = "127.0.0.1";
var PORT = 3306;
var USER = "root";
var PASSWORD = "";
// Specify which database and table to work with.
// Note: this database will be dropped at the end of this script
var DATABASE = "test";
var TABLE = "tbl";
// The number of workers to run
var NUM_WORKERS = 20;
// Run the workload for this many seconds
var WORKLOAD_TIME = 10;
// Batch size to use
var BATCH_SIZE = 5000;
/**
* Internal code starts here
* #########################
*/
// Pre-generate the insert query
var _batch = [];
for (var i = 0; i < BATCH_SIZE; ++i) { _batch.push("()"); }
_batch = _batch.join(",");
var QUERY_TEXT = util.format("INSERT INTO %s VALUES %s", TABLE, _batch);
var EXIT_FLAG = false;
var get_connection = function(db) {
var conn = mysql.createConnection({
host: HOST,
port: PORT,
user: USER,
password: PASSWORD,
database: (db || DATABASE)
});
return new Promise(function(resolve, reject) {
conn.connect(function(err) {
if (err) { reject(err); }
else { resolve(conn); }
});
});
};
var query = function(conn, query_text) {
return new Promise(function(resolve, reject) {
conn.query(query_text, function(err, rows, fields) {
if (err) { reject(err); }
else { resolve(rows); }
});
});
};
var chained_query = function() {
var args = arguments;
return function(conn) {
return query(conn, util.format.apply(util.format, args))
.then(function(rows) { return conn; });
};
};
var setup_test_db = function() {
console.log("Setting up database");
return get_connection("information_schema")
.then(chained_query("CREATE DATABASE IF NOT EXISTS %s", DATABASE))
.then(chained_query("USE %s", DATABASE))
.then(chained_query("CREATE TABLE IF NOT EXISTS %s (id INT AUTO_INCREMENT PRIMARY KEY)", TABLE));
};
var insert_worker = function() {
return get_connection()
.then(function(conn) {
return new Promise(function(resolve, reject) {
(function _loop() {
conn.query(QUERY_TEXT, function(err) {
if (err) { reject(err); }
else if (EXIT_FLAG) { resolve(); }
else { _loop(); }
});
}());
});
});
};
var warmup = function() {
console.log("Warming up workload");
return get_connection()
.then(chained_query(QUERY_TEXT));
};
var do_benchmark = function() {
return new Promise(function(resolve, reject) {
console.log("Launching %d workers", NUM_WORKERS);
var workers = [];
for (var i = 0; i < NUM_WORKERS; ++i) {
workers.push(insert_worker());
}
timers.setTimeout(function() {
console.log("Stopping workload");
EXIT_FLAG = true;
resolve(Promise.all(workers));
}, WORKLOAD_TIME * 1000);
});
};
var print_stats = function() {
return get_connection()
.then(function(conn) {
return query(conn, util.format("SELECT COUNT(*) AS count FROM %s", TABLE));
})
.then(function(rows) {
var count = rows[0].count;
console.log("%d rows inserted using %d workers", count, NUM_WORKERS);
console.log("%d rows per second", count / WORKLOAD_TIME);
});
};
var cleanup_test_db = function() {
console.log("Cleaning up");
return get_connection("information_schema")
.then(chained_query("DROP DATABASE %s", DATABASE));
};
var main = function() {
setup_test_db()
.then(warmup)
.then(do_benchmark)
.then(print_stats)
.then(cleanup_test_db, function(err) {
console.error("Error: ", err);
return cleanup_test_db();
})
.then(
function() { process.exit(0); },
function() { process.exit(1); }
);
};
main();