Home > Development > Asynchronous ThreadPool – Wait Till Complete Functionality

Asynchronous ThreadPool – Wait Till Complete Functionality


Hi all,
Looking to get some feedback on a .NET 3.5 ThreadPool. I’m using to run tasks and asynchronously but wait to all are finish and bubble all exceptions to the client.

Keen to ensure its thread safe with no races. Help required.


using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace ThreadPoolWaitHandles
{

    public class AsyncActionInvoker
    {

        private long _tasks_run_count;
        private bool _already_executed;
        private readonly List<Exception> _exceptions;
        private List<Action> _actions;
        private object _exception_locker = new object();

        public AsyncActionInvoker()
        {
            _tasks_run_count = 0;
            _already_executed = false;
            _exceptions = new List<Exception>();
            _actions = new List<Action>();
        }



        public void AddActionToRunAsync(Action action)
        {
            _actions.Add(action);
        }

        public void Start()
        {

            if (!_actions.Any())
                return;

            if (_already_executed)
                return;

            _already_executed = true;

            EventWaitHandle event_wait_handle = new EventWaitHandle(false, EventResetMode.AutoReset);

            foreach (var action in _actions)
            {
                var tempaction = action;
                ThreadPool.QueueUserWorkItem((x) =>
                                                 {
                                                     try
                                                     {
                                                         tempaction();
                                                         Interlocked.Increment(ref _tasks_run_count);
                                                         if (Interlocked.Read(ref _tasks_run_count) == _actions.Count)
                                                         {
                                                             event_wait_handle.Set();
                                                         }

                                                     }
                                                     catch (Exception ex)
                                                     {
                                                         Interlocked.Increment(ref _tasks_run_count);
                                                         if (Interlocked.Read(ref _tasks_run_count) == _actions.Count)
                                                         {
                                                             lock (_exception_locker)
                                                             {
                                                                 _exceptions.Add(ex);
                                                             }
                                                             event_wait_handle.Set();
                                                         }
                                                     }
                                                 });
            }

            event_wait_handle.WaitOne();

            if (_exceptions.Any())
                throw new AggregateActionException("An error occured while processing at least one of the actions")
                          {
                              TotalActions = _actions.Count,
                              ErroredActions = _exceptions.Count,
                              Errors = _exceptions
                          };

        }
    }

    public class AggregateActionException : Exception
    {
        public AggregateActionException(string message) : base(message) { }
        public int TotalActions { get; set; }
        public int ErroredActions { get; set; }
        public List<Exception> Errors { get; set; }
    }

    public class Program
    {
        public static void Main(string[] args)
        {

            AsyncActionInvoker asyncActionInvoker = new AsyncActionInvoker();

            Enumerable.Range(1, 20).ToList().ForEach(
                    x => asyncActionInvoker.AddActionToRunAsync(() =>
                                                                    {
                                                                        Random r = new Random();
                                                                        int temp = x;
                                                                        Thread.Sleep(r.Next(1000));
                                                                        Console.WriteLine("Action {0}", temp);
                                                                    })
                );

            asyncActionInvoker.Start();

            Console.WriteLine("Press any key to quit...");
            Console.ReadKey();
        }
    }
}

Update…

I had comment this was not really atomic how abt the following.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace ThreadPoolWaitHandles
{

    public class AsyncActionInvoker
    {

        private long _tasks_run_count;
        private bool _already_executed;
        private readonly List<Exception> _exceptions;
        private List<Action> _actions;
        private readonly object _locker = new object();

        public AsyncActionInvoker()
        {
            _tasks_run_count = 0;
            _already_executed = false;
            _exceptions = new List<Exception>();
            _actions = new List<Action>();
        }



        public void AddActionToRunAsync(Action action)
        {
            _actions.Add(action);
        }

        public void Start()
        {

            if (!_actions.Any())
                return;

            if (_already_executed)
                return;

            _already_executed = true;

            EventWaitHandle event_wait_handle = new EventWaitHandle(false, EventResetMode.AutoReset);

            foreach (var action in _actions)
            {
                var tempaction = action;
                ThreadPool.QueueUserWorkItem((x) =>
                                                 {
                                                     try
                                                     {
                                                         tempaction();

                                                         lock (_locker)
                                                         {
                                                             ++_tasks_run_count;
                                                             if (_tasks_run_count == _actions.Count)
                                                             {
                                                                 event_wait_handle.Set();
                                                             }
                                                         }

                                                     }
                                                     catch (Exception ex)
                                                     {
                                                         lock (_locker)
                                                         {
                                                             ++_tasks_run_count;
                                                             _exceptions.Add(ex);
                                                             if (_tasks_run_count == _actions.Count)
                                                             {
                                                                 event_wait_handle.Set();
                                                             }
                                                         }

                                                     }
                                                 });
            }

            event_wait_handle.WaitOne();

            if (_exceptions.Any())
                throw new AggregateActionException("An error occured while processing at least one of the actions")
                          {
                              TotalActions = _actions.Count,
                              ErroredActions = _exceptions.Count,
                              Errors = _exceptions
                          };

        }
    }

    public class AggregateActionException : Exception
    {
        public AggregateActionException(string message) : base(message) { }
        public int TotalActions { get; set; }
        public int ErroredActions { get; set; }
        public List<Exception> Errors { get; set; }
    }

    public class Program
    {
        public static void Main(string[] args)
        {

            AsyncActionInvoker asyncActionInvoker = new AsyncActionInvoker();

            Enumerable.Range(1, 20).ToList().ForEach(
                    x => asyncActionInvoker.AddActionToRunAsync(() =>
                                                                    {
                                                                        Random r = new Random();
                                                                        int temp = x;
                                                                        Thread.Sleep(r.Next(1000));
                                                                        Console.WriteLine("Action {0}", temp);
                                                                    })
                );

            asyncActionInvoker.Start();

            Console.WriteLine("Press any key to quit...");
            Console.ReadKey();
        }
    }
}

Cheers

Advertisements
Categories: Development Tags: , , , ,
  1. No comments yet.
  1. No trackbacks yet.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: