Probabilistic Reactive Extensions: ProbablyDistinct

Reactive Extensions has an operator called Distinct. As data streams through, it filters out any items it has already seen before, allowing only unique items to pass to OnNext. The problem is Distinct stores all the unique items in a HashSet, which means it will consume a lot of memory if the number of unique items is large. The solution is to implement a ProbablyDistinct operator that uses a Bloom Filter to store unique items. A Bloom Filter is a very compact data structure that tests each item and replies “item is probably in the set” or “item is definitely not in the set”. In this case, there will be some false positives (it says it’s in the set, but it’s actually not). But it’s a tradeoff some applications might need, particulary long-running server apps that see lots of unique data items.

More broadly, it would be useful to have a few Rx operators that use probabilistic data structures. For example, the HyperLogLog can be used to estimate the count of large numbers of distinct items using very little storage. Another useful operator is the opposite of Distinct (Indistinct? SeenItBefore?). This is how most people use a Bloom Filter. Rather than do an expensive DB lookup, it first checks the filter to see if it is probably in the DB and then does the query.

Here’s the source for Distinct. All you have to do is replace the HashSet with a Bloom Filter (+ minor code tweaks).

        private static IObservable<TSource> Distinct_<TSource, TKey>(IObservable<TSource> source,
            Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
        {
            return new AnonymousObservable<TSource>(observer =>
            {
                var hashSet = new HashSet<TKey>(comparer);
                return source.Subscribe(
                    x =>
                    {
                        var key = default(TKey);
                        var hasAdded = false;

                        try
                        {
                            key = keySelector(x);
                            hasAdded = hashSet.Add(key);
                        }
                        catch (Exception exception)
                        {
                            observer.OnError(exception);
                            return;
                        }

                        if (hasAdded)
                            observer.OnNext(x);
                    },
                    observer.OnError,
                    observer.OnCompleted
                    );
            });
        }
Advertisements