Outdated Version

You are viewing an older version of this section. View current production version.

Concurrent Multi-Insert Examples


Python

Dependencies

  • memsql Python library, which provides memsql.common.database (install with pip install memsql)

Code

#!/usr/bin/python

import os
import sys
import time
import threading
import argparse

from memsql.common import database

parser = argparse.ArgumentParser()

# --- connection settings ---------------------------------------------------
parser.add_argument(
    "--host",
    help="The hostname of the MemSQL node to connect to",
    default=None,
)
parser.add_argument(
    "--port",
    help="The port of the MemSQL node to connect to",
    type=int,
    default=None,
)
parser.add_argument(
    "--user",
    help="The user of the MemSQL node to connect to",
    default="root",
)
parser.add_argument(
    "--password",
    help="The password of the MemSQL node to connect to",
    default="",
)

# --- target database -------------------------------------------------------
parser.add_argument(
    "--database",
    help="The database to use - note: this database should not exist",
    default="simple_benchmark",
)

# --- workload shape --------------------------------------------------------
parser.add_argument(
    "--num-workers",
    help="The number of insert threads",
    type=int,
    default=10,
)
parser.add_argument(
    "--time",
    help="The number of seconds to run the benchmark for",
    type=int,
    default=30,
)

# Parsed at import time: get_connection() uses options.database as the
# default value of its `db` parameter, so this must run before it is defined.
options = parser.parse_args()

# Filled in by the __main__ block from --host/--port; get_connection() falls
# back to these whenever it is called without an explicit host or port.
HOST = None
PORT = None

TABLE = "tbl"
BATCH_SIZE = 5000

# The whole workload is one multi-row INSERT of BATCH_SIZE "(1)" tuples,
# built once here so the worker loop only has to execute it.
QUERY_TEXT = "INSERT INTO {} (val) VALUES {}".format(
    TABLE, ",".join("(1)" for _ in range(BATCH_SIZE))
)

def get_connection(host=None, port=None, db=options.database):
    """Open and return a new connection to the MemSQL node.

    host / port: fall back to the module-level HOST / PORT when None.
    db: database to select; defaults to --database (captured when this
        function is defined, i.e. after argument parsing).
    User and password always come from the parsed command-line options.
    """
    return database.connect(
        host=HOST if host is None else host,
        port=PORT if port is None else port,
        user=options.user,
        password=options.password,
        database=db,
    )

class InsertWorker(threading.Thread):
    """ A thread which executes QUERY_TEXT in a loop until `stopping` is set.

    If connecting or inserting raises, the exception is stored on
    `self.exception` so the caller can report it after join(), and is then
    re-raised so threading's default excepthook still prints the traceback.
    """

    def __init__(self, stopping):
        super(InsertWorker, self).__init__()
        self.stopping = stopping  # Event shared by all workers; set() stops them
        self.daemon = True  # don't keep the interpreter alive if main exits
        self.exception = None  # set by run() if the worker dies with an error

    def run(self):
        try:
            with get_connection() as conn:
                # The flag is checked between batches, so a worker stops only
                # after its in-flight INSERT completes.
                while not self.stopping.is_set():
                    conn.execute(QUERY_TEXT)
        except Exception as exc:
            # Thread boundary: remember the failure for the caller, then let
            # it propagate so it is still reported on stderr.
            self.exception = exc
            raise

def test_connection():
    """Check that the MemSQL node answers a ping; exit with status 1 if not.

    Uses information_schema as the connection database so the check does not
    depend on the benchmark database existing.
    """
    reachable = True
    try:
        with get_connection(db="information_schema") as probe:
            probe.ping()
    except database.MySQLError:
        reachable = False

    if not reachable:
        print("Unable to connect to MemSQL with provided connection details.")
        print("Please verify that MemSQL is running @ %s:%s" % (HOST, PORT))
        sys.exit(1)

def setup_test_db():
    """Create the benchmark database and its table.

    If CREATE DATABASE fails for any MySQLError, the script assumes the
    database already exists, prints guidance and exits with status 1.
    """
    db_name = options.database
    with get_connection(db="information_schema") as conn:
        print('Creating database %s' % db_name)

        try:
            # Plain CREATE DATABASE (no IF NOT EXISTS) so reuse is detected.
            conn.query('CREATE DATABASE %s' % db_name)
        except database.MySQLError:
            for message in (
                "Database %s already exists - since we drop the database at" % db_name,
                "the end of this script, please specify an un-used database",
                "with the --database flag.",
            ):
                print(message)
            sys.exit(1)

        for statement in (
            'USE %s' % db_name,
            'CREATE TABLE IF NOT EXISTS %s (id INT AUTO_INCREMENT PRIMARY KEY, val INT)' % TABLE,
        ):
            conn.query(statement)

def warmup():
    """Execute the insert workload once before the timed benchmark starts.

    NOTE(review): presumably this absorbs one-time server-side costs (e.g.
    query compilation) so they do not skew the measured rate — confirm.
    """
    print('Warming up workload')
    with get_connection() as warm_conn:
        warm_conn.execute(QUERY_TEXT)

def run_benchmark():
    """ Run a set of InsertWorkers and record their performance.

    Only rows inserted while the workers run are counted: the table's row
    count is sampled before and after, so rows that already exist (e.g. the
    batch written by warmup()) are excluded. The rate uses the measured
    wall-clock duration, because joining the workers also waits for each
    one's in-flight batch to finish after `--time` seconds have elapsed.
    """
    count_sql = "SELECT COUNT(*) AS count FROM %s" % TABLE

    with get_connection() as conn:
        rows_before = conn.get(count_sql).count

    stopping = threading.Event()
    workers = [InsertWorker(stopping) for _ in range(options.num_workers)]

    print('Launching %d workers' % options.num_workers)
    print('Workload will take approximately %d seconds.' % options.time)

    started = time.time()
    for worker in workers:
        worker.start()
    time.sleep(options.time)

    print('Stopping workload')

    stopping.set()
    for worker in workers:
        worker.join()
    elapsed = time.time() - started

    failed = [worker for worker in workers if worker.exception is not None]
    if failed:
        print("Warning: %d of %d workers stopped with an error: %s"
              % (len(failed), len(workers), failed[0].exception))

    with get_connection() as conn:
        count = conn.get(count_sql).count - rows_before

    print("%d rows inserted using %d threads" % (count, options.num_workers))
    rate = count / elapsed if elapsed > 0 else 0.0
    print("%.1f rows per second" % rate)

def cleanup():
    """ Drop the database this benchmark is using (best effort).

    Connects through information_schema so that the connection itself does
    not fail when the benchmark database is absent. Any error from the drop
    is reported instead of silently ignored, but never raised, so cleanup
    cannot mask the outcome of the benchmark.
    """
    try:
        with get_connection(db="information_schema") as conn:
            conn.query('DROP DATABASE IF EXISTS %s' % options.database)
    except database.MySQLError as err:
        print("Warning: unable to drop database %s: %s" % (options.database, err))

if __name__ == '__main__':
    HOST = options.host or "127.0.0.1"
    PORT = options.port or 3306

    # Do not drop options.database up front. --database is documented as
    # "should not exist", and setup_test_db() refuses to reuse an existing
    # database, so dropping it first would destroy whatever the user pointed
    # us at. The database is dropped only after this run has created it.
    ready = False
    try:
        test_connection()
        setup_test_db()
        ready = True
    except KeyboardInterrupt:
        print("Interrupted... exiting...")

    if ready:
        try:
            warmup()
            run_benchmark()
        except KeyboardInterrupt:
            print("Interrupted... exiting...")
        finally:
            cleanup()

Bash

Dependencies

  • mysql client program
  • Bourne-Again Shell (bash)

Code

#!/bin/bash

# Connection settings; MDB stays empty until the test database exists.
MHOST="127.0.0.1"
MPORT="3306"
MUSER="root"
MDB=""
NUM_WORKERS=128
BATCH_SIZE=256

# Run the single SQL string $1 through the mysql client. $MDB is left
# unquoted on purpose: while it is empty it expands to no argument at all.
memsql_exec()
{
        mysql -h "$MHOST" -P "$MPORT" -u "$MUSER" $MDB -e "$1"
}

# Send BATCH_SIZE copies of the statement $1 (each terminated by ';') to the
# server in a single mysql client invocation.
memsql_exec_multi()
{
        mysql -h "$MHOST" -P "$MPORT" -u "$MUSER" $MDB -e \
                "$(for ((b = 0; b < BATCH_SIZE; b++)); do
                        echo "$1;"
                done)"
}

echo "Creating database test"
memsql_exec "CREATE DATABASE IF NOT EXISTS test"
MDB="test"

echo "Creating table tbl"
memsql_exec "CREATE TABLE IF NOT EXISTS tbl (id INT AUTO_INCREMENT PRIMARY KEY)"

echo "Launching $NUM_WORKERS workers"
sleep 1
declare -a WORKERS
for ((worker = 0; worker < NUM_WORKERS; worker++)); do
        (
                # On SIGTERM, exit only after the running mysql client returns.
                # Bash defers a trapped signal until the foreground command
                # completes, so the parent's `wait` below also covers the
                # in-flight batch. Otherwise an orphaned client could still be
                # inserting while the database is dropped.
                trap 'exit 0' TERM
                while true; do
                        echo "Worker $worker inserting"
                        memsql_exec_multi "INSERT INTO tbl VALUES (NULL)"
                done
        ) &
        WORKERS[$worker]=$!
done

sleep 10

# Signal every worker first, then wait for all of them, so the in-flight
# batches drain in parallel rather than one worker at a time.
for ((worker = 0; worker < NUM_WORKERS; worker++)); do
        echo "Killing worker $worker"
        kill "${WORKERS[$worker]}"
done
for ((worker = 0; worker < NUM_WORKERS; worker++)); do
        wait "${WORKERS[$worker]}" 2>/dev/null
done
echo "Cleaning up"
sleep 1
memsql_exec "DROP DATABASE test"

Java

Dependencies

  • JDBC library (package libmysql-java on Debian-based distributions)

Code

import java.sql.*;
import java.util.Properties;
import java.util.concurrent.*;

public class Sample {
    private static final String dbClassName = "com.mysql.jdbc.Driver";
    private static final String CONNECTION = "jdbc:mysql://127.0.0.1:3306/";
    private static final String USER = "root";
    private static final String PASSWORD = "";
    /** Thread-pool size and number of insert workers submitted to it (kept in one place so they cannot drift). */
    private static final int NUM_WORKERS = 20;
    /** How long the workers insert before the pool is shut down, in milliseconds. */
    private static final long WORKLOAD_MILLIS = 20000;

    /** Opens a new connection to the server with no default database selected. */
    private static Connection connect() throws SQLException {
        Properties properties = new Properties();
        properties.put("user", USER);
        properties.put("password", PASSWORD);
        return DriverManager.getConnection(CONNECTION, properties);
    }

    /** Executes one statement, closing the Statement afterwards. */
    private static void executeSQL(Connection conn, String sql) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            stmt.execute(sql);
        }
    }

    /** Drops and recreates database "test" containing an empty auto-increment table "tbl". */
    private static void resetEnvironment() throws SQLException {
        try (Connection conn = connect()) {
            for (String query : new String[] {
                    "DROP DATABASE IF EXISTS test",
                    "CREATE DATABASE test",
                    "USE test",
                    "CREATE TABLE tbl (id INT AUTO_INCREMENT PRIMARY KEY)"
            }) {
                executeSQL(conn, query);
            }
        }
    }

    /** Inserts one row per statement on its own connection until the thread is interrupted. */
    private static void worker() {
        try (Connection conn = connect()) {
            executeSQL(conn, "USE test");
            // executor.shutdownNow() in main() interrupts the pool threads; that ends this loop.
            while (!Thread.interrupted()) {
                executeSQL(conn, "INSERT INTO tbl VALUES (NULL)");
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws ClassNotFoundException, SQLException, InterruptedException {
        Class.forName(dbClassName);
        resetEnvironment();
        ExecutorService executor = Executors.newFixedThreadPool(NUM_WORKERS);
        for (int i = 0; i < NUM_WORKERS; i++) {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    worker();
                }
            });
        }
        Thread.sleep(WORKLOAD_MILLIS);
        executor.shutdownNow();
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            System.err.println("Pool did not terminate");
        }
    }
}

C#

Dependencies

  • MySQL Connector/NET (the MySql.Data library providing MySql.Data.MySqlClient)

Code

using System;
using System.Data;
using System.Threading;
using System.Diagnostics;
using MySql.Data.MySqlClient;

namespace MemSQLTest {
  //Thread creates a connection and inserts for 1000 milliseconds
  public class WorkerThread {
    // Connection string shared by the setup code in Main and every worker.
    internal const string ConnectionString = "Server=127.0.0.1;Uid=root;Pwd=;";

    // Opens its own connection, selects database `db`, then inserts one
    // default row per statement into table `t` until 1000 ms have elapsed.
    // Errors are printed instead of propagating out of the thread.
    public static void Thread() {
      try {
        // `using` closes the connection and command when the worker is done.
        using (IDbConnection conn = new MySqlConnection(ConnectionString)) {
          conn.Open();
          using (IDbCommand dbcmd = conn.CreateCommand()) {
            dbcmd.CommandText = "USE db";
            dbcmd.ExecuteNonQuery();
            dbcmd.CommandText = "INSERT INTO t VALUES (DEFAULT)";
            Stopwatch stop = Stopwatch.StartNew();
            while (stop.ElapsedMilliseconds < 1000) {
              dbcmd.ExecuteNonQuery();
            }
          }
        }
      } catch (Exception ex) {
        Console.WriteLine(ex.Message);
      }
    }
  }

  class MemSQLTest {
    // Number of concurrent worker threads.
    const int NumThreads = 10;

    // Recreates database `db` with an auto-increment table, runs the
    // workers to completion, then prints the number of rows they inserted.
    static void Main(string[] args) {
      try {
        using (IDbConnection conn = new MySqlConnection(WorkerThread.ConnectionString)) {
          conn.Open();
          using (IDbCommand dbcmd = conn.CreateCommand()) {
            //Initialize database with auto increment table
            string[] setup = {
              "DROP DATABASE IF EXISTS db",
              "CREATE DATABASE db",
              "USE db",
              "CREATE TABLE t (id int primary key auto_increment)"
            };
            foreach (string sql in setup) {
              dbcmd.CommandText = sql;
              dbcmd.ExecuteNonQuery();
            }

            //Initialize threads and wait for all of them to finish
            Thread[] threads = new Thread[NumThreads];
            for (int i = 0; i < NumThreads; i++) {
              threads[i] = new Thread(new ThreadStart(WorkerThread.Thread));
              threads[i].Start();
            }
            foreach (Thread worker in threads) {
              worker.Join();
            }

            //Show select count(*) on auto increment table
            dbcmd.CommandText = "SELECT COUNT(*) FROM t";
            using (IDataReader reader = dbcmd.ExecuteReader()) {
              while (reader.Read()) {
                Console.WriteLine(reader["COUNT(*)"]);
              }
            }
          }
        }
      } catch (Exception ex) {
        Console.WriteLine(ex.Message);
      }
    }
  }
}

C

Dependencies

  • C compiler (e.g. gcc)
  • pthreads library (present on most Linux distributions)
  • mysqlclient library, available from the libmysqlclient-dev package on Debian-based distributions.

Code

/* Compile with:
 *
 * cc multi_threaded_inserts.c -lmysqlclient -pthread -o mti
 */

#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

#include <mysql/mysql.h>

/* Connection settings shared by main() and every insert worker. */
static const char *host = "127.0.0.1";
static const char *user = "root";
static const char *passwd = "";
static const size_t port = 3306;

/* Number of concurrent insert threads. */
#define NUM_WORKERS 20

/* Cleared by main() to tell the workers to stop; declared volatile so each
 * loop iteration re-reads it. NOTE(review): volatile is not a thread
 * synchronization primitive; a C11 atomic would be the portable choice. */
static volatile int keep_going = 1;

/* Thread entry point; returns the number of rows it inserted cast to void*. */
void *insert_worker(void *worker_id);

int main()
{
    /* Initialise the client library once, before any worker thread uses it.
     * mysql_library_init() is the documented entry point for multi-threaded
     * clients; my_init() is an internal function. */
    if (mysql_library_init(0, NULL, NULL))
    {
        printf("Could not initialize the MySQL client library!\n");
        return 1;
    }

    MYSQL conn;
    mysql_init(&conn);

    printf("Connecting to MemSQL...\n");
    if (mysql_real_connect(&conn, host, user, passwd, NULL, port, NULL, 0) != &conn)
    {
        printf("Could not connect to the MemSQL database!\n");
        goto failure;
    }

    printf("Creating database 'test'...\n");
    if (mysql_query(&conn, "create database test") || mysql_query(&conn, "use test"))
    {
        printf("Could not create 'test' database!\n");
        goto failure;
    }

    printf("Creating table 'tbl' in database 'test'...\n");
    if (mysql_query(&conn, "create table tbl (id bigint auto_increment primary key)"))
    {
        printf("Could not create 'tbl' table in the 'test' database!\n");
        goto failure;
    }

    /* NUM_WORKERS is an int constant, so %d (not %lu) is the matching format. */
    printf("Launching %d insert workers...\n", NUM_WORKERS);

    pthread_t workers[NUM_WORKERS];
    size_t num_started = 0;

    size_t i;
    for (i = 0; i < NUM_WORKERS; ++i)
    {
        /* The worker id is smuggled through the void* argument. */
        if (pthread_create(&workers[i], NULL, &insert_worker, (void *)i) != 0)
        {
            printf("Could not launch insert worker %zu!\n", i);
            break;
        }
        ++num_started;
    }

    const unsigned int run_seconds = 10;
    printf("Running inserts for %u seconds...\n", run_seconds);
    sleep(run_seconds);
    keep_going = 0;

    /* Only join threads that were actually created. Each worker returns its
     * row count cast to void*, so receive it as a void* and convert back. */
    size_t rows_inserted = 0;
    for (i = 0; i < num_started; ++i)
    {
        void *rows_i;
        pthread_join(workers[i], &rows_i);
        rows_inserted += (size_t) rows_i;
    }

    printf("Inserted %zu rows. Cleaning up...\n", rows_inserted);

    if (mysql_query(&conn, "drop database test"))
    {
        printf("Could not drop the testing database 'test'!\n");
    }
    mysql_close(&conn);
    mysql_library_end();

    return 0;

failure:
    mysql_close(&conn);
    mysql_library_end();
    return 1;
}

/* Thread body: opens its own connection to database 'test' and inserts
 * 8-row batches until main() clears keep_going. Any connection or insert
 * failure aborts the whole process. Returns the number of rows inserted,
 * cast to void*. */
void *insert_worker(void *worker_id)
{
    size_t id = (size_t) worker_id;

    MYSQL conn;
    mysql_init(&conn);
    if (mysql_real_connect(&conn, host, user, passwd, "test", port, NULL, 0) != &conn)
    {
        printf("Worker %zu could not connect to the MemSQL database! Aborting...\n", id);
        exit(1);
    }

    size_t i;
    /* The step of 8 must match the number of (null) tuples in the INSERT. */
    for (i = 0; keep_going; i += 8)
    {
        if (mysql_query(&conn, "insert into tbl values (null), (null), (null),"
                        "(null), (null), (null), (null), (null)"))
        {
            printf("Worker %zu failed to insert data, aborting...\n", id);
            exit(1);
        }
    }

    mysql_close(&conn);
    /* Release the per-thread state the client library set up for this
     * thread; it must be called explicitly before the thread exits. */
    mysql_thread_end();

    return (void *)i;
}

NodeJS

Dependencies

  • NPM packages mysql and promise (run npm install mysql and npm install promise).

Code

var mysql = require("mysql");
var Promise = require("promise");  // jshint ignore:line
var util = require("util");
var timers = require("timers");

/**
 * Environment settings - adjust these to match your deployment
 * ############################################################
 */

// Connection settings for the MemSQL node.
var HOST = "127.0.0.1";
var PORT = 3306;
var USER = "root";
var PASSWORD = "";

// Database and table the benchmark writes to.
// Note: the database is dropped when the script finishes.
var DATABASE = "test";
var TABLE = "tbl";

// Number of concurrent insert workers.
var NUM_WORKERS = 20;

// Length of the timed workload, in seconds.
var WORKLOAD_TIME = 10;

// Rows per INSERT statement.
var BATCH_SIZE = 5000;

/**
 * Implementation
 * ##############
 */

// Build the multi-row INSERT once: BATCH_SIZE "()" tuples joined by commas.
var _batch = Array.apply(null, new Array(BATCH_SIZE))
    .map(function() { return "()"; })
    .join(",");

var QUERY_TEXT = util.format("INSERT INTO %s VALUES %s", TABLE, _batch);

// Set by do_benchmark() when time is up; workers check it after each batch.
var EXIT_FLAG = false;

// Open a new connection, selecting `db` (or DATABASE when `db` is falsy).
// Returns a promise that resolves with the connected mysql connection.
var get_connection = function(db) {
    var settings = {
        host: HOST,
        port: PORT,
        user: USER,
        password: PASSWORD,
        database: db || DATABASE
    };
    var conn = mysql.createConnection(settings);

    return new Promise(function(resolve, reject) {
        conn.connect(function(err) {
            return err ? reject(err) : resolve(conn);
        });
    });
};

// Promise wrapper around conn.query(): resolves with the result rows or
// rejects with the driver error.
var query = function(conn, query_text) {
    return new Promise(function(resolve, reject) {
        var on_result = function(err, rows) {
            if (err) {
                reject(err);
                return;
            }
            resolve(rows);
        };
        conn.query(query_text, on_result);
    });
};

// Build a promise-chain step: given util.format-style arguments, returns a
// function(conn) that runs the formatted SQL on conn and resolves with the
// same conn, so steps can be chained with .then().
var chained_query = function() {
    var format_args = Array.prototype.slice.call(arguments);
    return function(conn) {
        var sql = util.format.apply(util, format_args);
        return query(conn, sql).then(function() {
            return conn;
        });
    };
};

// Create DATABASE and TABLE if missing, connecting via information_schema.
// Resolves with the connection used.
var setup_test_db = function() {
    console.log("Setting up database");
    var steps = [
        chained_query("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
        chained_query("USE %s", DATABASE),
        chained_query("CREATE TABLE IF NOT EXISTS %s (id INT AUTO_INCREMENT PRIMARY KEY)", TABLE)
    ];
    return steps.reduce(function(chain, step) {
        return chain.then(step);
    }, get_connection("information_schema"));
};

// One insert worker: opens its own connection and runs QUERY_TEXT back to
// back until EXIT_FLAG is set (checked after each batch completes).
// Resolves when it stops cleanly and rejects with the first query error.
// The connection is closed in both cases.
var insert_worker = function() {
    return get_connection()
        .then(function(conn) {
            return new Promise(function(resolve, reject) {
                // Close the connection, then settle with the loop's outcome.
                // Any error from end() itself is ignored so it cannot mask
                // the original result.
                var finish = function(err) {
                    conn.end(function() {
                        if (err) { reject(err); }
                        else { resolve(); }
                    });
                };
                (function _loop() {
                    conn.query(QUERY_TEXT, function(err) {
                        if (err) { finish(err); }
                        else if (EXIT_FLAG) { finish(null); }
                        else { _loop(); }
                    });
                }());
            });
        });
};

// Run the workload query once before timing starts, then close the
// connection it used instead of leaving it open for the rest of the run.
var warmup = function() {
    console.log("Warming up workload");
    return get_connection()
        .then(chained_query(QUERY_TEXT))
        .then(function(conn) {
            return new Promise(function(resolve) {
                conn.end(function() { resolve(); });
            });
        });
};

// Start NUM_WORKERS insert workers, let them run for WORKLOAD_TIME seconds,
// then raise EXIT_FLAG. The returned promise settles once every worker has
// finished (or rejects with the first worker error).
var do_benchmark = function() {
    return new Promise(function(resolve) {
        console.log("Launching %d workers", NUM_WORKERS);

        var workers = Array.apply(null, new Array(NUM_WORKERS))
            .map(function() { return insert_worker(); });

        var stop = function() {
            console.log("Stopping workload");
            EXIT_FLAG = true;
            resolve(Promise.all(workers));
        };
        timers.setTimeout(stop, WORKLOAD_TIME * 1000);
    });
};

// Count the rows in TABLE and print the total and the per-second rate over
// WORKLOAD_TIME. The connection is closed once the count has been read.
var print_stats = function() {
    return get_connection()
        .then(function(conn) {
            var sql = util.format("SELECT COUNT(*) AS count FROM %s", TABLE);
            return query(conn, sql).then(function(rows) {
                return new Promise(function(resolve) {
                    conn.end(function() { resolve(rows); });
                });
            });
        })
        .then(function(rows) {
            var count = rows[0].count;
            console.log("%d rows inserted using %d workers", count, NUM_WORKERS);
            console.log("%d rows per second", count / WORKLOAD_TIME);
        });
};

// Drop DATABASE, connecting via information_schema so the connection does
// not depend on the database being dropped.
var cleanup_test_db = function() {
    console.log("Cleaning up");
    var drop_database = chained_query("DROP DATABASE %s", DATABASE);
    return get_connection("information_schema").then(drop_database);
};

// Run setup, warmup, benchmark and stats, always attempting cleanup.
// Exits with status 0 only if every step, including cleanup, succeeded.
var main = function() {
    // The benchmark error already printed by the failure handler, so the
    // final handler does not print it a second time.
    var reported_err = null;

    setup_test_db()
        .then(warmup)
        .then(do_benchmark)
        .then(print_stats)
        .then(cleanup_test_db, function(err) {
            console.error("Error: ", err);
            reported_err = err;
            // Still drop the test database, then re-raise so the run is
            // reported as failed instead of exiting with status 0.
            return cleanup_test_db().then(function() { throw err; });
        })
        .then(
            function() { process.exit(0); },
            function(err) {
                // Errors not printed yet (e.g. cleanup itself failing).
                if (err !== reported_err) { console.error("Error: ", err); }
                process.exit(1);
            }
        );
};

main();