LINQ to Rx: second impressions

My previous post had the desired effect: it generated discussion on the LINQ to Rx forum, and Erik and Wes kindly sent me a very detailed response too. There’s no better way to cure ignorance than to display it to the world.

Rather than regurgitating the email verbatim, I’ve decided to try to write it in my own words, with extra thoughts where appropriate. That way if I’ve misunderstood anything, I may be corrected – and the very act of trying to explain all this is likely to make me explore it more deeply than I would otherwise.

I’m leaving out the bits I don’t yet understand. One of the difficulties with LINQ to Rx at the moment is that the documentation is somewhat sparse – there are loads of videos, and at least there is a CHM file for each of the assemblies bundled in the framework, but many methods just have a single sentence of description. This is entirely understandable – the framework is still in flux, after all. I’d rather have the bits but sparse docs than immaculate docs for a framework I can’t play with – but it makes it tricky to go deeper unless you’ve got time to experiment extensively. There’s an rxwiki site which looks like it may be the community’s attempt to solve this problem – but it needs a bit more input, I think. When I get a bit of time to breathe, I’d like to try to contribute there.

The good news is that I don’t think there were any mechanical aspects that I got definitively wrong in what I wrote… but the bad news is that I wasn’t thinking in Rx terms. We’ll look at the different aspects separately.

Subscriptions and collections

My first "complaint" was about the way that IEnumerable<T>.ToObservable() worked. Just to recap, I was expecting a three-stage startup process:

  • Create the observable
  • Subscribe any observers
  • Tell the observable to "start"

Instead, as soon as an observer subscribes, the observable publishes everything in the sequence to it (on a different thread, by default). Separate calls to Subscribe make the observable iterate over the sequence multiple times.

Now, my original viewpoint makes sense if you think of Subscribe as being like an event subscription. It feels like something which should be passive: another viewer turning on their television to watch a live broadcast.

However, as soon as you think of IObservable.Subscribe as being the dual of IEnumerable.GetEnumerator, the Rx way makes more sense. Each call to GetEnumerator starts the sequence from scratch, and so does each call to Subscribe. This is more like inserting a disc into the DVD player – you’re still watching something, but there’s a more active element to it. You put the DVD in, it starts playing. I guess following the analogy further would make my suggested model more like a PVR :)
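To make the GetEnumerator/Subscribe duality concrete, here's a minimal sketch using only the IObservable<T> and IObserver<T> interfaces from the System namespace. It is not the Rx implementation: SequenceObservable, NoOp, Recorder and ColdDemo are hypothetical names, and values are pushed synchronously for simplicity rather than on a different thread.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for what ToObservable() returns: every Subscribe call
// walks the sequence from the start, just as every GetEnumerator call would.
sealed class SequenceObservable<T> : IObservable<T>
{
    private readonly IEnumerable<T> source;

    public SequenceObservable(IEnumerable<T> source) => this.source = source;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        // Assumption: pushed synchronously here to keep the sketch small.
        foreach (T item in source)
        {
            observer.OnNext(item);
        }
        observer.OnCompleted();
        return NoOp.Instance;
    }
}

// Nothing to unsubscribe from once the sequence has been pushed.
sealed class NoOp : IDisposable
{
    public static readonly NoOp Instance = new NoOp();
    public void Dispose() { }
}

// Observer that just records what it is given.
sealed class Recorder<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public bool Completed { get; private set; }
    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() => Completed = true;
}

static class ColdDemo
{
    // Counts how many times the underlying sequence is (re)started.
    public static int Enumerations;

    private static IEnumerable<int> Numbers()
    {
        Enumerations++;
        yield return 1;
        yield return 2;
        yield return 3;
    }

    public static (Recorder<int> first, Recorder<int> second) Run()
    {
        Enumerations = 0;
        var observable = new SequenceObservable<int>(Numbers());
        var first = new Recorder<int>();
        var second = new Recorder<int>();
        observable.Subscribe(first);   // plays the whole "DVD" to first
        observable.Subscribe(second);  // and again, from scratch, to second
        return (first, second);
    }
}
```

Calling ColdDemo.Run() gives both recorders the values 1, 2, 3 and leaves ColdDemo.Enumerations at 2: one pass over the sequence per subscription, with no separate "start" step.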

Additionally, this "subscription as an action" model makes more sense of methods like Return and Repeat, and also works better as a reusable object: my own idea of "now push the collection" feels dreadfully stateful: why can’t I push the collection twice? What happens if an observer subscribes after I’ve pushed?

I suspect this will trip up many an Rx neophyte; the video Wes recorded on hot and cold observables should help. Admittedly I’d already watched it before writing the blog post, so I’ve no excuse…

The subscription model can effectively be modified via composition though; using Subject (as per the blog post), AsyncSubject (which remembers the first value it sees, and only yields that), BehaviorSubject (which remembers the last value it’s seen), and ReplaySubject (which remembers everything it sees, optionally limited by a buffer) you can do quite a bit.

Wes included in his email a StartableObservable which one could start and stop. I’d come up with a slightly similar idea at home, an ObservableSequence (not nearly such a good name), but mine was limited to sequences: effectively it made the steps listed above explicit for a pull sequence. The code Wes provided was completely isolated from IEnumerable<T> – you would create a StartableObservable from any existing observable, then subscribe to it, then start it. It used a Subject to effectively collect the subscriptions – starting the observable merely subscribed the subject to the original observable passed into the constructor.
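The following is not Wes's code, just a rough sketch of the shape described above, written against the BCL interfaces only. MiniSubject is a hypothetical, non-thread-safe stand-in for Rx's Subject, and Removal, RangeSource, ListObserver and StartableDemo are likewise invented for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal subject: an observer that fans out to its own observers.
sealed class MiniSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Removal(() => observers.Remove(observer));
    }

    // ToArray() takes a snapshot so observers may unsubscribe during a callback.
    public void OnNext(T value) { foreach (var o in observers.ToArray()) o.OnNext(value); }
    public void OnError(Exception error) { foreach (var o in observers.ToArray()) o.OnError(error); }
    public void OnCompleted() { foreach (var o in observers.ToArray()) o.OnCompleted(); }
}

sealed class Removal : IDisposable
{
    private readonly Action action;
    public Removal(Action action) => this.action = action;
    public void Dispose() => action();
}

sealed class StartableObservable<T> : IObservable<T>
{
    private readonly IObservable<T> source;
    private readonly MiniSubject<T> subject = new MiniSubject<T>();
    private IDisposable connection;

    public StartableObservable(IObservable<T> source) => this.source = source;

    // Subscribing is passive: it only registers the observer with the subject.
    public IDisposable Subscribe(IObserver<T> observer) => subject.Subscribe(observer);

    // Starting subscribes the subject to the source, which makes values flow.
    // (Assumption: this sketch does not guard against Start being called twice.)
    public void Start() => connection = source.Subscribe(subject);

    public void Stop()
    {
        connection?.Dispose();
        connection = null;
    }
}

// Source that pushes 1, 2, 3 on every subscription and counts subscriptions.
sealed class RangeSource : IObservable<int>
{
    public int Subscriptions;

    public IDisposable Subscribe(IObserver<int> observer)
    {
        Subscriptions++;
        for (int i = 1; i <= 3; i++) observer.OnNext(i);
        observer.OnCompleted();
        return new Removal(() => { });
    }
}

sealed class ListObserver : IObserver<int>
{
    public List<int> Values { get; } = new List<int>();
    public void OnNext(int value) => Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

static class StartableDemo
{
    public static (List<int> a, List<int> b, int subscriptions) Run()
    {
        var source = new RangeSource();
        var startable = new StartableObservable<int>(source);
        var a = new ListObserver();
        var b = new ListObserver();
        startable.Subscribe(a);
        startable.Subscribe(b);
        // Nothing has been pushed yet; both observers are just waiting.
        startable.Start();
        return (a.Values, b.Values, source.Subscriptions);
    }
}
```

StartableDemo.Run() returns 1, 2, 3 for both observers while the source is subscribed to only once, which is the create/subscribe/start model from the list earlier, built purely from observable pieces.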

The difference between Wes’s solution and mine is more fundamental than whether his is more general-purpose than mine or not (although it clearly is). Wes didn’t have to go outside the world of Rx at all. All the building blocks were there, he just put them together – and ended up with another building block, ready to be used with the rest. That’s a common theme in this blog post :)

Asynchronous aggregation

I did get one thing right in my previous post: my suggestion that there should be asynchronous versions of the aggregation operators is apparently not a bad one. We may see the framework come with such things in the future… but they won’t revolve around Task<T>.

What do we have to represent an asynchronous computation? Why, IObservable<T> of course. It will present the result at some future point in time. Ideally, I suppose you would deal with the count (or maximum line length, or whatever) by reacting to it asynchronously too. If necessary though, you can always just take the value produced and stash it somewhere… which is exactly what an AsyncSubject does, as mentioned above. You can get the value from that by just calling First() on it, which will block if the value hasn’t been seen yet – and you don’t need to worry about "missing" it, because of the caching within the subject.

When I started this blog post, I didn’t understand Prune, but I’ve found that writing about the whole process has made it somewhat clearer to me. Calling Prune on an observable returns an AsyncSubject – but one which also unsubscribes itself from the original observable when the subject is disposed, basically allowing a more orderly cleanup. So, all we need to do is call Prune on the result of our asynchronous aggregation, and we’re away.

That’s one part of the "non-Rx" framework removed… what else can we take out of the code from the previous blog post? Well, if you look at the FutureAggregate method I posted, it does two things: maintains a running aggregate, and publishes the last result (via a Task<T>). Now the "maintain a running aggregate" part looks remarkably like Scan, doesn’t it? All the future aggregates (FutureCount etc.) can be built from Scan plus one new building block: an observable which subscribes to an existing one, and yields the last value it sees before completion.
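As a rough illustration of that decomposition, here's a sketch with a hand-rolled Scan and a "last value before completion" operator, composed into a FutureCount. None of this is the Rx source or Wes's code: FinalValue, LambdaObservable, LambdaObserver, Nothing and AggregationSketch are hypothetical names, the Scan here is a simplified stand-in for the real operator, and an empty source produces no count at all in this version.

```csharp
using System;
using System.Collections.Generic;

// Tiny helpers so operators can be written as lambdas (hypothetical names).
sealed class LambdaObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> subscribe;
    public LambdaObservable(Func<IObserver<T>, IDisposable> subscribe) => this.subscribe = subscribe;
    public IDisposable Subscribe(IObserver<T> observer) => subscribe(observer);
}

sealed class LambdaObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    private readonly Action<Exception> onError;
    private readonly Action onCompleted;

    public LambdaObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        this.onNext = onNext;
        this.onError = onError;
        this.onCompleted = onCompleted;
    }

    public void OnNext(T value) => onNext(value);
    public void OnError(Exception error) => onError(error);
    public void OnCompleted() => onCompleted();
}

sealed class Nothing : IDisposable
{
    public void Dispose() { }
}

static class AggregationSketch
{
    // Running aggregate: emits the accumulator after every source value.
    public static IObservable<TAcc> Scan<T, TAcc>(
        this IObservable<T> source, TAcc seed, Func<TAcc, T, TAcc> step) =>
        new LambdaObservable<TAcc>(observer =>
        {
            TAcc acc = seed;   // fresh state for each subscription
            return source.Subscribe(new LambdaObserver<T>(
                value => { acc = step(acc, value); observer.OnNext(acc); },
                observer.OnError,
                observer.OnCompleted));
        });

    // The "new building block": stay quiet until completion,
    // then emit the last value seen (if there was one).
    public static IObservable<T> FinalValue<T>(this IObservable<T> source) =>
        new LambdaObservable<T>(observer =>
        {
            bool seen = false;
            T last = default(T);
            return source.Subscribe(new LambdaObserver<T>(
                value => { seen = true; last = value; },
                observer.OnError,
                () => { if (seen) observer.OnNext(last); observer.OnCompleted(); }));
        });

    // A future aggregate is then just the two composed.
    public static IObservable<int> FutureCount<T>(this IObservable<T> source) =>
        source.Scan(0, (count, _) => count + 1).FinalValue();

    public static (List<int> running, List<int> count, List<int> longest) Run()
    {
        // Cold source: pushes three lines to each subscriber, then completes.
        var lines = new LambdaObservable<string>(observer =>
        {
            foreach (var line in new[] { "a", "bb", "ccc" }) observer.OnNext(line);
            observer.OnCompleted();
            return new Nothing();
        });

        var running = new List<int>();
        var count = new List<int>();
        var longest = new List<int>();

        lines.Scan(0, (n, _) => n + 1)
             .Subscribe(new LambdaObserver<int>(running.Add, _ => { }, () => { }));
        lines.FutureCount()
             .Subscribe(new LambdaObserver<int>(count.Add, _ => { }, () => { }));
        lines.Scan(0, (max, line) => Math.Max(max, line.Length))
             .FinalValue()
             .Subscribe(new LambdaObserver<int>(longest.Add, _ => { }, () => { }));

        return (running, count, longest);
    }
}
```

AggregationSketch.Run() yields 1, 2, 3 for the running count, a single 3 for FutureCount, and a single 3 for the maximum line length. The result of each aggregate is itself an observable, so nothing like Task<T> is needed to represent "a value that arrives later".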

I’ll check with Wes whether he’s happy for me to share his code – if so, I’ll put that and the original code into a zip file so it’s easy to compare the dull version with the shiny one.

Conclusion

It’s not enough to be able to think about Rx. To really appreciate it, you’ve got to be able to think in Rx. As I’d written a sort of "mini-Rx" before, I was arrogant enough to assume I already knew how to think in observable sequences… but apparently not. (To be fair to myself, it’s been a while and Push LINQ didn’t try to do anything genuinely asynchronous.)

I’m certainly not "in the zone" yet when it comes to Rx… but I think I can see it in the distance now. I’m heartily glad I raised my concerns over asynchronous aggregation – partly as encouragement to the team to consider including them in the framework, but mostly because it’s helped me appreciate the framework a lot better. With any luck, these two somewhat "stream of consciousness" posts may have helped you too.

Now to go over what I wrote last night for the book, and see how much of it was rubbish :)

2 thoughts on “LINQ to Rx: second impressions”

  1. You write:

    “It’s not enough to be able to think about Rx. To really appreciate it, you’ve got to be able to think in Rx.”

    That’s yet another way it’s the dual of LINQ/Enumerable/Queryable. :)
