Archive

Posts Tagged ‘Help’

Asynchronous ThreadPool – Wait Till Complete Functionality

November 2, 2010 Leave a comment

Hi all,
Looking to get some feedback on a .NET 3.5 ThreadPool. I’m using to run tasks and asynchronously but wait to all are finish and bubble all exceptions to the client.

Keen to ensure its thread safe with no races. Help required.


using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace ThreadPoolWaitHandles
{

    public class AsyncActionInvoker
    {

        private long _tasks_run_count;
        private bool _already_executed;
        private readonly List<Exception> _exceptions;
        private List<Action> _actions;
        private object _exception_locker = new object();

        public AsyncActionInvoker()
        {
            _tasks_run_count = 0;
            _already_executed = false;
            _exceptions = new List<Exception>();
            _actions = new List<Action>();
        }



        public void AddActionToRunAsync(Action action)
        {
            _actions.Add(action);
        }

        public void Start()
        {

            if (!_actions.Any())
                return;

            if (_already_executed)
                return;

            _already_executed = true;

            EventWaitHandle event_wait_handle = new EventWaitHandle(false, EventResetMode.AutoReset);

            foreach (var action in _actions)
            {
                var tempaction = action;
                ThreadPool.QueueUserWorkItem((x) =>
                                                 {
                                                     try
                                                     {
                                                         tempaction();
                                                         Interlocked.Increment(ref _tasks_run_count);
                                                         if (Interlocked.Read(ref _tasks_run_count) == _actions.Count)
                                                         {
                                                             event_wait_handle.Set();
                                                         }

                                                     }
                                                     catch (Exception ex)
                                                     {
                                                         Interlocked.Increment(ref _tasks_run_count);
                                                         if (Interlocked.Read(ref _tasks_run_count) == _actions.Count)
                                                         {
                                                             lock (_exception_locker)
                                                             {
                                                                 _exceptions.Add(ex);
                                                             }
                                                             event_wait_handle.Set();
                                                         }
                                                     }
                                                 });
            }

            event_wait_handle.WaitOne();

            if (_exceptions.Any())
                throw new AggregateActionException("An error occured while processing at least one of the actions")
                          {
                              TotalActions = _actions.Count,
                              ErroredActions = _exceptions.Count,
                              Errors = _exceptions
                          };

        }
    }

    public class AggregateActionException : Exception
    {
        public AggregateActionException(string message) : base(message) { }
        public int TotalActions { get; set; }
        public int ErroredActions { get; set; }
        public List<Exception> Errors { get; set; }
    }

    public class Program
    {
        public static void Main(string[] args)
        {

            AsyncActionInvoker asyncActionInvoker = new AsyncActionInvoker();

            Enumerable.Range(1, 20).ToList().ForEach(
                    x => asyncActionInvoker.AddActionToRunAsync(() =>
                                                                    {
                                                                        Random r = new Random();
                                                                        int temp = x;
                                                                        Thread.Sleep(r.Next(1000));
                                                                        Console.WriteLine("Action {0}", temp);
                                                                    })
                );

            asyncActionInvoker.Start();

            Console.WriteLine("Press any key to quit...");
            Console.ReadKey();
        }
    }
}

Update…

I had comment this was not really atomic how abt the following.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace ThreadPoolWaitHandles
{

    public class AsyncActionInvoker
    {

        private long _tasks_run_count;
        private bool _already_executed;
        private readonly List<Exception> _exceptions;
        private List<Action> _actions;
        private readonly object _locker = new object();

        public AsyncActionInvoker()
        {
            _tasks_run_count = 0;
            _already_executed = false;
            _exceptions = new List<Exception>();
            _actions = new List<Action>();
        }



        public void AddActionToRunAsync(Action action)
        {
            _actions.Add(action);
        }

        public void Start()
        {

            if (!_actions.Any())
                return;

            if (_already_executed)
                return;

            _already_executed = true;

            EventWaitHandle event_wait_handle = new EventWaitHandle(false, EventResetMode.AutoReset);

            foreach (var action in _actions)
            {
                var tempaction = action;
                ThreadPool.QueueUserWorkItem((x) =>
                                                 {
                                                     try
                                                     {
                                                         tempaction();

                                                         lock (_locker)
                                                         {
                                                             ++_tasks_run_count;
                                                             if (_tasks_run_count == _actions.Count)
                                                             {
                                                                 event_wait_handle.Set();
                                                             }
                                                         }

                                                     }
                                                     catch (Exception ex)
                                                     {
                                                         lock (_locker)
                                                         {
                                                             ++_tasks_run_count;
                                                             _exceptions.Add(ex);
                                                             if (_tasks_run_count == _actions.Count)
                                                             {
                                                                 event_wait_handle.Set();
                                                             }
                                                         }

                                                     }
                                                 });
            }

            event_wait_handle.WaitOne();

            if (_exceptions.Any())
                throw new AggregateActionException("An error occured while processing at least one of the actions")
                          {
                              TotalActions = _actions.Count,
                              ErroredActions = _exceptions.Count,
                              Errors = _exceptions
                          };

        }
    }

    public class AggregateActionException : Exception
    {
        public AggregateActionException(string message) : base(message) { }
        public int TotalActions { get; set; }
        public int ErroredActions { get; set; }
        public List<Exception> Errors { get; set; }
    }

    public class Program
    {
        public static void Main(string[] args)
        {

            AsyncActionInvoker asyncActionInvoker = new AsyncActionInvoker();

            Enumerable.Range(1, 20).ToList().ForEach(
                    x => asyncActionInvoker.AddActionToRunAsync(() =>
                                                                    {
                                                                        Random r = new Random();
                                                                        int temp = x;
                                                                        Thread.Sleep(r.Next(1000));
                                                                        Console.WriteLine("Action {0}", temp);
                                                                    })
                );

            asyncActionInvoker.Start();

            Console.WriteLine("Press any key to quit...");
            Console.ReadKey();
        }
    }
}

Cheers

Advertisements
Categories: Development Tags: , , , ,

iTextSharp

May 18, 2010 Leave a comment

Doing some work with this currently. This is a real PITA. If you have used it before and can offer help let me know.

Blair..

Categories: Development Tags: , ,