From a7f5bca6ab4641d11da47ec0de47a0add367517d Mon Sep 17 00:00:00 2001 From: Hoarfroster Date: Thu, 11 Mar 2021 21:45:37 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=8E=20RxJava=202=20=E8=BD=AC=E5=90=91?= =?UTF-8?q?=E4=BD=BF=E7=94=A8=20Kotlin=20=E6=B5=81=EF=BC=9A=E5=A4=9A?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=20(#8008)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 粗译 * Apply suggestions from code review * Apply suggestions from code review * Update article/2021/from-rxjava-2-to-kotlin-flow-threading.md Co-authored-by: zenblo <71975958+zenblo@users.noreply.github.com> Co-authored-by: samyu2000 Co-authored-by: zenblo <71975958+zenblo@users.noreply.github.com> Co-authored-by: samyu2000 --- .../from-rxjava-2-to-kotlin-flow-threading.md | 302 +++++++++++------- 1 file changed, 191 insertions(+), 111 deletions(-) diff --git a/article/2021/from-rxjava-2-to-kotlin-flow-threading.md b/article/2021/from-rxjava-2-to-kotlin-flow-threading.md index ecbc434ef74..9228a66cfce 100644 --- a/article/2021/from-rxjava-2-to-kotlin-flow-threading.md +++ b/article/2021/from-rxjava-2-to-kotlin-flow-threading.md @@ -2,177 +2,256 @@ > * 原文作者:[Vasya Drobushkov](https://medium.com/@krossovochkin) > * 译文出自:[掘金翻译计划](https://github.com/xitu/gold-miner) > * 本文永久链接:[https://github.com/xitu/gold-miner/blob/master/article/2021/from-rxjava-2-to-kotlin-flow-threading.md](https://github.com/xitu/gold-miner/blob/master/article/2021/from-rxjava-2-to-kotlin-flow-threading.md) -> * 译者: -> * 校对者: +> * 译者:[霜羽 Hoarfroster](https://github.com/PassionPenguin) +> * 校对者:[samyu2000](https://github.com/samyu2000)、[zenblo](https://github.com/zenblo) -# From RxJava 2 to Kotlin Flow: Threading +# 从 RxJava 2 转向使用 Kotlin 流:多线程 -![[Source](https://unsplash.com/photos/vyyVbUOYNPc)](https://cdn-images-1.medium.com/max/2000/0*piI5NnrRuivMUOKD) +![[来源](https://unsplash.com/photos/vyyVbUOYNPc)](https://cdn-images-1.medium.com/max/2000/0*piI5NnrRuivMUOKD) -## Introduction +## 简介 -For a long time RxJava was undisputed leader for reactive solutions on Android, though with Kotlin expansion and introducing cold streams (Flow) seems situation might be rapidly changing in the coming years. -Though reactive programming is not related to threading in the first place, concurrency and parallelism are very important anyway. -In this article we’ll try to make short recap on threading in RxJava 2 (with some basic caveats on its usage) and then take a look at how threading works in Kotlin Flow, so if anyone would like to migrate their code without affecting functionality it would be nice and smooth. +长期以来,RxJava 一直是 Android 响应式解决方案的无可争议的引领者。虽然 Kotlin 不断扩展,并且引入了冷流(Flow)的概念,情况可能在未来几年迅速改变,但这并不意味着 RxJava 会退出历史舞台。虽说响应式编程最初与线程无关,但合适的线程的并发和并行对程序而言仍然非常重要。在本文中,我们将简要回顾 RxJava 2 中的线程(对其用法做一些基本说明),然后讨论一下 Kotlin Flow 中线程的工作原理,让我们能在不影响功能的情况下能够顺利地完成代码迁移。 -Let’s start from the short recap on RxJava 2. +让我们从对 RxJava 2 的简短回顾开始。 ## RxJava 2 -Both RxJava 2 `Observable` and Kotlin Flow are cold streams. That means that code inside is not executed until there is subscription. +RxJava 2 的 `Observable` 和 Kotlin 的 Flow 都是冷流,意味着其中的代码在订阅之前都不会被执行。 -> In RxJava there are also other types such as `Flowable`, `Single`, etc. In the article we’ll talk about `Observable` only because everything else applies to other types as well. +> 除了 `Observable` 以外,在 RxJava 中其实还有其它类型,例如 `Flowable` 或是 `Single` 等。但在本文中,我们只会讲讲 `Observable`,因为它们的原理是相似的。 -Basic usage looks like the following: +RxJava `Observable` 的最基础使用如下所示: -![](https://cdn-images-1.medium.com/max/2000/1*Y8LjGynLPUwJ6ZNzVAG2HQ.png) +```kotlin +observeSomething() + .subscribeOn(io()) + .observeOn(mainThread()) + .subscribe { result -> println(result) } +``` -Here we see that we observe for some changes with subscribing on io. And each result received we print on main thread (because we are observing on main). +在上述代码中,我们订阅了输入输出事件。程序运行时,只要发生了 I/O 事件,就会把接收到的结果在主线程打印出来。 #### subscribeOn -This is an operator which declares on which scheduler observable will be subscribed to. “Where it will be subscribed” means — “on which scheduler our chain will be started”. +subscribeOn 是一个运算符,用于声明将在哪一个调度器上设置一个 `Observable` 的订阅。“将在哪一个调度器上订阅”是指“将在哪个调度器上启动我们的执行程序”。 -The first important thing about subscribeOn is that it doesn’t matter where it will be: +第一个划重点的地方是:我们并不需要管 `subscribeOn` 具体在哪个地方声明。例如我们可以这样: -![](https://cdn-images-1.medium.com/max/2000/1*l2-3g3OFyw_IOLIwTKLl7A.png) +```kotlin +observeSomething() + .subscribeOn(io()) + .observeOn(mainThread()) + .subscribe { result -> println(result) } +``` -![](https://cdn-images-1.medium.com/max/2000/1*zwQicVXtWi_uvcHNmQH8rQ.png) +也可以这样: -Both these cases produce same results and it is not surprise. By providing this operator in the chain we declare where chain will be started — and this knowledge can’t be dependent on position. +```kotlin +observeSomething() + .observeOn(mainThread()) + .subscribeOn(io()) + .subscribe { result -> println(result) } +``` -Second thing is that as chain can’t be started on multiple schedulers simultaneously — there is no need to put multiple subscribeOn in the chain as only one will take effect. If you for some reason put multiple subscribeOn operators in the chain, then the top one will win and the bottom will be ignored: +上述两种情况的运行结果是一样的,这并不意外。我们通过在这一串代码链中声明了这些运算符声明了这个链的开始,但其中的某些运算符本身并不依赖于声明的位置,就例如上面的例子那样。 -![](https://cdn-images-1.medium.com/max/2000/1*LBKRVDIpr_rxjylKadaSgg.png) +第二个划重点的地方是,因为一个链并不能同时在多个调度器处启动,因此我们无需在链中添加多个 `subscribeOn`,因为只有其中之一会起作用。如果出于某种原因你将多个 `subscribeOn` 运算符放在链中,则最上面的一个将被使用,而最下面的将被忽略: + +```kotlin +observeSomething() + .subscribeOn(io()) // 这个会被使用上 + .observeOn(mainThread()) + .subscribeOn(io()) // 这个会直接被忽略 + .subscribe { result -> println(result) } +``` #### observeOn -When subscribeOn says on which scheduler chain will be started, observeOn says on which scheduler thread will proceed. Effectively that means that observeOn changes scheduler in the chain below itself. +`subscribeOn` 表示将在哪个调度器上启动链,而 `observeOn` 表示将在哪个调度器上运行线程。实际上,这意味着 `observeOn` 会更改下面的链中的运行的线程的调度器。 -![](https://cdn-images-1.medium.com/max/2000/1*uCiZwTFtiPoh8KZdMUDLpg.png) +```kotlin +/* 1 */ observeSomething() +/* 1 */ .subscribeOn(io()) +/* 2 */ .observeOn(mainThread()) +/* 2 */ .subscribe { result -> println(result) } +``` -Here we see that from the chain started till the observeOn we’re on io (red line) and then observeOn changes chain to be run on mainThread scheduler — so everything below is on mainThread now (green line). +在这里,我们看到从链开始,直到链上的 `observeOn` 定义是第一部分,然后 `observeOn` 更改链要在 `mainThread` 调度器上运行,因此现在以下全部内容都在 `mainThread` (第二部分)上执行。 -Unlike subscribeOn it is actually has some sense to add multiple observeOn if there is a need: +与 `subscribeOn` 不同,实际上如果真的需要,我们可以添加多个 `observeOn`: -![](https://cdn-images-1.medium.com/max/2000/1*ES5NJCpKCmVVhdD36UurzQ.png) +```kotlin +/* 1 */ observeSomething() +/* 1 */ .subscribeOn(io()) +/* 2 */ .observeOn(computation()) +/* 2 */ .map { result -> result.length } +/* 3 */ .observeOn(mainThread()) +/* 3 */ .subscribe { result -> println(result) } +``` -If we look at example above: here we might say that we load something from network, then calculate something and then print result. Adding multiple observeOn first switches to computation scheduler (to make computation in background thread — it is blue line), and then switch to mainThread to print result. +如上面的例子那样,我们先从网络上获取一些数据,接着进行一些计算处理,再打印结果。。我们添加了多个 `observeOn`,让程序首先切换到 `computation` 调度器(这样做的目的是在后台线程中进行计算,这是第二部分),然后切换到 `mainThread` 打印结果。 #### just + defer -One common mistake with subscribeOn is its usage with `Observable.just`. - -![](https://cdn-images-1.medium.com/max/2000/1*4EVWYZCJ4u9mPrJazHQaeg.png) - -Value inside `just` is calculated immediately and not upon subscription. That means that if you create such observable on main thread, then that potentially heavy computation will be done on main thread. Subscription would be done correctly on io, but value for `just` will be calculated before that. - -One of the ways to fix this is to wrap your `Observable.just` into `Observable.defer`, so everything inside will be calculated upon subscription and on the scheduler we’ve declared in subscribeOn: +`subscribeOn` 的一个常见错误是将它与 `Observable.just` 一起使用。 -![](https://cdn-images-1.medium.com/max/2000/1*x8ASbBk5N1KnVIDaT8KRHA.png) +```kotlin +Observable.just(loadDataSync()) + .subscribeOn(io()) + .observeOn(mainThread()) + .subscribe { result -> println(result) } +``` -#### flatMap concurrency and parallelism +`just` 参数的值是立即计算的,而不是在订阅时才计算的。这意味着,如果您在主线程上创建此类可观察的对象,那么可能会在主线程上进行大量潜在的计算。虽说订阅将在 `io` 上正确完成,但是 `just` 的值将在订阅之前就被计算出来了。 -Another tricky thing comes from the usage of operator `flatMap` and understanding of the concurrency and parallelism. +解决此问题的方法之一是将你的 `Observable.just` 调用包装到 `Observable.defer` 中,这样调用所执行的所有内容都将在订阅时以及在我们位于 `subscribeOn` 处所声明的调度器上进行计算: -One example to understand a problem is when we have stream of list of ids and for each we’d like to load some data from the network: +```kotlin +Observable.defer { Observable.just(loadDataSync()) } + .subscribeOn(io()) + .observeOn(mainThread()) + .subscribe { result -> println(result) } +``` -![](https://cdn-images-1.medium.com/max/2000/1*KmPkrVtEedgkXu08JxSMew.png) +#### flatMap 的并发和并行 -What we could expect here is that we’ve subscribed to io, io() has thread pool under the hood, therefore our `loadData` calls for each id was successfully paralleled. But that’s not the case. -We wrote concurrent code using `flatMap`, but it is not run in parallel and the reason of that is that we’ve declared our chain to be started on io. Our chain start is on `flatMapIterable` and that means that upon subscription one thread from io pool will be taken and on that single thread everything will be run. -In order to change behavior and make our code run in parallel we need to move subscribeOn inside `flatMap`: +另一个棘手的事情来自使用运算符 `flatMap` 和我们对并发性和并行性的理解。 -![](https://cdn-images-1.medium.com/max/2000/1*RfeS9DYoGIMoj05cI-zkpA.png) +例如,当我们拥有 ID 列表流,并且我们需要对每一个 ID 都执行一次从网络中加载数据: -Each inner observable (observable inside `flatMap`) will be subscribed as soon as event comes into `flatMap`. On each event there will be subscription on which new thread from io pool will be taken. And this way we achieve parallelism. +```kotlin +Observable.fromIterable(listOf("id1", "id2", "id3")) + .flatMap { id -> + loadData(id) + } + .subscribeOn(io()) + .observeOn(mainThread()) + .toList() + .subscribe { result -> println(result) } +``` -So, when we use some operators like `flatMap` our chain has more than one subscription points: one for original chain start and one for each inner observable: +我们在这里的预期是,我们已经订阅了 `io`,`io()` 的底层有线程池,因此对每个 `id` 的 `loadData` 的调用是并行的。但是事实并非如此。我们使用 `flatMap` 编写了并发代码,但它不是并行运行的,其原因是我们告诉了程序我们要在 `io` 上启动链。我们的链的起点在 `flatMapIterable` 上,这意味着在订阅后,将使用 `io` 池中的一个线程,并在该单个线程上运行所有线程。为了改变行为并使我们的代码并行运行,我们需要将 `subscribeOn` 移动到 `flatMap` 之内: + +```kotlin +Observable.fromIterable(listOf("id1", "id2", "id3")) + .flatMap { id -> + loadData(id) + .subscribeOn(io()) + } + .observeOn(mainThread()) + .toList() + .subscribe { result -> println(result) } +``` -![](https://cdn-images-1.medium.com/max/2000/1*qft5P6_SBwP8sbYVL5IETg.png) +一旦执行到了 `flatMap`,每个内部的可观察对象(`flatMap` 内部的可观察对象)都将被订阅。意味着在每一次执行 `loadData` 函数,都会有一个订阅,从 `io` 池中获取新线程。这样我们就达到了并行性。 + +因此,当我们使用诸如 `flatMap` 之类的运算符时,我们的链应该有多个订阅点:一个用于原始链起点,每个用于内部的可观察点: + +```kotlin +/* 订阅点 1 */ Observable.fromIterable(listOf("id1", "id2", "id3")) + .flatMap { id -> +/* 订阅点 2 */ loadData(id) + .subscribeOn(io()) + .flatMap { +/* 订阅点 3 */ loadData2(id) + .subscribeOn(io()) + } + } + .observeOn(mainThread()) + .toList() + .subscribe { result -> println(result) } +``` -On the picture arrows point where subscription happens. Using subscribeOn we can declare on which scheduler subscription in such a points should happen. +代码中的注释指向了每一个订阅发生的位置。通过使用 `subscribeOn`,我们可以声明在这些情况下应该在哪个调度程序订阅上进行。 -#### No threading +#### 单线程运行 -Last but not least if we don’t use subscribeOn or observeOn, then code will be synchronous. All the execution will be sequential and before observable completed execution of next statements will be blocked. +最后,但也同样重要的是,如果我们不使用 `subscribeOn` 或 `observeOn`,那么代码将是同步进行的,所有执行将是顺序执行的,并且在下一个语句的可观察到的完整执行之前是暂停的。 -That’s is basically it on the threading in RxJava, now let’s move on to Kotlin Flow. +以上就基本上是 RxJava 中的线程的全部内容,让我们现在走进 Kotlin 流。 -## Kotlin Flow +## Kotlin 流 -Basic usage with Kotlin Flow is the following: +Kotlin 流的最基础使用方法如下所示: -![](https://cdn-images-1.medium.com/max/2000/1*i95ne1PCxci1yCcizF14cQ.png) +```kotlin +CoroutineScope(Job() + Dispatchers.Main).launch { + observeSomething() + .flowOn(Dispatchers.IO) + .collect { result -> println(result) } +} +``` -> And here we immediately have many concepts which are related to coroutines, which might be needed to explain. We’ll not dive deep into explaining coroutines stuff, article is about Kotlin Flow, so it might be a good idea to read the documentation on the coroutines first if you are not familiar with them. +> 现在,我们有了许多与协程相关的概念,可能需要对其进行解释。我们不会深入介绍协程这个功能或是 Kotlin Flow,因此,如果您不熟悉协程,最好先阅读有关协程的文档。 -This example is identical (to some extent) to the example we’ve used in RxJava part: we again observe some changes on io and then print result on main, though the structure is different. Let’s find out the difference and how this works. +该示例在某种程度上与 RxJava 部分中使用的示例相同:我们观察了 `io` 的一些变化,然后在 `main` 上打印结果,尽管代码有所不同。让我们找出区别以及它是如何工作的。 -First thing which we should note is that flow can be collected only inside some coroutine scope (because `collect` method is a `suspend` function). Because of that we’ve created scope and in that scope `launch`’ed new coroutine. In that launched coroutines we now can collect the flow. +首先要注意的是,只能在某些协程范围内收集流(因为 `collect` 方法需要在 `suspend` 函数下执行)。因此,我们创建了合并范围,并在该范围内“启动”了新的协程。在启动的协程中,我们现在可以收集流。 -One important thing about Kotlin flow and collect function is a feature called **context preservation**. That means that we don’t need to declare on which Dispatcher to collect the data — that dispatcher always will be same as in the scope in which we’re collecting data from our flow. +关于 Kotlin 流和收集功能的重要一件事是称为上下文保留的功能。这意味着我们无需声明要在哪个调度器上收集数据,该调度程序始终与我们从流中收集数据的范围相同。 -So if we want to collect in Main, then we need to call `collect` function in the coroutine with `Dispatchers.Main` in the context. +因此,如果要在 Main 中进行收集,则需要在协程中使用 `Dispatchers.Main` 来调用 `collect` 函数。 #### flowOn -This is an operator which changes the context (dispatcher particularly) on which flow is working. +这是一个运算符,用于更改工作流所处的上下文(尤其是线程调度)。 ![](https://cdn-images-1.medium.com/max/2000/1*QOMRfQTktM17z2xHUYmcrQ.png) -So in our example above, by writing `flowOn(Dispatchers.IO)` we say that we want everything **before** it run on the IO. +因此,在上面的示例中,通过编写 `flowOn(Dispatchers.IO)`,我们告诉了程序我们希望在输入输出上运行执行所有代码。 -If we add some computation (inside `map`) as we’ve done before with RxJava we’ll have the following result: +如果像以前的 RxJava 一样在 `map` 内部添加一些计算,我们将得到以下结果: ![](https://cdn-images-1.medium.com/max/2000/1*zpbvxCRXjGLSEuFlnrWarg.png) -We’ll see that basically we can change where our operators should work by declaring `flowOn` after them. +我们将看到,我们基本上可以通过在运算符之后声明 `flowOn` 来更改运算符的工作位置。 #### launchIn -One important thing about `collect` function is that it is suspending. That means that when we call `collect` execution is suspended until flow is finished. +关于 `collect` 函数的一件重要事情是它带有 `suspend` 声明的。这意味着当我们调用 `collect` 函数的时候,执行会被暂停直到流的完成。 -So if you put inside same coroutine two `collect` functions, then first one will effectively block second from execution: +因此,如果在同一个协程中放入两个 `collect` 函数,那么第一个将有效地阻止第二个执行: ![](https://cdn-images-1.medium.com/max/2000/1*xopZFayVenZK03PQ9RoZ0Q.png) -Here we’ll see result printed, but “second $result” not, because first `collect` function will suspend and not allow second collect to happen. +在这里我们能够看到打印了返回值出来,但并没有显示第二次的返回值,因为第一个 `collect` 函数会被暂停并且不允许进行第二次 `collect` 函数的执行。 -To fix that we need to launch each flow in a separate coroutine: +要想解决这个问题我们需要在不同的协程中启动各自流。 ![](https://cdn-images-1.medium.com/max/2000/1*511-boC1pDMN9gLmK9ySKg.png) -But it doesn’t look pretty and to make it look a bit better (without additional nested level) we can use `launchIn` extension function (which is just syntactic sugar over that wrapped launch) with `onEach`: +但是这个代码看起来并不漂亮,并且要使其看起来更好一点(没有附加的嵌套级别),我们可以将 `launchIn` 扩展功能(只是个在包装的启动中的语法糖)与 `onEach` 结合使用: ![](https://cdn-images-1.medium.com/max/2000/1*atQKeG0bwwMfjBD7nBIZHg.png) -This way we create code which looks more similar to us (who wrote on RxJava before), because subscription in RxJava usually not blocking (unless some `blockingXXX` method is used), so seems `launchIn` should be primary option for similar use cases. +这样,我们创建的代码看起来更像我们之前在 RxJava上写过的那样,因为 RxJava 中的订阅通常不会被阻塞(除非使用了某些 `blockingxxx` 函数)。因此对于类似的情景,似乎 `launchin` 应该是我们的首选。 #### flowOf -With `flowOf` we have similar situation as with `Observable.just`. If you put some calculation (suspending) then it will be done in the outer scope and not affected by `flowOn`: +为 `flowOf` 假设的情景与 `Observable.just` 类似:我们现在需要进行一些计算(挂起的),那么它将在外部范围内完成,不受 `flowOn` 的影响: ![](https://cdn-images-1.medium.com/max/2000/1*jTX93fFjuwjxR33NLaSkmA.png) -If run inside context with `Dispatchers.Main`, then `calculate()` will be run on main and not on io. +如果在带有 `Dispatchers.Main` 的上下文中运行,那么 `calculate` 将在主线程上完成而不是输入输出上运行。 -To fix that you can use `flow` builder and explicitly emit value inside: +为了解决这个问题,您可以使用 `flow` 构建器并在其中明确定义内容: ![](https://cdn-images-1.medium.com/max/2000/1*JSkHKLjh9X-YDL1Olkl5hQ.png) -Then calculation will be done on IO thread. +然后将在输入输出线程上进行计算。 -#### flatMapMerge concurrency and parallelism +#### FlatMapMerge 并发和并行 -To find out how Kotlin Flow works with flatMapMerge (analog of RxJava `flatMap`) we’ll use few test examples: +为了了解 Kotlin Flow 如何与 `flatMapMerge`(对 RxJava 中 `flatMap` 的模拟)一起使用,我们将使用一些测试示例: ![](https://cdn-images-1.medium.com/max/2000/1*_G4_NwfgY1wmO7aVoamsrw.png) -Here we have flow which is collected on `d1` dispatcher. The flow has two items, which are flat mapped onto two other items each. And we have single `flowOn` on the `d2` dispatcher. -In the code we’ve added `onEach` call with information on the thread on which execution happens. +在这里,我们有在 `d1` 调度器上收集的流。这个流有两个项目,每个项目都平面映射到其他两个项目上。而且我们在 `d2` 调度器上有一个 `flowOn` 定义。 + +在代码中,我们添加了 `onEach` 的调用,用于输出执行的线程的信息。 -In this example the output would be: +在此示例中,输出为: ``` inner: pool-2-thread-2 @coroutine#4 @@ -185,9 +264,9 @@ collect: pool-1-thread-2 @coroutine#2 collect: pool-1-thread-2 @coroutine#2 ``` -So, we see that unlike RxJava even when we’ve put `flowOn` outside (below) the inner `flatMapMerge`,`flowOn` anyway affected the inner code by running it in parallel on multiple threads. +因此,我们发现与 RxJava 不同的是,即使我们将 `flowOn` 放在其中的 `flatMapMerge` 之外(之后),`flowOn` 也会通过在多个线程上并行运行来影响其中的代码执行。 -If we put `flowOn` inside `flatMapMerge:` +那么我们将 `flowOn` 放在 `flatMapMerge` 中: ![](https://cdn-images-1.medium.com/max/2000/1*sv6HwmwwsOufpc-00wZpCQ.png) @@ -204,9 +283,9 @@ collect: pool-1-thread-3 @coroutine#2 collect: pool-1-thread-3 @coroutine#2 ``` -Again each inner flow runs on its own thread from second pool. Therefore there seems no difference where we put `flowOn`. +同样,每个内部流都在其自己的线程的第二个池在上运行。因此,在哪里定义 `flowOn` 似乎没有什么区别。 -But there is a difference and let’s see what it is by adding `onEach` below first `flowOf` call: +但是其实有一个区别 —— 让我们通过在第一个 `flowOf` 调用下面添加 `onEach` 来看看它是什么: ![](https://cdn-images-1.medium.com/max/2000/1*isZ3b5z8Jg7f-V9tOqdFlw.png) @@ -272,35 +351,37 @@ In Kotlin Flow for threading **Dispatchers** are used ****(most common IO, Defau In RxJava we declare on which scheduler chain should be **subscribed (started)** using **subscribeOn**, and where it should **proceed** using **observeOn**. -In Kotlin Flow we declare on which context (dispatcher) chain should be **collected (ended)** using scope in which flow is collected, and where it works **before** that using **flowOn**. +在 Kotlin Flow 中,我们使用收集流的 Scope 定义了在使用在哪个上下文(调度器)上收集完这个链,以及在 `flowOn` 之前它的执行的地方。 + +这就像是个倒过来的方法。在 RxJava 中,我们在下面声明启动和修改链。 -So it is like reversed approaches. In RxJava we declare start and modify chain below. -In Kotlin Flow we have end declared and can modify chain above. +而在 Kotlin Flow 中,我们声明用于在上面修改链。 -#### Migration Example +#### 迁移例子 -Consider we have some complex RxJava chain we’d like to migrate to Kotlin Flow keeping the threading logic as before. From above we already understand that we basically need flip upside-down our mental model and do not forget to test. +考虑一下,我们有一些复杂的 RxJava 链,我们希望迁移到 Kotlin Flow,并保持线程逻辑不变。从上文中我们已经了解到,基本上遵循上下颠倒的思维方式就可以了。当然,我们完成代码迁移后,还要记得进行测试。 -Also we should already keep in mind that non-blocking threading in RxJava and suspending with thread reusing between coroutines are different approaches and we won’t be able to have exact one-to-one relation. Though we can put some constraints, like we want to keep parallelism where we had it and have same blocks of code run on same thread pools. +同样,我们应该已经记住: RxJava 中的非阻塞线程和在协程之间重用线程并挂起是不同的方法,而且两者是无法建立精确的一对一关系的。尽管我们可以施加一些约束,但是我们希望将并行性保持在我们拥有并行性的位置,并在相同的线程池上运行相同的代码块。 -To make our test example as correct as possible we’ll use java executors under the hood of the Scheduler and Dispatcher. -We’ll create a number of them for Rx: +为了使我们的测试示例尽可能正确,我们将在调度器之间使用 Java 执行程序。 + +RxJava 的部分: ![](https://cdn-images-1.medium.com/max/2000/1*24TAclWSQTvfOlYIw9I65w.png) -And for Kotlin Flow: +Kotlin Flow 的那一部分: ![](https://cdn-images-1.medium.com/max/2000/1*6WmwuDO_EMLlyHLDycCu8A.png) -We’ll have 4 pools with 3 threads and main executor with only one thread. +我们将拥有 4 个包含 3 个线程的池,而主要执行者只有 1 个线程。 -Our RxJava example will look like the following: +我们的 RxJava 的示例将如下所示: ![](https://cdn-images-1.medium.com/max/2000/1*nYL6iK4SOlMEh9YMOsSbKQ.png) -Here we have stream of three items, which is started on s1, then we switch execution to s2. Inside flatMap we have inner observable with its own subscribe (allowing parallelism) and also some thread switching. Then after flat mapping we do some work and print result in main thread. +在这里,我们有三个项目流,它们从 s1 开始,然后将执行切换到 s2。在 `flatMap` 内部,我们设下了可观察的订阅(允许并行)以及一些线程切换。然后,在平面映射之后,我们进行了一些代码的执行并在主线程中打印结果。 -After we run the program we’ll see such an output: +运行程序后,我们将看到以下输出: ``` 1: pool-1-thread-1 @@ -347,20 +428,19 @@ end: pool-6-thread-1 end: pool-6-thread-1 ``` -It is pretty long, but should match our assumptions written before. -Let’s visualize this: +它很长,但是应该符合我们之前写的假设。 + +让我们直观化显示一下数据: ![](https://cdn-images-1.medium.com/max/2000/1*VNsQnjyftFkMvtcPD8x_rQ.png) -So here we see exactly what we’ve described above. The main trick is that “3” is run on the same scheduler as “inner 2”. -We had two starting points (original and inner), where we put the subscribeOn allowing paralleling inside inner. And then moved below the chain adding where necessary observeOn. +在这里,我们可以精确地看到上面所述的内容。其中需要重点关注的内容是,`3` 与 `inner 2` 在同一调度器上运行。我们有两个起点(初始起点和内部起点),在这里我们将 `subscribeOn` 允许在内部并行。然后移动到链的下方,并在必要时添加了 `observeOn` 的定义。 -Now we’ll switch to the Kotlin Flow version: +现在,我们切换到 Kotlin Flow 的版本: ![](https://cdn-images-1.medium.com/max/2000/1*qIELqmv38MzyvsUml8QUYw.png) -From the very beginning we fix the main thread as being our end thread. Then we start from the bottom and add `flowOn` where needed. First we add d4 and note that “inner 2” should also run on it. Then we switch to d3 and so on up to the very top of the chain. -And here is the result: +从一开始,我们就将主线程固定为结束的线程。然后我们从最下面开始看起,我们在需要的地方添加 `flowOn`。首先,我们添加了 `d4` 并注意到 `inner 2` 也应在其上运行。然后,我们切换到 `d3`,依此类推,直到链的最顶端。结果如下: ``` 1: pool-1-thread-1 @coroutine#6 @@ -407,9 +487,9 @@ inner 2: pool-4-thread-1 @coroutine#7 end: pool-5-thread-1 @coroutine#2 ``` -Besides logs look differently (because RxJava is not the same as coroutines) we still can see that all the logic still applies and we haven’t broken parallel execution. +除了日志的外观不同(因为 RxJava 与协程不同),我们仍然可以看到所有逻辑仍然适用,并且我们没有破坏并行执行。 -Though we still can see some differences. For example our code which runs “3” in RxJava example was running on: +虽然我们仍然可以看到一些差异。例如,我们在RxJava示例中运行“ 3”的代码在以下位置运行: ``` 3: pool-5-thread-1 @@ -417,27 +497,27 @@ Though we still can see some differences. For example our code which runs “3 3: pool-5-thread-3 ``` -And in coroutines example it was always run on one thread: +在协程示例中,它总是在一个线程上运行: ``` 3: pool-4-thread-1 @coroutine#3 ``` -This could be just a coincidence because of concurrency, or maybe it is because of the coroutines better utilizing threads usage (or maybe not, actually I don’t know, so if somebody has some other ideas do not hesitate to post a response). Though we don’t care that much because usage of thread pool was anyway correct. +这可能是由于并发而引起的巧合,也可能是因为协程程序更好地利用了线程的使用(或者可能不是,实际上我不知道,所以如果有人有其他想法,可以毫不犹豫地发布响应)。尽管我们不太在乎,因为无论如何线程池的使用都是正确的。 -If we visualize threading, we can do something like: +如果我们可视化线程,则可以执行以下操作: ![](https://cdn-images-1.medium.com/max/2000/1*z6XDLAkVgMLmyeYsvSTbbA.png) -## Conclusion +## 小结 -Kotlin Flow is good and can be compared to RxJava Observable. They have similar look, similar operators and they both handle threading inside their chains. They have similar tricks with usage of `Observable.just` or `flowOf`. But in terms of concurrency and parallelism seems Kotlin Flow is simpler. Also Kotlin Flow has no such an issue as RxJava with `subscribeOn`, as in flow we declare end of the chain with the scope and it is technically impossible to put multiple of them. +Kotlin Flow 真的不错,可以跟 RxJava Observable 相媲美。它们的使用方法和支持的运算符都相似,相似的运算符,并且都在其链内处理线程。他们在使用 `Observable.just` 或 `flowOf` 时也有类似的技巧。但是就并发性和并行性而言,Kotlin Flow 似乎更简洁。同样,Kotlin Flow 也没有诸如带有 `subscribeOn` 的 RxJava 这样的问题,因为在流程中,我们用作用域声明了链的末尾,从技术上讲,不可能将它们放多个。 -On the approaches to handling threading Kotlin Flow and RxJava have opposite concepts: in RxJava we think in terms of top-to-bottom, when in Kotlin Flow from bottom-to-top. But anyway it is possible to migrate your code vice versa if there is need to without breaking much of the functionality. +在处理线程的方式上,Kotlin Flow 和 RxJava 是相反的。在 RxJava 中,我们认为是从上到下,而在 Kotlin Flow 中则是从下到上。但无论如何,如果有必要,在不破坏大部分功能的前提下,将代码进行反向迁移也是可行的。 -Hope you’ve enjoyed this article and it was useful for you. +希望您喜欢这篇文章,但愿它对您有所帮助 -Happy coding! +祝你编程快乐! > 如果发现译文存在错误或其他需要改进的地方,欢迎到 [掘金翻译计划](https://github.com/xitu/gold-miner) 对译文进行修改并 PR,也可获得相应奖励积分。文章开头的 **本文永久链接** 即为本文在 GitHub 上的 MarkDown 链接。