
kotlin - Kafka, Avro and Schema Registry

I have a Kafka consumer configured with schema polling from the topic. What I would like to do is create another Avro schema on top of the current one and hydrate data using it; basically I don't need 50% of the information and I need to write some logic to change a couple of fields. That's just an example:

    val consumer: KafkaConsumer<String, GenericRecord> = createConsumer()
    while (true) {
        consumer.poll(Duration.ofSeconds(10)).forEach {
            println(it.value())
        }
    }

The event returned from the stream is pretty complex, so I've modelled a smaller CustomObj as a .avsc file and compiled it to Java. When trying to run the code with CustomObj, I get "Error deserializing key/value for partition". All I want to do is consume an event and then deserialize it into a much smaller object with just the selected fields.

    return KafkaConsumer<String, CustomObj>(props)

This didn't work, and I'm not sure how I can deserialize the GenericRecord into CustomObj. Let me just add that I don't have any access to the stream or its config; I can only consume from it.



1 Answer


In Avro, your reader schema needs to be compatible with the writer schema. By giving the smaller object, you're providing a different reader schema.

It's not possible to directly deserialize to a subset of the input data, so you must parse the larger object and map it to the smaller one yourself (that mapping isn't something deserialization does for you).
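
In practice that usually means keeping the GenericRecord consumer and doing the mapping yourself after poll(). Here is a minimal sketch of that approach; the CustomObj constructor, the "id"/"name" field names and the uppercase transformation are assumptions for illustration, and in a real project CustomObj would be the class generated from your smaller .avsc:

    import org.apache.avro.generic.GenericRecord
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import java.time.Duration

    // Stand-in for the class compiled from the smaller .avsc;
    // field names and types here are made up for illustration.
    data class CustomObj(val id: String, val name: String)

    fun consumeAndMap(consumer: KafkaConsumer<String, GenericRecord>) {
        while (true) {
            // Keep deserializing with the GenericRecord setup that matches the writer schema...
            consumer.poll(Duration.ofSeconds(10)).forEach { record ->
                val full: GenericRecord = record.value()
                // ...then copy/transform only the fields you actually need.
                val custom = CustomObj(
                    id = full["id"].toString(),
                    name = full["name"].toString().uppercase() // example field-level logic
                )
                println(custom)
            }
        }
    }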

