Outdated Version

You are viewing an older version of this section. View current production version.

Distributed DML Query Execution

In this topic we will look at common DML query patterns and how they are executed through the distributed system. You can use the EXPLAIN command to examine a query’s aggregator-level and leaf-level query plans.

Let’s assume the following schema:

        a1 int,
        a2 int,
        a3 int,
        SHARD KEY (a1, a2),
        KEY (a3)

        b1 int,
        b2 int,
        b3 int,
        SHARD KEY (b1, b2)

        r1 int,
        r2 int,
        PRIMARY KEY (r1),
        KEY (r2)

Index Matching

Matching the Shard Key

If you specify an equality on every column in the shard key, then the aggregator will direct the query to exactly one partition. Most queries do not fall into this pattern; instead, the aggregator must send queries to every partition in the cluster for intermediate results and then stitch them together.

These queries are sent to one partition:

    SELECT * FROM a WHERE a1 = 4 AND a2 = 10;
    SELECT a3, count(*) FROM a WHERE a1 = 4 AND a2 = 10 GROUP BY a3;

These queries are sent to all partitions:

    SELECT * FROM a WHERE a1 = 4;
    SELECT * FROM a WHERE a1 = 4 OR a2 = 10;
    SELECT * FROM a WHERE a1 IN (4, 5) AND a2 IN (10);

Secondary Index Matching

If your query uses a secondary (non-shard) index, then the aggregator must send the query to every partition in the cluster. Locally, each partition’s table will use its part of the secondary index to speed up the query. While the overall performance of the query is dictated by the seek and scan time of these indexes, the fact that the query must be sent everywhere in the cluster can increase the variance (and therefore overall latency) of the query.

This query matches the secondary index on the column a3:

    SELECT * FROM a WHERE a3 = 5;

No Index Matching

Queries that do not match any index perform a full table scan on all partitions. From the perspective of the aggregator, these queries are the same as queries that match a secondary index.

Aggregator Merging

Most queries that don’t involve aggregates, group-bys, or order-bys don’t require any further processing on the aggregator. These queries are forwarded verbatim to one or many partitions, and the partition’s results are streamed back to the client. More complex queries do require additional processing on the aggregator to merge the results from the leaves.


ORDER BY queries that don’t involve aggregates or group-bys can sort rows on the leaves and then merge the sorted intermediate results on the aggregator. For example, a query like SELECT * FROM a WHERE a3 = 5 ORDER BY a1 will follow this pattern. These queries leverage distributed (leaf) processing to do the majority of filtering and sorting, which makes them scalable with the amount of data in the system.


Queries with aggregates compute aggregate values on the leaves and then use aggregate merging on the aggregator to compute a final result. Each non-associative aggregate is converted into an expression that is associative. For example, AVG(expr) is converted to SUM(expr)/COUNT(expr) automatically by the aggregator.

Distinct Aggregates

Distinct aggregates like COUNT(DISTINCT ...) are not as efficient as simple aggregates like COUNT(*). Distinct values must be resolved across partition boundaries (you could have a3=10 on two different partitions in SELECT COUNT(DISTINCT a3) FROM a), so each partition must send every distinct value it has back to the aggregator. Queries with distinct aggregates ship one row per distinct value per partition back to the aggregator and can therefore be expensive if there are a lot of distinct values.

There is an exception to this rule: if you run a DISTINCT aggregate over the shard key, distinct values can be resolved on the leaves and the aggregator can merge aggregate values as it would with simple aggregates. An example of such a query would be SELECT COUNT(DISTINCT a1, a2) FROM a.


It is also possible to calculate a fast approximation of distinct values; see APPROX_COUNT_DISTINCT .


GROUP BY queries are spread very efficiently across the leaves. The aggregator sends the GROUP BY construct to the leaves so that the leaves process data down to the size of the final, grouped result set. The aggregator then merges together these grouped results (combining aggregates along the way) and sends the final result back to the client. The cost of a distributed GROUP BY query is usually proportional to the number of rows in the final result set, since the traffic through the system is roughly the number of partitions multiplied by the size of the grouped result set.


HAVING clauses are processed entirely on the aggregator since they perform filtering after the GROUP BY operation is complete.

Distributed Joins

Reference Joins

As a general rule, SingleStore DB will efficiently execute any join query with a single sharded table and as many reference tables as you’d like. Since reference tables are fully replicated on every machine in the cluster, leaves can join against their local copies of reference tables.

These queries leverage reference joins:

    SELECT * FROM a INNER JOIN r ON a.a1 = r.r1;
    SELECT * FROM r LEFT JOIN a ON a.a1 = r.r1;
    SELECT * FROM a, r r1, r r2, r r3;
        (SELECT DISTINCT r1 FROM r) x
        ON a.a1 = x.c;

Aligning Shard Keys for Performance

Aligning the shard keys of large tables enables more efficient joining. It is possible to perform arbitrary distributed joins across any tables and along any column. However, if you join two tables with identical shard key signatures along that shard key, the joins will be performed local to the partitions, reducing network overhead.

    CREATE TABLE users (
        user_name VARCHAR(1000),
        account_id BIGINT,
        PRIMARY KEY (id)

    CREATE TABLE clicks (
        click_id BIGINT AUTO_INCREMENT,
        account_id BIGINT,
        user_id BIGINT,
        page_id INT,
        ts TIMESTAMP,
        SHARD KEY (user_id),
        PRIMARY KEY (click_id, user_id)

In this example, id is the shard key of the users table, and the shard key on the clicks table has the same signature (a single BIGINT). These queries join locally without network overhead:

    SELECT * FROM users INNER JOIN clicks ON users.id = clicks.user_id WHERE clicks.page_id = 10;

    SELECT avg(c1.t - c2.t) FROM clicks c1 INNER JOIN clicks c2 ON c1.user_id = c2.user_id WHERE c1.page_id > c2.page_id;

Whereas this query will stream rows between leaves:

    SELECT u.account_id, count(distinct user_id), count(1)
    FROM users u INNER JOIN clicks c ON u.account_id = c.account_id
    GROUP BY u.account_id;

If you identify your data layout and join patterns in advance, this technique can be an extremely effective way to run performant joins between distributed tables. For more information about how queries execute, see EXPLAIN.

Write Queries


The WHERE clause in an UPDATE or DELETE query is optimized the same way as the WHERE clause in a SELECT query. If the predicate matches the shard key exactly then the query is routed to a single partition.

INSERT Queries

SingleStore DB executes INSERT queries by analyzing the insert values relevant to the shard key and routing the query to the corresponding partition. For example, INSERT INTO a (a1, a2, a3) VALUES (1, 2, 3) would compute the hash value of (1, 2) and map this value to the appropriate partition.

If you are bulk inserting data with INSERT queries, then you should take advantage of the multi-insert syntax: INSERT INTO a (a1, a2, a3) VALUES (1, 2, 3), (2, 3, 4), .... The aggregator will chop up the multi-insert into single-partition insert queries and run them in parallel across the cluster. This technique enables your application to combat the inherent latency of running in a distributed system.

When a multi-insert statement is executed, SingleStore DB uses two steps to commit the transaction:

  • Step one, where each leaf node validates their portion of the statement and acknowledges that they are ready to commit, and
  • Step two, where each leaf node actually commits the transaction.

Both steps are necessary to ensure that each leaf node partition successfully receives and executes their portion of the insert statement. If an error occurs, the entire transaction is rolled back across the cluster.

For example, consider a cluster with two leaf nodes and four partitions total. When a multi-insert statement is executed against the cluster’s aggregator, each leaf node validates its portion of the statement and prepares to commit the transaction on its two partitions. When all leaf nodes have acknowledged their readiness, the aggregator notifies them to commit the transaction. Finally, the transaction is committed, and the data is successfully inserted.