Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
1.2k views
in Technique[技术] by (71.8m points)

rabbitmq - Is it possible for workers to consume based on topic?

I understand that you can send messages directly to a queue using channel.sendToQueue, and this creates a tasks-and-workers situation, where only one consumer will handle each task.

I also understand that you can use channel.publish with a topic-based exchange, and messages will be routed to queues based on the routing key. To my understanding, though, this will always broadcast to all subscribers on any matching queues.

I would essentially like to use the topic-based exchange, but only have one consumer handle each task. I've been through the documentation, and I don't see a way to do this.

My use-case:

I have instances of a microservice set up in multiple locations. There might be two in California, three in London, one in Singapore, etc. When a task is created, the only thing that matters is that it's handled by one of the instances in a given location.

Of course, I can create hundreds of queues named "usa-90210", "uk-ec1a", etc. It just seems like using topics would be much cleaner. It would also be more flexible, given the ability to use wildcards.

If this isn't a feature of RabbitMQ, I'm open to other thoughts or ideas as well.

UPDATE

As per istepaniuk's suggestion, I've tried creating two workers, each binding their own queue to the exchange:

const connectionA = await amqplib.connect('amqp://guest:guest@127.0.0.1:5672');
const channelA = await connectionA.createChannel();
channelA.assertExchange('test_exchange', 'topic', { durable: false });
await channelA.assertQueue('test_queue_a', { exclusive: true });
channelA.bindQueue('test_queue_a', 'test_exchange', 'usa.*');
await channelA.consume('test_queue_a', () => { console.log('worker a'); }, { noAck: true });

const connectionB = await amqplib.connect('amqp://guest:guest@127.0.0.1:5672');
const channelB = await connectionB.createChannel();
channelB.assertExchange('test_exchange', 'topic', { durable: false });
await channelB.assertQueue('test_queue_b', { exclusive: true });
channelB.bindQueue('test_queue_b', 'test_exchange', 'usa.*');
await channelB.consume('test_queue_b', () => { console.log('worker b'); }, { noAck: true });

const pubConnection = await amqplib.connect('amqp://guest:guest@127.0.0.1:5672');
const pubChannel = await pubConnection.createChannel();
pubChannel.assertExchange('test_exchange', 'topic', { durable: false });
pubChannel.publish('test_exchange', 'usa.90210', Buffer.from(''));

Unfortunately, both consumers are still receiving the message.

worker a
worker b
question from:https://stackoverflow.com/questions/65860805/is-it-possible-for-workers-to-consume-based-on-topic

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Yes.

I would essentially like to use the topic-based exchange, but only have one consumer handle each task. I've been through the documentation, and I don't see a way to do this.

Use a topic exchange and have your consumers declare and bind their own queues. What you describe is a very common scenario. It is outlined in the tutorial 5, "Topics".

Additionally, you can have multiple consumers share a queue (just don't declare it exclusive). This is described in tutorial 2, "Workers".

The multiple instances of consumers can declare the same queue and bindings, the operation is idempotent. Using durable queues (as opposed to exclusive) also means that the messages will queue up if all your consumers disappear or the network fails.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...