# RxBus

相信大家也都用过EventBus, Otto等开源库, 利用RxJava也能很简单的实现类似功能而无需引入其他库.

> 参考 [Implementing an Event Bus With RxJava - RxBus](http://nerds.weddingpartyapp.com/tech/2014/12/24/implementing-an-event-bus-with-rxjava-rxbus/) \[[译文](https://drakeet.me/rxbus)]

炒鸡简单的实现:

```
public class RxBus {

  private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());

  public void send(Object o) {
    _bus.onNext(o);
  }

  public Observable<Object> toObserverable() {
    return _bus;
  }
}
```

具体用法可以参考上面的网址. 这里只说明一下对基础实现改进的地方.

首先, 添加一个默认的事件总线, 当然如果要细分事件的话也可以创建一个UI总线, xx总线等等:

```
private static final RxBus instance = new RxBus();
public static RxBus getDefault() {
    return instance;
}
```

这样, 每次就可以这么发送事件:

```
RxBus.getDefault().send(new TapEvent());
```

但是呢, 每次接受事件的时候都需要筛选一遍:

```
if(event instanceof TapEvent)
```

这样很麻烦, 能不能发送事件的时候自动筛选, 监听了TapEvent的订阅者只能收到TapEvent事件, 不监听的就收不到, 而无需在接受事件的时候自行筛选? 可以的! 我们修改以下代码:

```
public <T> Observable<T> toObservable(final Class<T> eventType) {
        return bus.filter(new Func1<Object, Boolean>() {
            @Override
            public Boolean call(Object o) {
                return eventType.isInstance(o);
            }
        }).cast(eventType);
    }
```

将toObservable方法扩展下, 不单单返回\_bus对象, 而是需要传入一个class对象, 再使用Observable的filter方法进行筛选, 这样就能够在源头上控制谁能收到什么样的事件.

但是呢, 懒癌突然犯了, 每次发送接受事件都得新建一个类, 能不能使用字符串做标识, 同时我想要传递一些基本数据, 比如数字之类的呢? 也是可以的, 继续扩展一下toObservable方法:

```
  public <T> Observable<T> toObservable(final Class<T> eventType, final String tag) {
        return bus.filter(new Func1<Object, Boolean>() {
            @Override
            public Boolean call(Object o) {
                if (!(o instanceof RxBusObject)) return false;
                RxBusObject ro = (RxBusObject) o;
                return eventType.isInstance(ro.getObj()) && tag != null
                        && tag.equals(ro.getTag());
            }
        }).map(new Func1<Object, T>() {
            @Override
            public T call(Object o) {
                RxBusObject ro = (RxBusObject) o;
                return (T) ro.getObj();
            }
        });
    }
```

同时还需要扩展send方法:

```
public void send(Object o, String tag) {
    bus.onNext(new RxBusObject(tag, o));
}
```

这里借助了一个自定义的实体类:

```
    public static class RxBusObject {
        private String tag;
        private Object obj;

        public RxBusObject(String tag, Object obj) {
            this.tag = tag;
            this.obj = obj;
        }

        public String getTag() {
            return tag;
        }

        public void setTag(String tag) {
            this.tag = tag;
        }

        public Object getObj() {
            return obj;
        }

        public void setObj(Object obj) {
            this.obj = obj;
        }

        public static RxBusObject newInstance(String tag, Object obj) {
            return new RxBusObject(tag, obj);
        }
    }
```

为什么需要RxBusObject这个实体类? 为了将数据与Tag绑定起来. 在发送的时候组装RxBusObject对象, 然后再对bus对象做筛选, 不是RxBusObject类型直接返回false, 然后再匹配tag是否相同以及是否是给定的数据类型, 全部相同就能收到事件. 收发事件就变成了如下:

```
RxBus.getDefault().send(steps, EVENT_STEP_CHANGE);
```

```
RxBus.getDefault().toObservable(Integer.class, EVENT_STEP_CHANGE)
                .subscribe(new RxBusSubscriber<Integer>() {
                    @Override public void receive(Integer data) {

                    }
                });
```

这里我们同样可以使用bindToLifeCycle()方法来将Observable绑定至Activity/Fragment的生命周期, 自动的在合适的时候取消订阅. 也能使用subscribeOn()与observeOn()方法做线程调度. 为了能很方便的接受事件, 而无需全部实现Subscriber的三个方法, 同样定义一个自己的RxBusSubscriber类:

```
/**
 * 请使用此类来subscribe RxBus返回的Observable以简化onError与onCompleted函数.
 */
public abstract class RxBusSubscriber<T> extends Subscriber<T> {
    @Override
    public void onCompleted() {
        completed();
    }

    @Override
    public void onError(Throwable e) {
        error(e);
    }

    @Override
    public void onNext(T t) {
        receive(t);
    }

    public abstract void receive(T data);
    public void error(Throwable e) {
        e.printStackTrace();
    }
    public void completed() {}

}
```

现在我们就能比较方便的使用RxBus了. 当然也可以在其之上继续做扩展.


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://ryan-8.gitbook.io/android-architecture-journey/utilities/rxbus.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
