亚洲视频二区_亚洲欧洲日本天天堂在线观看_日韩一区二区在线观看_中文字幕不卡一区

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.430618.com 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

在閱讀該文之前,我已經假設你已經對kafka的broker、topic、partition、consumer等概念已經有了一定的了解。

流處理

流數據是一組順序、大量、快速、連續到達的數據序列,一般情況下,數據流可被視為一個隨時間延續而無限增長的動態數據集合。

Confluent KSQL

Confluent KSQL是一個基于kafka的實時數據流處理工具引擎,提供了強大且易用的sql交互方式來對kafka數據流進行處理,而無需編寫代碼。ksql具備高擴展、高彈性、容錯式等優良特性,并且它提供了大范圍的流式處理操作,比如數據過濾、轉化、聚合、連接join、窗口化和 Sessionization (即捕獲單一會話期間的所有的流事件)等。

概念

架構

流處理引擎:ksql

 

  • KSQL engine: 處理ksql聲明和查詢
  • REST interfaceL:客戶端和engine的連接器
  • KSQL CLI:命令行交互終端,通過rest api和引擎進行交互
  • KSQL UI:ksql的控制中心

stream和table

流(stream)表示的從開始至今的完整的歷史,它代表了過去產生的數據(事件、日志等)及其相應的時間。新的數據只能被不斷地添加到流中,無法被刪除和修改,它們是既定的事實。從某種角度而言,流是對事實的建模。

表(table)表示的是基于數據流進行了某種操作之后的數據,它是對歷史數據的某種狀態的快照。表的這個概念,是源自于已經發展了數十年的RDBMS,因此,基本可以用相同的理解去使用table。

其實,RDBMS中也有數據流,如binlog本身就是一種流式數據。KSQL將stream作為基礎對象,而RDBMS的基礎對象是table。KSQL和RDBMS都有將stream和table互相轉化的功能,只是二者的側重點不同而已。

query的生命周期

  • 使用DDL注冊一個stream或者table,如:create stream stream_name with topic_name ...
  • 使用一個ksql聲明來表示你的應用:create table as select from stream_name ...
  • ksql將你的DDL/DML解析為AST
  • ksql基于ASL生成一個邏輯計劃
  • ksql基于邏輯計劃生成一個物理執行計劃
  • ksql生成和執行kafka流應用
  • 你可以通過對stream和table進行操作來管理你的應用

基本流程和一般DBMS相同。

使用

最簡單的體驗方式: 使用Docker。這種方式默認下將zookeeper、kafka、ksql在一個compose(一共9個service)下啟動。最低配置8G內存,嘗試請謹慎。

git clone https://github.com/confluentinc/cp-docker-images
cd cp-docker-images
git checkout 5.2.1-post
cd examples/cp-all-in-one/
docker-compose up -d --build
# 新建topic: user
docker-compose exec broker kafka-topics --create --zookeeper 
zookeeper:2181 --replication-factor 1 --partitions 1 --topic users
# 新建topic: pageview
docker-compose exec broker kafka-topics --create --zookeeper 
zookeeper:2181 --replication-factor 1 --partitions 1 --topic pageviews

樣例里面會自動生成兩個topic:pageview和user,表示用戶對某個頁面的訪問日志。

現在我們kafka和ksql都已經有了,還創建了兩個topic。現在我們使用一個腳本來往這兩個topic寫入一些數據(這個腳本寫入的數據為avro)

wget https://github.com/confluentinc/kafka-connect-datagen/raw/master/config/connector_pageviews_cos.config
curl -X POST -H "Content-Type: Application/json" --data @connector_pageviews_cos.config http://localhost:8083/connectors
wget https://github.com/confluentinc/kafka-connect-datagen/raw/master/config/connector_users_cos.config
curl -X POST -H "Content-Type: application/json" --data @connector_users_cos.config http://localhost:8083/connectors

啟動KSQL終端

docker-compose exec ksql-cli ksql http://ksql-server:8088

DDL

  • CREATE STREAM:基于某個topic新建一個流
  • CREATE TABLE:基于一個stream新建一個table
  • DROP STREAM/TABLE:刪除stream或者table
  • CREATE STREAM AS SELECT (CSAS)
  • CREATE TABLE AS SELECT (CTAS)
  • 新建stream pageviews/users。(SHOW STREAMS;可以用來查看當前有什么stream)
CREATE STREAM pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR) 
WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');

CREATE TABLE users (registertime BIGINT, gender VARCHAR, regionid VARCHAR,  
userid VARCHAR) 
WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO', KEY = 'userid');

SHOW STREAMS;
  • 從stream中查詢數據
# 設置query語句讀取最開始的數據
SET 'auto.offset.reset'='earliest';
SELECT pageid FROM pageviews LIMIT 3;

你會發現這條query會從pageviews流中獲取每條記錄的pageid。你也可以加上一些where條件嘗試一下。

  • 從其他stream生成一個新的stream
CREATE STREAM pageviews_female AS SELECT users.userid AS userid, pageid, 
regionid, gender FROM pageviews LEFT JOIN users ON pageviews.userid = users.userid 
WHERE gender = 'FEMALE';

這條DDL會對pageviews和users中的數據進行左連接操作,并把連接結果作為新stream pageviews_femails的數據。這個stream的數據會寫到一個新的kafka topic:PAGEVIEWS_FEMALE。

即:我們可以完全基于一個現有的topic新建一個stream;也可以基于現有的stream新建一個stream,這建立方法所得到的數據會存儲在一個和stream名相同的topic中。

  • 我們也可以基于一個現有的topic的部分數據建立一個stream,并指定新stream的topic名。以下這個stream的數據會存儲在topic pageviews_enriched_r8_r9中。
CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', 
value_format='AVRO') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
  • stream的建立語句可以使用聚合函數和窗口函數
CREATE TABLE pageviews_regions AS SELECT gender, regionid , 
COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) 
GROUP BY gender, regionid HAVING COUNT(*) > 1;
  • 查看stream的定義
# 類似于MySQL的desc
DESCRIBE EXTENDED pageviews_female_like_89;

和外部系統的連接

ksql可以使用 ksql connectors 和外部系統如:mysql、s3、hdfs等進行通信、操作。

優缺點

  • 優點KSQL 流數據查詢在實現上是分布式的、容錯的、彈性的、可擴展的和實時的,這些特性可以滿足現代企業對數據的需求。KSQL的數據過濾、轉化、聚合、連接join、窗口化和Sessionization等功能基本能夠覆蓋大部分應用場景;近似標準SQL的客戶端實現降低了學習成本。對于 JAVA 或 Scala 開發人員而言,Kakfa Streams API 是一個強大的軟件庫,它實現了將流數據處理集成到應用中。利用ksql可以輕松實現實時報告、服務監控、活動告警、基于會話的用戶數據分析、實時ETL等。
  • 缺點KSQL算是一種重量級的流數據處理工具,對于資源要求較高。

分享到:
標簽:引擎 ksql
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定