Eduasync part 19: ordering by completion, ahead of time…

Today’s post involves the MagicOrdering project in source control (project 28).

When I wrote part 16 of Eduasync, showing composition in the form of majority voting, one reader mailed me a really interesting suggestion. We don’t really need to wait for any of the tasks to complete on each iteration of the loop – we only need to wait for the next task to complete. Now that sounds impossible – sure, it’s great if we know the completion order of the tasks, but half the point of asynchrony is that many things can be happening at once, and we don’t know when they’ll complete. However, it’s not as silly as it sounds.

If you give me a collection of tasks, I’ll give you back another collection of tasks which will return the same results – but I’ll order them so that the first returned task will have the same result as whichever of your original tasks completes first, and the second returned task will have the same result as whichever of your original tasks completes second, and so on. They won’t be the same tasks as you gave me, reordered – but they’ll be tasks with the same results. I’ll propagate cancellation, exceptions and so on.

It still sounds impossible… until you realize that I don’t have to associate one of my returned tasks with one of your original tasks until it has completed. Before anything has completed, all the tasks look the same. The trick is that as soon as I see one of your tasks complete, I can fetch the result and propagate it to the first of the tasks I’ve returned to you, using TaskCompletionSource<T>. When the second of your tasks completes, I propagate the result to the second of the returned tasks, etc. This is all quite easy using Task<T>.ContinueWith – barring a few caveats I’ll mention later on.

Once we’ve built a method to do this, we can then really easily build a method which is the async equivalent of Parallel.ForEach (and indeed you could write multiple methods for the various overloads). This will execute a specific action on each task in turn, as it completes… it’s like repeatedly calling Task.WhenAny, but we only actually need to wait for one task at a time, because we know that the first task in our "completion ordered" collection will be the first one to complete (duh).

Show me the code!

Enough description – let’s look at how we’ll demonstrate both methods, and then how we implement them.

private static async Task PrintDelayedRandomTasksAsync()
{
    Random rng = new Random();
    var values = Enumerable.Range(0, 10).Select(_ => rng.Next(3000)).ToList();
    Console.WriteLine("Initial order: {0}", string.Join(" ", values));

    var tasks = values.Select(DelayAsync);

    var ordered = OrderByCompletion(tasks);

    Console.WriteLine("In order of completion:");
    await ForEach(ordered, Console.WriteLine);
}

/// <summary>
/// Returns a task which delays (asynchronously) by the given number of milliseconds,
/// then return that same number back.
/// </summary>
private static async Task<int> DelayAsync(int delayMillis)
{
    await TaskEx.Delay(delayMillis);
    return delayMillis;
}

The idea is that we’re going to create 10 tasks which each just wait for some random period of time, and return the same time period back. We’ll create them in any old order – but obviously they should complete in (at least roughly) the same order as the returned numbers.

Once we’ve created the collection of tasks, we’ll call OrderByCompletion to create a second collection of tasks, returning the same results but this time in completion order – so ordered.ElementAt(0) will be the first task to complete, for example.

Finally, we call ForEach and pass in the ordered task collection, along with Console.WriteLine as the action to take with each value. We await the resulting Task to mimic blocking until the foreach loop has finished. Note that we could make this a non-async method and just return the task returned by ForEach, given that that’s our only await expression and it’s right at the end of the method. This would be marginally faster, too – there’s no need to build an extra state machine. See Stephen Toub’s article about async performance for more information.

ForEach

I’d like to get ForEach out of the way first, as it’s so simple: it’s literally just iterating over the tasks, awaiting them and propagating the result to the action. We get the "return a task which will wait until we’ve finished" for free by virtue of making it an async method.

/// <summary>
/// Executes the given action on each of the tasks in turn, in the order of
/// the sequence. The action is passed the result of each task.
/// </summary>
private static async Task ForEach<T>(IEnumerable<Task<T>> tasks, Action<T> action)
{
    foreach (var task in tasks)
    {
        T value = await task;
        action(value);
    }
}

Simple, right? Let’s get onto the meat…

OrderByCompletion

This is the tricky bit, and I’ve actually split it into two methods to make it slightly easier to comprehend. The PropagateResult method feels like it could be useful in other composition methods, too.

The basic plan is:

  • Copy the input tasks to a list: we need to work out how many there are and iterate over them, so let’s make sure we only iterate once
  • Create a collection of TaskCompletionSource<T> references, one for each input task. Note that we’re not associating any particular input task with any particular completion source – we just need the same number of them
  • Declare an integer to keep track of "the next available completion source"
  • Attach a continuation to each input task which will be increment the counter we’ve just declared, and propagate the just-completed task’s status
  • Return a view onto the collection of TaskCompletionSource<T> values, projecting each one to its Task property

Once you’re happy with the idea, the implementation isn’t too surprising (although it is quite long):

/// <summary>
/// Returns a sequence of tasks which will be observed to complete with the same set
/// of results as the given input tasks, but in the order in which the original tasks complete.
/// </summary>
private static IEnumerable<Task<T>> OrderByCompletion<T>(IEnumerable<Task<T>> inputTasks)
{
    // Copy the input so we know it’ll be stable, and we don’t evaluate it twice
    var inputTaskList = inputTasks.ToList();

    // Could use Enumerable.Range here, if we wanted…
    var completionSourceList = new List<TaskCompletionSource<T>>(inputTaskList.Count);
    for (int i = 0; i < inputTaskList.Count; i++)
    {
        completionSourceList.Add(new TaskCompletionSource<T>());
    }

    // At any one time, this is "the index of the box we’ve just filled".
    // It would be nice to make it nextIndex and start with 0, but Interlocked.Increment
    // returns the incremented value…
    int prevIndex = -1;

    // We don’t have to create this outside the loop, but it makes it clearer
    // that the continuation is the same for all tasks.
    Action<Task<T>> continuation = completedTask =>
    {
        int index = Interlocked.Increment(ref prevIndex);
        var source = completionSourceList[index];
        PropagateResult(completedTask, source);
    };

    foreach (var inputTask in inputTaskList)
    {
        // TODO: Work out whether TaskScheduler.Default is really the right one to use.
        inputTask.ContinueWith(continuation,
                               CancellationToken.None,
                               TaskContinuationOptions.ExecuteSynchronously,
                               TaskScheduler.Default);
    }

    return completionSourceList.Select(source => source.Task);
}

/// <summary>
/// Propagates the status of the given task (which must be completed) to a task completion source
/// (which should not be).
/// </summary>
private static void PropagateResult<T>(Task<T> completedTask,
    TaskCompletionSource<T> completionSource)
{
    switch (completedTask.Status)
    {
        case TaskStatus.Canceled:
            completionSource.TrySetCanceled();
            break;
        case TaskStatus.Faulted:
            completionSource.TrySetException(completedTask.Exception.InnerExceptions);
            break;
        case TaskStatus.RanToCompletion:
            completionSource.TrySetResult(completedTask.Result);
            break;
        default:
            // TODO: Work out whether this is really appropriate. Could set
            // an exception in the completion source, of course…
            throw new ArgumentException("Task was not completed");
    }
}

You’ll notice there are a couple of TODO comments there. The exception in PropagateResult really shouldn’t happen – the continuation shouldn’t be called when the task hasn’t completed. I still need to think carefully about how tasks should propagate exceptions though.

The arguments to ContinueWith are more tricky: working through my TimeMachine class and some unit tests with Bill Wagner last week showed just how little I know about how SynchronizationContext, the task awaiters, task schedulers, and TaskContinuationOptions.ExecuteSynchronously all interact. I would definitely need to look into that more deeply before TimeMachine was really ready for heavy use… which means you should probably be looking at the TPL in more depth too.

Conclusion

Sure enough, when you run the code, the results appear in order, as the tasks complete. Here’s one sample of the output:

Initial order: 335 468 1842 1991 2512 2603 270 2854 1972 1327
In order of completion:
270
335
468
1327
1842
1972
1991
2512
2603
2854

TODOs aside, the code in this post is remarkable (which I can say with modesty, as I’ve only refactored it from the code sent to me by another reader and Stephen Toub). It makes me smile every time I think about the seemingly-impossible job it accomplishes. I suspect this approach could be useful in any number of composition blocks – it’s definitely one to remember.

18 thoughts on “Eduasync part 19: ordering by completion, ahead of time…”

  1. I can’t get reliable answers with proper order where task’s delay time difference is quite tight.
    Consider using int[] values = { 2010, 2020, 2080, 2070, 2030, 2050, 2040, 2030, 2020, 2000, 2010 };
    instead your random ones in PrintDelayedRandomTasksAsync method.

  2. What about a solution in Rx;

    from task in new[]
    {
    Task.Factory.StartNew(() =>
    {
    Thread.Sleep(1000);
    return 3;
    }),
    Task.Factory.StartNew(() =>
    {
    Thread.Sleep(800);
    return 2;
    }),
    Task.Factory.StartNew(() =>
    {
    Thread.Sleep(500);
    return 1;
    }),
    }.ToObservable()
    from result in task.ToObservable()
    select result

  3. @Norbert: In that situation I’d expect them to potentially be slightly out – because Task.Delay is still going to be using the system clock. That’s not a problem of the ordering code so much as “waking up at exactly the right time”. The point of this code was solely to introduce some tasks which complete in a random order.

    Having said that, obviously the continuation itself isn’t *absolutely* immediate – so if there are tasks which definitely complete in one order (but on different threads), then the continuations may get to the Interlocked.Increment bit in the other order. Basically this is saying tasks which occur “roughly at the same time” end up racing – which is fairly natural.

    @James: Yes, Rx is certainly another approach to this – but it’s nice to be able to keep within async where possible, IMO.

  4. This abstraction is very nice because it has a small, clearly defined contract and yet is very powerful.

  5. This abstraction is very nice because it has a small, clearly defined contract and yet is very powerful.

    Btw, the captcha reproducibly appears as broken image in Chrome.

  6. It’s worth pointing out that the tasks aren’t *really* ordered by completion. The ordering is not stable like you might naively expect from the name.

    The tasks are instead ordered by completion of *continuations* (as determined by the atomic increment).

  7. @Craig: True – but basically any code which is sensitive to that is going to be broken with regard to race conditions anyway, I suspect.

  8. The MSDN documentation seems pretty sparse on what the default TaskScheduler is. I tend to use TaskScheduler.FromCurrentSynchronizationContext when doing ContinueWiths like that, for the sake of ensuring that the synchronization context is what is executing the continuations, because in many cases I want the continuation to happen on the UI thread anyway.

    http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler.fromcurrentsynchronizationcontext.aspx

  9. @Joshua: In this case it makes more sense to keep it in the same thread as the task actually completes on – because we really want to make the original task and the returned task as similar as possible. When the task is awaited, *that* will use the appropriate synchronization context, of course.

  10. @Jon: You bring up a very good point, actually. I hadn’t really thought about that. I’m still not sure about the functionality of TaskScheduler.Default, though. Will that schedule and execute on the thread that scheduled a task on it?

  11. Is there any reason you don’t use the following code to initialize the list?

    var completionSourceList = inputTaskList.Select(t => new TaskCompletionSource).ToList();

    You mentioned that Enumerable.Range could be used, but why not Select()? I’m asking because I use this pattern and am curious to know if there is a downside I’m not seeing.

  12. @Marco: Not particularly – only that I’m not *hugely* keen on “projections” which actually don’t use their inputs. (There’s also the very marginal efficiency reason – ToList won’t be able to build a list of exactly the right size beforehand, whereas we can. That’s not my primary objection though.)

  13. Perhaps I am missing something obvious but why did you need to create wrapper tasks? If you simply want to consume the completed tasks you can use Task.WaitAny like this:

    static IEnumerable> CompleteInOrder(Task[] tasks)
    {
    List> toWait = new List>(tasks);
    while (toWait.Count > 0)
    {
    int completedIdx = Task.WaitAny(toWait.ToArray());
    yield return toWait[completedIdx];
    toWait.RemoveAt(completedIdx);
    }
    }

    The only difference to yours is that you do not block until the first task has completed and I do not schedule a task with a potentially different scheduler which could also cause some overhead.

  14. @Alois: That involves calling WaitAny N times though, which could end up being relatively expensive. If you look at the first link in the post, you’ll see something very similar within my WhenMajority method – this is a more efficient alternative approach.

  15. What happens when you try to propagate exceptions through the ContinueWiths and so on? Do they make it out to the outside of the ForEach?

  16. @Thomas: I’m not entirely sure what you mean – you’d have to give some sample code. Or you could just try it for yourself :)

  17. When using tasks for complex sequences of work in the past, I’ve had problems using ContinueWith for ‘encapsulations’ rather than fire-and-forget ‘continuations’; it doesn’t create a wait context, so you get exceptions bubbling out of the inner task to a an unhandled stack top. For TPL-based control flow I’ve had to resort to a wrapper which continues a task after waiting for it to complete, instead.. this structure might have the same problem.

    Although your propagating closure takes the real task as a parameter, it only evaluates the result on successful completion – obviously you can’t use TaskCompletionOptions.NotOnWhatever or you wouldn’t have enough information for your taskcompletionsource. My question was essentially, does evaluating completedTask.Exception create a wait context which allows the exception – albeit removed from its native call stack – to be successfully handled outside the ForEach()?

    I checked by adding a throw in DelayAsync and the answer is, “mostly”. It wraps it in two more levels of AggregateException than you’d otherwise have, but it’s there :)

Comments are closed.