RxJava

RxJava近来在Android领域非常火爆, 那么RxJava到底是什么呢? 这里不会过多的解释, 请参考文章 给 Android 开发者的 RxJava 详解深入浅出RxJava系列 .

RxJava在CoreLibs中有何应用? 最主要的就是与Retrofit2结合, 大大简化了调用网络接口的代码.

  1. 网络请求异常处理

  2. 预处理请求结果

  3. 自动取消网络请求

  4. 对请求结果做各种变换

注意: 下列功能均需要新建一个实体类, 如BaseData, 并实现ResponseHandler.IBaseData接口.

RxJava可以与Retrofit2无缝相连, 只需要在Retrofit.BuilderaddCallAdapterFactory()方法中传入RxJavaCallAdapterFactory.create()即可. RetrofitFactory类中已经做了相应的处理.

接口声明方式则改为如下:

@POST(Urls.GET_ADS)
Observable<BaseData> getAds();

这样就可以直接对getAds方法返回的Observable对象做操作.

以下内容均是建立在了解RxJava的Observable, Subscribe, Subscriber等概念, 并熟悉写法的基础上.

1. 网络请求异常处理

在RxJava中, 一个事件序列中如果出现异常就会回调onError, 我们可以利用这个特性来做统一的异常处理. 如果出现非业务方面的异常, 如网络连接失败, 数据解析失败, 服务器异常等都会进入Subscriber的onError(Throwable e), 通过判断Throwable的类型可以确定异常的具体原因并提示用户:

    @Override
    public void onError(Throwable e) {
        if (e instanceof ConnectException) {
          // 网络连接异常
        } else if (e instanceof HttpException) {
          // 服务器异常
        } else if (e instanceof SocketTimeoutException) {
          // 连接超时
        } else {
          // 其他异常, 如GSON解析错误等
        }
    }

异常种类正在不断完善中, 肯定是不止这几种, 但是这是目前最常见的几种异常. 我们可以新建一个类继承自RxJava的Subscriber, 然后在订阅网络请求返回的Observable时使用此类, 而不是默认的Subscriber:

public abstract class ResponseSubscriber<T> extends Subscriber<T> {

    private BaseView view;

    public ResponseSubscriber() {}

    public ResponseSubscriber(BaseView view) {
        this.view = view;
    }

    @Override
    public void onCompleted() {
      view = null;
    }

    @Override
    public void onError(Throwable e) {
      if (view != null) {
        if (e instanceof ConnectException) {
          view.showToastMessage(view.getViewContext().getString(R.string.network_error));
        } else if (e instanceof HttpException) {
          view.showToastMessage(view.getViewContext().getString(R.string.network_server_error));
        } else if (e instanceof SocketTimeoutException) {
          view.showToastMessage(view.getViewContext().getString(R.string.network_timeout));
        } else {
          view.showToastMessage(view.getViewContext().getString(R.string.network_other));
        }
      }

      view = null;
    }

    @Override
    public void onNext(T t) {
      view = null;
    }
}

我们可以利用抽象出来的BaseView自动提示用户, 而无需每一个请求都做判断. 需要注意的是, 在onNext, onError, onCompleted中需要将通过构造函数传入的BaseView置空. ResponseSubscriber已经实现了onNext, onError, onComplete这样抽象方法, 因此在其实例中可以没有任何实现方法, 也可以选择性的覆写, 如:

api.getTypes()
      .subscribe(new ResponseSubscriber() {
        @Override
        public void onNext(BaseData data) {
          super.onNext(data);
        }
      });

2. 预处理请求结果

进一步的, 我们可以定义自己的方法, 让实例选择覆写, 给予更大的灵活性:

/**
* 请求成功同时业务成功的情况下会调用此函数
*/
public abstract void success(T t);

/**
* 请求成功但业务失败的情况下会调用此函数.
*/
public boolean operationError(T t, int status, String message) {}

/**
* 请求失败的情况下会调用此函数
*/
public boolean error(Throwable e) {}

由于我们Server端返回的JSON都有固定的格式, 因此所有的返回结果都会以BaseData实体来接收:

public class BaseData {
    public int status; // 操作结果, 1为成功, 其他为失败
    public String msg; // 返回的消息
    public MapData data; // 携带的数据
    public Page page; // 分页数据
}

只要status不为1, 就意味着业务失败, 因此抽象出了operationError函数. 当status不为1的情况下, 会调用operationError. 一般情况下, 业务失败我们都需要将服务器返回的消息展示给用户, 我们可以同样的, 将此逻辑写在ResponseSubscriber中.

public void onNext(T t) {
   resetLoadingStatus();
   BaseData data;
   if (t instanceof BaseData) {
     data = (BaseData) t;
     if (data.status == SUCCESS_STATUS) {
       success(t);
     } else {
       if (!operationError(t, data.status, data.msg)) {
         handleOperationError(data.msg);
       }
     }
   } else {
     success(t);
   }
   release();
}

public void handleOperationError(String message) {
   if (view != null)
     view.showToastMessage(message);
}

public void resetLoadingStatus() {
   if (view != null)
     view.hideLoading();
}

public void release() {
   view = null;
}

不论业务是否成功, 都是请求成功, 因此需要在onNext书写判断逻辑. 上述代码很简单, 主要流程是首先重置加载框状态, 开发人员就无需每次都去隐藏加载框. 然后判断返回结果是不是BaseData, 如果不是则直接调用success, 让开发人员自行处理. 如果是BaseData则判断status是否是1, 是1就调用success, 不是1就调用operationError(t, data.status, data.msg), 根据返回结果判断是否调用handleOperationError.

onError中的逻辑类似:

public void onError(Throwable e) {
   resetLoadingStatus();
   e.printStackTrace();
   if (!handler.error(e)) {
      handleException(e);
   }
   release();
}

接下来再重新看看自定义的三个函数:

        /**
         * 请求成功同时业务成功的情况下会调用此函数
         */
        void success(T t);

        /**
         * 请求成功但业务失败的情况下会调用此函数.
         * @return 是否需要自行处理业务错误.
         * true - 需要, 父类不会处理错误
         * false - 不需要, 交由父类处理
         */
        boolean operationError(T t, int status, String message);

        /**
         * 请求失败的情况下会调用此函数
         * @return 是否需要自行处理系统错误.
         * true - 需要, 父类不会处理错误
         * false - 不需要, 交由父类处理
         */
        boolean error(Throwable e);

总结一下, 使用ResponseSubscriber去订阅网络请求结果时, 可以选择不传入BaseView, 这样所有判断逻辑都需要自行实现. 如果传入BaseView, 默认情况会实现所有逻辑. success方法必须覆写, 可以选择覆写operationError与error, 如果覆写返回true, 则意味已经自行处理逻辑, ResponseSubscriber不会再去处理, 反之则会处理. 一般情况下如下写法就够了:

api.getTypes()
      .subscribe(new ResponseSubscriber<BaseData>(view) {
        @Override
        public void success(BaseData baseData) {
          if (baseData.data != null && baseData.data.types != null)
            view.renderTypes(baseData.data.types); // view在BasePresenter中声明并实例化
        }
      });

以上就是预处理请求结果.

3. 自动取消网络请求

我们都知道要在Activity/Fragment的onDestory中取消正在连接的网络请求, 避免内存泄漏或其他风险, 提高体验. 那么在Retrofit2+RxJava中怎么取消请求呢? 如果是这么定义的请求:

@GET("users/{user}/repos")
Call<List<Repo>> listRepos(@Path("user") String user);

则可以使用call.cancel()来取消请求. 如果使用RxJava呢? 可以使用Subscription的unsubscribe方法:

Subscription subscription = api.getAds().subscribe(...);
subscription.unsubscribe();

subscribe Observable会返回一个Subscription对象, 调用subscription.unsubscribe()会取消订阅并回调onComplete方法. 结合Retrofit2则会同时取消网络请求.

如果一个界面需要发送很多网络请求, 则要定义很多个Subscription对象, 这时可以使用CompositeSubscription.

CompositeSubscription compositeSubscription = new CompositeSubscription();
compositeSubscription.add(api.getAds().subscribe(...));
compositeSubscription.add(api.getTypes().subscribe(...));
compositeSubscription.unsubscribe();

可以将compositeSubscription看作是一个Subscription的集合, 能同时unsubscribe多个Subscription. 但是这么做又显得不是很优雅, 破坏了RxJava的链式结构, 逼死强迫症.

RxAndroid

RxAndroid还是Jake Wharton大神写的针对Android平台的RxJava扩展, RxAndroid可以很方便的使用AndroidSchedulers.mainThread()将数据发送到Android的主线程, 也可以替代一些诸如点击回调的事件等等等. 具体用途可以自行百度.

这里我选取了RxAndroid lifecycle来替代Subscription. RxAndroid lifecycle可以将给定的Observable绑定至Activity/Fragment的生命周期. 要使用RxAndroid lifecycle首先需要使Activity/Fragment继承自RxFragmentActivity/RxFragment. 因此CoreLibs中的BaseActivity与BaseFragment均是继承自RxFragmentActivity/RxFragment.

然后就可以使用Observable的compose方法, 以及RxFragmentActivity/RxFragment的bindToLifecycle()/bindUntilEvent(ActivityEvent event).

api.getTypes().compose(bindToLifecycle());

通过上述代码, 如果代码是在onCreate中调用的, 则会在onDestroy中unSubscribe, 如果是在onResume中调用, 则会在onPause中unSubscribe. 同时也可以使用bindUntilEvent(ActivityEvent event)指定具体的函数. 一般情况下, 建议使用bindToLifecycle().

但是, 网络请求在MVP中都是通过Presenter发送的, 而bindToLifecycle()方法又是属于Activity/Fragment的, 也就是说Presenter中没有此方法, 无法绑定. 怎么办呢? 从BaseView入手. BaseView中加入如下函数:

<V> Observable.Transformer<V, V> bind();

并且在Activity/Fragment中实现:

@Override
public <V> Observable.Transformer<V, V> bind() {
    return bindToLifecycle();
}

Presenter中就可以通过下列代码将Observable绑定至生命周期:

api.getTypes().compose(view.bind());

接下来在BasePresenter中加入如下方法:

protected <V> Observable.Transformer<V, V> bindToLifeCycle() {
    return view.bind();
}

最后演变成:

api.getTypes().compose(bindToLifeCycle());

如果我们想使用subscribeOn(Schedulers.io())以及observeOn(AndroidSchedulers.mainThread())就意味着我们每次都需要多写两行代码. 因此我们可以写一个类, 专门将重复出现的Observable变换代码整合到一起:

/**
 * 用于对网络请求的Observable做转换.
 * 配合{@link BasePresenter#bindToLifeCycle()}一起使用
 * 可以将原始Observable绑定至Activity/Fragment生命周期, 同时声明在IO线程运行, 在main线程接收.
 */
public class ResponseTransformer<T> implements Observable.Transformer<T, T> {

    private Observable.Transformer<T, T> transformer;

    public ResponseTransformer() {}

    public ResponseTransformer(Observable.Transformer<T, T> t) {
        transformer = t;
    }

    @Override
    public Observable<T> call(Observable<T> source) {
        if (transformer != null)
            return transformer.call(source).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
        else
            return source.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
    }
}

ResponseTransformer继承自Transformer, 泛型有点多, 但是在了解了RxJava的Transformer以及Java泛型之后, 这段代码应该不难理解. 使用ResponseTransformer之后的代码如下所示:

api.getAds().compose(new ResponseTransformer<>(this.<BaseData> bindToLifeCycle()));

请注意, 在bindToLifeCycle()前应加上this.<BaseData>, 不然在compose之后的subscribe方法将无法确定返回类型, 从而识别成Object:

api.getAds().compose(new ResponseTransformer<>(bindToLifeCycle()))
                .subscribe(new ResponseSubscriber<Object>(view) {
                    @Override
                    public void success(Object data) {
                    }
                });

完整正确的代码如下:

api.getAds().compose(new ResponseTransformer<>(this.<BaseData> bindToLifeCycle()))
                .subscribe(new ResponseSubscriber<BaseData>(view) {
                    @Override
                    public void success(BaseData baseData) {
                        if (baseData.data.ads != null)
                            view.renderAds(baseData.data.ads);
                    }
                });

以上代码自动处理了:

  1. 在合适的时候取消请求

  2. 在io线程发送请求, 在Main线程接受结果

  3. 将结果转换为BaseData类型

  4. 判断业务是否执行成功, 失败则提示服务器返回的消息

  5. 识别错误类型, 并做相应的提示.

  6. 打印相应的请求Log

4. 对请求结果做各种变换

想象一下, 如果一个页面有两个请求, 一个获取所有的一级分类, 另一个根据第一个一级分类的id去获取二级分类. 一般我们会在第一个网络请求成功后, 去解析数据并发送第二个网络请求. 但是这么写会嵌套, 如果解析代码很多会难以阅读, 这时候我们可以借助Observable的flatMap方法解决这个问题:

    final List<Category> categories = new ArrayList<>();
        api.getCategories()
                .flatMap(new ResponseAction<BaseData, Observable<BaseData>>(view) {
                    @Override 
                    public Observable<BaseData> onCall(BaseData baseData) {
                        if (baseData.data != null && baseData.data.categories != null) {
                            categories.addAll(baseData.data.categories);
                            return api.getSubAttractions(baseData.data.categories.get(0).id);
                        }
                        return null;
                    }
                }).compose(new ResponseTransformer<>(this.<BaseData> bindLifeCycle()))
                .subscribe(new ResponseSubscriber<BaseData>(view) {
                    @Override public void success(BaseData baseData) {
                        view.renderCategories(categories);
                        if (baseData.data != null && baseData.data.subCategories != null)
                            view.renderSubCategories(baseData.data.subCategories);
                    }
                });

flatMap中需要传入一个Func1对象, 在这种情况下, Action里的数据也是需要解析的, 因此也可以创建一个ResponseAction类用于解析结果与错误. 代码与ResponseSubscriber类似, 就不贴出来了. 但是此时就有两个类似的类, 大部分代码都一样. 违反了DRY原则, 如果一旦数据结构有变或者异常类型增多则需要修改两个类, 因此将部分共同的代码, 提取到一个新的处理类中:

/**
 * 网络结果处理类, 此类会判断网络错误与业务错误.
 *
 * <P>
 *     {@link ResponseSubscriber}与{@link ResponseAction}均是调用此类来实现网络结果判断, 错误处理,
 *     以及重置加载状态.
 */
public class ResponseHandler<T> {
    public static final int SUCCESS_STATUS = 1;

    private BaseView view;
    private CustomHandler<T> handler;

    public ResponseHandler(CustomHandler<T> handler) {
        this.handler = handler;
    }

    public ResponseHandler(CustomHandler<T> handler, BaseView view) {
        this.handler = handler;
        this.view = view;
    }

    public void onCompleted() {
        release();
    }

    public void onError(Throwable e) {
        resetLoadingStatus();
        e.printStackTrace();
        if (!handler.error(e)) {
            handleException(e);
        }
        release();
    }

    public void onNext(T t) {
        resetLoadingStatus();
        BaseData data;
        if (t instanceof BaseData) {
            data = (BaseData) t;
            if (data.status == SUCCESS_STATUS) {
                handler.success(t);
            } else {
                if (!handler.operationError(t, data.status, data.msg)) {
                    handleOperationError(data.msg);
                }
            }
        } else {
            handler.success(t);
        }
        release();
    }

    public void resetLoadingStatus() {
        if (view != null) {
            if (view instanceof BasePaginationView) {
                BasePaginationView paginationView = (BasePaginationView) view;
                paginationView.onLoadingCompleted();
            }
            view.hideLoading();
        }
    }

    public void release() {
        view = null;
        handler = null;
    }

    public void handleException(Throwable e) {
        if (view != null) {
            if (e instanceof ConnectException) {
                view.showToastMessage(view.getViewContext().getString(R.string.network_error));
            } else if (e instanceof HttpException) {
                view.showToastMessage(view.getViewContext().getString(R.string.network_server_error));
            } else if (e instanceof SocketTimeoutException) {
                view.showToastMessage(view.getViewContext().getString(R.string.network_timeout));
            } else {
                view.showToastMessage(view.getViewContext().getString(R.string.network_other));
            }
        }
    }

    public void handleOperationError(String message) {
        if (view != null)
            view.showToastMessage(message);
    }

    public interface CustomHandler<T> {
        /**
         * 请求成功同时业务成功的情况下会调用此函数
         */
        void success(T t);

        /**
         * 请求成功但业务失败的情况下会调用此函数.
         * @return 是否需要自行处理业务错误.
         * true - 需要, 父类不会处理错误
         * false - 不需要, 交由父类处理
         */
        boolean operationError(T t, int status, String message);

        /**
         * 请求失败的情况下会调用此函数
         * @return 是否需要自行处理系统错误.
         * true - 需要, 父类不会处理错误
         * false - 不需要, 交由父类处理
         */
        boolean error(Throwable e);
    }
}

ResponseSubscriber与ResponseAction则均通过ResponseHandler处理. ResponseSubscriber与ResponseAction的思路相同, 但是具体实现又有差别, 这里不再赘述.

最后

RxJava最核心的功能就是对数据做各种变换, 在此基础之上又衍生出各种用法. 可以参考 可能是东半球最全的RxJava使用场景小结 .

Last updated