-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRetryOperator.java
154 lines (140 loc) · 6.31 KB
/
RetryOperator.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package rxjava.error;
import common.CommonIntegerObserver;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.functions.BiPredicate;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
/**
* Author: andy.xwt
* Date: 2019/2/11 10:54
* Description:
* retry():出现错误时,让被观察者重新发送数据,如果一直错误将一直重新发送
* retry(long times):出现错误时,则让被观察者重新发送相应次数的事件,times重试次数,需要注意的是当重试相应次数后,任然报错,那么相应观察者需要处理错误
* retry(Predicate<? super Throwable> predicate):出现错误时,判断是否需要重新发送数据,
* retry(BiPredicate<? super Integer, ? super Throwable> predicate):第一个参数为重试的次数,基本与上一个方法一样。
* retry(long times, Predicate<? super Throwable> predicate),带参数的重试
*
* @see <a href="https://mcxiaoke.gitbooks.io/rxdocs/content/operators/Retry.html"/>
*/
class RetryOperator {
/**
* retry()出现错误时,让被观察者重新发送数据,如果一直错误将一直重新发送
*/
static void testRetry() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Throwable("错误了"));
}
}).retry().subscribe(new CommonIntegerObserver());
}
/**
* retry(long times),出现错误时,则让被观察者重新发送相应次数的事件,times重试次数
*/
static void testRetryTimes() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Throwable("错误了"));
}
}).retry(2).subscribe(new CommonIntegerObserver());
}
/**
* retry(Predicate<? super Throwable> predicate),出现错误时,判断是否需要重新发送数据,
*/
static void testRetryPredicate() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Throwable("错误了"));
}
}).retry(new Predicate<Throwable>() {
@Override
public boolean test(Throwable throwable) throws Exception {
return throwable.getMessage().equals("错误了");
}
}).subscribe(new CommonIntegerObserver());
}
/**
* retry(BiPredicate<? super Integer, ? super Throwable> predicate),第一个参数为重试的次数
*/
static void testRetryBiPredicate() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Throwable("错误了"));
}
}).retry(new BiPredicate<Integer, Throwable>() {
@Override
public boolean test(Integer integer, Throwable throwable) throws Exception {
System.out.println("重试次数" + integer);
return throwable.getMessage().equals("错误了");
}
}).subscribe(new CommonIntegerObserver());
}
/**
* retry(long times, Predicate<? super Throwable> predicate),带参数的重试
*/
static void testRetryTimesAndPredicate() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Throwable("错误了"));
}
}).retry(3, new Predicate<Throwable>() {
@Override
public boolean test(Throwable throwable) throws Exception {
return throwable.getMessage().equals("错误了");
}
}).subscribe(new CommonIntegerObserver());
}
/**
* retryWhen()遇到错误时,将发生的错误传递给一个新的被观察者,并决定是否需要重新订阅元素被观察者发送事件
*/
static void testRetryWhen() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Throwable("错误了"));
}
}).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
// // 1. 若返回的Observable发送的事件 = Error事件,则原始的Observable不重新发送事件
// // 该异常错误信息可在观察者中的onError()中获得
// return Observable.error(new Throwable("retryWhen结束"));
//
// //若返回的Observable发送的事件 = Next事件,则原始的Observable重新发送事件(若持续遇到错误,则持续重试
return Observable.just(3);
}
});
}
}).subscribe(new CommonIntegerObserver());
}
public static void main(String[] args) {
// testRetry();
// testRetryTimes();
// testRetryPredicate();
// testRetryBiPredicate();
// testRetryTimesAndPredicate();
testRetryWhen();
}
}