在閱讀該文之前,我已經假設你已經對kafka的broker、topic、partition、consumer等概念已經有了一定的了解。
流處理
流數據是一組順序、大量、快速、連續到達的數據序列,一般情況下,數據流可被視為一個隨時間延續而無限增長的動態數據集合。
Confluent KSQL
Confluent KSQL是一個基于kafka的實時數據流處理工具引擎,提供了強大且易用的sql交互方式來對kafka數據流進行處理,而無需編寫代碼。ksql具備高擴展、高彈性、容錯式等優良特性,并且它提供了大范圍的流式處理操作,比如數據過濾、轉化、聚合、連接join、窗口化和 Sessionization (即捕獲單一會話期間的所有的流事件)等。
概念
架構
- KSQL engine: 處理ksql聲明和查詢
- REST interfaceL:客戶端和engine的連接器
- KSQL CLI:命令行交互終端,通過rest api和引擎進行交互
- KSQL UI:ksql的控制中心
stream和table
流(stream)表示的從開始至今的完整的歷史,它代表了過去產生的數據(事件、日志等)及其相應的時間。新的數據只能被不斷地添加到流中,無法被刪除和修改,它們是既定的事實。從某種角度而言,流是對事實的建模。
表(table)表示的是基于數據流進行了某種操作之后的數據,它是對歷史數據的某種狀態的快照。表的這個概念,是源自于已經發展了數十年的RDBMS,因此,基本可以用相同的理解去使用table。
其實,RDBMS中也有數據流,如binlog本身就是一種流式數據。KSQL將stream作為基礎對象,而RDBMS的基礎對象是table。KSQL和RDBMS都有將stream和table互相轉化的功能,只是二者的側重點不同而已。
query的生命周期
- 使用DDL注冊一個stream或者table,如:create stream stream_name with topic_name ...
- 使用一個ksql聲明來表示你的應用:create table as select from stream_name ...
- ksql將你的DDL/DML解析為AST
- ksql基于ASL生成一個邏輯計劃
- ksql基于邏輯計劃生成一個物理執行計劃
- ksql生成和執行kafka流應用
- 你可以通過對stream和table進行操作來管理你的應用
基本流程和一般DBMS相同。
使用
最簡單的體驗方式: 使用Docker。這種方式默認下將zookeeper、kafka、ksql在一個compose(一共9個service)下啟動。最低配置8G內存,嘗試請謹慎。
git clone https://github.com/confluentinc/cp-docker-images
cd cp-docker-images
git checkout 5.2.1-post
cd examples/cp-all-in-one/
docker-compose up -d --build
# 新建topic: user
docker-compose exec broker kafka-topics --create --zookeeper
zookeeper:2181 --replication-factor 1 --partitions 1 --topic users
# 新建topic: pageview
docker-compose exec broker kafka-topics --create --zookeeper
zookeeper:2181 --replication-factor 1 --partitions 1 --topic pageviews
樣例里面會自動生成兩個topic:pageview和user,表示用戶對某個頁面的訪問日志。
現在我們kafka和ksql都已經有了,還創建了兩個topic。現在我們使用一個腳本來往這兩個topic寫入一些數據(這個腳本寫入的數據為avro)
wget https://github.com/confluentinc/kafka-connect-datagen/raw/master/config/connector_pageviews_cos.config
curl -X POST -H "Content-Type: Application/json" --data @connector_pageviews_cos.config http://localhost:8083/connectors
wget https://github.com/confluentinc/kafka-connect-datagen/raw/master/config/connector_users_cos.config
curl -X POST -H "Content-Type: application/json" --data @connector_users_cos.config http://localhost:8083/connectors
啟動KSQL終端
docker-compose exec ksql-cli ksql http://ksql-server:8088
DDL
- CREATE STREAM:基于某個topic新建一個流
- CREATE TABLE:基于一個stream新建一個table
- DROP STREAM/TABLE:刪除stream或者table
- CREATE STREAM AS SELECT (CSAS)
- CREATE TABLE AS SELECT (CTAS)
- 新建stream pageviews/users。(SHOW STREAMS;可以用來查看當前有什么stream)
CREATE STREAM pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR)
WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');
CREATE TABLE users (registertime BIGINT, gender VARCHAR, regionid VARCHAR,
userid VARCHAR)
WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO', KEY = 'userid');
SHOW STREAMS;
- 從stream中查詢數據
# 設置query語句讀取最開始的數據
SET 'auto.offset.reset'='earliest';
SELECT pageid FROM pageviews LIMIT 3;
你會發現這條query會從pageviews流中獲取每條記錄的pageid。你也可以加上一些where條件嘗試一下。
- 從其他stream生成一個新的stream
CREATE STREAM pageviews_female AS SELECT users.userid AS userid, pageid,
regionid, gender FROM pageviews LEFT JOIN users ON pageviews.userid = users.userid
WHERE gender = 'FEMALE';
這條DDL會對pageviews和users中的數據進行左連接操作,并把連接結果作為新stream pageviews_femails的數據。這個stream的數據會寫到一個新的kafka topic:PAGEVIEWS_FEMALE。
即:我們可以完全基于一個現有的topic新建一個stream;也可以基于現有的stream新建一個stream,這建立方法所得到的數據會存儲在一個和stream名相同的topic中。
- 我們也可以基于一個現有的topic的部分數據建立一個stream,并指定新stream的topic名。以下這個stream的數據會存儲在topic pageviews_enriched_r8_r9中。
CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9',
value_format='AVRO') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
- stream的建立語句可以使用聚合函數和窗口函數
CREATE TABLE pageviews_regions AS SELECT gender, regionid ,
COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second)
GROUP BY gender, regionid HAVING COUNT(*) > 1;
- 查看stream的定義
# 類似于MySQL的desc
DESCRIBE EXTENDED pageviews_female_like_89;
和外部系統的連接
ksql可以使用 ksql connectors 和外部系統如:mysql、s3、hdfs等進行通信、操作。
優缺點
- 優點KSQL 流數據查詢在實現上是分布式的、容錯的、彈性的、可擴展的和實時的,這些特性可以滿足現代企業對數據的需求。KSQL的數據過濾、轉化、聚合、連接join、窗口化和Sessionization等功能基本能夠覆蓋大部分應用場景;近似標準SQL的客戶端實現降低了學習成本。對于 JAVA 或 Scala 開發人員而言,Kakfa Streams API 是一個強大的軟件庫,它實現了將流數據處理集成到應用中。利用ksql可以輕松實現實時報告、服務監控、活動告警、基于會話的用戶數據分析、實時ETL等。
- 缺點KSQL算是一種重量級的流數據處理工具,對于資源要求較高。






