Do Reactive Extensions support rolling buffers?

This is possible by combining the built-in Window and Throttle methods of Observable. First, let’s solve the simpler problem where we ignore the maximum count condition:

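public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
    // Publish shares a single subscription, so a cold source is not subscribed twice.
    return stream.Publish(hot =>
    {
        // Throttle emits only once the stream has been quiet for `delay`.
        var closes = hot.Throttle(delay);
        return hot.Window(() => closes).SelectMany(window => window.ToList());
    });
}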

The powerful Window method did the heavy lifting. Now it’s easy enough to see how to add a maximum count:

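public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, int? max = null)
{
    // Publish shares a single subscription, so a cold source is not subscribed twice.
    return stream.Publish(hot =>
    {
        var closes = hot.Throttle(delay);
        if (max != null)
        {
            // Window subscribes to closes afresh for each window, so index restarts at 0.
            var overflows = hot.Where((x, index) => index + 1 >= max);
            closes = closes.Merge(overflows);
        }
        return hot.Window(() => closes).SelectMany(window => window.ToList());
    });
}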
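
To see the operator in use, here is a minimal sketch rather than a definitive implementation. It assumes the System.Reactive package is referenced. The extension method is repeated inside the block, in a form that shares the source through Publish, so that the block compiles on its own. The names RollingBufferExtensions, Demo and Collect are hypothetical and exist only for this illustration. The delay is deliberately long, so in this run windows should be closed only by the count limit or by completion of the source.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class RollingBufferExtensions
{
    // The operator from the text, with the source shared through Publish.
    public static IObservable<IList<T>> BufferUntilInactive<T>(
        this IObservable<T> stream, TimeSpan delay, int? max = null)
    {
        return stream.Publish(hot =>
        {
            var closes = hot.Throttle(delay);
            if (max != null)
            {
                closes = closes.Merge(hot.Where((x, index) => index + 1 >= max));
            }
            return hot.Window(() => closes).SelectMany(window => window.ToList());
        });
    }
}

public static class Demo
{
    // Hypothetical helper: pushes 1..5 through the operator and collects the batches.
    public static List<IList<int>> Collect()
    {
        var source = new Subject<int>();
        var batches = new List<IList<int>>();

        // One-minute delay: the inactivity timer is not expected to fire during this run.
        using (source.BufferUntilInactive(TimeSpan.FromMinutes(1), max: 2)
                     .Subscribe(batch => batches.Add(batch)))
        {
            foreach (var i in Enumerable.Range(1, 5))
            {
                source.OnNext(i);
            }
            source.OnCompleted(); // completing the source flushes the last open window
        }
        return batches;
    }

    public static void Main()
    {
        foreach (var batch in Collect())
        {
            Console.WriteLine(string.Join(", ", batch));
        }
    }
}
```

A Subject is used here only because it makes the timing easy to control. With a real event source, a much shorter delay (a few hundred milliseconds, say) would be the more typical choice, and batches would then also be flushed whenever the stream goes quiet.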

I’ll write a post explaining this on my blog. https://gist.github.com/2244036

Further reading on the Window method:

  • http://leecampbell.blogspot.co.uk/2011/03/rx-part-9join-window-buffer-and-group.html
  • http://enumeratethis.com/2011/07/26/financial-charts-reactive-extensions/
