Managing Concurrency With Trees[4]
Read the entire saga:
Managing Concurrency With Trees[0]
Managing Concurrency With Trees[1]
Managing Concurrency With Trees[2]
Managing Concurrency With Trees[3]
As I hinted at the end of the last article, I might come back to this topic. I’m back. Two things made me unable to put this topic down. #1 was poking through Parallel.ForEach in Reflector and finding code written around constants like this:
private const int DEFAULT_LOOP_STRIDE = 8;
As near as I can read the code, the Parallel extensions don’t really get their swerve on until you’ve got quite a few more tasks than I’m using in my examples. My uneducated suspicion is that this has to do with building up a queue of work for each thread and with implementing work stealing.
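One plausible reading of a stride constant, and this is my assumption rather than anything confirmed from the TPL source, is that each thread claims a chunk of that many items at a time. The sketch below (`StrideDemo` and `Chunk` are hypothetical names) shows why that would hurt small inputs: a handful of items produces only a handful of chunks, so only that many threads would ever get work, however many cores the machine has.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of stride-based partitioning. LoopStride mirrors the
// DEFAULT_LOOP_STRIDE value seen in Reflector; the assumption is that each thread
// claims one chunk of up to LoopStride items at a time.
public static class StrideDemo
{
    public const int LoopStride = 8;

    // Splits source into consecutive chunks of at most `stride` items.
    public static List<List<T>> Chunk<T>(IEnumerable<T> source, int stride)
    {
        if (stride <= 0) throw new ArgumentOutOfRangeException(nameof(stride));
        var chunks = new List<List<T>>();
        var current = new List<T>(stride);
        foreach (T item in source)
        {
            current.Add(item);
            if (current.Count == stride)
            {
                chunks.Add(current);
                current = new List<T>(stride);
            }
        }
        if (current.Count > 0)
        {
            chunks.Add(current); // the last, possibly short, chunk
        }
        return chunks;
    }
}
```

With the 20 jobs used in the sample Main later in this post, `Chunk(jobs, StrideDemo.LoopStride)` yields chunks of 8, 8 and 4 items, so under this assumption no more than three threads would be busy.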
Recall from my last example that, with that specific Enumerator implementation, I could even frustrate the TPL into executing quite a few things sequentially. That is what I’d like to avoid.
I’ll definitely need to come up with some realistic examples with hundreds or thousands of little tasks, but for now I’ll try to see my original idea through. But not with Parallel FX. It’s time for Payneallel FX.
I remarked to my muse that I wondered how much work it would take to make my own version of Parallel.ForEach: no fancy work stealing, and very specific to my problem. The PFX team surely has very good reasons for doing what they are doing, but I was frustrated at being unable to use my custom Enumerator. I want to be able to write code like this:
Parallel.ForEach<IExecutable>(workTree, executeTask); // tree-based, low # of tasks
… or like this …
Parallel.ForEach<OddJob>(jobs, doJob); // list-based, high # of tasks
…and have it work without hinting. Expecting to blow my whole weekend on this, I started thinking and typing. Once I decided how to limit the number of threads created and to reuse a thread rather than create a new one for each task, it went much better than I expected. About 208 lines of code later, I had something that worked 100% the way I wanted it to!
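For comparison, the two ideas in this paragraph, capping the number of threads and reusing each thread for many tasks, can also be sketched with `BlockingCollection<T>` (System.Collections.Concurrent, .NET 4 and later). This is a swapped-in technique, not the Payneallel design: a fixed set of threads (one per logical processor) drains a bounded queue, so adding an item blocks while the queue is full, much as GetWorker() blocks when every Worker is busy. `FixedThreadForEach` is a hypothetical name.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Comparison sketch, not the Payneallel code: a fixed number of long-lived
// threads, each reused for many items taken from a shared bounded queue.
public static class FixedThreadForEach
{
    public static void ForEach<T>(IEnumerable<T> source, Action<T> body)
    {
        // Cap the thread count at one per logical processor.
        int threadCount = Environment.ProcessorCount;

        // Bounded capacity: Add() blocks while threadCount items are already waiting.
        using (var queue = new BlockingCollection<T>(threadCount))
        {
            var threads = new List<Thread>(threadCount);
            for (int i = 0; i < threadCount; ++i)
            {
                var t = new Thread(() =>
                {
                    // Blocks while the queue is empty; the loop ends once
                    // CompleteAdding() has been called and the queue is drained.
                    // An exception from body is not caught here and would
                    // terminate the process.
                    foreach (T item in queue.GetConsumingEnumerable())
                    {
                        body(item);
                    }
                });
                t.Name = "FixedThreadForEach " + i;
                t.Start();
                threads.Add(t);
            }

            foreach (T item in source)
            {
                queue.Add(item); // a waiting thread can start on it immediately
            }
            queue.CompleteAdding();

            // Return only after every thread has finished its last item.
            foreach (Thread t in threads)
            {
                t.Join();
            }
        }
    }
}
```

Usage mirrors the ForEach call shape above, e.g. `FixedThreadForEach.ForEach(jobs, doJob);`. The trade-off is that items wait in a queue instead of being handed directly to an idle thread.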
There are three classes: a static class containing a static ForEach method, a WorkerPool class, and a Worker class.
Parallel.ForEach seems like a pretty good way to write this kind of code, so I imitated its interface. Here’s the sample Main method:
int exeCount = 0;
Action<OddJob> doJob = delegate(OddJob j)
{
    j.Execute();
    // Use the value Interlocked.Increment returns; re-reading exeCount
    // could show another thread's later increment.
    int executed = Interlocked.Increment(ref exeCount);
    Console.WriteLine("Executed " + executed);
};
List<OddJob> jobs = new List<OddJob>();
for (int i = 0; i < 20; ++i)
{
    jobs.Add(new OddJob());
}
Payneallel.ForEach<OddJob>(jobs, doJob);
This should look just like TPL code. The actual implementation is presented in its entirety here with some basic description afterwards.
using System;
using System.Collections.Generic;
using System.Threading;

public static class Payneallel
{
    public static void ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body)
    {
        WorkerPool<TSource> pool = new WorkerPool<TSource>();
        foreach (TSource src in source)
        {
            // Blocks until one of the pool's workers is free.
            Worker<TSource> worker = pool.GetWorker();
            worker.Arg = src;
            worker.Work = body;
            worker.Go();
        }
        pool.WaitAll();
    }
}
This couldn’t be simpler. When the method is called, a WorkerPool (a pool of threads) is created. These are my own threads, not ThreadPool threads, so I don’t need to worry about running long tasks on them. We can choose whether to limit the number of threads created without changing this method or requiring anything of the client. Worker<TSource>.Go() means “GO”: work begins as soon as possible, and that is exactly where I had issues with my Tree<T> and the TPL. Next, the WorkerPool implementation:
class WorkerPool<T>
{
    public WorkerPool()
    {
        // Create the event before any worker can call WorkerDone().
        _workerDoneEvent = new ManualResetEvent(false);
        _workers = new List<Worker<T>>(Environment.ProcessorCount);
        for (int i = 0; i < Environment.ProcessorCount; ++i)
        {
            Worker<T> worker = new Worker<T>("Payneallel " + i);
            _workers.Add(worker);
            worker.Done = new Action(WorkerDone);
            worker.Go();
        }
    }
    private ManualResetEvent _workerDoneEvent;
    // Instance field: each pool owns its own workers.
    private List<Worker<T>> _workers;
    private object _syncRoot = new object();
    /// <summary>
    /// Called by a Worker when it finishes; wakes any caller blocked in GetWorker().
    /// </summary>
    public void WorkerDone()
    {
        lock (_syncRoot)
        {
            _workerDoneEvent.Set();
        }
    }
    public Worker<T> GetWorker()
    {
        Worker<T> worker = GetFreeWorker();
        while (null == worker)
        {
            // Every worker is busy: wait for one to signal WorkerDone().
            _workerDoneEvent.WaitOne();
            worker = GetFreeWorker();
        }
        _workerDoneEvent.Reset();
        return worker;
    }
    private Worker<T> GetFreeWorker()
    {
        foreach (Worker<T> w in _workers)
        {
            if (!w.Busy)
            {
                Console.WriteLine("returning worker from pool");
                return w;
            }
        }
        return null;
    }
    public void WaitAll()
    {
        // Called by ForEach after the last item has been handed out.
        foreach (Worker<T> w in _workers)
        {
            w.Finish();
        }
    }
}
The pool creates one Worker<T> for each logical processor reported by Environment.ProcessorCount. I’ve come back to wait handles as a means of making things work, but in this case I don’t mind because I’m limiting the number of threads I’m creating. If Payneallel.ForEach calls GetWorker() while every Worker is busy, the call blocks until one of them signals WorkerDone(). WaitAll() makes sure Payneallel.ForEach doesn’t return to the calling thread until the workers have finished, so from the caller’s point of view the call is synchronous. The rest of the work is done in the Worker<T> class:
class Worker<T>
{
    /// <summary>
    /// Set up initial state
    /// </summary>
    /// <param name="name">Name for this worker and its thread.</param>
    public Worker(string name)
    {
        Name = name;
        Work = null;
        _active = true;
        Busy = false;
        _syncRoot = new object();
        _acceptingWorkEvent = new ManualResetEvent(true);
        _gotWorkEvent = new ManualResetEvent(false);
        ThreadStart ts = new ThreadStart(DoWork);
        Thread t = new Thread(ts);
        t.Name = Name;
        t.Start();
    }
    /// <summary>
    /// set _active to false so the thread will exit
    /// </summary>
    // Caveat: while DoWork is running on its thread it keeps this Worker
    // reachable, so the GC cannot finalize it until that thread ends.
    ~Worker()
    {
        _active = false;
    }
    private ManualResetEvent _acceptingWorkEvent;
    private ManualResetEvent _gotWorkEvent;
    /// <summary>
    /// The name of the worker, used to Name the thread for easier debugging
    /// </summary>
    public string Name { get; set; }
    /// <summary>
    /// The worker is currently doing something
    /// </summary>
    public bool Busy { get; set; }
    // volatile: written from one thread and read by the worker thread.
    private volatile bool _active;
    private object _syncRoot;
    /// <summary>
    ///