Iterating atomically

The IEnumerable<T> and IEnumerator<T> interfaces in .NET are interesting. They crop up an awful lot, but hardly anyone ever calls them directly – you almost always use a foreach loop to iterate over the collection. That hides all the calls to GetEnumerator(), MoveNext() and Current. Likewise iterator blocks hide the details when you want to implement the interfaces. However, sometimes details matter – such as for this recent Stack Overflow question. The question asks how to create a thread-safe iterator – one that can be called from multiple threads. This is not about iterating over a collection n times independently on n different threads – this is about iterating over a collection once without skipping or duplicating. Imagine it’s some set of jobs that we have to complete. We assume that the iterator itself is thread-safe to the extent that calls from different threads at different times, with intervening locks will be handled reasonably. This is reasonable – basically, so long as it isn’t going out of its way to be thread-hostile, we should be okay. We also assume that no-one is trying to write to the collection at the same time.


Sounds easy, right? Well, no… because the IEnumerator<T> interface has two members which we effectively want to call atomically. In particular, we don’t want the collection { “a”, “b” } to be iterated like this:


Thread 1 Thread 2
MoveNext()  
  MoveNext()
Current  
  Current

That way we’ll end up not processing the first item at all, and the second item twice.


There are two ways of approaching this problem. In both cases I’ve started with IEnumerable<T> for consistency, but in fact it’s IEnumerator<T> which is the interesting bit. In particular, we’re not going to be able to iterate over our result anyway, as each thread needs to have the same IEnumerator<T> – which it won’t do if each of them uses foreach (which calls GetEnumerator() to start with).


Fix the interface


First we’ll try to fix the interface to look how it should have looked to start with, at least from the point of view of atomicity. Here are the new interfaces:


public interface IAtomicEnumerable<T>
{
    IAtomicEnumerator<T> GetEnumerator();
}

public interface IAtomicEnumerator<T>
{
    bool TryMoveNext(out T nextValue);
}

One thing you may notice is that we’re not implementing IDisposable. That’s basically because it’s a pain to do so when you think about a multi-threaded environment. Indeed, it’s possibly one of the biggest arguments against something of this nature. At what point do you dispose? Just because one thread finished doesn’t mean that the rest of them have… don’t forget that “finish” might mean “an exception was thrown while processing the job, I’m bailing out”. You’d need some sort of co-ordinator to make sure that everyone is finished before you actually do any clean-up. Anyway, the nice thing about this being a blog post is we can ignore that little thorny issue :)


The important point is that we now have a single method in IAtomicEnumerator<T> – TryMoveNext, which works the way you’d expect it to. It atomically attempts to move to the next item, returns whether or not it succeeded, and sets an out parameter with the next value if it did succeed. Now there’s no chance of two threads using the method and stomping on each other’s values (unless they’re silly and use the same variable for the out parameter).


It’s reasonably easy to wrap the standard interfaces in order to implement this interface:


/// <summary>
/// Wraps a normal IEnumerable[T] up to implement IAtomicEnumerable[T].
/// </summary>
public sealed class AtomicEnumerable<T> : IAtomicEnumerable<T>
{
    private readonly IEnumerable<T> original;

    public AtomicEnumerable(IEnumerable<T> original)
    {
        this.original = original;
    }

    public IAtomicEnumerator<T> GetEnumerator()
    {
        return new AtomicEnumerator(original.GetEnumerator());
    }

    /// <summary>
    /// Implementation of IAtomicEnumerator[T] to wrap IEnumerator[T].
    /// </summary>
    private sealed class AtomicEnumerator : IAtomicEnumerator<T>
    {
        private readonly IEnumerator<T> original;
        private readonly object padlock = new object();

        internal AtomicEnumerator(IEnumerator<T> original)
        {
            this.original = original;
        }

        public bool TryMoveNext(out T value)
        {
            lock (padlock)
            {
                bool hadNext = original.MoveNext();
                value = hadNext ? original.Current : default(T);
                return hadNext;
            }
        }
    }
}

Just ignore the fact that I never dispose of the original IEnumerator<T> :)


We use a simple lock to make sure that MoveNext() and Current always happen together – that nothing else is going to call MoveNext() between our TryMoveNext() calling it, and it fetching the current value.


Obviously you’d need to write your own code to actually use this sort of iterator, but it would be quite simple:


T value;
while (iterator.TryMoveNext(out value))
{
    // Use value
}

However, you may already have code which wants to use an IEnumerator<T>. Let’s see what else we can do.


Using thread local variables to fake it


.NET 4.0 has a very useful type called ThreadLocal<T>. It does basically what you’d expect it to, with nice features such as being able to supply a delegate to be executed on each thread to provide the initial value. We can use a thread local to make sure that so long as we call both MoveNext() and Current atomically when we’re asked to move to the next element, we can get back the right value for Current later on. It has to be thread local because we’re sharing a single IEnumerator<T> across multiple threads – each needs its own separate storage.


This is also the approach we’d use if we wanted to wrap an IAtomicEnumerator<T> in an IEnumerator<T>, by the way. Here’s the code to do it:


public class ThreadSafeEnumerable<T> : IEnumerable<T>
{
    private readonly IEnumerable<T> original;

    public ThreadSafeEnumerable(IEnumerable<T> original)
    {
        this.original = original;
    }

    public IEnumerator<T> GetEnumerator()
    {
        return new ThreadSafeEnumerator(original.GetEnumerator());
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    private sealed class ThreadSafeEnumerator : IEnumerator<T>
    {
        private readonly IEnumerator<T> original;
        private readonly object padlock = new object();
        private readonly ThreadLocal<T> current = new ThreadLocal<T>();

        internal ThreadSafeEnumerator(IEnumerator<T> original)
        {
            this.original = original;
        }

        public bool MoveNext()
        {
            lock (padlock)
            {
                bool ret = original.MoveNext();
                if (ret)
                {
                    current.Value = original.Current;
                }
                return ret;
            }
        }

        public T Current
        {
            get { return current.Value; }
        }

        public void Dispose()
        {
            original.Dispose();
            current.Dispose();
        }

        object IEnumerator.Current
        {
            get { return Current; }
        }

        public void Reset()
        {
            throw new NotSupportedException();
        }
    }
}

I’m going to say it one last time – we’re broken when it comes to disposal. There’s no way of safely disposing of the original iterator at “just the right time” when everyone’s finished with it. Oh well.


Other than that, it’s quite simple. This code has the serendipitous property of actually implementing IEnumerator<T> slightly better than C#-compiler-generated implementations from iterator blocks – if you call the Current property without having called MoveNext(), this will throw an InvalidOperationException, just as the documentation says it should. (It doesn’t do the same at the end, admittedly, but that’s fixable if we really wanted to be pedantic.


Conclusion


I found this an intriguing little problem. I think there are better ways of solving the bigger picture – a co-ordinator which takes care of disposing exactly once, and which possibly mediates the original iterator etc is probably the way forward… but I enjoyed thinking about the nitty gritty.


Generally speaking, I prefer the first of these approaches. Thread local variables always feel like a bit of a grotty hack to me – they can be useful, but it’s better to avoid them if you can. It’s interesting to see how an interface can be inherently thread-friendly or not.


One last word of warning – this code is completely untested. It builds, and I can’t immediately see why it wouldn’t work, but I’m making no guarantees…

33 thoughts on “Iterating atomically”

  1. Another approach is immutable iterators:

    class IEnumerator
    {
    T Current { get; }
    IEnumerator
    GetNext();
    }

    The compiler team would have a lot of fun (I’m sure!) implementing yield return. How about making each step of the iterator method into a separate class representing each state. Each state class would implement the above interface, and implement the GetNext method to allocate the following state class (passing it whatever state it needs to continue the computation). That way, each thread would be able to independently plow through the iteration at its own pace.

    No doubt you’re aware of Scala – it typically runs on the JVM, and it exposes lazy sequences in two ways: the native Java way (which is stateful, just like the .NET way) but also in something like the above immutable fashion.

  2. @Daniel: I don’t see how that helps. If two threads both have a reference to the same initial immutable value, then they’re *both* going to see the same sequence. That’s explicitly *not* what we’re trying to do here – otherwise we’d just call GetEnumerator() from each different thread to start with.

    How can you use your immutable iterator to act on each item exactly *once*, but in parallel?

    Jon

  3. As far as i know there exist also a Thread Local Attribute in Versions earlier then .Net 4.0. Applied to the right Variables the second Solution would work also for non .Net 4.0 Applications.

  4. @Mike: There’s ThreadStaticAttribute, but that can only be applied to *static* variables, so it wouldn’t work here.

  5. @Skeet:

    ThreadStatic is exactly what i meant.
    You could make ‘current’ static and apply the Thread StaticAttribute.
    It should work.

  6. Could Linq to Events or perhaps even PLinq be bent to achieve similar functionality?

    Also, thread-static storage is unfortunately slower than might be expected. If this new threadlocal thing is similar (or, quite likely, more expensive), then if at all possible, you don’t want to be using this in any kind of performance sensitive fashion, esp. not for a low-level building block like this where you’ll simply limit the applicability of your solution.

    PLinq comes with a variety of datastructures that are low on locking and may help you here.

  7. I think I would solve the problem differently. Instead of combining Current with MoveNext, I would cause Current to return the last value MoveNext reached for this view of the enumerator.

    So, essentially, I have a class called AtomicEnumerator which takes a sub-enumerator and a lock object. It stores the last reached value, allowing multiple AtomicEnumerators to work on the same sub-enumerator provided they use the same lock.

    public class AtomicEnumerator : IEnumerator
    {
    private readonly IEnumerator
    subEnumerator;
    private readonly Object syncLock;

    private T last;

    public AtomicEnumerator(IEnumerator subEnumerator, Object syncLock) {
    this.subEnumerator = subEnumerator;
    this.syncLock = syncLock;
    }

    public bool MoveNext() {
    lock (syncLock) {
    if (!subEnumerator.MoveNext()) return false;
    last = subEnumerator.Current;
    return true;
    }
    }

    public T Current { get { return last; } }
    public void Reset() { throw new NotSupportedException(); }
    public void Dispose() { subEnumerator.Dispose(); }
    Object System.Collections.IEnumerator.Current { get { return last; } }
    }

  8. @Mike: If you make “current” static, then you run into problems if you ever need to iterate over two collections at the same time in the same thread (e.g. alternating between two collections).

    @Strilanc: Yes, that works – but it requires more work on the part of the caller.

  9. @Skeet:
    Ups you have right. I only have seen the standard case with foreach and sequentiell. Was late yesterday, Brain was in hibernate :-)

  10. Would something like this work as a simpler solution? It seems to in the test app I made.

    class ThreadSafeWrapper {
    readonly IEnumerator
    _original;

    public ThreadSafeWrapper(IEnumerable original) {
    _original = original.GetEnumerator();
    }

    public IEnumerable Enumerable {
    get {
    while (true) {
    lock (_original) {
    if (_original.MoveNext())
    yield return _original.Current;
    else
    yield break;
    }
    }
    }
    }
    }

    There is at least one functional difference; that any call to threadsafeWrapper.Enumerable.GetEnumerator() will automatically start wherever other threads left off. To reset the sequence you’d have to create a new wrapper. Don’t know if this is good or bad; probably depends on the use case.

  11. @Dex: No, that’s not thread-safe in the way that is intended – if multiple threads are “simultaneously” calling MoveNext()/Current, they could both end up seeing the same value and skipping another value, as per the top of my post.

  12. I used the [ThreadStatic] approach in my implementation of ThreadSafeEnumerator some time ago: http://www.signumframework.com/Synchronization.ashx and http://signum.codeplex.com/SourceControl/changeset/view/25903#510505 but I’ve to use a combination of Dictionary to solve the concurrency problems. Using ThreadLocal is more preatty.

    I also thing that making IEnumerable a two member interface was a bad idea. The TryMoveNext or using a immutable interface synchronized using Sync.SafeSet would be a better idea. http://www.signumframework.com/Sync.ashx

    Sorry for so many ads, but I really thing it fits into the topic.

  13. @Skeet: Here’s a sample app that shows it works. Simply iterates through a Range, and increments TestArray[Current]. If using ThreadSafeWrapper, all the TestArray cells == 1 at the end, whereas without it, the final values are all over the place. I put a sleep between MoveNext() and Current to intensify the thread sync errors.

    The one issue that I don’t understand is that I occasionally see a SynchronizationLockException. My MSIL foo isn’t good enough to see what’s going on behind the scene to cause this to break sometimes. Any idea what’s going on there?

    class Program {
    const int Size = 1000;
    const int ThreadCount = 10;
    static readonly int[] TestArray = new int[Size];
    static IEnumerator _enumerator;

    static void Main() {
    // Create an enumerable and wrap it.
    var range = Enumerable.Range(0, Size);
    _enumerator = new ThreadSafeWrapper(range).Enumerable.GetEnumerator();

    // Launch the threads on Run with wait handles
    var waitHandles = new List();
    for (var i = 0; i < ThreadCount; ++i) {
    var resetEvent = new AutoResetEvent(false);
    waitHandles.Add(resetEvent);
    ThreadPool.QueueUserWorkItem(Run, resetEvent);
    }
    WaitHandle.WaitAll(waitHandles.ToArray());

    // If we used a ThreadSafeWrapper, every int in TestArray should have been
    // incremented exactly once, so let's count the cells equal to 1.
    Console.WriteLine((from i in TestArray where i == 1 select i).Count());
    Console.ReadKey();
    }

    ///

    /// Iterate through every int in _enumerator and increment that int in TestArray.
    ///
    /// reset event
    static void Run(Object state) {
    var resetEvent = (AutoResetEvent)state;
    while (_enumerator.MoveNext()) {
    Thread.Sleep(1); // Sleep a bit between MoveNext() and Current.
    ++TestArray[_enumerator.Current];
    }
    resetEvent.Set();
    }
    }

  14. @Dax: That’s actually failing every single time for me. Interestingly, it *looks* like you’ve actually discovered a potential compiler bug. I suspect that the IEnumerator generated by iterator blocks isn’t meant to be thread-safe… it keeps fields representing the current state, so if you’ve got more than one thread in the MoveNext() method at a time, the threads stomp on the state without any concern for each other. At some point, one thread is calling Monitor.Exit when it doesn’t actually have the lock.

    You’d have to implement the same sort of behaviour explicitly in your own IEnumerator implementation – but at that point, I’m sure I could provoke the skipping/duplication issue I mentioned.

  15. @Skeet: Yeah, that’s exactly what’s happening. I (finally after all these years) got red gate’s Reflector and cleaned up the generated code to be more readable, and it essentially creates a new IEnumerator with the following:

    enum State {
    FirstTime,
    Normal,
    Done
    }
    State _status = State.FirstTime;

    public bool MoveNext() {
    if (_status == State.Done) {
    return false;
    }
    if (_status != State.FirstTime) {
    Monitor.Exit(_lock);
    }
    Monitor.Enter(_lock);
    _status = State.Done;
    if (_original.MoveNext()) {
    Current = _original.Current;
    _status = State.Normal;
    return true;
    }
    Dispose();
    return false;
    }

    So if all the threads manage to get to the Monitor.Enter at the same time, you can see they’ll gladly play tag until everything is finished. Or if a thread sets Status.Done as the other threads enter MoveNext(), then all those threads will simply exit the loop and let the first thread do all the work. But if one thread gets to _status = State.Normal before another thread gets to the “if(_status != State.FirstTime)” check, then we’ll have the situation where a 2nd thread tries to release a lock it doesn’t own.

    I wonder why the exception happens all the time on your machine. It’s about 50/50 on mine.

    Of course, I agree with your first comment that building a more robust class to handle all this is the better solution, but yeah, it’s interesting seeing what the lower-level stuff actually generates.

  16. Ooh, this gets really cool: if, in my Program class above, instead of storing the member variable “IEnumerator _enumerator”, we instead change that to “IEnumerable _enumerable”, and make the two corresponding changes to Main and Run, then ThreadSafeWrapper works, always! (Not to mention you can use a foreach in Run).

    So what’s different? If you look at the disassembly, you see that the generated class’s GetEnumerator() method first checks that the threadId == the initialThreadId. If so, then all is good, otherwise it creates a /new/ enumerator and returns that instead.

    So how does that fix things? Well look at the generated MoveNext() method I posted above. Since each thread has its own enumerator wrapper, each thread therefore maintains its own _status. Thus, since _status is initialized at State.FirstTime for each thread, there is no chance that any thread will call Monitor.Exit() before it calls Monitor.Lock(). So cool!

  17. The obvious bad thing about my solution is that each thread locks until it calls MoveNext() again, so really there’s no point to having it be multithreaded at that point anyway. Oh well.

  18. @Dax: The main problem is that at that point you’re back to each thread iterating over the whole set, unless I’ve missed something. The point is to share an IEnumerator safely.

  19. @Skeet: No because in the generated class’s GetEnumerator() method, if it creates a new enumerator, it passes “_original” (the original, non-threadsafe enumerator) to the new enumerator’s constructor. Therefore, while each new enumerator maintains its own _status, they all reference to the /original/ non-threadsafe enumerator. Thus everything happens in shared sequence and yet is completely threadsafe. (tested and verified). (Though, as I mentioned, each thread blocks till it calls MoveNext() again). Pretty cool though. Kudos to the compiler people.

  20. Ah, I’d misread your previous comment. I now understand better… but yes, it’s now not usefully multithreaded :(

  21. Actually, it won’t compile.

    To make it compile, change

    current.Value = original.Value;

    to

    current.Value = original.Current;

    Also, it still won’t always work – it assumes that the original enumerator isn’t bound to its thread. For example, if the original enumerator accesses WinForms controls (which is more likely than you’d think), this will still fail. In such a case, it’s completely impossible, unless you enumerate into a thread-safe queue on the original thread.

  22. @SLaks: Not sure how that typo crept in… Thanks.

    I explicitly mentioned that the original enumerator has to behave reasonably when called from multiple threads at the start: “We assume that the iterator itself is thread-safe to the extent that calls from different threads at different times, with intervening locks will be handled reasonably. This is reasonable – basically, so long as it isn’t going out of its way to be thread-hostile, we should be okay.”

  23. Gah! I posted a solution to a completely different problem. Oh dear.

    Having read the first paragraph properly, the key interface is basically that of a shared input queue. In which case I wouldn’t bother with IAtomicEnumerable. If you’re really sharing the iteration, why create an interface to the collection as well? You only need a public class that implements IAtomicEnumerator over IEnumerator, and then:

    public static IAtomicEnumerator GetAtomicEnumerator(this IEnumerable source)
    {
    return new AtomicEnumerator
    (source.GetEnumerator());
    }

    … to get a single instance of the wrapper to hand out to multiple threads.

    You could also let the compiler do the work of converting back to the standard interface for convenience (calling this separately from each thread):

    public static IEnumerable ToEnumerable(
    this IAtomicEnumerator
    source)
    {
    T value;
    while (source.TryMoveNext(out value))
    yield return next;
    }

    Disposal would be important/useful in this situation, I think, as long as it is coordinated via the same lock object. Just a reference count that is incremented for each client, and decremented each time Dispose is called on the IAtomicEnumerator. When the count falls to zero, Dispose the original iterator.

  24. @Daniel: Yes, a disposal count is the way I’d implement disposal if I’d bothered :)

    My reason for having IAtomicEnumerable as well as IAtomicEnumerator was to suggest that it could have been a complete alternative to IEnumerable/IEnumerator – in other words, if the framework had been implemented in that way to start with, it could have been simpler. On the other hand, it wouldn’t actually make much sense for *most* iterators to be thread-safe.

  25. I think we may need to have a discussion with Eric Lippert — C# prevents most undesirable interactions between yield return and other features, and it seems to me that yield return inside a lock block should also be forbidden.

  26. @Ben Voigt – “it seems to me that yield return inside a lock block should also be forbidden.”

    I wonder if that would achieve much, given that:

    lock (l) { yield return x; }

    Is the same as:

    Monitor.Enter(l);
    try { yield return x; }
    finally { Monitor.Exit(l); }

    So if people wanted to do it, they still could. And ultimately, if the user has control of what will be on the consuming side, then this is not necessarily wrong. It’s only wrong if the consumer doesn’t understand that they are potentially holding a lock in the body of their foreach loop. That might conceivably be exactly what you want, in some situations.

    To ban anything that might be wrong inside a lock, you’d have to ban quite a lot of things, e.g. any method call to a 3rd party library that might ultimately (after a few years of evolutionary code maintenance) take out more locks, in an unpredictable order.

  27. Here’s an way to solve it, while maintaining an easy IEnumerable interface, and also handling disposal.

    public class AtomicEnumerable : IEnumerable, IDisposable {
    readonly IEnumerator
    _underlyingEnumerator;
    public AtomicEnumerable(IEnumerable
    inner) {
    _underlyingEnumerator = inner.GetEnumerator();
    }
    public void Dispose() {
    _underlyingEnumerator.Dispose();
    }
    public IEnumerator
    GetEnumerator() {
    return new AtomicEnumerator(_underlyingEnumerator);
    }
    IEnumerator IEnumerable.GetEnumerator() {
    return GetEnumerator();
    }

    class AtomicEnumerator : IEnumerator {
    readonly IEnumerator
    _underlyingEnumerator;
    public AtomicEnumerator(IEnumerator
    atomic) {
    _underlyingEnumerator = atomic;
    }
    public bool MoveNext() {
    lock (_underlyingEnumerator) {
    var b = _underlyingEnumerator.MoveNext();
    Current = _underlyingEnumerator.Current;
    return b;
    }
    }

    public void Dispose() { }
    public void Reset() { }
    public T Current { get; private set; }
    object IEnumerator.Current {
    get { return Current; }
    }
    }
    }

    The key is that this new AtomicEnumerable only ever creates a single Enumerator of the underlying Enumerable. Any time a AtomicEnumerable.GetEnumerator() is called, the new AtomicEnumerator is passed a reference to that one single underlying enumerator instead of creating a new one from scratch. That way, the AtomicEnumerator’s Dispose method doesn’t have to do anything; it’s not the one in control of the underlying enumerator. To solve the disposal problem, AtomicEnumerable also implements IDisposable, and kills the underlying enumerator in its Dispose method.

    I think this fits better than reference counting, since you never know for sure when the app is really done. Maybe all the current AtomicEnumerators have been disposed, but there’s no way to know apriori whether AtomicEnumerable.GetEnumerator() will be called again.

    If you really wanted to, with this design you could put reference counting in pretty easily too.

  28. @Dax: That means you can’t just use foreach on it and rely on the C# compiler putting in the Dispose call though, because it only does that on the IEnumerators. Other than that, I agree it should work.

  29. One place where the ThreadLocal would work better though is if the requirement was that all the threads had to share the enumerator, rather than the enumerable. That wouldn’t be possible in the version I posted. (Unless of course the consumers handled it explicitly by locking a shared object and caching a local Current copy only inside the lock; but they could do that with ANY enumerator).

    Thus the ThreadLocal soln would be the only way to handle an additional requirement that AtomicEnumerable.GetEnumerator() should always start at the beginning; ie, that iterating through the list doesn’t change the list.

    Essentially it just comes down to having a place to cache Current in each thread, whether by having each thread create its own caching enumerator, by storing in a ThreadLocal, or by explicitly locking and caching a local copy from within the multithreaded routine.

    Anyway, interesting topic. Thanks for posting it.

  30. Isn’t there a problem in how you use the padlock object?

    When two threads are calling GetEnumerator, they each get their own separate instance of AtomicEnumerator which locks on its own private padlock object.

    Or maybe I’m missing something.

  31. @Mehmet: The point is that the threads don’t each call GetEnumerator – they would each be given the single result of calling GetEnumerator. We’re using *one* iterator across multiple threads.

  32. Oh, you would call GetEnumerator once and use the same iterator instance in all threads, I didn’t think of it that way. Thanks for clarifying.

    The name AtomicEnumerable is misleading then, because only the iterator is atomic not the enumerable. If threads are given the enumerable instance, it’s named “Atomic” after all, it won’t work.

    I’d put the lock object in the Enumerable and pass it to the iterator, so every iterator created would lock on the same object.

Comments are closed.