Thursday, August 7, 2008

Threading with .NET ThreadPool Part 2

Let's continue looking at System.Threading.ThreadPool. In the last article, we went over some of the basic features a thread pool should have, and we looked specifically at System.Threading.ThreadPool to see how to get your app multithreaded quickly. We also touched on some aspects missing from Microsoft's thread pool implementation that an "industrial strength" application needs. Today, we will examine some of those missing features.

Let's start with a skeleton application. It starts a master work item that is responsible for feeding service work items to the thread pool. The service work items do some work: right now, they display a message with Console.WriteLine(...) and then sleep for a random interval between 100 and 999 milliseconds. We get a taste of synchronization with the shared random number generator, which I surround with a lock(...){ } statement to make access to it thread safe. When service work items are done, they exit. "Exit" is a term you should be careful with. Often, it means an explicit call to some function that ends a program. Here it means that the thread returns from the callback function. So it's our task that exits, not the thread.

This skeletal application also has a button that sets a shared Interrupted flag. Once the flag is set, the master work item stops feeding the pool, service work items that have not started yet return immediately, and the master waits for the remaining ones to complete, which should take about 1 second. To facilitate a clean shutdown, I employ a little more thread synchronization in the form of a simple task counter.

Here's the code. To use it, create a new Windows Forms project named ThreadPoolApp, drop a button on the form, then open the source code for Form1.cs and replace its contents with the code below. Last, set your button's Click event handler to the one in my code (button1_Click).


using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using System.Threading;

namespace ThreadPoolApp
{
    public partial class Form1 : Form
    {
        public Form1()
        {
            InitializeComponent();
            // Start the master work item; it feeds the service work items to the pool.
            ThreadPool.QueueUserWorkItem(new MasterWorkItem().ThreadPoolCallback);
        }

        private void button1_Click(object sender, EventArgs e)
        {
            // The UI thread is the only writer of this flag.
            WorkItem.Interrupted = true;
        }
    }

    public abstract class WorkItem
    {
        // Signature required by ThreadPool.QueueUserWorkItem().
        public abstract void ThreadPoolCallback(object context);

        // Shared stop flag, read by every work item.
        private static bool _interrupted = false;
        public static bool Interrupted
        {
            get { return _interrupted; }
            set { _interrupted = value; }
        }

        // Number of work items created but not yet done (the master included).
        private static long _numWorkItems = 0;

        protected WorkItem()
        {
            Interlocked.Increment(ref WorkItem._numWorkItems);
        }

        protected void WorkItemDone()
        {
            Interlocked.Decrement(ref WorkItem._numWorkItems);
        }

        protected long NumWorkItems
        {
            get { return _numWorkItems; }
        }
    }

    public class MasterWorkItem : WorkItem
    {
        public override void ThreadPoolCallback(object context)
        {
            try
            {
                while (!Interrupted)
                {
                    if (NumWorkItems < 500)
                    {
                        ThreadPool.QueueUserWorkItem(new ServiceWorkItem().ThreadPoolCallback);
                    }
                    else
                    {
                        // Throttle: stop feeding the queue for a while.
                        Console.WriteLine("Letting pool drain some");
                        Thread.Sleep(500);
                    }
                }
                // The master counts itself, so wait until it is the only item left.
                while (NumWorkItems > 1)
                {
                    Console.WriteLine("Waiting for child tasks {0}", NumWorkItems);
                    Thread.Sleep(1000);
                }
            }
            finally
            {
                WorkItemDone();
            }
        }
    }

    public class ServiceWorkItem : WorkItem
    {
        public override void ThreadPoolCallback(object context)
        {
            try
            {
                if (!Interrupted)
                {
                    int val = ServiceWorkItem.MyRand();
                    int myThread = Thread.CurrentThread.GetHashCode();
                    Console.WriteLine("Thread {0}, WorkItem {1}, Sleeping {2}",
                        myThread.ToString(),
                        this.GetHashCode().ToString(),
                        val.ToString());
                    Thread.Sleep(val);
                    Console.WriteLine("Thread {0}, WorkItem {1}, done.", myThread.ToString(), this.GetHashCode().ToString());
                }
                else
                {
                    // Queued before the interrupt, started after it: return at once.
                    Console.WriteLine("Thread {0}, WorkItem {1}, Interrupted",
                        Thread.CurrentThread.GetHashCode().ToString(),
                        this.GetHashCode().ToString());
                }
            }
            finally
            {
                // Always decrement the counter, even if the work throws.
                WorkItemDone();
            }
        }

        // One generator shared by all service work items; guarded by a lock.
        private static Random rnd = new Random();

        public static int MyRand()
        {
            lock (rnd)
            {
                return rnd.Next(100, 1000);
            }
        }
    }
}


I've created three classes: an abstract WorkItem class, the MasterWorkItem class and the ServiceWorkItem class. MasterWorkItem and ServiceWorkItem derive from WorkItem, which forces the derived classes to implement the callback with the signature required by the static ThreadPool method QueueUserWorkItem() (the WaitCallback delegate: a void method taking a single object argument). The WorkItem class implements a control member, Interrupted, with which I can stop the Master and Service work items. Each work item has its own instance data that the callback can refer to. Each can also refer to shared information held in the static members of the WorkItem class.
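To make the callback signature concrete, here is a minimal sketch that is separate from the sample application. It queues a method with the WaitCallback shape and passes it a state object as the second argument to QueueUserWorkItem(). The EchoState and SignatureDemo names are made up for this illustration, and the polling loop is only there so the caller can see the result.

```csharp
using System;
using System.Threading;

// Hypothetical state object handed to the callback (not part of the sample app).
public class EchoState
{
    public string Input;
    public string Output;
    public volatile bool Finished;
}

public static class SignatureDemo
{
    // Same shape as the WaitCallback delegate: void return, one object argument.
    public static void Echo(object context)
    {
        EchoState state = (EchoState)context;
        state.Output = state.Input.ToUpperInvariant();
        state.Finished = true;   // written last, after Output is set
    }

    // Queues the callback with a state object, then polls until it has run.
    public static string Run(string input)
    {
        EchoState state = new EchoState();
        state.Input = input;
        ThreadPool.QueueUserWorkItem(new WaitCallback(Echo), state);
        while (!state.Finished)
        {
            Thread.Sleep(10);
        }
        return state.Output;
    }
}
```

Calling SignatureDemo.Run("abc") hands "abc" to a pool thread through the state object and returns the upper-cased string. The sample application gets the same effect by putting the state in the work item instance and passing an instance method as the callback.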

Now, if you run the code, you will start seeing text in the debugger's output window. The master is limited to 500 active tasks. When it hits this limit, it sleeps for half a second and lets the queue "drain" a little.

When you click the button after a few seconds, you will see output from the tasks being terminated. The threads are not necessarily terminating; they are just pulling tasks from the queue, and the callbacks return immediately after reporting that they have been interrupted. When the task count drops to 1, the master work item (task) can return from its callback.

Maybe you see "thread xxx has exited" messages. I have said that with the thread pool, tasks exit and not threads. So why do we see messages from the debugger that such-and-such thread has exited? That is because the thread pool is managing its resources. If it doesn't need as many active threads, some of the threads exit. But that is not a concern of our application; we are only concerned about our task exiting. Whether the thread pool starts or stops threads to service our app is completely under the hood.
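If you are curious about what the pool is doing under the hood, the ThreadPool class exposes a few counters. The sketch below is not part of the sample application, and the PoolInfo name is made up for this illustration. It uses GetMinThreads(), GetMaxThreads() and GetAvailableThreads() to estimate how many worker threads are busy at the moment of the call.

```csharp
using System;
using System.Threading;

public static class PoolInfo
{
    // Worker threads currently in use, computed as (maximum - available).
    public static int BusyWorkerThreads()
    {
        int maxWorkers, maxIo, freeWorkers, freeIo;
        ThreadPool.GetMaxThreads(out maxWorkers, out maxIo);
        ThreadPool.GetAvailableThreads(out freeWorkers, out freeIo);
        return maxWorkers - freeWorkers;
    }

    // Writes a one-line summary of the worker thread limits and usage.
    public static void Print()
    {
        int minWorkers, minIo, maxWorkers, maxIo;
        ThreadPool.GetMinThreads(out minWorkers, out minIo);
        ThreadPool.GetMaxThreads(out maxWorkers, out maxIo);
        Console.WriteLine("Worker threads: min {0}, max {1}, busy {2}",
            minWorkers, maxWorkers, BusyWorkerThreads());
    }
}
```

Calling PoolInfo.Print() from the master's loop, for example, would show the busy count changing while the number of queued tasks stays near the limit. The values are a snapshot and can be stale by the time they are printed.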

As I mentioned in my last article, getting multiple threads working in your application is almost trivial with the ThreadPool, but you must deal with several aspects of thread synchronization to have a useful app. Again, these aspects are...

1. Enqueuing work.
2. Detecting queue overload and throttling.
3. Detecting work start.
4. Reporting work progress.
5. Detecting work completion.
6. Stopping work-in-progress.
7. Cancelling un-started work.

This sample application demonstrates several of these aspects. It enqueues work, of course, and the Master work item throttles itself with a simple count-and-sleep mechanism. Work completion is detected at a high level by the Master task with a task counter, but this is a weak method because we cannot tell which work item completed. Cancelling un-started work and stopping work-in-progress are both handled by a simple state variable shared by all tasks, called "Interrupted" (the master checks it on every loop iteration; a service work item checks it only once, when it starts). This application does not really report progress or indicate when a task starts (I don't consider debug messages to the console to be accomplishing that). I will try to flesh out the progress communication and startup notification in my next article.
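One possible way to strengthen completion detection, sketched here under my own assumptions rather than taken from the sample application, is to give each work item an id and have it record that id in a shared log when it finishes. CompletionLog, TrackedWorkItem and TrackingDemo are hypothetical names, and the Thread.Sleep(50) call stands in for real work.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical thread-safe list of the ids of finished work items.
public class CompletionLog
{
    private readonly object _sync = new object();
    private readonly List<int> _ids = new List<int>();

    public void Add(int id) { lock (_sync) { _ids.Add(id); } }

    // Copy of the ids recorded so far.
    public int[] Snapshot() { lock (_sync) { return _ids.ToArray(); } }
}

public class TrackedWorkItem
{
    private readonly int _id;
    private readonly CompletionLog _log;

    public TrackedWorkItem(int id, CompletionLog log)
    {
        _id = id;
        _log = log;
    }

    public void ThreadPoolCallback(object context)
    {
        try
        {
            Thread.Sleep(50);   // stand-in for real work
        }
        finally
        {
            _log.Add(_id);      // record which item finished
        }
    }
}

public static class TrackingDemo
{
    // Queues 'count' items, polls until all have reported, returns sorted ids.
    public static int[] RunAndCollect(int count)
    {
        CompletionLog log = new CompletionLog();
        for (int i = 0; i < count; i++)
        {
            ThreadPool.QueueUserWorkItem(new TrackedWorkItem(i, log).ThreadPoolCallback);
        }
        while (log.Snapshot().Length < count)
        {
            Thread.Sleep(20);
        }
        int[] ids = log.Snapshot();
        Array.Sort(ids);
        return ids;
    }
}
```

The log records ids in completion order, which is generally not the order in which the items were queued. That is why RunAndCollect() sorts the ids before returning them.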

Before I leave, I have just a quick word about thread synchronization. You will notice that I am not using anything fancy like AutoResetEvent, or Mutex, or Monitor, or the Wait{...} type calls. I use Interlocked.Increment() and Interlocked.Decrement() to manage changes to the task count, and I use lock(){...} to protect the random number generator. Even there, the lock() may not be necessary, but the documentation does not say that the Random.Next() call is thread safe, so I add it. I can get away with my Interrupted flag being a simple variable because I know there is only one thread that will ever write to it. Threads that read from Interrupted are not harmed by rogue timings or simultaneous access. Similarly, reading the task count is not protected because the timing of reads and writes does not significantly affect the result.
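For comparison, here is a slightly stricter variant of the shared state, offered as a sketch and not as the code the sample uses. It marks the flag volatile, so a reader loop is not allowed to keep a cached copy, and it reads the counter with Interlocked.Read(), because a plain read of a 64-bit long is not atomic on a 32-bit runtime. The SharedState class and its Enter() and Leave() methods are hypothetical names.

```csharp
using System.Threading;

public static class SharedState
{
    // 'volatile' keeps the compiler and JIT from caching the flag,
    // so a polling reader sees a write made by another thread.
    private static volatile bool _interrupted;

    private static long _numWorkItems;

    public static bool Interrupted
    {
        get { return _interrupted; }
        set { _interrupted = value; }
    }

    // Atomic increment and decrement, as in the sample application.
    public static long Enter() { return Interlocked.Increment(ref _numWorkItems); }
    public static long Leave() { return Interlocked.Decrement(ref _numWorkItems); }

    // Interlocked.Read returns the 64-bit value atomically, even on 32-bit systems.
    public static long NumWorkItems
    {
        get { return Interlocked.Read(ref _numWorkItems); }
    }
}
```

In the sample application the practical difference is small, since the master only compares the count against a threshold and re-reads it on every pass. Declaring the counter as an int would be another way to avoid the 64-bit read question.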

In my next article, I will provide more on fancier synchronization, and demonstrate some progress reporting and startup detection.
