RxJava 从入门到实践(二: 核心功能及操作符的使用)

xiaoxiao2021-02-28  29

一:变换

RxJava 提供了对事件序列进行变换的支持,所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列

1 首先看一个需求:输入字符串 要其返回哈希值

private void testRxJava(){ rx.Observable.just("hello RxJava") .map(new Func1<String,Integer>() {//输入类型 @Override public Integer call(String s) { return s.hashCode(); //返回类型 } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer o) { Log.d(TAG,">>"+o); } }); }

<1>这里出现了一个叫做 Func1 的类。它和 Action1 非常相似,也是 RxJava 的一个接口,用于包装含有一个参数的方法。 Func1 和 Action 的区别在于, Func1 包装的是有返回值的方法。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法

<2>map() 方法将参数中的 String 对象转换成一个 Integer 对象后返回,而在经过 map() 方法后,事件的参数类型也由 String 转为了  Integer

2 假设有一个数据结构[学生] 需要打印一组学生的名字

package rxjava.itcast.com.rxjavademo; import java.util.List; /** * Created by lenovo on 2018/3/10. */ public class Student { public String name; public String age; public List<Course> course = null; public Student(String name, String age) { this.name = name; this.age = age; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getAge() { return age; } public void setAge(String age) { this.age = age; } public List<Course> getCourse() { return course; } public void setCourse(List<Course> course) { this.course = course; } }   public class Course { public String cname; public String grade; public Course(String cname, String grade) { this.cname = cname; this.grade = grade; } } private void testRxJava(){ rx.Observable.from(initData()).map(new Func1<Student, String>() { @Override public String call(Student student) { return student.getName(); } }).subscribe(new Action1<String>() { @Override public void call(String s) { Log.d(TAG,"--item:"+s); } }); }3.打印出每个学生所需要修的所有课程的名称 实现方式一 是在观察者中通过循环来进行打印 如下 private void testRxJava(){ Subscriber<Student> subscriber = new Subscriber<Student>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(Student o) { List<Course> courses = o.getCourse(); for (Course item:courses){ Log.d(TAG,"--itms="+item.cname); } } }; rx.Observable.from(initData()).subscribe(subscriber); }

那么如果我不想在 Subscriber 中使用 for 循环,而是希望 Subscriber 中直接传入单个的 Course 对象,用 map() 显然是不行的,因为 map() 是一对一的转化,而我现在的要求是一对多的转化,就需要用 flatMap() 了

private void testRxJava7(){ rx.Observable.from(initData()).flatMap(new Func1<Student, rx.Observable<Course>>() { @Override public rx.Observable<Course> call(Student student) { return rx.Observable.from(student.getCourse()); } }).subscribe(new Action1<Course>() { @Override public void call(Course s) { Log.d(TAG,"itms7:"+s.cname); } }); }

二:操作符

RxJava中提供了大量不同种类,不同场景的Operators(操作符),RxJava的强大性就来自于它所定义的操作符。主要分类:

1.创建操

   其实我们创建Observable的create(),from(),just()等方法,都属于创建操作符

defer( ) — 只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observable

Observable.defer(new Func0<Observable<Object>>() { @Override public Observable<Object> call() { //创建并返回新的Observable, return null; } });

repeat( ) — 创建一个重复发射指定数据或数据序列的Observable

Observable.just(1).repeat(3).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("RxJava", String.valueOf(integer)); } });

range( ) — 创建一个发射指定范围的整数序列的Observable

Observable.range(1,4).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("RxJava", String.valueOf(integer)); } });

interval( ) — 创建一个按照给定的时间间隔发射整数序列的Observable

Observable.interval(1, TimeUnit.SECONDS).subscribe(new Action1<Long>() { @Override public void call(Long i) { Log.d("RxJava", String.valueOf(i)); } });

timer( ) — 创建一个在给定的延时之后发射单个数据的Observable

Observable.timer(3, TimeUnit.SECONDS).subscribe(new Action1<Long>() { @Override public void call(Long i) { Log.d("RxJava", i); } });

结果:3秒后,发射了一个包含数字0的Observable

03-10 22:36:38.204 5858-5875/rxjava.itcast.com.rxjavademo D/RxJava: 0

2.变换操作

scan( ) — 对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值

rx.Observable.just(1, 2, 3) .scan(new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) { return integer+integer2; } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("RxJava", String.valueOf(integer)); } });

结果:将自定义函数应用于数据序列,并将这个函数的结果作为函数下一次的参数1使用,1+0=1,1+2=3 ,3+3=6

03-10 22:42:35.032 5942-5942/rxjava.itcast.com.rxjavademo D/RxJava: 1 03-10 22:42:35.036 5942-5942/rxjava.itcast.com.rxjavademo D/RxJava: 3 03-10 22:42:35.040 5942-5942/rxjava.itcast.com.rxjavademo D/RxJava: 6

groupBy( ) — 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据

rx.Observable.just(1, 2, 3, 4) .groupBy(new Func1<Integer,Integer>() { @Override public Integer call(Integer integer) { return integer+1; } }) .subscribe(new Action1<GroupedObservable<Integer, Integer>>() { @Override public void call(final GroupedObservable<Integer, Integer> groupedObservable) { groupedObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("RxJava", "key:" + groupedObservable.getKey() + ",value:" + integer); } }); } });

结果:在第一个func1函数中,设置key,最后生成一个Observable集合,并把每一个groupedObservable,并依次发射出去

03-10 22:57:37.313 6054-6054/rxjava.itcast.com.rxjavademo D/RxJava: -onClick:start test- 03-10 22:57:37.365 6054-6054/rxjava.itcast.com.rxjavademo D/RxJava: key:2,value:1 03-10 22:57:37.369 6054-6054/rxjava.itcast.com.rxjavademo D/RxJava: key:3,value:2 03-10 22:57:37.373 6054-6054/rxjava.itcast.com.rxjavademo D/RxJava: key:4,value:3 03-10 22:57:37.385 6054-6054/rxjava.itcast.com.rxjavademo D/RxJava: key:5,value:4

buffer( ) — 它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个

rx.Observable.just(1, 2, 3, 4,5,6,7) .buffer(2, 3) .subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { Log.d("RxJava", integers + ""); } });

结果:buffer()有两个参数count和skip,count指定List的大小,skip指定每次发射一个List需要跳过几个数;buffer(2, 3):每组2个数,每次跳过(3-2)个数,结果如下:

03-10 23:03:44.796 6265-6265/rxjava.itcast.com.rxjavademo D/RxJava: [1, 2] 03-10 23:03:44.800 6265-6265/rxjava.itcast.com.rxjavademo D/RxJava: [4, 5] 03-10 23:03:44.808 6265-6265/rxjava.itcast.com.rxjavademo D/RxJava: [7]

window( ) — 定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项

rx.Observable.just(1, 2, 3, 4, 5, 6, 7) .window(2, 2) .subscribe(new Action1<rx.Observable<Integer>>() { @Override public void call(rx.Observable<Integer> observable) { Log.d("RxJava", "window" ); observable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("RxJava", integer + ""); } }); } });

结果:window()操作符和buffer()类似,都是缓存一段数据集合,再打包发射出去

03-11 20:32:32.445 1340-1340/rxjava.itcast.com.rxjavademo D/RxJava: window 03-11 20:32:32.449 1340-1340/rxjava.itcast.com.rxjavademo D/RxJava: 1 03-11 20:32:32.453 1340-1340/rxjava.itcast.com.rxjavademo D/RxJava: 2 03-11 20:32:32.453 1340-1340/rxjava.itcast.com.rxjavademo D/RxJava: window 03-11 20:32:32.453 1340-1340/rxjava.itcast.com.rxjavademo D/RxJava: 3 03-11 20:32:32.453 1340-1340/rxjava.itcast.com.rxjavademo D/RxJava: 4 03-11 20:32:32.453 1340-1340/rxjava.itcast.com.rxjavademo D/RxJava: window 03-11 20:32:32.453 1340-1340/rxjava.itcast.com.rxjavademo D/RxJava: 5 03-11 20:32:32.453 1340-1340/rxjava.itcast.com.rxjavademo D/RxJava: 6 03-11 20:32:32.453 1340-1340/rxjava.itcast.com.rxjavademo D/RxJava: window 03-11 20:32:32.457 1340-1340/rxjava.itcast.com.rxjavademo D/RxJava: 7

3.过滤操作

filter( ) — 过滤数据

rx.Observable.just(1, 2, 3, 4, 5, 6) .filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { //从数组中,筛选偶数 return integer % 2 == 0; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer i) { Log.d("RxJava", String.valueOf(i)); } }); 结果: 03-11 20:37:05.737 1422-1422/rxjava.itcast.com.rxjavademo D/RxJava: 2 03-11 20:37:05.741 1422-1422/rxjava.itcast.com.rxjavademo D/RxJava: 4 03-11 20:37:05.749 1422-1422/rxjava.itcast.com.rxjavademo D/RxJava: 6

takeLast( ) — 只发射最后的N项数据

rx.Observable.just(1, 2, 3, 4, 5, 6) .takeLast(3) //取最后3项数据 .subscribe(new Action1<Integer>() { @Override public void call(Integer i) { Log.d("RxJava", String.valueOf(i)); } });

结果:

03-11 20:38:51.109 1490-1490/rxjava.itcast.com.rxjavademo D/RxJava: 4 03-11 20:38:51.113 1490-1490/rxjava.itcast.com.rxjavademo D/RxJava: 5 03-11 20:38:51.117 1490-1490/rxjava.itcast.com.rxjavademo D/RxJava: 6

last( ) — 只发射最后的一项数据

rx.Observable.just(1, 2, 3, 4, 5, 6) .last() .subscribe(new Action1<Integer>() { @Override public void call(Integer i) { Log.d("RxJava", String.valueOf(i)); } });

结果

03-11 20:40:46.861 1558-1558/rxjava.itcast.com.rxjavademo D/RxJava: 6

skip( ) — 跳过开始的N项数据

rx.Observable.just(1, 2, 3, 4, 5, 6) .skip(3) .subscribe(new Action1<Integer>() { @Override public void call(Integer i) { Log.d("RxJava", String.valueOf(i)); } });

take( ) — 只发射开始的N项数据

Observable.just(1, 2, 3, 4, 5, 6) .take(3) .subscribe(i -> { Log.d("RxJava", String.valueOf(i)); });

first( ) and takeFirst( ) — 只发射第一项数据,或者满足某种条件的第一项数据

Observable.just(1, 2, 3, 4, 5, 6) .first() .subscribe(i -> { Log.d("RxJava", String.valueOf(i)); });

elementAt( ) — 发射第N项数据

Observable.just(1, 2, 3, 4, 5, 6) .elementAt(3) .subscribe(i -> { Log.d("RxJava", String.valueOf(i)); });

sample( ) or throttleLast( ) — 定期发射Observable最近的数据

Observable.interval(1,TimeUnit.SECONDS) .sample(4, TimeUnit.SECONDS) .subscribe(i -> { Log.d("RxJava", String.valueOf(i)); });

结果:interval()每隔一秒发送整数序列,sample()每隔4秒,获取Observable的数据,结果如下:

-16618/keye.com.rxjavaobserver D/RxJava: 3 -16618/keye.com.rxjavaobserver D/RxJava: 6 -16618/keye.com.rxjavaobserver D/RxJava: 10 -16618/keye.com.rxjavaobserver D/RxJava: 14 -16618/keye.com.rxjavaobserver D/RxJava: 18 -16618/keye.com.rxjavaobserver D/RxJava: 22

debounce( ) — 只有当Observable在指定的时间后还没有发射数据时,才发射一个数据

Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { try { for (int i = 1; i < 10; i++) { subscriber.onNext(i); Thread.sleep(i * 1000); //每次发送,延迟i*1秒 } subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); } } }) .subscribeOn(Schedulers.newThread()) .debounce(3000, TimeUnit.MILLISECONDS) //3秒没有数据,则发送 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("RxJava", String.valueOf(integer)); } });

结果:前3个数延迟短,没有触发debounce()操作符,第4个数延迟3秒,debounce()生效

30534-30550/keye.com.rxjavaobserver D/RxJava: 4 30534-30550/keye.com.rxjavaobserver D/RxJava: 5 30534-30550/keye.com.rxjavaobserver D/RxJava: 6 30534-30550/keye.com.rxjavaobserver D/RxJava: 7 30534-30550/keye.com.rxjavaobserver D/RxJava: 8

distinct( ) — 过滤掉重复数据

Observable.just(1, 2, 1, 4, 1, 6) .distinct() .subscribe(i -> { Log.d("RxJava", String.valueOf(i)); });

结果:

11-06 04:38:49.987 19504-19504/keye.com.rxjavaobserver D/RxJava: 1 11-06 04:38:49.987 19504-19504/keye.com.rxjavaobserver D/RxJava: 2 11-06 04:38:49.987 19504-19504/keye.com.rxjavaobserver D/RxJava: 4 11-06 04:38:49.988 19504-19504/keye.com.rxjavaobserver D/RxJava: 6

ofType( ) — 只发射指定类型的数据

Observable.just(1, "2", 3, "4", 5, 6) .ofType(Integer.class) .subscribe(i -> { Log.d("RxJava", String.valueOf(i)); });

结果

11-06 04:44:28.321 25785-25785/keye.com.rxjavaobserver D/RxJava: 1 11-06 04:44:28.321 25785-25785/keye.com.rxjavaobserver D/RxJava: 3 11-06 04:44:28.321 25785-25785/keye.com.rxjavaobserver D/RxJava: 5 11-06 04:44:28.321 25785-25785/keye.com.rxjavaobserver D/RxJava: 6

转载请注明原文地址: https://www.6miu.com/read-2250281.html

最新回复(0)