
java - Serialisation error for Kafka avro consumer using Spring boot

I have created a Kafka Avro producer and a consumer using Spring Boot as two different projects. While consuming the data, I get the following exception.

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value 
for partition bookavro-0 at offset 3. If needed, please seek past the record to continue 
consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message 
for id 1
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class 
com.dailycodebuffer.kafka.apachekafkaproducerdemo.BookAvro specified in writer's schema whilst 
finding reader's schema for a SpecificRecord.

2020-12-30 18:44:09.032 ERROR 22344 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer 
: Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's 
directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145) ~[spring-kafka-2.6.4.jar:2.6.4]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.6.4.jar:2.6.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1425) [spring-kafka-2.6.4.jar:2.6.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1122) [spring-kafka-2.6.4.jar:2.6.4]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_202]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_202]
at java.lang.Thread.run(Thread.java:813) [na:1.8.0_202]

com.dailycodebuffer.kafka.apachekafkaproducerdemo.BookAvro is the fully qualified name of the class in the producer project.
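
Because the consumer config below sets SPECIFIC_AVRO_READER_CONFIG to true, the Confluent deserializer resolves the reader schema by loading a class with exactly the fully qualified name recorded in the writer's schema. Roughly speaking (an illustration only, not application code), the lookup amounts to:

    // If this class exists only in the producer project, the lookup fails and surfaces
    // as the "Could not find class ... specified in writer's schema" error above.
    Class<?> readerClass =
            Class.forName("com.dailycodebuffer.kafka.apachekafkaproducerdemo.BookAvro");

So the generated BookAvro class (same package, same name) also has to be on the consumer project's classpath, typically by sharing the .avsc and generating the class in both projects or by putting it in a shared module.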

Below is my consumer config:

    @Bean
    public ConsumerFactory<String, BookAvro> BookconsumerFactory() {
        System.out.println("hi");
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // configProps.put(ConsumerConfig.KEY, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // configProps.put("value.deserializer", "org.springframework.kafka.support.serializer.JsonDeserializer");
        // configProps.put(JsonDeserializer.ADD_TYPE_INFO_HEADERS, false);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
        configProps.put("auto.offset.reset", "earliest");
        configProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        configProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
        System.out.println(configProps.toString());

        return new DefaultKafkaConsumerFactory<String, BookAvro>(configProps);
    }

    @Bean
    public  ConcurrentKafkaListenerContainerFactory<String, BookAvro> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, BookAvro> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(BookconsumerFactory());
        System.out.println(factory.toString());
        //factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
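
The "This error handler cannot process 'SerializationException's directly" message suggests wrapping the deserializers in Spring Kafka's ErrorHandlingDeserializer (org.springframework.kafka.support.serializer). A minimal sketch of the factory above with that wrapper, assuming the rest of the setup stays the same; note that this only routes the failure to the container's error handler, it does not by itself fix the missing class:

    @Bean
    public ConsumerFactory<String, BookAvro> bookConsumerFactoryWithErrorHandling() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // The wrapper catches SerializationExceptions and hands them to the error handler
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        // Delegates that do the actual deserialization work
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class.getName());
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
        return new DefaultKafkaConsumerFactory<>(props);
    }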

Following is the Producer Config:

    @Bean
    public ProducerFactory<String, BookAvro> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        configProps.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        // configProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        // configProps.put(KafkaAvroSerializerConfig., "true");
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, BookAvro> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
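
For reference, the template above would then be used along these lines to publish to the topic the listener reads from (a sketch; the surrounding service class and the publish method are assumptions, not code from either project):

    @Autowired
    private KafkaTemplate<String, BookAvro> kafkaTemplate;

    public void publish(BookAvro book) {
        // "bookavro" is the topic the @KafkaListener below subscribes to
        kafkaTemplate.send("bookavro", book);
    }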

Below is the Kafka listener:

    @KafkaListener(groupId = "group_json", topics = "bookavro")
    public void consumeBook(BookAvro book) {
        System.out.println("message3" + book.toString());
    }

BookAvro is the Avro class generated from the .avsc file. Could anyone please help me resolve this exception?
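
For comparison, a listener that consumes the generic Avro representation does not depend on the producer's generated class at all; a rough sketch (this assumes specific.avro.reader is left false/unset and the factory's value type is adjusted to match):

    // import org.apache.avro.generic.GenericRecord;
    @KafkaListener(groupId = "group_json", topics = "bookavro")
    public void consumeBookGeneric(GenericRecord record) {
        // Fields are accessed by name instead of through generated getters
        System.out.println("received: " + record);
    }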



1 Reply

Waiting for an expert to answer.
