RxSwift(3.4.1) - Filtering Operators

xiaoxiao2021-02-28  57

Filtering Operators

Ignoring operators

 ignoreElements,使用该操作符能够忽略.next事件,观察者序列能够发送.completed,.error等终止事件

example(of: "ignoreElement") { let disposeBag = DisposeBag() //1 let striks = PublishSubject<String>() //2 striks.ignoreElements() .subscribe { _ in print("You ara out!")} .addDisposableTo(disposeBag) //3 striks.onNext("x") striks.onNext("x") striks.onNext("x") //4 striks.onCompleted() } 执行结果和分析:  --- Example of: ignoreElement --- You ara out!   1:创建一个PublishSubject对象 2:开始订阅所有的striks事件,并且使用ignoreElements操作符忽略所有的.next事件,只接受.completed和.error事件 3:添加新消息到striks观察者序列,但是无效,因为使用ignoreElements操作符,跳过了这些消息 4:现在添加完成事件,那么会执行订阅者闭包,打印信息。因为订阅者能够接收到.completed事件 那么ignoreElements有什么用呢?如果你不关注消息的发送,仅仅是想接收由.completed或者.error发送的消息而终止观察者序列,那么可以使用ignoreElements操作符。

ElementAt Operator

仅仅发送观察者序列中具体位置的消息事件,即获取指定位置的元素,忽略其它元素.

example(of: "elementAt") { let disposeBag = DisposeBag() Observable.of(1,2,3,4,5) .elementAt(1) .subscribe(onNext: { print($0)}) .addDisposableTo(disposeBag) } /* --- Example of: elementAt --- 2 */

Filter Operator

只发送观察者序列中满足闭包具体条件的元素.换句话说,即使用过滤闭包对元素进行过滤操作,只有满足结果的元素才会被拥有并发送.

example(of: "filter") { let disposeBag = DisposeBag() Observable.of(1, 2, 3, 4, 5, 6, 7) .filter { $0 % 2 == 0 } //添加条件约束进行过滤元素,结果为true保存元素 .subscribe(onNext: { print($0) }) .addDisposableTo(disposeBag) } /* --- Example of: filter --- 2 4 6 */

Skip Operator

跳过指定的元素,即指定一个位置,然后忽略从第一个位置开始到指定位置的元素

example(of: "skip") { let disposeBag = DisposeBag() Observable.of("a", "b", "c", "d", "e") .skip(3) //跳过前面3个元素,即前面3个.next事件 .subscribe(onNext: { print($0) }) .addDisposableTo(disposeBag) } /* --- Example of: skip --- d e */

SkipWhile Operator

skipWhile,跟filter很像,它能够让你有一个闭包,用于过滤元素,确定哪些元素应该被跳过,哪些元素可以留下。但是也有区别,skipWhile只会在闭包条件满足之前进行跳过,一旦条件不满足将停止,即使后续元素满足闭包的判断条件也不会被跳过,照样发送元素。

example(of: "skipWhile") { let disposeBag = DisposeBag() Observable.of(2, 2, 3, 4, 5, 6) .skipWhile { $0 % 2 == 0 } .subscribe(onNext: { print($0)}) .addDisposableTo(disposeBag) } /* --- Example of: skipWhile --- 3 4 5 6 */

SkipUntil Operator

skipUntil  保持一直跳过原观察者序列发送的事件,一直到另外一个触发观察者序列发送事件。然后停止跳过元素操作.

example(of: "skipUntil") { let disposeBag = DisposeBag() //1 let sourceSequence = PublishSubject<String>() let referenceSequence = PublishSubject<String>() //2 sourceSequence.skipUntil(referenceSequence) .subscribe(onNext: { print($0)}) .addDisposableTo(disposeBag) //3 sourceSequence.onNext("1") sourceSequence.onNext("2") sourceSequence.onNext("3") //4 referenceSequence.onNext("stop") //5 sourceSequence.onNext("4") sourceSequence.onNext("5") sourceSequence.onNext("6") }

执行结果和分析:

--- Example of: skipUntil --- 4 5 6   1:创建源观察者序列和一个触发观察者序列 2:使用skipUntil操作符,并且开始订阅事件 3:源观察者序列开始发送.next事件 4:源观察者序列发送的事件目前为止无效,因为事件一直被忽略,那么为触发观察者序列添加一个新事件 5:源观察者序列再次发送事件,这次订阅者能够接受事件了

SkipWhileWithIndex Operator

跳过观察者序列中从开始位置的元素一直到满足具体闭包条件,闭包中能够获取元素和元素对应的位置。然后才开始发送后续的元素

example(of: "skipWhileWithIndex") { let disposeBag = DisposeBag() Observable.of("1", "2", "3", "4", "5", "6") .skipWhileWithIndex({ element, index in index < 4 }) .subscribe(onNext: { print( $0)}) .addDisposableTo(disposeBag) } /* --- Example of: skipWhileWithIndex --- 5 6 */

Take Operator

take跟skip相反,当你想获取指定元素的时候,我们可以使用take,可以指定获取元素的多少。也可以理解为仅仅只发送指定数字的元素,如果是2,序列有4个元素,只发送前面2个元素.

example(of: "take") { let disposeBag = DisposeBag() //获取3个元素,从第一个元素开始,如果超过序列的元素个数,只取当前元素。 Observable.of(1, 2, 3, 4, 5, 6) .take(3) .subscribe(onNext: { print($0)}) .addDisposableTo(disposeBag) } /* --- Example of: take --- 1 2 3 */

TakeWhile Operator

takeWhile 跟skipWhile相似,你是获取元素,它是跳过元素。只要满足闭包执行条件的都获取,并且不能中断,一旦遇到不满足的条件,那么立即终止,即使后续元素满足闭包条件也不会获取,被忽略掉。

example(of: "takeWhile") { let disposeBag = DisposeBag() Observable.of(2, 2, 3, 4, 5, 6, 7) .takeWhile({ $0 % 2 == 0 }) .subscribe(onNext: {print($0)}) .addDisposableTo(disposeBag) } /* --- Example of: takeWhile --- 2 2 */

TakeWhileWithIndex Operator

有时候你想指定位置的元素被忽略。那么就需要使用takeWhileWithIndex。

example(of: "takeWhileWithIndex") { let disposeBag = DisposeBag() //1 创建一个integer观察者序列 Observable.of(2, 2, 4, 4, 6, 6, 7) //2 使用takeWhileWithIndex操作符,闭包中获取元素值和值所在的位置 .takeWhileWithIndex({ (integer, index) -> Bool in //3 每一个元素必须对2取余为0并且位置小于3 integer % 2 == 0 && index < 3 }) //4 开始订阅 .subscribe(onNext: { print($0) }) .addDisposableTo(disposeBag) } /* --- Example of: takeWhileWithIndex --- 2 2 4 */

TakeUntil Operator

跟skipUnti对应的有一个takeUntil,takeUntil是获取源观察者序列的元素一直到其它观察者序列发送消息。那么停止获取元素. 也可以说是从源观察者序列发送元素一直到引用观察者序列发送一个元素为止。

example(of: "takeUntil") { let disposeBag = DisposeBag() //1 let sourceSequence = PublishSubject<String>() let referenceSequence = PublishSubject<String>() //2 sourceSequence.takeUntil(referenceSequence) .subscribe(onNext: {print($0)}) .addDisposableTo(disposeBag) //3 sourceSequence.onNext("1") sourceSequence.onNext("2") sourceSequence.onNext("3") //4 referenceSequence.onNext("stop!") sourceSequence.onNext("4") sourceSequence.onNext("5") sourceSequence.onNext("6") }  执行结果和分析  --- Example of: takeUntil ---  1  2  3   1:创建原观察者序列(sourceSequence),引用观察者序列(referenceSequence) 2:使用takeUntil,传入referenceSequence,这将导致referenceSequence发送消息,那么sourceSequence再发送消息将接收不到。 3:sourceSequence发送消息,订阅者接受消息 4:这里已经停止了sourceSequence获取元素,所以后续的发送消息将无效

TakeLast Operator

仅仅是发送指定数字个数的元素,从最终元素开始(即最后位置),并且忽略在它之前的items。

example(of: "takeLast") { let disposeBag = DisposeBag() Observable.of("1", "2", "3", "4", "5") .takeLast(3) .subscribe(onNext: {print($0) }) .addDisposableTo(disposeBag) Observable.of("a", "b", "c", "d") .takeLast(2) .subscribe(onNext: {print($0)}) .addDisposableTo(disposeBag) }

执行结果

--- Example of: takeLast ---

3

4

5

c

d

DistinctUntilChanged Operator

使用distinctUntilChanged防止序列连续相连的重复元素通过 

example(of: "distinctUntilChanged") { let disposeBag = DisposeBag() Observable.of("a", "a", "b", "b", "a") .distinctUntilChanged() .subscribe(onNext: {print($0)}) .addDisposableTo(disposeBag) } /* --- Example of: distinctUntilChanged --- a b a 注意:是连续重复的元素,所以最后的a是可以打印的 */

Single Operator

只发送第一个元素(或者第一个满足条件的元素)。如果观察者序列并不是准确的发送一个消息,将抛出一个异常

example(of: "single") { let disposeBag = DisposeBag() Observable.of("1", "a", "2", "b") .single() .subscribe(onNext: {print($0)}) .addDisposableTo(disposeBag) Observable.of("right") .single() .subscribe(onNext: {print($0)}) .addDisposableTo(disposeBag) }

执行结果 

--- Example of: single ---

1

Received unhandled error: /var/folders/wl/c1df1cbx5_l2cf3p0qwp7lqh0000gn/T/./lldb/23529/playground632.swift:45:__lldb_expr_632 -> Sequence contains more than one element.

right

 

single还可以拥有一个条件闭包,找需要的元素

example(of: "single with conditions") { let disposeBag = DisposeBag() Observable.of("1", "2", "3", "4", "5", "6") .single{$0 == "3"} .subscribe{print($0)} .addDisposableTo(disposeBag) /* next(3) completed */ Observable.of("a", "b", "c","b") .single { $0 == "b" } .subscribe { print($0) } .addDisposableTo(disposeBag) /* next(b) error(Sequence contains more than one element.) */ Observable.of("A", "B", "C") .single { $0 == "a" } .subscribe { print($0) } .addDisposableTo(disposeBag) /* error(Sequence doesn't contain any elements.) */ }

Share Operator 

返回一个观察者序列,该序列被所有订阅者共享。

example(of: "share") { let disposeBag = DisposeBag() let numbers = Observable<Int>.create { observer in observer.onNext(1) observer.onNext(2) observer.onCompleted() return Disposables.create() } //订阅者1 numbers.subscribe(onNext: { print("element [\($0)]") }, onCompleted: { print("completed!") }) .addDisposableTo(disposeBag) //订阅者2 numbers.subscribe(onNext: { print("element [\($0)]") },onCompleted: { print("completed!") }) .addDisposableTo(disposeBag) } 执行结果为:

--- Example of: share --- element [1] element [2] completed! element [1] element [2] completed!

由上面结果,不知道有没有发现一个问题,就是两次订阅得到相同的结果,而且创建了不同的观察者序列。即每次调用subscribe,都为订阅者创建了一个新的观察者序列(Observable)。事实上,这是没有必要。因为这会导致很多重复的元素,执行不必要的操作。特别是网络请求,肯定希望共享订阅者。所以为了共享订阅,我们可以使用share()操作符。避免多个订阅者订阅相同的观察者序列重复执行不必要的操作。所以改写例子中的创建观察者序列部分,添加share()操作符,更多相关信息,看这里

let numbers = Observable<Int>.create { observer in observer.onNext(1) observer.onNext(2) observer.onCompleted() return Disposables.create() }.share() 执行结果为:

--- Example of: share --- element [1] element [2] completed! completed!

ShareReplay Operator

shareReplay 由名字就知道是共享和重发。共享(相当于share的功能)同一个观察者序列并且重发消息,重发消息的个数由参数大小决定。

example(of: "shareReplay(1)") { let disposeBag = DisposeBag() let sequence = Observable.of(1,2) .map{ i -> Int in print("map---\(i)") return i * 2 }.shareReplay(1) sequence.subscribe(onNext: { print("subscribe1 = \($0)")}).addDisposableTo(disposeBag) sequence.subscribe(onNext: { print("subscribe2 = \($0)")}).addDisposableTo(disposeBag) } 执行结果和分析: --- Example of: shareReplay(1) --- map---1 subscribe1 = 2 map---2 subscribe1 = 4 subscribe2 = 4 因为我们使用了shareReplay(1),所以原观察者序列被共享,并且缓存一个元素。所以在第二次订阅观察者序列的时候,会发送上一次缓存的最后一个元素数据,例子中为4.如果我们将shareReplay(1)改为shareReplay(2),结果如下: --- Example of: shareReplay(2) --- map---1 subscribe1 = 2 map---2 subscribe1 = 4 subscribe2 = 2 subscribe2 = 4 即缓存了2个元素数据,重发2个元素。如果发送的消息元素个数小于指定缓存大小,发送实际元素个数。

实际使用场景: 比如,使用URLSession.rx.response(request:) 发送你的请求到服务器,一旦接受到返回数据就会发送.next事件,返回对应的数据,并标记完成。在这种情况,如果观察者序列(observable)完成了,这时你再一次订阅它,又将重新创建一个新的订阅并且启动另外一个相同的请求服务端数据操作。为了防止这种情况,我们就可以使用shareReplay(_),缓存上一个发送的元素(即服务端返回的数据),一旦有新的订阅,shareReplay操作符会立马从缓存中获取来自服务端返回的数据。

Debounce Operator

忽略在指定时间间隔内跟随发送的元素,指定具体的运行线程。也可以理解为:仅在过了一段指定的时间还没发射数据时才发射一个数据,换句话说就是debounce会抑制发射过快的值。注意:这一操作需要指定一个线程。一般使用MainScheduler

example(of: "debounce") { let disposeBag = DisposeBag() Observable.of(1,2,3,4,5) .debounce(1, scheduler: MainScheduler.instance) .subscribe(onNext: {print($0)}) .addDisposableTo(disposeBag) }  执行结果:  --- Example of: debounce ---  5   可以看到只发射了一个5 ,因为这个序列发射速度是非常快的,所以过滤掉了前面发射的值。看几个经常使用的情况: 1)根据searchBar或者textField输入的文本内容进行搜索相应的结果,我们会有如下代码:

searchBar.rx .text // Observable property thanks to RxCocoa .orEmpty // Make it non-optional .debounce(0.5, scheduler: MainScheduler.instance) // Wait 0.5 for changes .subscribe(onNext: { [unowned self] text in self.search(query: text) // 搜索结果 }).addDisposableTo(disposeBag) 上面使用了debounce(0.5, scheduler: MainScheduler.instance),所以debounce会告诉Rx,当searchBar中的文本内容在上一个0.5秒文本内容没有发生改变,我们想接收通知。这意味着,如果我们的用户一直在不断的输入内容,在停止0.5秒之后会立马发送事件,进行后续查询操作。主要是为了解决,我们并不想每次用户输入一点内容就调用查询API,仅仅是用户输入完成再进行查询。 2)防止按钮多次点击操作

button.rx.tap .debounce(0.3, scheduler: MainScheduler.instance) .subscribe(onNext: { [unowned self] in if self.isFollowedByMe() { self.follow() } else { self.unfollow() } }).addDisposableTo(disposeBag) 比如:我们有一个点赞的按钮,当用户觉得内容不错,想进行点赞的时候,会点击按钮,但是也有可能有些用户会连续点多次点击按钮,通过使用debounce,能够帮助我们避免发送多次请求。这还是很有必要的。

Throttle Operator

返回的观察者序列发送第一个元素(Throttle emits only the first item emitted by the source observable in the time window)。该操作符将确保在指定的时间内不会有两个元素被发送。队列中的元素如果和下一个元素的间隔小于了指定的时间间隔,那么这个元素将被过滤掉。在RxSwift3.0开始,对于Throttle和debounce区别,看这里 example(of: "throttle") { let disposeBag = DisposeBag() Observable.of(1,2,3,4,5) .throttle(1, scheduler: MainScheduler.instance) .subscribe(onNext: {print($0)}) .addDisposableTo(disposeBag) } 执行结果: --- Example of: throttle --- 1

转载请注明原文地址: https://www.6miu.com/read-28914.html

最新回复(0)