Python
Dependencies
Code
#!/usr/bin/python
import os
import sys
import time
import threading
import argparse
from singlestore.common import database
# Command-line interface: connection details plus workload sizing.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--host",
    default=None,
    help="The hostname of the SingleStore node to connect to",
)
parser.add_argument(
    "--port",
    default=None,
    type=int,
    help="The port of the SingleStore node to connect to",
)
parser.add_argument(
    "--user",
    default="root",
    help="The user of the SingleStore node to connect to",
)
parser.add_argument(
    "--password",
    default="",
    help="The password of the SingleStore node to connect to",
)
parser.add_argument(
    "--database",
    default="simple_benchmark",
    help="The database to use - note: this database should not exist",
)
parser.add_argument(
    "--num-workers",
    type=int,
    default=10,
    help="The number of insert threads",
)
parser.add_argument(
    "--time",
    type=int,
    default=30,
    help="The number of seconds to run the benchmark for",
)
options = parser.parse_args()

# Connection target; left unset here and assigned by the script entry point.
HOST = None
PORT = None

# Table that receives the inserted rows.
TABLE = "tbl"
# Number of rows carried by each INSERT statement.
BATCH_SIZE = 5000

# Pre-generate the workload query
QUERY_TEXT = "INSERT INTO %s (val) VALUES %s" % (
    TABLE,
    ",".join(("(1)",) * BATCH_SIZE),
)
def get_connection(host=None, port=None, db=options.database):
    """Open and return a new connection to the database.

    ``host`` and ``port`` fall back to the module-level ``HOST`` and ``PORT``
    when they are not supplied.  ``db`` is the database to connect to and
    defaults to the one chosen with ``--database``.
    """
    target_host = HOST if host is None else host
    target_port = PORT if port is None else port
    return database.connect(
        host=target_host,
        port=target_port,
        user=options.user,
        password=options.password,
        database=db,
    )
class InsertWorker(threading.Thread):
    """A simple thread which inserts batches of rows in a loop.

    The worker opens its own connection and keeps executing ``QUERY_TEXT``
    until the shared ``stopping`` event is set.  If the insert loop fails,
    the exception is stored on ``self.exception`` before being re-raised, so
    whoever joins the thread can tell that it ended abnormally.
    """

    def __init__(self, stopping):
        super(InsertWorker, self).__init__()
        # Event shared with the launcher; setting it ends the insert loop.
        self.stopping = stopping
        self.daemon = True
        # Populated by run() when the insert loop raises.
        self.exception = None

    def run(self):
        try:
            with get_connection() as conn:
                while not self.stopping.is_set():
                    conn.execute(QUERY_TEXT)
        except Exception as exc:
            # Record the failure for the launcher, then let the normal
            # thread error reporting happen as before.
            self.exception = exc
            raise
def test_connection():
    """Ping the server and exit with status 1 if it cannot be reached."""
    try:
        with get_connection(db="information_schema") as probe:
            probe.ping()
    except database.MySQLError:
        for line in (
            "Unable to connect to SingleStore DB with provided connection details.",
            "Please verify that SingleStore DB is running @ %s:%s" % (HOST, PORT),
        ):
            print(line)
        sys.exit(1)
def setup_test_db():
    """Create the database and table used by this benchmark.

    Exits with status 1 when the CREATE DATABASE statement raises a
    database error, which is reported to the user as the database already
    existing.
    """
    db_name = options.database
    with get_connection(db="information_schema") as conn:
        print('Creating database %s' % db_name)
        try:
            # note: the following query will fail if there is an existing database
            conn.query('CREATE DATABASE %s' % db_name)
        except database.MySQLError:
            print("Database %s already exists - since we drop the database at" % db_name)
            print("the end of this script, please specify an un-used database")
            print("with the --database flag.")
            sys.exit(1)

        # Switch to the new database, then create the benchmark table in it.
        for statement in (
            'USE %s' % db_name,
            'CREATE TABLE IF NOT EXISTS %s (id INT AUTO_INCREMENT PRIMARY KEY, val INT)' % TABLE,
        ):
            conn.query(statement)
def warmup():
    """Execute the workload query once before the timed run starts."""
    print('Warming up workload')
    with get_connection() as warm_conn:
        warm_conn.execute(QUERY_TEXT)
def run_benchmark():
    """Run a set of InsertWorkers and report their throughput.

    Starts ``options.num_workers`` threads, lets them insert for
    ``options.time`` seconds, signals them to stop and waits for each one to
    finish.  The row count is read back from the table, and the rate is
    computed over the measured wall-clock time from start to last join.

    The count is the table's total, so rows inserted before the workers
    started are included as well.
    """
    stopping = threading.Event()
    workers = [InsertWorker(stopping) for _ in range(options.num_workers)]

    print('Launching %d workers' % options.num_workers)
    print('Workload will take approximately %d seconds.' % options.time)

    started = time.time()
    for worker in workers:
        worker.start()

    time.sleep(options.time)

    print('Stopping workload')
    stopping.set()
    for worker in workers:
        worker.join()
    # Workers finish their in-flight statement after the stop signal, so the
    # real run is longer than the requested sleep.
    elapsed = time.time() - started

    with get_connection() as conn:
        count = conn.get("SELECT COUNT(*) AS count FROM %s" % TABLE).count

    print("%d rows inserted using %d threads" % (count, options.num_workers))
    # Guard against a zero-length run instead of dividing by zero.
    rate = count / elapsed if elapsed > 0 else 0.0
    print("%.1f rows per second" % rate)
def cleanup():
    """Drop the benchmark database; database errors are ignored."""
    drop_statement = 'DROP DATABASE IF EXISTS %s' % options.database
    try:
        with get_connection() as db_conn:
            db_conn.query(drop_statement)
    except database.MySQLError:
        # Best effort: a failure to connect or drop is not reported.
        pass
if __name__ == '__main__':
    # Resolve the connection target, falling back to the local defaults.
    HOST = options.host or "127.0.0.1"
    PORT = options.port or 3306

    # Remove anything left behind by a previous run before starting.
    cleanup()
    try:
        test_connection()
        setup_test_db()
        warmup()
        run_benchmark()
    except KeyboardInterrupt:
        print("Interrupted... exiting...")
    finally:
        # The script tells the user the database is dropped at the end, so
        # do that on every exit path (normal, interrupted or failed).
        cleanup()
Bash
Dependencies
- `mysql` client program
- Bourne-Again Shell (`bash`)
Code
#!/bin/bash
# Connection settings passed to the mysql client.
MHOST="127.0.0.1"
MPORT="3306"
MUSER="root"
# Database argument for the mysql client; empty until the script sets it
# after creating the test database.
MDB=""
# Number of background insert loops to launch.
NUM_WORKERS=128
# Number of statements memsql_exec_multi sends per mysql invocation.
BATCH_SIZE=256
# Run one SQL string through the mysql client.
#   $1 - SQL text to execute
# Uses the MHOST, MPORT and MUSER globals. MDB is passed as the database
# argument only when it is non-empty, so the call also works before the
# test database exists.
memsql_exec()
{
    mysql -h "$MHOST" -P "$MPORT" -u "$MUSER" ${MDB:+"$MDB"} -e "$1"
}
# Run the same SQL statement BATCH_SIZE times in a single mysql invocation.
#   $1 - SQL statement, without the trailing semicolon
# The repeated statements are joined into one script and handed to
# memsql_exec so the client is invoked in exactly one place.
memsql_exec_multi()
{
    local batch=""
    local b
    for ((b = 0; b < BATCH_SIZE; b++)); do
        batch+="$1;"$'\n'
    done
    memsql_exec "$batch"
}
# --- Set up the schema -------------------------------------------------------
echo "Creating database test"
memsql_exec "CREATE DATABASE IF NOT EXISTS test"
# From here on every mysql invocation targets the test database.
MDB="test"

echo "Creating table tbl"
memsql_exec "CREATE TABLE IF NOT EXISTS tbl (id INT AUTO_INCREMENT PRIMARY KEY)"

# --- Launch the background insert loops --------------------------------------
echo "Launching $NUM_WORKERS workers"
sleep 1

declare -a WORKERS
for ((worker = 0; worker < NUM_WORKERS; worker++)); do
    (
        while true; do
            echo "Worker $worker inserting"
            memsql_exec_multi "INSERT INTO tbl VALUES (NULL)"
        done
    ) &
    # Remember the subshell's PID so it can be stopped later.
    WORKERS[worker]=$!
done

# Let the workload run.
sleep 10

# --- Stop the workers one by one ---------------------------------------------
for ((worker = 0; worker < NUM_WORKERS; worker++)); do
    echo "Killing worker $worker"
    kill "${WORKERS[worker]}"
    wait "${WORKERS[worker]}" 2>/dev/null
done

# --- Remove the test database ------------------------------------------------
echo "Cleaning up"
sleep 1
memsql_exec "DROP DATABASE test"
Java
Dependencies
- JDBC library (package `libmysql-java` on Debian-based distributions)
Code
import java.sql.*;
import java.util.Properties;
import java.util.concurrent.*;
/**
 * Multi-threaded insert sample: recreates the {@code test} database, then runs
 * a fixed pool of threads that each insert rows on their own connection until
 * they are interrupted.
 */
public class Sample {
    private static final String dbClassName = "com.mysql.jdbc.Driver";
    private static final String CONNECTION = "jdbc:mysql://127.0.0.1:3306/";
    private static final String USER = "root";
    private static final String PASSWORD = "";

    /** Runs a single SQL statement on {@code conn} and closes the statement afterwards. */
    private static void executeSQL(Connection conn, String sql) throws SQLException {
        try (Statement statement = conn.createStatement()) {
            statement.execute(sql);
        }
    }

    /** Drops and recreates the {@code test} database together with its {@code tbl} table. */
    private static void ResetEnvironment() throws SQLException {
        Properties credentials = new Properties();
        credentials.put("user", USER);
        credentials.put("password", PASSWORD);

        String[] statements = {
            "DROP DATABASE IF EXISTS test",
            "CREATE DATABASE test",
            "USE test",
            "CREATE TABLE tbl (id INT AUTO_INCREMENT PRIMARY KEY)"
        };

        try (Connection conn = DriverManager.getConnection(CONNECTION, credentials)) {
            for (int i = 0; i < statements.length; i++) {
                executeSQL(conn, statements[i]);
            }
        }
    }

    /**
     * Insert loop executed by each pool thread: opens its own connection and
     * inserts one row per statement until the thread is interrupted. SQL
     * errors are printed and end the loop.
     */
    private static void worker() {
        Properties credentials = new Properties();
        credentials.put("user", USER);
        credentials.put("password", PASSWORD);

        try (Connection conn = DriverManager.getConnection(CONNECTION, credentials)) {
            executeSQL(conn, "USE test");
            for (;;) {
                // Thread.interrupted() also clears the flag, as in a plain while-condition.
                if (Thread.interrupted()) {
                    break;
                }
                executeSQL(conn, "INSERT INTO tbl VALUES (NULL)");
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /** Loads the driver, resets the schema, runs the workers for 20 seconds and stops them. */
    public static void main(String[] args) throws ClassNotFoundException, SQLException, InterruptedException {
        final int workerCount = 20;

        Class.forName(dbClassName);
        ResetEnvironment();

        ExecutorService executor = Executors.newFixedThreadPool(workerCount);
        // The task is stateless, so one instance can be submitted repeatedly.
        Runnable insertTask = new Runnable() {
            @Override
            public void run() {
                worker();
            }
        };
        for (int submitted = 0; submitted < workerCount; submitted++) {
            executor.submit(insertTask);
        }

        Thread.sleep(20000);

        // Interrupt the workers and give them a moment to wind down.
        executor.shutdownNow();
        boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
        if (!terminated) {
            System.err.println("Pool did not terminate");
        }
    }
}
C# / .NET Core
Dependencies
- MySql.Data NuGet Package: run `dotnet add package MySql.Data`
Code
using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MySql.Data.MySqlClient; // dotnet add package MySql.Data
namespace MemSQLTest
{
    // Multi-threaded insert benchmark: creates a database and table, runs
    // NUM_WORKERS threads that each insert batches for WORKLOAD_TIME seconds,
    // prints the resulting row count and drops the database again.
    public class MemSQLTest
    {
        /**
         * Tweak the following globals to fit your environment
         * ###################################################
         */
        public const string HOST = "127.0.0.1";
        public const int PORT = 3306;
        public const string USER = "root";
        public const string PASSWORD = "";

        // Specify which database and table to work with.
        // Note: this database will be dropped at the end of this script
        public const string DATABASE = "test";
        public const string TABLE = "tbl";

        // The number of workers to run
        public const int NUM_WORKERS = 20;

        // Run the workload for this many seconds
        public const int WORKLOAD_TIME = 10; // seconds

        // Batch size to use
        public const int BATCH_SIZE = 5000;

        /**
         * Internal code starts here
         * #########################
         */

        // Connection behind dbCommand; kept so it can be disposed during cleanup.
        private IDbConnection dbConnection;
        // Command used for setup, warmup, statistics and cleanup statements.
        private IDbCommand dbCommand;
        // Pre-generated INSERT statement carrying BATCH_SIZE rows.
        private string insertCommand;

        // Opens the setup connection, creates the shared command and
        // pre-generates the batched INSERT statement.
        private void GetDbCommand()
        {
            IDbConnection conn = new MySqlConnection();
            try
            {
                conn.ConnectionString = $"Server={HOST};Port={PORT};Uid={USER};Pwd={PASSWORD};";
                conn.Open();
                dbCommand = conn.CreateCommand();
            }
            catch
            {
                // Do not leak the connection when it cannot be opened.
                conn.Dispose();
                throw;
            }
            dbConnection = conn;

            string[] _batch = new string[BATCH_SIZE];
            Array.Fill(_batch, "(DEFAULT)");
            insertCommand = $"INSERT INTO {TABLE} VALUES {string.Join(",", _batch)}";
        }

        // Creates the database (if missing), switches to it and creates the table.
        private void SetupTestDb()
        {
            dbCommand.CommandText = $"CREATE DATABASE IF NOT EXISTS {DATABASE}";
            dbCommand.ExecuteNonQuery();

            dbCommand.CommandText = $"USE {DATABASE}";
            dbCommand.ExecuteNonQuery();

            dbCommand.CommandText = $"CREATE TABLE {TABLE} (id int primary key auto_increment)";
            dbCommand.ExecuteNonQuery();
        }

        // Runs the batched INSERT once on the setup connection.
        private void Warmup()
        {
            Console.WriteLine("Warming up workload");
            dbCommand.CommandText = insertCommand;
            dbCommand.ExecuteNonQuery(); // FRAGILE: included in count, not included in time
        }

        // Starts NUM_WORKERS threads running Worker() and waits for all of them.
        //
        // NOTE: a Task.Run/Task.WhenAll variant of this method was tried and
        // yields an authentication error: https://bugs.mysql.com/bug.php?id=75917
        private void DoBenchmark()
        {
            Console.WriteLine($"Launching {NUM_WORKERS} workers for {WORKLOAD_TIME} sec");
            Thread[] workers = new Thread[NUM_WORKERS];
            for (int i = 0; i < NUM_WORKERS; i++)
            {
                workers[i] = new Thread(new ThreadStart(Worker));
                workers[i].Start();
            }

            Console.WriteLine($"{workers.Length} workers running...");
            for (int i = 0; i < NUM_WORKERS; i++)
            {
                workers[i].Join();
            }
        }

        // Thread body: repeats the batched INSERT on a private connection
        // until WORKLOAD_TIME seconds have elapsed.
        private void Worker()
        {
            // Create another connection per thread
            using (IDbConnection conn = new MySqlConnection())
            {
                conn.ConnectionString = $"Server={HOST};Port={PORT};database={DATABASE};Uid={USER};Pwd={PASSWORD};SslMode=None;";
                conn.Open();

                using (IDbCommand dbCommand = conn.CreateCommand())
                {
                    dbCommand.CommandText = insertCommand;
                    Stopwatch stop = new Stopwatch();
                    stop.Start();
                    while (stop.ElapsedMilliseconds < WORKLOAD_TIME * 1000)
                    {
                        dbCommand.ExecuteNonQuery();
                    }
                }
            }
        }

        // Prints the table's row count and the rows-per-second figure.
        private void ShowStats()
        {
            dbCommand.CommandText = $"USE {DATABASE}";
            dbCommand.ExecuteNonQuery();

            dbCommand.CommandText = $"SELECT COUNT(*) FROM {TABLE}";
            // ExecuteScalar returns the first column of the first row, so the
            // result no longer depends on the column being named "COUNT(*)".
            long count = Convert.ToInt64(dbCommand.ExecuteScalar());

            Console.WriteLine($"{count} rows inserted using {NUM_WORKERS} workers");
            Console.WriteLine($"{count / WORKLOAD_TIME} rows per second");
        }

        // Drops the test database and releases the setup command and
        // connection. Does nothing when no command was ever created.
        private void CleanupTestDb()
        {
            if (dbCommand == null)
            {
                return;
            }

            Console.WriteLine("Cleaning up");
            try
            {
                dbCommand.CommandText = "USE `information_schema`";
                dbCommand.ExecuteNonQuery();

                dbCommand.CommandText = $"DROP DATABASE IF EXISTS {DATABASE}";
                dbCommand.ExecuteNonQuery();
            }
            finally
            {
                // Release the command and its connection even if a statement failed.
                dbCommand.Dispose();
                dbCommand = null;
                if (dbConnection != null)
                {
                    dbConnection.Dispose();
                    dbConnection = null;
                }
            }
        }

        // Entry point: returns 0 on success and 1 when any step throws.
        public static int Main(string[] args)
        {
            MemSQLTest tester = new MemSQLTest();
            try
            {
                tester.GetDbCommand();
                tester.SetupTestDb();
                tester.Warmup();
                tester.DoBenchmark();
                tester.ShowStats();
                tester.CleanupTestDb();
                return 0;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"ERROR: {ex.Message}, {ex.GetType()}, {ex.StackTrace}");
                try
                {
                    tester.CleanupTestDb();
                }
                catch
                {
                    // ignore error
                }
                return 1;
            }
        }
    }
}
C
Dependencies
- C compiler (e.g. gcc)
- `pthreads` library (present on most Linux distributions)
- `mysqlclient` library, available from the `libmysqlclient-dev` package on Debian-based distributions.
Code
/* Compile with:
*
* cc multi_threaded_inserts.c -lmysqlclient -pthread -o mti
*/
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <mysql/mysql.h>
/* Connection parameters shared by main() and every insert worker. */
static const char *host = "127.0.0.1";
static const char *user = "root";
static const char *passwd = "";
static const size_t port = 3306;

/* Number of concurrent insert threads. */
#define NUM_WORKERS 20

/* Set to 0 by main() to make the insert workers leave their loop. */
static volatile int keep_going = 1;

/* Thread entry point; receives the worker id and returns the number of rows
 * it inserted, both carried in the void pointer. */
void *insert_worker(void *worker_id);
/*
 * Creates the 'test' database and 'tbl' table, runs NUM_WORKERS insert
 * threads for a fixed time, reports how many rows they inserted and drops
 * the database again.
 *
 * Returns 0 on success and 1 if the library, connection, schema setup or
 * thread start-up fails.
 */
int main()
{
    const unsigned int run_seconds = 10;
    MYSQL conn;
    pthread_t workers[NUM_WORKERS];
    size_t started;
    size_t rows_inserted = 0;
    size_t i;
    int status = 1;

    /* Initialize the client library once, before any thread uses it. */
    if (mysql_library_init(0, NULL, NULL))
    {
        printf("Could not initialize the MySQL client library!\n");
        return 1;
    }
    mysql_init(&conn);

    printf("Connecting to SingleStore DB...\n");
    if (mysql_real_connect(&conn, host, user, passwd, NULL, port, NULL, 0) != &conn)
    {
        printf("Could not connect to the SingleStore DB database!\n");
        goto done;
    }

    printf("Creating database 'test'...\n");
    if (mysql_query(&conn, "create database test") || mysql_query(&conn, "use test"))
    {
        printf("Could not create 'test' database!\n");
        goto done;
    }

    printf("Creating table 'tbl' in database 'test'...\n");
    if (mysql_query(&conn, "create table tbl (id bigint auto_increment primary key)"))
    {
        printf("Could not create 'tbl' table in the 'test' database!\n");
        goto done;
    }

    printf("Launching %d insert workers...\n", NUM_WORKERS);
    for (started = 0; started < NUM_WORKERS; ++started)
    {
        /* The worker id travels inside the pointer argument. */
        if (pthread_create(&workers[started], NULL, &insert_worker, (void *)started) != 0)
        {
            printf("Could not start insert worker %zu!\n", started);
            break;
        }
    }

    /* Only run the timed workload when every worker could be started. */
    if (started == NUM_WORKERS)
    {
        printf("Running inserts for %u seconds...\n", run_seconds);
        sleep(run_seconds);
        status = 0;
    }

    /* Ask the workers to stop and collect each one's row count. */
    keep_going = 0;
    for (i = 0; i < started; ++i)
    {
        void *result = NULL;
        if (pthread_join(workers[i], &result) == 0)
        {
            rows_inserted += (size_t)result;
        }
    }

    printf("Inserted %zu rows. Cleaning up...\n", rows_inserted);
    if (mysql_query(&conn, "drop database test"))
    {
        printf("Could not drop the testing database 'test'!\n");
    }

done:
    mysql_close(&conn);
    mysql_library_end();
    return status;
}
/*
 * Thread entry point: connects to the 'test' database and inserts rows,
 * eight per statement, until keep_going is cleared.
 *
 * worker_id carries the worker's numeric id inside the pointer value.
 * Returns the number of rows inserted, likewise carried in the pointer.
 * A connection or insert failure terminates the whole process.
 */
void *insert_worker(void *worker_id)
{
    size_t id = (size_t) worker_id;
    size_t rows = 0;
    MYSQL conn;

    mysql_init(&conn);
    if (mysql_real_connect(&conn, host, user, passwd, "test", port, NULL, 0) != &conn)
    {
        printf("Worker %zu could not connect to the SingleStore DB database! Aborting...\n", id);
        exit(1);
    }

    while (keep_going)
    {
        if (mysql_query(&conn, "insert into tbl values (null), (null), (null),"
                               "(null), (null), (null), (null), (null)"))
        {
            printf("Worker %zu failed to insert data, aborting...\n", id);
            exit(1);
        }
        /* Each successful statement adds eight rows. */
        rows += 8;
    }

    mysql_close(&conn);
    /* Release the per-thread state the client library set up for this thread. */
    mysql_thread_end();
    return (void *)rows;
}
Node.js
Dependencies
- mysql npm package: run `npm install mysql`.
Code
const mysql = require('mysql'); // npm install mysql
const util = require('util');
/**
* Tweak the following globals to fit your environment
* ###################################################
*/
// Connection settings for the server under test.
const HOST = '127.0.0.1';
const PORT = 3306;
const USER = 'root';
const PASSWORD = '';
// Specify which database and table to work with.
// Note: this database will be dropped at the end of this script
const DATABASE = 'test';
const TABLE = 'tbl';
// The number of workers to run
const NUM_WORKERS = 20;
// Run the workload for this many seconds
const WORKLOAD_TIME = 10;
// Batch size to use
const BATCH_SIZE = 5000;
/**
 * Internal code starts here
 * #########################
 */
// Stop flag for the insert workers: set to true once the workload time is
// over; each worker checks it after every insert and then leaves its loop.
let isDone = false;
// Promise-based sleep: the returned promise resolves after `ms` milliseconds.
function timeout(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
// Pre-generate the insert query: a single statement with BATCH_SIZE empty rows
const _batch = Array.from({ length: BATCH_SIZE }, () => '()').join(',');
const insertQuery = `INSERT INTO ${TABLE} VALUES ${_batch}`;
// Opens a connection to `dbName` and resolves with it once connected.
// The connection's query() is replaced by a promise-returning version so
// callers can await it; a connection error rejects the returned promise.
function getConnection(dbName) {
  return new Promise((resolve, reject) => {
    const settings = {
      host: HOST,
      port: PORT,
      user: USER,
      password: PASSWORD,
      database: dbName
    };
    const conn = mysql.createConnection(settings);
    conn.connect((err) => {
      if (err) {
        reject(err);
        return;
      }
      conn.query = util.promisify(conn.query);
      resolve(conn);
    });
  });
}
// Creates the test database and table if they do not exist yet.
async function setupTestDb() {
  const conn = await getConnection('information_schema');
  const statements = [
    `CREATE DATABASE IF NOT EXISTS ${DATABASE}`,
    `USE ${DATABASE}`,
    `CREATE TABLE IF NOT EXISTS ${TABLE} (id INT AUTO_INCREMENT PRIMARY KEY)`
  ];
  // Run the statements strictly one after another.
  for (const statement of statements) {
    await conn.query(statement);
  }
}
// One insert worker: repeats the batched insert on its own connection and
// stops after the first insert that completes once isDone has been set.
async function insertWorker() {
  const conn = await getConnection(DATABASE);
  do {
    // awaiting the query yields to the event loop between inserts
    await conn.query(insertQuery);
  } while (!isDone);
}
// Runs the batched insert once before the timed workload starts.
async function warmup() {
  console.log('Warming up workload');
  const warmConn = await getConnection(DATABASE);
  // FRAGILE: included in count, not included in time
  await warmConn.query(insertQuery);
}
// Starts NUM_WORKERS insert workers, lets them run for WORKLOAD_TIME
// seconds, then signals them to stop and waits for all of them.
// If any worker fails while the timer is running, the stop flag is set and
// the failure is thrown to the caller immediately.
async function doBenchmark() {
  console.log(`Launching ${NUM_WORKERS} workers for ${WORKLOAD_TIME} sec`);
  const workers = [];
  for (let i = 0; i < NUM_WORKERS; ++i) {
    workers.push(insertWorker());
  }
  console.log(`${workers.length} workers running...`);

  const allWorkers = Promise.all(workers);
  const timer = timeout(WORKLOAD_TIME * 1000);
  try {
    // Watch the workers while the timer runs so a rejection is handled right
    // away instead of sitting unhandled until the timer ends. Chaining to
    // `timer` on success keeps the wait at exactly the workload time.
    await Promise.race([timer, allWorkers.then(() => timer)]);
    console.log('Stopping workload');
  } finally {
    // Always release the remaining workers, including on failure.
    isDone = true;
  }
  await allWorkers;
}
// Reads the table's row count and prints the totals and rows per second.
async function printStats() {
  const conn = await getConnection(DATABASE);
  const [firstRow] = await conn.query(`SELECT COUNT(*) AS count FROM ${TABLE}`);
  const count = firstRow.count;
  const perSecond = count / WORKLOAD_TIME;
  console.log(`${count} rows inserted using ${NUM_WORKERS} workers`);
  console.log(`${perSecond} rows per second`);
}
// Drops the test database. Safe to call when the database does not exist,
// which happens when cleanup runs after a failed setup or a second time.
async function cleanupTestDb() {
  console.log('Cleaning up');
  const conn = await getConnection('information_schema');
  await conn.query(`DROP DATABASE IF EXISTS ${DATABASE}`);
}
// Runs the whole benchmark. On any failure the error is logged, cleanup is
// attempted once more (its own errors are only logged) and the process exits
// with status 1; otherwise it exits with status 0.
async function main() {
  let exitCode = 0;
  try {
    await setupTestDb();
    await warmup();
    await doBenchmark();
    await printStats();
    await cleanupTestDb();
  } catch (err) {
    exitCode = 1;
    console.error('ERROR', err);
    try {
      await cleanupTestDb();
    } catch (err2) {
      console.error(err2);
    }
  }
  process.exit(exitCode); // releases all connections
}

main();