一個挺著啤酒肚,身穿格子衫,發(fā)際線嚴重后移的中年男子,手拿著保溫杯,胳膊夾著macBook向你走來,看樣子是架構(gòu)師級別。
面試開始,直入正題。
面試官: 我看到你的簡歷上寫著項目中用到了消息隊列,還用的是kafka,你有遇到過消息隊列丟失消息的情況嗎?
我: 消息隊列還能丟失消息?那誰還用消息隊列!你是不是搞錯了?我沒遇到過丟失消息的情況,也沒考慮過這個問題。
面試官: 嗯...,小伙子,看來有些面試套路,你還是不太懂。今天面試就先到這里吧!給你的簡歷,我送你下樓。
我去!面試還有啥套路?
能不能少一點套路,多一點真誠!
難道都要去背一遍八股文才能參加面試?
好吧,我去瞅一眼一燈總結(jié)的面試八股文。
我: 消息隊列發(fā)送消息和消費消息的過程,共分為三段,生產(chǎn)過程、服務(wù)端持久化過程、消費過程,如下圖所示。
這三個過程都有可能弄丟消息。
面試官: 嗯,消息丟失的具體原因是什么?怎么防止丟失消息呢?
我: 我詳細說一下這種情況:
一、生產(chǎn)過程丟失消息
丟失原因:一般可能是網(wǎng)絡(luò)故障,導(dǎo)致消息沒有發(fā)送出去。
解決方案:重發(fā)就行了。
由于kafka為了提高性能,采用了異步發(fā)送消息。我們只有獲取到發(fā)送結(jié)果,才能確保消息發(fā)送成功。 有兩個方案可以獲取發(fā)送結(jié)果。
一種是kafka把發(fā)送結(jié)果封裝在Future對象中,我可以使用Future的get方法同步阻塞獲取結(jié)果。
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, message));
try {
RecordMetadata recordMetadata = future.get();
if (recordMetadata != null) {
System.out.println("發(fā)送成功");
}
} catch (Exception e) {
e.printStackTrace();
}
另一種是使用kafka的callback函數(shù)獲取返回結(jié)果。
producer.send(new ProducerRecord<>(topic, message), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("發(fā)送成功");
} else {
System.out.println("發(fā)送失敗");
}
}
});
如果發(fā)送失敗了,有兩種重試方案:
- 手動重試 在catch邏輯或else邏輯中,再調(diào)用一次send方法。如果還不成功怎么辦? 在數(shù)據(jù)庫中建一張異常消息表,把失敗消息存入表中,然后搞個異步任務(wù)重試,便于控制重試次數(shù)和間隔時間。
- 自動重試 kafka支持自動重試,設(shè)置參數(shù)如下,當(dāng)集群Leader選舉中或者Follower數(shù)量不足等原因返回失敗時,就可以自動重試。
- # 設(shè)置重試次數(shù)為3
retries = 3# 設(shè)置重試間隔為100msretry.backoff.ms = 100 - 一般我們不會用kafka自動重試,因為超過重試次數(shù),還是會返回失敗,還需要我們手動重試。
二、服務(wù)端持久化過程丟失消息
為了保證性能,kafka采用的是異步刷盤,當(dāng)我們發(fā)送消息成功后,Broker節(jié)點在刷盤之前宕機了,就會導(dǎo)致消息丟失。
當(dāng)然我們也可以設(shè)置刷盤頻率:
# 設(shè)置每1000條消息刷一次盤
flush.messages = 1000
# 設(shè)置每秒刷一次盤
flush.ms = 1000
先普及一下kafka集群的架構(gòu)模型:
kafka集群由多個broker組成,一個broker就是一個節(jié)點(機器)。 一個topic有多個partition(分區(qū)),每個partition分布在不同的broker上面,可以充分利用分布式機器性能,擴容時只需要加機器、加partition就行了。
一個partition又有多個replica(副本),有一個leader replica(主副本)和多個follower replica(從副本),這樣設(shè)計是為了保證數(shù)據(jù)的安全性。
發(fā)送消息和消費消息都在leader上面,follower負責(zé)定時從leader上面拉取消息,只有follower從leader上面把這條消息拉取回來,才算生產(chǎn)者發(fā)送消息成功。
kafka為了加快持久化消息的性能,把性能較好的follower組成一個ISR列表(in-sync replica),把性能較差的follower組成一個OSR列表(out-of-sync replica),ISR+OSR=AR(assigned repllicas)。 如果某個follower一段時間沒有向leader拉取消息,落后leader太多,就把它移出ISR,放到OSR之中。 如果某個follower追上了leader,又會把它重新放到ISR之中。 如果leader掛掉,就會從ISR之中選一個follower做leader。
為了提升持久化消息性能,我們可以進行一些設(shè)置:
# 如果follower超過一秒沒有向leader拉取消息,就把它移出ISR列表
rerplica.lag.time.max.ms = 1000
# 如果follower落后leader一千條消息,就把它移出ISR列表
rerplica.lag.max.messages = 1000
# 至少保證ISR中有3個follower
min.insync.replicas = 3
# 異步消息,不需要leader確認,立即給生產(chǎn)者返回發(fā)送成功,丟失消息概率較大
asks = 0
# leader把消息寫入本地日志中,不會等所有follower確認,就給生產(chǎn)者返回發(fā)送成功,小概率丟失消息
asks = 1
# leader需要所有ISR中follower確認,才給生產(chǎn)者返回發(fā)送成功,不會丟失消息
asks = -1 或者 asks = all
三、消費過程丟失消息
kafka中有個offset的概念,consumer從partition中拉取消息,consumer本地處理完成后需要commit一下offset,表示消費完成,下次就不會再拉取到這條消息。
所以我們需要關(guān)閉自動commit offset的配置,防止consumer拉到消息后,服務(wù)宕機,導(dǎo)致消息丟失。
enable.auto.commit = false
面試官: 還得是你,就你總結(jié)的全,我都想不那么全,明天來上班吧,薪資double。
本文知識點總結(jié):