Does reactive extensions support rolling buffers?

This is possible by combining the built-in Window and Throttle methods of Observable. First, let’s solve the simpler problem where we ignore the maximum count condition:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
    var closes = stream.Throttle(delay);
    return stream.Window(() => closes).SelectMany(window => window.ToList());

The powerful Window method did the heavy lifting. Now it’s easy enough to see how to add a maximum count:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? max=null)
    var closes = stream.Throttle(delay);
    if (max != null)
        var overflows = stream.Where((x,index) => index+1>=max);
        closes = closes.Merge(overflows);
    return stream.Window(() => closes).SelectMany(window => window.ToList());

I’ll write a post explaining this on my blog.

Documentation for the Window method:


Leave a Comment