Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(Observer): observer interface now allows partial Observers #1282

Merged
merged 1 commit into from
Feb 2, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions src/Notification.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Observer} from './Observer';
import {PartialObserver} from './Observer';
import {Observable} from './Observable';

export class Notification<T> {
Expand All @@ -8,32 +8,32 @@ export class Notification<T> {
this.hasValue = kind === 'N';
}

observe(observer: Observer<T>): any {
observe(observer: PartialObserver<T>): any {
switch (this.kind) {
case 'N':
return observer.next(this.value);
return observer.next && observer.next(this.value);
case 'E':
return observer.error(this.exception);
return observer.error && observer.error(this.exception);
case 'C':
return observer.complete();
return observer.complete && observer.complete();
}
}

do(next: (value: T) => void, error?: (err: any) => void, complete?: () => void): any {
const kind = this.kind;
switch (kind) {
case 'N':
return next(this.value);
return next && next(this.value);
case 'E':
return error(this.exception);
return error && error(this.exception);
case 'C':
return complete();
return complete && complete();
}
}

accept(nextOrObserver: Observer<T> | ((value: T) => void), error?: (err: any) => void, complete?: () => void) {
if (nextOrObserver && typeof (<Observer<T>>nextOrObserver).next === 'function') {
return this.observe(<Observer<T>>nextOrObserver);
accept(nextOrObserver: PartialObserver<T> | ((value: T) => void), error?: (err: any) => void, complete?: () => void) {
if (nextOrObserver && typeof (<PartialObserver<T>>nextOrObserver).next === 'function') {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, I think this guard will accepts IterableIterator<T> (from the code which have only a runtime dynamically type). In today's RxJS context, can a generator be a Observer<T>?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@saneyuki ... not really... generators have next, throw and return methods. Very, very early versions of this library where implemented in that manner, but it was abandoned when the es-observable spec moved away from that.

return this.observe(<PartialObserver<T>>nextOrObserver);
} else {
return this.do(<(value: T) => void>nextOrObserver, error, complete);
}
Expand Down
6 changes: 3 additions & 3 deletions src/Observable.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Observer} from './Observer';
import {PartialObserver} from './Observer';
import {Operator} from './Operator';
import {Scheduler} from './Scheduler';
import {Subscriber} from './Subscriber';
Expand Down Expand Up @@ -98,7 +98,7 @@ export class Observable<T> implements CoreOperators<T> {

/**
* @method subscribe
* @param {Observer|Function} observerOrNext (optional) either an observer defining all functions to be called,
* @param {PartialObserver|Function} observerOrNext (optional) either an observer defining all functions to be called,
* or the first of three possible handlers, which is the handler for each value emitted from the observable.
* @param {Function} error (optional) a handler for a terminal event resulting from an error. If no error handler is provided,
* the error will be thrown as unhandled
Expand All @@ -107,7 +107,7 @@ export class Observable<T> implements CoreOperators<T> {
* @description registers handlers for handling emitted values, error and completions from the observable, and
* executes the observable's subscriber function, which will take action to set up the underlying data stream
*/
subscribe(observerOrNext?: Observer<T> | ((value: T) => void),
subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void): Subscription {

Expand Down
31 changes: 27 additions & 4 deletions src/Observer.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,31 @@
export interface NextObserver<T> {
isUnsubscribed?: boolean;
next: (value: T) => void;
error?: (err: any) => void;
complete?: () => void;
}

export interface ErrorObserver<T> {
isUnsubscribed?: boolean;
next?: (value: T) => void;
error: (err: any) => void;
complete?: () => void;
}

export interface CompletionObserver<T> {
isUnsubscribed?: boolean;
next?: (value: T) => void;
error?: (err: any) => void;
complete: () => void;
}

export type PartialObserver<T> = NextObserver<T> | ErrorObserver<T> | CompletionObserver<T>;

export interface Observer<T> {
isUnsubscribed: boolean;
next(value: T): void;
error(error: any): void;
complete(): void;
isUnsubscribed?: boolean;
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}

export const empty: Observer<any> = {
Expand Down
18 changes: 9 additions & 9 deletions src/Subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {isFunction} from './util/isFunction';
import {Observer} from './Observer';
import {Observer, PartialObserver} from './Observer';
import {Subscription} from './Subscription';
import {rxSubscriber} from './symbol/rxSubscriber';
import {empty as emptyObserver} from './Observer';
Expand All @@ -19,9 +19,9 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
public syncErrorThrowable: boolean = false;

protected isStopped: boolean = false;
protected destination: Observer<any>;
protected destination: PartialObserver<any>;

constructor(destinationOrNext?: Observer<any> | ((value: T) => void),
constructor(destinationOrNext?: PartialObserver<any> | ((value: T) => void),
error?: (e?: any) => void,
complete?: () => void) {
super();
Expand All @@ -37,10 +37,10 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
}
if (typeof destinationOrNext === 'object') {
if (destinationOrNext instanceof Subscriber) {
this.destination = (<Observer<any>> destinationOrNext);
this.destination = (<Subscriber<any>> destinationOrNext);
} else {
this.syncErrorThrowable = true;
this.destination = new SafeSubscriber<T>(this, <Observer<any>> destinationOrNext);
this.destination = new SafeSubscriber<T>(this, <PartialObserver<any>> destinationOrNext);
}
break;
}
Expand Down Expand Up @@ -103,7 +103,7 @@ class SafeSubscriber<T> extends Subscriber<T> {
private _context: any;

constructor(private _parent: Subscriber<T>,
observerOrNext?: Observer<T> | ((value: T) => void),
observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (e?: any) => void,
complete?: () => void) {
super();
Expand All @@ -115,9 +115,9 @@ class SafeSubscriber<T> extends Subscriber<T> {
next = (<((value: T) => void)> observerOrNext);
} else if (observerOrNext) {
context = observerOrNext;
next = (<Observer<T>> observerOrNext).next;
error = (<Observer<T>> observerOrNext).error;
complete = (<Observer<T>> observerOrNext).complete;
next = (<PartialObserver<T>> observerOrNext).next;
error = (<PartialObserver<T>> observerOrNext).error;
complete = (<PartialObserver<T>> observerOrNext).complete;
}

this._context = context;
Expand Down
4 changes: 2 additions & 2 deletions src/operator/mergeMapTo.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {Observable} from '../Observable';
import {Operator} from '../Operator';
import {Observer} from '../Observer';
import {PartialObserver} from '../Observer';
import {Subscriber} from '../Subscriber';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
Expand Down Expand Up @@ -54,7 +54,7 @@ export class MergeMapToSubscriber<T, R, R2> extends OuterSubscriber<T, R> {
}

private _innerSub(ish: any,
destination: Observer<R>,
destination: PartialObserver<R>,
resultSelector: (outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) => R2,
value: T,
index: number): void {
Expand Down
4 changes: 2 additions & 2 deletions src/operator/observeOn.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {Observable} from '../Observable';
import {Scheduler} from '../Scheduler';
import {Operator} from '../Operator';
import {Observer} from '../Observer';
import {PartialObserver} from '../Observer';
import {Subscriber} from '../Subscriber';
import {Notification} from '../Notification';

Expand Down Expand Up @@ -50,6 +50,6 @@ export class ObserveOnSubscriber<T> extends Subscriber<T> {

class ObserveOnMessage {
constructor(public notification: Notification<any>,
public destination: Observer<any>) {
public destination: PartialObserver<any>) {
}
}
4 changes: 2 additions & 2 deletions src/operator/zip.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {Observable} from '../Observable';
import {ArrayObservable} from '../observable/ArrayObservable';
import {isArray} from '../util/isArray';
import {Operator} from '../Operator';
import {Observer} from '../Observer';
import {PartialObserver} from '../Observer';
import {Subscriber} from '../Subscriber';
import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
Expand Down Expand Up @@ -199,7 +199,7 @@ class ZipBufferIterator<T, R> extends OuterSubscriber<T, R> implements LookAhead
buffer: T[] = [];
isComplete = false;

constructor(destination: Observer<T>,
constructor(destination: PartialObserver<T>,
private parent: ZipSubscriber<T, R>,
private observable: Observable<T>,
private index: number) {
Expand Down
16 changes: 8 additions & 8 deletions src/util/toSubscriber.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import {Observer} from '../Observer';
import {PartialObserver} from '../Observer';
import {Subscriber} from '../Subscriber';
import {rxSubscriber} from '../symbol/rxSubscriber';

export function toSubscriber<T>(
next?: Observer<T> | ((value: T) => void),
nextOrObserver?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void): Subscriber<T> {

if (next && typeof next === 'object') {
if (next instanceof Subscriber) {
return (<Subscriber<T>> next);
} else if (typeof next[rxSubscriber] === 'function') {
return next[rxSubscriber]();
if (nextOrObserver && typeof nextOrObserver === 'object') {
if (nextOrObserver instanceof Subscriber) {
return (<Subscriber<T>> nextOrObserver);
} else if (typeof nextOrObserver[rxSubscriber] === 'function') {
return nextOrObserver[rxSubscriber]();
}
}

return new Subscriber(next, error, complete);
return new Subscriber(nextOrObserver, error, complete);
}