-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathJoinOperator.java
51 lines (45 loc) · 1.64 KB
/
JoinOperator.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package rxjava.combining;
import java.util.concurrent.TimeUnit;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import util.ThreadUtil;
/**
* Author: andy.xwt
* Date: 2019-02-08 14:53
* Description:
*/
public class JoinOperator {
static void test() {
Observable<Long> observable1 = Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS);
Observable<Long> observable2 = Observable.intervalRange(6, 5, 0, 1, TimeUnit.SECONDS);
observable1.join(observable2, new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long integer) throws Exception {
return Observable.timer(1, TimeUnit.SECONDS);
}
}, new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long integer) throws Exception {
return Observable.timer(0, TimeUnit.SECONDS);
}
}, new BiFunction<Long, Long, String>() {
@Override
public String apply(Long integer, Long integer2) throws Exception {
return integer + "---result---" + integer2;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
}
public static void main(String[] args) {
test();
ThreadUtil.sleep();
;//这里保证虚拟机不停止运行,
}
}