之前介紹了運維監控系統Prometheus,然后就有同鞋問我關于時序數據庫的情況,所以這里總結一下時序數據庫,并以InfluxDB為例,介紹時序數據庫的功能特性和使用方式,希望能對大家有所幫助。
一、時序數據庫概述1.1 什么是時序數據庫
時序數據是一組按照時間維度索引的數據。時序數據在日常生活中隨處可見,比如每個整點的溫度、濕度等天氣數據,每分鐘的股票價格數據等。我們常用曲線圖、柱狀圖等形式去展現時序數據,也就是我們常常聽到的“數據可視化”。
時序數據庫是一種非關系型數據庫,以時間作為數據主鍵,專門用來存儲時序數據。
1.2 時序數據庫的特點
- 高壓縮比:由于數據每分每秒都在變化,海量的時序數據往往體量巨大,占用大量硬件資源,所以需要優化數據壓縮算法提高數據壓縮比。
- 高并發寫入:時序數據庫采用持續高并發寫入數據,無更新的方式,對于時間相同的重復的數據,只保留一份數據。
- 低延時、高并發查詢:通過索引降低查詢延時,通過緩存等技術提高數據并發能力。
1.3 時序數據庫的使用場景
- IOT行業:電力、化工等工業物聯網數據監測
- 金融行業:各類金融產品及其衍生品、數字貨幣數據存儲與量化研究
- IT行業:服務器、虛擬機、容器等的狀態數據實時監測
- 互聯網行業:用戶行為軌跡,日志等數據。
目前比較流行的時序數據庫有:InfluxDB、Prometheus、OpenTSDB、TDengine等,其中使用最廣泛的當屬InfluxDB,行業內應用最廣泛。還有就是剛進入業內視野的國產時序數據庫TDengine。而Prometheus則是Prometheus監控系統自帶的數據庫。
二、InfluxDB簡介2.1 什么是InfluxDB
InfluxDB 是一個用于存儲和分析時間序列數據的開源數據庫。由 Golang 語言編寫,也是由 Golang 編寫的軟件中比較著名的一個,在很多 Golang 的沙龍或者文章中可能都會把 InfluxDB 當標桿來介紹,這也間接幫助 InfluxDB 提高了知名度。
2.2 InfluxDB的特性
- 內置 HTTP 接口,使用方便
- 數據可以打標記,這樣查詢可以很靈活
- 類 SQL 的查詢語句
- 安裝管理很簡單,并且讀寫數據很高效
- 能夠實時查詢,數據在寫入時被索引后就能夠被立即查出
在最新的 DB-ENGINES 給出的時間序列數據庫的排名中,InfluxDB 高居第一位,可以預見,InfluxDB 會越來越得到廣泛的使用。
2.3 InfluxDB幾個基本概念
時序數據庫由于其存儲海量時序數據的特性,因此與傳統數據庫有些許不同,下面先對influxdb中涉及的基本概念作出解釋。
influxdb數據庫由database、measurement、point等三部分構成。分別對應關系數據庫中的數據庫、表、數據行。
- database:數據庫,同MySQL等關系型數據庫中的“數據庫Database”
- measurement:數據表,相當于關系型數據庫中的“表Table”
- point:數據點,表示單條數據記錄,相當于關系型數據庫中的“一行數據”
概念
MySQL
InfluxDB
數據庫(同)
database
database
表(不同)
table
measurement(測量; 度量)
列(不同)
column
Point,包括:tag(帶索引的,非必須)、field(不帶索引)、timestemp(唯一主鍵)
2.4 Point數據構成
由于database和measurement與傳統數據庫基本相同,這里不做過多解釋,以下針對influxdb中特有的Point進行講解。
Point是InfluxDB中獨有的概念,由時間(time)、數據(field)、標簽(tags)三類字段組成。
(1)time:代表每條數據的時間字段,是measurement中的數據主鍵,因此time字段具有索引屬性。一條point只能有一個time。
(2)field:代表各種數據的字段,例如氣溫、壓力、股價等,field字段沒有索引屬性。一條point可以包括多個field。
(3)tag:代表各類非數據字段,例如設備編碼、地區、姓名等,tag字段有索引屬性。一條point可以包括多個tag。
例如:監控系統系統中,保存某個服務器的cpu和內存等資源使用情況,使用cpu_usage_total 的表名(measurement)保存數據。以下表示某一個point的樣例數據:
![]()
其中time為time字段,記錄數據產生的時間;cpu_usage和memory_usage分別代表CPU使用率和內存使用率,因此他們是field字段,真正的監控數據;cpu 和host代表CPU的名字和服務器IP,所以,他們是tag字段,用于查詢和檢索。
在使用和設計InfluxDB數據結構時,需要注意以下幾點:
- 1. tag 只能為字符串類型,可以加索引;
- 2. field 類型無限制,不能加索引;
- 3. InfluxDB不支持 join;
- 4. InfluxDB支持連續查詢操作(匯總統計數據):CONTINUOUS QUERY;
三、InfluxDB安裝
InfluxDB安裝非常簡單,根據操作系統執行對應的安裝命令即可。這里以window為例,演示如何安裝InfluxDB。
3.1 下載
InfluxDB:https://dl.influxdata.com/influxdb/releases/influxdb-1.7.4_windows_amd64.zip
chronograf :https://dl.influxdata.com/chronograf/releases/chronograf-1.7.8_windows_amd64.zip
chronograf為InfluxDB的Web后臺管理端,InfluxDB提供了控制臺命令端,如果使用不習慣,可以使用chronograf。
3.2 解壓安裝包
軟件下載成功后,解壓,我們可以看到influxDB的數據庫文件非常簡單。如下圖所示:
![]()
3.3 修改配置文件
InfluxDB 的數據存儲主要有三個目錄。默認情況下是 meta, wal 以及 data 三個目錄,程序啟動后會自動生成。
- meta 用于存儲數據庫的一些元數據,meta 目錄下有一個 meta.db 文件。
- wal 目錄存放預寫日志文件,以 .wal 結尾。
- data 目錄存放實際存儲的數據文件,以 .tsm 結尾。
接下來修改influxdb.conf 配置文件,修改以下部分的路徑。
![]()
另外,InfluxDB服務默認端口為8086,如果需要更改端口號,則增加以下配置。
![]()
3.4 啟動InfluxDB服務
配置文件修改完成后,接下來啟動InfluxDB服務。直接運行Influxd.exe使用默認配置運行即可。如果需要使用自定義的配置文件,則指定conf文件進行啟動,啟動命令如下:
#先cmd 進入influxDB目錄influxd.exe -config influxdb.conf
看到如下輸出,說明InfluxDB啟動成功。
![]()
四、InfluxDB使用
InfluxQL是一種類似于SQL的查詢語言,用于與InfluxDB進行交互。如果你使用過關系數據庫及SQL,那么你可以很快速的掌握InfluxQL。但是,InfluxQL又不完全是SQL,缺乏SQL中的一些高級的語法,例如UNION,JOIN,HAVING等。
那么InfluxDB的到底如何操作呢?接下來介紹InfluxQL語言的使用方法。
4.1 連接InfluxDB服務
進入到InfluxDB目錄后,在cmd中輸入influx命令即可,命令如下:
# 使用Command命令行進入influxdbinflux -port 8086
如果使用的是默認配置,可以不需要加端口,直接influx即可。
![]()
4.2 操作InfluxDB
InfluxQL與SQL命令語法類似。接下來我們看一看InfluxQL 是怎么使用的?
4.2.1創建數據庫# 創建數據庫CREATE DATABASE weiz_tes# 顯示所有數據庫SHOW DATABASES# 刪除數據庫DROP DATABASE weiz_test# 使用數據庫USE weiz_test
4.2.2 表操作
1.創建表
InfluxDB沒有專門的創建表的命令,當插入一條數據point至某A表時,此A表會自動創建,并且表的格式、字段名、字段類型也由此條插入命令決定。
2.修改表
InfluxDB沒有修改表的命令,但當插入一條新數據point至表A時,如果此point中的字段多于原A表的字段,會自動修改A表與此條插入數據的格式字段等一致。
注意:此種情況僅限于新插入的數據字段與表A字段的交集即表A的情況,如果新插入數據字段與表A完全不同則會插入失敗。
3.查詢表
# 顯示該數據庫中的表SHOW MEASUREMENTS
4.刪除表:
DROP MEASUREMENT "measurementName"
5.插入數據
insert host_cpu_usage_total,host_name=host1,cpu_core=core1 cpu_usage=0.26,cpu_idle=0.76
上面,我們新增一條數據,measurement為host_cpu_usage_total, tag為host_name,cpu_core, field為cpu_usage,cpu_idle。
我們簡單小結一下插入的語句寫法:
- 基本格式:.insert + measurement + "," + tag=value,tag=value +空格+ field=value,field=value ;
- tag與tag之間用逗號分隔;field與field之間用逗號分隔;
- tag與field之間用空格分隔;
- tag都是string類型,不需要引號將value包裹;
- field如果是string類型,需要加引號;
6.查詢數據
select * from host_cpu_usage_total
查詢語句使用select 關鍵字,格式與mysql 基本一致。
![]()
4.2.3 用戶管理
InfluxDB 默認管理員賬號:admin,密碼為空。我們可以新增用戶和權限。命令如下:
#顯示用戶show users#創建用戶create user "username" with password 'password'#創建管理員權限用戶create user "username" with password 'password' with all privileges#刪除用戶drop user "username"
以上是對InfluxDB數據庫操作的基本總結,其他復雜的用法可以參考官網教程。官網教程地址:https://docs.influxdata.com/influxdb/v1.7/。
五、SpringBoot整合InfluxDB
前面介紹了InfluxDB的基本安裝和使用。接下來我們介紹SpringBoot項目如何整合InfluxDB,實現數據的增刪改查。這里使用的Spring Boot版本為2.4.1。接下來看看如何實現的。
5.1 添加依賴
首先創建springboot項目spring-boot-starter-influxdb,并添加相關依賴,具體依賴如下:
org.springframework.bootspring-boot-starterorg.springframework.bootspring-boot-starter-testtestorg.influxdbinfluxdb-JAVA2.14
5.2 修改Application.properties 配置
接下來修改application.properties 配置文件,增加InfluxDB的相關配置,具體如下:
#influxdb 配置spring.influx.url=http://localhost:8086spring.influx.user=adminspring.influx.password=spring.influx.database=weiz_test
上面配置的是InfluxDB數據庫連接配置,默認url為:http://localhost:8086 ,數據庫為之前創建的weiz_test數據庫。用戶名為admin,密碼默認為空。
5.3 讀取配置文件
創建InfluxDBConfig類,負責讀取Influx的數據庫連接配置。具體代碼如下:
@Configurationpublic class InfluxDBConfig {@Value("${spring.influx.user}")public String userName;@Value("${spring.influx.password}")public String password;@Value("${spring.influx.url}")public String url;//數據庫@Value("${spring.influx.database}")public String database;
5.4 數據庫操作類
創建數據庫操作類InfluxDBService,負責數據庫的初始化,增刪改查等操作的具體實現,示例代碼如下:
@Servicepublic class InfluxDBService {@Autowiredprivate InfluxDBConfig influxDBConfig;@PostConstructpublic void initInfluxDb() {this.retentionPolicy = retentionPolicy == null || "".equals(retentionPolicy) ? "autogen" : retentionPolicy;this.influxDB = influxDbBuild();//保留策略private String retentionPolicy;private InfluxDB influxDB;* 設置數據保存策略 defalut 策略名 /database 數據庫名/ 30d 數據保存時限30天/ 1 副本個數為1/ 結尾DEFAULT* 表示 設為默認的策略public void createRetentionPolicy() {String command = String.format("CREATE RETENTION POLICY "%s" ON "%s" DURATION %s REPLICATION %s DEFAULT","defalut", influxDBConfig.database, "30d", 1);this.query(command);* 連接時序數據庫;獲得InfluxDBprivate InfluxDB influxDbBuild() {if (influxDB == null) {influxDB = InfluxDBFactory.connect(influxDBConfig.url, influxDBConfig.userName, influxDBConfig.password);influxDB.setDatabase(influxDBConfig.database);return influxDB;* 插入* @param measurement 表* @param tags 標簽* @param fields 字段public void insert(String measurement, Map tags, Map fields) {influxDbBuild();Point.Builder builder = Point.measurement(measurement);builder.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS);builder.tag(tags);builder.fields(fields);influxDB.write(influxDBConfig.database, "", builder.build());* @desc 插入,帶時間time* @date 2021/3/27*@param measurement*@param time*@param tags*@param fields* @return voidpublic void insert(String measurement, long time, Map tags, Map fields) {influxDbBuild();Point.Builder builder = Point.measurement(measurement);builder.time(time, TimeUnit.MILLISECONDS);builder.tag(tags);builder.fields(fields);influxDB.write(influxDBConfig.database, "", builder.build());* @desc influxDB開啟UDP功能,默認端口:8089,默認數據庫:udp,沒提供代碼傳數據庫功能接口* @date 2021/3/13*@param measurement*@param time*@param tags*@param fields* @return voidpublic void insertUDP(String measurement, long time, Map tags, Map fields) {influxDbBuild();Point.Builder builder = Point.measurement(measurement);builder.time(time, TimeUnit.MILLISECONDS);builder.tag(tags);builder.fields(fields);int udpPort = 8089;influxDB.write(udpPort, builder.build());* 查詢* @param command 查詢語句* @returnpublic QueryResult query(String command) {influxDbBuild();return influxDB.query(new Query(command, influxDBConfig.database));* @desc 查詢結果處理* @date 2021/5/12*@param queryResultpublic List> queryResultProcess(QueryResult queryResult) {List> mapList = new ArrayList<>();List resultList = queryResult.getResults();//把查詢出的結果集轉換成對應的實體對象,聚合成listfor(QueryResult.Result query : resultList){List seriesList = query.getSeries();if(seriesList != null && seriesList.size() != 0) {for(QueryResult.Series series : seriesList){List columns = series.getColumns();String[] keys = columns.toArray(new String[columns.size()]);List> values = series.getValues();if(values != null && values.size() != 0) {for(List value : values){Map map = new HashMap(keys.length);for (int i = 0; i < keys.length; i++) {map.put(keys[i], value.get(i));mapList.add(map);return mapList;* @desc InfluxDB 查詢 count總條數* @date 2021/4/8public long countResultProcess(QueryResult queryResult) {long count = 0;List> list = queryResultProcess(queryResult);if(list != null && list.size() != 0) {Map map = list.get(0);double num = (Double)map.get("count");count = new Double(num).longValue();return count;* 查詢* @param dbName 創建數據庫* @returnpublic void createDB(String dbName) {influxDbBuild();influxDB.createDatabase(dbName);* 批量寫入測點* @param batchPointspublic void batchInsert(BatchPoints batchPoints) {influxDbBuild();influxDB.write(batchPoints);* 批量寫入數據* @param database 數據庫* @param retentionPolicy 保存策略* @param consistency 一致性* @param records 要保存的數據(調用BatchPoints.lineProtocol()可得到一條record)public void batchInsert(final String database, final String retentionPolicy,final InfluxDB.ConsistencyLevel consistency, final List records) {influxDbBuild();influxDB.write(database, retentionPolicy, consistency, records);* @desc 批量寫入數據* @date 2021/3/19*@param consistency*@param recordspublic void batchInsert(final InfluxDB.ConsistencyLevel consistency, final List records) {influxDbBuild();influxDB.write(influxDBConfig.database, "", consistency, records);
5.5 測試驗證
接下來,我們寫幾個單元測試,驗證數據的增刪改查等操作是否成功。單元測試代碼如下:
@SpringBootTestclass Example01ApplicationTests {@Autowiredprivate InfluxDBService influxDBService;@Testvoid contextLoads() {@Testpublic void testSave(){String measurement = "host_cpu_usage_total";Map tags = new HashMap<>();tags.put("host_name","host2");tags.put("cpu_core","core0");Map fields = new HashMap<>();fields.put("cpu_usage",0.22);fields.put("cpu_idle",0.56);influxDBService.insert(measurement, tags, fields);@Testpublic void testGetdata(){String command = "select * from host_cpu_usage_total";QueryResult queryResult = influxDBService.query(command);List> result = influxDBService.queryResultProcess(queryResult);for (Map map: result) {System.out.println("time:"+ map.get("time")+" host_name:" + map.get("host_name")+" cpu_core:" + map.get("cpu_core")+" cpu_usage:" + map.get("host_name")+" cpu_idle:" + map.get("host_name"));
運行上面的新增和查詢等單元測試,單擊Run Test或在方法上右擊,選擇Run 'testSave' ,查看單元測試結果,運行結果如下圖所示。
![]()
接下來調用數據驗證數據查詢,運行'testGetData'測試方法,運行結果如下圖所示:
![]()
保存和查詢等功能的單元測試運行成功,說明InfluxDB的增加和查詢操作執行成功。
最后
以上,我們就把時序數據庫InfluxDB介紹完了,并通過示例介紹了如何在SpringBoot項目中整合InfluxDB。示例代碼也會同步上傳:https://gitee.com/weizhong1988/spring-boot-starter 。如有疑問,請在下方留言!
InfluxDB在系統監控、物聯網等方面的應用越來越多,希望大家能夠熟練掌握。






