Design and implementation of distributed time series database QTSDB

  Database, influxdb

Guide to exotic techniques

The existing open source time series database influxdb only supports single-machine operation. When faced with a large amount of data writing, there will be slow query, high machine load and the limitation of single-machine capacity.

In order to solve this problem, the 360 infrastructure team developed a cluster version-qtsdb on the basis of single-machine influxdb.

QTSDB brief introduction

QTSDB is a distributed time series database, which is used to process massive data writing and query. In terms of implementation, it is a distributed version developed based on influxdb 1.7, an open source stand-alone time series database. In addition to the features of influxdb itself, it also has cluster functions such as capacity expansion and replica fault tolerance.

The main features are as follows:

  • High-performance data storage specially written for time series data, taking into account writing performance and disk space occupation;
  • Sql-like query statements that support a variety of statistical aggregation functions;
  • Automatically clean up expired data;
  • Built-in continuous query automatically completes the aggregation operation preset by the user.
  • Golang, no other dependency, simple deployment and operation;
  • Nodes expand dynamically and horizontally to support mass data storage;
  • Replica redundancy design, automatic failover, support for high availability;
  • Optimize data writing to support high throughput;

System architecture

Logical storage hierarchy


The highest level of influxdb architecture is database. The lower part of database is divided into different retention policies according to the different retention periods of data, forming multiple storage containers under database. Because the time series database is associated with the time dimension, the contents with the same retention period are stored together to facilitate expiration and deletion. In addition, under the retention policy, the retention period of the retention policy is further subdivided, and the data of each period is stored in a shard group, so that when the shard group of a certain segment expires, the entire shardgroup will be deleted to avoid picking out some data from inside the storage engine. For example, data under database may be retained for 30 days or 7 days, and they will be stored under different retention policies. Assuming that 7 days of data are divided into 1 day, they will be stored in 7 shard groups respectively. when the data on the 8th day is generated, a new shard group will be created for writing and the shardgroup on the 1st day will be deleted completely.

So far, under the same retension policy, the current timing data sent will only fall in the current time period, i.e. only the latest shard group has data written. in order to improve the concurrency, a shard group is divided into multiple shards, which are globally unique and distributed on all physical nodes. each shard corresponds to a tsm storage engine and is responsible for storing data.

When requesting access to data, the requested information can lock a certain database and retension policy, and then lock a certain shard group (s) according to the time period information in the request. In the case of writing, each piece of data written corresponds to a serieskey (this concept will be described later). A shard can be locked and written by hashing and moduling the serieskey. While shard has copies, when writing, it will adopt the policy of no main and many writes and write to each copy at the same time. During the query, since there is no information about serieskey in the query request, all shards in the shard group can be queried once. For a shard, an available physical node will be selected in its copy for access.

How many shards does a shard group need to have? in order to achieve the maximum amount of concurrency without unduly interfering with the overall order of data, after the number of physical nodes and the number of copies are determined, the number of shards in a shard group is the number of machines divided by the number of copies, which ensures that the current data can be uniformly written on all physical nodes without too many shards affecting the query efficiency. For example, the data cluster on the figure has 6 physical nodes, and if the user specifies double copies, then there are 3 shard.

Cluster structure


The whole system is divided into three parts: proxy, meta cluster and data cluster. Proxy is responsible for receiving requests and is stateless. It can be connected to lvs to support horizontal extension. Meta cluster saves the logical storage hierarchy mentioned above and its corresponding relationship with physical nodes. raft protocol ensures strong consistency of metadata. meta information is stored in memory, and logs and snapshots are persisted to disk. Data cluster is a real data storage node, on which data is stored in shard units, and each shard corresponds to a tsm storage engine.

When the request arrives, a proxy is locked through lvs. The proxy searches meta information from the meta cluster according to database, retension policy and time period, and finally obtains a mapping from shard to physical node. Then the mapping relationship is converted into a mapping from physical node to shard and returned to the proxy. Finally, according to the mapping relationship, the proxy accesses specific shard to the physical node designated by the data cluster. Data access under shard will be described later.

Data access

Grammatical format


Influxdb query provides a query method similar to relational database, showing a relational table: measurement. Time in time series database is regarded as an eternal column, and other columns are divided into two types:


One is field, which is the most critical part of time series data. Its value will be added continuously with the flow of time, such as the delay between two machines at each time point.


The other is tag, which is a tag with a field value, so they are all string types and have a limited range of values. For example, the delay field value at a certain point in time is 2ms, which corresponds to two tag attributes, and the delay from which machine to which machine, so two tags: from and to can be designed.

Measurement shows that the first line is key, and the rest can be regarded as value, so tag has tagkey, tagvalue, and field has fieldkey and fieldvalue.

Data reading and writing


When a row of write data is received, it is converted into the following format:


If there are more than one field in a row, it will be divided into multiple such data stores. The storage engine of influxdb can be understood as a map, from measurement to fieldkey as the storage key, followed by fieldvalue and time as the storage value. These values will be added continuously. In the storage engine, these values will be stored together as a column, because they are data that gradually change with time. Saving them together can improve the compression effect. In addition, the remaining part after removing the fieldkey from the storage key is the serieskey mentioned above.

As mentioned above, how access requests lock shard in a cluster is described here.


Influxdb queries are similar to sql syntax, but scattered information with sql statements cannot directly query the storage engine, so some strategies are needed to convert sql statements into storage keys. Influxdb converts tag information after where into a set of all relevant serieskey by constructing an inverted index, and then splices each serieskey with the fieldkey after select to form a storage key, so that the corresponding data can be taken out by column.

By analyzing the Series key stored in the tsm storage engine, the inverted index can be constructed. The new version influxdb will persist the inverted index into each shard, which corresponds to the tsm storage engine storing data and is called tsi storage engine. The inverted index is equivalent to a three-layer map, the key of which is measurement, and the value is a two-layer map. the key of this two-layer map is tagkey, and the corresponding value is a one-layer map. the key of this one-layer map is tagval, and the corresponding value is a set of serieskeys. each series key string in this set includes measurement, tagkey, and tagval on the map index path.

In this way, you can analyze the query sql, use the measurement after from to query the inverted index three-level map to obtain a second-level map, and then analyze the multiple filtering logic units after where. Take tagkey1=tagval1 as an example, take these two pieces of information as the keys of the second-level map to find the final value: the set of SERIES key. Each serieskey string in this collection contains measurment, tagkey1 and tagval1, which are serieskeys that satisfy the current filtering logic unit. According to the AND or logic of these logic units, the sets of series ·keys corresponding to these logic units are subjected to intersection and parallel operation, and finally all the sets of series keys conforming to their logic are filtered according to the semantics of sql, and then these series keys are spliced with the fieldkey after select to obtain the final storage key, and the data can be read.


Queries without aggregate function: as shown in the figure, for a serieskey, many fieldkey need to be spliced to extract data from multiple columns. the problem they face after coming out is how to combine data into one row. influxdb row and column constraints are relatively loose, and rows cannot be determined simply according to column offset. Influxdb regards serieskey and time as the basis for judging the column data as a row, the multiple columns corresponding to each serieskey are assembled into a data stream with multiple behavior granularity, and the data streams corresponding to multiple Series Keys are assembled into a data stream according to a certain sequence, and are returned to the client as the final result set.


Query with aggregate function: this method is exactly the opposite of the above query. here, a large number of serieskey are spliced for the aggregate function parameter field. of course, the ultimate goal is the same. multiple storage keys can read multiple data streams. these data streams are subject to two kinds of processing. first, they are assembled into one data stream in a certain order, and then some adjacent data in this data stream are delineated according to a certain strategy for aggregate calculation, and finally the aggregated value is obtained. The order and strategy here come from the aggregation method after group by in sql statements.

The method of merging and aggregating multiple data streams is also applicable to the query results above shard.

For writing is relatively simple, directly update the data storage engine and inverted index.

The whole process

The whole process of access has already been mentioned above, and here we will sort it out as a whole: it is divided into two stages: queries above shard and queries below shard.

First, access requests are locked to a proxy through lvs. proxy searches meta information in meta cluster. According to the request information, database, retension policy and shard group are locked to obtain many shard.

For write operations, one shard is locked for writing according to the serieskey at the time of writing. since shard has multiple copies, data needs to be written to multiple copies at the same time. For queries, serieskey cannot be obtained by requesting information, so it is necessary to query all shards and select an available copy for each shard to access.

After the above processing, the mapping from shard to physical node is obtained, and then it is inverted into the mapping from physical node to shard and returned to proxy, which can access the corresponding shard at a node of the data cluster.

For write access under shard, insert statements need to be disassembled, combined into storage key-value pairs to be stored in tsm storage engine, and then the inverted index is updated according to the combined serieskey.

Query access under shard, analyze sql statements, query inverted indexes, obtain their related serieskey sets, splice them into field, form the final storage key, and perform data access. Then, many data are merged and aggregated over shard on the data node and over DATA on the proxy.

Finally, the proxy returns the access result to the client.

Fault handling


The influxdb mentioned above provides copy fault tolerance for shard. When write data is sent to the proxy, the proxy sends the data to all shard copies in the form of no main write. Meta cluster monitors whether the data node is online in the form of heartbeat. When reading, it randomly selects a reading node from the online data nodes for the same shard to read.

If a data node is unavailable during writing, it will be written to a temporary file of proxy, and the temporary data will be sent to the specified node when the network returns to normal.

To deal with

Data cluster expansion

When a new node joins a data cluster, automatic migration of existing data is not supported at present, but some efforts have been made. in order to make the currently written data apply to the new node as soon as possible, when the new node joins, the current time will be taken as the end time of the current shard group, and then a shard group will be newly built according to the new number of data nodes. In this way, the current data volume can be evenly distributed to each data node immediately, and meta information related to each shard group is stored in the meta cluster, so that the reading of previous data will not be disturbed.

Data node is temporarily unavailable

If the data node is in a short-term unavailable state, including self-recovery after a short-term network failure or intervention by operation and maintenance personnel after a hardware failure, and finally the data node still has data before disconnection, it can join the data cluster as it was. For writing, proxy will temporarily store the data of this data node during the unavailable period, and will send this part of data to the data node again when data joins the cluster, so as to ensure the final consistency of the data.

Data node is not available for a long time.

If the data node cannot or does not need to be added to the cluster as it was for some reason, and the operation and maintenance personnel are required to manually offline the previously unavailable data node, the machine can be added to the cluster as a brand new data when it is available, which is equivalent to the expansion of the cluster.

Total settlement

QTSDB cluster is implemented as follows: when writing, data is written to the specified shard according to serieskey, while when reading, serieskey cannot be predicted, so each shard needs to be queried. The whole reading process is divided into two stages: reading the storage engine on the data node and merging and aggregating the multiple shard in the node; summarizing the data of the multiple data nodes on the proxy node and merging and aggregating the data in the later stage to form the final result set and returning the final result set to the client.

QTSDB’s existing clustering functions are still imperfect and will be continuously improved in later use.

This article is an original 360 technical article, please indicate the source and the two-dimensional code at the end of the article, thank you ~


About 360 Technology

360 technology is a technology sharing public number created by 360 technology team, pushing technology dry goods every day.

For more technical information, please pay attention to “360 technology” WeChat public number