在开发一个rest应用的时候,一种很常见的情形是,要对服务器进行轮询和重试。当服务器正在进行某些任务的时候,我们需要(以一定的延时)查询服务器这个任务是否完成了,同时,当我们出错的时候,有时我们要进行重试。当我在想着如何利用RxJava来正确地实现服务器轮询的时候,我就想写一篇关于这个话题的文章了。最终,我在这个StackOverflow问题中找到了一个很好的解决方案。
在这篇文章中,我会解释用RxJava和Retrofit来实现这个功能是多么的容易。我假定你已经了解了RxJava,Retrofit的使用并且已经能够利用这些库来实现相应的应用架构。
在文章中我将用到的一些定义:
“Predicate” 一个被传到Observable的一些方法中的类,举例来说:Observable.filter(/在这里传进来predicate/), Observable.takeUntil(/在这里传进来predicate/)
Observable
.filter(/*predicate here*/)
.takeUntil(/*predicate here*/)
.subscribe(/*subscriber here/)
takeUntil()返回的Observable是filter()返回的Observable的子元素(child)。作为参数传递给subscribe()的Subscriber是takeUntil()返回的Observable的子元素(child)。
所谓服务器轮询,也就是,当你需要等待服务器去完成某项任务时,你就要周期性地调用API接口来查询该项任务是否已经完成。
示例代码如下
/**
* 这个类用来映射(map)从服务器返回的json数据
*
*/
class ServerPollingResponse {
boolean isJobDone;
@Override
public String toString() {
return "isJobDone=" + isJobDone;
}
}
Subscription checkJobSubscription = mDataManager.pollServer(inputData)
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
Log.v(TAG, "repeatWhen, call");
/**
* 这个方法只会被调用一次。
* 5 表示每次重复的调用(repeated call)会被延迟5s。
*/
return observable.delay(5, TimeUnit.SECONDS);
}
})
.takeUntil(new Func1<ServerPollingResponse, Boolean>() {
@Override
public Boolean call(ServerPollingResponse response) {
/** 在这里,我们可以检查服务器返回的数据是否正确,和决定我们是否应该
* 停止轮询。
* 当服务器的任务完成时,我们停止轮询。
* 换句话说,“当任务(job)完成时,我们不拿(take)了”
*/
Log.v(TAG, "takeUntil, call response " + response);
return response.isJobDone;
}
})
.filter(new Func1<ServerPollingResponse, Boolean>() {
@Override
public Boolean call(ServerPollingResponse response) {
/**
* 如果我们在这里返回“false”的话,那这个结果会被过滤掉(filter)
* 过滤(Filtering) 表示 onNext() 不会被调用.
* 但是 onComplete() 仍然会被传递.
*/
Log.v(TAG, "filter, call response " + response);
return response.isJobDone;
}
})
.subscribe(
new Subscriber<ServerPollingResponse>() {
@Override
public void onCompleted() {
Log.v(TAG, "onCompleted ");
}
@Override
public void onError(Throwable e) {
Log.v(TAG, "onError ");
}
@Override
public void onNext(ServerPollingResponse response) {
Log.v(TAG, "onNext response " + response);
//服务器轮询停止了,你可以做些其他事情。
}
}
);
代码看起来很多,但是很容易理解,并且利用了优雅的链式操作符。
假如服务器在三次请求后才返回 “isJobDone=true” , log打印如下:
repeatWhen, call
//在这里发起了api请求
filter, call response isJobDone=false
takeUntil, call response isJobDone=false
//在这里再次发起了api请求
filter, call response isJobDone=false
takeUntil, call response isJobDone=false
//在这里,第三次发起api请求
filter, call response isJobDone=true
onNext response isJobDone=true
takeUntil, call response isJobDone=true
onCompleted
在下一个部分中,我将解释为什么方法会那样被调用
发起http请求后,在所有call()方法中, filter()的predicate中的call()方法会第一个被调用。
如果我们在filter() 中返回“false”,表明我们对结果(result)不满意,不要把这个结果传给 Subscriber
我们返回“false”,表明这个结果(result)不会被传递到 filter()的子元素(child)Subscriber
对onNext()的调用会被传递到 takeUntil()的Observable中,然后它的predicate的call() 方法会被调用
我们看到“job 还没有被完成”,所以我们在takeUntil()的call()方法中返回“false”
这表示repeatWhen()的onNext()和onComplete()会被调用
如果我们在filter()中返回“true”,表明我们对这个结果(result)是满意的,链式事件如下:
结果(result)被传递到了filter的子元素(child)Subscriber
然后这个结果(result)被传递到了takeUntil()的predicate的call()方法中
在 takeUntil() 中,我们也返回 “true”,因为“任务完成了(job done)”
因为我们返回了“true”,takeUntil()操作符会调用它的子元素(child)filter()的onComplete() .
filter() 调用它的子元素Subscriber
整个链是由takeUntil()操作符调用内部的unsubscribe()来终止的。
基本原理是一样的。我们只要向repeatWhen()的predicate中增加一些链式方法(chaning method)就行了。
private static final int COUNTER_START = 1;
private static final int ATTEMPTS = 5;
private static final int ORIGINAL_DELAY_IN_SECONDS = 10;
// 这是链接在repeatWhen的predicate的call方法中的新的function
repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call (Observable < ?extends Void > observable){
Log.v(TAG, "repeatWhen, call");
return observable.zipWith(Observable.range(COUNTER_START, ATTEMPTS), new Func2<Void, Integer, Integer>() {
@Override
public Integer call(Void aVoid, Integer attempt) {
Log.v(TAG, "zipWith, call, attempt " + attempt);
return attempt;
}
}).flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer repeatAttempt) {
Log.v(TAG, "flatMap, call, repeatAttempt " + repeatAttempt);
// 增加等待时间
return Observable.timer(repeatAttempt * ORIGINAL_DELAY_IN_SECONDS, TimeUnit.SECONDS);
}
});
}
})
在这里,每发起一次api请求前的延时时间是随着尝试次数的增长而乘法式地增加的。非常简单高效。
打印信息如下
repeatWhen, call
//这里是我们的第一次API请求
filter, call response isJobDone=false
takeUntil, response isJobDone=false
zipWith, call, attempt 1
flatMap, call, repeatAttempt 1
//等待10s后发起第二次请求
filter, call response isJobDone=false
takeUntil, response isJobDone=false
zipWith, call, attempt 2
flatMap, call, repeatAttempt 2
//等待20s后发起第三次请求
filter, call response isJobDone=true
onNext response isJobDone=true
takeUntil, response isJobDone=true
onCompleted
解释如下
我会略过前面讲过的东西,直接解释一下与zipWith() 和 flatMap()相关的东西。
当takeUntil() 完成它的工作以后,从repeatWhen() 返回的Observable会开始工作。这个Observable是zipWith() 和 flatMap()一起作用的结果(combined result)。
zipWith(parameter1, parameter2)会拿到repeatWhen()里的Observable所发射的值,也就是Void aVoid,还会拿到由它的第一个参数,也就是Observable.range(COUNTER_START, ATTEMPTS) 所发射的值,然后将这两个值传递给函数(function) call(Void, Integer)。在call() 方法中,我们可以利用这两个参数做一些操作,然后返回一个值(虽然在我们的例子中,是一个Integer,但是它也可以是其它任何类型,如果我们想返回其它类型的话,只需要改一下new Func2 < Void, Integer, / 改这个 / Integer>) )中的第三个泛型类型就行了),但是在这里,我们只要返回我们从Observable中获取的值就行了,这个值就是重复尝试的次数。
zipWith()中返回的值会被封装到一个发射(emit)这些值的Observable中。然后我们在 flatMap()中处理这些值。
我们可以忽略掉zipWith()而直接在repeatWhen()中使用 flatMap()
repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
Log.v(TAG, "repeatWhen, call");
return observable.flatMap(new Func1<Void, Observable<?>>() {
@Override
public Observable<?> call(Void aVoid) {
if(mCounter > ATTEMPTS){
// 由我们自己终止
throw new RuntimeException();
}
return Observable.timer(mCounter++ * ORIGINAL_DELAY_IN_SECONDS, TimeUnit.SECONDS);
}
});
}
})
一般这种情况,我们需要用一个计数器,自己去控制请求(attempt),并且在需要时终止这一系列操作( terminate the sequence )。而在这里,我们利用了zipWith()操作符,让RxJava帮我们做了这一切。
众所周知,在Retrofit1 中每个网络错误都是交由onError()方法处理的
为了在失去网络连接或者当返回的http状态是除了200 OK以外的不正常状态时实现重试,我们需要使用 retryWhen() 而不是repeatWhen()。同时,zipWith() 的参数也要做一点变化。
retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
Log.v(TAG, "retryWhen, call");
return observable.zipWith(Observable.range(COUNTER_START, ATTEMPTS), new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable throwable, Integer attempt) {
Log.v(TAG, "zipWith, call, attempt " + attempt);
return attempt;
}
})
repeatWhen() 和 retryWhen()的主要区别就是repeatWhen()会在接收到 onNext()时重新subscribe,retryWhen()则在接收到 onError()时重新subscribe。
下面是相关的代码
retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
Log.v(TAG, "retryWhen, call");
return observable.compose(zipWithFlatMap());
}
}).repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
Log.v(TAG, "repeatWhen, call");
return observable.compose(zipWithFlatMap());
}
})
<T> Observable.Transformer<T, Long> zipWithFlatMap() {
return new Observable.Transformer<T, Long>() {
@Override
public Observable<Long> call(Observable<T> observable) {
return observable.zipWith(Observable.range(COUNTER_START, ATTEMPTS), new Func2<T, Integer, Integer>() {
@Override
public Integer call(T t, Integer repeatAttempt) {
Log.v(TAG, "zipWith, call, repeatAttempt " + repeatAttempt);
return repeatAttempt;
}
}).flatMap(new Func1<Integer, Observable<Long>>() {
@Override
public Observable<Long> call(Integer repeatAttempt) {
Log.v(TAG, "flatMap, call, repeatAttempt " + repeatAttempt);
//增加等待时间
return Observable.timer(repeatAttempt * 5, TimeUnit.SECONDS);
}
});
}
};
}
你可能注意到了,我将zipWith()和flatMap() 封装到了单独的方法中,并且利用compose让它可以在repeatWhen()和retryWhen()里能够重复使用。现在,如果我们请求失败了,我们会去重试(retry),如果成功了,但是“任务(job)还没完成”,我们会重复一遍(repeat)。
RxJavas实在是太好用了!
为它喝彩吧 :)
Copyright© 2013-2019