# RxJava

> RxJava近来在Android领域非常火爆, 那么RxJava到底是什么呢? 这里不会过多的解释, 请参考文章 [给 Android 开发者的 RxJava 详解](http://gank.io/post/560e15be2dca930e00da1083) 与 [深入浅出RxJava系列](http://blog.csdn.net/lzyzsd/article/details/41833541) .

RxJava在CoreLibs中有何应用? 最主要的就是与Retrofit2结合, 大大简化了调用网络接口的代码.

1. 网络请求异常处理
2. 预处理请求结果
3. 自动取消网络请求
4. 对请求结果做各种变换

**注意: 下列功能均需要新建一个实体类, 如BaseData, 并实现ResponseHandler.IBaseData接口.**

RxJava可以与Retrofit2无缝相连, 只需要在`Retrofit.Builder`的`addCallAdapterFactory()`方法中传入`RxJavaCallAdapterFactory.create()`即可. RetrofitFactory类中已经做了相应的处理.

接口声明方式则改为如下:

```
@POST(Urls.GET_ADS)
Observable<BaseData> getAds();
```

这样就可以直接对getAds方法返回的Observable对象做操作.

> 以下内容均是建立在了解RxJava的Observable, Subscribe, Subscriber等概念, 并熟悉写法的基础上.

## 1. 网络请求异常处理

在RxJava中, 一个事件序列中如果出现异常就会回调onError, 我们可以利用这个特性来做统一的异常处理. 如果出现非业务方面的异常, 如网络连接失败, 数据解析失败, 服务器异常等都会进入Subscriber的onError(Throwable e), 通过判断Throwable的类型可以确定异常的具体原因并提示用户:

```
    @Override
    public void onError(Throwable e) {
        if (e instanceof ConnectException) {
          // 网络连接异常
        } else if (e instanceof HttpException) {
          // 服务器异常
        } else if (e instanceof SocketTimeoutException) {
          // 连接超时
        } else {
          // 其他异常, 如GSON解析错误等
        }
    }
```

异常种类正在不断完善中, 肯定是不止这几种, 但是这是目前最常见的几种异常. 我们可以新建一个类继承自RxJava的Subscriber, 然后在订阅网络请求返回的Observable时使用此类, 而不是默认的Subscriber:

```
public abstract class ResponseSubscriber<T> extends Subscriber<T> {

    private BaseView view;

    public ResponseSubscriber() {}

    public ResponseSubscriber(BaseView view) {
        this.view = view;
    }

    @Override
    public void onCompleted() {
      view = null;
    }

    @Override
    public void onError(Throwable e) {
      if (view != null) {
        if (e instanceof ConnectException) {
          view.showToastMessage(view.getViewContext().getString(R.string.network_error));
        } else if (e instanceof HttpException) {
          view.showToastMessage(view.getViewContext().getString(R.string.network_server_error));
        } else if (e instanceof SocketTimeoutException) {
          view.showToastMessage(view.getViewContext().getString(R.string.network_timeout));
        } else {
          view.showToastMessage(view.getViewContext().getString(R.string.network_other));
        }
      }

      view = null;
    }

    @Override
    public void onNext(T t) {
      view = null;
    }
}
```

我们可以利用抽象出来的BaseView自动提示用户, 而无需每一个请求都做判断. 需要注意的是, 在onNext, onError, onCompleted中需要将通过构造函数传入的BaseView置空. ResponseSubscriber已经实现了onNext, onError, onComplete这样抽象方法, 因此在其实例中可以没有任何实现方法, 也可以选择性的覆写, 如:

```
api.getTypes()
      .subscribe(new ResponseSubscriber() {
        @Override
        public void onNext(BaseData data) {
          super.onNext(data);
        }
      });
```

## 2. 预处理请求结果

进一步的, 我们可以定义自己的方法, 让实例选择覆写, 给予更大的灵活性:

```
/**
* 请求成功同时业务成功的情况下会调用此函数
*/
public abstract void success(T t);

/**
* 请求成功但业务失败的情况下会调用此函数.
*/
public boolean operationError(T t, int status, String message) {}

/**
* 请求失败的情况下会调用此函数
*/
public boolean error(Throwable e) {}
```

由于我们Server端返回的JSON都有固定的格式, 因此所有的返回结果都会以BaseData实体来接收:

```
public class BaseData {
    public int status; // 操作结果, 1为成功, 其他为失败
    public String msg; // 返回的消息
    public MapData data; // 携带的数据
    public Page page; // 分页数据
}
```

只要status不为1, 就意味着业务失败, 因此抽象出了operationError函数. 当status不为1的情况下, 会调用operationError. 一般情况下, 业务失败我们都需要将服务器返回的消息展示给用户, 我们可以同样的, 将此逻辑写在ResponseSubscriber中.

```
public void onNext(T t) {
   resetLoadingStatus();
   BaseData data;
   if (t instanceof BaseData) {
     data = (BaseData) t;
     if (data.status == SUCCESS_STATUS) {
       success(t);
     } else {
       if (!operationError(t, data.status, data.msg)) {
         handleOperationError(data.msg);
       }
     }
   } else {
     success(t);
   }
   release();
}

public void handleOperationError(String message) {
   if (view != null)
     view.showToastMessage(message);
}

public void resetLoadingStatus() {
   if (view != null)
     view.hideLoading();
}

public void release() {
   view = null;
}
```

不论业务是否成功, 都是请求成功, 因此需要在onNext书写判断逻辑. 上述代码很简单, 主要流程是首先重置加载框状态, 开发人员就无需每次都去隐藏加载框. 然后判断返回结果是不是BaseData, 如果不是则直接调用success, 让开发人员自行处理. 如果是BaseData则判断status是否是1, 是1就调用success, 不是1就调用operationError(t, data.status, data.msg), 根据返回结果判断是否调用handleOperationError.

onError中的逻辑类似:

```
public void onError(Throwable e) {
   resetLoadingStatus();
   e.printStackTrace();
   if (!handler.error(e)) {
      handleException(e);
   }
   release();
}
```

接下来再重新看看自定义的三个函数:

```
        /**
         * 请求成功同时业务成功的情况下会调用此函数
         */
        void success(T t);

        /**
         * 请求成功但业务失败的情况下会调用此函数.
         * @return 是否需要自行处理业务错误.
         * true - 需要, 父类不会处理错误
         * false - 不需要, 交由父类处理
         */
        boolean operationError(T t, int status, String message);

        /**
         * 请求失败的情况下会调用此函数
         * @return 是否需要自行处理系统错误.
         * true - 需要, 父类不会处理错误
         * false - 不需要, 交由父类处理
         */
        boolean error(Throwable e);
```

总结一下, 使用ResponseSubscriber去订阅网络请求结果时, 可以选择不传入BaseView, 这样所有判断逻辑都需要自行实现. 如果传入BaseView, 默认情况会实现所有逻辑. success方法必须覆写, 可以选择覆写operationError与error, 如果覆写返回true, 则意味已经自行处理逻辑, ResponseSubscriber不会再去处理, 反之则会处理. 一般情况下如下写法就够了:

```
api.getTypes()
      .subscribe(new ResponseSubscriber<BaseData>(view) {
        @Override
        public void success(BaseData baseData) {
          if (baseData.data != null && baseData.data.types != null)
            view.renderTypes(baseData.data.types); // view在BasePresenter中声明并实例化
        }
      });
```

以上就是预处理请求结果.

## 3. 自动取消网络请求

我们都知道要在Activity/Fragment的onDestory中取消正在连接的网络请求, 避免内存泄漏或其他风险, 提高体验. 那么在Retrofit2+RxJava中怎么取消请求呢? 如果是这么定义的请求:

```
@GET("users/{user}/repos")
Call<List<Repo>> listRepos(@Path("user") String user);
```

则可以使用call.cancel()来取消请求. 如果使用RxJava呢? 可以使用Subscription的unsubscribe方法:

```
Subscription subscription = api.getAds().subscribe(...);
subscription.unsubscribe();
```

subscribe Observable会返回一个Subscription对象, 调用subscription.unsubscribe()会取消订阅并回调onComplete方法. 结合Retrofit2则会同时取消网络请求.

如果一个界面需要发送很多网络请求, 则要定义很多个Subscription对象, 这时可以使用CompositeSubscription.

```
CompositeSubscription compositeSubscription = new CompositeSubscription();
compositeSubscription.add(api.getAds().subscribe(...));
compositeSubscription.add(api.getTypes().subscribe(...));
compositeSubscription.unsubscribe();
```

可以将compositeSubscription看作是一个Subscription的集合, 能同时unsubscribe多个Subscription. 但是这么做又显得不是很优雅, 破坏了RxJava的链式结构, 逼死强迫症.

### RxAndroid

RxAndroid还是Jake Wharton大神写的针对Android平台的RxJava扩展, RxAndroid可以很方便的使用`AndroidSchedulers.mainThread()`将数据发送到Android的主线程, 也可以替代一些诸如点击回调的事件等等等. 具体用途可以自行百度.

这里我选取了RxAndroid lifecycle来替代Subscription. RxAndroid lifecycle可以将给定的Observable绑定至Activity/Fragment的生命周期. 要使用RxAndroid lifecycle首先需要使Activity/Fragment继承自RxFragmentActivity/RxFragment. 因此CoreLibs中的BaseActivity与BaseFragment均是继承自RxFragmentActivity/RxFragment.

然后就可以使用Observable的compose方法, 以及RxFragmentActivity/RxFragment的bindToLifecycle()/bindUntilEvent(ActivityEvent event).

```
api.getTypes().compose(bindToLifecycle());
```

通过上述代码, 如果代码是在onCreate中调用的, 则会在onDestroy中unSubscribe, 如果是在onResume中调用, 则会在onPause中unSubscribe. 同时也可以使用bindUntilEvent(ActivityEvent event)指定具体的函数. 一般情况下, 建议使用bindToLifecycle().

但是, 网络请求在MVP中都是通过Presenter发送的, 而bindToLifecycle()方法又是属于Activity/Fragment的, 也就是说Presenter中没有此方法, 无法绑定. 怎么办呢? 从BaseView入手. BaseView中加入如下函数:

```
<V> Observable.Transformer<V, V> bind();
```

并且在Activity/Fragment中实现:

```
@Override
public <V> Observable.Transformer<V, V> bind() {
    return bindToLifecycle();
}
```

Presenter中就可以通过下列代码将Observable绑定至生命周期:

```
api.getTypes().compose(view.bind());
```

接下来在BasePresenter中加入如下方法:

```
protected <V> Observable.Transformer<V, V> bindToLifeCycle() {
    return view.bind();
}
```

最后演变成:

```
api.getTypes().compose(bindToLifeCycle());
```

如果我们想使用subscribeOn(Schedulers.io())以及observeOn(AndroidSchedulers.mainThread())就意味着我们每次都需要多写两行代码. 因此我们可以写一个类, 专门将重复出现的Observable变换代码整合到一起:

```
/**
 * 用于对网络请求的Observable做转换.
 * 配合{@link BasePresenter#bindToLifeCycle()}一起使用
 * 可以将原始Observable绑定至Activity/Fragment生命周期, 同时声明在IO线程运行, 在main线程接收.
 */
public class ResponseTransformer<T> implements Observable.Transformer<T, T> {

    private Observable.Transformer<T, T> transformer;

    public ResponseTransformer() {}

    public ResponseTransformer(Observable.Transformer<T, T> t) {
        transformer = t;
    }

    @Override
    public Observable<T> call(Observable<T> source) {
        if (transformer != null)
            return transformer.call(source).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
        else
            return source.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
    }
}
```

ResponseTransformer继承自Transformer, 泛型有点多, 但是在了解了RxJava的Transformer以及Java泛型之后, 这段代码应该不难理解. 使用ResponseTransformer之后的代码如下所示:

```
api.getAds().compose(new ResponseTransformer<>(this.<BaseData> bindToLifeCycle()));
```

请注意, 在bindToLifeCycle()前应加上`this.<BaseData>`, 不然在compose之后的subscribe方法将无法确定返回类型, 从而识别成Object:

```
api.getAds().compose(new ResponseTransformer<>(bindToLifeCycle()))
                .subscribe(new ResponseSubscriber<Object>(view) {
                    @Override
                    public void success(Object data) {
                    }
                });
```

完整正确的代码如下:

```
api.getAds().compose(new ResponseTransformer<>(this.<BaseData> bindToLifeCycle()))
                .subscribe(new ResponseSubscriber<BaseData>(view) {
                    @Override
                    public void success(BaseData baseData) {
                        if (baseData.data.ads != null)
                            view.renderAds(baseData.data.ads);
                    }
                });
```

以上代码自动处理了:

1. 在合适的时候取消请求
2. 在io线程发送请求, 在Main线程接受结果
3. 将结果转换为BaseData类型
4. 判断业务是否执行成功, 失败则提示服务器返回的消息
5. 识别错误类型, 并做相应的提示.
6. 打印相应的请求Log

## 4. 对请求结果做各种变换

想象一下, 如果一个页面有两个请求, 一个获取所有的一级分类, 另一个根据第一个一级分类的id去获取二级分类. 一般我们会在第一个网络请求成功后, 去解析数据并发送第二个网络请求. 但是这么写会嵌套, 如果解析代码很多会难以阅读, 这时候我们可以借助Observable的flatMap方法解决这个问题:

```
    final List<Category> categories = new ArrayList<>();
        api.getCategories()
                .flatMap(new ResponseAction<BaseData, Observable<BaseData>>(view) {
                    @Override 
                    public Observable<BaseData> onCall(BaseData baseData) {
                        if (baseData.data != null && baseData.data.categories != null) {
                            categories.addAll(baseData.data.categories);
                            return api.getSubAttractions(baseData.data.categories.get(0).id);
                        }
                        return null;
                    }
                }).compose(new ResponseTransformer<>(this.<BaseData> bindLifeCycle()))
                .subscribe(new ResponseSubscriber<BaseData>(view) {
                    @Override public void success(BaseData baseData) {
                        view.renderCategories(categories);
                        if (baseData.data != null && baseData.data.subCategories != null)
                            view.renderSubCategories(baseData.data.subCategories);
                    }
                });
```

flatMap中需要传入一个Func1对象, 在这种情况下, Action里的数据也是需要解析的, 因此也可以创建一个ResponseAction类用于解析结果与错误. 代码与ResponseSubscriber类似, 就不贴出来了. 但是此时就有两个类似的类, 大部分代码都一样. 违反了DRY原则, 如果一旦数据结构有变或者异常类型增多则需要修改两个类, 因此将部分共同的代码, 提取到一个新的处理类中:

```
/**
 * 网络结果处理类, 此类会判断网络错误与业务错误.
 *
 * <P>
 *     {@link ResponseSubscriber}与{@link ResponseAction}均是调用此类来实现网络结果判断, 错误处理,
 *     以及重置加载状态.
 */
public class ResponseHandler<T> {
    public static final int SUCCESS_STATUS = 1;

    private BaseView view;
    private CustomHandler<T> handler;

    public ResponseHandler(CustomHandler<T> handler) {
        this.handler = handler;
    }

    public ResponseHandler(CustomHandler<T> handler, BaseView view) {
        this.handler = handler;
        this.view = view;
    }

    public void onCompleted() {
        release();
    }

    public void onError(Throwable e) {
        resetLoadingStatus();
        e.printStackTrace();
        if (!handler.error(e)) {
            handleException(e);
        }
        release();
    }

    public void onNext(T t) {
        resetLoadingStatus();
        BaseData data;
        if (t instanceof BaseData) {
            data = (BaseData) t;
            if (data.status == SUCCESS_STATUS) {
                handler.success(t);
            } else {
                if (!handler.operationError(t, data.status, data.msg)) {
                    handleOperationError(data.msg);
                }
            }
        } else {
            handler.success(t);
        }
        release();
    }

    public void resetLoadingStatus() {
        if (view != null) {
            if (view instanceof BasePaginationView) {
                BasePaginationView paginationView = (BasePaginationView) view;
                paginationView.onLoadingCompleted();
            }
            view.hideLoading();
        }
    }

    public void release() {
        view = null;
        handler = null;
    }

    public void handleException(Throwable e) {
        if (view != null) {
            if (e instanceof ConnectException) {
                view.showToastMessage(view.getViewContext().getString(R.string.network_error));
            } else if (e instanceof HttpException) {
                view.showToastMessage(view.getViewContext().getString(R.string.network_server_error));
            } else if (e instanceof SocketTimeoutException) {
                view.showToastMessage(view.getViewContext().getString(R.string.network_timeout));
            } else {
                view.showToastMessage(view.getViewContext().getString(R.string.network_other));
            }
        }
    }

    public void handleOperationError(String message) {
        if (view != null)
            view.showToastMessage(message);
    }

    public interface CustomHandler<T> {
        /**
         * 请求成功同时业务成功的情况下会调用此函数
         */
        void success(T t);

        /**
         * 请求成功但业务失败的情况下会调用此函数.
         * @return 是否需要自行处理业务错误.
         * true - 需要, 父类不会处理错误
         * false - 不需要, 交由父类处理
         */
        boolean operationError(T t, int status, String message);

        /**
         * 请求失败的情况下会调用此函数
         * @return 是否需要自行处理系统错误.
         * true - 需要, 父类不会处理错误
         * false - 不需要, 交由父类处理
         */
        boolean error(Throwable e);
    }
}
```

ResponseSubscriber与ResponseAction则均通过ResponseHandler处理. ResponseSubscriber与ResponseAction的思路相同, 但是具体实现又有差别, 这里不再赘述.

## 最后

> RxJava最核心的功能就是对数据做各种变换, 在此基础之上又衍生出各种用法. 可以参考 [可能是东半球最全的RxJava使用场景小结](http://blog.csdn.net/theone10211024/article/details/50435325) .


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://ryan-8.gitbook.io/android-architecture-journey/corlibsdesign_md/rxjava.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
