From b9942b2589b8b08bce360337a2bb5c9e32f59057 Mon Sep 17 00:00:00 2001 From: aDisplayName Date: Mon, 4 Mar 2024 09:55:44 -0600 Subject: [PATCH] IntroToRx fixes (#2088) * Fix the code example of RxFsEvents * Fix typos * Update 11_SchedulingAndThreading.md --- .../IntroToRx/03_CreatingObservableSequences.md | 10 +++++----- .../IntroToRx/11_SchedulingAndThreading.md | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Rx.NET/Documentation/IntroToRx/03_CreatingObservableSequences.md b/Rx.NET/Documentation/IntroToRx/03_CreatingObservableSequences.md index e719dabce..69fc569ae 100644 --- a/Rx.NET/Documentation/IntroToRx/03_CreatingObservableSequences.md +++ b/Rx.NET/Documentation/IntroToRx/03_CreatingObservableSequences.md @@ -118,7 +118,7 @@ public class RxFsEvents : IObservable { // The FileSystemWatcher might report multiple errors, but // we're only allowed to report one to IObservable. - if (onErrorAlreadyCalled) + if (!onErrorAlreadyCalled) { observer.OnError(e.GetException()); onErrorAlreadyCalled = true; @@ -149,7 +149,7 @@ IObservable deletions = fs.Where(e => e.ChangeType == WatcherChangeTypes.Deleted); ``` -When you call `Subscribe` on the `IObservable` returned by the `Where` operator, it will call `Subscribe` on its input. So in this case, if we call `Subscribe` on both `configChanges` and `deletions`, that will result in _two_ calls to `Subscribe` on `rs`. So if `rs` is an instance of our `RxFsEvents` type above, each one will construct its own `FileSystemEventWatcher`, which is inefficient. +When you call `Subscribe` on the `IObservable` returned by the `Where` operator, it will call `Subscribe` on its input. So in this case, if we call `Subscribe` on both `configChanges` and `deletions`, that will result in _two_ calls to `Subscribe` on `fs`. So if `fs` is an instance of our `RxFsEvents` type above, each one will construct its own `FileSystemEventWatcher`, which is inefficient. Rx offers a few ways to deal with this. It provides operators designed specifically to take an `IObservable` that does not tolerate multiple subscribers and wrap it in an adapter that can: @@ -402,7 +402,7 @@ You provide this with a delegate that will be executed each time a subscription private IObservable SomeNumbers() { return Observable.Create( - (IObserver observer) => + (IObserver observer) => { observer.OnNext(1); observer.OnNext(2); @@ -416,7 +416,7 @@ private IObservable SomeNumbers() Your delegate must return either an `IDisposable` or an `Action` to enable unsubscription. When the subscriber disposes their subscription in order to unsubscribe, Rx will invoke `Dispose()` on the `IDisposable` you returned, or in the case where you returned an `Action`, it will invoke that. -This example is reminiscent of the `MySequenceOfNumbers` example from the start of this chapter, in that it immediately produces a few fixed values. The main difference in this case is that Rx adds some wrappers that can handle awkward situations such as re-entrancy. Rx will sometimes automatically defer work to prevent deadlocks, so it's possible that code consuming the `IObservable` returned by this method will see a call to `Subscribe` return before the callback in the code above runs, in which case it would be possible for them to unsubscribe inside their `OnNext` handler. +This example is reminiscent of the `MySequenceOfNumbers` example from the start of this chapter, in that it immediately produces a few fixed values. The main difference in this case is that Rx adds some wrappers that can handle awkward situations such as re-entrancy. Rx will sometimes automatically defer work to prevent deadlocks, so it's possible that code consuming the `IObservable` returned by this method will see a call to `Subscribe` return before the callback in the code above runs, in which case it would be possible for them to unsubscribe inside their `OnNext` handler. The following sequence diagram shows how this could occur in practice. Suppose the `IObservable` returned by `SomeNumbers` has been wrapped by Rx in a way that ensures that subscription occurs in some different execution context. We'd typically determine the context by using a suitable [scheduler](11_SchedulingAndThreading.md#schedulers). (The [`SubscribeOn`](11_SchedulingAndThreading.md#subscribeon-and-observeon) operator creates such a wrapper.) We might use the [`TaskPoolScheduler`](11_SchedulingAndThreading.md#taskpoolscheduler) in order to ensure that the subscription occurs on some task pool thread. So when our application code calls `Subscribe`, the wrapper `IObservable` doesn't immediately subscribe to the underlying observable. Instead it queues up a work item with the scheduler to do that, and then immediately returns without waiting for that work to run. This is how our subscriber can be in possession of an `IDisposable` representing the subscription before `Observable.Create` invokes our callback. The diagram shows the subscriber then making this available to the observer. @@ -1297,4 +1297,4 @@ As a quick recap: - IEnumerable<T>.ToObservable - Observable.FromAsyncPattern -Creating an observable sequence is our first step to practical application of Rx: create the sequence and then expose it for consumption. Now that we have a firm grasp on how to create an observable sequence, we can look in more detail at the operators that allow us to describe processing to be applied, to build up more complex observable sequences. \ No newline at end of file +Creating an observable sequence is our first step to practical application of Rx: create the sequence and then expose it for consumption. Now that we have a firm grasp on how to create an observable sequence, we can look in more detail at the operators that allow us to describe processing to be applied, to build up more complex observable sequences. diff --git a/Rx.NET/Documentation/IntroToRx/11_SchedulingAndThreading.md b/Rx.NET/Documentation/IntroToRx/11_SchedulingAndThreading.md index 2bd7168f5..963d67a64 100644 --- a/Rx.NET/Documentation/IntroToRx/11_SchedulingAndThreading.md +++ b/Rx.NET/Documentation/IntroToRx/11_SchedulingAndThreading.md @@ -10,7 +10,7 @@ Rx does not impose constraints on which threads we use. An `IObservable` is f When we first explored these rules, we focused on how they determine the ordering of calls into any single observer. There can be any number of calls to `OnNext`, but once either `OnError` or `OnCompleted` have been invoked, there must be no further calls. But now that we're looking at concurrency, a different aspect of these rules becomes more important: for any single subscription, an observable source must not make concurrent calls into that subscription's observer. So if a source calls `OnNext`, it must wait until that call returns before either calling `OnNext` again, or calling `OnError` or `OnComplete`. -The upshot for observers is that as long as your observer is involved in just one subscription, it will only ever be asked to deal with one thing at a time. If doesn't matter if the source to which it is subscribed is a long and complex processing chain involving many different operators. Even if you build that source by combining multiple inputs (e.g., using [`Merge`](09_CombiningSequences.md#merge)), the fundamental rules require that if you called `Subscribe` just once on a single `IObservable`, that source is never allowed to make multiple concurrent calls into your `IObserver` methods. +The upshot for observers is that as long as your observer is involved in just one subscription, it will only ever be asked to deal with one thing at a time. It doesn't matter if the source to which it is subscribed is a long and complex processing chain involving many different operators. Even if you build that source by combining multiple inputs (e.g., using [`Merge`](09_CombiningSequences.md#merge)), the fundamental rules require that if you called `Subscribe` just once on a single `IObservable`, that source is never allowed to make multiple concurrent calls into your `IObserver` methods. So although each call might come in on a different thread, the calls are strictly sequential (unless a single observer is involved in multiple subscriptions). @@ -1044,4 +1044,4 @@ The main difference between these overloads, and using the `IScheduler` methods As mentioned in the earlier section, although this logically represents recursion, Rx protects us from stack overflows. The schedulers implement this style of recursion by waiting for the method to return before performing the recursive call. -This concludes our tour of scheduling and threading. Next, we will look at the related topic of timing. \ No newline at end of file +This concludes our tour of scheduling and threading. Next, we will look at the related topic of timing.