RxJava详解(基于2.X版本的过滤操作符)

xiaoxiao2021-02-28  1

本章节讲述RxJava 的过滤操作符

 

<1> Filter()

 

[1] 作用

过滤 特定条件的事件。

 

[2] 代码

package com.example.mydemo.rxjava; import android.os.Bundle; import android.util.Log; import androidx.appcompat.app.AppCompatActivity; import com.example.mydemo.R; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.functions.Predicate; public class RxJavaActivity extends AppCompatActivity { private Disposable disposable; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_rxjava); method(); } /** * 创建 RxJava Filter()过滤操作符 */ public void method() { Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> e) throws Exception { if (e == null) { return; } //没有切断与观察者的关联 发送Int类型的数据 if (!e.isDisposed()) { e.onNext(1); e.onNext(2); e.onNext(3); e.onNext(4); e.onNext(5); e.onNext(6); e.onComplete(); } } }).filter(new Predicate<Object>() { @Override public boolean test(Object o) throws Exception { if (null == o) { return false; } int num = (int) o; if (num == 4) {//过滤4 return false;//过滤 返回 false } return true; } }).subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { if (null == d) { return; } disposable = d; Log.d("TAG", "观察者 onSubscribe方法 是否断开连接:" + disposable.isDisposed()); } @Override public void onNext(Object value) { Log.d("TAG", "观察者 onNext方法 结果:" + value.toString()); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } /** * onDestroy方法 */ @Override protected void onDestroy() { super.onDestroy(); if (null != disposable) { Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed()); if (!disposable.isDisposed()) {//没有断开 disposable.dispose();//断开 Log.d("TAG", "onDestroy方法 断开订阅"); } } } }

 

 

[3] 结果

D/TAG: 观察者 onSubscribe方法 是否断开连接:false D/TAG: 观察者 onNext方法 结果:1 D/TAG: 观察者 onNext方法 结果:2 D/TAG: 观察者 onNext方法 结果:3 D/TAG: 观察者 onNext方法 结果:5 D/TAG: 观察者 onNext方法 结果:6

 

 

 

 

<2> ofType()

 

[1] 作用

过滤  保留 特定数据类型的数据。

 

[2] 代码

package com.example.mydemo.rxjava; import android.os.Bundle; import android.util.Log; import androidx.appcompat.app.AppCompatActivity; import com.example.mydemo.R; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; public class RxJavaActivity extends AppCompatActivity { private Disposable disposable; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_rxjava); method(); } /** * 创建 RxJava ofType()过滤操作符 */ public void method() { Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> e) throws Exception { if (e == null) { return; } //没有切断与观察者的关联 发送Int类型的数据 if (!e.isDisposed()) { e.onNext(1); e.onNext(2); e.onNext("张三"); e.onNext("326"); e.onNext(5); e.onNext("过滤"); e.onComplete(); } } }).ofType(String.class)//过滤 只保留 字符串类型 .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { if (null == d) { return; } disposable = d; Log.d("TAG", "观察者 onSubscribe方法 是否断开连接:" + disposable.isDisposed()); } @Override public void onNext(Object value) { Log.d("TAG", "观察者 onNext方法 结果:" + value.toString()); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } /** * onDestroy方法 */ @Override protected void onDestroy() { super.onDestroy(); if (null != disposable) { Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed()); if (!disposable.isDisposed()) {//没有断开 disposable.dispose();//断开 Log.d("TAG", "onDestroy方法 断开订阅"); } } } }

 

 

[3] 结果

D/TAG: 观察者 onSubscribe方法 是否断开连接:false D/TAG: 观察者 onNext方法 结果:张三 D/TAG: 观察者 onNext方法 结果:326 D/TAG: 观察者 onNext方法 结果:过滤

 

 

 

<3> skip() &skipLast()

 

[1] 作用

跳过某个事件。

skip():过滤掉 正序第N个

skipLast():过滤掉 倒叙后N个

 

 

[2] 代码

package com.example.mydemo.rxjava; import android.os.Bundle; import android.util.Log; import androidx.appcompat.app.AppCompatActivity; import com.example.mydemo.R; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; public class RxJavaActivity extends AppCompatActivity { private Disposable disposable; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_rxjava); method(); } /** * 创建 RxJava skip()&skipLast()过滤操作符 */ public void method() { Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> e) throws Exception { if (e == null) { return; } //没有切断与观察者的关联 发送Int类型的数据 if (!e.isDisposed()) { e.onNext(1); e.onNext(2); e.onNext("张三"); e.onNext("326"); e.onNext(5); e.onNext("过滤"); e.onComplete(); } } }).skip(1)//过滤掉 正序第一个 .skipLast(4)//过滤掉 倒叙后四个 .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { if (null == d) { return; } disposable = d; Log.d("TAG", "观察者 onSubscribe方法 是否断开连接:" + disposable.isDisposed()); } @Override public void onNext(Object value) { Log.d("TAG", "观察者 onNext方法 结果:" + value.toString()); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } /** * onDestroy方法 */ @Override protected void onDestroy() { super.onDestroy(); if (null != disposable) { Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed()); if (!disposable.isDisposed()) {//没有断开 disposable.dispose();//断开 Log.d("TAG", "onDestroy方法 断开订阅"); } } } }

 

 

 

[3] 结果

D/TAG: 观察者 onSubscribe方法 是否断开连接:false D/TAG: 观察者 onNext方法 结果:2

 

 

 

<4> distinct()&distinctUntilChanged()

 

[1] 作用

过滤事件序列中重复的事件 / 连续重复的事件。

distinct():过滤 重复数据。

distinctUntilChanged():过滤 连续数据。

 

[2] 代码

package com.example.mydemo.rxjava; import android.os.Bundle; import android.util.Log; import androidx.appcompat.app.AppCompatActivity; import com.example.mydemo.R; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; public class RxJavaActivity extends AppCompatActivity { private Disposable disposable; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_rxjava); method(); } /** * 创建 RxJava distinct()过滤操作符 */ public void method() { Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> e) throws Exception { if (e == null) { return; } //没有切断与观察者的关联 发送Int类型的数据 if (!e.isDisposed()) { e.onNext(1); e.onNext(5); e.onNext("张三"); e.onNext("张三"); e.onNext(4); e.onNext(5); e.onComplete(); } } }).distinct()//过滤 重复数据 .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { if (null == d) { return; } disposable = d; Log.d("TAG", "观察者 onSubscribe方法 是否断开连接:" + disposable.isDisposed()); } @Override public void onNext(Object value) { Log.d("TAG", "观察者 onNext方法 结果:" + value.toString()); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } /** * onDestroy方法 */ @Override protected void onDestroy() { super.onDestroy(); if (null != disposable) { Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed()); if (!disposable.isDisposed()) {//没有断开 disposable.dispose();//断开 Log.d("TAG", "onDestroy方法 断开订阅"); } } } }

 

 

[3] 结果

D/TAG: 观察者 onSubscribe方法 是否断开连接:false D/TAG: 观察者 onNext方法 结果:1 D/TAG: 观察者 onNext方法 结果:5 D/TAG: 观察者 onNext方法 结果:张三 D/TAG: 观察者 onNext方法 结果:4

 

 

 

[2] 代码

package com.example.mydemo.rxjava; import android.os.Bundle; import android.util.Log; import androidx.appcompat.app.AppCompatActivity; import com.example.mydemo.R; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; public class RxJavaActivity extends AppCompatActivity { private Disposable disposable; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_rxjava); method(); } /** * 创建 RxJava distinctUntilChanged()过滤操作符 */ public void method() { Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> e) throws Exception { if (e == null) { return; } //没有切断与观察者的关联 发送Int类型的数据 if (!e.isDisposed()) { e.onNext(1); e.onNext(5); e.onNext("张三"); e.onNext("张三"); e.onNext(4); e.onNext(5); e.onComplete(); } } }).distinctUntilChanged()//过滤 连续数据 .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { if (null == d) { return; } disposable = d; Log.d("TAG", "观察者 onSubscribe方法 是否断开连接:" + disposable.isDisposed()); } @Override public void onNext(Object value) { Log.d("TAG", "观察者 onNext方法 结果:" + value.toString()); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } /** * onDestroy方法 */ @Override protected void onDestroy() { super.onDestroy(); if (null != disposable) { Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed()); if (!disposable.isDisposed()) {//没有断开 disposable.dispose();//断开 Log.d("TAG", "onDestroy方法 断开订阅"); } } } }

 

 

[3] 结果

D/TAG: 观察者 onSubscribe方法 是否断开连接:false D/TAG: 观察者 onNext方法 结果:1 D/TAG: 观察者 onNext方法 结果:5 D/TAG: 观察者 onNext方法 结果:张三 D/TAG: 观察者 onNext方法 结果:4 D/TAG: 观察者 onNext方法 结果:5

 

 

 

 

<5> take()&takeLast()

 

[1] 作用

take():指定观察者最多能接收到的事件数量。

takeLast():指定观察者只能接收到被观察者发送的最后几个事件。

 

[2] 代码

package com.example.mydemo.rxjava; import android.os.Bundle; import android.util.Log; import androidx.appcompat.app.AppCompatActivity; import com.example.mydemo.R; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; public class RxJavaActivity extends AppCompatActivity { private Disposable disposable; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_rxjava); method(); } /** * 创建 RxJava take()过滤操作符 */ public void method() { Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> e) throws Exception { if (e == null) { return; } //没有切断与观察者的关联 发送Int类型的数据 if (!e.isDisposed()) { e.onNext(1); e.onNext(5); e.onNext("张三"); e.onNext("张三"); e.onNext(4); e.onNext(5); e.onComplete(); } } }).take(2)//最多能接收 2个数据 .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { if (null == d) { return; } disposable = d; Log.d("TAG", "观察者 onSubscribe方法 是否断开连接:" + disposable.isDisposed()); } @Override public void onNext(Object value) { Log.d("TAG", "观察者 onNext方法 结果:" + value.toString()); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } /** * onDestroy方法 */ @Override protected void onDestroy() { super.onDestroy(); if (null != disposable) { Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed()); if (!disposable.isDisposed()) {//没有断开 disposable.dispose();//断开 Log.d("TAG", "onDestroy方法 断开订阅"); } } } }

 

[3] 结果

D/TAG: 观察者 onSubscribe方法 是否断开连接:false D/TAG: 观察者 onNext方法 结果:1 D/TAG: 观察者 onNext方法 结果:5

 

 

 

[2] 代码

package com.example.mydemo.rxjava; import android.os.Bundle; import android.util.Log; import androidx.appcompat.app.AppCompatActivity; import com.example.mydemo.R; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; public class RxJavaActivity extends AppCompatActivity { private Disposable disposable; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_rxjava); method(); } /** * 创建 RxJava takeLast()过滤操作符 */ public void method() { Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> e) throws Exception { if (e == null) { return; } //没有切断与观察者的关联 发送Int类型的数据 if (!e.isDisposed()) { e.onNext(1); e.onNext(5); e.onNext("张三"); e.onNext("张三"); e.onNext(4); e.onNext(5); e.onComplete(); } } }).takeLast(3)//只能接收到被观察者发送的最后 3 个事件 .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { if (null == d) { return; } disposable = d; Log.d("TAG", "观察者 onSubscribe方法 是否断开连接:" + disposable.isDisposed()); } @Override public void onNext(Object value) { Log.d("TAG", "观察者 onNext方法 结果:" + value.toString()); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } /** * onDestroy方法 */ @Override protected void onDestroy() { super.onDestroy(); if (null != disposable) { Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed()); if (!disposable.isDisposed()) {//没有断开 disposable.dispose();//断开 Log.d("TAG", "onDestroy方法 断开订阅"); } } } }

 

 

 

[3] 结果

D/TAG: 观察者 onSubscribe方法 是否断开连接:false D/TAG: 观察者 onNext方法 结果:张三 D/TAG: 观察者 onNext方法 结果:4 D/TAG: 观察者 onNext方法 结果:5

 

 

 

 

<6> throttleFirst()&throttleLast()

 

[1] 作用

在某段时间内,只发送该段时间内第1次事件&最后1次事件。

throttleFirst():第1次事件。

throttleLast():最后1次事件。

 

throttleFirst(long windowDuration, TimeUnit unit):参数1:XX时间第一次出现 参数2:时间单位。

throttleFirst(long skipDuration, TimeUnit unit, Scheduler scheduler):参数1:XX时间第一次出现 参数2:时间单位 参数3:线程调度类。

 

[2] 代码

package com.example.mydemo.rxjava; import android.os.Bundle; import android.util.Log; import androidx.appcompat.app.AppCompatActivity; import com.example.mydemo.R; import java.util.concurrent.TimeUnit; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; public class RxJavaActivity extends AppCompatActivity { private Disposable disposable; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_rxjava); method(); } /** * 创建 RxJava throttleFirst()过滤操作符 */ public void method() { Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> e) throws Exception { if (null == e) { return; } /** * 为了测试 模拟 0.5秒执行一次onNext方法发送一次数据 * */ if (!e.isDisposed()) { e.onNext(1); e.onNext(2); Thread.sleep(500); e.onNext(3); Thread.sleep(500); e.onNext(4); Thread.sleep(500); e.onNext(5); Thread.sleep(500); e.onComplete(); } } }).throttleFirst(1, TimeUnit.SECONDS)//每1秒内的第一次 .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { if (null == d) { return; } disposable = d; Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed()); } @Override public void onNext(Object value) { Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString()); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } /** * onDestroy方法 */ @Override protected void onDestroy() { super.onDestroy(); if (null != disposable) { Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed()); if (!disposable.isDisposed()) {//没有断开 disposable.dispose();//断开 Log.d("TAG", "onDestroy方法 断开订阅"); } } } }

 

[3] 结果

D/TAG: 观察者 onSubscribe 方法 是否断开连接----:false D/TAG: 观察者 onNext 方法 value.toString()----:1 D/TAG: 观察者 onNext 方法 value.toString()----:4

 

[2] 代码

throttleLast(long windowDuration, TimeUnit unit):参数1:XX时间最后出现 参数2:时间单位。

throttleLast(long skipDuration, TimeUnit unit, Scheduler scheduler):参数1:XX时间最后出现 参数2:时间单位 参数3:线程调度类。

 

package com.example.mydemo.rxjava; import android.os.Bundle; import android.util.Log; import androidx.appcompat.app.AppCompatActivity; import com.example.mydemo.R; import java.util.concurrent.TimeUnit; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; public class RxJavaActivity extends AppCompatActivity { private Disposable disposable; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_rxjava); method(); } /** * 创建 RxJava throttleLast()过滤操作符 */ public void method() { Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> e) throws Exception { if (null == e) { return; } /** * 为了测试 模拟 0.5秒执行一次onNext方法发送一次数据 * */ if (!e.isDisposed()) { e.onNext(1); e.onNext(2); Thread.sleep(500); e.onNext(3); Thread.sleep(500); e.onNext(4); Thread.sleep(500); e.onNext(5); Thread.sleep(500); e.onComplete(); } } }).throttleLast(1, TimeUnit.SECONDS)//每1秒内的最后一次 .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { if (null == d) { return; } disposable = d; Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed()); } @Override public void onNext(Object value) { Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString()); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } /** * onDestroy方法 */ @Override protected void onDestroy() { super.onDestroy(); if (null != disposable) { Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed()); if (!disposable.isDisposed()) {//没有断开 disposable.dispose();//断开 Log.d("TAG", "onDestroy方法 断开订阅"); } } } }

 

[3] 结果

D/TAG: 观察者 onSubscribe 方法 是否断开连接----:false D/TAG: 观察者 onNext 方法 value.toString()----:3 D/TAG: 观察者 onNext 方法 value.toString()----:5

 

 

 

 

<7> Sample()

 

[1] 作用

在某段时间内,只发送该段时间内最后1次事件。和throttleFirst方法作用相当。

sample(long period, TimeUnit unit):参数1:XX时间最后出现 参数2:时间单位。

sample(long period, TimeUnit unit, Scheduler scheduler):XX时间最后出现 参数2:时间单位 参数3:线程调度类。

 

[2] 代码

package com.example.mydemo.rxjava; import android.os.Bundle; import android.util.Log; import androidx.appcompat.app.AppCompatActivity; import com.example.mydemo.R; import java.util.concurrent.TimeUnit; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; public class RxJavaActivity extends AppCompatActivity { private Disposable disposable; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_rxjava); method(); } /** * 创建 RxJava Sample()过滤操作符 */ public void method() { Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> e) throws Exception { if (null == e) { return; } /** * 为了测试 模拟 0.5秒执行一次onNext方法发送一次数据 * */ if (!e.isDisposed()) { e.onNext(1); e.onNext(2); Thread.sleep(500); e.onNext(3); Thread.sleep(500); e.onNext(4); Thread.sleep(500); e.onNext(5); Thread.sleep(500); e.onComplete(); } } }).sample(1, TimeUnit.SECONDS)//每1秒内的最后一次 .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { if (null == d) { return; } disposable = d; Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed()); } @Override public void onNext(Object value) { Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString()); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } /** * onDestroy方法 */ @Override protected void onDestroy() { super.onDestroy(); if (null != disposable) { Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed()); if (!disposable.isDisposed()) {//没有断开 disposable.dispose();//断开 Log.d("TAG", "onDestroy方法 断开订阅"); } } } }

 

[3] 结果

D/TAG: 观察者 onSubscribe 方法 是否断开连接----:false D/TAG: 观察者 onNext 方法 value.toString()----:3 D/TAG: 观察者 onNext 方法 value.toString()----:5

 

 

 

<8> throttleWithTimeout()

 

[1] 作用

若2次发送事件的间隔 小于 指定时间。不发送前一次的数据。直到指定时间内都没有新数据发射时才会发送后一次的数据。

 

throttleWithTimeout(long timeout, TimeUnit unit):参数1:两次事件时间间隔小于XX 参数2:时间单位

throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler):参数1:两次事件时间间隔小于XX 参数2:时间单位 参数3:线程调度类

 

[2] 代码

package com.example.mydemo.rxjava; import android.os.Bundle; import android.util.Log; import androidx.appcompat.app.AppCompatActivity; import com.example.mydemo.R; import java.util.concurrent.TimeUnit; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; public class RxJavaActivity extends AppCompatActivity { private Disposable disposable; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_rxjava); method(); } /** * 创建 RxJava throttleWithTimeout()过滤操作符 */ public void method() { Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> e) throws Exception { if (null == e) { return; } /** * 为了测试 模拟 0.5秒执行一次onNext方法发送一次数据 * */ if (!e.isDisposed()) { e.onNext(1); e.onNext(2); Thread.sleep(500); e.onNext(3); Thread.sleep(500); e.onNext(4); Thread.sleep(500); e.onNext(5); Thread.sleep(500); e.onComplete(); } } }).throttleWithTimeout(1, TimeUnit.SECONDS)//若2次发送事件的间隔 小于1秒 则不发送数据 知道最后一次 .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { if (null == d) { return; } disposable = d; Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed()); } @Override public void onNext(Object value) { Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString()); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } /** * onDestroy方法 */ @Override protected void onDestroy() { super.onDestroy(); if (null != disposable) { Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed()); if (!disposable.isDisposed()) {//没有断开 disposable.dispose();//断开 Log.d("TAG", "onDestroy方法 断开订阅"); } } } }

 

 

[3] 结果

D/TAG: 观察者 onSubscribe 方法 是否断开连接----:false D/TAG: 观察者 onNext 方法 value.toString()----:5

 

 

 

<9> firstElement()&lastElement()

 

[1] 作用

仅选取第1条数据&最后一条数据。

 

[2] 代码

package com.example.mydemo.rxjava; import android.os.Bundle; import android.util.Log; import androidx.appcompat.app.AppCompatActivity; import com.example.mydemo.R; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.functions.Consumer; public class RxJavaActivity extends AppCompatActivity { @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_rxjava); method(); } /** * 创建 RxJava firstElement()过滤操作符 */ public void method() { Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> e) throws Exception { if (null == e) { return; } if (!e.isDisposed()) { e.onNext(1); e.onNext(2); e.onNext(3); e.onNext(4); e.onNext(5); e.onComplete(); } } }).firstElement()//只取第一条数据 .subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { if (null == o) { return; } Log.d("TAG", "观察者 accept 方法 o.toString()----:" + o.toString()); } }); } }

 

 

[3] 结果

D/TAG: 观察者 accept 方法 o.toString()----:1

 

 

[2] 代码

package com.example.mydemo.rxjava; import android.os.Bundle; import android.util.Log; import androidx.appcompat.app.AppCompatActivity; import com.example.mydemo.R; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.functions.Consumer; public class RxJavaActivity extends AppCompatActivity { @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_rxjava); method(); } /** * 创建 RxJava lastElement()过滤操作符 */ public void method() { Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> e) throws Exception { if (null == e) { return; } if (!e.isDisposed()) { e.onNext(1); e.onNext(2); e.onNext(3); e.onNext(4); e.onNext(5); e.onComplete(); } } }).lastElement()//只取最后一条数据 .subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { if (null == o) { return; } Log.d("TAG", "观察者 accept 方法 o.toString()----:" + o.toString()); } }); } }

 

[3] 结果

D/TAG: 观察者 accept 方法 o.toString()----:5

 

 

 

<10> elementAt()&elementAtOrError()

 

[1] 作用

通过 索引值 确定 接收指定消息。

elementAt():通过 索引值 确定 接收指定消息。

elementAtOrError():通过 索引值 确定 接收指定消息。 错误时有区别。当出现越界情况(即获取的位置索引 > 发送事件序列长度)时,即抛出异常

elementAt(long index):参数1:索引index

elementAt(long index, T defaultItem) 参数1:索引index 参数2:默认值 当前索引取不到值时使用

 

[2] 代码

package com.example.mydemo.rxjava; import android.os.Bundle; import android.util.Log; import androidx.appcompat.app.AppCompatActivity; import com.example.mydemo.R; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.functions.Consumer; public class RxJavaActivity extends AppCompatActivity { @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_rxjava); method(); } /** * 创建 RxJava elementAt()过滤操作符 */ public void method() { Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> e) throws Exception { if (null == e) { return; } if (!e.isDisposed()) { e.onNext(1); e.onNext(2); e.onNext(3); e.onNext(4); e.onNext(5); e.onComplete(); } } }).elementAt(4)//接收索引为4的消息 .subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { if (null == o) { return; } Log.d("TAG", "观察者 accept 方法 o.toString()----:" + o.toString()); } }); } }

 

 

[3] 结果

D/TAG: 观察者 accept 方法 o.toString()----:5

 

如果

.elementAt(10)//接收索引为10的消息

 

结果

无接收内容。即索引值可以越界。越界不会报错,但是会出现无消息可接收的问题。

 

如果 即 索引值越界 但设置默认值

.elementAt(10, 3)//接收索引为10的消息 默认值3

 

结果 显示默认值

D/TAG: 观察者 accept 方法 o.toString()----:3

 

如果 即 索引值正常 但也设置默认值

.elementAt(4, 3)//接收索引为4的消息 默认值3

 

结果 显示正确的值

D/TAG: 观察者 accept 方法 o.toString()----:5

 

 

 

 

 

 

转载请注明原文地址: https://www.6miu.com/read-850394.html

最新回复(0)