亚洲视频二区_亚洲欧洲日本天天堂在线观看_日韩一区二区在线观看_中文字幕不卡一区

公告:魔扣目錄網(wǎng)為廣大站長提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請做好本站友鏈:【 網(wǎng)站目錄:http://www.430618.com 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

本文介紹了將Kafka Streams與Serde結(jié)合使用,這些Serde依賴于標(biāo)頭中的架構(gòu)引用的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧!

問題描述

我正在嘗試使用Kafka Streams對CDC數(shù)據(jù)執(zhí)行KTable-KTable外鍵聯(lián)接。我將讀取的數(shù)據(jù)是Avro格式的,但是它被序列化的方式與其他行業(yè)序列化程序/反序列化程序(例如。合流架構(gòu)注冊表),因?yàn)榧軜?gòu)標(biāo)識符存儲在標(biāo)頭中。

當(dāng)我設(shè)置KTables的Serdes時,我的Kafka Streams應(yīng)用程序最初運(yùn)行,但最終失敗,因?yàn)樗趦?nèi)部調(diào)用了帶有byte[] serialize(String topic, T data);的序列化程序方法,而不是帶有標(biāo)頭的方法(即。byte[] serialize(String topic, Headers headers, T data)在包裝序列化程序ValueAndTimestampSerializer中。我正在使用的Serdes無法處理此問題并引發(fā)異常。

第一個問題是,有沒有人知道如何懇求Kafka Streams在內(nèi)部調(diào)用帶有正確方法簽名的方法?

我正在探索解決這個問題的方法,包括編寫新的Serde,用消息本身中的模式標(biāo)識符重新序列化。這可能涉及將數(shù)據(jù)重新復(fù)制到新主題或使用攔截器。

但是,我知道ValueTransformer可以訪問ProcessorContext中的標(biāo)頭,我想知道是否有更快的方法使用transformValues()。其想法是首先將該值作為byte[]讀取,然后將該值反序列化為轉(zhuǎn)換器中的Avro類(請參見下面的示例)。但是,當(dāng)我這樣做時,我會得到一個例外。

StreamsBuilder builder = new StreamsBuilder();
 final KTable<Long, MySpecificClass> myTable = builder.table(
      "my-topic",
       Consumed.with(Serdes.Long(), Serdes.ByteArray())
    )
    .transformValues(MyDeserializerTransformer::new);

 ...

 KTable<Long, JoinResult> joinResultTable = myTable.join(rightTable, MySpecificClass::getJoinKey, myValueJoiner);

 joinResultTable.toStream()...
public class MyDeserializerTransformer implements
    ValueTransformerWithKey<Long, byte[], MySpecificClass> {
  MyAvroDeserializer deserializer;
  ProcessorContext context;

  @Override
  public void init(ProcessorContext context) {
    deserializer = new MyAvroDeserializer();
    this.context = context;
  }

  @Override
  public MySpecificClass transform(Long key, byte[] value) {
    return deserializer.deserialize(context.topic(), context.headers(), value);
  }

  @Override
  public void close() {

  }
}

當(dāng)我運(yùn)行它時,我收到一個ClassCastException。我如何解決此問題或找到解決方法?我需要使用輔助狀態(tài)存儲嗎?

"class": "org.apache.kafka.streams.errors.StreamsException",
    "msg": "ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: java.lang.Long, and value: org.apache.kafka.streams.kstream.internals.Change.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.",
    "stack": [
      "org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185)",
      "org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)",
      "org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)",
      "org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)",
      "org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)",
      "org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:117)",
      "org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:87)",
      "org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)",
      "org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)",
      "org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)",
      "org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)",
...
"cause": {
 "class": "java.lang.ClassCastException",
      "msg": "class com.package.MySpecificClass cannot be cast to class [B (com.package.MySpecificClass is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')",
      "stack": [
        "org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)",
        "org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:102)",
        "org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:72)",
        "org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)",
        "org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)",
        "org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)",
        "org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)",

推薦答案

我能夠通過以下方法解決此問題:首先將輸入主題作為KStream讀取,然后將其轉(zhuǎn)換為具有不同Serde的KTable。第二步,狀態(tài)存儲似乎遇到了未調(diào)用帶有標(biāo)頭的序列化程序/反序列化程序方法簽名的問題。

這篇關(guān)于將Kafka Streams與Serde結(jié)合使用,這些Serde依賴于標(biāo)頭中的架構(gòu)引用的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,

分享到:
標(biāo)簽:Kafka Serde STREAMS 依賴于 引用 架構(gòu) 標(biāo)頭中
用戶無頭像

網(wǎng)友整理

注冊時間:

網(wǎng)站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學(xué)四六

運(yùn)動步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績評定2018-06-03

通用課目體育訓(xùn)練成績評定