RxJava近来在Android领域非常火爆, 那么RxJava到底是什么呢? 这里不会过多的解释, 请参考文章 给 Android 开发者的 RxJava 详解 与 深入浅出RxJava系列 .
RxJava在CoreLibs中有何应用? 最主要的就是与Retrofit2结合, 大大简化了调用网络接口的代码.
注意: 下列功能均需要新建一个实体类, 如BaseData, 并实现ResponseHandler.IBaseData接口.
RxJava可以与Retrofit2无缝相连, 只需要在Retrofit.Builder
的addCallAdapterFactory()
方法中传入RxJavaCallAdapterFactory.create()
即可. RetrofitFactory类中已经做了相应的处理.
接口声明方式则改为如下:
@POST(Urls.GET_ADS)
Observable<BaseData> getAds();
这样就可以直接对getAds方法返回的Observable对象做操作.
以下内容均是建立在了解RxJava的Observable, Subscribe, Subscriber等概念, 并熟悉写法的基础上.
1. 网络请求异常处理
在RxJava中, 一个事件序列中如果出现异常就会回调onError, 我们可以利用这个特性来做统一的异常处理. 如果出现非业务方面的异常, 如网络连接失败, 数据解析失败, 服务器异常等都会进入Subscriber的onError(Throwable e), 通过判断Throwable的类型可以确定异常的具体原因并提示用户:
@Override
public void onError(Throwable e) {
if (e instanceof ConnectException) {
// 网络连接异常
} else if (e instanceof HttpException) {
// 服务器异常
} else if (e instanceof SocketTimeoutException) {
// 连接超时
} else {
// 其他异常, 如GSON解析错误等
}
}
异常种类正在不断完善中, 肯定是不止这几种, 但是这是目前最常见的几种异常. 我们可以新建一个类继承自RxJava的Subscriber, 然后在订阅网络请求返回的Observable时使用此类, 而不是默认的Subscriber:
public abstract class ResponseSubscriber<T> extends Subscriber<T> {
private BaseView view;
public ResponseSubscriber() {}
public ResponseSubscriber(BaseView view) {
this.view = view;
}
@Override
public void onCompleted() {
view = null;
}
@Override
public void onError(Throwable e) {
if (view != null) {
if (e instanceof ConnectException) {
view.showToastMessage(view.getViewContext().getString(R.string.network_error));
} else if (e instanceof HttpException) {
view.showToastMessage(view.getViewContext().getString(R.string.network_server_error));
} else if (e instanceof SocketTimeoutException) {
view.showToastMessage(view.getViewContext().getString(R.string.network_timeout));
} else {
view.showToastMessage(view.getViewContext().getString(R.string.network_other));
}
}
view = null;
}
@Override
public void onNext(T t) {
view = null;
}
}
我们可以利用抽象出来的BaseView自动提示用户, 而无需每一个请求都做判断. 需要注意的是, 在onNext, onError, onCompleted中需要将通过构造函数传入的BaseView置空. ResponseSubscriber已经实现了onNext, onError, onComplete这样抽象方法, 因此在其实例中可以没有任何实现方法, 也可以选择性的覆写, 如:
api.getTypes()
.subscribe(new ResponseSubscriber() {
@Override
public void onNext(BaseData data) {
super.onNext(data);
}
});
2. 预处理请求结果
进一步的, 我们可以定义自己的方法, 让实例选择覆写, 给予更大的灵活性:
/**
* 请求成功同时业务成功的情况下会调用此函数
*/
public abstract void success(T t);
/**
* 请求成功但业务失败的情况下会调用此函数.
*/
public boolean operationError(T t, int status, String message) {}
/**
* 请求失败的情况下会调用此函数
*/
public boolean error(Throwable e) {}
由于我们Server端返回的JSON都有固定的格式, 因此所有的返回结果都会以BaseData实体来接收:
public class BaseData {
public int status; // 操作结果, 1为成功, 其他为失败
public String msg; // 返回的消息
public MapData data; // 携带的数据
public Page page; // 分页数据
}
只要status不为1, 就意味着业务失败, 因此抽象出了operationError函数. 当status不为1的情况下, 会调用operationError. 一般情况下, 业务失败我们都需要将服务器返回的消息展示给用户, 我们可以同样的, 将此逻辑写在ResponseSubscriber中.
public void onNext(T t) {
resetLoadingStatus();
BaseData data;
if (t instanceof BaseData) {
data = (BaseData) t;
if (data.status == SUCCESS_STATUS) {
success(t);
} else {
if (!operationError(t, data.status, data.msg)) {
handleOperationError(data.msg);
}
}
} else {
success(t);
}
release();
}
public void handleOperationError(String message) {
if (view != null)
view.showToastMessage(message);
}
public void resetLoadingStatus() {
if (view != null)
view.hideLoading();
}
public void release() {
view = null;
}
不论业务是否成功, 都是请求成功, 因此需要在onNext书写判断逻辑. 上述代码很简单, 主要流程是首先重置加载框状态, 开发人员就无需每次都去隐藏加载框. 然后判断返回结果是不是BaseData, 如果不是则直接调用success, 让开发人员自行处理. 如果是BaseData则判断status是否是1, 是1就调用success, 不是1就调用operationError(t, data.status, data.msg), 根据返回结果判断是否调用handleOperationError.
onError中的逻辑类似:
public void onError(Throwable e) {
resetLoadingStatus();
e.printStackTrace();
if (!handler.error(e)) {
handleException(e);
}
release();
}
接下来再重新看看自定义的三个函数:
/**
* 请求成功同时业务成功的情况下会调用此函数
*/
void success(T t);
/**
* 请求成功但业务失败的情况下会调用此函数.
* @return 是否需要自行处理业务错误.
* true - 需要, 父类不会处理错误
* false - 不需要, 交由父类处理
*/
boolean operationError(T t, int status, String message);
/**
* 请求失败的情况下会调用此函数
* @return 是否需要自行处理系统错误.
* true - 需要, 父类不会处理错误
* false - 不需要, 交由父类处理
*/
boolean error(Throwable e);
总结一下, 使用ResponseSubscriber去订阅网络请求结果时, 可以选择不传入BaseView, 这样所有判断逻辑都需要自行实现. 如果传入BaseView, 默认情况会实现所有逻辑. success方法必须覆写, 可以选择覆写operationError与error, 如果覆写返回true, 则意味已经自行处理逻辑, ResponseSubscriber不会再去处理, 反之则会处理. 一般情况下如下写法就够了:
api.getTypes()
.subscribe(new ResponseSubscriber<BaseData>(view) {
@Override
public void success(BaseData baseData) {
if (baseData.data != null && baseData.data.types != null)
view.renderTypes(baseData.data.types); // view在BasePresenter中声明并实例化
}
});
以上就是预处理请求结果.
3. 自动取消网络请求
我们都知道要在Activity/Fragment的onDestory中取消正在连接的网络请求, 避免内存泄漏或其他风险, 提高体验. 那么在Retrofit2+RxJava中怎么取消请求呢? 如果是这么定义的请求:
@GET("users/{user}/repos")
Call<List<Repo>> listRepos(@Path("user") String user);
则可以使用call.cancel()来取消请求. 如果使用RxJava呢? 可以使用Subscription的unsubscribe方法:
Subscription subscription = api.getAds().subscribe(...);
subscription.unsubscribe();
subscribe Observable会返回一个Subscription对象, 调用subscription.unsubscribe()会取消订阅并回调onComplete方法. 结合Retrofit2则会同时取消网络请求.
如果一个界面需要发送很多网络请求, 则要定义很多个Subscription对象, 这时可以使用CompositeSubscription.
CompositeSubscription compositeSubscription = new CompositeSubscription();
compositeSubscription.add(api.getAds().subscribe(...));
compositeSubscription.add(api.getTypes().subscribe(...));
compositeSubscription.unsubscribe();
可以将compositeSubscription看作是一个Subscription的集合, 能同时unsubscribe多个Subscription. 但是这么做又显得不是很优雅, 破坏了RxJava的链式结构, 逼死强迫症.
RxAndroid
RxAndroid还是Jake Wharton大神写的针对Android平台的RxJava扩展, RxAndroid可以很方便的使用AndroidSchedulers.mainThread()
将数据发送到Android的主线程, 也可以替代一些诸如点击回调的事件等等等. 具体用途可以自行百度.
这里我选取了RxAndroid lifecycle来替代Subscription. RxAndroid lifecycle可以将给定的Observable绑定至Activity/Fragment的生命周期. 要使用RxAndroid lifecycle首先需要使Activity/Fragment继承自RxFragmentActivity/RxFragment. 因此CoreLibs中的BaseActivity与BaseFragment均是继承自RxFragmentActivity/RxFragment.
然后就可以使用Observable的compose方法, 以及RxFragmentActivity/RxFragment的bindToLifecycle()/bindUntilEvent(ActivityEvent event).
api.getTypes().compose(bindToLifecycle());
通过上述代码, 如果代码是在onCreate中调用的, 则会在onDestroy中unSubscribe, 如果是在onResume中调用, 则会在onPause中unSubscribe. 同时也可以使用bindUntilEvent(ActivityEvent event)指定具体的函数. 一般情况下, 建议使用bindToLifecycle().
但是, 网络请求在MVP中都是通过Presenter发送的, 而bindToLifecycle()方法又是属于Activity/Fragment的, 也就是说Presenter中没有此方法, 无法绑定. 怎么办呢? 从BaseView入手. BaseView中加入如下函数:
<V> Observable.Transformer<V, V> bind();
并且在Activity/Fragment中实现:
@Override
public <V> Observable.Transformer<V, V> bind() {
return bindToLifecycle();
}
Presenter中就可以通过下列代码将Observable绑定至生命周期:
api.getTypes().compose(view.bind());
接下来在BasePresenter中加入如下方法:
protected <V> Observable.Transformer<V, V> bindToLifeCycle() {
return view.bind();
}
最后演变成:
api.getTypes().compose(bindToLifeCycle());
如果我们想使用subscribeOn(Schedulers.io())以及observeOn(AndroidSchedulers.mainThread())就意味着我们每次都需要多写两行代码. 因此我们可以写一个类, 专门将重复出现的Observable变换代码整合到一起:
/**
* 用于对网络请求的Observable做转换.
* 配合{@link BasePresenter#bindToLifeCycle()}一起使用
* 可以将原始Observable绑定至Activity/Fragment生命周期, 同时声明在IO线程运行, 在main线程接收.
*/
public class ResponseTransformer<T> implements Observable.Transformer<T, T> {
private Observable.Transformer<T, T> transformer;
public ResponseTransformer() {}
public ResponseTransformer(Observable.Transformer<T, T> t) {
transformer = t;
}
@Override
public Observable<T> call(Observable<T> source) {
if (transformer != null)
return transformer.call(source).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
else
return source.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
}
ResponseTransformer继承自Transformer, 泛型有点多, 但是在了解了RxJava的Transformer以及Java泛型之后, 这段代码应该不难理解. 使用ResponseTransformer之后的代码如下所示:
api.getAds().compose(new ResponseTransformer<>(this.<BaseData> bindToLifeCycle()));
请注意, 在bindToLifeCycle()前应加上this.<BaseData>
, 不然在compose之后的subscribe方法将无法确定返回类型, 从而识别成Object:
api.getAds().compose(new ResponseTransformer<>(bindToLifeCycle()))
.subscribe(new ResponseSubscriber<Object>(view) {
@Override
public void success(Object data) {
}
});
完整正确的代码如下:
api.getAds().compose(new ResponseTransformer<>(this.<BaseData> bindToLifeCycle()))
.subscribe(new ResponseSubscriber<BaseData>(view) {
@Override
public void success(BaseData baseData) {
if (baseData.data.ads != null)
view.renderAds(baseData.data.ads);
}
});
以上代码自动处理了:
判断业务是否执行成功, 失败则提示服务器返回的消息
4. 对请求结果做各种变换
想象一下, 如果一个页面有两个请求, 一个获取所有的一级分类, 另一个根据第一个一级分类的id去获取二级分类. 一般我们会在第一个网络请求成功后, 去解析数据并发送第二个网络请求. 但是这么写会嵌套, 如果解析代码很多会难以阅读, 这时候我们可以借助Observable的flatMap方法解决这个问题:
final List<Category> categories = new ArrayList<>();
api.getCategories()
.flatMap(new ResponseAction<BaseData, Observable<BaseData>>(view) {
@Override
public Observable<BaseData> onCall(BaseData baseData) {
if (baseData.data != null && baseData.data.categories != null) {
categories.addAll(baseData.data.categories);
return api.getSubAttractions(baseData.data.categories.get(0).id);
}
return null;
}
}).compose(new ResponseTransformer<>(this.<BaseData> bindLifeCycle()))
.subscribe(new ResponseSubscriber<BaseData>(view) {
@Override public void success(BaseData baseData) {
view.renderCategories(categories);
if (baseData.data != null && baseData.data.subCategories != null)
view.renderSubCategories(baseData.data.subCategories);
}
});
flatMap中需要传入一个Func1对象, 在这种情况下, Action里的数据也是需要解析的, 因此也可以创建一个ResponseAction类用于解析结果与错误. 代码与ResponseSubscriber类似, 就不贴出来了. 但是此时就有两个类似的类, 大部分代码都一样. 违反了DRY原则, 如果一旦数据结构有变或者异常类型增多则需要修改两个类, 因此将部分共同的代码, 提取到一个新的处理类中:
/**
* 网络结果处理类, 此类会判断网络错误与业务错误.
*
* <P>
* {@link ResponseSubscriber}与{@link ResponseAction}均是调用此类来实现网络结果判断, 错误处理,
* 以及重置加载状态.
*/
public class ResponseHandler<T> {
public static final int SUCCESS_STATUS = 1;
private BaseView view;
private CustomHandler<T> handler;
public ResponseHandler(CustomHandler<T> handler) {
this.handler = handler;
}
public ResponseHandler(CustomHandler<T> handler, BaseView view) {
this.handler = handler;
this.view = view;
}
public void onCompleted() {
release();
}
public void onError(Throwable e) {
resetLoadingStatus();
e.printStackTrace();
if (!handler.error(e)) {
handleException(e);
}
release();
}
public void onNext(T t) {
resetLoadingStatus();
BaseData data;
if (t instanceof BaseData) {
data = (BaseData) t;
if (data.status == SUCCESS_STATUS) {
handler.success(t);
} else {
if (!handler.operationError(t, data.status, data.msg)) {
handleOperationError(data.msg);
}
}
} else {
handler.success(t);
}
release();
}
public void resetLoadingStatus() {
if (view != null) {
if (view instanceof BasePaginationView) {
BasePaginationView paginationView = (BasePaginationView) view;
paginationView.onLoadingCompleted();
}
view.hideLoading();
}
}
public void release() {
view = null;
handler = null;
}
public void handleException(Throwable e) {
if (view != null) {
if (e instanceof ConnectException) {
view.showToastMessage(view.getViewContext().getString(R.string.network_error));
} else if (e instanceof HttpException) {
view.showToastMessage(view.getViewContext().getString(R.string.network_server_error));
} else if (e instanceof SocketTimeoutException) {
view.showToastMessage(view.getViewContext().getString(R.string.network_timeout));
} else {
view.showToastMessage(view.getViewContext().getString(R.string.network_other));
}
}
}
public void handleOperationError(String message) {
if (view != null)
view.showToastMessage(message);
}
public interface CustomHandler<T> {
/**
* 请求成功同时业务成功的情况下会调用此函数
*/
void success(T t);
/**
* 请求成功但业务失败的情况下会调用此函数.
* @return 是否需要自行处理业务错误.
* true - 需要, 父类不会处理错误
* false - 不需要, 交由父类处理
*/
boolean operationError(T t, int status, String message);
/**
* 请求失败的情况下会调用此函数
* @return 是否需要自行处理系统错误.
* true - 需要, 父类不会处理错误
* false - 不需要, 交由父类处理
*/
boolean error(Throwable e);
}
}
ResponseSubscriber与ResponseAction则均通过ResponseHandler处理. ResponseSubscriber与ResponseAction的思路相同, 但是具体实现又有差别, 这里不再赘述.
最后
RxJava最核心的功能就是对数据做各种变换, 在此基础之上又衍生出各种用法. 可以参考 可能是东半球最全的RxJava使用场景小结 .