Query Tuning Guide


Look for Hot Queries

You can look for frequently run queries and long-running queries on the Query Explorer Page or with SHOW PLANCACHE.

For example, the following query shows the SELECT queries with the highest average execution time. You can modify it to filter by database, look for specific types of queries, or find the queries that use the most total time or the most memory.

select Database_Name, Query_Text, Commits, Execution_Time, Execution_Time/Commits as avg_time_ms, Average_Memory_Use
from information_schema.plancache
where Query_Text like 'select%'
order by avg_time_ms desc;
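
As a sketch of one such variation (the database name your_db is a placeholder), you could look for the queries with the highest average memory use within a single database:

select Database_Name, Query_Text, Commits, Execution_Time, Average_Memory_Use
from information_schema.plancache
where Database_Name = 'your_db'
order by Average_Memory_Use desc
limit 10;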

Check if Queries are Using Indexes

One of the most important query performance considerations is adding appropriate indexes for your queries. You can use EXPLAIN to see whether queries are using indexes. The following examples show a few simple cases where indexes can greatly improve query performance.

Indexes for filters

Consider an example table:

create table t (a int, b int);

Suppose we are running queries like

select * from t where a=3;

EXPLAIN shows that running the query against the current table schema requires a full TableScan, scanning all the rows of t, which is unnecessarily expensive if only a small fraction of the rows in t have a = 3.

memsql> explain select * from t where a=3;
+-----------------------+
| EXPLAIN               |
+-----------------------+
| Project [t.a, t.b]    |
| Gather partitions:all |
| Project [t.a, t.b]    |
| Filter [t.a = 3]      |
| TableScan db.t        |
+-----------------------+

If we add an index, we can see that the query instead uses an Index Range Scan on the key a:

memsql> alter table t add index (a);
memsql> explain select * from t where a=3;
+---------------------------------------------+
| EXPLAIN                                     |
+---------------------------------------------+
| Project [t.a, t.b]                          |
| Gather partitions:all                       |
| Project [t.a, t.b]                          |
| IndexRangeScan db.t, KEY a (a) scan:[a = 3] |
+---------------------------------------------+

However, a query that filters on both a and b cannot use the filter on b to reduce the rows scanned. As the following explain shows, the index scan uses only a = 3, and the filter on b is applied afterward.

memsql> explain select * from t where a=3 and b=4;
+---------------------------------------------+
| EXPLAIN                                     |
+---------------------------------------------+
| Project [t.a, t.b]                          |
| Gather partitions:all                       |
| Project [t.a, t.b]                          |
| Filter [t.b = 4]                            |
| IndexRangeScan db.t, KEY a (a) scan:[a = 3] |
+---------------------------------------------+

Adding an index on (a, b) instead allows the query to scan more selectively:

memsql> alter table t add index (a, b);
memsql> explain select * from t where a=3 and b=4;
+------------------------------------------------------------+
| EXPLAIN                                                    |
+------------------------------------------------------------+
| Project [t.a, t.b]                                         |
| Gather partitions:all                                      |
| Project [t.a, t.b]                                         |
| IndexRangeScan db.t, KEY a_2 (a, b) scan:[a = 3 AND b = 4] |
+------------------------------------------------------------+

However, a query that filters only on b still does not match an index, since the query's filters must match a prefix of the index's column list for the index to be used effectively:

memsql> explain select * from t where b=4;
+-----------------------+
| EXPLAIN               |
+-----------------------+
| Project [t.a, t.b]    |
| Gather partitions:all |
| Project [t.a, t.b]    |
| Filter [t.b = 4]      |
| TableScan db.t        |
+-----------------------+

Indexes for group-by and order-by

Another class of cases where indexes can improve query performance is group-by and order-by.

Consider this example table:

create table t (a int, b int);

Consider the following query:

memsql> explain select a, sum(b) from t group by a;
+------------------------------------------------------+
| EXPLAIN                                              |
+------------------------------------------------------+
| Project [t.a, `sum(b)`]                              |
| HashGroupBy [SUM(`sum(b)`) AS `sum(b)`] groups:[t.a] |
| Gather partitions:all                                |
| Project [t.a, `sum(b)`]                              |
| HashGroupBy [SUM(t.b) AS `sum(b)`] groups:[t.a]      |
| TableScan db.t                                       |
+------------------------------------------------------+

Executing the above query requires a hash group-by: MemSQL builds a hash table with an entry for each group, that is, for each distinct value of a.

However, with an index on a, MemSQL can execute the query with a streaming group-by, because scanning the index on a processes all rows of a group consecutively.

memsql> alter table t add index (a);
memsql> explain select a, sum(b) from t group by a;
+-----------------------------------------------------------+
| EXPLAIN                                                   |
+-----------------------------------------------------------+
| Project [t.a, `sum(b)`]                                   |
| StreamingGroupBy [SUM(`sum(b)`) AS `sum(b)`] groups:[t.a] |
| GatherMerge [t.a] partitions:all                          |
| Project [t.a, `sum(b)`]                                   |
| StreamingGroupBy [SUM(t.b) AS `sum(b)`] groups:[t.a]      |
| TableScan db.t, KEY a (a)                                 |
+-----------------------------------------------------------+

Similarly, for order-by, without an index MemSQL needs to sort:

memsql> explain select * from t order by b;
+----------------------------------+
| EXPLAIN                          |
+----------------------------------+
| Project [t.a, t.b]               |
| GatherMerge [t.b] partitions:all |
| Project [t.a, t.b]               |
| Sort [t.b]                       |
| TableScan db.t                   |
+----------------------------------+

With an index, MemSQL can avoid the need for a sort:

memsql> alter table t add index (b);
memsql> explain select * from t order by b;
+----------------------------------+
| EXPLAIN                          |
+----------------------------------+
| Project [t.a, t.b]               |
| GatherMerge [t.b] partitions:all |
| Project [t.a, t.b]               |
| TableScan db.t, KEY b (b)        |
+----------------------------------+

Fanout vs Single-Partition Queries

MemSQL’s distributed architecture lets your queries take advantage of CPUs on many servers, which yields very fast performance for aggregation queries that scan millions of rows. For transactional queries that select relatively few rows, however, this fanout is not ideal; it is best for each such query to involve only a single partition. When a query has equality filters that completely match the table's shard key, MemSQL can optimize it to execute on only a single partition.

We will use this example table:

create table urls (
  domain varchar(128),
  path varchar(8192),
  time_sec int,
  status_code binary(3),
  ...
  shard key (domain, path, time_sec)
);

The following query is single-partition, because it has equality filters on all columns of the shard key, so the matching rows can reside on only one partition. In the explain, Gather partitions:single indicates that MemSQL is using a single-partition plan.

memsql> explain SELECT status_code 
FROM   urls 
WHERE  domain = 'youtube.com' 
AND    path = '/watch?v=euh_uqxwk58'
AND    time_sec = 1;

+-------------------------------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                                               |
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
| Gather partitions:single                                                                                                                              |
| Project [urls.status_code]                                                                                                                            |
| IndexRangeScan test2.urls, SHARD KEY domain (domain, path, time_sec) scan:[domain = "youtube.com" AND path = "/watch?v=euh_uqxwk58" AND time_sec = 1] |
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
3 rows in set (0.66 sec)

The following query, however, does not filter on time_sec and therefore cannot be routed to a single partition. It must select from all partitions, indicated in the explain by Gather partitions:all.

memsql> explain SELECT status_code 
FROM   urls 
WHERE  domain = 'youtube.com'
AND    path = '/watch?v=euh_uqxwk58';

+--------------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                              |
+--------------------------------------------------------------------------------------------------------------------------------------+
| Project [urls.status_code]                                                                                                           |
| Gather partitions:all                                                                                                                |
| Project [urls.status_code]                                                                                                           |
| IndexRangeScan test2.urls, SHARD KEY domain (domain, path, time_sec) scan:[domain = "youtube.com" AND path = "/watch?v=euh_uqxwk58"] |
+--------------------------------------------------------------------------------------------------------------------------------------+

To fix this, we could instead shard the table urls on domain, which would make it easier to write queries that route to a single partition. However, some domains have many more pages than others, which could lead to skew. Choosing a shard key is often a balancing act: we want the least restrictive shard key possible while also avoiding skew. A good compromise in this case would be to shard on (domain, path), as sketched below.
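
A minimal sketch of such a definition, keeping only the columns shown above and omitting the elided ones:

create table urls (
  domain varchar(128),
  path varchar(8192),
  time_sec int,
  status_code binary(3),
  shard key (domain, path)
);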

Distributed Joins

MemSQL’s query execution architecture allows you to run arbitrary SQL queries on any table regardless of data distribution. However, you can often improve performance by optimizing your schema to minimize data movement during query execution.

Collocating Joins

Consider the following tables:

CREATE TABLE lineitem(
    l_orderkey int not null,
    l_linenumber int not null,
    ...
    primary key(l_orderkey, l_linenumber)
);

CREATE TABLE orders(
    o_orderkey int not null,
    ...
    primary key(o_orderkey)
);

When we join lineitem and orders with the current schema, MemSQL performs a distributed join and repartitions data from the lineitem table.

memsql> explain SELECT Count(*) 
FROM lineitem 
JOIN orders 
ON o_orderkey = l_orderkey ;

+---------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------------------+
| Project [`count(*)`]                                                                                                            |
| Aggregate [SUM(`count(*)`) AS `count(*)`]                                                                                       |
| Gather partitions:all est_rows:1                                                                                                |
| Project [`count(*)`] est_rows:1 est_select_cost:1762812                                                                         |
| Aggregate [COUNT(*) AS `count(*)`]                                                                                              |
| NestedLoopJoin                                                                                                                  |
| |---IndexSeek test.orders, PRIMARY KEY (o_orderkey) scan:[o_orderkey = r0.l_orderkey] est_table_rows:565020 est_filtered:565020 |
| TableScan r0 storage:list stream:no                                                                                             |
| Repartition [lineitem.l_orderkey] AS r0 shard_key:[l_orderkey] est_rows:587604                                                  |
| TableScan test.lineitem, PRIMARY KEY (l_orderkey, l_linenumber) est_table_rows:587604 est_filtered:587604                       |
+---------------------------------------------------------------------------------------------------------------------------------+

We can improve the performance of this query by adding an explicit shard key on l_orderkey to the lineitem table. With both tables sharded on the order key, MemSQL can perform a local (collocated) join between lineitem and orders.
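
A minimal sketch of the revised lineitem definition (the elided columns are omitted here):

CREATE TABLE lineitem(
    l_orderkey int not null,
    l_linenumber int not null,
    primary key(l_orderkey, l_linenumber),
    shard key(l_orderkey)
);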

+--------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                            |
+--------------------------------------------------------------------------------------------------------------------+
| Project [`count(*)`]                                                                                               |
| Aggregate [SUM(`count(*)`) AS `count(*)`]                                                                          |
| Gather partitions:all                                                                                              |
| Project [`count(*)`]                                                                                               |
| Aggregate [COUNT(*) AS `count(*)`]                                                                                 |
| ChoosePlan                                                                                                         |
| |   :estimate                                                                                                      |
| |       SELECT COUNT(*) AS cost FROM test.lineitem                                                                 |
| |       SELECT COUNT(*) AS cost FROM test.orders                                                                   |
| |---NestedLoopJoin                                                                                                 |
| |   |---IndexSeek test.orders, PRIMARY KEY (o_orderkey) scan:[o_orderkey = lineitem.l_orderkey]                    |
| |   TableScan test.lineitem, PRIMARY KEY (l_orderkey, l_linenumber)                                                |
| +---NestedLoopJoin                                                                                                 |
|     |---IndexRangeScan test.lineitem, PRIMARY KEY (l_orderkey, l_linenumber) scan:[l_orderkey = orders.o_orderkey] |
|     TableScan test.orders, PRIMARY KEY (o_orderkey)                                                                |
+--------------------------------------------------------------------------------------------------------------------+

Reference Table Joins

Consider the following schema:

CREATE TABLE customer(
    c_custkey int not null,
    c_nationkey int not null,
    ...
    primary key(c_custkey),
    key(c_nationkey)
);

CREATE TABLE nation(
    n_nationkey int not null,
    ...
    primary key(n_nationkey)	
);

With the current schema, when we join the customer and nation tables on nationkey, MemSQL has to broadcast the nation table every time the query is run.

memsql> explain SELECT Count(*)
FROM customer
JOIN nation
ON n_nationkey = c_nationkey;

+-------------------------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                                         |
+-------------------------------------------------------------------------------------------------------------------------------------------------+
| Project [`Count(*)`]                                                                                                                            |
| Aggregate [SUM(`Count(*)`) AS `Count(*)`]                                                                                                       |
| Gather partitions:all est_rows:1                                                                                                                |
| Project [`Count(*)`] est_rows:1 est_select_cost:1860408                                                                                         |
| Aggregate [COUNT(*) AS `Count(*)`]                                                                                                              |
| NestedLoopJoin                                                                                                                                  |
| |---IndexRangeScan test.customer, KEY c_nationkey (c_nationkey) scan:[c_nationkey = r1.n_nationkey] est_table_rows:1856808 est_filtered:1856808 |
| TableScan r1 storage:list stream:no                                                                                                             |
| Broadcast [nation.n_nationkey] AS r1 est_rows:300                                                                                               |
| TableScan test.nation, PRIMARY KEY (n_nationkey) est_table_rows:300 est_filtered:300                                                            |
+-------------------------------------------------------------------------------------------------------------------------------------------------+

We can avoid this broadcast by making nation a reference table. nation makes a good reference table because it is relatively small and changes rarely. While broadcasting such a small table will likely have a negligible effect on single-query latency, repeatedly doing so can have an outsize effect on concurrent workloads.

CREATE REFERENCE TABLE nation(
    n_nationkey int not null,
    ...
    primary key(n_nationkey)	
);

memsql> explain SELECT Count(*)
FROM customer
JOIN nation
ON n_nationkey = c_nationkey;
+-------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                     |
+-------------------------------------------------------------------------------------------------------------+
| Project [`Count(*)`]                                                                                        |
| Aggregate [SUM(`Count(*)`) AS `Count(*)`]                                                                   |
| Gather partitions:all                                                                                       |
| Project [`Count(*)`]                                                                                        |
| Aggregate [COUNT(*) AS `Count(*)`]                                                                          |
| ChoosePlan                                                                                                  |
| |   :estimate                                                                                               |
| |       SELECT COUNT(*) AS cost FROM test.customer                                                          |
| |       SELECT COUNT(*) AS cost FROM test.nation                                                            |
| |---NestedLoopJoin                                                                                          |
| |   |---IndexSeek test.nation, PRIMARY KEY (n_nationkey) scan:[n_nationkey = customer.c_nationkey]          |
| |   TableScan test.customer, PRIMARY KEY (c_custkey)                                                        |
| +---NestedLoopJoin                                                                                          |
|     |---IndexRangeScan test.customer, KEY c_nationkey (c_nationkey) scan:[c_nationkey = nation.n_nationkey] |
|     TableScan test.nation, PRIMARY KEY (n_nationkey)                                                        |
+-------------------------------------------------------------------------------------------------------------+

Joins on the Aggregator

Consider the following schema and row counts:

CREATE TABLE customer(
    c_custkey int not null,
    c_acctbal decimal(15,2) not null,
    primary key(c_custkey)
);
CREATE TABLE orders(
    o_orderkey int not null,
    o_custkey int not null,
    o_orderstatus varchar(20) not null,
    primary key(o_orderkey),
    key(o_custkey)
);

memsql> select count(*) from orders;
+----------+
| count(*) |
+----------+
|   429786 |
+----------+

memsql> select count(*) from orders where o_orderstatus = 'open';
+----------+
| count(*) |
+----------+
|     1000 |
+----------+

memsql> select count(*) from customer;
+----------+
| count(*) |
+----------+
|  1014726 |
+----------+

memsql> select count(*) from customer where c_acctbal > 100.0;
+----------+
| count(*) |
+----------+
|      988 |
+----------+
1 row in set (0.06 sec)

Note that while customer and orders are fairly large, filtering on open orders and account balances greater than 100 matches relatively few rows. As a result, when we join orders and customer with these filters, the join can be performed on the aggregator. This is shown in the explain by a separate Gather operator for each of orders and customer, with a HashJoin operator above them.

memsql> explain SELECT o_orderkey 
FROM   customer 
JOIN   orders 
where  c_acctbal > 100.0 
AND    o_orderstatus = 'open' 
AND    o_custkey = c_custkey;

+------------------------------------------------------------------+
| EXPLAIN                                                          |
+------------------------------------------------------------------+
| Project [orders.o_orderkey]                                      |
| HashJoin [orders.o_custkey = customer.c_custkey]                 |
| |---TempTable                                                    |
| |   Gather partitions:all                                        |
| |   Project [orders_0.o_orderkey, orders_0.o_custkey]            |
| |   Filter [orders_0.o_orderstatus = "open"]                     |
| |   TableScan test3.orders AS orders_0, PRIMARY KEY (o_orderkey) |
| TableScan 0tmp AS customer storage:list stream:yes               |
| TempTable                                                        |
| Gather partitions:all                                            |
| Project [customer_0.c_custkey]                                   |
| Filter [customer_0.c_acctbal > 100.0]                            |
| TableScan test3.customer AS customer_0, PRIMARY KEY (c_custkey)  |
+------------------------------------------------------------------+

By default, MemSQL performs this optimization when each Gather pulls fewer than 120,000 rows to the aggregator. This threshold can be changed via the max_subselect_aggregator_rowcount variable. We can also manually disable this optimization for a query via the leaf_pushdown hint, which forces the optimizer to perform as much work as possible on the leaves.

memsql> explain SELECT WITH(LEAF_PUSHDOWN=TRUE) o_orderkey 
FROM   customer 
JOIN   orders 
where  c_acctbal > 100.0 
AND    o_orderstatus = 'open' 
AND    o_custkey = c_custkey;

+--------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                        |
+--------------------------------------------------------------------------------------------------------------------------------+
| Project [r0.o_orderkey]                                                                                                        |
| Gather partitions:all est_rows:11                                                                                              |
| Project [r0.o_orderkey] est_rows:11 est_select_cost:1955                                                                       |
| Filter [customer.c_acctbal > 100.0]                                                                                            |
| NestedLoopJoin                                                                                                                 |
| |---IndexSeek test3.customer, PRIMARY KEY (c_custkey) scan:[c_custkey = r0.o_custkey] est_table_rows:1013436 est_filtered:1092 |
| TableScan r0 storage:list stream:no                                                                                            |
| Repartition [orders.o_orderkey, orders.o_custkey] AS r0 shard_key:[o_custkey] est_rows:972                                     |
| Filter [orders.o_orderstatus = "open"]                                                                                         |
| TableScan test3.orders, PRIMARY KEY (o_orderkey) est_table_rows:92628 est_filtered:972                                         |
+--------------------------------------------------------------------------------------------------------------------------------+

memsql> set max_subselect_aggregator_rowcount=500;

memsql> explain SELECT o_orderkey 
FROM   customer 
JOIN   orders 
where  c_acctbal > 100.0 
AND    o_orderstatus = 'open' 
AND    o_custkey = c_custkey;

+--------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                        |
+--------------------------------------------------------------------------------------------------------------------------------+
| Project [r0.o_orderkey]                                                                                                        |
| Gather partitions:all est_rows:11                                                                                              |
| Project [r0.o_orderkey] est_rows:11 est_select_cost:1955                                                                       |
| Filter [customer.c_acctbal > 100.0]                                                                                            |
| NestedLoopJoin                                                                                                                 |
| |---IndexSeek test3.customer, PRIMARY KEY (c_custkey) scan:[c_custkey = r0.o_custkey] est_table_rows:1013436 est_filtered:1092 |
| TableScan r0 storage:list stream:no                                                                                            |
| Repartition [orders.o_orderkey, orders.o_custkey] AS r0 shard_key:[o_custkey] est_rows:972                                     |
| Filter [orders.o_orderstatus = "open"]                                                                                         |
| TableScan test3.orders, PRIMARY KEY (o_orderkey) est_table_rows:92628 est_filtered:972                                         |
+--------------------------------------------------------------------------------------------------------------------------------+

Managing Concurrency for Distributed Joins

Queries that involve Broadcasts and Repartitions require leaf-to-leaf data movement, which is expensive and should be avoided when possible, for example by collocating joins. Also consider using reference tables and joins on the aggregator, as described above, to reduce the need for Broadcasts and Repartitions.

An important consideration is that concurrency for queries involving Broadcasts and Repartitions is much more limited than for queries that do not (such as collocated joins), because each such query uses a relatively large number of connections and threads. Running too many concurrent queries involving Broadcasts and Repartitions can exhaust available connections and strain the scheduler.

It is generally important to ensure that you have sufficient pooled connections for your workload. You can run FILL CONNECTION POOLS on each node to pre-fill the connection pools. The number of pooled connections between each pair of nodes is controlled by the max_pooled_connections configuration variable.
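
For example (a sketch; run on each node, and tune the variable to match your workload), you can pre-fill the pools and check the current pool size:

memsql> fill connection pools;
memsql> show variables like 'max_pooled_connections';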

Generally, once your workload is running enough concurrent queries to fully utilize cluster resources such as CPU and IO, there is no advantage to running more concurrent queries - it will not increase throughput, but does increase latency, and may adversely affect performance when using too many connections or threads simultaneously. Consider managing concurrency in your client applications, or using the max_connection_threads setting on an aggregator to limit the number of concurrent queries MemSQL will run.
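
To check the current limit on an aggregator (a sketch; how you change the value depends on your MemSQL version and configuration):

memsql> show variables like 'max_connection_threads';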

Also keep in mind that some communication costs, such as connections between all pairs of leaves, grow with cluster size and number of partitions. If your workload is heavy in distributed queries that require Broadcasts and Repartitions, consider using fewer partitions per leaf; this reduces distributed query overhead at the cost of reduced intra-query parallelism for some types of queries.

Run ANALYZE

The ANALYZE command collects data statistics on a table to facilitate accurate query optimization. This is especially important for optimizing complex analytical queries. It is generally a best practice to ANALYZE all of your tables. See the ANALYZE page for more information.
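
For example, a minimal sketch that collects statistics on the tables used in the earlier examples:

analyze table lineitem;
analyze table orders;
analyze table customer;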