diff --git a/spec/operators/throttle-spec.ts b/spec/operators/throttle-spec.ts index bdbefa4bcf..bf2226adbe 100644 --- a/spec/operators/throttle-spec.ts +++ b/spec/operators/throttle-spec.ts @@ -337,4 +337,40 @@ describe('Observable.prototype.throttle', () => { } ); }); + + describe('throttle(fn, { leading: true, trailing: true })', () => { + asDiagram('throttle(fn, { leading: true, trailing: true })')('should immediately emit the first value in each time window', () => { + const e1 = hot('-a-xy-----b--x--cxxx--|'); + const e1subs = '^ !'; + const e2 = cold( '----| '); + const e2subs = [' ^ ! ', + ' ^ ! ', + ' ^ ! ']; + const expected = '-a---y----b---x-c---x-|'; + + const result = e1.throttle(() => e2, { leading: true, trailing: true }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + }); + + describe('throttle(fn, { leading: false, trailing: true })', () => { + asDiagram('throttle(fn, { leading: false, trailing: true })')('should immediately emit the first value in each time window', () => { + const e1 = hot('-a-xy-----b--x--cxxx--|'); + const e1subs = '^ !'; + const e2 = cold( '----| '); + const e2subs = [' ^ ! ', + ' ^ ! ', + ' ^ ! ']; + const expected = '-----y--------x-----x-|'; + + const result = e1.throttle(() => e2, { leading: false, trailing: true }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + }); }); diff --git a/spec/operators/throttleTime-spec.ts b/spec/operators/throttleTime-spec.ts index 3c40c6ffc1..df33d664d5 100644 --- a/spec/operators/throttleTime-spec.ts +++ b/spec/operators/throttleTime-spec.ts @@ -7,6 +7,7 @@ declare const hot: typeof marbleTestingSignature.hot; declare const cold: typeof marbleTestingSignature.cold; declare const expectObservable: typeof marbleTestingSignature.expectObservable; declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions; +declare const time: typeof marbleTestingSignature.time; declare const rxTestScheduler: Rx.TestScheduler; const Observable = Rx.Observable; @@ -141,4 +142,32 @@ describe('Observable.prototype.throttleTime', () => { expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); + + describe('throttleTime(fn, { leading: true, trailing: true })', () => { + asDiagram('throttleTime(fn, { leading: true, trailing: true })')('should immediately emit the first value in each time window', () => { + const e1 = hot('-a-xy-----b--x--cxxx--|'); + const e1subs = '^ !'; + const t = time( '----| '); + const expected = '-a---y----b---x-c---x-|'; + + const result = e1.throttleTime(t, rxTestScheduler, { leading: true, trailing: true }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + }); + + describe('throttleTime(fn, { leading: false, trailing: true })', () => { + asDiagram('throttleTime(fn, { leading: false, trailing: true })')('should immediately emit the first value in each time window', () => { + const e1 = hot('-a-xy-----b--x--cxxx--|'); + const e1subs = '^ !'; + const t = time( '----| '); + const expected = '-----y--------x-----x-|'; + + const result = e1.throttleTime(t, rxTestScheduler, { leading: false, trailing: true }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + }); }); \ No newline at end of file diff --git a/src/operator/throttle.ts b/src/operator/throttle.ts index a3e06488a5..e840629182 100644 --- a/src/operator/throttle.ts +++ b/src/operator/throttle.ts @@ -7,6 +7,16 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; +export interface ThrottleConfig { + leading?: boolean; + trailing?: boolean; +} + +export const defaultThrottleConfig: ThrottleConfig = { + leading: true, + trailing: false +}; + /** * Emits a value from the source Observable, then ignores subsequent source * values for a duration determined by another Observable, then repeats this @@ -40,61 +50,85 @@ import { subscribeToResult } from '../util/subscribeToResult'; * @param {function(value: T): SubscribableOrPromise} durationSelector A function * that receives a value from the source Observable, for computing the silencing * duration for each source value, returned as an Observable or a Promise. + * @param {Object} config a configuration object to define `leading` and `trailing` behavior. Defaults + * to `{ leading: true, trailing: false }`. * @return {Observable} An Observable that performs the throttle operation to * limit the rate of emissions from the source. * @method throttle * @owner Observable */ -export function throttle(this: Observable, durationSelector: (value: T) => SubscribableOrPromise): Observable { - return this.lift(new ThrottleOperator(durationSelector)); +export function throttle(this: Observable, + durationSelector: (value: T) => SubscribableOrPromise, + config: ThrottleConfig = defaultThrottleConfig): Observable { + return this.lift(new ThrottleOperator(durationSelector, config.leading, config.trailing)); } class ThrottleOperator implements Operator { - constructor(private durationSelector: (value: T) => SubscribableOrPromise) { + constructor(private durationSelector: (value: T) => SubscribableOrPromise, + private leading: boolean, + private trailing: boolean) { } call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new ThrottleSubscriber(subscriber, this.durationSelector)); + return source.subscribe( + new ThrottleSubscriber(subscriber, this.durationSelector, this.leading, this.trailing) + ); } } /** - * We need this JSDoc comment for affecting ESDoc. + * We need this JSDoc comment for affecting ESDoc * @ignore * @extends {Ignored} */ class ThrottleSubscriber extends OuterSubscriber { private throttled: Subscription; + private _trailingValue: T; + private _hasTrailingValue = false; constructor(protected destination: Subscriber, - private durationSelector: (value: T) => SubscribableOrPromise) { + private durationSelector: (value: T) => SubscribableOrPromise, + private _leading: boolean, + private _trailing: boolean) { super(destination); } protected _next(value: T): void { - if (!this.throttled) { - this.tryDurationSelector(value); + if (this.throttled) { + if (this._trailing) { + this._hasTrailingValue = true; + this._trailingValue = value; + } + } else { + const duration = this.tryDurationSelector(value); + if (duration) { + this.add(this.throttled = subscribeToResult(this, duration)); + } + if (this._leading) { + this.destination.next(value); + if (this._trailing) { + this._hasTrailingValue = true; + this._trailingValue = value; + } + } } } - private tryDurationSelector(value: T): void { - let duration: SubscribableOrPromise = null; + private tryDurationSelector(value: T): SubscribableOrPromise { try { - duration = this.durationSelector(value); + return this.durationSelector(value); } catch (err) { this.destination.error(err); - return; + return null; } - this.emitAndThrottle(value, duration); - } - - private emitAndThrottle(value: T, duration: SubscribableOrPromise) { - this.add(this.throttled = subscribeToResult(this, duration)); - this.destination.next(value); } protected _unsubscribe() { - const throttled = this.throttled; + const { throttled, _trailingValue, _hasTrailingValue, _trailing } = this; + + this._trailingValue = null; + this._hasTrailingValue = false; + if (throttled) { this.remove(throttled); this.throttled = null; @@ -102,13 +136,24 @@ class ThrottleSubscriber extends OuterSubscriber { } } + private _sendTrailing() { + const { destination, throttled, _trailing, _trailingValue, _hasTrailingValue } = this; + if (throttled && _trailing && _hasTrailingValue) { + destination.next(_trailingValue); + this._trailingValue = null; + this._hasTrailingValue = false; + } + } + notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number, innerSub: InnerSubscriber): void { + this._sendTrailing(); this._unsubscribe(); } notifyComplete(): void { + this._sendTrailing(); this._unsubscribe(); } } diff --git a/src/operator/throttleTime.ts b/src/operator/throttleTime.ts index 4b502fddd3..a6ef08cb38 100644 --- a/src/operator/throttleTime.ts +++ b/src/operator/throttleTime.ts @@ -4,6 +4,7 @@ import { IScheduler } from '../Scheduler'; import { Subscription, TeardownLogic } from '../Subscription'; import { async } from '../scheduler/async'; import { Observable } from '../Observable'; +import { ThrottleConfig, defaultThrottleConfig } from './throttle'; /** * Emits a value from the source Observable, then ignores subsequent source @@ -44,17 +45,24 @@ import { Observable } from '../Observable'; * @method throttleTime * @owner Observable */ -export function throttleTime(this: Observable, duration: number, scheduler: IScheduler = async): Observable { - return this.lift(new ThrottleTimeOperator(duration, scheduler)); +export function throttleTime(this: Observable, + duration: number, + scheduler: IScheduler = async, + config: ThrottleConfig = defaultThrottleConfig): Observable { + return this.lift(new ThrottleTimeOperator(duration, scheduler, config.leading, config.trailing)); } class ThrottleTimeOperator implements Operator { constructor(private duration: number, - private scheduler: IScheduler) { + private scheduler: IScheduler, + private leading: boolean, + private trailing: boolean) { } call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new ThrottleTimeSubscriber(subscriber, this.duration, this.scheduler)); + return source.subscribe( + new ThrottleTimeSubscriber(subscriber, this.duration, this.scheduler, this.leading, this.trailing) + ); } } @@ -65,23 +73,39 @@ class ThrottleTimeOperator implements Operator { */ class ThrottleTimeSubscriber extends Subscriber { private throttled: Subscription; + private _hasTrailingValue: boolean = false; + private _trailingValue: T = null; constructor(destination: Subscriber, private duration: number, - private scheduler: IScheduler) { + private scheduler: IScheduler, + private leading: boolean, + private trailing: boolean) { super(destination); } protected _next(value: T) { - if (!this.throttled) { + if (this.throttled) { + if (this.trailing) { + this._trailingValue = value; + this._hasTrailingValue = true; + } + } else { this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, { subscriber: this })); - this.destination.next(value); + if (this.leading) { + this.destination.next(value); + } } } clearThrottle() { const throttled = this.throttled; if (throttled) { + if (this.trailing && this._hasTrailingValue) { + this.destination.next(this._trailingValue); + this._trailingValue = null; + this._hasTrailingValue = false; + } throttled.unsubscribe(); this.remove(throttled); this.throttled = null;