Friday, April 04, 2008

I went to my first Nerd Lunch in a long time today.  It was good to see some of the Milwaukee people from the user group and introduce Mr. Vanderboom to some new people.  Some nerd celebrities are in town too, for Deeper in .NET tomorrow.  I can now say I've met Richard Campbell of .NET Rocks fame.

I'll be at Deeper in .NET tomorrow, unless my wife goes into labor!

Friday, April 04, 2008 2:55:35 PM (Central Standard Time, UTC-06:00)
 Thursday, April 03, 2008

This article may not make much sense unless you’ve also read:

Managing Concurrency With Trees[0]

Managing Concurrency With Trees[1]

Managing Concurrency With Trees[2]

As I stated in my initial goal, a Tree structure is a natural way to express a number of tasks that need to happen in a certain order, even though many of them could be going on at the same time.  I had only read articles on the Task Parallel Library up to this point, but recalled both a video from Channel 9 and a DotNetRocks Podcast with Joe Duffy where he stated that the TPL likes to work on collections that implement IEnumerable<T>.  This got me a little bit excited; I assume very few people out there are interested in the rantings of an MVP from Wisconsin with an Orage blog, but if I could shoehorn my ideas into the TPL that would be something different.

My initial goal, then, was to use the TaskTreeScheduler notion of flattening and queuing eligible tasks and combine it with some sort of clever IEnumerable<IExecutable> implementation such that I could pull in the TPL and write code like this (using the same workTree from the last article):

            Action<IExecutable> executeTask = delegate(IExecutable exeTask)
            {
                exeTask.Execute();
                Console.WriteLine("PFX DONE " + exeTask.FriendlyClassName());
            };

            Parallel.ForEach<IExecutable>(workTree, executeTask);

This saves me from having to write most of the threading code.  It wasn’t meant to be, though; read on.

To give you an idea of how I’d like this to work, let’s take a simpler example with a linear list of instances of the OddJob IExecutable implementation:

            Action<OddJob> doJob = delegate(OddJob j)
            {
                j.Execute();
            };

            List<OddJob> jobs = new List<OddJob>();
            for (int i = 0; i < 100; ++i)
            {
                jobs.Add(new OddJob());
            }

            Parallel.ForEach<OddJob>(jobs, doJob);

You’ll notice that the TPL has made quite a few worker threads for me and set about executing quite a few of my OddJobs at once.  By default the library will build at least one thread per physical core on your machine. 
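If you want to watch the worker threads for yourself, something like the following might do.  This is only a sketch: it assumes the released TPL that ships with .NET 4 or later (not the December CTP used in this article), it uses a lambda and Thread.Sleep as a stand-in for OddJob.Execute(), and ThreadUsageDemo is a made-up name.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class ThreadUsageDemo
{
    static void Main()
    {
        // Maps each managed thread id to the number of items it ran.
        var seen = new ConcurrentDictionary<int, int>();

        Parallel.ForEach(Enumerable.Range(0, 100), i =>
        {
            Thread.Sleep(10); // stand-in for OddJob.Execute() (assumption)
            seen.AddOrUpdate(Thread.CurrentThread.ManagedThreadId, 1, (id, n) => n + 1);
        });

        Console.WriteLine("Logical processors: " + Environment.ProcessorCount);
        foreach (var pair in seen)
        {
            Console.WriteLine("Thread " + pair.Key + " ran " + pair.Value + " items");
        }
    }
}
```

How many threads show up, and how the 100 items are split between them, will vary from machine to machine and from run to run.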

The first thing I discovered is that the TPL does not instantly execute each piece of work it gets out of your IEnumerable<T> collection.  For example, here is the first iteration of the MoveNext() implementation for TaskTreeEnumerator:

        public override bool MoveNext()
        {
            lock (_syncRoot)
            {
                UpdateWork(_tree);
            }

            if (_runnableWork.Count > 0)
            {
                return true;
            }
            else
            {
                while (!_tree.TrueForAll(new Predicate<IExecutable>(Executed)) && _runnableWork.Count == 0)
                {
                    lock (_syncRoot)
                    {
                        UpdateWork(_tree);
                    }
                }
                //use this to block something until a task completes?
                return false;
            }
        }

An enumerator should return false when you are past the last object in your collection.  I wanted to keep this from ever happening and basically block whatever traffic cop is consuming the enumerator until more work could be added to the Queue of _runnableWork.  With the code as shown above the first task (RootTask) is never executed; we are therefore stuck in an infinite while loop.  Current is requested and returned, but no joy.
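For what it’s worth, the “block until more work shows up” idea can be written without a spinning while loop.  The sketch below is not the article’s enumerator; BlockingWorkQueue<T>, Close and TryDequeue are hypothetical names, and it uses plain Monitor.Wait/PulseAll.  It only helps if the consumer actually starts each item before asking for the next one, which is exactly what did not happen with RootTask above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper, not part of the article's code: a queue whose
// consumer sleeps on a monitor instead of spinning while it waits.
class BlockingWorkQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _syncRoot = new object();
    private bool _closed;

    public void Enqueue(T item)
    {
        lock (_syncRoot)
        {
            _items.Enqueue(item);
            Monitor.PulseAll(_syncRoot); // wake any waiting consumer
        }
    }

    // Call once no more work can ever arrive (every task has completed).
    public void Close()
    {
        lock (_syncRoot)
        {
            _closed = true;
            Monitor.PulseAll(_syncRoot);
        }
    }

    // Blocks until an item is available; returns false only after
    // Close() has been called and the queue has drained.
    public bool TryDequeue(out T item)
    {
        lock (_syncRoot)
        {
            while (_items.Count == 0 && !_closed)
            {
                Monitor.Wait(_syncRoot); // releases the lock while waiting
            }
            if (_items.Count > 0)
            {
                item = _items.Dequeue();
                return true;
            }
            item = default(T);
            return false;
        }
    }
}

class BlockingWorkQueueDemo
{
    static void Main()
    {
        var queue = new BlockingWorkQueue<string>();
        var producer = new Thread(() =>
        {
            queue.Enqueue("RootTask");
            Thread.Sleep(50); // pretend RootTask runs before its child is eligible
            queue.Enqueue("ChildTask");
            queue.Close();
        });
        producer.Start();

        string name;
        while (queue.TryDequeue(out name))
        {
            Console.WriteLine("Dequeued " + name);
        }
        producer.Join();
    }
}
```

On .NET 4 and later, BlockingCollection<T> with GetConsumingEnumerable() covers much the same ground, so a hand-rolled queue like this is mostly of interest on older runtimes.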

Through experimentation I found some additional interesting behavior.  The TPL will allow MoveNext() to return false several times before it actually gives up and believes the collection is empty.  While attempting to decipher this behavior, I thought I could restructure the calling code to take multiple iterations over the tree, which would be FAR from ideal but would still let me walk away with some consolation prize:

            Action<IExecutable> executeTask = delegate(IExecutable exeTask)
            {
                exeTask.Execute();
                Console.WriteLine("PFX DONE " + exeTask.FriendlyClassName());
            };

            Predicate<IExecutable> allDone = new Predicate<IExecutable>(Executed);
            while (!workTree.TrueForAll(allDone))
            {
                Console.WriteLine("=========TPL Iteration=========");
                Parallel.ForEach<IExecutable>(workTree, executeTask);
            }

 

This, however, met with failure of another sort.  Here’s the entire enumerator implementation, complete with the various debug statements:

    public class TaskTreeEnumerator : TreeEnumerator<IExecutable>
    {
        public TaskTreeEnumerator(Tree<IExecutable> tree) : base(tree)
        {
            if (!tree.Value.IsComplete)
            {
                _runnableWork.Enqueue(tree.Value);
                tree.Value.Scheduled = true;
                Console.WriteLine("Setting " + tree.Value.FriendlyClassName() + " as runnable");
            }
        }

        public override bool MoveNext()
        {
            lock (_syncRoot)
            {
                UpdateWork(_tree);
            }

            if (_runnableWork.Count > 0)
            {
                return true;
            }
            else
            {
                lock (_syncRoot)
                {
                    UpdateWork(_tree);
                }
                //use this to block something until a task completes?
                Console.WriteLine("MoveNext->false");
                return false;
            }
        }

        public override IExecutable Current
        {
            get
            {
                _current = _runnableWork.Dequeue();
                _current.Scheduled = true;
                Console.WriteLine("Current Returning:" + base.Current.FriendlyClassName());
                return base.Current;
            }
        }

        public bool Executed(IExecutable target)
        {
            return target.IsComplete;
        }

        public override void Reset()
        {
            Console.WriteLine("Reset!!");
        }

        private void UpdateWork(Tree<IExecutable> tree)
        {
            if (!tree.Value.IsComplete
                && !_runnableWork.Contains(tree.Value)
                && !tree.Value.Scheduled
                && null != tree.Parent
                && tree.Parent.Value.IsComplete)
            {
                _runnableWork.Enqueue(tree.Value);
                Console.WriteLine("Setting " + tree.Value.FriendlyClassName() + " as runnable");
            }
            else if (tree.Value.IsComplete)
            {
                foreach (Tree<IExecutable> subTree in tree.ChildNodes)
                {
                    UpdateWork(subTree);
                }
            }
        }
    }

You can tell by the number of “ifs” in UpdateWork that I was getting frantic.  Given the code above, check out the output:

See anything odd in there?  Despite having four Worker Threads created, the tasks are all run sequentially.  Not very concurrent of the TPL, is it?  Either I have made some kind of colossal mistake or the code just can’t handle this situation, which would be disappointing.  If the TPL would start executing Tasks AS they are yielded from the Enumerator, I’m satisfied this would work.  There’s probably a very good reason why they are doing it this way.  There is a ParallelEnumerable class in the December CTP of the Parallel Extensions, but that appears to be for Parallel LINQ and may not resolve the situation.

If I get time, I plan on taking a peek with Reflector to see if I can uncover a method that will work for what I want to do.

{Edit: I couldn’t help myself.  Using Reflector I found the code in ForEachWorker<> where it seems to try to get a chunk of at least 8 tasks to run before actually starting the work.  It’s hard to decipher some Generics code via Reflector but I’d seriously like to see the PFX team change how this works.  Granted, it’s easy to write the threading code if you have, say, four long-running tasks.}
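For anyone reading this on a later framework: the chunking behavior eventually became something you can ask the library to turn off.  The sketch below assumes .NET Framework 4.5 or later, where Partitioner.Create accepts EnumerablePartitionerOptions.NoBuffering; none of this exists in the CTP discussed here, and SlowSource is a made-up stand-in for the task tree enumerator.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

class NoBufferingDemo
{
    // Hypothetical stand-in for the task tree enumerator: hands out items
    // slowly and logs each time one is yielded.
    static IEnumerable<int> SlowSource()
    {
        for (int i = 0; i < 4; i++)
        {
            Console.WriteLine("Yielding " + i);
            yield return i;
            Thread.Sleep(100); // delay before the next item becomes available
        }
    }

    static void Main()
    {
        // NoBuffering asks the partitioner to take one item at a time from
        // the source instead of collecting a chunk before starting work.
        var partitioner = Partitioner.Create(SlowSource(), EnumerablePartitionerOptions.NoBuffering);

        Parallel.ForEach(partitioner, item =>
        {
            Console.WriteLine("Running " + item + " on thread " + Thread.CurrentThread.ManagedThreadId);
        });
    }
}
```

Whether this would be enough to make the tree enumerator above behave is untested here; it only shows that the one-item-at-a-time option exists in later releases.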

And today, look what drifted into my RSS reader from the Parallel FX team blog:

http://blogs.msdn.com/pfxteam/archive/2008/04/03/8354128.aspx

“…Parallel Extensions relies on threads, so you can continue to use existing synchronization primitives with Parallel Extensions just as you would with threads you spin up manually or with threads from the ThreadPool; we're also introducing some new synchronization and coordination primitives in upcoming releases of Parallel Extensions, so stay tuned for those …” (emphasis added)

If I wanted to, I could interpret “coordination primitives” as being the sort of construct I’m trying to create here.  I can’t wait for the next CTP.  For now, though, this wraps up my current thoughts on this subject.

Thursday, April 03, 2008 9:33:03 PM (Central Standard Time, UTC-06:00)

You may want to be familiar with these articles before continuing:

Managing Concurrency With Trees[0]

Managing Concurrency With Trees[1]

Client Code

Now that I’ve built and populated my Tree<IExecutable>, I need to get something to go to work on it.  The client code is very simple:

            Console.WriteLine("=========Task Tree Scheduler=========");
            TaskTreeScheduler scheduler = new TaskTreeScheduler();
            scheduler.HardwareThreadsOnly = false;
            scheduler.Run(workTree);

The last line of code shown here will block until all the work contained in the tree is done.

The TaskTreeScheduler Class

So, how does the work actually get done?

    public class TaskTreeScheduler
    {
        public TaskTreeScheduler()
        {
            _maxThreads = Environment.ProcessorCount;
            _eligible = new Queue<IExecutable>();
            _syncRoot = new object();
        }

        private Tree<IExecutable> _work;
        private int _maxThreads;
        private int _runningThreads;
        private object _syncRoot;
        private Queue<IExecutable> _eligible;

As you can already see, we use the Tree structure to describe dependencies, but we will be slowly creating a flattened view into the tree using a Queue<IExecutable>.  The next two methods show how we get some threads started:

 

        private void AddEligible(IExecutable task)
        {
            lock (_syncRoot)
            {
                _eligible.Enqueue(task);
                Console.WriteLine("{0} tasks eligible to run", _eligible.Count);
            }
        }

        public void Run(Tree<IExecutable> workTree)
        {
            _work = workTree;
            AddEligible(_work.Value);
            ThreadStart ts = new ThreadStart(RunTrafficCop);
            Thread cop = new Thread(ts);
            cop.Name = "TrafficCop";
            cop.Start();
            cop.Join();
        }

When we pass a Tree<IExecutable> to Run, a “traffic cop” thread gets started.  The traffic cop could be the UI thread; however, I wanted to allow the UI to block or go about its business, so Run uses Thread.Join().  The first item in the Queue is the Value of the Root of the tree, and we print some simple debug information to help visualize what goes on as work happens.  The entire RunTrafficCop method is fairly short:

        public void RunTrafficCop()
        {
            while (true)
            {
                int work = 0;
                lock (_syncRoot)
                {
                    work = _eligible.Count;
                }
                bool allComplete = _work.TrueForAll(new Predicate<IExecutable>(Complete));
                while (work > 0 || !allComplete)
                {
                    IExecutable task = null;
                    //Queue<T> is not thread-safe, so take the same lock AddEligible uses
                    lock (_syncRoot)
                    {
                        if (_eligible.Count > 0)
                        {
                            Console.WriteLine("***Waiting for work***");
                            task = _eligible.Dequeue();
                        }
                    }
                    if (task == null)
                    {
                        Thread.Sleep(2);//Traffic cop sleeps
                        allComplete = _work.TrueForAll(new Predicate<IExecutable>(Complete));
                        continue;
                    }
                    StartThread(task);

                    lock (_syncRoot)
                    {
                        work = _eligible.Count;
                    }
                    allComplete = _work.TrueForAll(new Predicate<IExecutable>(Complete));
                }
                return;
            }
        }

We could do with one less while() loop here, but here’s what is going on.  The root task will be in the queue of eligible work.  As the traffic cop iterates, it will either Dequeue() the next item that can be executed or enter a very brief wait state and check the work in the tree, using the Predicate<IExecutable> to check the IsComplete flag of each instance.  If work is found, we call StartThread:

        public void StartThread(IExecutable task)
        {
            Interlocked.Increment(ref _runningThreads);
            PrintThreadUsage();
            task.Complete = new CompleteCallback(OnTaskComplete);
            ThreadStart ts = new ThreadStart(task.Execute);
            Thread t = new Thread(ts);
            t.Name = task.FriendlyClassName();
            t.Start();
        }

So here we’ve started a thread for an eligible-to-run IExecutable instance.  This is where the light plumbing of the IExecutable contract comes into play and we see the expected behavior of implementing classes:

        public override void Execute()

        {

            base.Execute();

            Console.WriteLine("OddJob::Execute");