I'm getting a bunch of deserialization failure before my Kafka Listener
is hit. I was looking into the things Gary Russel built, but having issues getting it to work. All my stuff is configured via properties file.
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
So if I add these, my understanding is it wraps an error in the headers of the consumer record? My ultimate goal is to have any deserialization exception hit some custom class I have so I can handle what I want to do with it. IE, forward to my dead letter handler which uploads failed data to s3.
I tried adding the errorhandler flag to the kafkalistener, but that also didn't do anything.
Updated Property Configuration
I've updated my configuration, it's still unclear to me if this is correct. It's not working, so I assume not.
None of the custom code is getting called
spring.kafka.consumer.properties.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.key.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.function=com.thing.cyclic.service.FailedFooProvider
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy
spring.kafka.consumer.properties.specific.avro.reader=true
spring.kafka.consumer.properties.auto.register.schemas=false
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.listener.ack-mode=manual_immediate
BadFoo
public class BadFoo {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
FailedFooProvider
public class FailedFooProvider implements Function<FailedDeserializationInfo, String> {
@Override
public String apply(FailedDeserializationInfo info) {
System.out.println("");
return "";
}
}
question from:
https://stackoverflow.com/questions/65602020/error-handling-in-spring-kafka-with-properties-file 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…