
Storm w/ Reactive Extensions and Dataflow

The Apache Storm project is a distributed dataflow framework. Twitter uses it to process a continuous stream of tweets across a cluster of machines. Microsoft’s TPL Dataflow library is similar, but it only works within a single process. To approximate Storm in .NET, I want to turn an MSMQ queue into a Spout, which is Storm’s name for a stream source.

One solution is to wrap the MSMQ queue in an Observable that pushes each message into a Dataflow block. I packaged this into the constructor for a QueueSourceBlock<T>, which implements ISourceBlock<T>. Here’s the constructor:

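public QueueSourceBlock(string queueAddress)
{
    var queue = new MessageQueue(queueAddress);
    // Message bodies are expected to be XML-serialized instances of T.
    queue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });

    // Unwraps each MSMQ message into its payload.
    var tb = new TransformBlock<Message, T>(m => (T) m.Body);
    block = tb;

    // Each subscription starts one BeginReceive/EndReceive pair and yields one message.
    var queueObserveOnce =
        Observable.Defer(
            () =>
                Observable.FromAsync(
                    () => Task<Message>.Factory.FromAsync(queue.BeginReceive(), queue.EndReceive)));

    // Resubscribe after every message so the receive loop never ends.
    queueObservable = Observable.While(() => true, queueObserveOnce);
    queueDispose = queueObservable.Subscribe(m => tb.Post(m));
}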

The FromAsync call receives only one message from the queue. Defer creates a new FromAsync call each time it is subscribed to, and While keeps resubscribing to the Defer forever. Whenever a message arrives from the Observable, the subscriber calls Post to push it into the TransformBlock. This block extracts the message body and sends it to the next block it’s linked to. This code doesn’t handle cancellation. AFAIK, there’s no way to cancel a pending BeginReceive on a queue, but I could support cancellation in other places.
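One of those "other places" could be the Dataflow block itself, since block options accept a CancellationToken. The following is only a rough sketch under assumptions: CancellationSketch and RunAsync are made-up names, and a TransformBlock<string, int> stands in for the TransformBlock<Message, T> above so the example runs without MSMQ. It does not cancel the queue receive, only the block.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, not part of the QueueSourceBlock shown in this post.
public static class CancellationSketch
{
    // Returns true when the block ended up canceled and refuses new input.
    public static async Task<bool> RunAsync()
    {
        using (var cts = new CancellationTokenSource())
        {
            // Stand-in for TransformBlock<Message, T>: strings in, lengths out.
            var extract = new TransformBlock<string, int>(
                body => body.Length,
                new ExecutionDataflowBlockOptions { CancellationToken = cts.Token });

            extract.Post("hello");

            // Cancelling the token moves the block to the Canceled state.
            cts.Cancel();

            try
            {
                await extract.Completion;
            }
            catch (OperationCanceledException)
            {
                // Expected: awaiting a canceled block throws.
            }

            // A canceled block declines any further messages.
            bool accepted = extract.Post("ignored");
            return extract.Completion.IsCanceled && !accepted;
        }
    }
}
```

Calling CancellationSketch.RunAsync() from any async method is enough to try it. In the real class, the token would presumably come in as an extra constructor argument, and the Rx subscription would still need to be disposed separately.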

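Surprisingly, there doesn’t appear to be a dedicated block for splitting a stream in .NET’s Dataflow. They’ve got a JoinBlock that merges streams, but no matching split block. The closest things I can find are linking one source to several targets with LinkTo, which accepts a filter predicate, and BroadcastBlock, which offers a copy of each message to every linked target. I think if you increase the parallelism in a block (MaxDegreeOfParallelism in its options), it behaves sort of like “shuffleGrouping” in Storm. Still, it’s a weird oversight.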
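For what it’s worth, a split can be approximated by linking one source to several targets and giving each link a predicate. This is a minimal sketch, not a drop-in SplitStream: SplitSketch and RunAsync are names I made up, and the even/odd routing is just an illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper showing predicate-based routing with LinkTo.
public static class SplitSketch
{
    // Routes even numbers to one list and odd numbers to another.
    public static async Task<(List<int> Evens, List<int> Odds)> RunAsync(IEnumerable<int> input)
    {
        var evens = new List<int>();
        var odds = new List<int>();

        var source = new BufferBlock<int>();

        // ActionBlock runs one delegate at a time by default,
        // so each list is only touched by a single handler at once.
        var evenSink = new ActionBlock<int>(n => evens.Add(n));
        var oddSink = new ActionBlock<int>(n => odds.Add(n));

        // Each link only accepts the messages its predicate matches.
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        source.LinkTo(evenSink, link, n => n % 2 == 0);
        source.LinkTo(oddSink, link, n => n % 2 != 0);

        foreach (var n in input)
        {
            source.Post(n);
        }
        source.Complete();

        // Completion propagates through both links once the buffer drains.
        await Task.WhenAll(evenSink.Completion, oddSink.Completion);
        return (evens, odds);
    }
}
```

One caveat with this approach: a message that matches none of the predicates stays at the head of the BufferBlock and holds up everything behind it, so the predicates need to cover every case (or a catch-all link needs to go last).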