本文介紹了如何使用Quarkus在Kafka中設(shè)置同一主題中的多個(gè)消費(fèi)者的處理方法,對(duì)大家解決問題具有一定的參考價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧!
問題描述
我正在使用Quarkus框架構(gòu)建一個(gè)Kafka消費(fèi)者,它將讀取帶有3個(gè)分區(qū)的主題。下面的代碼片段正在工作,但根據(jù)日志,我只是啟動(dòng)了具有3個(gè)分區(qū)的1個(gè)使用者。我現(xiàn)在的問題是,一旦我運(yùn)行我的應(yīng)用程序,我如何才能產(chǎn)生3個(gè)消費(fèi)者。
@Incoming("topic-1")
public CompletionStage<Void> onMessage(KafkaRecord<String, String> message) throws IOException {
LOG.info("Kafka order message with value = {} arrived from topic {} ", message.getPayload(),
message.getTopic());
//JsonObject event = new JsonObject(message.getPayload());
try {
if (true) {
LOG.info("Kafka message: " + message);
}
} catch (Exception e) {
e.printStackTrace();
}
return message.ack();
}
請(qǐng)參閱示例日志:
信息[org.apa.kaf.cli.con.int.Consumer協(xié)調(diào)器](vert.x-kafka-Consumer-線程-0)[消費(fèi)者客戶端ID=測(cè)試消費(fèi)者,組ID=kafka-檢測(cè)-消費(fèi)者]已完成第64代的組的分配:{testconsumer-bf6d314c-44e1-47b1-9439-fe4058951841=Assignment(partitions=[test_part-0,TEST_PART-1,TEST_PART-2])}
推薦答案
如果您在Containers平臺(tái)(Docker、K8s…)上運(yùn)行應(yīng)用程序然后,您可以橫向擴(kuò)展您的服務(wù);否則,請(qǐng)使用不同的端口再次運(yùn)行您的應(yīng)用程序。
Kafka客戶端啟動(dòng)時(shí)會(huì)被分配到某個(gè)分區(qū),因此同一個(gè)客戶端不能從多個(gè)Theme-Partition消費(fèi)。
這篇關(guān)于如何使用Quarkus在Kafka中設(shè)置同一主題中的多個(gè)消費(fèi)者的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,