EventStream distinctUntilChanged() vs RxJS distinctUntilChanged() (v2)

This is revision 2 of this benchmark.


Preparation HTML

<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.1/rxjs.umd.min.js"></script>

Setup

"use strict";
/**
 * Minimal push-based event stream.
 *
 * Subscribers are plain callbacks kept in `subscribers`. A stream may be given a
 * `handleActive` hook, which is invoked when a subscriber is added while no teardown
 * callback is stored; whatever it returns is stored as `handleInactive` and invoked
 * once the last subscriber is removed. The operators in this file use that hook to
 * subscribe to their upstream only while they themselves have subscribers.
 */
var EventStream = /** @class */ (function () {
    /**
     * @param {function(EventStream): (function|undefined)} [handleActive] Activation hook; receives
     *     this stream and returns a teardown callback.
     * @param {*} [initial] Initial value. Anything other than `undefined` makes the stream replay
     *     its latest value on every subscription.
     */
    function EventStream(handleActive, initial) {
        this.handleActive = handleActive;
        this.initial = initial;
        this.hasDispatched = false;
        this.subscribers = [];
        this.handleInactive = null;
        this.dispatchOnSubscription = false;
        this.value = initial;
        if (initial !== undefined) {
            // If an initial value is provided via the constructor, act as a `BehaviourSubject` such that:
            // - Any subscriber will receive the latest value immediately on subscription
            // - The latest value will be available immediately for any operator that requires it
            // (e.g. `withLatestFrom`)
            this.dispatchOnSubscription = true;
            this.hasDispatched = true;
        }
    }
    /**
     * Stores `data` as the latest value and passes it to every current subscriber.
     *
     * @param {*} data Value to deliver.
     */
    EventStream.prototype.dispatch = function (data) {
        this.value = data;
        // Iterate over a copy so listeners may subscribe/unsubscribe while being notified.
        this.subscribers.slice().forEach(function (listener) { return listener(data); });
        this.hasDispatched = true;
    };
    /**
     * Registers `subscriber` and activates the stream if it is not active yet.
     *
     * @param {function(*): void} subscriber Callback invoked with each dispatched value.
     * @returns {{unsubscribe: function(): void}} Handle that removes this subscriber.
     */
    EventStream.prototype.subscribe = function (subscriber) {
        var _this = this;
        this.subscribers.push(subscriber);
        // At least one subscriber exists at this point, so only the hook/teardown state matters.
        if (!this.handleInactive && this.handleActive) {
            this.handleInactive = this.handleActive(this);
        }
        // The replay goes through `dispatch`, so every current subscriber (not only the new one)
        // is called again with the latest value.
        if (this.dispatchOnSubscription)
            this.dispatch(this.value);
        return {
            unsubscribe: function () { return _this.unsubscribe(subscriber); },
        };
    };
    /**
     * Threads this stream through the given operators, left to right.
     *
     * @param {...function(EventStream): EventStream} operators Functions mapping a stream to a stream.
     * @returns {EventStream} Result of the last operator, or this stream when none are given.
     */
    EventStream.prototype.pipe = function () {
        var operators = [];
        for (var _i = 0; _i < arguments.length; _i++) {
            operators[_i] = arguments[_i];
        }
        // eslint-disable-next-line @typescript-eslint/no-this-alias
        var next = this;
        for (var _a = 0, operators_1 = operators; _a < operators_1.length; _a++) {
            var operator = operators_1[_a];
            next = operator(next);
        }
        return next;
    };
    /**
     * Removes `subscriber`; when it was the last one, runs and clears the stored teardown callback.
     * Unknown or already-removed subscribers are ignored.
     *
     * @param {function(*): void} subscriber Callback previously passed to `subscribe`.
     */
    EventStream.prototype.unsubscribe = function (subscriber) {
        var index = this.subscribers.indexOf(subscriber);
        // `splice(-1, 1)` would drop the LAST subscriber, so a miss must be a no-op.
        if (index === -1)
            return;
        this.subscribers.splice(index, 1);
        if (this.subscribers.length === 0 && this.handleInactive) {
            this.handleInactive(this);
            this.handleInactive = null;
        }
    };
    return EventStream;
}());
/**
 * Creates an `EventStream` with no activation hook and no initial value.
 *
 * @returns {EventStream} A fresh stream constructed with no arguments.
 */
var createEventStream = function () {
    var stream = new EventStream();
    return stream;
};
/**
 * Creates an `EventStream` seeded with `initialValue` and no activation hook.
 *
 * @param {*} initialValue Forwarded as the constructor's `initial` argument.
 * @returns {EventStream} The newly constructed stream.
 */
var createEventStreamWithValue = function (initialValue) {
    var noActivationHook = void 0;
    var seeded = new EventStream(noActivationHook, initialValue);
    return seeded;
};
/**
 * Operator factory: emits `data[key]` for each upstream value, skipping an emission when the
 * plucked value is strictly equal to the previously emitted one.
 *
 * The previous plucked VALUE is cached (not the previous payload object), so a payload object
 * that is mutated in place and dispatched again is still compared against what was last emitted.
 *
 * @param {string|number|symbol} key Property to read from each upstream value.
 * @returns {function(EventStream): EventStream} Operator suitable for `EventStream#pipe`.
 */
var pluckByKey = function (key) {
    return function (eventStream) {
        // Comparison state lives outside the activation hook, so it persists for the
        // lifetime of the derived stream rather than being reset on each activation.
        var prevValue;
        var hasValue = false;
        return new EventStream(function (context) {
            return eventStream.subscribe(function (nextData) {
                var nextValue = nextData[key];
                if (hasValue && prevValue === nextValue)
                    return;
                context.dispatch(nextValue);
                prevValue = nextValue;
                hasValue = true;
            }).unsubscribe;
        });
    };
};
/**
 * Operator factory: forwards only the upstream values for which `predicate` returns truthy.
 *
 * @param {function(*): *} predicate Called with each upstream value.
 * @returns {function(EventStream): EventStream} Operator suitable for `EventStream#pipe`.
 */
var filter = function (predicate) {
    return function (eventStream) {
        var activate = function (context) {
            var forwardIfAccepted = function (data) {
                if (predicate(data)) {
                    context.dispatch(data);
                }
            };
            var upstream = eventStream.subscribe(forwardIfAccepted);
            // The upstream unsubscribe function doubles as the derived stream's teardown callback.
            return upstream.unsubscribe;
        };
        return new EventStream(activate);
    };
};
/**
 * Operator factory: emits `mapper(value)` for every upstream value.
 *
 * @param {function(*): *} mapper Transformation applied to each upstream value.
 * @returns {function(EventStream): EventStream} Operator suitable for `EventStream#pipe`.
 */
var map = function (mapper) {
    return function (eventStream) {
        return new EventStream(function (target) {
            var upstream = eventStream.subscribe(function (value) {
                target.dispatch(mapper(value));
            });
            // Returned as the teardown callback for the derived stream.
            return upstream.unsubscribe;
        });
    };
};
/**
 * Operator factory: for each upstream value, emits `[value, ...latest]`, where `latest` holds the
 * current `value` of every given source in argument order. The emission is dropped if any
 * source has `hasDispatched` falsy.
 *
 * @param {...EventStream} sources Streams whose `hasDispatched` and `value` are read on each emission.
 * @returns {function(EventStream): EventStream} Operator suitable for `EventStream#pipe`.
 */
var withLatestFrom = function () {
    var sources = Array.prototype.slice.call(arguments);
    return function (eventStream) {
        return new EventStream(function (context) {
            var onUpstreamValue = function (dataIn) {
                var everySourceReady = sources.every(function (source) {
                    return source.hasDispatched;
                });
                if (!everySourceReady) {
                    return;
                }
                var latestValues = sources.map(function (source) {
                    return source.value;
                });
                context.dispatch([dataIn].concat(latestValues));
            };
            return eventStream.subscribe(onUpstreamValue).unsubscribe;
        });
    };
};
/**
 * Operator factory: suppresses an upstream value when it is considered equal to the last
 * value this operator emitted.
 *
 * - With a function `comparator`, a value is suppressed when `comparator(previous, next)` is truthy.
 * - With `comparator` omitted (`undefined`), values are compared with `===`.
 * - Any other `comparator` (e.g. `null`) disables suppression entirely.
 *
 * @param {function(*, *): *} [comparator] Equality check receiving (previous, next).
 * @returns {function(EventStream): EventStream} Operator suitable for `EventStream#pipe`.
 */
var distinctUntilChanged = function (comparator) {
    return function (eventStream) {
        // State is kept outside the activation hook, so it persists for the lifetime of the
        // derived stream rather than being reset on each activation.
        var lastEmitted;
        var emittedOnce = false;
        var isRepeat = function (candidate) {
            if (!emittedOnce) {
                return false;
            }
            if (typeof comparator === 'function') {
                return Boolean(comparator(lastEmitted, candidate));
            }
            if (typeof comparator === 'undefined') {
                return lastEmitted === candidate;
            }
            return false;
        };
        return new EventStream(function (context) {
            var onUpstreamValue = function (nextData) {
                if (isRepeat(nextData)) {
                    return;
                }
                // Dispatch first, then record: same ordering as the bookkeeping downstream observes.
                context.dispatch(nextData);
                lastEmitted = nextData;
                emittedOnce = true;
            };
            return eventStream.subscribe(onUpstreamValue).unsubscribe;
        });
    };
};

// Shared sources for the two benchmark cases: a custom EventStream created without an
// initial value, and an RxJS Subject (presumably `rxjs` is the global exposed by the UMD
// bundle loaded in the preparation HTML).
const eventStream$ = createEventStream();
const subject$ = new rxjs.Subject();

Test runner

Ready to run.

Testing in
Test | Ops/sec
EventStream distinctUntilChanged()
// Benchmark case: custom EventStream piped through distinctUntilChanged() with the default
// `===` comparison.
// NOTE(review): the subscription returned by foo$.subscribe() is discarded, so nothing in this
// snippet ever unsubscribes; if the runner repeats the snippet against the same eventStream$,
// listeners accumulate on it across iterations — confirm this is intended.
const foo$ = eventStream$.pipe(
    distinctUntilChanged()
);

// No-op subscriber; subscribing is what makes foo$ subscribe to eventStream$.
foo$.subscribe(() => void 0);

// Emits true, a repeated true, then false.
eventStream$.dispatch(true);
eventStream$.dispatch(true);
eventStream$.dispatch(false);
ready
RxJS distinctUntilChanged()
// Benchmark case: the same sequence driven through an RxJS Subject and rxjs.distinctUntilChanged().
// NOTE(review): the subscription returned by foo$.subscribe() is discarded, so nothing in this
// snippet ever unsubscribes; if the runner repeats the snippet against the same subject$,
// subscribers accumulate on it across iterations — confirm this is intended.
const foo$ = subject$.pipe(
    rxjs.distinctUntilChanged()
);

// No-op subscriber so the piped observable is actually subscribed.
foo$.subscribe(() => void 0);

// Emits true, a repeated true, then false (mirrors the EventStream case).
subject$.next(true);
subject$.next(true);
subject$.next(false);
ready

Revisions

You can edit these tests or add more tests to this page by appending /edit to the URL.