本文介紹了我們有沒有辦法暫停卡夫卡流一段時間,然后再恢復?的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!
問題描述
我們有一個要求,我們使用Kafka Streams讀取Kafka主題,然后通過一個會話池通過網絡發送數據。然而,有時網絡調用有點慢,我們需要頻繁地暫停流,以確保我們沒有使網絡超載。目前,我們將數據捕獲到流中,并將其加載到Executor服務,然后通過會話池通過網絡發送。
如果Executor服務中的數據太高,我們需要暫停流一段時間,然后在Executor服務上的積壓清理完畢后恢復它。為了實現此暫停機制,我們當前正在關閉流,并在清除積壓后重新啟動。
有什么方法可以暫停Kafka流嗎?
推薦答案
如果我理解正確的話,您沒有什么特別需要做的。你說的是”背壓”,而Kafka Streams可以開箱即用。
可以做的是將該數據放入某個最大大小的隊列中,并使用該隊列加載Executor服務。當隊列達到某個閾值時,有兩種方法:
如果您將數據放入隊列的調用在沒有超時的情況下被阻塞,則無需再做任何操作。只要等系統恢復在線,你的電話
返回,處理將繼續。
如果將數據放入隊列的調用因超時而阻塞,只需執行查找以檢查隊列的大小。重復此操作,直到系統重新聯機,您的呼叫成功。
唯一的警告是,只要您的Streams應用程序阻止,內部使用的Kafka消費者客戶端就不會向Kafka發送任何心跳信號,并且可能會超時。因此,您需要將超時配置參數設置為高于外部系統的預期最長停機時間。
另一種方法是使用Kafka-Streams中提供的處理器API,但這通常不是推薦的模式。
如果有幫助,請讓我知道!!
這篇關于我們有沒有辦法暫停卡夫卡流一段時間,然后再恢復?的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,






