From a15eae2741f47b55e5385d41c18115728dc63bec Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 18 Aug 2020 11:38:38 -0500 Subject: [PATCH] fix(buffer): Ensure notifier is subscribed after source - Resolves an issue where a multicast observable could not adequately be used to notify a buffer on itself - Corrects a regression that was introduced by a bad merge back in 6.0. This was originally corrected in PR #2195. fixes #1754 --- spec/operators/buffer-spec.ts | 22 +++++++++++++++++++++- src/internal/operators/buffer.ts | 8 +++++--- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/spec/operators/buffer-spec.ts b/spec/operators/buffer-spec.ts index 3cda51c918..1f933d8360 100644 --- a/spec/operators/buffer-spec.ts +++ b/spec/operators/buffer-spec.ts @@ -1,7 +1,8 @@ import { buffer, mergeMap, take } from 'rxjs/operators'; -import { EMPTY, NEVER, throwError, of } from 'rxjs'; +import { EMPTY, NEVER, throwError, of, Subject } from 'rxjs'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; +import { expect } from 'chai'; /** @test {buffer} */ describe('Observable.prototype.buffer', () => { @@ -266,4 +267,23 @@ describe('Observable.prototype.buffer', () => { expectSubscriptions(b.subscriptions).toBe(bsubs); }); }); + + it('should emit properly with an observable using itself as a notifier', () => { + const results: any[] = []; + const subject = new Subject(); + + const source = subject.pipe( + buffer(subject) + ).subscribe({ + next: value => results.push(value), + complete: () => results.push('complete') + }); + + subject.next(1); + expect(results).to.deep.equal([[1]]); + subject.next(2); + expect(results).to.deep.equal([[1], [2]]); + subject.complete(); + expect(results).to.deep.equal([[1], [2], 'complete']); + }); }); diff --git a/src/internal/operators/buffer.ts b/src/internal/operators/buffer.ts index 2e068d6cab..19fe5dfa7e 100644 --- a/src/internal/operators/buffer.ts +++ b/src/internal/operators/buffer.ts @@ -56,7 +56,10 @@ class BufferOperator implements Operator { } call(subscriber: Subscriber, source: any): any { - return source.subscribe(new BufferSubscriber(subscriber, this.closingNotifier)); + const bufferSubscriber = new BufferSubscriber(subscriber); + subscriber.add(source.subscribe(bufferSubscriber)); + subscriber.add(innerSubscribe(this.closingNotifier, new SimpleInnerSubscriber(bufferSubscriber))); + return subscriber; } } @@ -68,9 +71,8 @@ class BufferOperator implements Operator { class BufferSubscriber extends SimpleOuterSubscriber { private buffer: T[] = []; - constructor(destination: Subscriber, closingNotifier: Observable) { + constructor(destination: Subscriber) { super(destination); - this.add(innerSubscribe(closingNotifier, new SimpleInnerSubscriber(this))); } protected _next(value: T) {