Tuesday, April 08, 2008

Managing Concurrency With Trees[4]

Read the entire saga:

Managing Concurrency With Trees[0]

Managing Concurrency With Trees[1]

Managing Concurrency With Trees[2]

Managing Concurrency With Trees[3]

 

As I indicated at the end of the last article, I might come back to this topic.  I’m back.  Two things made me unable to put this topic down.  The first was using Reflector on Parallel.ForEach and finding code written around constants like this:

private const int DEFAULT_LOOP_STRIDE = 8;

 

As near as I can read the code, the Parallel extensions don’t really get their swerve on until you’ve got quite a few more tasks than I’m using for examples here.  My uneducated suspicion is that this has to do with building up a queue of work for each thread and implementing the work stealing.

Recall from my last example that I can even frustrate the TPL into executing quite a few things sequentially with that specific Enumerator implementation.  This is what I’d like to avoid.

I’ll definitely need to come up with some realistic examples with hundreds or thousands of little tasks, but for now I’ll try to see my original idea through.  But not with Parallel FX.  It’s time for Payneallel FX.

I made the comment to my muse that I wondered how much it would take to make my own version of Parallel.ForEach: no fancy work stealing and very specific to my problem.  The PFX team surely has very good reasons for doing what they are doing, but I was frustrated at being unable to use my custom Enumerator.  I want to be able to write code like this:

Parallel.ForEach<IExecutable>(workTree, executeTask);// tree based, low # of tasks

… or like this …

            Parallel.ForEach<OddJob>(jobs, doJob); //List based, high # of tasks

…and have it work without hinting.   Thinking I was going to be blowing my whole weekend on this, I started thinking and typing.  Once I decided how to limit the number of threads created and re-use a thread rather than make a new thread for each task, it went much better than I expected.  Around 208 lines of code later, I had something that worked 100% the way I wanted it to!

There are three classes: a static class containing a static ForEach method, a WorkerPool class, and a Worker class.

Parallel.ForEach seems like a pretty good way to write this kind of code, so I imitated this interface.  Here’s the sample Main method:

            int exeCount = 0;

 

            Action<OddJob> doJob = delegate(OddJob j)

            {

                j.Execute();

                // Use the value Interlocked.Increment returns; re-reading exeCount would race with other workers

                int n = Interlocked.Increment(ref exeCount);

                Console.WriteLine("Executed " + n);

            };

 

            List<OddJob> jobs = new List<OddJob>();

            for (int i = 0; i < 20; ++i)

            {

                jobs.Add(new OddJob());

            }

           

            Payneallel.ForEach<OddJob>(jobs, doJob);

This should look just like TPL code.  The actual implementation is presented in its entirety here with some basic description afterwards.

    public static class Payneallel

    {

        public static void ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body)

        {

            WorkerPool<TSource> pool = new WorkerPool<TSource>();

            foreach (TSource src in source)

            {

                Worker<TSource> worker = pool.GetWorker();

                worker.Arg = src;

                worker.Work = body;

                worker.Go();

            }

            pool.WaitAll();

        }

This couldn’t be simpler.  When the method is called, a WorkerPool (my own thread pool) is created.  These are my threads, not ThreadPool threads, so I don’t need to worry about running long tasks on them.  We can choose to limit the number of threads created or not without needing to change this method or require anything of the client.  Worker<TSource>.Go() means “GO”, so work begins as soon as possible; this is where I had issues with my Tree<T> and the TPL.  Next, the WorkerPool implementation:

class WorkerPool<T>

        {

            public WorkerPool()

            {

                _workers = new List<Worker<T>>(Environment.ProcessorCount);

                for (int i = 0; i < Environment.ProcessorCount; ++i)

                {

                    Worker<T> worker = new Worker<T>("Payneallel " + i);

                    _workers.Add(worker);

                    worker.Done = new Action(WorkerDone);

                    worker.Go();

                }

                _workerDoneEvent = new ManualResetEvent(false);

            }

 

            private ManualResetEvent _workerDoneEvent;

            // Per-pool, not static: a static list would be shared (and overwritten) by every WorkerPool<T>
            private List<Worker<T>> _workers;

            private object _syncRoot = new object();

 

            /// <summary>

            /// Called by a worker when it finishes a task; wakes a caller blocked in GetWorker()

            /// </summary>

            public void WorkerDone()

            {

                lock (_syncRoot)

                {

                    _workerDoneEvent.Set();

                }

            }

 

            public Worker<T> GetWorker()

            {

                Worker<T> worker = GetFreeWorker();

                while (null == worker)

                {                   

                    _workerDoneEvent.WaitOne();

                    worker = GetFreeWorker();

                }

                _workerDoneEvent.Reset();

                return worker;

            }

 

            private Worker<T> GetFreeWorker()

            {

                foreach (Worker<T> w in _workers)

                {

                    if (!w.Busy)

                    {

                        Console.WriteLine("returning worker from pool");

                        return w;

                    }

                }

                return null;

            }

 

            public void WaitAll()

            {

                // Block until every worker has finished its current task

                foreach (Worker<T> w in _workers)

                {

                    w.Finish();

                }

            }

               

        }
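GetWorker() above blocks the caller while every worker is busy. The same back-pressure can be sketched with a counting semaphore. This is only an illustration under stated assumptions: `ForEachBounded` and `PeakConcurrency` are hypothetical names, not part of Payneallel, and unlike the pool this sketch starts a new thread per item, so it shows the blocking behaviour but not the thread reuse.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

static class BoundedDemo
{
    // Hypothetical helper: at most maxInFlight bodies run at the same time.
    public static void ForEachBounded<T>(IEnumerable<T> source, Action<T> body, int maxInFlight)
    {
        using (Semaphore slots = new Semaphore(maxInFlight, maxInFlight))
        {
            List<Thread> threads = new List<Thread>();
            foreach (T item in source)
            {
                T captured = item;               // copy for the closure
                slots.WaitOne();                 // blocks here, like GetWorker(), when all slots are taken
                Thread t = new Thread(delegate()
                {
                    try { body(captured); }
                    finally { slots.Release(); } // hand the slot back even if body throws
                });
                threads.Add(t);
                t.Start();
            }
            foreach (Thread t in threads) t.Join(); // plays the role of WaitAll()
        }
    }

    // Returns the highest number of bodies observed running at once.
    public static int PeakConcurrency(int items, int maxInFlight)
    {
        int running = 0, peak = 0;
        object gate = new object();
        ForEachBounded<int>(new int[items], delegate(int ignored)
        {
            lock (gate) { ++running; if (running > peak) peak = running; }
            Thread.Sleep(20);                    // pretend to work
            lock (gate) { --running; }
        }, maxInFlight);
        return peak;
    }

    static void Main()
    {
        Console.WriteLine(PeakConcurrency(10, 2) <= 2);
    }
}
```

Because a slot is taken before the thread starts and released only after the body finishes, the number of running bodies can never exceed `maxInFlight`.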

The pool creates one Worker<T> for each logical processor on the machine (Environment.ProcessorCount counts logical processors, not physical CPUs).  I’ve come back to wait handles as a means of making things work, but in this case I don’t mind because I’m limiting the number of threads I’m creating.  If Payneallel.ForEach calls GetWorker() but they are all busy, we block until a Worker is available.  WaitAll() makes sure the call to Payneallel.ForEach returns to the calling thread in a synchronous fashion.  The rest of the work is done in the Worker<T> class:

class Worker<T>

        {

            /// <summary>

            /// Set up initial state

            /// </summary>

            /// <param name="name"></param>

            public Worker(string name)

            {

                Name = name;

                Work = null;

                _active = true;

                Busy = false;

                _syncRoot = new object();

               

                _acceptingWorkEvent = new ManualResetEvent(true);

                _gotWorkEvent = new ManualResetEvent(false);

 

                ThreadStart ts = new ThreadStart(DoWork);

                Thread t = new Thread(ts);

                t.Name = Name;

                t.Start();

            }

 

            /// <summary>

            /// set _active to false so the thread will exit

            /// </summary>

            ~Worker()

            {

                _active = false;               

            }

 

            private ManualResetEvent _acceptingWorkEvent;

            private ManualResetEvent _gotWorkEvent;

 

            /// <summary>

            /// The name of the worker, used to Name the thread for easier debugging

            /// </summary>

            public string Name { get; set; }

 

            /// <summary>

            /// The worker is currently doing something

            /// </summary>

            public bool Busy { get; set; }

            private bool _active;

            private object _syncRoot;

 

            /// <summary>

            /// The action to run on the current item

            /// </summary>

            public Action<T> Work { get; set; }

 

            /// <summary>

            /// The item passed to Work

            /// </summary>

            public T Arg { get; set; }

 

            /// <summary>

            /// Call back to the Pool to signify we're done

            /// </summary>

            public Action Done { get; set; }

 

            /// <summary>

            /// Block until done

            /// </summary>

            public void Finish()

            {

                _acceptingWorkEvent.WaitOne();

                _active = false;

                _gotWorkEvent.Set(); // wake the thread so it sees _active == false and exits

            }

 

            /// <summary>

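            /// Signal the thread running DoWork that work is ready

            /// </summary>

            public void Go()

            {

                lock (_syncRoot)

                {

                    if (null != Work && null != Arg)

                    {

                        // Mark the worker busy on the caller's thread, before the handoff, so

                        // GetFreeWorker() cannot return it twice and Finish() cannot return early.

                        Busy = true;

                        _acceptingWorkEvent.Reset();

                    }

                    _gotWorkEvent.Set();

                }

            }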

 

            public void DoWork()

            {

                while (_active)

                {

                    _gotWorkEvent.WaitOne();//Wait for someone to call Go on us

                    if (null != Work && null != Arg) //make sure they set the callback action and the argument to the action

                    {

                        lock (_syncRoot)

                        {

                            Busy = true;

                            _acceptingWorkEvent.Reset();//Not accepting work now..

                        }

                        Work(Arg);//Do the action

                        Work = null;

                        Arg = default(T);

                        lock (_syncRoot)

                        {

                            Busy = false;

                            _acceptingWorkEvent.Set();//We are accepting work

                            _gotWorkEvent.Reset();//Don't have work

                            Done();//Call back to the pool in case someone is waiting on a free worker

                        }

                    }

                }

            }

        }

The Worker<T> has two wait handles: one signals that work has arrived, the other that the worker is available.  I believe the overhead in creating a thread is just enough that it makes sense to do things this way.  Creating a dozen threads is not a big deal (my x64 machine has around 1000 going most of the time) but I would probably avoid creating hundreds or thousands in order to do hundreds or thousands of Units of Work.  In the constructor for Worker<T>, the thread is created and started; however, it has no Action<T> to execute until one is assigned and Go() is called.  Note that I’m not taking any care whatsoever to think about thread local storage and the like.  I should probably look at this again to be sure the appropriate granularity of locking occurs.  The PFX team has got my head going 1000 km/h and I’m just playing with petty alternate implementations of what they are doing.
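The two-handle handshake can be sketched in isolation. This is a minimal illustration rather than the Worker<T> above: `HandshakeWorker` and `HandshakeDemo` are hypothetical names, the worker takes a plain Action, and it uses AutoResetEvent instead of ManualResetEvent so that no separate Reset() call is needed.

```csharp
using System;
using System.Threading;

// Hypothetical, simplified worker: one reusable thread and two auto-reset events.
class HandshakeWorker
{
    private readonly AutoResetEvent _gotWork = new AutoResetEvent(false);
    private readonly AutoResetEvent _idle = new AutoResetEvent(true);
    private readonly Thread _thread;
    private Action _work;              // null is the shutdown signal

    public HandshakeWorker()
    {
        _thread = new Thread(Loop);
        _thread.IsBackground = true;   // never keeps the process alive
        _thread.Start();
    }

    // Blocks until the previous item has finished, then hands over the next one.
    public void Run(Action work)
    {
        _idle.WaitOne();
        _work = work;
        _gotWork.Set();
    }

    // Waits for the last item to finish, then asks the thread to exit.
    public void Shutdown()
    {
        _idle.WaitOne();
        _work = null;
        _gotWork.Set();
        _thread.Join();
    }

    private void Loop()
    {
        while (true)
        {
            _gotWork.WaitOne();        // resets itself once this thread is released
            Action work = _work;
            if (work == null) return;  // shutdown requested
            work();
            _idle.Set();               // ready for the next item
        }
    }
}

static class HandshakeDemo
{
    public static int RunDemo()
    {
        int sum = 0;
        HandshakeWorker worker = new HandshakeWorker();
        for (int i = 1; i <= 5; ++i)
        {
            int n = i;                 // capture a copy, not the loop variable
            worker.Run(delegate { sum += n; });
        }
        worker.Shutdown();
        return sum;                    // 1 + 2 + 3 + 4 + 5
    }

    static void Main()
    {
        Console.WriteLine(RunDemo()); // prints 15
    }
}
```

All five actions run on the same thread, one after another, which is the reuse the Worker<T> is after. An auto-reset event clears itself when it releases a waiter, which removes the window between Set() and Reset() that a manual-reset event leaves open.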

I altered my TaskTreeEnumerator back to its original state before I had tweaked it beyond recognition trying to get PFX to play with me. Here is the new/original implementation for use with Payneallel:

        public override bool MoveNext()

        {

            lock (_syncRoot)

            {

                UpdateWork(_tree);

            }

 

            if (_runnableWork.Count > 0)

            {

                return true;

            }

            else

            {

                while (0 == _runnableWork.Count && !_tree.TrueForAll( new Predicate<IExecutable>(Executed) ))

                {

                    Thread.Sleep(200);

                    lock (_syncRoot)

                    {

                        UpdateWork(_tree);

                    }

                    if (_runnableWork.Count > 0)

                    {

                        return true;

                    }

                }

                //use this to block something until a task completes?

                Console.WriteLine("MoveNext->false");

                return false;

            }           

        }
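The comment in MoveNext asks whether something could block until a task completes instead of polling with Thread.Sleep(200). One way to sketch that is with Monitor.Wait and Monitor.PulseAll. Everything here is an assumption for illustration: `RunnableCounter` is a hypothetical stand-in for the enumerator's shared state, and it only counts runnable tasks rather than walking a real tree.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the enumerator's shared state.
class RunnableCounter
{
    private readonly object _gate = new object();
    private int _runnable;     // tasks whose dependencies are satisfied
    private int _remaining;    // tasks not yet handed out

    public RunnableCounter(int runnable, int total)
    {
        _runnable = runnable;
        _remaining = total;
    }

    // Called by a worker when a task finishes and unlocks some dependents.
    public void TaskCompleted(int unlocked)
    {
        lock (_gate)
        {
            _runnable += unlocked;
            Monitor.PulseAll(_gate);       // wake anyone blocked in WaitForWork
        }
    }

    // Blocks until a task is runnable; returns false once every task was handed out.
    public bool WaitForWork()
    {
        lock (_gate)
        {
            while (_runnable == 0 && _remaining > 0)
            {
                Monitor.Wait(_gate);       // releases the lock while waiting, no polling
            }
            if (_remaining == 0) return false;
            --_runnable;
            --_remaining;
            return true;
        }
    }
}

static class WaitDemo
{
    public static int RunDemo()
    {
        // A chain of three tasks: only the first is runnable at the start.
        RunnableCounter counter = new RunnableCounter(1, 3);
        int handedOut = 0;
        while (counter.WaitForWork())
        {
            ++handedOut;
            Thread t = new Thread(delegate()
            {
                Thread.Sleep(10);          // pretend to work
                counter.TaskCompleted(1);  // finishing unlocks the next link
            });
            t.Start();
        }
        return handedOut;
    }

    static void Main()
    {
        Console.WriteLine(RunDemo());      // prints 3
    }
}
```

The caller wakes as soon as a task reports completion, so there is no fixed 200 ms delay between a task finishing and its dependents being handed out.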

 
