前提
最近學(xué)習(xí)Netty的時(shí)候想做一個(gè)基于redis服務(wù)協(xié)議的編碼解碼模塊,過(guò)程中順便閱讀了Redis服務(wù)序列化協(xié)議RESP,結(jié)合自己的理解對(duì)文檔進(jìn)行了翻譯并且簡(jiǎn)單實(shí)現(xiàn)了RESP基于JAVA語(yǔ)言的解析。編寫(xiě)本文的使用使用的JDK版本為[8+]。
RESP簡(jiǎn)介
Redis客戶(hù)端與Redis服務(wù)端基于一個(gè)稱(chēng)作RESP的協(xié)議進(jìn)行通信,RESP全稱(chēng)為Redis Serialization Protocol,也就是Redis序列化協(xié)議。雖然RESP為Redis設(shè)計(jì),但是它也可以應(yīng)用在其他客戶(hù)端-服務(wù)端(Client-Server)的軟件項(xiàng)目中。RESP在設(shè)計(jì)的時(shí)候折中考慮了如下幾點(diǎn):
- 易于實(shí)現(xiàn)。
- 快速解析。
- 可讀性高。
RESP可以序列化不同的數(shù)據(jù)類(lèi)型,如整型、字符串、數(shù)組還有一種特殊的Error類(lèi)型。需要執(zhí)行的Redis命令會(huì)封裝為類(lèi)似于字符串?dāng)?shù)組的請(qǐng)求然后通過(guò)Redis客戶(hù)端發(fā)送到Redis服務(wù)端。Redis服務(wù)端會(huì)基于特定的命令類(lèi)型選擇對(duì)應(yīng)的一種數(shù)據(jù)類(lèi)型進(jìn)行回復(fù)(這一句是意譯,原文是:Redis replies with a command-specific data type)。
RESP是二進(jìn)制安全的(binary-safe),并且在RESP下不需要處理從一個(gè)進(jìn)程傳輸?shù)搅硪粋€(gè)進(jìn)程的批量數(shù)據(jù),因?yàn)樗褂昧饲熬Y長(zhǎng)度(prefixed-length,后面會(huì)分析,就是在每個(gè)數(shù)據(jù)塊的前綴已經(jīng)定義好數(shù)據(jù)塊的個(gè)數(shù),類(lèi)似于Netty里面的定長(zhǎng)編碼解碼)來(lái)傳輸批量數(shù)據(jù)。
注意:此處概述的協(xié)議僅僅使用在客戶(hù)端-服務(wù)端通信,Redis Cluster使用不同的二進(jìn)制協(xié)議在多個(gè)節(jié)點(diǎn)之間交換消息(也就是Redis集群中的節(jié)點(diǎn)之間并不使用RESP通信)。
網(wǎng)絡(luò)層
Redis客戶(hù)端通過(guò)創(chuàng)建一個(gè)在6379端口的TCP連接,連接到Redis服務(wù)端。
雖然RESP在底層通信協(xié)議技術(shù)上是非TCP特定的,但在Redis的上下文中,RESP僅用于TCP連接(或類(lèi)似的面向流的連接,如Unix套接字)。
請(qǐng)求-響應(yīng)模型
Redis服務(wù)端接收由不同參數(shù)組成的命令,接收到命令并將其處理之后會(huì)把回復(fù)發(fā)送回Redis客戶(hù)端。這是最簡(jiǎn)單的模型,但是有兩種例外的情況:
- Redis支持管道(Pipelining,流水線(xiàn),多數(shù)情況下習(xí)慣稱(chēng)為管道)操作。使用管道的情況下,Redis客戶(hù)端可以一次發(fā)送多個(gè)命令,然后等待一次性的回復(fù)(文中的回復(fù)是replies,理解為Redis服務(wù)端會(huì)一次性返回一個(gè)批量回復(fù)結(jié)果)。
- 當(dāng)Redis客戶(hù)端訂閱Pub/Sub信道時(shí),該協(xié)議會(huì)更改語(yǔ)義并成為推送協(xié)議(push protocol),也就是說(shuō),客戶(hù)端不再需要發(fā)送命令,因?yàn)镽edis服務(wù)端將自動(dòng)向客戶(hù)端(訂閱了改信道的客戶(hù)端)發(fā)送新消息(這里的意思是:在訂閱/發(fā)布模式下,消息是由Redis服務(wù)端主動(dòng)推送給訂閱了特定信道的Redis客戶(hù)端)。
除了上述兩個(gè)特例之外,Redis協(xié)議是一種簡(jiǎn)單的請(qǐng)求-響應(yīng)協(xié)議。
RESP支持的數(shù)據(jù)類(lèi)型
RESP在Redis 1.2中引入,在Redis 2.0,RESP正式成為與Redis服務(wù)端通信的標(biāo)準(zhǔn)方案。也就是如果需要編寫(xiě)Redis客戶(hù)端,你就必須在客戶(hù)端中實(shí)現(xiàn)此協(xié)議。
RESP本質(zhì)上是一種序列化協(xié)議,它支持的數(shù)據(jù)類(lèi)型如下:?jiǎn)涡凶址㈠e(cuò)誤消息、整型數(shù)字、定長(zhǎng)字符串和RESP數(shù)組。
RESP在Redis中用作請(qǐng)求-響應(yīng)協(xié)議的方式如下:
- Redis客戶(hù)端將命令封裝為RESP的數(shù)組類(lèi)型(數(shù)組元素都是定長(zhǎng)字符串類(lèi)型,注意這一點(diǎn),很重要)發(fā)送到Redis服務(wù)器。
- Redis服務(wù)端根據(jù)命令實(shí)現(xiàn)選擇對(duì)應(yīng)的RESP數(shù)據(jù)類(lèi)型之一進(jìn)行回復(fù)。
在RESP中,數(shù)據(jù)類(lèi)型取決于數(shù)據(jù)報(bào)的第一個(gè)字節(jié):
- 單行字符串的第一個(gè)字節(jié)為+。
- 錯(cuò)誤消息的第一個(gè)字節(jié)為-。
- 整型數(shù)字的第一個(gè)字節(jié)為:。
- 定長(zhǎng)字符串的第一個(gè)字節(jié)為$。
- RESP數(shù)組的第一個(gè)字節(jié)為*。
另外,在RESP中可以使用定長(zhǎng)字符串或者數(shù)組的特殊變體來(lái)表示Null值,后面會(huì)提及。在RESP中,協(xié)議的不同部分始終以rn(CRLF)終止。
目前RESP中5種數(shù)據(jù)類(lèi)型的小結(jié)如下:
數(shù)據(jù)類(lèi)型 本文翻譯名稱(chēng) 基本特征 例子 Simple String 單行字符串 第一個(gè)字節(jié)是+,最后兩個(gè)字節(jié)是rn,其他字節(jié)是字符串內(nèi)容 +OKrn Error 錯(cuò)誤消息 第一個(gè)字節(jié)是-,最后兩個(gè)字節(jié)是rn,其他字節(jié)是異常消息的文本內(nèi)容 -ERRrn Integer 整型數(shù)字 第一個(gè)字節(jié)是:,最后兩個(gè)字節(jié)是rn,其他字節(jié)是數(shù)字的文本內(nèi)容 :100rn Bulk String 定長(zhǎng)字符串 第一個(gè)字節(jié)是$,緊接著的字節(jié)是內(nèi)容字符串長(zhǎng)度rn,最后兩個(gè)字節(jié)是rn,其他字節(jié)是字符串內(nèi)容 $4rndogern Array RESP數(shù)組 第一個(gè)字節(jié)是*,緊接著的字節(jié)是元素個(gè)數(shù)rn,最后兩個(gè)字節(jié)是rn,其他字節(jié)是各個(gè)元素的內(nèi)容,每個(gè)元素可以是任意一種數(shù)據(jù)類(lèi)型 *2rn:100rn$4rndogern 下面的小節(jié)是對(duì)每種數(shù)據(jù)類(lèi)型的更細(xì)致的分析。
RESP簡(jiǎn)單字符串-Simple String
簡(jiǎn)單字符串的編碼方式如下:
- (1)第一個(gè)字節(jié)為+。
- (2)緊接著的是一個(gè)不能包含CR或者LF字符的字符串。
- (3)以CRLF終止。
簡(jiǎn)單字符串能夠保證在最小開(kāi)銷(xiāo)的前提下傳輸非二進(jìn)制安全的字符串。例如很多Redis命令執(zhí)行成功后服務(wù)端需要回復(fù)OK字符串,此時(shí)通過(guò)簡(jiǎn)單字符串編碼為5字節(jié)的數(shù)據(jù)報(bào)如下:
+OKrn 復(fù)制代碼
如果需要發(fā)送二進(jìn)制安全的字符串,那么需要使用定長(zhǎng)字符串。
當(dāng)Redis服務(wù)端用簡(jiǎn)單字符串響應(yīng)時(shí),Redis客戶(hù)端庫(kù)應(yīng)該向調(diào)用者返回一個(gè)字符串,該響應(yīng)到調(diào)用者的字符串由+之后直到字符串內(nèi)容末尾的字符組成(其實(shí)就是上面提到的第(2)部分的內(nèi)容),不包括最后的CRLF字節(jié)。
RESP錯(cuò)誤消息-Error
錯(cuò)誤消息類(lèi)型是RESP特定的數(shù)據(jù)類(lèi)型。實(shí)際上,錯(cuò)誤消息類(lèi)型和簡(jiǎn)單字符串類(lèi)型基本一致,只是其第一個(gè)字節(jié)為-。錯(cuò)誤消息類(lèi)型跟簡(jiǎn)單字符串類(lèi)型的最大區(qū)別是:錯(cuò)誤消息作為Redis服務(wù)端響應(yīng)的時(shí)候,對(duì)于客戶(hù)端而言應(yīng)該感知為異常,而錯(cuò)誤消息中的字符串內(nèi)容應(yīng)該感知為Redis服務(wù)端返回的錯(cuò)誤信息。錯(cuò)誤消息的編碼方式如下:
- (1)第一個(gè)字節(jié)為-。
- (2)緊接著的是一個(gè)不能包含CR或者LF字符的字符串。
- (3)以CRLF終止。
一個(gè)簡(jiǎn)單的例子如下:
-Error messagern 復(fù)制代碼
Redis服務(wù)端只有在真正發(fā)生錯(cuò)誤或者感知錯(cuò)誤的時(shí)候才會(huì)回復(fù)錯(cuò)誤消息,例如嘗試對(duì)錯(cuò)誤的數(shù)據(jù)類(lèi)型執(zhí)行操作或者命令不存在等等。Redis客戶(hù)端接收到錯(cuò)誤消息的時(shí)候,應(yīng)該觸發(fā)異常(一般情況就是直接拋出異常,可以根據(jù)錯(cuò)誤消息的內(nèi)容進(jìn)行異常分類(lèi))。下面是錯(cuò)誤消息響應(yīng)的一些例子:
-ERR unknown command 'foobar' -WRONGTYPE Operation against a key holding the wrong kind of value 復(fù)制代碼
-之后的第一個(gè)單詞到第一個(gè)空格或換行符之間的內(nèi)容,代表返回的錯(cuò)誤類(lèi)型。這只是Redis使用的約定,不是RESP錯(cuò)誤消息格式的一部分。
例如,ERR是通用錯(cuò)誤,WRONGTYPE則是更具體的錯(cuò)誤,表示客戶(hù)端試圖針對(duì)錯(cuò)誤的數(shù)據(jù)類(lèi)型執(zhí)行操作。這種定義方式稱(chēng)為錯(cuò)誤前綴,是一種使客戶(hù)端能夠理解服務(wù)器返回的錯(cuò)誤類(lèi)型的方法,而不必依賴(lài)于所給出的確切消息定義,該消息可能會(huì)隨時(shí)間而變化。
客戶(hù)端實(shí)現(xiàn)可以針對(duì)不同的錯(cuò)誤類(lèi)型返回不同種類(lèi)的異常,或者可以通過(guò)將錯(cuò)誤類(lèi)型的名稱(chēng)作為字符串直接提供給調(diào)用方來(lái)提供捕獲錯(cuò)誤的通用方法。
但是,不應(yīng)該將錯(cuò)誤消息分類(lèi)處理的功能視為至關(guān)重要的功能,因?yàn)樗饔貌⒉痪薮螅⑶矣行┑目蛻?hù)端實(shí)現(xiàn)可能會(huì)簡(jiǎn)單地返回特定值去屏蔽錯(cuò)誤消息作為通用的異常處理,例如直接返回false。
RESP整型數(shù)字-Integer
整型數(shù)字的編碼方式如下:
- (1)第一個(gè)字節(jié)為:。
- (2)緊接著的是一個(gè)不能包含CR或者LF字符的字符串,也就是數(shù)字要先轉(zhuǎn)換為字符序列,最終要輸出為字節(jié)。
- (3)以CRLF終止。
例如:
:0rn :1000rn 復(fù)制代碼
許多Redis命令返回整型數(shù)字,像INCR,LLEN和LASTSAVE命令等等。
返回的整型數(shù)字沒(méi)有特殊的含義,像INCR返回的是增量的總量,而LASTSAVE是UNIX時(shí)間戳。但是Redis服務(wù)端保證返回的整型數(shù)字在帶符號(hào)的64位整數(shù)范圍內(nèi)。
有些情況下,返回的整型數(shù)字會(huì)指代true或者false。如EXISTS或者SISMEMBER命令執(zhí)行返回1代表true,0代表false。
有些情況下,返回的整型數(shù)字會(huì)指代命令是否真正產(chǎn)生了效果。如SADD,SREM和SETNX命令執(zhí)行返回1代表命令執(zhí)行生效,0代表命令執(zhí)行不生效(等價(jià)于命令沒(méi)有執(zhí)行)。
下面的一組命令執(zhí)行后都是返回整型數(shù)字:SETNX, DEL, EXISTS, INCR, INCRBY, DECR, DECRBY, DBSIZE, LASTSAVE, RENAMENX, MOVE, LLEN, SADD, SREM, SISMEMBER, SCARD。
RESP定長(zhǎng)字符串-Bulk String
定長(zhǎng)字符串用于表示一個(gè)最大長(zhǎng)度為512MB的二進(jìn)制安全的字符串(Bulk,本身有體積大的含義)。定長(zhǎng)字符串的編碼方式如下:
- (1)第一個(gè)字節(jié)為$。
- (2)緊接著的是組成字符串的字節(jié)數(shù)長(zhǎng)度(稱(chēng)為prefixed length,也就是前綴長(zhǎng)度),前綴長(zhǎng)度分塊以CRLF終止。
- (3)然后是一個(gè)不能包含CR或者LF字符的字符串,也就是數(shù)字要先轉(zhuǎn)換為字符序列,最終要輸出為字節(jié)。
- (4)以CRLF終止。
舉個(gè)例子,doge使用定長(zhǎng)字符串編碼如下:
第一個(gè)字節(jié) 前綴長(zhǎng)度 CRLF 字符串內(nèi)容 CRLF 定長(zhǎng)字符串 $ 4 rn doge rn ===> $4rndogern foobar使用定長(zhǎng)字符串編碼如下:
第一個(gè)字節(jié) 前綴長(zhǎng)度 CRLF 字符串內(nèi)容 CRLF 定長(zhǎng)字符串 $ 6 rn foobar rn ===> $6rnfoobarrn 表示空字符串(Empty String,對(duì)應(yīng)Java中的"") 的時(shí)候,使用定長(zhǎng)字符串編碼如下:
第一個(gè)字節(jié) 前綴長(zhǎng)度 CRLF CRLF 定長(zhǎng)字符串 $ 0 rn rn ===> $0rnrn 定長(zhǎng)字符串也可以使用特殊的格式來(lái)表示Null值,指代值不存在。在這種特殊格式中,前綴長(zhǎng)度為-1,并且沒(méi)有數(shù)據(jù),因此使用定長(zhǎng)字符串對(duì)Null值進(jìn)行編碼如下:
第一個(gè)字節(jié) 前綴長(zhǎng)度 CRLF 定長(zhǎng)字符串 $ -1 rn ===> $-1rn 當(dāng)Redis服務(wù)端返回定長(zhǎng)字符串編碼的Null值的時(shí)候,客戶(hù)端不應(yīng)該返回空字符串,而應(yīng)該返回對(duì)應(yīng)編程語(yǔ)言中的Null對(duì)象。例如Ruby中對(duì)應(yīng)nil,C語(yǔ)言中對(duì)應(yīng)NULL,Java中對(duì)應(yīng)null,以此類(lèi)推。
RESP數(shù)組-Array
Redis客戶(hù)端使用RESP數(shù)組發(fā)送命令到Redis服務(wù)端。與此相似,某些Redis命令執(zhí)行完畢后服務(wù)端需要使用RESP數(shù)組類(lèi)型將元素集合返回給客戶(hù)端,如返回一個(gè)元素列表的LRANGE命令。RESP數(shù)組和我們認(rèn)知中的數(shù)組并不完全一致,它的編碼格式如下:
- (1)第一個(gè)字節(jié)為*。
- (2)緊接著的是組成RESP數(shù)組的元素個(gè)數(shù)(十進(jìn)制數(shù),但是最終需要轉(zhuǎn)換為字節(jié)序列,如10需要轉(zhuǎn)換為1和0兩個(gè)相鄰的字節(jié)),元素個(gè)數(shù)分塊以CRLF終止。
- (3)RESP數(shù)組的每個(gè)元素內(nèi)容,每個(gè)元素可以是任意的RESP數(shù)據(jù)類(lèi)型。
一個(gè)空的RESP數(shù)組的編碼如下:
*0rn 復(fù)制代碼
一個(gè)包含2個(gè)定長(zhǎng)字符串元素內(nèi)容分別為foo和bar的RESP數(shù)組的編碼如下:
*2rn$3rnfoorn$3rnbarrn 復(fù)制代碼
通用格式就是:*<count>CRLF作為RESP數(shù)組的前綴部分,而組成RESP數(shù)組的其他數(shù)據(jù)類(lèi)型的元素只是一個(gè)接一個(gè)地串聯(lián)在一起。例如一個(gè)包含3個(gè)整數(shù)類(lèi)型元素的RESP數(shù)組的編碼如下:
*3rn:1rn:2rn:3rn 復(fù)制代碼
RESP數(shù)組的元素不一定是同一種數(shù)據(jù)類(lèi)型,可以包含混合類(lèi)型的元素。例如下面是一個(gè)包含4個(gè)整數(shù)類(lèi)型元素和1個(gè)定長(zhǎng)字符串類(lèi)型元素(一共有5個(gè)元素)的RESP數(shù)組的編碼(為了看得更清楚,分多行進(jìn)行編碼,實(shí)際上不能這樣做):
# 元素個(gè)數(shù) *5rn # 第1個(gè)整型類(lèi)型的元素 :1rn # 第2個(gè)整型類(lèi)型的元素 :2rn # 第3個(gè)整型類(lèi)型的元素 :3rn # 第4個(gè)整型類(lèi)型的元素 :4rn # 定長(zhǎng)字符串類(lèi)型的元素 $6rn foobarrn 復(fù)制代碼
Redis服務(wù)端響應(yīng)報(bào)的首行*5rn定義了后面會(huì)緊跟著5個(gè)回復(fù)數(shù)據(jù),然后每個(gè)回復(fù)數(shù)據(jù)分別作元素項(xiàng),構(gòu)成了用于傳輸?shù)亩嘣囟ㄩL(zhǎng)回復(fù)(Multi Bulk Reply,感覺(jué)比較難翻譯,這里的大概意思就是每個(gè)回復(fù)行都是整個(gè)回復(fù)報(bào)中的一個(gè)項(xiàng))。
這里可以類(lèi)比為Java中的ArrayList(泛型擦除),有點(diǎn)類(lèi)似于下面的偽代碼:
List encode = new ArrayList();
// 添加元素個(gè)數(shù)
encode.add(elementCount);
encode.add(CRLF);
// 添加第1個(gè)整型類(lèi)型的元素 - 1
encode.add(':');
encode.add(1);
encode.add(CRLF);
// 添加第2個(gè)整型類(lèi)型的元素 - 2
encode.add(':');
encode.add(2);
encode.add(CRLF);
// 添加第3個(gè)整型類(lèi)型的元素 - 3
encode.add(':');
encode.add(3);
encode.add(CRLF);
// 添加第4個(gè)整型類(lèi)型的元素 - 4
encode.add(':');
encode.add(4);
encode.add(CRLF);
// 添加定長(zhǎng)字符串類(lèi)型的元素
encode.add('$');
// 前綴長(zhǎng)度
encode.add(6);
// 字符串內(nèi)容
encode.add("foobar");
encode.add(CRLF);
復(fù)制代碼
RESP數(shù)組中也存在Null值的概念,下面稱(chēng)為RESP Null Array。處于歷史原因,RESP數(shù)組中采用了另一種特殊的編碼格式定義Null值,區(qū)別于定長(zhǎng)字符串中的Null值字符串。例如,BLPOP命令執(zhí)行超時(shí)的時(shí)候,就會(huì)返回一個(gè)RESP Null Array類(lèi)型的響應(yīng)。RESP Null Array的編碼如下:
*-1rn 復(fù)制代碼
當(dāng)Redis服務(wù)端的回復(fù)是RESP Null Array類(lèi)型的時(shí)候,客戶(hù)端應(yīng)該返回一個(gè)Null對(duì)象,而不是一個(gè)空數(shù)組或者空列表。這一點(diǎn)比較重要,它是區(qū)分回復(fù)是空數(shù)組(也就是命令正確執(zhí)行完畢,返回結(jié)果正常)或者其他原因(如BLPOP命令的超時(shí)等)的關(guān)鍵。
RESP數(shù)組的元素也可以是RESP數(shù)組,下面是一個(gè)包含2個(gè)RESP數(shù)組類(lèi)型的元素的RESP數(shù)組,編碼如下(為了看得更清楚,分多行進(jìn)行編碼,實(shí)際上不能這樣做):
# 元素個(gè)數(shù) *2rn # 第1個(gè)RESP數(shù)組元素 *3rn :1rn :2rn :3rn # 第2個(gè)RESP數(shù)組元素 *2rn +Foorn -Barrn 復(fù)制代碼
上面的RESP數(shù)組的包含2個(gè)RESP數(shù)組類(lèi)型的元素,第1個(gè)RESP數(shù)組元素包含3個(gè)整型類(lèi)型的元素,而第2個(gè)RESP數(shù)組元素包含1個(gè)簡(jiǎn)單字符串類(lèi)型的元素和1個(gè)錯(cuò)誤消息類(lèi)型的元素。
RESP數(shù)組中的Null元素
RESP數(shù)組中的單個(gè)元素也有Null值的概念,下面稱(chēng)為Null元素。Redis服務(wù)端回復(fù)如果是RESP數(shù)組類(lèi)型,并且RESP數(shù)組中存在Null元素,那么意味著元素丟失,絕對(duì)不能用空字符串替代。缺少指定鍵的前提下,當(dāng)與GET模式選項(xiàng)一起使用時(shí),SORT命令可能會(huì)發(fā)生這種情況。
下面是一個(gè)包含Null元素的RESP數(shù)組的例子(為了看得更清楚,分多行進(jìn)行編碼,實(shí)際上不能這樣做):
*3rn $3rn foorn $-1rn $3rn barrn 復(fù)制代碼
RESP數(shù)組中的第2個(gè)元素是Null元素,客戶(hù)端API最終返回的內(nèi)容應(yīng)該是:
# Ruby ["foo",nil,"bar"] # Java ["foo",null,"bar"] 復(fù)制代碼
RESP其他相關(guān)內(nèi)容
主要包括:
- 將命令發(fā)送到Redis服務(wù)端的示例。
- 批量命令與管道。
- 內(nèi)聯(lián)命令(Inline Commands)。
其實(shí)文檔中還有一節(jié)使用C語(yǔ)言編寫(xiě)高性能RESP解析器,這里不做翻譯,因?yàn)檎莆誖ESP的相關(guān)內(nèi)容后,可以基于任何語(yǔ)言編寫(xiě)解析器。
將命令發(fā)送到Redis服務(wù)端
如果已經(jīng)相對(duì)熟悉RESP中的序列化格式,那么編寫(xiě)Redis客戶(hù)端類(lèi)庫(kù)就會(huì)變得很容易。我們可以進(jìn)一步指定客戶(hù)端和服務(wù)器之間的交互方式:
- Redis客戶(hù)端向Redis服務(wù)端發(fā)送僅僅包含定長(zhǎng)字符串類(lèi)型元素的RESP數(shù)組。
- Redis服務(wù)端可以采用任意一種RESP數(shù)據(jù)類(lèi)型向Redis客戶(hù)端進(jìn)行回復(fù),具體的數(shù)據(jù)類(lèi)型一般取決于命令類(lèi)型。
下面是典型的交互例子:Redis客戶(hù)端發(fā)送命令LLEN mylist以獲得KEY為mylist的長(zhǎng)度,Redis服務(wù)端將以整數(shù)類(lèi)型進(jìn)行回復(fù),如以下示例所示(C是客戶(hù)端,S服務(wù)器),偽代碼如下:
C: *2rn C: $4rn C: LLENrn C: $6rn C: mylistrn S: :48293rn 復(fù)制代碼
為了簡(jiǎn)單起見(jiàn),我們使用換行符來(lái)分隔協(xié)議的不同部分(這里指上面的代碼分行展示),但是實(shí)際交互的時(shí)候Redis客戶(hù)端在發(fā)送*2rn$4rnLLENrn$6rnmylistrn的時(shí)候是整體發(fā)送的。
批量命令與管道
Redis客戶(hù)端可以使用相同的連接發(fā)送批量命令。Redis支持管道特性,因此Redis客戶(hù)端可以通過(guò)一次寫(xiě)操作發(fā)送多個(gè)命令,而無(wú)需在發(fā)送下一個(gè)命令之前讀取Redis服務(wù)端對(duì)上一個(gè)命令的回復(fù)。批量發(fā)送命令之后,所有的回復(fù)可以在最后得到(合并為一個(gè)回復(fù))。更多相關(guān)信息可以查看Using pipelining to speedup Redis queries。
內(nèi)聯(lián)命令
有些場(chǎng)景下,我們可能只有telnet命令可以使用,在這種條件下,我們需要發(fā)送命令到Redis服務(wù)端。盡管Redis協(xié)議易于實(shí)現(xiàn),但在交互式會(huì)話(huà)中并不理想,并且redis-cli有些情況下不一定可用。處于這類(lèi)原因,Redis設(shè)計(jì)了一種專(zhuān)為人類(lèi)設(shè)計(jì)的命令格式,稱(chēng)為內(nèi)聯(lián)命令(Inline Command格式。
以下是服務(wù)器/客戶(hù)端使用內(nèi)聯(lián)命令進(jìn)行聊天的示例(S代表服務(wù)端,C代表客戶(hù)端):
C: PING S: +PONG 復(fù)制代碼
以下是使用內(nèi)聯(lián)命令返回整數(shù)的另一個(gè)示例:
C: EXISTS somekey S: :0 復(fù)制代碼
基本上只需在telnet會(huì)話(huà)中編寫(xiě)以空格分隔的參數(shù)。由于除了統(tǒng)一的請(qǐng)求協(xié)議之外沒(méi)有命令會(huì)以*開(kāi)頭,Redis能夠檢測(cè)到這種情況并解析輸入的命令。
基于RESP編寫(xiě)高性能解析器
因?yàn)镴DK原生提供的字節(jié)緩沖區(qū)java.nio.ByteBuffer存在不能自動(dòng)擴(kuò)容、需要切換讀寫(xiě)模式等等問(wèn)題,這里直接引入Netty并且使用Netty提供的ByteBuf進(jìn)行RESP數(shù)據(jù)類(lèi)型解析。編寫(xiě)本文的時(shí)候(2019-10-09)Netty的最新版本為4.1.42.Final。引入依賴(lài):
<dependency> <groupId>io.netty</groupId> <artifactId>netty-buffer</artifactId> <version>4.1.42.Final</version> </dependency> 復(fù)制代碼
定義解碼器接口:
public interface RespDecoder<V>{
V decode(ByteBuf buffer);
}
復(fù)制代碼
定義常量:
public class RespConstants {
public static final Charset ASCII = StandardCharsets.US_ASCII;
public static final Charset UTF_8 = StandardCharsets.UTF_8;
public static final byte DOLLAR_BYTE = '$';
public static final byte ASTERISK_BYTE = '*';
public static final byte PLUS_BYTE = '+';
public static final byte MINUS_BYTE = '-';
public static final byte COLON_BYTE = ':';
public static final String EMPTY_STRING = "";
public static final Long ZERO = 0L;
public static final Long NEGATIVE_ONE = -1L;
public static final byte CR = (byte) 'r';
public static final byte LF = (byte) 'n';
public static final byte[] CRLF = "rn".getBytes(ASCII);
public enum ReplyType {
SIMPLE_STRING,
ERROR,
INTEGER,
BULK_STRING,
RESP_ARRAY
}
}
復(fù)制代碼
下面的章節(jié)中解析模塊的實(shí)現(xiàn)已經(jīng)忽略第一個(gè)字節(jié)的解析,因?yàn)榈谝粋€(gè)字節(jié)是決定具體的數(shù)據(jù)類(lèi)型。
解析簡(jiǎn)單字符串
簡(jiǎn)單字符串類(lèi)型就是單行字符串,它的解析結(jié)果對(duì)應(yīng)的就是Java中的String類(lèi)型。解碼器實(shí)現(xiàn)如下:
// 解析單行字符串
public class LineStringDecoder implements RespDecoder<String> {
@Override
public String decode(ByteBuf buffer) {
return CodecUtils.X.readLine(buffer);
}
}
public enum CodecUtils {
X;
public int findLineEndIndex(ByteBuf buffer) {
int index = buffer.forEachByte(ByteProcessor.FIND_LF);
return (index > 0 && buffer.getByte(index - 1) == 'r') ? index : -1;
}
public String readLine(ByteBuf buffer) {
int lineEndIndex = findLineEndIndex(buffer);
if (lineEndIndex > -1) {
int lineStartIndex = buffer.readerIndex();
// 計(jì)算字節(jié)長(zhǎng)度
int size = lineEndIndex - lineStartIndex - 1;
byte[] bytes = new byte[size];
buffer.readBytes(bytes);
// 重置讀游標(biāo)為rn之后的第一個(gè)字節(jié)
buffer.readerIndex(lineEndIndex + 1);
buffer.markReaderIndex();
return new String(bytes, RespConstants.UTF_8);
}
return null;
}
}
public class RespSimpleStringDecoder extends LineStringDecoder {
}
復(fù)制代碼
這里抽取出一個(gè)類(lèi)LineStringDecoder用于解析單行字符串,這樣在解析錯(cuò)誤消息的時(shí)候可以做一次繼承即可。測(cè)試一下:
public static void main(String[] args) throws Exception {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// +OKrn
buffer.writeBytes("+OK".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
String value = RespCodec.X.decode(buffer);
log.info("Decode result:{}", value);
}
// Decode result:OK
復(fù)制代碼
解析錯(cuò)誤消息
錯(cuò)誤消息的本質(zhì)也是單行字符串,所以其解碼的實(shí)現(xiàn)可以和簡(jiǎn)單字符串的解碼實(shí)現(xiàn)一致。錯(cuò)誤消息數(shù)據(jù)類(lèi)型的解碼器如下:
public class RespErrorDecoder extends LineStringDecoder {
}
復(fù)制代碼
測(cè)試一下:
public static void main(String[] args) throws Exception {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// -ERR unknown command 'foobar'rn
buffer.writeBytes("-ERR unknown command 'foobar'".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
String value = RespCodec.X.decode(buffer);
log.info("Decode result:{}", value);
}
// Decode result:ERR unknown command 'foobar'
復(fù)制代碼
解析整型數(shù)字
整型數(shù)字類(lèi)型,本質(zhì)就是需要從字節(jié)序列中還原出帶符號(hào)的64bit的長(zhǎng)整型,因?yàn)槭菐Х?hào)的,類(lèi)型標(biāo)識(shí)位:后的第一個(gè)字節(jié)需要判斷是否負(fù)數(shù)字符-,因?yàn)槭菑淖笙蛴医馕觯缓竺拷馕龀鲆粋€(gè)新的位,當(dāng)前的數(shù)字值要乘10。其解碼器的實(shí)現(xiàn)如下:
public class RespIntegerDecoder implements RespDecoder<Long> {
@Override
public Long decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
// 沒(méi)有行尾,異常
if (-1 == lineEndIndex) {
return null;
}
long result = 0L;
int lineStartIndex = buffer.readerIndex();
boolean negative = false;
byte firstByte = buffer.getByte(lineStartIndex);
// 負(fù)數(shù)
if (RespConstants.MINUS_BYTE == firstByte) {
negative = true;
} else {
int digit = firstByte - '0';
result = result * 10 + digit;
}
for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) {
byte value = buffer.getByte(i);
int digit = value - '0';
result = result * 10 + digit;
}
if (negative) {
result = -result;
}
// 重置讀游標(biāo)為rn之后的第一個(gè)字節(jié)
buffer.readerIndex(lineEndIndex + 1);
return result;
}
}
復(fù)制代碼
整型數(shù)字類(lèi)型的解析相對(duì)復(fù)雜,一定要注意負(fù)數(shù)判斷。測(cè)試一下:
public static void main(String[] args) throws Exception {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// :-1000rn
buffer.writeBytes(":-1000".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
Long value = RespCodec.X.decode(buffer);
log.info("Decode result:{}", value);
}
// Decode result:-1000
復(fù)制代碼
解析定長(zhǎng)字符串
定長(zhǎng)字符串類(lèi)型解析的關(guān)鍵是先讀取類(lèi)型標(biāo)識(shí)符$后的第一個(gè)字節(jié)序列分塊解析成64bit帶符號(hào)的整數(shù),用來(lái)確定后面需要解析的字符串內(nèi)容的字節(jié)長(zhǎng)度,然后再按照該長(zhǎng)度讀取后面的字節(jié)。其解碼器實(shí)現(xiàn)如下:
public class RespBulkStringDecoder implements RespDecoder<String> {
@Override
public String decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
if (-1 == lineEndIndex) {
return null;
}
// 使用RespIntegerDecoder讀取長(zhǎng)度
Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
if (null == length) {
return null;
}
// Bulk Null String
if (RespConstants.NEGATIVE_ONE.equals(length)) {
return null;
}
// Bulk Empty String
if (RespConstants.ZERO.equals(length)) {
return RespConstants.EMPTY_STRING;
}
// 真實(shí)字節(jié)內(nèi)容的長(zhǎng)度
int readLength = (int) length.longValue();
if (buffer.readableBytes() > readLength) {
byte[] bytes = new byte[readLength];
buffer.readBytes(bytes);
// 重置讀游標(biāo)為rn之后的第一個(gè)字節(jié)
buffer.readerIndex(buffer.readerIndex() + 2);
return new String(bytes, RespConstants.UTF_8);
}
return null;
}
}
復(fù)制代碼
測(cè)試一下:
public static void main(String[] args) throws Exception{
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// $6rnthrowablern
buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes("$9".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
buffer.writeBytes("throwable".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
String value = RespCodec.X.decode(buffer);
log.info("Decode result:{}", value);
}
// Decode result:throwable
復(fù)制代碼
解析RESP數(shù)組
RESP數(shù)組類(lèi)型解析的關(guān)鍵:
- 先讀取類(lèi)型標(biāo)識(shí)符*后的第一個(gè)字節(jié)序列分塊解析成64bit帶符號(hào)的整數(shù),確定數(shù)組中的元素個(gè)數(shù)。
- 遞歸解析每個(gè)元素。
參考過(guò)不少Redis協(xié)議解析框架,不少是用棧或者狀態(tài)機(jī)實(shí)現(xiàn),這里先簡(jiǎn)單點(diǎn)用遞歸實(shí)現(xiàn),解碼器代碼如下:
public class RespArrayDecoder implements RespDecoder {
@Override
public Object decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
if (-1 == lineEndIndex) {
return null;
}
// 解析元素個(gè)數(shù)
Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
if (null == length) {
return null;
}
// Null Array
if (RespConstants.NEGATIVE_ONE.equals(length)) {
return null;
}
// Array Empty List
if (RespConstants.ZERO.equals(length)) {
return Lists.newArrayList();
}
List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue());
// 遞歸
for (int i = 0; i < length; i++) {
result.add(DefaultRespCodec.X.decode(buffer));
}
return result;
}
}
復(fù)制代碼
測(cè)試一下:
public static void main(String[] args) throws Exception {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
//*2rn$3rnfoorn$3rnbarrn
buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes("*2".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
buffer.writeBytes("$3".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
buffer.writeBytes("foo".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
buffer.writeBytes("$3".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
buffer.writeBytes("bar".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
List value = RespCodec.X.decode(buffer);
log.info("Decode result:{}", value);
}
// Decode result:[foo, bar]
復(fù)制代碼
小結(jié)
對(duì)RESP的內(nèi)容和其編碼解碼的過(guò)程有相對(duì)深刻的認(rèn)識(shí)后,就可以基于Netty編寫(xiě)Redis服務(wù)的編碼解碼模塊,作為Netty入門(mén)的十分有意義的例子。本文的最后一節(jié)只演示了RESP的解碼部分,編碼模塊和更多細(xì)節(jié)會(huì)在另一篇用Netty實(shí)現(xiàn)Redis客戶(hù)端的文章中展示。
參考資料:
- Redis Protocol specification
鏈接
希望你能讀到這里,然后發(fā)現(xiàn)我:
- Github Page:www.throwable.club/2019/10/09/…
- Coding Page:throwable.coding.me/2019/10/09/…
附錄
本文涉及的所有代碼:
public class RespConstants {
public static final Charset ASCII = StandardCharsets.US_ASCII;
public static final Charset UTF_8 = StandardCharsets.UTF_8;
public static final byte DOLLAR_BYTE = '$';
public static final byte ASTERISK_BYTE = '*';
public static final byte PLUS_BYTE = '+';
public static final byte MINUS_BYTE = '-';
public static final byte COLON_BYTE = ':';
public static final String EMPTY_STRING = "";
public static final Long ZERO = 0L;
public static final Long NEGATIVE_ONE = -1L;
public static final byte CR = (byte) 'r';
public static final byte LF = (byte) 'n';
public static final byte[] CRLF = "rn".getBytes(ASCII);
public enum ReplyType {
SIMPLE_STRING,
ERROR,
INTEGER,
BULK_STRING,
RESP_ARRAY
}
}
public enum CodecUtils {
X;
public int findLineEndIndex(ByteBuf buffer) {
int index = buffer.forEachByte(ByteProcessor.FIND_LF);
return (index > 0 && buffer.getByte(index - 1) == 'r') ? index : -1;
}
public String readLine(ByteBuf buffer) {
int lineEndIndex = findLineEndIndex(buffer);
if (lineEndIndex > -1) {
int lineStartIndex = buffer.readerIndex();
// 計(jì)算字節(jié)長(zhǎng)度
int size = lineEndIndex - lineStartIndex - 1;
byte[] bytes = new byte[size];
buffer.readBytes(bytes);
// 重置讀游標(biāo)為rn之后的第一個(gè)字節(jié)
buffer.readerIndex(lineEndIndex + 1);
buffer.markReaderIndex();
return new String(bytes, RespConstants.UTF_8);
}
return null;
}
}
public interface RespCodec {
RespCodec X = DefaultRespCodec.X;
<IN, OUT> OUT decode(ByteBuf buffer);
<IN, OUT> ByteBuf encode(IN in);
}
public enum DefaultRespCodec implements RespCodec {
X;
static final Map<ReplyType, RespDecoder> DECODERS = Maps.newConcurrentMap();
private static final RespDecoder DEFAULT_DECODER = new DefaultRespDecoder();
static {
DECODERS.put(ReplyType.SIMPLE_STRING, new RespSimpleStringDecoder());
DECODERS.put(ReplyType.ERROR, new RespErrorDecoder());
DECODERS.put(ReplyType.INTEGER, new RespIntegerDecoder());
DECODERS.put(ReplyType.BULK_STRING, new RespBulkStringDecoder());
DECODERS.put(ReplyType.RESP_ARRAY, new RespArrayDecoder());
}
@SuppressWarnings("unchecked")
@Override
public <IN, OUT> OUT decode(ByteBuf buffer) {
return (OUT) DECODERS.getOrDefault(determineReplyType(buffer), DEFAULT_DECODER).decode(buffer);
}
private ReplyType determineReplyType(ByteBuf buffer) {
byte firstByte = buffer.readByte();
ReplyType replyType;
switch (firstByte) {
case RespConstants.PLUS_BYTE:
replyType = ReplyType.SIMPLE_STRING;
break;
case RespConstants.MINUS_BYTE:
replyType = ReplyType.ERROR;
break;
case RespConstants.COLON_BYTE:
replyType = ReplyType.INTEGER;
break;
case RespConstants.DOLLAR_BYTE:
replyType = ReplyType.BULK_STRING;
break;
case RespConstants.ASTERISK_BYTE:
replyType = ReplyType.RESP_ARRAY;
break;
default: {
throw new IllegalArgumentException("first byte:" + firstByte);
}
}
return replyType;
}
@Override
public <IN, OUT> ByteBuf encode(IN in) {
// TODO
throw new UnsupportedOperationException("encode");
}
}
public interface RespDecoder<V> {
V decode(ByteBuf buffer);
}
public class DefaultRespDecoder implements RespDecoder {
@Override
public Object decode(ByteBuf buffer) {
throw new IllegalStateException("decoder");
}
}
public class LineStringDecoder implements RespDecoder<String> {
@Override
public String decode(ByteBuf buffer) {
return CodecUtils.X.readLine(buffer);
}
}
public class RespSimpleStringDecoder extends LineStringDecoder {
}
public class RespErrorDecoder extends LineStringDecoder {
}
public class RespIntegerDecoder implements RespDecoder<Long> {
@Override
public Long decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
// 沒(méi)有行尾,異常
if (-1 == lineEndIndex) {
return null;
}
long result = 0L;
int lineStartIndex = buffer.readerIndex();
boolean negative = false;
byte firstByte = buffer.getByte(lineStartIndex);
// 負(fù)數(shù)
if (RespConstants.MINUS_BYTE == firstByte) {
negative = true;
} else {
int digit = firstByte - '0';
result = result * 10 + digit;
}
for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) {
byte value = buffer.getByte(i);
int digit = value - '0';
result = result * 10 + digit;
}
if (negative) {
result = -result;
}
// 重置讀游標(biāo)為rn之后的第一個(gè)字節(jié)
buffer.readerIndex(lineEndIndex + 1);
return result;
}
}
public class RespBulkStringDecoder implements RespDecoder<String> {
@Override
public String decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
if (-1 == lineEndIndex) {
return null;
}
Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
if (null == length) {
return null;
}
// Bulk Null String
if (RespConstants.NEGATIVE_ONE.equals(length)) {
return null;
}
// Bulk Empty String
if (RespConstants.ZERO.equals(length)) {
return RespConstants.EMPTY_STRING;
}
// 真實(shí)字節(jié)內(nèi)容的長(zhǎng)度
int readLength = (int) length.longValue();
if (buffer.readableBytes() > readLength) {
byte[] bytes = new byte[readLength];
buffer.readBytes(bytes);
// 重置讀游標(biāo)為rn之后的第一個(gè)字節(jié)
buffer.readerIndex(buffer.readerIndex() + 2);
return new String(bytes, RespConstants.UTF_8);
}
return null;
}
}
public class RespArrayDecoder implements RespDecoder {
@Override
public Object decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
if (-1 == lineEndIndex) {
return null;
}
// 解析元素個(gè)數(shù)
Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
if (null == length) {
return null;
}
// Null Array
if (RespConstants.NEGATIVE_ONE.equals(length)) {
return null;
}
// Array Empty List
if (RespConstants.ZERO.equals(length)) {
return Lists.newArrayList();
}
List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue());
// 遞歸
for (int i = 0; i < length; i++) {
result.add(DefaultRespCodec.X.decode(buffer));
}
return result;
}
}
作者:Throwable
鏈接:https://juejin.im/post/5d9dec2b6fb9a04e0a37edd5






