从今天开始我们来和大家一起学习一下Rxjava2系列的相关内容。
要在Android中使用RxJava2, 先添加Gradle配置:
compile 'io.reactivex.rxjava2:rxjava:2.0.7' compile 'io.reactivex.rxjava2:rxandroid:2.0.1'大家可能都知道, RxJava 以观察者模式为骨架,在2.0版本中出现了两种观察者模式(这两种我们后续都会介绍):
Observable ( 被观察者 ) / Observer ( 观察者 )Flowable (被观察者)/ Subscriber (观察者)我们先来看一张图:
在 RxJava 2.x 中,Observable 用于订阅 Observer,不再支持背压(1.x 中可以使用背压策略),而 Flowable 用于订阅 Subscriber , 是支持背压(Backpressure)的。
今天我用两根水管代替观察者和被观察者, 试图用通俗易懂的话把它们的关系解释清楚, 在这里我将从事件流这个角度来说明RxJava的基本工作原理。
上面一根水管为事件产生的水管,叫它上游吧,下面一根水管为事件接收的水管叫它下游吧,这里的上游和下游就分别对应着RxJava中的Observable和Observer,它们之间的连接就对应着subscribe(),因此这个关系用RxJava来表示就是:
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "subscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "" + value); } @Override public void onError(Throwable e) { Log.d(TAG, "error"); } @Override public void onComplete() { Log.d(TAG, "complete"); } }); //运行结果 subscribe 1 2 3 complete只有当上游和下游建立连接之后, 上游才会开始发送事件, 也就是调用了subscribe() 方法之后才开始发送事件。
接下来解释一下其中两个陌生的玩意:ObservableEmitter和Disposable。
(1)ObservableEmitter: Emitter是发射器的意思,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。 发送事件需要满足一定的规则:
上游可以发送无限个onNext, 下游也可以接收无限个onNext。当上游发送了一个onComplete后,上游onComplete之后的事件将会继续发送,,而下游收到onComplete事件之后将不再继续接收事件。当上游发送了一个onError后,,上游onError之后的事件将继续发送,,而下游收到onError事件之后将不再继续接收事件。上游可以不发送onComplete或onError。最为关键的是onComplete和onError必须唯一并且互斥,即不能发多个onComplete,也不能发多个onError,也不能先发一个onComplete,然后再发一个onError, 反之亦然。只发送OnNext事件
发送onComplete事件
发送onError事件
(2)Disposable:一次性的,可丢弃的,当调用它的dispose()方法时, 它就会将两根管道切断, 从而导致下游收不到事件,调用dispose()并不会导致上游不再继续发送事件, 上游会继续发送剩余的事件。
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "emit 1"); emitter.onNext(1); Log.d(TAG, "emit 2"); emitter.onNext(2); Log.d(TAG, "emit 3"); emitter.onNext(3); Log.d(TAG, "emit complete"); emitter.onComplete(); Log.d(TAG, "emit 4"); emitter.onNext(4); } }).subscribe(new Observer<Integer>() { private Disposable mDisposable; private int i; @Override public void onSubscribe(Disposable d) { Log.d(TAG, "subscribe"); mDisposable = d; } @Override public void onNext(Integer value) { Log.d(TAG, "onNext: " + value); i++; if (i == 2) { Log.d(TAG, "dispose"); //调用了dispose方法 mDisposable.dispose(); Log.d(TAG, "isDisposed : " + mDisposable.isDisposed()); } } @Override public void onError(Throwable e) { Log.d(TAG, "error"); } @Override public void onComplete() { Log.d(TAG, "complete"); } }); //运行结果 subscribe emit 1 onNext: 1 emit 2 onNext: 2 dispose isDisposed : true emit 3 emit complete emit 4从运行结果我们看到,在收到onNext 2这个事件后切断了水管,但是上游仍然发送了3,complete,4这几个事件,而且上游并没有因为发送了onComplete而停止,同时可以看到下游的onSubscribe()方法是最先调用的。
那如果有多个Disposable 该怎么办呢, RxJava中已经内置了一个容器CompositeDisposable, 每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中, 在退出的时候, 调用CompositeDisposable.clear() 即可切断所有的水管。
另外subscribe()有多个重载的方法:
public final Disposable subscribe() {} public final Disposable subscribe(Consumer<? super T> onNext) {} public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {} public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {} public final void subscribe(Observer<? super T> observer) {} 不带任何参数的subscribe()表示下游不关心任何事件。带有一个Consumer参数的方法表示下游只关心onNext事件。 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "emit 1"); emitter.onNext(1); Log.d(TAG, "emit 2"); emitter.onNext(2); Log.d(TAG, "emit 3"); emitter.onNext(3); Log.d(TAG, "emit complete"); emitter.onComplete(); Log.d(TAG, "emit 4"); emitter.onNext(4); } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "onNext: " + integer); } });正常情况下,上游和下游是工作在同一个线程中的,也就是说上游在哪个线程发事件,下游就在哪个线程接收事件。
@Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName()); Log.d(TAG, "emit 1"); emitter.onNext(1); } }); Consumer<Integer> consumer = new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName()); Log.d(TAG, "onNext: " + integer); } }; observable.subscribe(consumer); } //运行结果 Observable thread is : main emit 1 Observer thread is :main onNext: 1这就验证了刚才所说,上下游默认是在同一个线程工作。
在这个图中,我们用黄色水管表示子线程,深蓝色水管表示主线程。 要达到这个目的,我们需要先改变上游发送事件的线程, 让它去子线程中发送事件, 然后再改变下游的线程, 让它去主线程接收事件. 通过RxJava内置的线程调度器可以很轻松的做到这一点。
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName()); Log.d(TAG, "emit 1"); emitter.onNext(1); } }); Consumer<Integer> consumer = new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName()); Log.d(TAG, "onNext: " + integer); } }; observable.subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(consumer); //运行结果 Observable thread is : RxNewThreadScheduler-2 emit 1 Observer thread is :main onNext: 1可以看到, 上游发送事件的线程的确改变了, 是在一个叫 RxNewThreadScheduler-2的线程中发送的事件, 而下游仍然在主线程中接收事件, 这说明我们的目的达成了。
简单地说,subscribeOn() 指定的就是发射事件的线程,observerOn 指定的就是订阅者接收事件的线程。多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn() 只有第一次的有效,其余的会被忽略。但多次指定订阅者接收线程是可以的,也就是说每调用一次 observerOn(),下游的线程就会切换一次。 observable.subscribeOn(Schedulers.newThread()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .observeOn(Schedulers.io()) .subscribe(consumer); //运行结果 Observable thread is : RxNewThreadScheduler-3 emit 1 Observer thread is :RxCachedThreadScheduler-1 onNext: 1这段代码中指定了两次上游发送事件的线程, 分别是newThread和IO线程, 下游也指定了两次线程,分别是main和IO线程.
可以看到, 上游虽然指定了两次线程, 但只有第一次指定的有效, 依然是在RxNewThreadScheduler 线程中, 而下游则跑到了RxCachedThreadScheduler 中, 这个CacheThread其实就是IO线程池中的一个.
RxJava 中,已经内置了很多线程选项供我们选择:
Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作;Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作;Schedulers.newThread() 代表一个常规的新线程;AndroidSchedulers.mainThread() 代表Android的主线程;本节介绍RxJava的相关理论就到这里了,下节我们来学习RxJava的相关操作符。