亚洲视频二区_亚洲欧洲日本天天堂在线观看_日韩一区二区在线观看_中文字幕不卡一区

公告:魔扣目錄網(wǎng)為廣大站長提供免費收錄網(wǎng)站服務(wù),提交前請做好本站友鏈:【 網(wǎng)站目錄:http://www.430618.com 】, 免友鏈快審服務(wù)(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

本文介紹了如何避免使用Kafka流丟失消息的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧!

問題描述

我們有一個Streams應(yīng)用程序,它使用源主題中的消息,執(zhí)行一些處理并將結(jié)果轉(zhuǎn)發(fā)到目標(biāo)主題。

消息的結(jié)構(gòu)由某些Avro架構(gòu)控制。

當(dāng)開始使用消息時,如果架構(gòu)尚未緩存,應(yīng)用程序?qū)L試從架構(gòu)注冊表中檢索它。如果由于任何原因架構(gòu)注冊表不可用(例如網(wǎng)絡(luò)故障),則當(dāng)前正在處理的消息將丟失,因為默認(rèn)處理程序是名為LogAndContinueExceptionHandler的處理程序。

o.a.k.s.e.LogAndContinueExceptionHandler : Exception caught during Deserialization, taskId: 1_5, topic: my.topic.v1, partition: 5, offset: 142768
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 62
Caused by: java.net.SocketTimeoutException: connect timed out
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:na]
...
o.a.k.s.p.internals.RecordDeserializer : stream-thread [my-app-StreamThread-3] task [1_5] Skipping record due to deserialization error. topic=[my.topic.v1] partition=[5] offset=[142768]
...

因此,我的問題是,處理上述情況的正確方法是什么,并確保無論如何都不會丟失消息。是否有現(xiàn)成的LogAndRollbackExceptionHandler錯誤處理程序或?qū)崿F(xiàn)您自己的方法?

提前感謝您的投入。

推薦答案

我在Kafka上的工作不是很多,但當(dāng)我工作時,我記得遇到了您在我們系統(tǒng)中描述的問題。

讓我告訴你我們是如何處理我們的場景的,也許這也會對你有所幫助:

場景1:如果您的消息在發(fā)布方(Publisher–&>Kafka)丟失,您可以根據(jù)需要配置Kafka確認(rèn)設(shè)置,如果您使用的是SpringCloud Stream和Kafka,則該屬性為spring.cloud.stream.kafka.binder.required-acks

可能的值:

    最多一次(Ack=0)

      出版商不在乎卡夫卡是否承認(rèn)。
      發(fā)送并忘記
      可能會丟失數(shù)據(jù)

    至少一次(Ack=1)

      如果Kafka不確認(rèn),出版商將重新發(fā)送消息。

      可能存在重復(fù)。

      在將郵件復(fù)制到副本之前發(fā)送確認(rèn)。

    正好一次(Ack=All)

      如果Kafka不確認(rèn),出版商將重新發(fā)送消息。

      但是,如果一條消息多次發(fā)送到Kafka,則不會有重復(fù)。

      內(nèi)部序列號,用于判斷消息是否已經(jīng)寫入主題。

      需要設(shè)置Min.insync.Replicas屬性,以確保在Kafka向生產(chǎn)者確認(rèn)之前,需要同步的最小復(fù)制數(shù)是多少。

場景2:如果您的數(shù)據(jù)在消費者端丟失(Kafka–&>Consumer),您可以根據(jù)您的使用情況更改Kafka的自動提交功能。如果您使用的是Spring Cloud Streamspring.cloud.stream.kafka.bindings.input.consumer.AutoCommitOffset.

,則為該屬性

默認(rèn)情況下,在Kafka中AutoCommittee Offset為True,并且發(fā)送給消費者的每一條消息在Kafka的末尾都是&Quot;Submitted&Quot;,這意味著它不會被再次發(fā)送。但是,如果您將AutoCommittee Offset更改為False,您將有權(quán)在代碼中輪詢來自Kafka的消息,并且一旦您完成工作,將Commit顯式設(shè)置為True,以讓Kafka知道您現(xiàn)在已經(jīng)完成了消息。

如果消息未提交,Kafka將繼續(xù)重新發(fā)送,直到消息提交為止。

希望這對您有所幫助,或至少為您指明正確的方向。

這篇關(guān)于如何避免使用Kafka流丟失消息的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,

分享到:
標(biāo)簽:Kafka 丟失 消息
用戶無頭像

網(wǎng)友整理

注冊時間:

網(wǎng)站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨大挑戰(zhàn)2018-06-03

數(shù)獨一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學(xué)四六

運動步數(shù)有氧達人2018-06-03

記錄運動步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績評定2018-06-03

通用課目體育訓(xùn)練成績評定