Skip to content

Commit

Permalink
IntroToRx fixes (#2088)
Browse files Browse the repository at this point in the history
* Fix the code example of RxFsEvents
* Fix typos
* Update 11_SchedulingAndThreading.md
  • Loading branch information
aDisplayName authored Mar 4, 2024
1 parent 68121df commit b9942b2
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
10 changes: 5 additions & 5 deletions Rx.NET/Documentation/IntroToRx/03_CreatingObservableSequences.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public class RxFsEvents : IObservable<FileSystemEventArgs>
{
// The FileSystemWatcher might report multiple errors, but
// we're only allowed to report one to IObservable<T>.
if (onErrorAlreadyCalled)
if (!onErrorAlreadyCalled)
{
observer.OnError(e.GetException());
onErrorAlreadyCalled = true;
Expand Down Expand Up @@ -149,7 +149,7 @@ IObservable<FileSystemEventArgs> deletions =
fs.Where(e => e.ChangeType == WatcherChangeTypes.Deleted);
```

When you call `Subscribe` on the `IObservable<T>` returned by the `Where` operator, it will call `Subscribe` on its input. So in this case, if we call `Subscribe` on both `configChanges` and `deletions`, that will result in _two_ calls to `Subscribe` on `rs`. So if `rs` is an instance of our `RxFsEvents` type above, each one will construct its own `FileSystemEventWatcher`, which is inefficient.
When you call `Subscribe` on the `IObservable<T>` returned by the `Where` operator, it will call `Subscribe` on its input. So in this case, if we call `Subscribe` on both `configChanges` and `deletions`, that will result in _two_ calls to `Subscribe` on `fs`. So if `fs` is an instance of our `RxFsEvents` type above, each one will construct its own `FileSystemEventWatcher`, which is inefficient.

Rx offers a few ways to deal with this. It provides operators designed specifically to take an `IObservable<T>` that does not tolerate multiple subscribers and wrap it in an adapter that can:

Expand Down Expand Up @@ -402,7 +402,7 @@ You provide this with a delegate that will be executed each time a subscription
private IObservable<int> SomeNumbers()
{
return Observable.Create<int>(
(IObserver<string> observer) =>
(IObserver<int> observer) =>
{
observer.OnNext(1);
observer.OnNext(2);
Expand All @@ -416,7 +416,7 @@ private IObservable<int> SomeNumbers()

Your delegate must return either an `IDisposable` or an `Action` to enable unsubscription. When the subscriber disposes their subscription in order to unsubscribe, Rx will invoke `Dispose()` on the `IDisposable` you returned, or in the case where you returned an `Action`, it will invoke that.

This example is reminiscent of the `MySequenceOfNumbers` example from the start of this chapter, in that it immediately produces a few fixed values. The main difference in this case is that Rx adds some wrappers that can handle awkward situations such as re-entrancy. Rx will sometimes automatically defer work to prevent deadlocks, so it's possible that code consuming the `IObservable<string>` returned by this method will see a call to `Subscribe` return before the callback in the code above runs, in which case it would be possible for them to unsubscribe inside their `OnNext` handler.
This example is reminiscent of the `MySequenceOfNumbers` example from the start of this chapter, in that it immediately produces a few fixed values. The main difference in this case is that Rx adds some wrappers that can handle awkward situations such as re-entrancy. Rx will sometimes automatically defer work to prevent deadlocks, so it's possible that code consuming the `IObservable<int>` returned by this method will see a call to `Subscribe` return before the callback in the code above runs, in which case it would be possible for them to unsubscribe inside their `OnNext` handler.

The following sequence diagram shows how this could occur in practice. Suppose the `IObservable<int>` returned by `SomeNumbers` has been wrapped by Rx in a way that ensures that subscription occurs in some different execution context. We'd typically determine the context by using a suitable [scheduler](11_SchedulingAndThreading.md#schedulers). (The [`SubscribeOn`](11_SchedulingAndThreading.md#subscribeon-and-observeon) operator creates such a wrapper.) We might use the [`TaskPoolScheduler`](11_SchedulingAndThreading.md#taskpoolscheduler) in order to ensure that the subscription occurs on some task pool thread. So when our application code calls `Subscribe`, the wrapper `IObservable<int>` doesn't immediately subscribe to the underlying observable. Instead it queues up a work item with the scheduler to do that, and then immediately returns without waiting for that work to run. This is how our subscriber can be in possession of an `IDisposable` representing the subscription before `Observable.Create` invokes our callback. The diagram shows the subscriber then making this available to the observer.

Expand Down Expand Up @@ -1297,4 +1297,4 @@ As a quick recap:
- IEnumerable&lt;T&gt;.ToObservable
- Observable.FromAsyncPattern

Creating an observable sequence is our first step to practical application of Rx: create the sequence and then expose it for consumption. Now that we have a firm grasp on how to create an observable sequence, we can look in more detail at the operators that allow us to describe processing to be applied, to build up more complex observable sequences.
Creating an observable sequence is our first step to practical application of Rx: create the sequence and then expose it for consumption. Now that we have a firm grasp on how to create an observable sequence, we can look in more detail at the operators that allow us to describe processing to be applied, to build up more complex observable sequences.
4 changes: 2 additions & 2 deletions Rx.NET/Documentation/IntroToRx/11_SchedulingAndThreading.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Rx does not impose constraints on which threads we use. An `IObservable<T>` is f

When we first explored these rules, we focused on how they determine the ordering of calls into any single observer. There can be any number of calls to `OnNext`, but once either `OnError` or `OnCompleted` have been invoked, there must be no further calls. But now that we're looking at concurrency, a different aspect of these rules becomes more important: for any single subscription, an observable source must not make concurrent calls into that subscription's observer. So if a source calls `OnNext`, it must wait until that call returns before either calling `OnNext` again, or calling `OnError` or `OnComplete`.

The upshot for observers is that as long as your observer is involved in just one subscription, it will only ever be asked to deal with one thing at a time. If doesn't matter if the source to which it is subscribed is a long and complex processing chain involving many different operators. Even if you build that source by combining multiple inputs (e.g., using [`Merge`](09_CombiningSequences.md#merge)), the fundamental rules require that if you called `Subscribe` just once on a single `IObservable<T>`, that source is never allowed to make multiple concurrent calls into your `IObserver<T>` methods.
The upshot for observers is that as long as your observer is involved in just one subscription, it will only ever be asked to deal with one thing at a time. It doesn't matter if the source to which it is subscribed is a long and complex processing chain involving many different operators. Even if you build that source by combining multiple inputs (e.g., using [`Merge`](09_CombiningSequences.md#merge)), the fundamental rules require that if you called `Subscribe` just once on a single `IObservable<T>`, that source is never allowed to make multiple concurrent calls into your `IObserver<T>` methods.

So although each call might come in on a different thread, the calls are strictly sequential (unless a single observer is involved in multiple subscriptions).

Expand Down Expand Up @@ -1044,4 +1044,4 @@ The main difference between these overloads, and using the `IScheduler` methods

As mentioned in the earlier section, although this logically represents recursion, Rx protects us from stack overflows. The schedulers implement this style of recursion by waiting for the method to return before performing the recursive call.

This concludes our tour of scheduling and threading. Next, we will look at the related topic of timing.
This concludes our tour of scheduling and threading. Next, we will look at the related topic of timing.

0 comments on commit b9942b2

Please sign in to comment.