Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
633 views
in Technique[技术] by (71.8m points)

c# - Handle rabbitmq messages concurrenrtly

I asked a question here about why starting a process using Thread.Run did not execute as many concurrent requests as I expected.

The reason behind this question was that I was trying to create a class which can pull messages off a rabbitmq queue and process them concurrently up to a maximum number of concurrent messages.

To do this I ended up with the following in the Received handler of the EventingBasicConsumer class.

async void Handle(EventArgs e) 
{
    await _semaphore.WaitAsync();

    var thread = new Thread(() =>
    {
        Process(e);
        _semaphore.Release(); 
        _channel.BasicAck(....);
    });
    thread.Start();
} 

However the comments on the previous post were not to start a thread unless doing CPU bound work.

The above handler does not know whether the work will be CPU bound, Network, Disk or otherwise. (Process is an abstract method).

Even so I think I have to start a thread or task here, otherwise the Process method blocks the rabbitmq thread and the event handler is not called again until it is finished. So I can only handle one method at once.

Is starting a new Thread here okay? Originally I had used Task.Run but this didn't produce as many workers as wanted. See other post.

FYI. The number of concurrent threads is capped by setting the InitialCount on the semaphore.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

As already been said in linked question, big number of threads doesn't guarantee the performance, as if their number gets more than the number of logical cores, you got a thread starvation situation with no real work being done.

However, if you still need to handle the number of a concurrent operations, you may give a try to the TPL Dataflow library, with settings up the MaxDegreeOfParallelism, like in this tutorial.

var workerBlock = new ActionBlock<EventArgs>(
    // Process event
    e => Process(e),
    // Specify a maximum degree of parallelism.
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = InitialCount
    });
var bufferBlock = new BufferBlock();
// link the blocks for automatically propagading the messages
bufferBlock.LinkTo(workerBlock);

// asynchronously send the message
await bufferBlock.SendAsync(...);
// synchronously send the message
bufferBlock.Post(...);

BufferBlock is a queue, so the order of messages will be preserved. Also, you can add the different handlers (with a different degree of parallelism) with linking the blocks with filter lambda:

bufferBlock.LinkTo(cpuWorkerBlock, e => e is CpuEventArgs);
bufferBlock.LinkTo(networkWorkerBlock, e => e is NetworkEventArgs);
bufferBlock.LinkTo(diskWorkerBlock, e => e is DiskEventArgs);

but in this case you should setup a default handler at the end of the chain, so the message wouldn't disappear (you may use a NullTarget block for this):

bufferBlock.LinkTo(DataflowBlock.NullTarget<EventArgs>);

Also, the block could be an observers, so they perfectly work with Reactive Extensions on UI side.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...