From 41e33f58e6f33253adb00cf0de85fbd665ab6463 Mon Sep 17 00:00:00 2001 From: Mateusz Podlasin Date: Wed, 14 Jun 2017 23:14:38 +0200 Subject: [PATCH] fix(buffer): subscribe to source and closingNotifier in proper order (#2195) In buffer operator subscribe to source observable first, so that when closingNotifier emits value, all source values emited before land in buffer Closes #1610 BREAKING CHANGE: When source and closingNotifier fire at the same time, it is expected that value emitted by source will first land in buffer and then closingNotifier will close it. Because of reversed subscription order, closingNotifier emitted first, so source was not able to put value in buffer before it was closed. Now source is subscribed before closingNotifier, so if they fire at the same time, source value is put into buffer and then closingNotifer closes it. --- spec/operators/buffer-spec.ts | 13 +++++++++++++ src/operator/buffer.ts | 13 ++++++++++--- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/spec/operators/buffer-spec.ts b/spec/operators/buffer-spec.ts index e8c75e1131..376748b3e6 100644 --- a/spec/operators/buffer-spec.ts +++ b/spec/operators/buffer-spec.ts @@ -225,6 +225,19 @@ describe('Observable.prototype.buffer', () => { expectSubscriptions(b.subscriptions).toBe(bsubs); }); + it('should work with filtered source as closingNotifier', () => { + const values = {0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8}; + + const source = hot('-0-1-2-3-4-5-6-7-8-|', values); + const expected = '-a---b---c---d---e-|'; + + const expectedValues = {a: [0], b: [1, 2], c: [3, 4], d: [5, 6], e: [7, 8]}; + const filteredSource = source.filter(x => x % 2 === 0); + + const result = source.buffer(filteredSource); + expectObservable(result).toBe(expected, expectedValues); + }); + it('should emit last buffer if source completes', () => { const a = hot('-a-b-c-d-e-f-g-h-i-|'); const b = hot('-----B-----B--------'); diff --git a/src/operator/buffer.ts b/src/operator/buffer.ts index 73807fa5a5..e690836c53 100644 --- a/src/operator/buffer.ts +++ b/src/operator/buffer.ts @@ -1,5 +1,6 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; +import { TeardownLogic } from '../Subscription'; import { Observable } from '../Observable'; import { OuterSubscriber } from '../OuterSubscriber'; @@ -47,8 +48,11 @@ class BufferOperator implements Operator { constructor(private closingNotifier: Observable) { } - call(subscriber: Subscriber, source: any): any { - return source.subscribe(new BufferSubscriber(subscriber, this.closingNotifier)); + call(subscriber: Subscriber, source: any): TeardownLogic { + const bufferSubscriber = new BufferSubscriber(subscriber); + const subscription = source.subscribe(bufferSubscriber); + bufferSubscriber.subscribeToClosingNotifier(this.closingNotifier); + return subscription; } } @@ -60,8 +64,11 @@ class BufferOperator implements Operator { class BufferSubscriber extends OuterSubscriber { private buffer: T[] = []; - constructor(destination: Subscriber, closingNotifier: Observable) { + constructor(destination: Subscriber) { super(destination); + } + + subscribeToClosingNotifier(closingNotifier: Observable) { this.add(subscribeToResult(this, closingNotifier)); }