RxBus

相信大家也都用过EventBus, Otto等开源库, 利用RxJava也能很简单的实现类似功能而无需引入其他库.

参考 Implementing an Event Bus With RxJava - RxBus [译文]

炒鸡简单的实现:

public class RxBus {

  private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());

  public void send(Object o) {
    _bus.onNext(o);
  }

  public Observable<Object> toObserverable() {
    return _bus;
  }
}

具体用法可以参考上面的网址. 这里只说明一下对基础实现改进的地方.

首先, 添加一个默认的事件总线, 当然如果要细分事件的话也可以创建一个UI总线, xx总线等等:

private static final RxBus instance = new RxBus();
public static RxBus getDefault() {
    return instance;
}

这样, 每次就可以这么发送事件:

RxBus.getDefault().send(new TapEvent());

但是呢, 每次接受事件的时候都需要筛选一遍:

if(event instanceof TapEvent)

这样很麻烦, 能不能发送事件的时候自动筛选, 监听了TapEvent的订阅者只能收到TapEvent事件, 不监听的就收不到, 而无需在接受事件的时候自行筛选? 可以的! 我们修改以下代码:

public <T> Observable<T> toObservable(final Class<T> eventType) {
        return bus.filter(new Func1<Object, Boolean>() {
            @Override
            public Boolean call(Object o) {
                return eventType.isInstance(o);
            }
        }).cast(eventType);
    }

将toObservable方法扩展下, 不单单返回_bus对象, 而是需要传入一个class对象, 再使用Observable的filter方法进行筛选, 这样就能够在源头上控制谁能收到什么样的事件.

但是呢, 懒癌突然犯了, 每次发送接受事件都得新建一个类, 能不能使用字符串做标识, 同时我想要传递一些基本数据, 比如数字之类的呢? 也是可以的, 继续扩展一下toObservable方法:

  public <T> Observable<T> toObservable(final Class<T> eventType, final String tag) {
        return bus.filter(new Func1<Object, Boolean>() {
            @Override
            public Boolean call(Object o) {
                if (!(o instanceof RxBusObject)) return false;
                RxBusObject ro = (RxBusObject) o;
                return eventType.isInstance(ro.getObj()) && tag != null
                        && tag.equals(ro.getTag());
            }
        }).map(new Func1<Object, T>() {
            @Override
            public T call(Object o) {
                RxBusObject ro = (RxBusObject) o;
                return (T) ro.getObj();
            }
        });
    }

同时还需要扩展send方法:

public void send(Object o, String tag) {
    bus.onNext(new RxBusObject(tag, o));
}

这里借助了一个自定义的实体类:

    public static class RxBusObject {
        private String tag;
        private Object obj;

        public RxBusObject(String tag, Object obj) {
            this.tag = tag;
            this.obj = obj;
        }

        public String getTag() {
            return tag;
        }

        public void setTag(String tag) {
            this.tag = tag;
        }

        public Object getObj() {
            return obj;
        }

        public void setObj(Object obj) {
            this.obj = obj;
        }

        public static RxBusObject newInstance(String tag, Object obj) {
            return new RxBusObject(tag, obj);
        }
    }

为什么需要RxBusObject这个实体类? 为了将数据与Tag绑定起来. 在发送的时候组装RxBusObject对象, 然后再对bus对象做筛选, 不是RxBusObject类型直接返回false, 然后再匹配tag是否相同以及是否是给定的数据类型, 全部相同就能收到事件. 收发事件就变成了如下:

RxBus.getDefault().send(steps, EVENT_STEP_CHANGE);
RxBus.getDefault().toObservable(Integer.class, EVENT_STEP_CHANGE)
                .subscribe(new RxBusSubscriber<Integer>() {
                    @Override public void receive(Integer data) {

                    }
                });

这里我们同样可以使用bindToLifeCycle()方法来将Observable绑定至Activity/Fragment的生命周期, 自动的在合适的时候取消订阅. 也能使用subscribeOn()与observeOn()方法做线程调度. 为了能很方便的接受事件, 而无需全部实现Subscriber的三个方法, 同样定义一个自己的RxBusSubscriber类:

/**
 * 请使用此类来subscribe RxBus返回的Observable以简化onError与onCompleted函数.
 */
public abstract class RxBusSubscriber<T> extends Subscriber<T> {
    @Override
    public void onCompleted() {
        completed();
    }

    @Override
    public void onError(Throwable e) {
        error(e);
    }

    @Override
    public void onNext(T t) {
        receive(t);
    }

    public abstract void receive(T data);
    public void error(Throwable e) {
        e.printStackTrace();
    }
    public void completed() {}

}

现在我们就能比较方便的使用RxBus了. 当然也可以在其之上继续做扩展.

Last updated