Home > Development > Producer Consumer Queue

Producer Consumer Queue


I have just build my own producer consumer queue and am keen to get feedback on it from people. Please post your comments.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace ProducerConsumerQueueSample
{

    public class ProducerConsumerQueue<TProductionType>
    {
        private Func<IEnumerable<TProductionType>> _producer;
        private readonly IList<Action<TProductionType>> _consumers;
        private readonly EventWaitHandle _producer_wait_handle;
        private readonly EventWaitHandle _consumer_wait_handle;
        private long _totalconsumers;
        private long _processed_consumers;
        private bool _started;
        private readonly object _locker;

        public ProducerConsumerQueue()
        {
            _consumers = new List<Action<TProductionType>>();
            _producer_wait_handle = new EventWaitHandle(true, EventResetMode.AutoReset);
            _consumer_wait_handle = new EventWaitHandle(false, EventResetMode.ManualReset);
            _locker = new object();
            _started = false;
            _totalconsumers = 0;
            _processed_consumers = 0;
        }

        public static ProducerConsumerQueue<TProductionType> Initialize()
        {
            return new ProducerConsumerQueue<TProductionType>();
        }

        public ProducerConsumerQueue<TProductionType> RegisterProducer(Func<IEnumerable<TProductionType>> producer)
        {
            _producer = producer;
            return this;
        }

        public ProducerConsumerQueue<TProductionType> RegisterConsumer(Action<TProductionType> consumer)
        {
            _consumers.Add(consumer);
            return this;
        }

        public void StartProduction()
        {
            if (_started)
                return;

            _started = true;
            _totalconsumers = _consumers.Count;
            _processed_consumers = 0;

            foreach (var item in _producer())
            {
                _producer_wait_handle.WaitOne();
                _consumer_wait_handle.Set();

                foreach (var consumer in _consumers)
                {
                    var tempitem = item;
                    var tempconsumer = consumer;

                    ThreadPool.QueueUserWorkItem(
                        x =>
                        {
                            try
                            {
                                tempconsumer(tempitem);
                                lock (_locker)
                                {
                                    UpdateProcessed();
                                }
                            }
                            catch (Exception e)
                            {
                                lock (_locker)
                                {
                                    UpdateProcessed();
                                }
                            }
                        }
                    );


                }
            }

            _producer_wait_handle.WaitOne(); // Wait for all consumers to notify there done!

        }

        private void UpdateProcessed()
        {
            _processed_consumers += 1;
            if (_processed_consumers == _totalconsumers)
            {
                _processed_consumers = 0;
                _producer_wait_handle.Set();
            }
        }
    }

    public class Program
    {
        static void Main()
        {


            ProducerConsumerQueue<int>.Initialize()
                                      .RegisterConsumer(x => Console.WriteLine("Consumer No {0} - Consuming: {1}", 1, x))
                                      .RegisterConsumer(x => Console.WriteLine("Consumer No {0} - Consuming: {1}", 2, x))
                                      .RegisterConsumer(x => Console.WriteLine("Consumer No {0} - Consuming: {1}", 3, x))
                                      .RegisterConsumer(x => Console.WriteLine("Consumer No {0} - Consuming: {1}", 4, x))
                                      .RegisterConsumer(x => Console.WriteLine("Consumer No {0} - Consuming: {1}", 5, x))
                                      .RegisterProducer(() => Enumerable.Range(1, 100))
                                      .StartProduction();

            Console.WriteLine("Press any key to quit...");
            Console.ReadKey();
        }
    }
}

Advertisements
  1. No comments yet.
  1. No trackbacks yet.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: