Tagged: C#

Storm w/ Reactive Extensions and Dataflow

The Apache Storm project is a distributed dataflow framework. It’s used by Twitter to process a continuous stream of tweets through a network of machines. Microsoft’s TPL Dataflow library is similar, but only works on a single process. To get an approximation in .NET I want to convert MSMQ into a Spout.

One solution is to convert MSMQ into an Observable that pushes elements into a Dataflow block. I packaged this into the constructor for a QueueSourceBlock, which implements ISourceBlock. Here’s the code snippet:

public QueueSourceBlock(string queueAddress)
{
    var queue = new MessageQueue(queueAddress);
    queue.Formatter = new XmlMessageFormatter(new Type[]{typeof(T)});
    var tb = new TransformBlock<Message, T>(m => (T) m.Body);
    block = tb;
    var queueObserveOnce =
        Observable.Defer(
            () =>
                Observable.FromAsync(
                    () => Task<Message>.Factory.FromAsync(queue.BeginReceive(), queue.EndReceive)));

    queueObservable = Observable.While(() => true, queueObserveOnce);
    queueDispose = queueObservable.Subscribe(m => tb.Post(m));
}

The FromAsync method will receive only one message from the queue. The Defer method will generate a new FromAsync call on demand. The While method will keep calling the Defer forever. Whenever a message arrives from the Observable, it calls Post to push it into the TranformBlock. This block will extract the data and send it to the next node it’s linked to. This code doesn’t handle cancellation. AFAIK, there’s no way to cancel the BeginReceive on a queue, but I could support cancellation in other places.

Surprisingly there doesn’t appear to be a way to split a stream in .NET’s Dataflow. They’ve got a JoinBlock that merges streams, but not a SplitStream. I think if you increase the parallelism in a block it behaves sort of like “shuffleGrouping” in Storm. Still, it’s a weird oversight.

Simple Dropbox client w/ Reactive Extensions

Reactive Extensions is ideally suited to the task of monitoring a directory for changes. While the events from FileSystemWatcher are ok, it isn’t efficient. The goal is to send as little data as possible to the server. If a file is moved or copied, there’s no need to upload the file again. Instead, you should recognize the event on the client and simply send a Moved or Copied message pointing to the original file on the server.

Here’s a simple prototype. I left out all the code to track the files with hashes. It gets a little trickier trying to track directories. If a directory is deleted, the entire subtree is deleted. You can use SQLite with the closure.c extension to track hierarchical data.

Anyway, this proof-of-concept is easy. Cut and paste lost some formatting. Stupid tabs.

    public class DropboxClient
    {
        private readonly FileSystemWatcher watcher;

        public IObservable<DropboxEventArg> FileSystemObservable { get; private set; } 

        public DropboxClient(string home)
        {
            watcher = new FileSystemWatcher
            {
                Path = home,
                EnableRaisingEvents = true,
                IncludeSubdirectories = true
            };

            SetupRx();
        }

        private void SetupRx()
        {
            var changed = Observable.FromEventPattern<FileSystemEventArgs>(watcher, "Changed");
            var created = Observable.FromEventPattern<FileSystemEventArgs>(watcher, "Created");
            var deleted = Observable.FromEventPattern<FileSystemEventArgs>(watcher, "Deleted");
            var renamed = Observable.FromEventPattern<FileSystemEventArgs>(watcher, "Renamed");

            // Often it repeats a change event for every little change (timestamp, file size, etc). 
            var dbchanged = changed
                .DistinctUntilChanged()
                //.Do(UpdateRecord)
                .Select(fe => new DropboxEventArg(fe.EventArgs));

            // This seems to work fine, I think
            var dbrenamed = renamed.Select(fe => new DropboxEventArg(fe.EventArgs));

            // Deleted is ok, too
            var dbdeleted = deleted
                //.Do(DeleteRecord)
                .Select(fe => new DropboxEventArg(fe.EventArgs));

            // If file already exists, then a created file is a copy of another file
            var dbcreated = created
                .Select(fe =>
                {
                    if (FileExists(fe.EventArgs.FullPath))
                        return new DropboxEventArg(fe.EventArgs, DropboxChangeTypes.Copied);
                    else
                    {
                        //CreateRecord(fe);
                        return new DropboxEventArg(fe.EventArgs);
                    }
                });

            FileSystemObservable = dbchanged.Merge(dbrenamed).Merge(dbdeleted).Merge(dbcreated);
        }

        private void CreateRecord(EventPattern<FileSystemEventArgs> fe) {
            // Create row in repo
            throw new NotImplementedException();
        }

        private void UpdateRecord(EventPattern<FileSystemEventArgs> obj)
        {
            // If file size is different, rehash and update
            // If dir, maybe do nothing. Not sure.
            throw new NotImplementedException();
        }

        private void DeleteRecord(EventPattern<FileSystemEventArgs> obj) {
            // Delete file from repository
            // If directory, delete entire subtree from repo
            throw new NotImplementedException();
        }

        private bool FileExists(string fpath)
        {
            // If file, hash and lookup 
            // If dir, maybe do nothing
            return false;
        }
    }

    public enum DropboxChangeTypes
    {
	Changed,
	Created,
	Deleted,
	Renamed,
	Moved,
	Copied
    }

    public class DropboxEventArg {
	public DropboxChangeTypes ChangeType;
	public string FullPath;
	public string Name;

	public DropboxEventArg()
	{
	}

	public DropboxEventArg(FileSystemEventArgs fe)
	{
		FullPath = fe.FullPath;
		Name = fe.Name;
		switch (fe.ChangeType)
		{
			case WatcherChangeTypes.Changed: 
				ChangeType = DropboxChangeTypes.Changed; break;
			case WatcherChangeTypes.Created:
				ChangeType = DropboxChangeTypes.Created; break;
			case WatcherChangeTypes.Deleted:
				ChangeType = DropboxChangeTypes.Deleted; break;
			case WatcherChangeTypes.Renamed:
				ChangeType = DropboxChangeTypes.Renamed; break;
		}
	}

	public DropboxEventArg(FileSystemEventArgs fe, DropboxChangeTypes ct)
	{
		FullPath = fe.FullPath;
		Name = fe.Name;
		ChangeType = ct;
	}
    }

Probabilistic Reactive Extensions: ProbablyDistinct

Reactive Extensions has an operator called Distinct. As data streams through, it filters out any items it has already seen before, allowing only unique items to pass to OnNext. The problem is Distinct stores all the unique items in a HashSet, which means it will consume a lot of memory if the number of unique items is large. The solution is to implement a ProbablyDistinct operator that uses a Bloom Filter to store unique items. A Bloom Filter is a very compact data structure that tests each item and replies “item is probably in the set” or “item is definitely not in the set”. In this case, there will be some false positives (it says it’s in the set, but it’s actually not). But it’s a tradeoff some applications might need, particulary long-running server apps that see lots of unique data items.

More broadly, it would be useful to have a few Rx operators that use probabilistic data structures. For example, the HyperLogLog can be used to estimate the count of large numbers of distinct items using very little storage. Another useful operator is the opposite of Distinct (Indistinct? SeenItBefore?). This is how most people use a Bloom Filter. Rather than do an expensive DB lookup, it first checks the filter to see if it is probably in the DB and then does the query.

Here’s the source for Distinct. All you have to do is replace the HashSet with a Bloom Filter (+ minor code tweaks).

        private static IObservable<TSource> Distinct_<TSource, TKey>(IObservable<TSource> source,
            Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
        {
            return new AnonymousObservable<TSource>(observer =>
            {
                var hashSet = new HashSet<TKey>(comparer);
                return source.Subscribe(
                    x =>
                    {
                        var key = default(TKey);
                        var hasAdded = false;

                        try
                        {
                            key = keySelector(x);
                            hasAdded = hashSet.Add(key);
                        }
                        catch (Exception exception)
                        {
                            observer.OnError(exception);
                            return;
                        }

                        if (hasAdded)
                            observer.OnNext(x);
                    },
                    observer.OnError,
                    observer.OnCompleted
                    );
            });
        }

Overriding extension methods in C# 3.0

A language that properly supports modularity must deal with name collisions. In C# 3.0, extension methods allow us to dynamically add new methods to another class. So what happens when methods collide? If I add the same method signature (name + parameters) to a class, what will the compiler do?

  • If I include two different namespaces, and both add the same method, the compiler warns that my method call is ambiguous. That’s good. But how can I explicitly specify which method I want?
  • If the method exists in another namespace and I add the same method, the compiler does not report that I’ve modified an existing method. This means my program could change if anyone working in the same namespace accidently modifies another method. The compiler should warn that I’ve changed an existing method.

F# hits the big time

F# is moving out of research into a first-class language running on .NET. F# is a derivative of OCaml, a strongly-typed functional language with imperative and OO features. I’ve had the great fortune of working with Don Syme on Project 7 (a largely failed attempt to port “academic” languages to .NET) and at MSR Cambridge a long time ago. He’s a very sharp guy who also contributed to the design and implementation of generics in C# and the CLR. Who says nothing ever comes from research groups?