Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

C# - Single-source producer feeding multiple consumers working in parallel

I want a kind of queue into which a single source puts data, with consumers waiting on the other side. When they detect that the queue is not empty, they should start processing the data and keep going until they are halted. It is important that if the queue is emptied they keep watching it, so that if more data comes in they can consume it. What I have found so far are examples with multiple consumers and multiple producers, where the consumers are nested inside the producers. In my case I can't do that, as I will have a single source, and consumers committed to the queue until I stop them. So the consumers and the producer do not run in series; they execute in parallel.

I will be executing the consumers and the producer in parallel with:

Parallel.Invoke(() => producer(), () => consumers());

The problem is how to process, in parallel, the contents of a queue that is sometimes empty.

1 Reply

You can solve this relatively easily using a BlockingCollection<T>.

You can use one as a queue, and pass a reference to it to the producer() and each of the consumers().

You'll be calling GetConsumingEnumerable() from each consumer thread, and using it with foreach.

The producer thread will add items to the collection, and will call CompleteAdding() when it has finished producing stuff. This will automatically make all the consumer threads exit their foreach loops.

Here's a basic example (with no error handling). The calls to Thread.Sleep() are to simulate load, and should not be used in real code.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            ThreadPool.SetMinThreads(10, 0); // To help the demo; not needed in real code.
            var plant = new ProcessingPlant();
            plant.Process();
            Console.WriteLine("Work complete.");
        }
    }

    public sealed class ProcessingPlant
    {
        private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();

        public void Process()
        {
            Parallel.Invoke(producer, consumers);
        }

        private void producer()
        {
            for (int i = 0; i < 100; ++i)
            {
                string item = i.ToString();
                Console.WriteLine("Producer is queueing {0}", item);
                _queue.Add(item);  // <- Here's where we add an item to the queue.
                Thread.Sleep(0);
            }

            _queue.CompleteAdding(); // <- Here's where we make all the consumers
        }                            //    exit their foreach loops.

        private void consumers()
        {
            Parallel.Invoke(
                () => consumer(1),
                () => consumer(2),
                () => consumer(3),
                () => consumer(4),
                () => consumer(5)
            );
        }

        private void consumer(int id)
        {
            Console.WriteLine("Consumer {0} is starting.", id);

            foreach (var item in _queue.GetConsumingEnumerable()) // <- Here's where we remove items.
            {
                Console.WriteLine("Consumer {0} read {1}", id, item);
                Thread.Sleep(0);
            }

            Console.WriteLine("Consumer {0} is stopping.", id);
        }
    }
}

(I know this is using an extra thread just to start the consumers, but I did it this way to avoid obscuring the real point - which is to demonstrate the use of BlockingCollection.)
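
If the extra thread bothers you, one possible variation is to start each consumer as its own task and run the producer on the calling thread. This is only a sketch under my own assumptions: TaskBasedPlant, Run, itemCount and consumerCount are made-up names, and the occasional Thread.Sleep(20) is there purely so the queue runs empty now and then, to show that the consumers just wait and carry on when more data arrives.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TaskBasedPlant
{
    // Hypothetical helper (not part of the example above): runs one producer and
    // consumerCount consumers, and returns how many items were consumed in total.
    public static int Run(int itemCount, int consumerCount)
    {
        using (var queue = new BlockingCollection<string>())
        {
            int consumed = 0;

            // Start each consumer as its own task, so no thread is spent
            // on a nested Parallel.Invoke.
            Task[] consumers = Enumerable.Range(1, consumerCount)
                .Select(id => Task.Factory.StartNew(() =>
                {
                    // Blocks while the queue is empty; the loop ends after
                    // CompleteAdding() once the remaining items are drained.
                    foreach (string item in queue.GetConsumingEnumerable())
                    {
                        Interlocked.Increment(ref consumed);
                    }
                }, TaskCreationOptions.LongRunning))
                .ToArray();

            // The single producer runs on the calling thread.
            for (int i = 0; i < itemCount; ++i)
            {
                queue.Add(i.ToString());

                // Pause now and then so the queue runs empty (demo only).
                if (i % 10 == 9)
                    Thread.Sleep(20);
            }

            queue.CompleteAdding(); // Halts the consumers.
            Task.WaitAll(consumers);
            return consumed;
        }
    }
}
```

Calling TaskBasedPlant.Run(100, 5) should return 100, since every item added before CompleteAdding() is handed to exactly one consumer. I used TaskCreationOptions.LongRunning as a hint that these tasks block for a long time; whether that is the right choice depends on how many consumers you have and how long they live.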

