Rxjava2.0 再探---操作符

xiaoxiao2021-02-27  266

1.前言

上篇主要介绍了Rxjava2.0的Schedulers以及简单的操作符,这篇主要介绍Rxjava的经常使用的操作符。 上篇链接:Rxjava2.0 初探 http://blog.csdn.net/mr_zhang0101/article/details/74639971

2.Rxjava的操作符

Rxjava的操作符数量庞大,此篇介绍一些常用的操作符 先看下这些操作符的变换功效:

变换操作符: map:在接收事件时,变换事件的类型,比如int ---> String flatMap:将一个事件包装成一个Observable继续发送 concatMap:和flatMap类似,可以保证有序 scanWith:将发送的每一个事件进行操作,观察者接收操作后的事件 过滤操作符: filter:会对要发射的事件进行测试,只有通过测试的数据才会被发射 take:只发射前面的部分数据 辅助操作符: do:只有执行相应的时候会被调用(下文会详细介绍) delay:延迟一段指定的时间再发射Observable的数据。 结合操作符: zip:将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。

2.1 map变换

我们在发送事件可能需要对这些事件就行一些加工,变换,这个时候就可以利用map就行操作了,map是将事件依次进行加工。

Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); } }).map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return "事件"+integer*3; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "accept: "+s); } });

其中map接收的Function泛型的一个参数就是原事件的类型,第二个参数就是要变换的事件类型。运行结果如下:

D/HANDLERACTIVITY: accept: 事件3 D/HANDLERACTIVITY: accept: 事件6 D/HANDLERACTIVITY: accept: 事件9

这里进行了将发送的Integer*3,并变成String类型。

2.2 flatMap变换

map是将一个事件加工成另外一个类型的事件(也可以变换成同类型的事件),而flatMap是将事件包装成一个Observable继续发送。

Observable.just(1,2,3,4,5,6) .flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { final List<String> list = new ArrayList<>(); list.add("事件" + integer); //fromIterable把一个集合的每个item依次发送 //delay延迟发送,辅助操作符,之后会介绍 return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "accept: "+s); } });

运行的结果:

D/HANDLERACTIVITY: accept: 事件1 D/HANDLERACTIVITY: accept: 事件3 D/HANDLERACTIVITY: accept: 事件5 D/HANDLERACTIVITY: accept: 事件2 D/HANDLERACTIVITY: accept: 事件4 D/HANDLERACTIVITY: accept: 事件6

可以发现,flatMap变换后,不能保证事件还是有序得发送,如果需要flatMap的这种效果,又想保证有序发送,就需要concatMap变换了

2.3 concatMap变换

将之前flatMap变换的代码中只将flatMap换成concatMap

Observable.just(1,2,3,4,5,6) .concatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { final List<String> list = new ArrayList<>(); list.add("事件" + integer); //fromIterable把一个集合的每个item依次发送 //delay延迟发送 return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "accept: "+s); } });

运行如下:

D/HANDLERACTIVITY: accept: 事件1 D/HANDLERACTIVITY: accept: 事件2 D/HANDLERACTIVITY: accept: 事件3 D/HANDLERACTIVITY: accept: 事件4 D/HANDLERACTIVITY: accept: 事件5 D/HANDLERACTIVITY: accept: 事件6

不难看出concatMap保证了有序

2.3 scanWith变换

将发送的每一个事件进行操作,接收操作后的事件

Observable.just(1,2,3) .scanWith(new Callable<Integer>() { @Override public Integer call() throws Exception { Log.d(TAG, "call: "); //接收的第一个事件 return 5; } }, new BiFunction<Integer, Integer, Integer>() { //BiFunction代替Rxjava1.0中的Func2 @Override public Integer apply(Integer integer, Integer integer2) throws Exception { Log.d(TAG, "apply: "+"integer=="+integer+";integer2=="+integer2); return integer*integer2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "accept: "+integer); } });

运行结果:

D/HANDLERACTIVITY: call: D/HANDLERACTIVITY: accept: 5 D/HANDLERACTIVITY: apply: integer==5;integer2==1 D/HANDLERACTIVITY: accept: 5 D/HANDLERACTIVITY: apply: integer==5;integer2==2 D/HANDLERACTIVITY: accept: 10 D/HANDLERACTIVITY: apply: integer==10;integer2==3 D/HANDLERACTIVITY: accept: 30

从结果可以看出,scanWith操作符的第一个参数Callable,提供一个会第一次接收到的事件;第二个参数BiFunction中的apply方法第一个参数就是上次操作后的结果,第二个参数就是item的值

2.4 filter过滤

filter会对要发射的事件进行测试,只有通过测试的数据才会被发射,也就是把事件过滤一次。

Observable.just(1, 2, 3, 4, 5, 6) .filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return (integer<4); } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "accept: "+integer); } });

运行结果:

D/HANDLERACTIVITY: accept: 1 D/HANDLERACTIVITY: accept: 2 D/HANDLERACTIVITY: accept: 3

另外,ofType(Class): filter操作符的一个特殊形式。它过滤一个Observable只返回指定类型的数据。

Observable.just(new Integer(1), new String("2"),new Long(3)) .ofType(Integer.class) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "accept: "+integer); } });

运行结果:

D/HANDLERACTIVITY: accept: 1

2.5 take过滤

take(int):只发射前面的N项数据,然后发射完成通知,忽略剩余的数据。 takeLast(int): 只发射原始Observable发射的后N项数据,忽略之前的数据。 takeLastBuffer: 它和takeLast类似,唯一的不同是它把所有的数据项收集到一个List再发射 这个比较简单,只举一个take的例子

Observable.just(1, 2, 3, 4, 5, 6) .take(3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "accept: "+integer); } });

运行结果:

D/HANDLERACTIVITY: accept: 1 D/HANDLERACTIVITY: accept: 2 D/HANDLERACTIVITY: accept: 3

2.6 do辅助

do操作符包括doOnEach、doOnNext、doOnError等 doOnEach:为 Observable注册这样一个回调,当Observable没发射一项数据就会调用它一次,包括onNext、onError和 onCompleted doOnNext:只有执行onNext的时候会被调用 doOnError:只有执行onError的时候会被调用

Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); e.onError(new Throwable("error")); } }) .doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "doOnNext: " + integer); } }) .doOnEach(new Consumer<Notification<Integer>>() { @Override public void accept(Notification<Integer> integerNotification) throws Exception { Log.d(TAG, "doOnEach: " + integerNotification.getValue()); } }) .doOnError(new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "doOnError: " + throwable.getMessage()); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe: "); } @Override public void onNext(Integer value) { Log.d(TAG, "subscribe:onNext: "+value); } @Override public void onError(Throwable e) { Log.d(TAG, "subscribe:onError: "+e.getMessage()); } @Override public void onComplete() { Log.d(TAG, "subscribe:onComplete: "); } });

运行结果:

D/HANDLERACTIVITY: onSubscribe: D/HANDLERACTIVITY: doOnNext: 1 D/HANDLERACTIVITY: doOnEach: 1 D/HANDLERACTIVITY: subscribe:onNext: 1 D/HANDLERACTIVITY: doOnNext: 2 D/HANDLERACTIVITY: doOnEach: 2 D/HANDLERACTIVITY: subscribe:onNext: 2 D/HANDLERACTIVITY: doOnNext: 3 D/HANDLERACTIVITY: doOnEach: 3 D/HANDLERACTIVITY: subscribe:onNext: 3 D/HANDLERACTIVITY: doOnEach: null D/HANDLERACTIVITY: doOnError: error D/HANDLERACTIVITY: subscribe:onError: error

可以看出do操作符不会拦截事件的传递,观察者还是会接收到事件

2.7 delay辅助

delay的意思就是延迟,这个操作符会延迟一段指定的时间再发射Observable的数据。

final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { Log.d(TAG, "emit:" + sdf.format(new Date())); e.onNext(1); e.onNext(2); e.onNext(3); e.onComplete(); } }).subscribeOn(Schedulers.computation()).delay(2, TimeUnit.SECONDS) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "delay accept:" + sdf.format(new Date()) + "->" + integer); } });

运行结果:

D/HANDLERACTIVITY: emit:10:17:18 D/HANDLERACTIVITY: delay accept:10:17:20->1 D/HANDLERACTIVITY: delay accept:10:17:20->2 D/HANDLERACTIVITY: delay accept:10:17:20->3

2.8 zip结合

Zip将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。它按照严格的顺序应用这个函数。接收到的数据和发射少的那个一样多。 先写两个发射源a和b:

Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "emit 1"); emitter.onNext(1); Log.d(TAG, "emit 2"); emitter.onNext(2); Log.d(TAG, "emit 3"); emitter.onNext(3); Log.d(TAG, "emit complete1"); emitter.onComplete(); } }); Observable<String> ob2 = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.d(TAG, "emit x"); emitter.onNext("x"); Log.d(TAG, "emit y"); emitter.onNext("y"); Log.d(TAG, "emit complete2"); emitter.onComplete(); } });

zip结合接收事件

Observable.zip(ob1, ob2, new BiFunction<Integer, String, String>() { @Override public String apply(Integer integer, String s) throws Exception { //两个发射的数据的结合方式 return integer +"&"+s; } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(String value) { Log.d(TAG, "onNext: " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError"); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });

运行结果:

D/HANDLERACTIVITY: onSubscribe D/HANDLERACTIVITY: emit 1 D/HANDLERACTIVITY: emit 2 D/HANDLERACTIVITY: emit 3 D/HANDLERACTIVITY: emit complete1 D/HANDLERACTIVITY: emit x D/HANDLERACTIVITY: onNext: 1&x D/HANDLERACTIVITY: emit y D/HANDLERACTIVITY: onNext: 2&y D/HANDLERACTIVITY: emit complete2 D/HANDLERACTIVITY: onComplete

从结果可以看出,由于在一个线程中,所以发射源a发射完数据后,发射源b才开始发射,接收的个数和发射源b发射的个数一样(个数较a少),那么换成不同的线程,怎么发射的呢?

Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "emit 1"); emitter.onNext(1); Log.d(TAG, "emit 2"); //为了排除程序执行过快线程未切换,这里适当添加一些停顿 Thread.sleep(200); emitter.onNext(2); Log.d(TAG, "emit 3"); Thread.sleep(200); emitter.onNext(3); Log.d(TAG, "emit complete1"); emitter.onComplete(); } }).subscribeOn(Schedulers.io()); Observable<String> ob2 = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.d(TAG, "emit x"); emitter.onNext("x"); Thread.sleep(200); Log.d(TAG, "emit y"); emitter.onNext("y"); Thread.sleep(200); Log.d(TAG, "emit complete2"); emitter.onComplete(); } }).subscribeOn(Schedulers.io()); Observable.zip(ob1, ob2, new BiFunction<Integer, String, String>() { @Override public String apply(Integer integer, String s) throws Exception { //两个发射的数据的结合方式 return integer +"&"+s; } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(String value) { Log.d(TAG, "onNext: " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError"); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });

执行结果:

D/HANDLERACTIVITY: onSubscribe D/HANDLERACTIVITY: emit 1 D/HANDLERACTIVITY: emit 2 D/HANDLERACTIVITY: emit x D/HANDLERACTIVITY: onNext: 1&x D/HANDLERACTIVITY: emit 3 D/HANDLERACTIVITY: emit y D/HANDLERACTIVITY: onNext: 2&y D/HANDLERACTIVITY: emit complete1 D/HANDLERACTIVITY: emit complete2 D/HANDLERACTIVITY: onComplete

从结果可以看出,只要组成一对,就会接收到一个组合的数据。 操作符就介绍这些常用的,如果想了解更多,请阅读:http://blog.csdn.net/u010163442/article/category/6270573 Demo地址:https://github.com/Mr-zhang0101/Rxjava2.0Test

转载请注明原文地址: https://www.6miu.com/read-10855.html

最新回复(0)