This article describes an approach to using Kafka Streams with Serdes that rely on schema references stored in record headers, which may be a useful reference for anyone hitting the same problem.
Problem description
I am trying to use Kafka Streams to perform a KTable-KTable foreign-key join on CDC data. The data I will be reading is in Avro format, but it is serialized in a way that is not compatible with other industry serializers/deserializers (e.g. the Confluent Schema Registry), because the schema identifier is stored in the record headers rather than in the message body.
When I set up the Serdes for my KTables, my Kafka Streams application runs at first but eventually fails, because internally it calls the serializer method byte[] serialize(String topic, T data) rather than the header-aware variant (i.e. byte[] serialize(String topic, Headers headers, T data)) inside the wrapping serializer ValueAndTimestampSerializer. The Serdes I am using cannot handle the header-less call and throw an exception.
The first question is: does anyone know how to convince Kafka Streams to internally call the method with the correct signature?
I am exploring ways around this, including writing new Serdes that re-serialize the data with the schema identifier embedded in the message itself. This could involve copying the data to a new topic or using interceptors.
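As a minimal sketch of that re-serialization idea (not the questioner's actual code; the framing format and all names here are assumptions), the schema identifier could be moved out of the headers and into a fixed-size prefix of the payload, so that even the header-less serialize path carries it:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class SchemaIdFraming {
    // Prepend a 4-byte schema id so that header-less serialize calls
    // still carry the schema reference inside the message itself.
    static byte[] embed(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(4 + avroPayload.length)
                .putInt(schemaId)
                .put(avroPayload)
                .array();
    }

    // On the deserialize side, read the schema id back out of the prefix...
    static int schemaIdOf(byte[] framed) {
        return ByteBuffer.wrap(framed).getInt();
    }

    // ...and recover the raw Avro payload that follows it.
    static byte[] payloadOf(byte[] framed) {
        return Arrays.copyOfRange(framed, 4, framed.length);
    }

    public static void main(String[] args) {
        byte[] framed = embed(42, new byte[]{1, 2, 3});
        System.out.println(schemaIdOf(framed));                 // 42
        System.out.println(Arrays.toString(payloadOf(framed))); // [1, 2, 3]
    }
}
```

A Serde built on this framing would work with both serialize signatures, since neither depends on headers; the cost is that the wire format of the topic changes, which is why re-copying the data to a new topic would be needed.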
However, I know that a ValueTransformer has access to the headers through the ProcessorContext, and I am wondering whether there is a faster way using transformValues(). The idea is to first read the value as a byte[] and then deserialize it into the Avro class inside the transformer (see the example below). However, when I do this, I get an exception.
StreamsBuilder builder = new StreamsBuilder();

final KTable<Long, MySpecificClass> myTable = builder.table(
        "my-topic",
        Consumed.with(Serdes.Long(), Serdes.ByteArray())
    )
    .transformValues(MyDeserializerTransformer::new);
...
KTable<Long, JoinResult> joinResultTable = myTable.join(rightTable, MySpecificClass::getJoinKey, myValueJoiner);
joinResultTable.toStream()...
public class MyDeserializerTransformer implements
        ValueTransformerWithKey<Long, byte[], MySpecificClass> {

    MyAvroDeserializer deserializer;
    ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        deserializer = new MyAvroDeserializer();
        this.context = context;
    }

    @Override
    public MySpecificClass transform(Long key, byte[] value) {
        // The headers are available here via the ProcessorContext,
        // so the header-aware deserialize variant can be invoked.
        return deserializer.deserialize(context.topic(), context.headers(), value);
    }

    @Override
    public void close() {
    }
}
When I run this, I get a ClassCastException. How can I fix this, or find a workaround? Do I need to use an auxiliary state store?
"class": "org.apache.kafka.streams.errors.StreamsException",
"msg": "ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: java.lang.Long, and value: org.apache.kafka.streams.kstream.internals.Change.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.",
"stack": [
"org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185)",
"org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)",
"org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)",
"org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)",
"org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)",
"org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:117)",
"org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:87)",
"org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)",
"org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)",
"org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)",
"org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)",
...
"cause": {
"class": "java.lang.ClassCastException",
"msg": "class com.package.MySpecificClass cannot be cast to class [B (com.package.MySpecificClass is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')",
"stack": [
"org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)",
"org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:102)",
"org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:72)",
"org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)",
"org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)",
"org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)",
"org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)",
Recommended answer
I was able to work around this by first reading the input topic as a KStream and then, in a second step, converting it to a KTable with a different Serde. With this approach, the state stores no longer seemed to run into the problem of the header-aware serializer/deserializer method signatures not being invoked.
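A minimal sketch of that workaround, reusing MyDeserializerTransformer and MySpecificClass from the question (MyClassSerde is a hypothetical Serde for the already-deserialized value type, not a real API):

```java
// Sketch only: read the topic as a KStream of raw bytes, deserialize in the
// transformer (where headers ARE available on the ProcessorContext), and only
// then materialize as a KTable whose Serde matches the deserialized type.
StreamsBuilder builder = new StreamsBuilder();

KTable<Long, MySpecificClass> myTable = builder
    .stream("my-topic", Consumed.with(Serdes.Long(), Serdes.ByteArray()))
    .transformValues(MyDeserializerTransformer::new)
    .toTable(Materialized.with(Serdes.Long(), new MyClassSerde()));
```

The key difference from the failing topology in the question is that the KTable (and its state store, including the internal subscription topics of the foreign-key join) is created only after the value has been deserialized, so the byte-array Serde never sees a MySpecificClass instance.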