Damon Payne: Hand waving software architect

2009 Microsoft MVP - Client App Dev
2007 Microsoft MVP - Solution Architecture
 Thursday, April 03, 2008

This article may not make much sense unless you’ve also read:

Managing Concurrency With Trees[0]

Managing Concurrency With Trees[1]

Managing Concurrency With Trees[2]

As I stated in my initial goal, a tree structure is a natural way to express tasks that must happen in a certain order while still letting many of them run at the same time. I had only read articles on the Task Parallel Library (TPL) up to this point, but I recalled both a video from Channel 9 and a DotNetRocks podcast with Joe Duffy where he stated that the TPL likes to work on collections that implement IEnumerable<T>. This got me a little bit excited; I assume very few people out there are interested in the ranting of an MVP from Wisconsin with an Orage blog, but if I could shoehorn my ideas into the TPL, that would be something different.

My initial goal, then, was to use the TaskTreeScheduler notion of flattening and queuing eligible tasks and combine it with some sort of clever IEnumerable<IExecutable> implementation such that I could pull in the TPL and write code like this (using the same workTree from the last article):

            Action<IExecutable> executeTask = delegate(IExecutable exeTask)
            {
                exeTask.Execute();
                Console.WriteLine("PFX DONE " + exeTask.FriendlyClassName());
            };

            Parallel.ForEach<IExecutable>(workTree, executeTask);

This would save me from having to write most of the threading code. It wasn’t meant to be, though; read on.

To give you an idea of how I’d like this to work, let’s take a simpler example with a linear list of instances of the OddJob IExecutable implementation:

            Action<OddJob> doJob = delegate(OddJob j)
            {
                j.Execute();
            };

            List<OddJob> jobs = new List<OddJob>();
            for (int i = 0; i < 100; ++i)
            {
                jobs.Add(new OddJob());
            }

            Parallel.ForEach<OddJob>(jobs, doJob);

You’ll notice that the TPL has made quite a few worker threads for me and set about executing quite a few of my OddJobs at once.  By default the library will build at least one thread per physical core on your machine. 
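If you want to watch the work spread across threads yourself, here is a minimal, self-contained sketch of the flat-list case. It assumes the Parallel.ForEach that later shipped in System.Threading.Tasks rather than the CTP build used in this post, and OddJobSketch is a hypothetical stand-in for OddJob that just sleeps briefly and records which thread ran it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the OddJob class from the earlier articles.
public class OddJobSketch
{
    public bool IsComplete { get; private set; }
    public int ThreadId { get; private set; }

    public void Execute()
    {
        Thread.Sleep(10); // pretend to do some work
        ThreadId = Thread.CurrentThread.ManagedThreadId;
        IsComplete = true;
    }
}

public static class FlatListDemo
{
    // Runs `count` jobs through Parallel.ForEach and returns them for inspection.
    public static List<OddJobSketch> RunAll(int count)
    {
        var jobs = new List<OddJobSketch>();
        for (int i = 0; i < count; ++i)
        {
            jobs.Add(new OddJobSketch());
        }

        Parallel.ForEach(jobs, job => job.Execute());
        return jobs;
    }

    public static void Main()
    {
        List<OddJobSketch> jobs = RunAll(100);
        Console.WriteLine("Completed: " + jobs.Count(j => j.IsComplete));
        Console.WriteLine("Distinct threads used: "
            + jobs.Select(j => j.ThreadId).Distinct().Count());
    }
}
```

The number of distinct threads depends on the machine and on what the scheduler decides at run time, so treat that figure as an observation rather than a guarantee.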

The first thing I discovered is that the TPL does not instantly execute each piece of work it gets out of your IEnumerable<T> collection.  For example, here is the first iteration of the MoveNext() implementation for TaskTreeEnumerator:

        public override bool MoveNext()
        {
            lock (_syncRoot)
            {
                UpdateWork(_tree);
            }

            if (_runnableWork.Count > 0)
            {
                return true;
            }
            else
            {
                while (!_tree.TrueForAll(new Predicate<IExecutable>(Executed)) && _runnableWork.Count == 0)
                {
                    lock (_syncRoot)
                    {
                        UpdateWork(_tree);
                    }
                }
                //use this to block something until a task completes?
                return false;
            }
        }

An enumerator should return false when you are past the last object in your collection. I wanted to keep this from ever happening and basically block whatever traffic cop is consuming the enumerator until more work could be added to the Queue of _runnableWork. With the code as shown above, the first task (RootTask) is never executed, so we are stuck in an infinite while loop. Current is requested and returned, but no joy.
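For comparison, here is a minimal sketch of an enumerator that really does block its consumer until more work arrives. It assumes BlockingCollection<T> from later .NET releases, which is not part of the CTP discussed here, and it swaps the TPL for plain worker threads. TaskNode and BlockingTreeDemo are hypothetical names, and the Thread.Sleep call stands in for Execute().

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical tree node: children may run only after their parent completes.
public class TaskNode
{
    public string Name;
    public List<TaskNode> Children = new List<TaskNode>();

    public TaskNode(string name, params TaskNode[] children)
    {
        Name = name;
        Children.AddRange(children);
    }
}

public static class BlockingTreeDemo
{
    static int CountNodes(TaskNode node)
    {
        int total = 1;
        foreach (TaskNode child in node.Children) total += CountNodes(child);
        return total;
    }

    // Runs the whole tree on `workerCount` threads; returns names in completion order.
    public static List<string> Run(TaskNode root, int workerCount)
    {
        var runnable = new BlockingCollection<TaskNode>();
        var finished = new ConcurrentQueue<string>();
        int remaining = CountNodes(root);
        runnable.Add(root);

        var workers = new List<Thread>();
        for (int i = 0; i < workerCount; ++i)
        {
            var worker = new Thread(() =>
            {
                // MoveNext() here blocks while the queue is empty and only
                // returns false once CompleteAdding() has been called.
                foreach (TaskNode node in runnable.GetConsumingEnumerable())
                {
                    Thread.Sleep(10); // stand-in for node.Execute()
                    finished.Enqueue(node.Name);

                    // A finished parent makes its children runnable.
                    foreach (TaskNode child in node.Children) runnable.Add(child);

                    // The last task to finish releases every blocked worker.
                    if (Interlocked.Decrement(ref remaining) == 0)
                        runnable.CompleteAdding();
                }
            });
            worker.Start();
            workers.Add(worker);
        }

        foreach (Thread worker in workers) worker.Join();
        return new List<string>(finished);
    }

    public static void Main()
    {
        var tree = new TaskNode("Root",
            new TaskNode("A", new TaskNode("A1"), new TaskNode("A2")),
            new TaskNode("B"));
        foreach (string name in Run(tree, 4)) Console.WriteLine("DONE " + name);
    }
}
```

The design choice is that the enumerator never has to guess whether the collection is finished: it waits while the queue is empty and ends only when the last task calls CompleteAdding(). Sibling order will vary from run to run, but a parent always completes before its children.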

Through experimentation I found some additional interesting behavior. The TPL will allow MoveNext() to return false several times before it actually gives up and believes the collection is empty. While attempting to decipher this behavior, I thought I could restructure the calling code to take multiple iterations over the tree, which would be FAR from ideal but would still let me walk away with a consolation prize:

            Action<IExecutable> executeTask = delegate(IExecutable exeTask)
            {
                exeTask.Execute();
                Console.WriteLine("PFX DONE " + exeTask.FriendlyClassName());
            };

            Predicate<IExecutable> allDone = new Predicate<IExecutable>(Executed);
            while (!workTree.TrueForAll(allDone))
            {
                Console.WriteLine("=========TPL Iteration=========");
                Parallel.ForEach<IExecutable>(workTree, executeTask);
            }

 

This, however, met with failure of another sort.  Here’s the entire enumerator implementation, complete with the various debug statements:

    public class TaskTreeEnumerator : TreeEnumerator<IExecutable>
    {
        public TaskTreeEnumerator(Tree<IExecutable> tree) : base(tree)
        {
            // Seed the queue with the root task so there is something to run.
            if (!tree.Value.IsComplete)
            {
                _runnableWork.Enqueue(tree.Value);
                tree.Value.Scheduled = true;
                Console.WriteLine("Setting " + tree.Value.FriendlyClassName() + " as runnable");
            }
        }

        public override bool MoveNext()
        {
            lock (_syncRoot)
            {
                UpdateWork(_tree);
            }

            if (_runnableWork.Count > 0)
            {
                return true;
            }
            else
            {
                lock (_syncRoot)
                {
                    UpdateWork(_tree);
                }
                //use this to block something until a task completes?
                Console.WriteLine("MoveNext->false");
                return false;
            }
        }

        public override IExecutable Current
        {
            get
            {
                // Note: every read of Current dequeues the next runnable task.
                _current = _runnableWork.Dequeue();
                _current.Scheduled = true;
                Console.WriteLine("Current Returning:" + base.Current.FriendlyClassName());
                return base.Current;
            }
        }

        public bool Executed(IExecutable target)
        {
            return target.IsComplete;
        }

        public override void Reset()
        {
            Console.WriteLine("Reset!!");
        }

        // Queues this node if it has not run, is not already queued or scheduled,
        // and its parent has completed; if the node itself is complete, descends
        // into its children instead.
        private void UpdateWork(Tree<IExecutable> tree)
        {
            if (!tree.Value.IsComplete
                && !_runnableWork.Contains(tree.Value)
                && !tree.Value.Scheduled
                && null != tree.Parent
                && tree.Parent.Value.IsComplete)
            {
                _runnableWork.Enqueue(tree.Value);
                Console.WriteLine("Setting " + tree.Value.FriendlyClassName() + " as runnable");
            }
            else if (tree.Value.IsComplete)
            {
                foreach (Tree<IExecutable> subTree in tree.ChildNodes)
                {
                    UpdateWork(subTree);
                }
            }
        }
    }

You can tell by the number of “ifs” in UpdateWork that I was getting frantic.  Given the code above, check out the output:

See anything odd in there? Despite having four Worker Threads created, the tasks are all run sequentially. Not very concurrent of the TPL, is it? Either I have made some kind of colossal mistake or the code just can’t handle this situation, which would be disappointing. If the TPL would start executing Tasks AS they are yielded from the Enumerator, I’m satisfied this would work. There’s probably a very good reason why they are doing it this way. There is a ParallelEnumerable class in the December CTP of the Parallel Extensions, but that appears to be for Parallel LINQ and may not resolve the situation.

If I get time, I plan on taking a peek with Reflector to see if I can uncover a method that will work for what I want to do.

{Edit: I couldn’t help myself. Using Reflector I found the code in ForEachWorker<> where it seems to try to get a chunk of at least 8 tasks to run before actually starting the work. It’s hard to decipher some Generics code via Reflector, but I’d seriously like to see the PFX team change how this works. Granted, it’s easy to write the threading code yourself if you have, say, four long-running tasks.}
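That chunking is what keeps work from starting the moment it is yielded. In later .NET releases (not the CTP examined here), Partitioner.Create accepts EnumerablePartitionerOptions.NoBuffering, which asks Parallel.ForEach to take items from the source one at a time. A minimal sketch under that assumption follows; NoBufferingDemo and its Source iterator are hypothetical names, and the log simply records when each item is handed out and when it runs.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class NoBufferingDemo
{
    // Lazily yields 0..count-1 and logs each hand-off to the consumer.
    static IEnumerable<int> Source(int count, ConcurrentQueue<string> log)
    {
        for (int i = 0; i < count; ++i)
        {
            log.Enqueue("yield " + i);
            yield return i;
        }
    }

    // Returns the interleaved log of "yield n" and "run n" entries.
    public static List<string> Run(int count)
    {
        var log = new ConcurrentQueue<string>();

        // NoBuffering: pull one item at a time instead of gathering chunks.
        var oneAtATime = Partitioner.Create(
            Source(count, log), EnumerablePartitionerOptions.NoBuffering);

        // A single worker keeps the log easy to read.
        var options = new ParallelOptions { MaxDegreeOfParallelism = 1 };

        Parallel.ForEach(oneAtATime, options, item => log.Enqueue("run " + item));
        return new List<string>(log);
    }

    public static void Main()
    {
        foreach (string line in Run(5)) Console.WriteLine(line);
    }
}
```

With a single worker I would expect the log to alternate between yield and run lines, which is the "execute tasks as they are yielded" behavior wished for above. Raising MaxDegreeOfParallelism lets several items be in flight at once while still avoiding chunks.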

And today, look what drifted into my RSS reader from the Parallel FX team blog:

http://blogs.msdn.com/pfxteam/archive/2008/04/03/8354128.aspx

“…Parallel Extensions relies on threads, so you can continue to use existing synchronization primitives with Parallel Extensions just as you would with threads you spin up manually or with threads from the ThreadPool; we're also introducing some new synchronization and coordination primitives in upcoming releases of Parallel Extensions, so stay tuned for those …” (emphasis added)

If I wanted to, I could interpret “coordination primitives” as being the sort of construct I’m trying to create here.  I can’t wait for the next CTP.  For now, though, this wraps up my current thoughts on this subject.