RxJava 2 Beginner's Notes in Detail (Part 3): Utility Operators

xiaoxiao2025-05-19  38

1. Utility Operators

1.1 delay()

Delays the emission of events by the given amount of time.

Observable.just(1, 2, 3)
        .delay(3, TimeUnit.SECONDS)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer value) throws Exception {
                Log.e(TAG, "value : " + value);
            }
        });
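The snippet above prints nothing for three seconds and then logs all three values. A minimal sketch of the same behaviour on a desktop JVM, assuming RxJava 2.x (io.reactivex) is on the classpath: it passes a TestScheduler to delay() so that time can be advanced by hand instead of actually waiting. The class name DelayDemo and its run() method are made up for illustration. Without the extra scheduler argument, delay() works on the computation scheduler.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.TestScheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
class DelayDemo {

    /** Returns the number of items received after 2 s and after 3 s of virtual time. */
    static int[] run() {
        TestScheduler scheduler = new TestScheduler(); // virtual clock, no real waiting
        List<Integer> received = new ArrayList<>();

        Observable.just(1, 2, 3)
                .delay(3, TimeUnit.SECONDS, scheduler)
                .subscribe(received::add);

        scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
        int afterTwoSeconds = received.size();   // the delay has not elapsed yet

        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        int afterThreeSeconds = received.size(); // all delayed items are delivered

        return new int[] {afterTwoSeconds, afterThreeSeconds};
    }

    public static void main(String[] args) {
        int[] counts = run();
        // prints "after 2 s: 0, after 3 s: 3"
        System.out.println("after 2 s: " + counts[0] + ", after 3 s: " + counts[1]);
    }
}
```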

1.2 doOnEach()

Called once for every notification the Observable sends (onNext, onError, or onComplete), before that notification reaches the observer.

Observable.just(1, 2, 3)
        .doOnEach(new Consumer<Notification<Integer>>() {
            @Override
            public void accept(Notification<Integer> integerNotification) throws Exception {
                Log.e(TAG, "integerNotification value : " + integerNotification.getValue());
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer value) throws Exception {
                Log.e(TAG, "accept : " + value);
            }
        });

Output:

10-06 05:53:28.510 8645-8645/? E/MainActivity: integerNotification value : 1
10-06 05:53:28.510 8645-8645/? E/MainActivity: accept : 1
10-06 05:53:28.510 8645-8645/? E/MainActivity: integerNotification value : 2
10-06 05:53:28.510 8645-8645/? E/MainActivity: accept : 2
10-06 05:53:28.510 8645-8645/? E/MainActivity: integerNotification value : 3
10-06 05:53:28.510 8645-8645/? E/MainActivity: accept : 3
10-06 05:53:28.510 8645-8645/? E/MainActivity: integerNotification value : null

The final null value comes from the onComplete notification, which carries no item.

1.3 doOnNext()

Called before each onNext() is delivered to the observer.

Observable.just(1, 2, 3)
        .doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "doOnNext accept : " + integer);
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer value) throws Exception {
                Log.e(TAG, "accept : " + value);
            }
        });

Output:

10-06 05:55:25.618 8758-8758/leavesc.hello.rxjavademo E/MainActivity: doOnNext accept : 1
10-06 05:55:25.618 8758-8758/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 05:55:25.618 8758-8758/leavesc.hello.rxjavademo E/MainActivity: doOnNext accept : 2
10-06 05:55:25.618 8758-8758/leavesc.hello.rxjavademo E/MainActivity: accept : 2
10-06 05:55:25.618 8758-8758/leavesc.hello.rxjavademo E/MainActivity: doOnNext accept : 3
10-06 05:55:25.618 8758-8758/leavesc.hello.rxjavademo E/MainActivity: accept : 3

1.4 doAfterNext()

Called after each onNext() has been delivered to the observer.

Observable.just(1, 2, 3)
        .doAfterNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                // The message still says "doOnNext" (carried over from the previous example);
                // this callback is the doAfterNext one.
                Log.e(TAG, "doOnNext accept : " + integer);
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer value) throws Exception {
                Log.e(TAG, "accept : " + value);
            }
        });

Output:

10-06 05:57:09.357 8872-8872/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 05:57:09.357 8872-8872/leavesc.hello.rxjavademo E/MainActivity: doOnNext accept : 1
10-06 05:57:09.357 8872-8872/leavesc.hello.rxjavademo E/MainActivity: accept : 2
10-06 05:57:09.357 8872-8872/leavesc.hello.rxjavademo E/MainActivity: doOnNext accept : 2
10-06 05:57:09.357 8872-8872/leavesc.hello.rxjavademo E/MainActivity: accept : 3
10-06 05:57:09.357 8872-8872/leavesc.hello.rxjavademo E/MainActivity: doOnNext accept : 3

1.5 doOnComplete()

Called before onComplete() is delivered to the observer.

Observable.just(1, 2, 3)
        .doOnComplete(new Action() {
            @Override
            public void run() throws Exception {
                Log.e(TAG, "doOnComplete run()");
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer value) throws Exception {
                Log.e(TAG, "accept : " + value);
            }
        });

Output:

10-06 06:08:43.688 8982-8982/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 06:08:43.688 8982-8982/leavesc.hello.rxjavademo E/MainActivity: accept : 2
10-06 06:08:43.688 8982-8982/leavesc.hello.rxjavademo E/MainActivity: accept : 3
10-06 06:08:43.688 8982-8982/leavesc.hello.rxjavademo E/MainActivity: doOnComplete run()

1.6 doOnError()

Called before onError() is delivered to the observer.

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onError(new Exception("Normal Exception"));
    }
}).doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.e(TAG, "doOnError accept() : " + throwable.getMessage());
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(Integer integer) {
        Log.e(TAG, "onNext : " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "onError : " + e.getMessage());
    }

    @Override
    public void onComplete() {
    }
});

Output:

10-06 06:14:17.894 9230-9230/? E/MainActivity: onNext : 1
10-06 06:14:17.894 9230-9230/? E/MainActivity: onNext : 2
10-06 06:14:17.894 9230-9230/? E/MainActivity: doOnError accept() : Normal Exception
10-06 06:14:17.894 9230-9230/? E/MainActivity: onError : Normal Exception

1.7 doOnSubscribe()

Called just before onSubscribe() is delivered to the observer.
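This operator has no example in the notes, so here is a minimal sketch, assuming RxJava 2.x (io.reactivex) on the classpath and a plain list standing in for Log.e so it runs on a desktop JVM. The class name DoOnSubscribeDemo and its run() method are made up for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the original article.
class DoOnSubscribeDemo {

    /** Records every callback in the order in which it is invoked. */
    static List<String> run() {
        final List<String> events = new ArrayList<>();

        Observable.just(1, 2)
                .doOnSubscribe(d -> events.add("doOnSubscribe"))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        events.add("onSubscribe");
                    }

                    @Override
                    public void onNext(Integer value) {
                        events.add("onNext " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        events.add("onError");
                    }

                    @Override
                    public void onComplete() {
                        events.add("onComplete");
                    }
                });

        return events;
    }

    public static void main(String[] args) {
        // prints [doOnSubscribe, onSubscribe, onNext 1, onNext 2, onComplete]
        System.out.println(run());
    }
}
```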

1.8 doOnDispose()

Called when the observer calls dispose() on its Disposable.
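A minimal sketch of this callback, again assuming RxJava 2.x on the classpath and a plain list in place of Log.e. The observer disposes after the first item, so the remaining items and onComplete never arrive. The class name DoOnDisposeDemo is made up for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the original article.
class DoOnDisposeDemo {

    /** Disposes inside the first onNext() and records every callback that fires. */
    static List<String> run() {
        final List<String> events = new ArrayList<>();

        Observable.just(1, 2, 3)
                .doOnDispose(() -> events.add("doOnDispose"))
                .subscribe(new Observer<Integer>() {
                    private Disposable upstream;

                    @Override
                    public void onSubscribe(Disposable d) {
                        upstream = d;
                        events.add("onSubscribe");
                    }

                    @Override
                    public void onNext(Integer value) {
                        events.add("onNext " + value);
                        upstream.dispose(); // triggers the doOnDispose callback
                    }

                    @Override
                    public void onError(Throwable e) {
                        events.add("onError");
                    }

                    @Override
                    public void onComplete() {
                        events.add("onComplete");
                    }
                });

        return events;
    }

    public static void main(String[] args) {
        // prints [onSubscribe, onNext 1, doOnDispose]
        System.out.println(run());
    }
}
```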

1.9 doOnLifecycle()

The callback passed as the first argument runs before onSubscribe() is delivered and receives the Disposable, so it can be used to decide whether to cancel the subscription. The callback passed as the second argument behaves the same as doOnDispose().

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
    }
}).doOnLifecycle(new Consumer<Disposable>() {
    @Override
    public void accept(Disposable disposable) throws Exception {
        Log.e(TAG, "doOnLifecycle accept");
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.e(TAG, "doOnLifecycle run");
    }
}).subscribe(new Observer<Integer>() {
    private Disposable disposable;

    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "onSubscribe");
        this.disposable = d;
    }

    @Override
    public void onNext(Integer integer) {
        Log.e(TAG, "onNext : " + integer);
        // Dispose after the first item; this triggers the second callback.
        disposable.dispose();
    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "onError : " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "onComplete");
    }
});

Output:

10-06 06:31:45.011 9602-9602/leavesc.hello.rxjavademo E/MainActivity: doOnLifecycle accept
10-06 06:31:45.011 9602-9602/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 06:31:45.011 9602-9602/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 06:31:45.011 9602-9602/leavesc.hello.rxjavademo E/MainActivity: doOnLifecycle run

1.10 doOnTerminate() & doAfterTerminate()

doOnTerminate() is called before onError() or onComplete() is delivered, whereas doAfterTerminate() is called after onError() or onComplete() has been delivered.

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
    }
}).doOnTerminate(new Action() {
    @Override
    public void run() throws Exception {
        Log.e(TAG, "doOnTerminate run");
    }
}).doAfterTerminate(new Action() {
    @Override
    public void run() throws Exception {
        Log.e(TAG, "doAfterTerminate run");
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.e(TAG, "onNext : " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "onError : " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "onComplete");
    }
});

Output:

10-06 06:34:55.968 9713-9713/? E/MainActivity: onSubscribe
10-06 06:34:55.968 9713-9713/? E/MainActivity: onNext : 1
10-06 06:34:55.968 9713-9713/? E/MainActivity: onNext : 2
10-06 06:34:55.968 9713-9713/? E/MainActivity: doOnTerminate run
10-06 06:34:55.968 9713-9713/? E/MainActivity: onComplete
10-06 06:34:55.968 9713-9713/? E/MainActivity: doAfterTerminate run

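1.11 doFinally()

Called after the event sequence has ended. doFinally() differs from doAfterTerminate() when the subscription is disposed: after a dispose, doAfterTerminate() is not called, whereas doFinally() is called in every case, and it runs at the very end of the event sequence.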
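The difference described above can be sketched as follows, assuming RxJava 2.x on the classpath and a plain list in place of Log.e. The same chain is run twice: once to normal completion and once with the observer disposing inside the first onNext(). The class name DoFinallyDemo and the disposeEarly flag are made up for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the original article.
class DoFinallyDemo {

    /** Runs the chain and records callbacks; disposes after the first item if requested. */
    static List<String> run(final boolean disposeEarly) {
        final List<String> events = new ArrayList<>();

        Observable.just(1, 2)
                .doFinally(() -> events.add("doFinally"))
                .doAfterTerminate(() -> events.add("doAfterTerminate"))
                .subscribe(new Observer<Integer>() {
                    private Disposable upstream;

                    @Override
                    public void onSubscribe(Disposable d) {
                        upstream = d;
                    }

                    @Override
                    public void onNext(Integer value) {
                        events.add("onNext " + value);
                        if (disposeEarly) {
                            upstream.dispose();
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        events.add("onError");
                    }

                    @Override
                    public void onComplete() {
                        events.add("onComplete");
                    }
                });

        return events;
    }

    public static void main(String[] args) {
        // prints [onNext 1, onNext 2, onComplete, doAfterTerminate, doFinally]
        System.out.println(run(false));
        // prints [onNext 1, doFinally]
        System.out.println(run(true));
    }
}
```

In the disposed run only doFinally fires. The relative order of the two callbacks in the completed run depends on where each operator sits in the chain; here doFinally is placed closest to the source so that it runs last.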

1.12 onErrorReturn()

Called when an onError() event is received. The value it returns is delivered through onNext(), and the event sequence then completes normally.

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onError(new Exception("Normal Exception"));
    }
}).onErrorReturn(new Function<Throwable, Integer>() {
    @Override
    public Integer apply(Throwable throwable) throws Exception {
        // Fallback value emitted in place of the error.
        return 7;
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.e(TAG, "onNext : " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "onError : " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "onComplete");
    }
});

Output:

10-06 06:43:13.702 9946-9946/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 06:43:13.702 9946-9946/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 06:43:13.702 9946-9946/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 06:43:13.712 9946-9946/leavesc.hello.rxjavademo E/MainActivity: onNext : 7
10-06 06:43:13.712 9946-9946/leavesc.hello.rxjavademo E/MainActivity: onComplete

1.13 onErrorResumeNext()

When an onError() event is received, switches to a new Observable returned by the function. The sequence continues with that Observable's events and ends normally.

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onError(new Exception("Normal Exception"));
    }
}).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
    @Override
    public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
        Log.e(TAG, "onErrorResumeNext apply: " + throwable.getMessage());
        return Observable.just(4, 5, 6);
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.e(TAG, "onNext : " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "onError : " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "onComplete");
    }
});

Output:

10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onErrorResumeNext apply: Normal Exception
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onNext : 4
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onNext : 5
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onNext : 6
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onComplete

1.14 onExceptionResumeNext()

Works much like onErrorResumeNext(), but it only intercepts an Exception and does not intercept an Error.

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onError(new Exception("Normal Exception"));
    }
}).onExceptionResumeNext(new Observable<Integer>() {
    @Override
    protected void subscribeActual(Observer<? super Integer> observer) {
        Log.e(TAG, "onExceptionResumeNext subscribeActual");
        observer.onNext(3);
        observer.onComplete();
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.e(TAG, "onNext : " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "onError : " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "onComplete");
    }
});

Output:

10-06 06:51:49.396 10369-10369/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 06:51:49.396 10369-10369/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 06:51:49.396 10369-10369/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 06:51:49.396 10369-10369/leavesc.hello.rxjavademo E/MainActivity: onExceptionResumeNext subscribeActual
10-06 06:51:49.396 10369-10369/leavesc.hello.rxjavademo E/MainActivity: onNext : 3
10-06 06:51:49.396 10369-10369/leavesc.hello.rxjavademo E/MainActivity: onComplete

If emitter.onError(new Exception("Normal Exception")) is changed to emitter.onError(new Error("Normal Exception")), the error is not intercepted and goes straight to the observer's onError():

10-06 06:53:21.655 10479-10479/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 06:53:21.655 10479-10479/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 06:53:21.655 10479-10479/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 06:53:21.655 10479-10479/leavesc.hello.rxjavademo E/MainActivity: onError : Normal Exception

1.15 retry()

If an error event occurs, resubscribes to the source and replays the whole event sequence, up to the given number of times. With retry(2) the sequence runs at most three times: the original run plus two retries.

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onError(new Error("Normal Exception"));
    }
}).retry(2).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.e(TAG, "onNext : " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "onError : " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "onComplete");
    }
});

Output:

10-06 06:55:17.273 10591-10591/? E/MainActivity: onSubscribe
10-06 06:55:17.273 10591-10591/? E/MainActivity: onNext : 1
10-06 06:55:17.273 10591-10591/? E/MainActivity: onNext : 2
10-06 06:55:17.273 10591-10591/? E/MainActivity: onNext : 1
10-06 06:55:17.273 10591-10591/? E/MainActivity: onNext : 2
10-06 06:55:17.273 10591-10591/? E/MainActivity: onNext : 1
10-06 06:55:17.273 10591-10591/? E/MainActivity: onNext : 2
10-06 06:55:17.273 10591-10591/? E/MainActivity: onError : Normal Exception

1.16 retryUntil()

After an error event, the supplier passed to this method decides whether to keep retrying: returning false resubscribes to the source, and returning true stops retrying and delivers the error.

// Field of the enclosing class; incremented on every onNext().
private int index = 1;

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onError(new Exception("Normal Exception"));
    }
}).retryUntil(new BooleanSupplier() {
    @Override
    public boolean getAsBoolean() throws Exception {
        Log.e(TAG, "getAsBoolean");
        // Stop retrying once six items have been received (index starts at 1).
        return index == 7;
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.e(TAG, "onNext : " + integer);
        index++;
    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "onError : " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "onComplete");
    }
});

Output:

10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: getAsBoolean
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: getAsBoolean
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: getAsBoolean
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onError : Normal Exception

1.17 repeat()

Repeats the source's event sequence the given number of times. With repeat(2) the sequence is emitted twice in total.

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
    }
}).repeat(2).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.e(TAG, "onNext : " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "onError : " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "onComplete");
    }
});

Output:

10-06 07:38:47.680 12155-12155/? E/MainActivity: onSubscribe
10-06 07:38:47.690 12155-12155/? E/MainActivity: onNext : 1
10-06 07:38:47.690 12155-12155/? E/MainActivity: onNext : 2
10-06 07:38:47.690 12155-12155/? E/MainActivity: onNext : 1
10-06 07:38:47.690 12155-12155/? E/MainActivity: onNext : 2
10-06 07:38:47.690 12155-12155/? E/MainActivity: onComplete

1.18 repeatWhen()

Returns a new Observable that decides whether the source's events are sent. If the new Observable signals onComplete or onError, the source does not send its events and that terminal event is passed to the observer. If the new Observable emits any other event, the source sends its events.

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
}).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
    @Override
    public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
        // Keep exactly one of the three return statements active.
        // Case 1: return Observable.empty();
        // Case 2: return Observable.error(new Exception("Normal Exception"));
        // Case 3:
        return Observable.just(1);
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.e(TAG, "onNext : " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "onError : " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "onComplete");
    }
});

The output for the three cases above, in order, is as follows.

Returning Observable.empty():

10-06 14:29:05.641 20921-20921/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 14:29:05.641 20921-20921/leavesc.hello.rxjavademo E/MainActivity: onComplete

Returning Observable.error(new Exception("Normal Exception")):

10-06 14:29:36.150 21027-21027/? E/MainActivity: onSubscribe
10-06 14:29:36.150 21027-21027/? E/MainActivity: onError : Normal Exception

Returning Observable.just(1):

10-06 14:30:33.220 21135-21135/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 14:30:33.220 21135-21135/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 14:30:33.220 21135-21135/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 14:30:33.220 21135-21135/leavesc.hello.rxjavademo E/MainActivity: onNext : 3
10-06 14:30:33.220 21135-21135/leavesc.hello.rxjavademo E/MainActivity: onComplete

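1.19 subscribeOn() & observeOn()

subscribeOn() specifies the thread on which the Observable (the source) runs. Note that if it is called more than once, only the first call takes effect.

observeOn() specifies the thread on which the observer receives events. Every call takes effect, applying to whatever comes after it in the chain.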

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        Log.e(TAG, "Observable Thread Name: " + Thread.currentThread().getName());
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
    }
}).subscribeOn(Schedulers.newThread())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "onSubscribe");
        Log.e(TAG, "Observer Thread Name: " + Thread.currentThread().getName());
    }

    @Override
    public void onNext(Integer integer) {
        Log.e(TAG, "onNext : " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "onError : " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "onComplete");
    }
});

Output:

10-06 07:54:02.839 12629-12629/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 07:54:02.839 12629-12629/leavesc.hello.rxjavademo E/MainActivity: Observer Thread Name: main
10-06 07:54:02.839 12629-12643/leavesc.hello.rxjavademo E/MainActivity: Observable Thread Name: RxNewThreadScheduler-1
10-06 07:54:02.859 12629-12629/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 07:54:02.869 12629-12629/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 07:54:02.869 12629-12629/leavesc.hello.rxjavademo E/MainActivity: onComplete

Scheduler                         Purpose
Schedulers.computation()          Computation work, such as event loops and callback processing
Schedulers.trampoline()           The current thread (RxJava 1 called this Schedulers.immediate(), which RxJava 2 no longer provides)
Schedulers.io()                   IO-bound work, such as asynchronous blocking IO operations
Schedulers.newThread()            Creates a new thread
AndroidSchedulers.mainThread()    The Android UI thread, used for updating the UI
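The two threading rules from this section (only the first subscribeOn() counts, every observeOn() counts) can be checked on a desktop JVM with a minimal sketch like the one below. It assumes RxJava 2.x on the classpath and uses only the built-in schedulers, since AndroidSchedulers is not available outside Android. The class name SchedulerDemo is made up for illustration, and the sketch only relies on the thread-name prefixes that the built-in schedulers use.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the original article.
class SchedulerDemo {

    /**
     * Returns three thread names: where the source ran, where the stage after
     * the first observeOn() ran, and where the stage after the second one ran.
     */
    static List<String> run() {
        return Observable.<List<String>>fromCallable(() -> {
                    List<String> names = new ArrayList<>();
                    names.add(Thread.currentThread().getName());
                    return names;
                })
                .subscribeOn(Schedulers.io())           // first call: decides the source thread
                .subscribeOn(Schedulers.computation())  // later call: no effect on the source
                .observeOn(Schedulers.newThread())      // switches the next stage
                .doOnNext(names -> names.add(Thread.currentThread().getName()))
                .observeOn(Schedulers.single())         // switches again
                .doOnNext(names -> names.add(Thread.currentThread().getName()))
                .blockingSingle();                      // wait for the single result
    }

    public static void main(String[] args) {
        // Typically prints names starting with RxCachedThreadScheduler,
        // RxNewThreadScheduler and RxSingleScheduler, in that order.
        System.out.println(run());
    }
}
```

The list is handed from one thread to the next one stage at a time, so a plain ArrayList is enough here.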

 

 

 

When reposting, please cite the original URL: https://www.6miu.com/read-5030348.html
