Introducing Postal.NET

Update: Postal.NET is now available on NuGet.

Introduction

Postal.NET is a library I wrote for building decoupled in-process applications using the Publish/Subscribe and Domain Event patterns. It is loosely based on the Postal.js JavaScript library. By loosely, I mean that I didn’t look at its source code, but I like the ideas it implements!

Postal.NET allows us to post messages to a “bus”, identified by a channel and topic pair. A channel can have any number of topics, and a topic can be in any number of channels; both are just strings.

We can subscribe to one or more channel/topic pairs and cancel each subscription when we no longer need it.

Messages are posted either synchronously or asynchronously. Each message is delivered “as-is”, but wrapped in an envelope that adds a few properties, such as the timestamp and the originating channel and topic.

Nothing too complex, yet I hope you find it useful!

Usage

Here are some usage examples. First, a subscription and two messages being sent, one synchronously (Publish) and the other asynchronously (PublishAsync):

using (Postal.Box.Subscribe("channel", "topic", (env) => Console.WriteLine(env.Data)))
{
    //sync
    Postal.Box.Publish("channel", "topic", "Hello, World!");

    //async
    await Postal.Box.PublishAsync("channel", "topic", "Hello, Async World!");
}

Postal.Box.Publish("channel", "topic", "Does not appear because the subscription was disposed!");

Notice the using block that wraps the Subscribe call. Subscribe returns an IDisposable which, when disposed, terminates the subscription; hence the third message will not be received by the subscriber.
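To make the "subscription as IDisposable" idea concrete, here is a minimal, self-contained sketch of the pattern. TinyBus and its nested Subscription class are hypothetical stand-ins invented for this illustration, not Postal.NET types; the sketch has no channels or topics and is not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// TinyBus is a hypothetical stand-in used only to illustrate the pattern;
// it is not Postal.NET's Box.
public sealed class TinyBus
{
    private readonly List<Action<string>> handlers = new List<Action<string>>();

    // Registers a handler and returns a token; disposing the token unregisters it.
    public IDisposable Subscribe(Action<string> handler)
    {
        handlers.Add(handler);
        return new Subscription(() => handlers.Remove(handler));
    }

    public void Publish(string message)
    {
        // Iterate over a copy so a handler may unsubscribe while being called.
        foreach (var handler in handlers.ToArray())
        {
            handler(message);
        }
    }

    private sealed class Subscription : IDisposable
    {
        private Action unsubscribe;

        public Subscription(Action unsubscribe)
        {
            this.unsubscribe = unsubscribe;
        }

        public void Dispose()
        {
            // Safe to call more than once: the callback runs only the first time.
            if (unsubscribe != null)
            {
                unsubscribe();
                unsubscribe = null;
            }
        }
    }
}

public static class TinyBusDemo
{
    public static void Main()
    {
        var bus = new TinyBus();
        var subscription = bus.Subscribe(Console.WriteLine);

        bus.Publish("received");      // printed
        subscription.Dispose();
        bus.Publish("not received");  // nobody is listening any more
    }
}
```

The token does not have to live in a using block: it can be kept in a field and disposed whenever the subscriber goes away.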

using (Postal.Box.Subscribe("*", "*", (env) => Console.WriteLine("Catch all!")))
{
    using (Postal.Box.Subscribe("channel", "topic", (env) => Console.WriteLine(env.Data)))
    {
        Postal.Box.Publish("channel", "topic", "Hello, World!");
    }
}

Now we have two subscriptions, one for * (any) channel and topic, and the other for a concrete one. Both subscriptions will receive the message.
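As a rough illustration of how a * wildcard can be matched against a concrete channel or topic name, the sketch below translates the pattern into an anchored regular expression. The WildcardSketch class and its IsMatch helper are hypothetical names made up for this example; treating * as "any sequence of characters" inside a longer pattern such as "orders.*" is an assumption of the sketch.

```csharp
using System;
using System.Text.RegularExpressions;

public static class WildcardSketch
{
    // Hypothetical helper: "*" matches any sequence of characters,
    // every other character is matched literally, and the pattern
    // must cover the whole value (hence the ^ and $ anchors).
    public static bool IsMatch(string pattern, string value)
    {
        var regex = "^" + Regex.Escape(pattern).Replace("\\*", ".*") + "$";
        return Regex.IsMatch(value, regex);
    }

    public static void Main()
    {
        Console.WriteLine(IsMatch("*", "channel"));            // True
        Console.WriteLine(IsMatch("channel", "channel"));      // True
        Console.WriteLine(IsMatch("channel", "my-channel"));   // False
        Console.WriteLine(IsMatch("orders.*", "orders.new"));  // True
        Console.WriteLine(IsMatch("orders.*", "ordersXnew"));  // False
    }
}
```

Regex.Escape turns the * into \*, which is why the replacement targets the escaped form before the anchors are added.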

using (Postal.Box.Subscribe("channel", "topic", (env) => Console.WriteLine(env.Data), (env) => env.Data is int))
{
    Postal.Box.Publish("channel", "topic", "Does not show!");
    Postal.Box.Publish("channel", "topic", 12345);
}

This time we are filtering the subscription: it will only fire if the message payload is an integer.

using (var evt = new ManualResetEvent(false))
using (Postal.Box.Subscribe("channel", "topic", (env) => {
    Console.WriteLine(env.Data);
    evt.Set();
}))
{
    Postal.Box.PublishAsync("channel", "topic", "Hello, World!");

    evt.WaitOne();
}

In this example, we wait for an event to be signaled, which only happens once an asynchronous message is handled.

using (Postal.Box.AnyChannelAndTopic().Subscribe((env) => Console.WriteLine("Catch all!")))
{
    using (Postal.Box.Channel("channel").Topic("topic").Subscribe((env) => Console.WriteLine(env.Data)))
    {
        Postal.Box.Channel("channel").Topic("topic").Publish("Hello, World!");
    }
}

This final example shows the alternative fluent interface. It offers the exact same functionality.

Code

Let’s start with the Postal class, which merely acts as a holder for an IBox:

public static class Postal
{
    public static readonly IBox Box = new Box();
}

The IBox interface, the core of Postal.NET, is defined as:

public interface IBox
{
    IDisposable Subscribe(string channel, string topic, Action<Envelope> subscriber, Func<Envelope, bool> condition = null);

    void Publish(string channel, string topic, object data);
    Task PublishAsync(string channel, string topic, object data);
}

The included implementation, Box, is as follows:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading.Tasks;

public sealed class Box : IBox
{
    class SubscriberId
    {
        private readonly Guid id;
        private readonly string channel;
        private readonly string topic;
        private readonly int hash;
        private readonly Func<Envelope, bool> condition;

        public SubscriberId(Guid id, string channel, string topic, Func<Envelope, bool> condition)
        {
            this.id = id;
            this.channel = channel;
            this.topic = topic;
            this.condition = condition;

            unchecked
            {
                this.hash = 13;
                this.hash = (this.hash * 17) ^ this.id.GetHashCode();
                this.hash = (this.hash * 17) ^ this.channel.GetHashCode();
                this.hash = (this.hash * 17) ^ this.topic.GetHashCode();
            }
        }

        // Translates a wildcard pattern into an anchored regular expression:
        // "*" matches any sequence of characters, everything else is literal.
        // Without the anchors, "channel" would also match "my-channel-2".
        private string Normalize(string str)
        {
            return "^" + Regex.Escape(str).Replace("\\*", ".*") + "$";
        }

        public bool Matches(string channel, string topic)
        {
            var channelRegex = new Regex(this.Normalize(this.channel));
            var topicRegex = new Regex(this.Normalize(this.topic));

            return channelRegex.IsMatch(channel) == true
                   && topicRegex.IsMatch(topic);
        }

        public override bool Equals(object obj)
        {
            var other = obj as SubscriberId;

            if (other == null)
            {
                return false;
            }

            return (other.id == this.id) && (other.channel == this.channel) && (other.topic == this.topic);
        }

        public override int GetHashCode()
        {
            return this.hash;
        }

        public bool Passes(Envelope env)
        {
            return this.condition(env);
        }
    }

    class DisposableSubscription : IDisposable
    {
        private readonly SubscriberId id;
        private readonly IDictionary<SubscriberId, Action<Envelope>> subscribers;

        public DisposableSubscription(SubscriberId id, IDictionary<SubscriberId, Action<Envelope>> subscribers)
        {
            this.id = id;
            this.subscribers = subscribers;
        }

        public void Dispose()
        {
            this.subscribers.Remove(this.id);
        }
    }

    private readonly ConcurrentDictionary<SubscriberId, Action<Envelope>> subscribers = new ConcurrentDictionary<SubscriberId, Action<Envelope>>();

    public void Publish(string channel, string topic, object data)
    {
        this.Validate(channel, topic);
        this.PublishAsync(channel, topic, data).GetAwaiter().GetResult();
    }

    public IDisposable Subscribe(string channel, string topic, Action<Envelope> subscriber, Func<Envelope, bool> condition = null)
    {
        this.Validate(channel, topic);
        this.Validate(subscriber);

        if (condition == null)
        {
            condition = (env) => true;
        }

        var id = new SubscriberId(Guid.NewGuid(), channel, topic, condition);
        this.subscribers[id] = subscriber;

        return new DisposableSubscription(id, this.subscribers);
    }

    public async Task PublishAsync(string channel, string topic, object data)
    {
        this.Validate(channel, topic);

        var env = new Envelope(channel, topic, data);

        foreach (var subscriber in this.GetSubscribers(channel, topic, env).AsParallel())
        {
            await Task.Run(() => subscriber(env));
        }
    }

    private void Validate(string channel, string topic)
    {
        if (string.IsNullOrWhiteSpace(channel) == true)
        {
            throw new ArgumentNullException("channel");
        }

        if (string.IsNullOrWhiteSpace(topic) == true)
        {
            throw new ArgumentNullException("topic");
        }
    }

    private void Validate(Action<Envelope> subscriber)
    {
        if (subscriber == null)
        {
            throw new ArgumentNullException("subscriber");
        }
    }

    private void Validate(Func<Envelope, bool> condition)
    {
        if (condition == null)
        {
            throw new ArgumentNullException("condition");
        }
    }

    private bool Matches(SubscriberId id, string channel, string topic)
    {
        return id.Matches(channel, topic);
    }

    private IEnumerable<Action<Envelope>> GetSubscribers(string channel, string topic, Envelope env)
    {
        foreach (var subscriber in this.subscribers)
        {
            if (this.Matches(subscriber.Key, channel, topic) == true)
            {
                if (subscriber.Key.Passes(env) == true)
                {
                    yield return subscriber.Value;
                }
            }
        }
    }
}

And finally, the Envelope class:

public sealed class Envelope
{
    public Envelope(string channel, string topic, object data)
    {
        this.Timestamp = DateTime.UtcNow;
        this.Channel = channel;
        this.Topic = topic;
        this.Data = data;
    }

    public DateTime Timestamp { get; private set; }
    public string Channel { get; private set; }
    public string Topic { get; private set; }
    public object Data { get; private set; }
}

The code for the fluent extensions will not be covered here, but you can find it on GitHub.
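For readers curious about the general shape, here is one way such a fluent layer could be built as extension methods over IBox. This is only a guess at the approach, not the code from the repository: ChannelRef, TopicRef, FluentSketch and RecordingBox are hypothetical names, and Envelope and IBox are trimmed stand-ins so that the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;

// Trimmed stand-ins for the real types, so the sketch is self-contained.
public sealed class Envelope
{
    public Envelope(string channel, string topic, object data)
    {
        Channel = channel; Topic = topic; Data = data;
    }
    public string Channel { get; private set; }
    public string Topic { get; private set; }
    public object Data { get; private set; }
}

public interface IBox
{
    IDisposable Subscribe(string channel, string topic, Action<Envelope> subscriber, Func<Envelope, bool> condition = null);
    void Publish(string channel, string topic, object data);
}

// Hypothetical wrappers: each one only remembers what was chosen so far.
public sealed class ChannelRef
{
    private readonly IBox box;
    private readonly string channel;
    public ChannelRef(IBox box, string channel) { this.box = box; this.channel = channel; }
    public TopicRef Topic(string topic) { return new TopicRef(box, channel, topic); }
}

public sealed class TopicRef
{
    private readonly IBox box;
    private readonly string channel;
    private readonly string topic;
    public TopicRef(IBox box, string channel, string topic)
    {
        this.box = box; this.channel = channel; this.topic = topic;
    }
    // Both calls simply forward to the non-fluent IBox methods.
    public IDisposable Subscribe(Action<Envelope> subscriber, Func<Envelope, bool> condition = null)
    {
        return box.Subscribe(channel, topic, subscriber, condition);
    }
    public void Publish(object data) { box.Publish(channel, topic, data); }
}

public static class FluentSketch
{
    public static ChannelRef Channel(this IBox box, string channel) { return new ChannelRef(box, channel); }
    public static TopicRef AnyChannelAndTopic(this IBox box) { return new TopicRef(box, "*", "*"); }
}

// Fake box that records calls, used only to show the forwarding.
public sealed class RecordingBox : IBox
{
    private sealed class NoOp : IDisposable { public void Dispose() { } }
    public readonly List<string> Calls = new List<string>();
    public IDisposable Subscribe(string channel, string topic, Action<Envelope> subscriber, Func<Envelope, bool> condition = null)
    {
        Calls.Add("Subscribe " + channel + "/" + topic);
        return new NoOp();
    }
    public void Publish(string channel, string topic, object data)
    {
        Calls.Add("Publish " + channel + "/" + topic);
    }
}

public static class FluentDemo
{
    public static void Main()
    {
        var box = new RecordingBox();
        using (box.AnyChannelAndTopic().Subscribe(env => { }))
        {
            box.Channel("channel").Topic("topic").Publish("Hello, World!");
        }
        Console.WriteLine(string.Join(", ", box.Calls)); // Subscribe */*, Publish channel/topic
    }
}
```

Because the wrappers only forward to IBox, a layer like this adds no behaviour of its own, which matches the earlier remark that the fluent interface offers the same functionality.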

Limitations

Postal.NET will not:

  • serialize messages or send them across process boundaries;
  • apply any scheduling: messages are sent immediately;
  • apply any transformation to the messages;
  • keep sent messages internally.

But it will:

  • keep the order in which messages were sent;
  • deliver a message to subscriptions on multiple channels and topics, when the * wildcard is used;
  • send any kind of data; the payload type really doesn’t matter.

Extensions

In the GitHub repository you will find some other projects:

  • PostalRX.NET: a Reactive Extensions (RX.NET) adapter for Postal.NET;
  • PostalConventions.NET: using conventions and a fluent interface for automatically inferring the channel and topic from the message itself.

Future Work

I have a couple of ideas, but, for now, I want to keep this simple. I will update the repository as I make progress. I’d like to hear from you on this.

Conclusion

Postal.NET will hopefully let you build more decoupled applications more easily. I hope you got the idea and see value in it. As always, I’m happy to hear your comments!

Published by

Ricardo Peres

Tech Lead at RedLight Software.
