Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
4.4k views
in Technique[技术] by (71.8m points)

java - Camel always routing messages to 0 partition of intended Kafka topic

My use case: Given, one JMS queue, one Kafka topic with two partitions, route messages from JMS queue to a particular partition of the Kafka topic based on custom header attribute's value.

Problem: Messages are getting routed from JMS to Kafka Topic's partition 0, not to partition 1 even though PARTITION_KEY and KEY are set in headers (code below).

Dependencies with version 2.16.3 used, steps and java code attempted is below:


<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-kafka</artifactId> <!--**2.16.3** used-->
</dependency>
<dependency>
  <groupId>org.apache.camel</groupId> <!--**2.16.3** used-->
  <artifactId>camel-jms</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel</groupId> <!--**2.16.3** used-->
  <artifactId>camel-core</artifactId>            
</dependency>

  1. Created a Kafka topic with two partitions:
./bin/kafka-topics.sh --create --zookeeper 127.0.0.1:218 1 --replication-factor 1 --partitions 2 --topic mytopic-with-partitions
  1. Listen to messages on Partition 0:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic-with-partitions --partition 0
  1. Listen to messages on Partition 1:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic-with-partitions --partition 1

public class MyRouteBuilder extends RouteBuilder
{
    private final MessagePreprocessor messagePreProcessor = new MessagePreprocessor();
    
    @Override
    public void configure() throws Exception
    {
     from("jms:queue:my-jms-queue").process(messagePreProcessor)
            .log("Routing message from my-jms-queue to " + toKafka)
            .to("kafka:localhost:9092?mytopic-with-partitions&zookeeperHost=localhost&zookeeperPort=2181&serializerClass=kafka.serializer.StringEncoder");
       
    }
}

=============================

public class MessagePreprocessor implements Processor
{
    @Override
    public void process(final Exchange exchange)
    {
        final String body = exchange.getIn().getBody(String.class);
        exchange.getOut().setBody(body);

        Integer partitionToUse = 0;

        final String flavorGot = (String) exchange.getIn().getHeader("FLAVOR");
        if ("VANILLA".equals(flavorGot)) {
           partitionToUse = 0;
        } else {
           partitionToUse = 1;
        }
        
        
        headers.put(KafkaConstants.PARTITION_KEY, partitionToUse);
        headers.put(KafkaConstants.KEY, flavorGot);

        exchange.getOut().setHeaders(headers);
    }

}

Output:

  1. Messages are making it to Partition 0, displayed by command in Step 2
  2. Messages are NOT making it to Partition 1, displayed by command in Step 3.

Appreciate any suggestions/pointers. Thanks


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)
等待大神解答

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...