Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
81 views
in Technique[技术] by (71.8m points)

java - Error handling in Spring Kafka with properties file?

I'm getting a bunch of deserialization failure before my Kafka Listener is hit. I was looking into the things Gary Russel built, but having issues getting it to work. All my stuff is configured via properties file.

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer

So if I add these, my understanding is it wraps an error in the headers of the consumer record? My ultimate goal is to have any deserialization exception hit some custom class I have so I can handle what I want to do with it. IE, forward to my dead letter handler which uploads failed data to s3.

I tried adding the errorhandler flag to the kafkalistener, but that also didn't do anything.

Updated Property Configuration

I've updated my configuration, it's still unclear to me if this is correct. It's not working, so I assume not.

None of the custom code is getting called

spring.kafka.consumer.properties.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.key.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.function=com.thing.cyclic.service.FailedFooProvider

spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy
spring.kafka.consumer.properties.specific.avro.reader=true
spring.kafka.consumer.properties.auto.register.schemas=false
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.listener.ack-mode=manual_immediate

BadFoo

public class BadFoo {

    private final FailedDeserializationInfo failedDeserializationInfo;

    public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
        this.failedDeserializationInfo = failedDeserializationInfo;
    }

    public FailedDeserializationInfo getFailedDeserializationInfo() {
        return this.failedDeserializationInfo;
    }
}

FailedFooProvider

public class FailedFooProvider implements Function<FailedDeserializationInfo, String> {
    @Override
    public String apply(FailedDeserializationInfo info) {
        System.out.println("");
        return "";
    }
}
question from:https://stackoverflow.com/questions/65602020/error-handling-in-spring-kafka-with-properties-file

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

See the documentation here and here.

Also take a look at the DeadLetterPublishingRecoverer code, which can used to publish the failed record to an other topic. You can model your code after that to obtain the header(s) containing the failed byte[].

https://github.com/spring-projects/spring-kafka/blob/fa5c35e9b15c4cecfc6ea2bbbf9e7745bc5d9f75/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java#L169-L178

The recoverer is used in conjunction with a SeekToCurrentErrorHandler.

Configure the error handler as a @Bean and Spring Boot will automatically wire it into the container.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...