Configure Historical Monitoring


This section describes how to create:

  • A monitoring database to hold data about the monitored cluster
  • Two MemSQL pipelines that ingest monitoring data from Kafka into the collecting cluster
  • A Grafana monitoring data source that connects Grafana to the collecting cluster
  • A suite of Grafana dashboards to view data about the monitored cluster

Monitoring Database

Create the monitoring database and associated tables by copying, pasting, and executing each set of SQL statements below on the collecting Master Aggregator.

This can be done via the SQL Editor in SingleStore DB Studio, or with your preferred SQL client.

Note: These SQL statements are also available in a downloadable historical-monitoring-sql.zip file from MemSQL.

create database if not exists monitoring;

use monitoring;

CREATE TABLE IF NOT EXISTS `metrics` (
  `name`                                            TINYBLOB NOT NULL,
  `tags`                                            JSON COLLATE utf8_bin,
  `cluster` AS `tags`::$cluster                     PERSISTED TINYBLOB,
  `host` AS `tags`::$host                           PERSISTED TINYBLOB,
  `port` as `tags`::$port                           PERSISTED SMALLINT(6),
  `role` as `tags`::$role                           PERSISTED TINYBLOB,
  `extractor` as substring_index(`name`, '_', 1)    PERSISTED TINYBLOB,
  `subsystem` as substring_index(substring_index(`name`, '_', 2), '_', -1)
                                                    PERSISTED TINYBLOB,
  `job` AS `tags`::$push_job                        PERSISTED TINYBLOB,
  `value`                                           DOUBLE NOT NULL,
  `intval` AS floor(`value`)                        PERSISTED BIGINT(20),
  `time_sec`                                        BIGINT(20) NOT NULL,
  KEY `name` (`cluster`,`extractor`,`subsystem`,`host`,`role`,`name`,`time_sec`)
  /*!90619 USING CLUSTERED COLUMNSTORE */
  /*!90618 , SHARD KEY () */
) /*!90621 AUTOSTATS_ENABLED=TRUE */;
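
Once data is flowing, a quick sanity check like the following (a hypothetical query, not part of the shipped dashboards) confirms that the persisted columns derived from `tags` and `name` are populated:

-- Hypothetical sanity check: count recent samples per cluster,
-- extractor, and subsystem using the persisted computed columns.
SELECT cluster, extractor, subsystem, COUNT(*) AS samples
FROM metrics
WHERE time_sec > UNIX_TIMESTAMP() - 300  -- last five minutes
GROUP BY cluster, extractor, subsystem
ORDER BY samples DESC;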

CREATE TABLE IF NOT EXISTS act_samples (
  `cluster` varchar(512) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
  `tags` JSON NOT NULL,
  `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `ACTIVITY_TYPE` varchar(512) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
  `ACTIVITY_NAME` varchar(512) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
  `DATABASE_NAME` varchar(512) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
  `CPU_TIME_MS` bigint(4) DEFAULT NULL,
  `CPU_WAIT_TIME_MS` bigint(4) DEFAULT NULL,
  `ELAPSED_TIME_MS` bigint(4) DEFAULT NULL,
  `LOCK_TIME_MS` bigint(4) DEFAULT NULL,
  `NETWORK_TIME_MS` bigint(4) DEFAULT NULL,
  `DISK_TIME_MS` bigint(4) DEFAULT NULL,
  `DISK_B` bigint(4) DEFAULT NULL,
  `NETWORK_B` bigint(4) DEFAULT NULL,
  `MEMORY_BS` bigint(4) DEFAULT NULL,
  `MEMORY_MAJOR_FAULTS` bigint(4) DEFAULT NULL,
  `RUN_COUNT` bigint(4) NULL DEFAULT '0',
  `SUCCESS_COUNT` bigint(4) NULL DEFAULT '0',
  `FAILURE_COUNT` bigint(4) NULL DEFAULT '0',
  KEY (`cluster`, `ACTIVITY_TYPE`,`ACTIVITY_NAME`,`DATABASE_NAME`, ts)
   /*!90619  USING CLUSTERED COLUMNSTORE, */
   /*!90618 SHARD KEY () */
) /*!90621 AUTOSTATS_ENABLED=TRUE */;
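
As an illustration of how this table is typically queried (a hypothetical example; the bundled dashboards define their own queries), the following ranks activities by recent CPU time:

-- Hypothetical example: top ten activities by CPU time over the last hour.
SELECT ACTIVITY_TYPE, ACTIVITY_NAME, DATABASE_NAME,
       SUM(CPU_TIME_MS) AS cpu_ms, SUM(RUN_COUNT) AS runs
FROM act_samples
WHERE ts > NOW() - INTERVAL 1 HOUR
GROUP BY ACTIVITY_TYPE, ACTIVITY_NAME, DATABASE_NAME
ORDER BY cpu_ms DESC
LIMIT 10;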

create table IF NOT EXISTS mv_queries
(
  `cluster` varchar(512) NOT NULL,
  `tags` JSON NOT NULL,
   ts timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
   ACTIVITY_NAME varchar(512),
   QUERY_TEXT varchar(8192),
   PLAN_WARNINGS varchar(8192),
   KEY (`cluster`, `ACTIVITY_NAME`) /*!90619  USING CLUSTERED COLUMNSTORE, */
   /*!90618 SHARD KEY () */
) /*!90621 AUTOSTATS_ENABLED=TRUE */;

create table IF NOT EXISTS cluster_info
(
  cluster varchar(512) NOT NULL,
  tags JSON NOT NULL,
  ts timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  status JSON NOT NULL,
  KEY (cluster, ts)
  /*!90619  USING CLUSTERED COLUMNSTORE, */
  /*!90618 SHARD KEY () */
) /*!90621 AUTOSTATS_ENABLED=TRUE */;

create table IF NOT EXISTS db_status
(
  `tags` JSON NOT NULL,
  `keys` JSON NOT NULL,
  `values` JSON NOT NULL,
  ts timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `cluster` AS `tags`::$cluster                 PERSISTED TINYBLOB,
  database_name AS `keys`::$database_name       PERSISTED TINYBLOB,
  `role` as `keys`::$role                       PERSISTED tinyblob,
  num_partitions as `values`::$num_partitions   PERSISTED SMALLINT(6),
  `online` as `values`::$online                 PERSISTED SMALLINT(6),
  `offline` as `values`::$offline               PERSISTED SMALLINT(6),
  replicating as `values`::$replicating         PERSISTED SMALLINT(6),
  recovering as `values`::$recovering           PERSISTED SMALLINT(6),
  pending as `values`::$pending                 PERSISTED SMALLINT(6),
  transition as `values`::$transition           PERSISTED SMALLINT(6),
  unrecoverable as `values`::$unrecoverable     PERSISTED SMALLINT(6),
  ref_db_state as `values`::$ref_db_state       PERSISTED TINYBLOB,
  summary as `values`::$summary                 PERSISTED TINYBLOB,
  KEY (cluster, database_name, ts)
  /*!90619  USING CLUSTERED COLUMNSTORE, */
  /*!90618 SHARD KEY () */
) /*!90621 AUTOSTATS_ENABLED=TRUE */;

create table if not exists mv_events
(
  `tags` JSON NOT NULL,
  `keys` JSON NOT NULL,
  `values` JSON NOT NULL,
  ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  event_ts TIMESTAMP,
  `cluster` AS `tags`::$cluster                 PERSISTED TINYBLOB,
  `event_type` AS `keys`::$event_type           PERSISTED TINYBLOB,
  `origin_node_id` AS `keys`::$origin_node_id   PERSISTED SMALLINT,
  `ip_addr` AS `values`::$ip_addr               PERSISTED TINYBLOB,
  `port` AS `values`::$port                     PERSISTED SMALLINT,
  `type` AS `values`::$type                     PERSISTED TINYBLOB,
  `severity` as `keys`::$severity               PERSISTED TINYBLOB,
  `details` as `values`::$details               PERSISTED TINYBLOB,
  KEY (cluster, origin_node_id, event_ts)
  /*!90619  USING CLUSTERED COLUMNSTORE, */
  /*!90618 SHARD KEY () */
) /*!90621 AUTOSTATS_ENABLED=TRUE */;

-- helper table for integers 1..100
create reference table if not exists ints
(
    f int auto_increment key
);

insert into ints values
(), (), (), (), (), (), (), (), (), (),
(), (), (), (), (), (), (), (), (), (),
(), (), (), (), (), (), (), (), (), (),
(), (), (), (), (), (), (), (), (), (),
(), (), (), (), (), (), (), (), (), (),
(), (), (), (), (), (), (), (), (), (),
(), (), (), (), (), (), (), (), (), (),
(), (), (), (), (), (), (), (), (), (),
(), (), (), (), (), (), (), (), (), (),
(), (), (), (), (), (), (), (), (), ();
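
A quick optional check that the helper table was populated as expected:

-- Expect n = 100, lo = 1, hi = 100.
SELECT COUNT(*) AS n, MIN(f) AS lo, MAX(f) AS hi FROM ints;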

delimiter //
create or replace procedure sort_blobs (blobs_pipe query(type text, `keys` JSON, `vals` JSON, time_sec bigint, `tags` JSON NULL)) AS
  begin
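    -- Rows arrive from the sort_blobs pipeline tagged with a record type;
    -- route each type to its destination table.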
    insert into act_samples (
        `cluster`,
        `tags`,
        `ts`,
        `ACTIVITY_TYPE`,
        `ACTIVITY_NAME`,
        `DATABASE_NAME`,
        `CPU_TIME_MS`,
        `CPU_WAIT_TIME_MS`,
        `ELAPSED_TIME_MS`,
        `LOCK_TIME_MS` ,
        `NETWORK_TIME_MS` ,
        `DISK_TIME_MS`,
        `DISK_B`,
        `NETWORK_B`,
        `MEMORY_BS`,
        `MEMORY_MAJOR_FAULTS`,
        `RUN_COUNT`,
        `SUCCESS_COUNT`,
        `FAILURE_COUNT`
      ) select
        `tags`::$cluster,
        `tags`,
        FROM_UNIXTIME(time_sec),
        `keys`::$activity_type,
        `keys`::$activity_name,
        `keys`::$db_name,
        `vals`::cpu_time_ms,
        `vals`::cpu_wait_time_ms,
        `vals`::elapsed_time_ms,
        `vals`::lock_time_ms,
        `vals`::network_time_ms,
        `vals`::disk_time_ms,
        `vals`::disk_b,
        `vals`::network_b,
        `vals`::memory_bs,
        `vals`::memory_major_faults,
        `vals`::run_count,
        `vals`::success_count,
        `vals`::failure_count
        from blobs_pipe
        where type = 'activity';

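        -- Store events, skipping (cluster, node, event time) combinations
        -- already present in mv_events.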
        insert into mv_events (
            `tags`,
            `keys`,
            `values`,
            `ts`,
            `event_ts`
          )
        select
            `tags`,
            `keys`,
            `vals`,
            FROM_UNIXTIME(time_sec),
            FROM_UNIXTIME(`vals`::$event_time)
          from blobs_pipe b
          left join (SELECT distinct cluster,
                            origin_node_id,
                            event_ts from mv_events) k
          ON b.`tags`::$cluster = k.cluster
          and b.`keys`::$origin_node_id = k.origin_node_id
          and b.`keys`::$event_time = k.event_ts
          where type = 'event'
            and k.cluster is null
            and k.origin_node_id is null;

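        -- Record query text and plan warnings once per cluster and activity name.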
        insert into mv_queries (
            `cluster`,
            `ACTIVITY_NAME`,
            `tags`,
            `ts`,
            `QUERY_TEXT`,
            `PLAN_WARNINGS`
          )
        select
            `tags`::$cluster,
            `keys`::$activity_name,
            `tags`,
            FROM_UNIXTIME(MAX(time_sec)),
            `vals`::$query_text,
            `vals`::$plan_warnings
          from blobs_pipe b
          left join (SELECT distinct cluster, ACTIVITY_NAME from mv_queries) k
          ON b.`tags`::$cluster = k.cluster and b.`keys`::$activity_name = k.ACTIVITY_NAME
          where type = 'query'
            and k.cluster is null and k.ACTIVITY_NAME is null
          group by 1, 2;

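        -- Store raw cluster status snapshots.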
        insert into cluster_info (
            `cluster`,
            `tags`,
            `ts`,
            `status`
        ) select
            `tags`::$cluster,
            `tags`,
            FROM_UNIXTIME(time_sec),
            `vals`
        from blobs_pipe
        where type = 'cluster';

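        -- Store per-database state snapshots.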
        insert into db_status (
            `tags`,
            `keys`,
            `values`,
            `ts`
        ) select
            `tags`, `keys`, `vals`, FROM_UNIXTIME(time_sec)
        from blobs_pipe
        where type = 'db_state';
  end //
delimiter ;

delimiter //
CREATE OR REPLACE FUNCTION `trim_metric`(
    input varchar(512) CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
    prefix varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL)
  RETURNS varchar(512) CHARACTER SET utf8 COLLATE utf8_general_ci NULL AS
begin
  return
  CASE
  WHEN input is not null and prefix is not null and locate(prefix, input, 1) > 0 THEN
    UPPER(substr(input, locate(prefix, input, 1) + length(prefix), length(input) - length(prefix)))
  ELSE input end;
end //
delimiter ;

delimiter //
CREATE OR REPLACE FUNCTION `trim_host`(
    input varchar(512) CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
    c int default 1)
  RETURNS varchar(512) CHARACTER SET utf8 COLLATE utf8_general_ci NULL AS
begin
  return
  CASE
  WHEN input RLIKE '([[:digit:]]{1,3}\\.){3}[[:digit:]]{1,3}' THEN
    input
  ELSE substring_index(input, '.', c) end;
end //
delimiter ;
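
To illustrate what these helper functions return, here are a few calls with hypothetical inputs:

SELECT trim_metric('memsql_mem_resident', 'memsql_mem_');  -- 'RESIDENT'
SELECT trim_metric('other_metric', 'memsql_mem_');         -- 'other_metric' (prefix absent)
SELECT trim_host('node1.example.com');                     -- 'node1'
SELECT trim_host('10.1.2.3');                              -- '10.1.2.3' (IP addresses pass through)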

MemSQL Pipelines

Create the metrics and sort_blobs pipelines, which consume from the <org-name>_metrics and <org-name>_blobs Kafka topics, by copying, pasting, and executing each set of SQL statements below on the collecting Master Aggregator.

Replace <kafka-broker-host>, <kafka-port>, and <org-name> to match your configuration before executing.

CREATE or replace PIPELINE metrics
  as load data kafka '<kafka-broker-host>:<kafka-port>/<org-name>_metrics'
  skip all errors
  into table metrics
  fields terminated by '\t'
  (name, tags, value, time_sec);

start pipeline if not running metrics;

CREATE or replace PIPELINE `sort_blobs`
  as load data kafka '<kafka-broker-host>:<kafka-port>/<org-name>_blobs'
BATCH_INTERVAL 2500
IGNORE PARSER ERRORS
INTO PROCEDURE `sort_blobs`
FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\'
LINES TERMINATED BY '\n' STARTING BY '';

start pipeline if not running sort_blobs;
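
To verify that both pipelines are running and ingesting without errors, inspect the pipeline metadata, for example:

SHOW PIPELINES;
SELECT * FROM information_schema.PIPELINES_ERRORS;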

Grafana Monitoring Data Source

Info

If the Grafana data source connects to MemSQL as a user other than root, that user needs SELECT and EXECUTE privileges on the monitoring database, granted as described in GRANT (issuing the grant itself requires administrative privileges).

For example:

GRANT SELECT, EXECUTE ON monitoring.* TO '<user>'@'%';

where <user> is the non-root user under which these queries will run.
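
For instance, to create a dedicated user for Grafana (the user name grafana here is only an example):

CREATE USER 'grafana'@'%' IDENTIFIED BY '<password>';
GRANT SELECT, EXECUTE ON monitoring.* TO 'grafana'@'%';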

  1. Create a “monitoring” MySQL data source with the following settings. The MemSQL data source port is 3306 by default.

    Data source type: mysql
    Data source name: monitoring
    Data source host: <collecting-cluster-master-aggregator-host>
    Data source port: 3306
    Database name: monitoring
    User: <user>
    Password: <password, blank if none>

  2. Alternatively, modify the sample.yaml file in the /etc/grafana/provisioning/datasources directory with the following contents, substituting the values in angle brackets with your MemSQL configuration.

    # config file version
    
    apiVersion: 1
    
    datasources:
      - name: monitoring
        type: mysql
        url: <collecting-cluster-master-aggregator-host>:<port>
        database: monitoring
        user: <user>
        password: <password, blank if none>
        isDefault: true
        version: 1
    

Restart the Grafana server.

sudo systemctl restart grafana-server

Grafana Dashboards

Info

You may need Editor privileges to create new Grafana dashboards.

  1. Download the historical monitoring dashboards from MemSQL.

  2. Import the dashboards into Grafana.

  3. Alternatively, modify the sample.yaml file in the /etc/grafana/provisioning/dashboards/ directory with the following contents.

    Copy the dashboard JSON files to the /var/lib/grafana/dashboards directory.

    Be sure that the dashboard files are owned by grafana:grafana (for example, run chown grafana:grafana * in that directory).

    # config file version
    apiVersion: 1
    
    providers:
    - name: 'default'
      orgId: 1
      folder: ''
      folderUid: ''
      type: file
      options:
        path: /var/lib/grafana/dashboards
    

Restart the Grafana server.

sudo systemctl restart grafana-server