Today we’re going to wrap up our study of the APM pattern by seeing how we can implement the IAsyncResult interface. For those that can’t remember, here’s the interface API again:
public interface IAsyncResult{
object AsyncState { get; }
WaitHandle AsyncWaitHandle { get; }
bool CompletedSynchronously { get; }
bool IsCompleted { get; }
}
As we’ve seen in the previous post, we can encapsulate all the asynchronous code in a custom IAsyncResult implementation. And that’s what we’ll do here. Here’s the code for our class:
class TestAsyncResult<T> : IAsyncResult {
private volatile Boolean _isCompleted;
private ManualResetEvent _evt;
private readonly AsyncCallback _cbMethod;
private readonly Object _state;
private T _result;
private Exception _exception;
public TestAsyncResult( Func<T> workToBeDone, AsyncCallback cbMethod, Object state ){
_cbMethod = cbMethod;
_state = state;
QueueWorkOnThreadPool(workToBeDone);
}
private void QueueWorkOnThreadPool(Func<T> workToBeDone) {
ThreadPool.QueueUserWorkItem(state => {
try {
_result = workToBeDone();
} catch (Exception ex) {
_exception = ex;
} finally {
UpdateStatusToComplete(); //1 and 2
NotifyCallbackWhenAvailable(); //3 callback invocation
}
});
}
public T FetchResultsFromAsyncOperation() {
if (!_isCompleted) {
AsyncWaitHandle.WaitOne();
AsyncWaitHandle.Close();
}
if (_exception != null) {
throw _exception;
}
return _result;
}
private void NotifyCallbackWhenAvailable() {
if (_cbMethod != null) {
_cbMethod(this);
}
}
public object AsyncState {
get { return _state; }
}
public WaitHandle AsyncWaitHandle {
get { return GetEvtHandle(); }
}
public bool CompletedSynchronously {
get { return false; }
}
public bool IsCompleted {
get { return _isCompleted; }
}
private readonly Object _locker = new Object();
private ManualResetEvent GetEvtHandle() {
lock (_locker) {
if (_evt == null) {
_evt = new ManualResetEvent(false);
}
if (_isCompleted) {
_evt.Set();
}
}
return _evt;
}
private void UpdateStatusToComplete() {
_isCompleted = true; //1. set _iscompleted to true
lock (_locker) {
if (_evt != null) {
_evt.Set(); //2. set the event, when it exists
}
}
}
}
Our custom IAsyncResult type will receive a Func<T> (a function with no arguments which returns T) which initiates the asynchronous task in a thread from thread pool. Besides that, it will also receive references to the callback and to the state parameters that can be passed to the BeginXXX method.
The constructor is responsible for queuing the passed Func<T> on the thread pool. Since it uses the thread pool, it will always be executed asynchronously (that’s why the IsCompleted property returns false).Notice that we need to catch all exceptions so that we can propagate them later. As you might guess, this will destroy the stack trace, reducing its utility. However, since this is the expected behavior, we need to comply with it (do keep in mind that not handling an exception from the pool leads to a process crash from .NET 2.0 onwards).
The code on the finally block is important. When the operation is completed,you should:
- set the completed flag to true;
- signal the event;
- invoke the callback,when available.
The order by which you perform these steps is important because it ensures things work out correctly (Joe Duffy presents some problems that might happen when
you don’t use the correct order). The previous implementation initializes the event object lazily. And that’s why we need a lock to ensure that it gets proper initialized. Locking might seem unnecessary, but it is not and ensures correctness. You can even even try more exotic things here (take a look at this post by Joe Duffy). However, I prefer to play safe and use a simple lock on these scenarios.
Since we’re talking about the event object, let’s take a look at the GetEvtHandle method which is responsible for getting a reference to the internal ManualResetEvent field. As you can see, the first thing it does is check for a null event reference. When that happens, it will simply create a new ManualResetEvent instance on the non signaled state. After creating the new reference, it will automatically check if the current async processing has ended. If it has, then it will automatically signal the event before returning a reference to it.
The FetchResultsFromAsyncOperation is a helper method which we can use from within the EndXXX method. As you can see, it checks for the _isCompleted flag before waiting on the wait handle (WaitOne is always a costly operation). If there was an exception, it will be thrown from the end method. If not, then we’ll return the result.
Since the hard work is encapsulated in your custom IAsyncResult implementation, the BeginXXX and EndXXX methods are simple wrappers. Here’s a possible use for this custom IAsyncResult implementation:
class DoSomeWork {
public Boolean IsPrimeNumber(Int32 number) {
if (number < 2) {
return false;
}
var half = number / 2;
for (var i = 2; i < half; i++) {
if (number % i == 0) {
return false;
}
}
return true;
}
public IAsyncResult BeginIsPrimeNumber(Int32 number, AsyncCallback cbMethod, Object state) {
return new TestAsyncResult<Boolean>( () => IsPrimeNumber(number),
cbMethod,
state );
}
public Boolean EndIsPrimeNumber(IAsyncResult result) {
var customAsync = result as TestAsyncResult<Boolean>;
if(customAsync == null ){
throw new ArgumentException();
}
return customAsync.FetchResultsFromAsyncOperation();
}
}
As you can see, the
BeginIsPrimeNumber returns a new TestAsyncResult instance which kicks off the async processing (in this case, it will simply invoke the synchronous version on the thread pool). The EndIsPrimeNumber method delegates all the work to the FetchResultsFromAsyncOperation.
Now, you’ve got several options for using this class:
//1. using a callback
var waiter = new ManualResetEvent(false);
var work = new DoSomeWork();
var asyncResult = work.BeginIsPrimeNumber(
51243,
result => { Console.WriteLine(work.EndIsPrimeNumber(result)); waiter.Set(); },
null);
waiter.WaitOne();
//2. polling IsCompleted
var work = new DoSomeWork();
var async = work.BeginIsPrimeNumber(
51243,
null,
null);
while (!async.IsCompleted) {
Console.WriteLine("Sleeping");
Thread.Sleep(100);
}
Console.WriteLine(work.EndIsPrimeNumber(async));
//3. waiting on the handle
var work = new DoSomeWork();
var async = work.BeginIsPrimeNumber(
51243,
null,
null);
while (!async.AsyncWaitHandle.WaitOne(100, false) {
Console.WriteLine("Sleeping");
}
Console.WriteLine(work.EndIsPrimeNumber(async));
//4. blocking until it completes
var work = new DoSomeWork();
var async = work.BeginIsPrimeNumber(
51243,
null,
null);
Console.WriteLine(work.EndIsPrimeNumber(async));
Even though you can reuse this class in several places, I couldn’t finish this post before pointing out that Jeffrey Richter has created a much more reusable implementation of the interface in his MSDN’s concurrent affairs column (which, btw, also shows how you can put the code that starts the async action in the main class itself, instead of putting it all on the IAsyncResult implementation).
And that’s all for today. Keep tuned for more on multithreading.