Pipe operator cannot infer return type as ConnectableObservable #2972

Closed
kasiditi opened this issue Oct 18, 2017 · 17 comments

Comments

@kasiditi
Contributor

kasiditi commented Oct 18, 2017

RxJS version: 5.5.0

Code to reproduce:

const obs = Observable.of(5);
const connectableObs = obs.pipe(
    publishReplay(1)
);

Expected behavior: The inferred type of connectableObs is ConnectableObservable<number>.

Actual behavior: The inferred type of connectableObs is Observable<number>.

Additional information:

TypeScript version: 2.5.3

My current workaround is to manually downcast the return value:

const obs = Observable.of(5);
const connectableObs = obs.pipe(
    publishReplay(1)
) as ConnectableObservable<number>;

Happy to send a PR, but I'm not sure how to fix this properly.

@cartant
Collaborator

cartant commented Oct 19, 2017

The problem can be solved by adding additional overload signatures to pipe, allowing for the last operator to return a ConnectableObservable.

For example:

pipe<A>(op1: UnaryFunction<Observable<T>, ConnectableObservable<A>>): ConnectableObservable<A>;
pipe<A>(op1: OperatorFunction<T, A>): Observable<A>;

Using ConnectableObservable in the signatures for a method in Observable seems a little weird, as ConnectableObservable extends Observable, but this is the only way I can see this working.

An import of ConnectableObservable would be necessary in Observable.ts, but as it's only used in the typings, the import is erased during transpilation.

The signatures for publish and publishLast would have to be changed, too, as they do not specify ConnectableObservable in their return types.

Interestingly, adding such overload signatures to pipe seems to fix another problem with the inference of the publishReplay. Without them, the return type of the following:

const o = of(0).pipe(publishReplay(1));

is inferred as Observable<any> (note the any) and with them it's inferred as ConnectableObservable<number>.

@cartant
Collaborator

cartant commented Oct 19, 2017

Actually, if the overload signatures for pipe are written like this:

pipe<A, OA extends Observable<A>>(op1: UnaryFunction<Observable<T>, OA>): OA;
pipe<A, B, OB extends Observable<B>>(op1: OperatorFunction<T, A>, op2: UnaryFunction<Observable<A>, OB>): OB;
// etc.

The problem can be solved without specific reference to ConnectableObservable.

This seems to be okay with the version of TypeScript (2.0) that RxJS uses, but I've not investigated this thoroughly.
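To make the idea above concrete, here is a minimal, self-contained sketch of overloads whose return type follows the last operator. It is only an illustration: Observable, ConnectableObservable, publish and map below are tiny stand-ins written for this example, not the RxJS implementations, and the constraint is simplified to Observable<unknown> (which needs TypeScript 3.0 or later) instead of the separate A type parameter shown above.

```typescript
// Stand-in types for illustration only; these are NOT the RxJS implementations.
type UnaryFunction<T, R> = (source: T) => R;

class Observable<T> {
  constructor(readonly values: readonly T[]) {}

  // The return type is whatever Observable subtype the last operator produces.
  pipe<OA extends Observable<unknown>>(op1: UnaryFunction<Observable<T>, OA>): OA;
  pipe<A, OB extends Observable<unknown>>(
    op1: UnaryFunction<Observable<T>, Observable<A>>,
    op2: UnaryFunction<Observable<A>, OB>
  ): OB;
  pipe(...ops: Array<UnaryFunction<any, any>>): any {
    // Apply each operator to the result of the previous one.
    let result: any = this;
    for (const op of ops) {
      result = op(result);
    }
    return result;
  }
}

// Hypothetical subtype that adds connect(), standing in for the real class.
class ConnectableObservable<T> extends Observable<T> {
  connected = false;
  connect(): void {
    this.connected = true;
  }
}

// Stand-in operator whose declared return type is the subtype.
function publish<T>(): UnaryFunction<Observable<T>, ConnectableObservable<T>> {
  return source => new ConnectableObservable<T>(source.values);
}

// Stand-in operator that returns a plain Observable.
function map<T, R>(project: (value: T) => R): UnaryFunction<Observable<T>, Observable<R>> {
  return source => new Observable<R>(source.values.map(project));
}

// No cast is needed: connect() is visible on the result of pipe.
const single = new Observable([1, 2, 3]).pipe(publish());
single.connect();

const chained = new Observable([1, 2, 3]).pipe(map((n: number) => n * 2), publish());
chained.connect();
```

With the original-style signature, pipe<A>(op1: OperatorFunction<T, A>): Observable<A>, the two connect() calls would not type-check, because the subtype is widened to Observable at the pipe boundary.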

@Dynalon

Dynalon commented Apr 25, 2018

This issue still exists in rxjs 6.0 final. Reproduce:

const connectableObservable = from([1, 2, 3]).pipe(publish());
connectableObservable.connect()

tsc will fail, saying error TS2339: Property 'connect' does not exist on type 'Observable<any>'. Casting the connectableObservable to any and calling .connect() works as expected.

Edit: Used latest typescript 2.8.3.

@Niladri24dutta

Yes, the issue still exists. Thanks for pointing me to the original issue.

plaffey pushed a commit to plaffey/rxjs that referenced this issue Jul 24, 2018
Allows specific types of Observables to be passed through (as in ReactiveX#2972),
and also prevents incorrect typings from falling through to ellipsis
function due to the use of `any`.

Closes ReactiveX#2972
@BasHamer

https://www.learnrxjs.io/operators/multicasting/publish.html did not work for me until doing this.

@samal-rasmussen
Contributor

samal-rasmussen commented Sep 12, 2018

I just ran into this issue while updating from rxjs 5.5.6 to 6.3.2. I see it has been a while since any activity here. Should we expect any fix in the future?

The code I was updating was using publish and then connect, which I updated to have the publish inside a pipe like in the example from @Dynalon. I also got an Observable type instead of a ConnectableObservable, so the connect call got a type error.

@ebrehault

ebrehault commented Oct 12, 2018

Workaround:

(yourObs as ConnectableObservable<YourType>).connect()

EDIT: that's wrong, it will just type the connectable (see @simeyla's comment below)

@fljot
Contributor

fljot commented Nov 10, 2018

@ebrehault this looks like a (type-)safer workaround
#3595

upd: actually that can be wrapped into an observable factory function to provide better type inference and a more traditional shape: Observable<T> => ConnectableObservable<T>

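// NB! Remember that with such an import you will probably want tree-shaking
import { ConnectableObservable, Observable } from 'rxjs';
import { publish } from 'rxjs/operators';

// Calling the operator directly on the source keeps its ConnectableObservable return type.
export function _publish<T>(source: Observable<T>): ConnectableObservable<T> {
  return publish<T>()(source);
}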

upd:

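// In RxJS 6 these are exported from 'rxjs' (UnaryFunction lived in 'rxjs/interfaces' in 5.5)
import { ConnectableObservable, Observable, UnaryFunction } from 'rxjs';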

export function _publish<T, R>(
  source: Observable<T>,
  publishFn: UnaryFunction<Observable<T>, ConnectableObservable<R>>
): ConnectableObservable<R> {
  return publishFn(source);
}

// use:
// NB! Remember that with such import you probably would like tree-shaking
import { publishReplay } from 'rxjs/operators';
declare const foo$: Observable<number>;
_publish(foo$, publishReplay(1)); // inferred to ConnectableObservable<number>

@benlesh @cartant what do you say, is it possible to fix with the whole pipe approach?
And another question regarding piping and types: since there's a limited set of overload signatures for pipe, does it mean that type-safe long pipelines are impossible? Can this topic be addressed in v7?

@cartant
Collaborator

cartant commented Nov 10, 2018

@fljot In v7 it won't be an issue, as the operators that will be pipeable won't return a ConnectableObservable. They will be like the variants that accept a selector. ConnectableObservable instances will only be returned by static creator/factory functions. See #3833 for more information. Or look at the source in the experimental branch.

@simeyla

simeyla commented Jan 14, 2019

I got myself super confused by this whole issue, so just wanted to add a couple findings that may help others - especially in understanding what is not the problem:

  • First - this problem only applies to publish and not publishReplay
  • publishReplay does NOT return a connected observable so no issue with the pipe there
  • Ultimately the problem is that publish() in a pipe completely loses both the connectable nature of the observable and its type. (Typescript limitation from an earlier version AFAIK).
  • When you use publish typescript strips away both the ConnectableObservable type AND the fundamental type of your source.

In other words the type of of(1,2,3).pipe(publish()) is Observable<any> instead of ConnectableObservable<number> (which is what it really is).

When publish is most useful you'll often have several 'child' streams that take the output of your primary source stream and transform it. So just by using publish in the pipe you lose the ability inside your operators to know what your incoming observable is.

@fljot's answer is most useful to solve this problem because all it does is call the publish operator, and in isolation TypeScript can properly figure out all the types. The key usefulness of this method is that you don't need to explicitly provide (or even know) the type of your source observable, so everything will ripple through nicely by itself.

So you can publish a stream, use it to create other streams and then connect to it.

The only recommendation I have for usage is to call it on a separate line like this:


import { of } from 'rxjs';
import { map, throttleTime } from 'rxjs/operators';

// I used an underscore to suggest this is a temporary variable
const _stockQuotes$ = of(10, 11, 12, 11, 10);
const publishedStockQuotes$ = _publish(_stockQuotes$);  // _publish is the helper from @fljot's comment above

// derivative streams - (publishedStockQuotes$ is a ConnectableObservable<number>, so these see number)
const throttledStockQuotes$ = publishedStockQuotes$.pipe(throttleTime(5000));
const splitStockQuotes$ = publishedStockQuotes$.pipe(map(quote => quote / 2));

publishedStockQuotes$.connect();

@ebrehault your answer is misleading. The connect() method isn't generic, so the goal is not to have a 'typed connectable' observable when you call connect(), but a 'connectable' observable when you call connect AND a 'typed' observable when you create further streams.

 // connect(): Subscription;
(yourObs as ConnectableObservable<any>).connect();

@ebrehault

@simeyla oh right! true! thanks :)

@jonrimmer

@simeyla Why do you claim publishReplay() doesn't return a ConnectableObservable?

It does: https://github.com/ReactiveX/rxjs/blob/6.5.2/src/internal/operators/publishReplay.ts#L24

@dudewad

dudewad commented Jul 10, 2019

I'm curious if this is considered a bug that's going to get fixed? I am fine type casting but would prefer not to.
I do want to point out though, that pipe() has always only returned Observable<T> in my experience -- if I'm wrong there, cool, I just wonder if justifying having an operator transform the output type of the piped observable leads to justifying doing it for all types?
What I mean by that is, I created a custom operator to convert an observable to a BehaviorSubject which gives caching. But this method requires the solution provided here -- typecasting to BehaviorSubject<T> at the end of the pipe. This is "fine", but ideally, if an operator is spitting out some subtype of Observable, that subtype, at least logically speaking, should be preserved.

@pie6k

pie6k commented Mar 24, 2020

Seems it's still an issue. Is there any update related to this?


@codehunter13

still an issue????

@samal-rasmussen
Contributor

samal-rasmussen commented Nov 24, 2020

Piping the type of ConnectableObservable is currently planned to be removed in v8: #5431

@cartant
Collaborator

cartant commented Apr 30, 2021

Closing this 'cause ConnectableObservable is deprecated in v7.

@cartant cartant closed this as completed Apr 30, 2021