初探RxJava2Android

xiaoxiao2021-02-28  48

源码

RxJava: https://github.com/ReactiveX/RxJava

RxAndroid: https://github.com/ReactiveX/RxAndroid

配置

compile 'io.reactivex.rxjava2:rxjava:2.1.12' compile 'io.reactivex.rxjava2:rxandroid:2.0.2'

基本用法

向数据库插入一个用户数据

Observable.create(new ObservableOnSubscribe<User>() {//被观察者 @Override public void subscribe(ObservableEmitter<User> emitter) throws Exception { Log.i("room---", "被观察者存入数据 threadId=" + Thread.currentThread().getId()); User user = getUser(); UserDatabase.getInstance(RoomActivity.this).getUserDao().insert(user);//插入数据 emitter.onNext(user);//通知观察者已经插入数据 emitter.onComplete();//执行观察者的onComplete方法 } }).doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { //被观察者开启插入前的回调 Log.i("room---", "被观察者插入数据前回调----threadId=" + Thread.currentThread().getId()); } }) .subscribeOn(Schedulers.io())//被观察者执行在io线程 .observeOn(AndroidSchedulers.mainThread())//观察者执行在android的UI线程 .subscribe(new Consumer<User>() { @Override public void accept(User user) throws Exception { //接收到被观察者的通知,和onComplete执行在同一个线程 Log.i("room---", "存入数据库的数据是:" + user.toString() + " threadId=" + Thread.currentThread().getId()); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { //发生错误的时候,观察者的此方法会被执行 } }, new Action() { @Override public void run() throws Exception { //观察者的onComplete对象,由被观察者执行(需要被观察者主动调用),和观察者回调执行在统一线程 Log.i("room---", "插入数据执行完毕 threadId=" + Thread.currentThread().getId()); } });

输出

被观察者插入数据前回调----threadId=3564 被观察者存入数据 threadId=3564 存入数据库的数据是:id=0 name=user3 age=13 threadId=1 插入数据执行完毕 threadId=1

fromArray/fromIterable/just 用法

数据插入一组成功后,通知观察者:fromArray、fromIterable或者just,这几种方法都能自动调用onComplete

将数据元素逐个通知给观察者 User[] users = new User[]{new User("userA", 12), new User("userB", 32)}; //插入users... //... //插入完毕后通知监听 Observable.fromArray(users) .observeOn(AndroidSchedulers.mainThread())//观察者执行在android的UI线程 .subscribe(new Consumer<User>() { @Override public void accept(User user) throws Exception { //接收到被观察者的通知,注意,此处是逐个通知 Log.i("room---", "存入数据库的数据是:" + user.toString() + " threadId=" + Thread.currentThread().getId()); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { //发生错误的时候,观察者的此方法会被执行 } }, new Action() { @Override public void run() throws Exception { //观察者的onComplete对象,能自动被调用 Log.i("room---", "插入数据执行完毕 threadId=" + Thread.currentThread().getId()); } });

或者:

List<User> userList = new ArrayList<>(); userList.add(new User("userA", 23)); userList.add(new User("userB", 24)); //插入users... //... //插入完毕后通知监听 Observable.fromIterable(userList) .observeOn(AndroidSchedulers.mainThread())//观察者执行在android的UI线程 .subscribe(new Consumer<User>() { @Override public void accept(User users) throws Exception { //接收到被观察者的通知 Log.i("room---", "存入数据库的数据是:" + users.toString() + " threadId=" + Thread.currentThread().getId()); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { //发生错误的时候,观察者的此方法会被执行 } }, new Action() { @Override public void run() throws Exception { //观察者的onComplete对象,能自动被调用 Log.i("room---", "插入数据执行完毕 threadId=" + Thread.currentThread().getId()); } });

数据插入成功后,通知观察者,将数组元素全部一次性通知给观察者

User[] users = new User[]{new User("userA", 12), new User("userB", 32)}; //插入users... //... //插入完毕后通知监听 Observable.just(users) .observeOn(AndroidSchedulers.mainThread())//观察者执行在android的UI线程 .subscribe(new Consumer<User[]>() { @Override public void accept(User[] user) throws Exception { //接收到被观察者的通知 for(int i = 0; i < user.length; i++) { Log.i("room---", "存入数据库的数据是:" + user[i].toString() + " threadId=" + Thread.currentThread().getId()); } } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { //发生错误的时候,观察者的此方法会被执行 } }, new Action() { @Override public void run() throws Exception { //观察者的onComplete对象,由被观察者执行,能自动被调用 Log.i("room---", "插入数据执行完毕 threadId=" + Thread.currentThread().getId()); } });

或者

List<User> userList = new ArrayList<>(); userList.add(new User("userA", 23)); userList.add(new User("userB", 24)); //插入users... //... //插入完毕后通知监听 Observable.just(userList) .observeOn(AndroidSchedulers.mainThread())//观察者执行在android的UI线程 .subscribe(new Consumer<List<User>>() { @Override public void accept(List<User> users) throws Exception { //接收到被观察者的通知 for(User user : users) { Log.i("room---", "存入数据库的数据是:" + user.toString() + " threadId=" + Thread.currentThread().getId()); } } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { //发生错误的时候,观察者的此方法会被执行 } }, new Action() { @Override public void run() throws Exception { //观察者的onComplete对象,由被观察者执行,能自动被调用 Log.i("room---", "插入数据执行完毕 threadId=" + Thread.currentThread().getId()); } });

输出结果一样

存入数据库的数据是:id=0 name=userA age=12 threadId=1 存入数据库的数据是:id=0 name=userB age=32 threadId=1 插入数据执行完毕 threadId=1

map用法

将User转换为String回调给观察者

Observable.fromIterable(userList) .map(new Function<User, String>() { @Override public String apply(User user) throws Exception { return "mapUserString=" + user.toString();//将user转乘string给观察者 } }) .subscribe(new Consumer<String>() { @Override public void accept(String userStr) throws Exception { //接收到被观察者的通知 Log.i("room---", "存入数据库的数据是:" + userStr); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { //发生错误的时候,观察者的此方法会被执行 } }, new Action() { @Override public void run() throws Exception { //观察者的onComplete对象 Log.i("room---", "插入数据执行完毕"); } });

输出

存入数据库的数据是:mapUserString=id=0 name=userA age=23 存入数据库的数据是:mapUserString=id=0 name=userB age=24 插入数据执行完毕

flatMap 直接转换Observale对象

Observable.create(new ObservableOnSubscribe<List<User>>() { @Override public void subscribe(ObservableEmitter<List<User>> emitter) throws Exception { List<User> users = UserDatabase.getInstance(RoomActivity.this).getUserDao().queryAll(); emitter.onNext(users); } }) .subscribeOn(Schedulers.io()) .flatMap(new Function<List<User>, ObservableSource<?>>() { @Override public ObservableSource<User> apply(List<User> userList) throws Exception { return Observable.fromIterable(userList);//转乘obserable对象 } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Object>() { @Override public void accept(Object obj) throws Exception { User user = (User) obj;//需要强制转一下 Log.i("room---",user.toString()); } }); id=1 name=user6 age=16 id=2 name=user2 age=12 id=3 name=user7 age=17 id=4 name=user2 age=12 id=5 name=user3 age=13 id=7 name=user6 age=16 id=8 name=user0 age=10 id=9 name=user7 age=17

线程调度

不指定线程的话默认observable和observer会运行在同一个线程

Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。

Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

Observable.create(new ObservableOnSubscribe<List<User>>() { @Override public void subscribe(ObservableEmitter<List<User>> emitter) throws Exception { Log.i("room---", "从数据库获取数据 ThreadId=" + Thread.currentThread().getId()); List<User> users = UserDatabase.getInstance(RoomActivity.this).getUserDao().queryAll(); emitter.onNext(users); } }) .subscribeOn(Schedulers.io()) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { Log.i("room---", "获取数据之前的回调 ThreadId=" + Thread.currentThread().getId()); } }) .subscribeOn(AndroidSchedulers.mainThread()) .flatMap(new Function<List<User>, ObservableSource<?>>() { @Override public ObservableSource<User> apply(List<User> userList) throws Exception { Log.i("room---", "转换观察者对象 ThreadId=" + Thread.currentThread().getId()); return Observable.fromIterable(userList); } }) .subscribeOn(Schedulers.newThread()) .filter(new Predicate<Object>() { @Override public boolean test(Object obj) throws Exception { Log.i("room---", "过滤池id为偶数的数据 ThreadId=" + Thread.currentThread().getId()); User user = (User) obj; return user.getId() % 2 == 0; } }) .subscribeOn(Schedulers.computation()) .take(5)//只选泽其中五个数据 .subscribeOn(Schedulers.newThread()) .map(new Function<Object, String>() { @Override public String apply(Object obj) throws Exception { Log.i("room---", "转换User为String ThreadId=" + Thread.currentThread().getId()); User user = (User) obj; return user.toString(); } }) .subscribeOn(Schedulers.newThread()) .observeOn(Schedulers.newThread()) .subscribe(new Consumer<String>() { @Override public void accept(String userStr) throws Exception { Log.i("room---", "观察者获取到数据 +" + userStr + " ThreadId=" + Thread.currentThread().getId()); } }); 获取数据之前的回调 ThreadId=1 从数据库获取数据 ThreadId=3605 转换观察者对象 ThreadId=3605 过滤池id为偶数的数据 ThreadId=3605 过滤池id为偶数的数据 ThreadId=3605 转换User为String ThreadId=3605 过滤池id为偶数的数据 ThreadId=3605 过滤池id为偶数的数据 ThreadId=3605 转换User为String ThreadId=3605 过滤池id为偶数的数据 ThreadId=3605 观察者获取到数据 +id=2 name=user2 age=12 ThreadId=3606 过滤池id为偶数的数据 ThreadId=3605 过滤池id为偶数的数据 ThreadId=3605 观察者获取到数据 +id=4 name=user2 age=12 ThreadId=3606 转换User为String ThreadId=3605 过滤池id为偶数的数据 ThreadId=3605 观察者获取到数据 +id=8 name=user0 age=10 ThreadId=3606 过滤池id为偶数的数据 ThreadId=3605 转换User为String ThreadId=3605 过滤池id为偶数的数据 ThreadId=3605 观察者获取到数据 +id=10 name=user1 age=11 ThreadId=3606 过滤池id为偶数的数据 ThreadId=3605 转换User为String ThreadId=3605 观察者获取到数据 +id=12 name=user5 age=15 ThreadId=3606

使用自定义线程池:

Observable.create(new ObservableOnSubscribe<List<User>>() { @Override public void subscribe(ObservableEmitter<List<User>> emitter) throws Exception { Log.i("room---", "从数据库获取数据 ThreadId=" + Thread.currentThread().getId()); List<User> users = UserDatabase.getInstance(RoomActivity.this).getUserDao().queryAll(); emitter.onNext(users); } }) .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))//自定义线程池 .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { Log.i("room---", "获取数据之前的回调 ThreadId=" + Thread.currentThread().getId()); } }) .subscribeOn(AndroidSchedulers.mainThread()) .flatMap(new Function<List<User>, ObservableSource<?>>() { @Override public ObservableSource<User> apply(List<User> userList) throws Exception { Log.i("room---", "转换观察者对象 ThreadId=" + Thread.currentThread().getId()); return Observable.fromIterable(userList); } }) .subscribeOn(Schedulers.from(Executors.newCachedThreadPool()))//自定义线程池 .filter(new Predicate<Object>() { @Override public boolean test(Object obj) throws Exception { Log.i("room---", "过滤池id为偶数的数据 ThreadId=" + Thread.currentThread().getId()); User user = (User) obj; return user.getId() % 2 == 0; } }) .subscribeOn(Schedulers.computation()) .take(5)//只选泽其中五个数据 .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3)))//自定义线程池 .map(new Function<Object, String>() { @Override public String apply(Object obj) throws Exception { Log.i("room---", "转换User为String ThreadId=" + Thread.currentThread().getId()); User user = (User) obj; return user.toString(); } }) .subscribeOn(Schedulers.from(Executors.newCachedThreadPool()))//自定义线程池 .observeOn(Schedulers.from(Executors.newCachedThreadPool()))//自定义线程池 .subscribe(new Consumer<String>() { @Override public void accept(String userStr) throws Exception { Log.i("room---", "观察者获取到数据 +" + userStr + " ThreadId=" + Thread.currentThread().getId()); } });

输出:

获取数据之前的回调 ThreadId=1 从数据库获取数据 ThreadId=3624 转换观察者对象 ThreadId=3624 过滤池id为偶数的数据 ThreadId=3624 过滤池id为偶数的数据 ThreadId=3624 转换User为String ThreadId=3624 过滤池id为偶数的数据 ThreadId=3624 过滤池id为偶数的数据 ThreadId=3624 转换User为String ThreadId=3624 观察者获取到数据 +id=2 name=user2 age=12 ThreadId=3625 过滤池id为偶数的数据 ThreadId=3624 过滤池id为偶数的数据 ThreadId=3624 观察者获取到数据 +id=4 name=user2 age=12 ThreadId=3625 过滤池id为偶数的数据 ThreadId=3624 转换User为String ThreadId=3624 过滤池id为偶数的数据 ThreadId=3624 观察者获取到数据 +id=8 name=user0 age=10 ThreadId=3625 过滤池id为偶数的数据 ThreadId=3624 转换User为String ThreadId=3624 过滤池id为偶数的数据 ThreadId=3624 观察者获取到数据 +id=10 name=user1 age=11 ThreadId=3625 过滤池id为偶数的数据 ThreadId=3624 转换User为String ThreadId=3624 观察者获取到数据 +id=12 name=user5 age=15 ThreadId=3625
转载请注明原文地址: https://www.6miu.com/read-2628592.html

最新回复(0)