Monday, September 22, 2008

A Simple Task Queue

I’ve spent the last 4 posts talking about Regular Expressions and some difficult patterns. But, this is a C# blog, so I really want to be talking about C#. Today, I hope to provide you with a nice little start on a multi-threading "Task Queue" application. A Task Queue will place task requests in a queue that will be serviced Asynchronously and in the order received.

Rather than keep you in suspense, here’s the code up front. If you'd like an explanation, I've attempted that below. (Update 10/6/2008: Sorry folks about the bug below. The Enqueue method must set the the Busy field to true when queing the first task in order to avoid the thread race. It's now been fixed.)


using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using System.Threading;

namespace TaskQueuePOC
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}

int counter = 0;
private void button1_Click(object sender, EventArgs e)
{
new MyTaskQueue(counter++).Enqueue();
}
}

public class MyTaskQueue : TaskQueue
{
public MyTaskQueue(object UserData)
: base(UserData)
{
}
protected override void Task()
{
Thread.Sleep(2000);
Console.WriteLine(UserData.ToString());
}
}

public abstract class TaskQueue
{
public object UserData { get; private set; }
public TaskDelegate TaskDlgt { get; private set; }
protected abstract void Task();

public void Enqueue()
{
TaskDlgt = new TaskDelegate(Task);
lock(lockObject)
{
if(Busy)
_q.Enqueue(TaskDlgt);
else {
Busy = true;
TaskDlgt.BeginInvoke(new AsyncCallback(this.TaskCallback), TaskDlgt);
}
}
}

private static Queue _q = new Queue();
private static bool Busy = false;
private static object lockObject = new object();

public TaskQueue(object Data)
{
UserData = Data;
}

public delegate void TaskDelegate();

private void TaskCallback(IAsyncResult ar)
{
TaskDelegate dlgt = ar.AsyncState as TaskDelegate;
if(dlgt.Equals(TaskDlgt))
dlgt.EndInvoke(ar);
NextTask();
}
private void NextTask()
{
TaskDelegate dlgt;
lock(lockObject)
{
if(_q.Count > 0)
{
dlgt = _q.Dequeue();
dlgt.BeginInvoke(TaskCallback,dlgt);
}
else
Busy = false;
}
}
}
}


I've titled the article "A Simple Task Queue", but simple is a little mis-leading. That's because it's difficult to tell by looking (for the beginner) where the threads are or even how it works. The key to understanding this implementation is to understand the "delegate BeginInvoke" call. I've covered that in other articles on threading, in particular, I refer you to "Threading with .NET ThreadPool Part 4" for a deeper discussion.

Overall, the work horse of this application is the Abstract Class TaskQueue. The class implements all that's needed to queue tasks and execute them in order. All the user need supply is a derived class that overrides "Task()" and a Constructor that passes in some UserData. The application would instantiate a new derived TaskQueue object with the data needed for the task and then call the Enqueue() method.

The TaskQueue works by creating 3 private static control members. There is the queue which holds delegates to run. There's also a Busy member to say when there is an active thread running. There is also a LockObject used internally to synchronize thread access to the the queue and the Busy indicator. This is needed because as one thread completes and tries to update the Busy indicator or take a new delegate off of the queue for execution, the "producer" thread (our UI in this case) may be trying to enqueue another delegate. Since two threads could be accessing these variables simultaneously, we synchronize them with a Mutex on the LockObject.

The multi-threading comes into play when a delegate's BeginInvoke(...) method is called. This method will allocate a thread from the ThreadPool and execute the Task in that thread. BeginInvoke is provided a TaskCallback() function and some state information. In this case, the state information is a reference to the delegate.

The Callback function is responsible for checking the queue and launching the next Task or setting Busy to false.

The magic in all of this is that the delegate is a reference to a specific instance of a TaskQueue object's Task() method. That way, the delegate's Task() method has access to local information about that specific task. So, each delegate will operate with its own version of UserData. Notice, this is also the reason that NextTask() dequeue's a delegate and places the reference in a local variable rather than the TaskDlgt member. TaskDlgt is a reference to one's self, while the delegate taken from the queue is a different delegate. So, each task's completion callback is responsible to start the next task (if any).

Also, notice that the Tasks are highly encapsulated. Once the task is on the queue, all that is available is the delegate. The rest of the task is somewhat hidden, though reflection can be used to make tweaks to the tasks and task data if necessary.

There are many improvements that can be made. Most notably, one could add a way to stop the queue and cancel remaining tasks. One could add controls to their derived class and overridden Task() member to allow individual task control or gross control over the entire queue.

There you have it, a "simple" Task Queue for your amusement and edification. Hopefully, you find it as useful as I have. Also, I have written several other articles on threads and threading. Please feel free to poke around my archives, you may find these articles useful as well.

10/6/2008: Recently, a kind reader pointed out a bug in my code (since fixed) where "Busy" was not being set to "true" anywhere. The obvious behavior was that all tasks ran immediately, no queing was done at all.

2 comments:

leo4ever said...

Hello there,
Thanks for sharing your sample code. I have to achieve something similar, in my case I need to have three queues and I should be able to find when a task gets completed, which I assume could be found easily. But, once a task gets completed, based on certain conditions I might have to add it to another queue.
Can you tell me how can I approach this issue?

Les Potter - Xalnix Corporation said...

If you want multiple Task Queues, you have a little work to do. This design does not lend itself quickly to multiple queues because the TaskQueue base class contains a static Queue for all tasks to use. This kept the implementation simple. With this design, you just set and forget.

To have multiple separate task queues, you need to manage the backend queues somehow. Ideas that come to mind is to keep a list or array of queues in the base class rather than just one. This could retain the set-and-forget behavior, but you need to provide the parameters to get your tasks queued to the right place.

Another option is to not use static queues, but have a separate instance of TaskQueue with its own instance of a queue. You could nolonger derive a Task (MyTaskQueue) from the TaskQueue as I did, so you could not override Task().

I know this is not much detail and leaves a lot to the reader. If I should get some time, I may attempt another more general purpose queue like you describe.