RxJava 1.x 笔记:变换型操作符

xiaoxiao2021-02-28  103

在写这几篇 RxJava 笔记时,发现官方文档很久都没有更新啊。

一些前辈两年前写的学习笔记内容跟现在也基本一致,RxJava 2.x 的文档也基本没有,不知道是不是缺实习生。

本文内容为 RxJava 官方文档 学习笔记 作者:shixinzhang

读完本文你将了解:

变换型操作符 Buffer buffercountbuffercount skipbufferbufferClosingSelectorbufferboundary FlatMap flatMapconcatMapswitchMap GroupByMap mapcast Scan scanaccumulatorscaninitialValue accumulator Window 代码地址Thanks

变换型操作符

变换型操作符可以将 Observable 发射的数据进行变换。

Buffer

Buffer 可以周期性地将 Observable 发射的数据聚成一堆发出去,而不是一个个发射。

Buffer 即“缓存”,它可以将一个个发射数据的 Observable A 转换成周期性地发射元素缓存集合的 Observable B。

不同语言 Buffer 的实现有很多种,它们在选择缓存的方式上有所不同。

注意,如果源 Observable 发射了 onError 事件,转换后的 Observable 会直接发射 onError 事件,即使前面还有缓存事件没有发射。

Window 操作符和 Buffer 很相似,不同之处在于,Window 会将每波收集的缓存数据在发射前保存到独立的 Observable 中,而不是以一个数据结构的方式发射出去。

RxJava 中有多种 Buffer 实现。

buffer(count)

buffer(count) 以 List 的形式发射非重叠的缓存,每次发射至多 count 个数据。

public final Observable<List<T>> buffer(int count) { return buffer(count, count); }

使用例子:

private void transformingWithBuffer() { Observable.range(2, 10) .buffer(3) .subscribe(getPrintSubscriber()); }

运行结果:

可以看到,经过 buffer() 后,源 Observable 发射的数据会以 3 个为缓存,缓存满了会以数组的形式发射出去。

buffer(count, skip)

前面看到, buffer(count) 的实现也是调用 buffer(count, skip),只不过它的 skip 就等于 count:

public final Observable<List<T>> buffer(int count) { return buffer(count, count); }

buffer(count, skip) 的作用是:以 List 的形式发射可能重叠的缓存(当 skip < count 时就会重叠;skip > count 时会有遗漏),从源 Observable 的第一个数据开始,每次至多缓存 count 个数据,然后就发射,下一次缓存时,跳过 skip 个数据,依次重复:

public final Observable<List<T>> buffer(int count, int skip) { return lift(new OperatorBufferWithSize<T>(count, skip)); }

关于 lift 我们后续介绍。

使用例子:

private void transformingWithBufferSkip() { Observable.range(2, 10) .buffer(3, 4) .subscribe(this.<List<Integer>>getPrintSubscriber()); }

运行结果:

可以看到,其实就是缓存 count 个数据然后发射出去,然后从后面 skip - count 个数据开始缓存、发射。

buffer(bufferClosingSelector)

当订阅到源 Observable 后,buffer(bufferClosingSelector) 会收集源发射的数据到 List 中,同时调用 bufferClosingSelector 生成一个新的 Observable。

当新 Observable 发射一个 TClosing 对象后,buffer 会把缓存的 List 发射出去,然后重复这个过程,直到源 Observable 结束。

public final <TClosing> Observable<List<T>> buffer(Func0<? extends Observable<? extends TClosing>> bufferClosingSelector) { return lift(new OperatorBufferWithSingleObservable<T, TClosing>(bufferClosingSelector, 16)); }

使用例子:

private int emitCount; private void transformingWithBufferClosingSelector() { Observable.range(2, 10) .buffer(new Func0<Observable<?>>() { @Override public Observable<?> call() { emitCount ++; Log.d(TAG, "emitCount:" + emitCount); return Observable.timer(3, TimeUnit.MILLISECONDS); } }) .subscribe(this.<List<Integer>>getPrintSubscriber()); }

运行结果:

可以看到,我们创建了一个 3 秒后发射数据的 Observable,3 秒后所有数据已经缓存完毕,因此参数里的 call() 只调用了一次。

buffer(boundary)

buffer(boundary) 的作用是,使用参数 boundary Observable 作为源 Observable 的监视器,发射不重叠的数据。

每次源 Observable 发射一个数据,它就把数据缓存到 List 中,等到 boundary Observable 发射数据时,buffer 就会把之前缓存的数据发射出去,以此重复。

这里的 boundary 就相当于一个提示发射的边界。

public final <B> Observable<List<T>> buffer(Observable<B> boundary) { return buffer(boundary, 16); } public final <B> Observable<List<T>> buffer(Observable<B> boundary, int initialCapacity) { return lift(new OperatorBufferWithSingleObservable<T, B>(boundary, initialCapacity)); }

使用例子:

private void transformingWithBufferBoundary() { Observable.interval(1, TimeUnit.SECONDS) .buffer(Observable.interval(3, TimeUnit.SECONDS)) .subscribe(this.<List<Long>>getPrintSubscriber()); }

我们使用 interval() 创建一个每隔一秒发射递增整数序列的源 Observable,监视器是每隔 3 秒发射的 Observable,因此正常情况下,buffer 会收集源发射的整数到 List 中,每隔 3 秒发射一次。

运行结果:

可以看到,的确跟我们想的一样。

FlatMap

FlatMap 可以把源 Observable 发射的数据转换成多个 Observable,然后把这些 Observable 发射的数据放到一个 Observable 中。

FlatMap 操作符使用一个指定的函数对源 Observable 发射的每一项数据执行变换操作、返回一个新的 Observable,然后合并这些 Observables 发射的数据。

这个操作符的使用场景还是很多的,比如服务端返回的数据太复杂,我们只用到其中一部分数据,就可以使用 FlatMap 将数据取出来。

flatMap

注意:FlatMap 会将最后的数据混合,因此顺序可能会改变。

RxJava 中对应的实现是 flatMap():

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) { if (getClass() == ScalarSynchronousObservable.class) { return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func); } return merge(map(func)); }

flatMap() 的输入是发射 T 类型的 Observable,输出是发射 R 类型的 Observable。

可以看到最后调用了 merge,我们后续介绍它。

使用例子:

假设现在有嵌套的几种数据类型:年级、班级、学生名称,每个班级有多个学生、每个年级有多个班级,他们的结构是这样的:

//班级 public class Clazz { private String name; private List<String> studentNameList; } //年级 public class Grade { private String name; private List<Clazz> classList; }

现在我们有一个年级的数据,想要拿到这个年级里所有学生的名称,以前的做法是两个 for 循环(遍历每个班、再遍历每个同学)把学生名称数据取出来放到一个单独的 List 里,现在用 RxJava 可以这样写:

private void transformingWithFlatMap() { //数据源 Clazz secondClass = new Clazz("四年级二班", Arrays.asList("张三", "李四", "王五")); Clazz thirdClass = new Clazz("四年级三班", Arrays.asList("赵六", "喜洋洋", "灰太狼")); Grade forthGrade = new Grade("四年级", Arrays.asList(secondClass, thirdClass)); Observable.just(forthGrade) .flatMap(new Func1<Grade, Observable<Clazz>>() { @Override public Observable<Clazz> call(final Grade grade) { return Observable.from(grade.getClassList()); //先拿到年级里的班级数据,合并成一个班级 List } }) .flatMap(new Func1<Clazz, Observable<String>>() { @Override public Observable<String> call(final Clazz clazz) { return Observable.from(clazz.getStudentNameList()); //再从每个班级里拿出所有学生名称数据,合并成一个 List } }) .subscribe(this.getPrintSubscriber()); }

两个 flatMap 搞定,逻辑上比循环套循环清晰多了。

运行结果:

注意:如果 flatMap 产生的任何一个 Observable 调用 onError 异常终止了,最终合并的 Observable 会立即调用 onError 并终止。

concatMap

在一些实现里,有另外一种类似的操作符 ConcatMap,功能和 FlatMap 类似,但是会按严格的顺序将数据拼接在一起,不会改变顺序。

concatMap 类似于简单版本的 flatMap,但是它按次序连接而不是合并那些生成的 Observables。

public final <R> Observable<R> concatMap(Func1<? super T, ? extends Observable<? extends R>> func) { if (this instanceof ScalarSynchronousObservable) { ScalarSynchronousObservable<T> scalar = (ScalarSynchronousObservable<T>) this; return scalar.scalarFlatMap(func); } return unsafeCreate(new OnSubscribeConcatMap<T, R>(this, func, 2, OnSubscribeConcatMap.IMMEDIATE)); }

使用和 flatMap 差不多:

public static Grade getGradeData() { //数据源 Clazz secondClass = new Clazz("四年级二班", Arrays.asList("张三", "李四", "王五")); Clazz thirdClass = new Clazz("四年级三班", Arrays.asList("赵六", "喜洋洋", "灰太狼")); Grade forthGrade = new Grade("四年级", Arrays.asList(secondClass, thirdClass)); return forthGrade; } private void transformingWithConcatMap() { Observable.just(DataCreator.getGradeData()) .concatMap(new Func1<Grade, Observable<Clazz>>() { @Override public Observable<Clazz> call(final Grade grade) { return Observable.from(grade.getClassList()); } }) .concatMap(new Func1<Clazz, Observable<?>>() { @Override public Observable<?> call(final Clazz clazz) { return Observable.from(clazz.getStudentNameList()); } }) .subscribe(this.getPrintSubscriber()); }

运行结果:

switchMap

switchMap 也可以像 flatMap 一样处理 Observable,将处理后的数据合并成一个 Observable。

不同之处在于它的 “喜新厌旧”:每次源 Observable 发射一个新的数据时,它会解除订阅之前发射的数据的 Observable,转而订阅新的数据。

就像上面的图一样,如果源 Observable 发射多个定时任务,不管前一个定时任务执行了多少,只要后一个定时任务开始执行,就不再接收前面的任务的结果了。举个例子:

private void transformingWithSwitchMap() { Observable.just(Observable.timer(4, TimeUnit.SECONDS), Observable.range(2, 10)) .switchMap(new Func1<Observable<? extends Number>, Observable<?>>() { @Override public Observable<?> call(final Observable<? extends Number> observable) { return observable; } }) .subscribe(this.getPrintSubscriber()); }

上面代码中,我们的源 Observable 的数据是两个 Observable,第一个会在 4 秒后发射一个 0,第二个会立即发射从 2 往后的 10 个整数。

根据 switchMap 的特性,第一个 Observable 还没发射时第二个已经发射了,于是下游的订阅者解除对第一 Observable 的订阅,也就收不到 4 秒后发射的 0 了。

运行结果:

可以看到,的确没有收到 0 。

GroupBy

GroupBy 会将源 Observable 转换成多个 Observable,每个 Observable 发射源 Observable 的一部分数据。

数据项由哪一个 Observable 发射是由一个判定函数决定的,这个函数会给每一项数据指定一个 Key,Key相同的数据会被同一个 Observable 发射。

RxJava 中对应的实现是 groupBy():

public final <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super T, ? extends K> keySelector) { return lift(new OperatorGroupBy<T, K, T>(keySelector)); }

groupBy() 返回的是一个 GroupedObservable,Observable 的子类,它有一个额外的方法 getKey(),这个 Key 就是经过计算、用于将数据分组到指定的 Observable 的值。

使用例子:

public static List<People> getPeopleData() { return Arrays.asList(new People(15, "大熊"), new People(13, "静安"), new People(15, "胖虎"), new People(14, "多来A梦"), new People(13, "拭心")); } private void transformingWithGroupBy() { Observable.from(DataCreator.getPeopleData()) .groupBy(new Func1<People, Integer>() { @Override public Integer call(final People people) { return people.getAge(); } }) .subscribe(new Action1<GroupedObservable<Integer, People>>() { @Override public void call(final GroupedObservable<Integer, People> integerPeopleGroupedObservable) { integerPeopleGroupedObservable.buffer(2) .subscribe(getPrintSubscriber()); } }); }

我们创建了 5 个 People 对象,这个类有两个属性:age 和 name,然后在 groupBy() 中根据 age 分组,这样就可以得到一组发射 GroupedObservable 的 Observable,然后我们把它们两两一组,打印出来。

运行结果:

可以看到,的确是按 age 分了组。

注意:groupBy() 将源 Observable 分解为多个发射 GroupedObservable 的 Observable ,一旦有订阅,每个 GroupedObservable 就开始缓存发射的数据。 如果你对某个 GroupedObservable 没有兴趣却不进行处理,这个缓存可能形成一个潜在的内存泄露。 因此,你应该使用像 take(0) 这样会丢弃自己的缓存的操作符。

Map

Map 操作符的作用是:对源 Observable 发射的每个数据都进行一个函数处理。

map

RxJava 中对应的实现有 map():

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { return unsafeCreate(new OnSubscribeMap<T, R>(this, func)); }

使用起来也很方便:

private void transformingWithMap() { Observable.range(1, 5) .map(new Func1<Integer, Integer>() { @Override public Integer call(final Integer integer) { return integer * 3; } }) .subscribe(this.<Integer>getPrintSubscriber()); }![这里写图片描述](http://img.blog.csdn.net/20170717224345469?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvdTAxMTI0MDg3Nw==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)

运行结果:

cast

cast() 是 map() 的特殊版本,它的作用是将发射的数据强转成指定的类型。

public final <R> Observable<R> cast(final Class<R> klass) { return lift(new OperatorCast<T, R>(klass)); }

使用也很简单:

private void transformingWithCast() { Observable.range(1, 5) .cast(String.class) .subscribe(this.<String>getPrintSubscriber()); }

运行结果:

其实就跟强转一样,类型不一致会报错。

Scan

Scan 的作用是扫描、累积。

它可以将每次发射的数据都进行指定的函数计算,计算的结果作为参数参与下一次计算。

RxJava 中有两种实现。

scan(accumulator)

第一种是接收一个 Func2 作为参数:

public final Observable<T> scan(Func2<T, T, T> accumulator) { return lift(new OperatorScan<T, T>(accumulator)); }

使用例子:

private void transformingWithScan() { Observable.from(Arrays.asList(6, 4, 1, 5, 7)) .scan(new Func2<Integer, Integer, Integer>() { @Override public Integer call(final Integer lastResult, final Integer newItem) { return lastResult + newItem; } }) .subscribe(this.<Integer>getPrintSubscriber()); }

运行结果:

scan(initialValue, accumulator)

第二种是多了一个初始值 initialValue,它会参与第一次运算。

public final <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accumulator) { return lift(new OperatorScan<R, T>(initialValue, accumulator)); }

使用例子:

private void transformingWithScan2() { Observable.from(Arrays.asList(6, 4, 1, 5, 7)) .scan(1, new Func2<Integer, Integer, Integer>() { @Override public Integer call(final Integer lastResult, final Integer newItem) { return lastResult + newItem; } }) .subscribe(this.<Integer>getPrintSubscriber()); }

运行结果:

可以看到,和前面的区别就是多发射了个初始值,结果多了 1 。

Window

Window 的作用是定期将源 Observable 发射的一部分数据切分为 一个 Observable 窗口,然后发射这个窗口。

Window 和 Buffer 非常相似,但它发射的不是源 Observable 数据的缓存包,而是一系列 Observable。

和 Buffer 一样,Window 也有很多变体,每一种都有自己分割源数据的方法:

使用方式和 Buffer 基本一致,这里我们只举一个例子:

private void transformingWithWindow() { Observable.range(1, 10) .window(3) .subscribe(new Action1<Observable<Integer>>() { @Override public void call(final Observable<Integer> integerObservable) { integerObservable.subscribe(getPrintSubscriber()); } }); }

运行结果:

至此变换型操作符我们基本了解完了,已经成功了一小半,加油!

代码地址

发表自 张拭心的博客

Thanks

http://reactivex.io/documentation/operators.html https://github.com/mcxiaoke/RxDocs/blob/master/Operators.md https://github.com/mgp/effective-rxjava/blob/master/items/understand-switch-map.md

转载请注明原文地址: https://www.6miu.com/read-79795.html

最新回复(0)