A Synchronization Context Based On The TPL Dataflow Library

The purpose of the synchronization model implemented by the SynchronizationContext class is to allow the asynchronous/synchronous operations of the common language runtime to behave properly with different synchronization models. This model also simplifies some of the requirements that managed applications have to follow in order to work correctly under different synchronization environments. Examples of such synchronization environments are user interface frameworks such as Windows Forms and Windows Presentation Foundation, as well as ASP.NET.


Providers of synchronization models extend this class to provide their own implementations. Because these user interface infrastructures usually run on their own threads, dispatching execution to their execution contexts usually means leaving the current thread.


The Task Parallel Library (TPL) provides dataflow components to help increase the robustness of concurrency-enabled applications. These dataflow components are collectively referred to as the TPL Dataflow Library. This dataflow model promotes actor-based programming by providing in-process message passing for coarse-grained dataflow and pipelining tasks. The dataflow components build on the types and scheduling infrastructure of the TPL and integrate with the C#, Visual Basic, and F# language support for asynchronous programming. These dataflow components are useful when you have multiple operations that must communicate with one another asynchronously or when you want to process data as it becomes available.
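To make "process data as it becomes available" concrete, here is a minimal sketch of a two-stage pipeline. The class and variable names (PipelineDemo, square, collect) are made up for illustration, and the sample assumes the dataflow NuGet package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class PipelineDemo
{
    // Hypothetical helper: squares 1..3 in one block and collects them in another.
    public static List<int> Run()
    {
        var results = new List<int>();

        // First stage: transforms each message as it arrives.
        var square = new TransformBlock<int, int>(n => n * n);

        // Second stage: consumes the transformed messages one at a time.
        var collect = new ActionBlock<int>(n => results.Add(n));

        // Link the stages and let completion flow from the first to the second.
        square.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= 3; i++)
        {
            square.Post(i);
        }

        square.Complete();          // no more input
        collect.Completion.Wait();  // wait until everything has been processed
        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "1, 4, 9"
    }
}
```

Each block has its own input queue and, with the default options, handles one message at a time, which is the property the synchronization context in this post relies on.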


The TPL Dataflow Library (System.Threading.Tasks.Dataflow namespace) is not distributed with the .NET Framework 4.5. To install the System.Threading.Tasks.Dataflow namespace, open your project in Visual Studio 2012, choose Manage NuGet Packages from the Project menu, and search online for the Microsoft.Tpl.Dataflow package.


Sometimes, for demo or testing purposes, I need a synchronization context that behaves like the user interface ones but doesn’t force me to build an application with a user interface. The TPL Dataflow Library seemed like a good option for implementing such a synchronization context.


It was as easy as this:


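using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class TplDataflowSynchronizationContext : SynchronizationContext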
{
    private ActionBlock<ActionItem> ab
        = new ActionBlock<ActionItem>(
            item =>
            {
                Trace.WriteLine(
                    string.Format("{0}: {1}", Environment.CurrentManagedThreadId, item.operation));

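                try
                {
                    item.d(item.state);
                }
                finally
                {
                    // Complete the item even if the callback throws,
                    // so that a caller blocked in Send is released.
                    // Note: an unhandled exception still faults the ActionBlock.
                    item.SetResult(true);
                }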
            });

    public override SynchronizationContext CreateCopy()
    {
        return new TplDataflowSynchronizationContext();
    }

    public override void Post(SendOrPostCallback d, object state)
    {
        Trace.WriteLine(
            string.Format("{0}: Posting...", Environment.CurrentManagedThreadId));

        this.ab.Post(new ActionItem { d = d, state = state, operation = "Post" });

        Trace.WriteLine(
            string.Format("{0}: Posted.", Environment.CurrentManagedThreadId));
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        Trace.WriteLine(
            string.Format("{0}: Sending...", Environment.CurrentManagedThreadId));

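        ActionItem item = new ActionItem { d = d, state = state, operation = "Send" };

        // SendAsync only hands the item to the block; it does not wait for processing.
        this.ab.SendAsync(item);

        // The item is its own TaskCompletionSource, so waiting on its task
        // blocks until the block has actually executed the callback.
        item.Task.Wait();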

        Trace.WriteLine(
            string.Format("{0}: Sent.", Environment.CurrentManagedThreadId));
    }

    public class ActionItem : TaskCompletionSource<bool>
    {
        public SendOrPostCallback d;
        public object state;
        public string operation;
    }
}
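A short usage sketch may help show the difference between Post and Send. So that it compiles on its own, it uses a compact stand-in for the class above with tracing removed; MiniDataflowContext, WorkItem and UsageDemo are names invented for this example, not part of the original implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Compact stand-in for the context above (hypothetical name, no tracing).
public sealed class MiniDataflowContext : SynchronizationContext
{
    private sealed class WorkItem : TaskCompletionSource<bool>
    {
        public SendOrPostCallback Callback;
        public object State;
    }

    // With default options the block runs one item at a time, in arrival order.
    private readonly ActionBlock<WorkItem> block = new ActionBlock<WorkItem>(item =>
    {
        try { item.Callback(item.State); }
        finally { item.SetResult(true); }
    });

    public override void Post(SendOrPostCallback d, object state)
    {
        block.Post(new WorkItem { Callback = d, State = state });
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        var item = new WorkItem { Callback = d, State = state };
        block.Post(item);
        item.Task.Wait(); // block until the callback has run
    }
}

public static class UsageDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var context = new MiniDataflowContext();

        // Post returns immediately; the callbacks run later, in order.
        context.Post(_ => { Thread.Sleep(100); log.Add("post 1"); }, null);
        context.Post(_ => log.Add("post 2"), null);

        // Send is queued behind the two posts and blocks until it has run.
        context.Send(_ => log.Add("send"), null);
        log.Add("after send");
        return log;
    }

    public static void Main()
    {
        // prints "post 1, post 2, send, after send"
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

One caveat under these assumptions: because the block processes a single item at a time, calling Send from inside a callback that is already running on the context would wait on an item that can never start, so it would deadlock.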

UPDATE:


I fixed the implementation because the SynchronizationContext.Send method was returning before the operation had finished executing. Thanks to Svick for pointing that out and to Stephen Toub for helping to fix it.
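The distinction behind that fix can be sketched in isolation: the task returned by SendAsync completes when the block accepts the message, not when the message has been processed. The sample below is an illustration only; SendAsyncDemo, gate and processed are hypothetical names, and the gate exists just to make the timing deterministic.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SendAsyncDemo
{
    // Returns { processed after SendAsync completed, processed after the item's task completed }.
    public static bool[] Run()
    {
        var gate = new ManualResetEventSlim(false);      // holds the callback back
        var processed = new ManualResetEventSlim(false); // set once the callback has run

        var block = new ActionBlock<TaskCompletionSource<bool>>(item =>
        {
            gate.Wait();          // simulate work that has not finished yet
            processed.Set();
            item.SetResult(true); // signal completion of this specific item
        });

        var work = new TaskCompletionSource<bool>();

        // Completes as soon as the block has accepted the message.
        block.SendAsync(work).Wait();
        bool afterSendAsync = processed.IsSet;

        gate.Set();       // let the callback proceed
        work.Task.Wait(); // completes only after the callback has run
        bool afterItemTask = processed.IsSet;

        return new[] { afterSendAsync, afterItemTask };
    }

    public static void Main()
    {
        bool[] r = Run();
        Console.WriteLine(r[0] + ", " + r[1]); // prints "False, True"
    }
}
```

This is why the corrected Send waits on the item's own task rather than on the task returned by SendAsync.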

One thought on “A Synchronization Context Based On The TPL Dataflow Library”

  1. Shouldn’t Send() wait until executing the operation is complete? Your code doesn’t do that.

    Also, SendAsync().Wait() is basically the same as Post(). Both will synchronously wait if the message can’t be accepted for now (e.g. when using BoundedCapacity), but they don’t wait until processing of the item is complete.
