flatMap(): 这是一个很有用但非常难理解的变换,因此我决定花多些篇幅来介绍它。 首先假设这么一种需求:假设有一个数据结构『学生』,现在需要打印出一组学生的名字。实现方式很简单: Student[] students = ...; Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String name) { Log.d(tag, name); } ... }; Observable.from(students) .map(new Func1<Student, String>() { @Override public String call(Student student) { return student.getName(); } }) .subscribe(subscriber); 很简单。那么再假设:如果要打印出每个学生所需要修的所有课程的名称呢?(需求的区别在于,每个学生只有一个名字,但却有多个课程。)首先可以这样实现: Student[] students = ...;Subscriber<Student> subscriber = new Subscriber<Student>() { @Override public void onNext(Student student) { List<Course> courses = student.getCourses(); for (int i = 0; i < courses.size(); i++) { Course course = courses.get(i); Log.d(tag, course.getName()); } } ...};Observable.from(students) .subscribe(subscriber); 依然很简单。那么如果我不想在 Subscriber 中使用 for 循环,而是希望 Subscriber 中直接传入单个的 Course 对象呢(这对于代码复用很重要)?用 map() 显然是不行的,因为 map() 是一对一的转化,而我现在的要求是一对多的转化。那怎么才能把一个 Student 转化成多个 Course 呢? 这个时候,就需要用 flatMap() 了: Student[] students = ...;Subscriber<Course> subscriber = new Subscriber<Course>() { @Override public void onNext(Course course) { Log.d(tag, course.getName()); } ...};Observable.from(students) .flatMap(new Func1<Student, Observable<Course>>() { @Override public Observable<Course> call(Student student) { return Observable.from(student.getCourses()); } }) .subscribe(subscriber); 从上面的代码可以看出, flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。 flatMap() 的原理是这样的:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。 扩展:由于可以在嵌套的 Observable 中添加异步代码, flatMap() 也常用于嵌套的异步操作,例如嵌套的网络请求。示例代码(Retrofit + RxJava): networkClient.token() // 返回 Observable<String>,在订阅时请求 token,并在响应后发送 token .flatMap(new Func1<String, Observable<Messages>>() { @Override public Observable<Messages> call(String token) { // 返回 Observable<Messages>,在订阅时请求消息列表,并在响应后发送请求到的消息列表 return networkClient.messages(); } }) .subscribe(new Action1<Messages>() { @Override public void call(Messages messages) { // 处理显示消息列表 showMessages(messages); } }); 传统的嵌套请求需要使用嵌套的 Callback 来实现。而通过 flatMap() ,可以把嵌套的请求写在一条链中,从而保持程序逻辑的清晰。 throttleFirst(): 在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤,例如按钮的点击监听器:RxView.clickEvents(button) // RxBinding 代码,后面的文章有解释 .throttleFirst(500, TimeUnit.MILLISECONDS) // 设置防抖间隔为 500ms .subscribe(subscriber);妈妈再也不怕我的用户手抖点开两个重复的界面啦。 此外, RxJava 还提供很多便捷的方法来实现事件序列的变换,这里就不一一举例了。 2) 变换的原理:lift() 这些变换虽然功能各有不同,但实质上都是 针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。首先看一下 lift() 的内部实现(仅核心代码): // 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) { return Observable.create(new OnSubscribe<R>() { @Override public void call(Subscriber subscriber) { Subscriber newSubscriber = operator.call(subscriber); newSubscriber.onStart(); onSubscribe.call(newSubscriber); } });} 这段代码很有意思:它生成了一个新的 Observable 并返回,而且创建新 Observable 所用的参数 OnSubscribe 的回调方法 call() 中的实现竟然看起来和前面讲过的 Observable.subscribe() 一样!然而它们并不一样哟~不一样的地方关键就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的对象不同(高能预警:接下来的几句话可能会导致身体的严重不适)—— subscribe() 中这句话的 onSubscribe 指的是 Observable 中的 onSubscribe 对象,这个没有问题,但是 lift() 之后的情况就复杂了点。 当含有 lift() 时: 1. lift() 创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了; 2.而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe; 3.当用户调用经过 lift() 后的 Observable 的 subscribe() 的时候,使用的是 lift() 所返回的新的 Observable ,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那个 OnSubscribe; 4.而这个新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在这个 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的 Subscriber( Operator 就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的『变换』代码以实现变换),然后利用这个新 Subscriber 向原始 Observable 进行订阅。 这样就实现了 lift() 过程,有点 像一种代理机制,通过事件拦截和处理实现事件序列的变换。 精简掉细节的话,也可以这么说:在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。
举一个具体的 Operator 的实现。下面这是一个将事件中的 Integer 对象转换成 String 的例子,仅供参考: observable.lift(new Observable.Operator<String, Integer>() { @Override public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) { // 将事件序列中的 Integer 对象转换为 String 对象 return new Subscriber<Integer>() { @Override public void onNext(Integer integer) { subscriber.onNext("" + integer); } @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } }; }}); 讲述 lift() 的原理只是为了让你更好地了解 RxJava ,从而可以更好地使用它。然而不管你是否理解了 lift() 的原理,RxJava 都不建议开发者自定义 Operator 来直接使用 lift(),而是建议尽量使用已有的 lift() 包装方法(如 map() flatMap() 等)进行组合来实现需求,因为直接使用 lift() 非常容易发生一些难以发现的错误。 3) compose: 对 Observable 整体的变换 除了 lift() 之外, Observable 还有一个变换方法叫做 compose(Transformer)。它和 lift() 的区别在于, lift() 是针对事件项和事件序列的,而 compose() 是针对 Observable 自身进行变换。举个例子,假设在程序中有多个 Observable ,并且他们都需要应用一组相同的 lift() 变换。你可以这么写: observable1 .lift1() .lift2() .lift3() .lift4() .subscribe(subscriber1);observable2 .lift1() .lift2() .lift3() .lift4() .subscribe(subscriber2);observable3 .lift1() .lift2() .lift3() .lift4() .subscribe(subscriber3);observable4 .lift1() .lift2() .lift3() .lift4() .subscribe(subscriber1); 你觉得这样太不软件工程了,于是你改成了这样: private Observable liftAll(Observable observable) { return observable .lift1() .lift2() .lift3() .lift4();}...liftAll(observable1).subscribe(subscriber1);liftAll(observable2).subscribe(subscriber2);liftAll(observable3).subscribe(subscriber3);liftAll(observable4).subscribe(subscriber4); 可读性、可维护性都提高了。可是 Observable 被一个方法包起来,这种方式对于 Observale 的灵活性似乎还是增添了那么点限制。怎么办?这个时候,就应该用 compose() 来解决了: public class LiftAllTransformer implements Observable.Transformer<Integer, String> { @Override public Observable<String> call(Observable<Integer> observable) { return observable .lift1() .lift2() .lift3() .lift4(); }}...Transformer liftAll = new LiftAllTransformer();observable1.compose(liftAll).subscribe(subscriber1);observable2.compose(liftAll).subscribe(subscriber2);observable3.compose(liftAll).subscribe(subscriber3);observable4.compose(liftAll).subscribe(subscriber4); 像上面这样,使用 compose() 方法, Observable 可以利用传入的 Transformer 对象的 call 方法直接对自身进行处理,也就不必被包在方法的里面了。 compose() 的原理比较简单,不附图喽。 5. 线程控制:Scheduler (二) 除了灵活的变换,RxJava 另一个牛逼的地方,就是线程的自由控制。 1) Scheduler 的 API (二) 前面讲到了,可以利用 subscribeOn() 结合 observeOn() 来实现线程控制,让事件的产生和消费发生在不同的线程。可是在了解了 map() flatMap() 等变换方法后,有些好事的(其实就是当初刚接触 RxJava 时的我)就问了:能不能多切换几次线程? 答案是:能。因为 observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』) subscribe() 参数中的 Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说, observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。上代码: Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定 .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .map(mapOperator) // 新线程,由 observeOn() 指定 .observeOn(Schedulers.io()) .map(mapOperator2) // IO 线程,由 observeOn() 指定 .observeOn(AndroidSchedulers.mainThread) .subscribe(subscriber); // Android 主线程,由 observeOn() 指定 如上,通过 observeOn() 的多次调用,程序实现了线程的多次切换。 不过,不同于 observeOn() , subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。 又有好事的(其实还是当初的我)问了:如果我非要调用多次 subscribeOn() 呢?会有什么效果? 这个问题先放着,我们还是从 RxJava 线程控制的原理说起吧。 2) Scheduler 的原理(二) 其实, subscribeOn() 和 observeOn() 的内部实现,也是用的 lift()。
subscribeOn() 和observeOn() 都做了线程切换的工作(图中的 "schedule..." 部位)。不同的是,subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此subscribeOn() 的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程。
图中共有 5 处含有对事件的操作。由图中可以看出,①和②两处受第一个 subscribeOn() 影响,运行在红色线程;③和④处受第一个 observeOn() 的影响,运行在绿色线程;⑤处受第二个 onserveOn() 影响,运行在紫色线程;而第二个 subscribeOn() ,由于在通知过程中线程就被第一个 subscribeOn() 截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。 3) 延伸:doOnSubscribe() 然而,虽然超过一个的 subscribeOn() 对事件处理的流程没有影响,但在流程之前却是可以利用的。 在前面讲 Subscriber 的时候,提到过 Subscriber 的 onStart() 可以用作流程开始前的初始化。然而 onStart() 由于在 subscribe() 发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe() 被调用时的线程。这就导致如果 onStart() 中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测 subscribe() 将会在什么线程执行。 而与 Subscriber.onStart() 相对应的,有一个方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。 示例代码: Observable.create(onSubscribe) .subscribeOn(Schedulers.io()) .doOnSubscribe(new Action0() { @Override public void call() { progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行 } }) .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程 .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber); 如上,在 doOnSubscribe()的后面跟一个 subscribeOn() ,就能指定准备工作的线程了。 RxJava 的适用场景和使用方式 1. 与 Retrofit 的结合 Retrofit 是 Square 的一个著名的网络请求库。没有用过 Retrofit 的可以选择跳过这一小节也没关系,我举的每种场景都只是个例子,而且例子之间并无前后关联,只是个抛砖引玉的作用,所以你跳过这里看别的场景也可以的。 Retrofit 除了提供了传统的 Callback 形式的 API,还有 RxJava 版本的 Observable 形式 API。下面我用对比的方式来介绍 Retrofit 的 RxJava 版 API 和传统版本的区别。 以获取一个 User 对象的接口作为例子。使用Retrofit 的传统 API,你可以用这样的方式来定义请求: @GET("/user")public void getUser(@Query("userId") String userId, Callback<User> callback); 在程序的构建过程中, Retrofit 会把自动把方法实现并生成代码,然后开发者就可以利用下面的方法来获取特定用户并处理响应: getUser(userId, new Callback<User>() { @Override public void success(User user) { userView.setUser(user); } @Override public void failure(RetrofitError error) { // Error handling ... }}; 而使用 RxJava 形式的 API,定义同样的请求是这样的: @GET("/user")public Observable<User> getUser(@Query("userId") String userId); 使用的时候是这样的: getUser(userId) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<User>() { @Override public void onNext(User user) { userView.setUser(user); } @Override public void onCompleted() { } @Override public void onError(Throwable error) { // Error handling ... } }); 看到区别了吗? 当 RxJava 形式的时候,Retrofit 把请求封装进 Observable ,在请求结束后调用 onNext() 或在请求失败后调用 onError()。 对比来看, Callback 形式和 Observable 形式长得不太一样,但本质都差不多,而且在细节上 Observable 形式似乎还比 Callback 形式要差点。那 Retrofit 为什么还要提供 RxJava 的支持呢? 因为它好用啊!从这个例子看不出来是因为这只是最简单的情况。而一旦情景复杂起来, Callback 形式马上就会开始让人头疼。 比如: 假设这么一种情况:你的程序取到的 User 并不应该直接显示,而是需要先与数据库中的数据进行比对和修正后再显示。使用 Callback 方式大概可以这么写: getUser(userId, new Callback<User>() { @Override public void success(User user) { processUser(user); // 尝试修正 User 数据 userView.setUser(user); } @Override public void failure(RetrofitError error) { // Error handling ... }}; 有问题吗? 很简便,但不要这样做。为什么?因为这样做会影响性能。数据库的操作很重,一次读写操作花费 10~20ms 是很常见的,这样的耗时很容易造成界面的卡顿。所以通常情况下,如果可以的话一定要避免在主线程中处理数据库。所以为了提升性能,这段代码可以优化一下: getUser(userId, new Callback<User>() { @Override public void success(User user) { new Thread() { @Override public void run() { processUser(user); // 尝试修正 User 数据 runOnUiThread(new Runnable() { // 切回 UI 线程 @Override public void run() { userView.setUser(user); } }); }).start(); } @Override public void failure(RetrofitError error) { // Error handling ... }}; 性能问题解决,但……这代码实在是太乱了,迷之缩进啊!杂乱的代码往往不仅仅是美观问题,因为代码越乱往往就越难读懂,而如果项目中充斥着杂乱的代码,无疑会降低代码的可读性,造成团队开发效率的降低和出错率的升高。 这时候,如果用 RxJava 的形式,就好办多了。 RxJava 形式的代码是这样的: getUser(userId) .doOnNext(new Action1<User>() { @Override public void call(User user) { processUser(user); }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<User>() { @Override public void onNext(User user) { userView.setUser(user); } @Override public void onCompleted() { } @Override public void onError(Throwable error) { // Error handling ... } }); 后台代码和前台代码全都写在一条链中,明显清晰了很多。 再举一个例子:假设 /user 接口并不能直接访问,而需要填入一个在线获取的 token ,代码应该怎么写? Callback 方式,可以使用嵌套的 Callback: @GET("/token")public void getToken(Callback<String> callback);@GET("/user")public void getUser(@Query("token") String token, @Query("userId") String userId, Callback<User> callback);...getToken(new Callback<String>() { @Override public void success(String token) { getUser(token, userId, new Callback<User>() { @Override public void success(User user) { userView.setUser(user); } @Override public void failure(RetrofitError error) { // Error handling ... } }; } @Override public void failure(RetrofitError error) { // Error handling ... }}); 倒是没有什么性能问题,可是迷之缩进毁一生,你懂我也懂,做过大项目的人应该更懂。 而使用 RxJava 的话,代码是这样的: @GET("/token")public Observable<String> getToken();@GET("/user")public Observable<User> getUser(@Query("token") String token, @Query("userId") String userId);...getToken() .flatMap(new Func1<String, Observable<User>>() { @Override public Observable<User> onNext(String token) { return getUser(token, userId); }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<User>() { @Override public void onNext(User user) { userView.setUser(user); } @Override public void onCompleted() { } @Override public void onError(Throwable error) { // Error handling ... } }); 用一个 flatMap() 就搞定了逻辑,依然是一条链。看着就很爽,是吧? 2016/03/31 更新,加上我写的一个 Sample 项目: rengwuxian RxJava Samples 好,Retrofit 部分就到这里。 2. RxBinding RxBinding 是 Jake Wharton 的一个开源库,它提供了一套在 Android 平台上的基于 RxJava 的 Binding API。所谓 Binding,就是类似设置 OnClickListener 、设置 TextWatcher 这样的注册绑定对象的 API。 举个设置点击监听的例子。使用 RxBinding ,可以把事件监听用这样的方法来设置: Button button = ...;RxView.clickEvents(button) // 以 Observable 形式来反馈点击事件 .subscribe(new Action1<ViewClickEvent>() { @Override public void call(ViewClickEvent event) { // Click handling } }); 看起来除了形式变了没什么区别,实质上也是这样。甚至如果你看一下它的源码,你会发现它连实现都没什么惊喜:它的内部是直接用一个包裹着的 setOnClickListener() 来实现的。然而,仅仅这一个形式的改变,却恰好就是 RxBinding 的目的:扩展性。通过 RxBinding 把点击监听转换成 Observable 之后,就有了对它进行扩展的可能。扩展的方式有很多,根据需求而定。一个例子是前面提到过的 throttleFirst() ,用于去抖动,也就是消除手抖导致的快速连环点击: RxView.clickEvents(button) .throttleFirst(500, TimeUnit.MILLISECONDS) .subscribe(clickAction); 如果想对 RxBinding 有更多了解,可以去它的 GitHub 项目 下面看看。 3. 各种异步操作 前面举的 Retrofit 和 RxBinding 的例子,是两个可以提供现成的 Observable 的库。而如果你有某些异步操作无法用这些库来自动生成 Observable,也完全可以自己写。例如数据库的读写、大图片的载入、文件压缩/解压等各种需要放在后台工作的耗时操作,都可以用 RxJava 来实现,有了之前几章的例子,这里应该不用再举例了。 4. RxBus RxBus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再需要使用 Otto 或者 GreenRobot 的 EventBus。至于什么是 RxBus,可以看 这篇文章。顺便说一句,Flipboard 已经用 RxBus 替换掉了 Otto ,目前为止没有不良反应。