相信大家也都用过EventBus, Otto等开源库, 利用RxJava也能很简单的实现类似功能而无需引入其他库.
参考 Implementing an Event Bus With RxJava - RxBus [译文 ]
炒鸡简单的实现:
Copy public class RxBus {
private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());
public void send(Object o) {
_bus.onNext(o);
}
public Observable<Object> toObserverable() {
return _bus;
}
}
具体用法可以参考上面的网址. 这里只说明一下对基础实现改进的地方.
首先, 添加一个默认的事件总线, 当然如果要细分事件的话也可以创建一个UI总线, xx总线等等:
Copy private static final RxBus instance = new RxBus();
public static RxBus getDefault() {
return instance;
}
这样, 每次就可以这么发送事件:
Copy RxBus.getDefault().send(new TapEvent());
但是呢, 每次接受事件的时候都需要筛选一遍:
Copy if(event instanceof TapEvent)
这样很麻烦, 能不能发送事件的时候自动筛选, 监听了TapEvent的订阅者只能收到TapEvent事件, 不监听的就收不到, 而无需在接受事件的时候自行筛选? 可以的! 我们修改以下代码:
Copy public <T> Observable<T> toObservable(final Class<T> eventType) {
return bus.filter(new Func1<Object, Boolean>() {
@Override
public Boolean call(Object o) {
return eventType.isInstance(o);
}
}).cast(eventType);
}
将toObservable方法扩展下, 不单单返回_bus对象, 而是需要传入一个class对象, 再使用Observable的filter方法进行筛选, 这样就能够在源头上控制谁能收到什么样的事件.
但是呢, 懒癌突然犯了, 每次发送接受事件都得新建一个类, 能不能使用字符串做标识, 同时我想要传递一些基本数据, 比如数字之类的呢? 也是可以的, 继续扩展一下toObservable方法:
Copy public <T> Observable<T> toObservable(final Class<T> eventType, final String tag) {
return bus.filter(new Func1<Object, Boolean>() {
@Override
public Boolean call(Object o) {
if (!(o instanceof RxBusObject)) return false;
RxBusObject ro = (RxBusObject) o;
return eventType.isInstance(ro.getObj()) && tag != null
&& tag.equals(ro.getTag());
}
}).map(new Func1<Object, T>() {
@Override
public T call(Object o) {
RxBusObject ro = (RxBusObject) o;
return (T) ro.getObj();
}
});
}
同时还需要扩展send方法:
Copy public void send(Object o, String tag) {
bus.onNext(new RxBusObject(tag, o));
}
这里借助了一个自定义的实体类:
Copy public static class RxBusObject {
private String tag;
private Object obj;
public RxBusObject(String tag, Object obj) {
this.tag = tag;
this.obj = obj;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public Object getObj() {
return obj;
}
public void setObj(Object obj) {
this.obj = obj;
}
public static RxBusObject newInstance(String tag, Object obj) {
return new RxBusObject(tag, obj);
}
}
为什么需要RxBusObject这个实体类? 为了将数据与Tag绑定起来. 在发送的时候组装RxBusObject对象, 然后再对bus对象做筛选, 不是RxBusObject类型直接返回false, 然后再匹配tag是否相同以及是否是给定的数据类型, 全部相同就能收到事件. 收发事件就变成了如下:
Copy RxBus.getDefault().send(steps, EVENT_STEP_CHANGE);
Copy RxBus.getDefault().toObservable(Integer.class, EVENT_STEP_CHANGE)
.subscribe(new RxBusSubscriber<Integer>() {
@Override public void receive(Integer data) {
}
});
这里我们同样可以使用bindToLifeCycle()方法来将Observable绑定至Activity/Fragment的生命周期, 自动的在合适的时候取消订阅. 也能使用subscribeOn()与observeOn()方法做线程调度. 为了能很方便的接受事件, 而无需全部实现Subscriber的三个方法, 同样定义一个自己的RxBusSubscriber类:
Copy /**
* 请使用此类来subscribe RxBus返回的Observable以简化onError与onCompleted函数.
*/
public abstract class RxBusSubscriber<T> extends Subscriber<T> {
@Override
public void onCompleted() {
completed();
}
@Override
public void onError(Throwable e) {
error(e);
}
@Override
public void onNext(T t) {
receive(t);
}
public abstract void receive(T data);
public void error(Throwable e) {
e.printStackTrace();
}
public void completed() {}
}
现在我们就能比较方便的使用RxBus了. 当然也可以在其之上继续做扩展.