Archive

Posts Tagged ‘Producer Consumer’

Producer Consumer Scenarios with BlockingCollection

November 15, 2010 Leave a comment

Producer Consumer is really easy with TPL. You create a blocking collection; one task (the producer) uses Add() to add items to it, and another task (the consumer) reads data from it using TryTake().

The producer must signal to the consumers that it has finished adding by calling CompleteAdding().

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace BlockingCollectionSample
{
    /// <summary>
    /// Console sample: one producer task feeds a <see cref="BlockingCollection{T}"/>
    /// and two consumer tasks drain it with <c>TryTake</c>.
    /// </summary>
    public class Program
    {
        static void Main(string[] args)
        {
            Program p = new Program();
            p.TestingBlockingCollection();

            Console.WriteLine("Press any key to quit..");
            Console.ReadKey();
        }

        /// <summary>
        /// Runs the producer and both consumers to completion, then releases the collection.
        /// </summary>
        public void TestingBlockingCollection()
        {
            // BlockingCollection<T> is IDisposable; every task that touches it has
            // finished by the time Task.WaitAll returns (or throws), so disposing here is safe.
            using (BlockingCollection<string> collection = new BlockingCollection<string>())
            {
                var p = Task.Factory.StartNew(() =>
                {
                    try
                    {
                        Random r = new Random();
                        for (int i = 0; i < 5; i++)
                        {
                            Thread.Sleep(1000); // simulate slow production
                            int value = r.Next(100);
                            Console.WriteLine("Task {0} produced value {1}", Task.CurrentId, value);
                            collection.Add(value.ToString());
                        }
                    }
                    finally
                    {
                        // Always signal completion, even if production fails; otherwise
                        // the consumers would wait forever and WaitAll would never return.
                        collection.CompleteAdding();
                    }
                });

                var c = Task.Factory.StartNew(() =>
                {
                    // The children are attached, so the parent task 'c' only completes
                    // once both consumers have finished.
                    for (int i = 0; i < 2; i++)
                    {
                        Task.Factory.StartNew(() => Consume(collection), TaskCreationOptions.AttachedToParent);
                    }
                });

                Task.WaitAll(p, c);
            }
        }

        /// <summary>
        /// Takes items from <paramref name="collection"/> and prints them until the
        /// collection is marked complete and empty.
        /// </summary>
        /// <param name="collection">The shared collection the producer adds to.</param>
        private static void Consume(BlockingCollection<string> collection)
        {
            string value;

            // Timeout.Infinite makes TryTake block until an item is available instead of
            // spinning on an empty collection; it returns false once adding has been
            // completed and nothing is left to take.
            while (collection.TryTake(out value, Timeout.Infinite))
            {
                Console.WriteLine("Task {0} consumed value {1}", Task.CurrentId, value);
            }
        }
    }
}

Could also be coded like the following:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading;

namespace BlockingCollectionSample
{
    /// <summary>
    /// Console sample: one producer task feeds a <see cref="BlockingCollection{T}"/>
    /// and the consumer side drains it through <c>GetConsumingEnumerable</c> with
    /// <c>Parallel.ForEach</c>.
    /// </summary>
    public class Program
    {
        static void Main(string[] args)
        {
            Program p = new Program();
            p.TestingBlockingCollection();

            Console.WriteLine("Press any key to quit..");
            Console.ReadKey();
        }

        /// <summary>
        /// Runs the producer and the parallel consumer to completion.
        /// </summary>
        public void TestingBlockingCollection()
        {
            // NOTE(review): the element type and the producer loop were lost in the
            // published listing; they are restored here to mirror the TryTake-based
            // sample earlier in the post - confirm against the author's original.
            using (BlockingCollection<string> collection = new BlockingCollection<string>())
            {
                var p = Task.Factory.StartNew(() =>
                {
                    try
                    {
                        Random r = new Random();
                        for (int i = 0; i < 5; i++)
                        {
                            Thread.Sleep(1000); // simulate slow production
                            int value = r.Next(100);
                            Console.WriteLine("Task {0} produced value {1}", Task.CurrentId, value);
                            collection.Add(value.ToString());
                        }
                    }
                    finally
                    {
                        // GetConsumingEnumerable only ends once adding is completed, so
                        // signal completion even when production fails.
                        collection.CompleteAdding();
                    }
                });

                var c = Task.Factory.StartNew(() =>
                {
                    // NOTE(review): Parallel.ForEach's default partitioner may buffer items
                    // into chunks before handing them to the body, which can delay
                    // processing of a slow stream - verify this is acceptable.
                    Parallel.ForEach(collection.GetConsumingEnumerable(), x =>
                    {
                        Console.WriteLine("Consumer processed: {0}", x);
                    });
                });

                Task.WaitAll(p, c);
            }
        }
    }
}
Advertisements

Producer Consumer Queue

November 2, 2010 Leave a comment

I have just built my own producer consumer queue and am keen to get feedback on it from people. Please post your comments.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace ProducerConsumerQueueSample
{

    /// <summary>
    /// Broadcasts every item yielded by a registered producer to all registered
    /// consumers. Each consumer call is queued on the thread pool, and the next item
    /// is not dispatched until every consumer has finished with the current one.
    /// </summary>
    /// <typeparam name="TProductionType">Type of the items produced and consumed.</typeparam>
    public class ProducerConsumerQueue<TProductionType>
    {
        private Func<IEnumerable<TProductionType>> _producer;
        private readonly IList<Action<TProductionType>> _consumers;
        private readonly EventWaitHandle _producer_wait_handle;
        private long _totalconsumers;
        private long _processed_consumers;
        private bool _started;
        private readonly object _locker;

        /// <summary>Creates an empty queue with no producer and no consumers.</summary>
        public ProducerConsumerQueue()
        {
            _consumers = new List<Action<TProductionType>>();
            // Initially signaled so the first item is dispatched without waiting;
            // auto-reset so each later item waits for the previous batch to finish.
            _producer_wait_handle = new EventWaitHandle(true, EventResetMode.AutoReset);
            _locker = new object();
            _started = false;
            _totalconsumers = 0;
            _processed_consumers = 0;
        }

        /// <summary>Factory entry point for the fluent registration chain.</summary>
        /// <returns>A new, empty queue.</returns>
        public static ProducerConsumerQueue<TProductionType> Initialize()
        {
            return new ProducerConsumerQueue<TProductionType>();
        }

        /// <summary>Sets the producer, replacing any previously registered one.</summary>
        /// <param name="producer">Function returning the sequence of items to dispatch.</param>
        /// <returns>This queue, to allow chaining.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="producer"/> is null.</exception>
        public ProducerConsumerQueue<TProductionType> RegisterProducer(Func<IEnumerable<TProductionType>> producer)
        {
            if (producer == null)
            {
                throw new ArgumentNullException("producer");
            }

            _producer = producer;
            return this;
        }

        /// <summary>Adds a consumer that will receive every produced item.</summary>
        /// <param name="consumer">Callback invoked once per item on a thread-pool thread.</param>
        /// <returns>This queue, to allow chaining.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="consumer"/> is null.</exception>
        public ProducerConsumerQueue<TProductionType> RegisterConsumer(Action<TProductionType> consumer)
        {
            if (consumer == null)
            {
                throw new ArgumentNullException("consumer");
            }

            _consumers.Add(consumer);
            return this;
        }

        /// <summary>
        /// Enumerates the producer and hands each item to every consumer, blocking the
        /// caller until the last item has been processed by all consumers. Calls made
        /// after production has already been started return immediately.
        /// </summary>
        /// <exception cref="InvalidOperationException">No producer has been registered.</exception>
        public void StartProduction()
        {
            if (_started)
            {
                return;
            }

            if (_producer == null)
            {
                throw new InvalidOperationException("A producer must be registered before production can start.");
            }

            _started = true;

            // Work on a snapshot so the consumer count and the set of consumers that are
            // actually dispatched cannot drift apart during the run.
            var consumers = new List<Action<TProductionType>>(_consumers);
            _totalconsumers = consumers.Count;
            _processed_consumers = 0;

            if (consumers.Count == 0)
            {
                // With nobody to signal completion the wait handle would never be set
                // again and the loop below would block forever; just drain the producer.
                foreach (var unused in _producer())
                {
                }

                return;
            }

            foreach (var item in _producer())
            {
                // Wait until every consumer has finished with the previous item.
                _producer_wait_handle.WaitOne();

                foreach (var consumer in consumers)
                {
                    // Copy the loop variables so each closure captures its own values.
                    var tempitem = item;
                    var tempconsumer = consumer;

                    ThreadPool.QueueUserWorkItem(
                        x =>
                        {
                            try
                            {
                                tempconsumer(tempitem);
                            }
                            catch (Exception)
                            {
                                // Deliberately swallowed: a failing consumer must not take
                                // down the thread-pool thread or stall the other consumers.
                            }
                            finally
                            {
                                UpdateProcessed();
                            }
                        }
                    );
                }
            }

            _producer_wait_handle.WaitOne(); // Wait for all consumers to signal that they're done.
        }

        /// <summary>
        /// Records one finished consumer call; once all consumers have reported for the
        /// current item, resets the counter and releases the producer.
        /// </summary>
        private void UpdateProcessed()
        {
            lock (_locker)
            {
                _processed_consumers += 1;
                if (_processed_consumers == _totalconsumers)
                {
                    _processed_consumers = 0;
                    _producer_wait_handle.Set();
                }
            }
        }
    }

    /// <summary>
    /// Console demo: five consumers each print every number the producer yields.
    /// </summary>
    public class Program
    {
        static void Main()
        {
            var queue = ProducerConsumerQueue<int>.Initialize();

            // Register consumers 1..5; each one prints its own number next to the item.
            for (int number = 1; number <= 5; number++)
            {
                int consumerNo = number; // per-iteration copy for the closure
                queue.RegisterConsumer(item => Console.WriteLine("Consumer No {0} - Consuming: {1}", consumerNo, item));
            }

            queue.RegisterProducer(() => Enumerable.Range(1, 100));
            queue.StartProduction();

            Console.WriteLine("Press any key to quit...");
            Console.ReadKey();
        }
    }
}