example(of: "ignoreElement") { let disposeBag = DisposeBag() //1 let striks = PublishSubject<String>() //2 striks.ignoreElements() .subscribe { _ in print("You ara out!")} .addDisposableTo(disposeBag) //3 striks.onNext("x") striks.onNext("x") striks.onNext("x") //4 striks.onCompleted() } 执行结果和分析: --- Example of: ignoreElement --- You ara out! 1:创建一个PublishSubject对象 2:开始订阅所有的striks事件,并且使用ignoreElements操作符忽略所有的.next事件,只接受.completed和.error事件 3:添加新消息到striks观察者序列,但是无效,因为使用ignoreElements操作符,跳过了这些消息 4:现在添加完成事件,那么会执行订阅者闭包,打印信息。因为订阅者能够接收到.completed事件 那么ignoreElements有什么用呢?如果你不关注消息的发送,仅仅是想接收由.completed或者.error发送的消息而终止观察者序列,那么可以使用ignoreElements操作符。
example(of: "elementAt") { let disposeBag = DisposeBag() Observable.of(1,2,3,4,5) .elementAt(1) .subscribe(onNext: { print($0)}) .addDisposableTo(disposeBag) } /* --- Example of: elementAt --- 2 */
example(of: "filter") { let disposeBag = DisposeBag() Observable.of(1, 2, 3, 4, 5, 6, 7) .filter { $0 % 2 == 0 } //添加条件约束进行过滤元素,结果为true保存元素 .subscribe(onNext: { print($0) }) .addDisposableTo(disposeBag) } /* --- Example of: filter --- 2 4 6 */
example(of: "skip") { let disposeBag = DisposeBag() Observable.of("a", "b", "c", "d", "e") .skip(3) //跳过前面3个元素,即前面3个.next事件 .subscribe(onNext: { print($0) }) .addDisposableTo(disposeBag) } /* --- Example of: skip --- d e */
example(of: "skipWhile") { let disposeBag = DisposeBag() Observable.of(2, 2, 3, 4, 5, 6) .skipWhile { $0 % 2 == 0 } .subscribe(onNext: { print($0)}) .addDisposableTo(disposeBag) } /* --- Example of: skipWhile --- 3 4 5 6 */
example(of: "skipUntil") { let disposeBag = DisposeBag() //1 let sourceSequence = PublishSubject<String>() let referenceSequence = PublishSubject<String>() //2 sourceSequence.skipUntil(referenceSequence) .subscribe(onNext: { print($0)}) .addDisposableTo(disposeBag) //3 sourceSequence.onNext("1") sourceSequence.onNext("2") sourceSequence.onNext("3") //4 referenceSequence.onNext("stop") //5 sourceSequence.onNext("4") sourceSequence.onNext("5") sourceSequence.onNext("6") }
执行结果和分析:
--- Example of: skipUntil --- 4 5 6 1:创建源观察者序列和一个触发观察者序列 2:使用skipUntil操作符,并且开始订阅事件 3:源观察者序列开始发送.next事件 4:源观察者序列发送的事件目前为止无效,因为事件一直被忽略,那么为触发观察者序列添加一个新事件 5:源观察者序列再次发送事件,这次订阅者能够接受事件了
example(of: "skipWhileWithIndex") { let disposeBag = DisposeBag() Observable.of("1", "2", "3", "4", "5", "6") .skipWhileWithIndex({ element, index in index < 4 }) .subscribe(onNext: { print( $0)}) .addDisposableTo(disposeBag) } /* --- Example of: skipWhileWithIndex --- 5 6 */
example(of: "take") { let disposeBag = DisposeBag() //获取3个元素,从第一个元素开始,如果超过序列的元素个数,只取当前元素。 Observable.of(1, 2, 3, 4, 5, 6) .take(3) .subscribe(onNext: { print($0)}) .addDisposableTo(disposeBag) } /* --- Example of: take --- 1 2 3 */
example(of: "takeWhile") { let disposeBag = DisposeBag() Observable.of(2, 2, 3, 4, 5, 6, 7) .takeWhile({ $0 % 2 == 0 }) .subscribe(onNext: {print($0)}) .addDisposableTo(disposeBag) } /* --- Example of: takeWhile --- 2 2 */
example(of: "takeWhileWithIndex") { let disposeBag = DisposeBag() //1 创建一个integer观察者序列 Observable.of(2, 2, 4, 4, 6, 6, 7) //2 使用takeWhileWithIndex操作符,闭包中获取元素值和值所在的位置 .takeWhileWithIndex({ (integer, index) -> Bool in //3 每一个元素必须对2取余为0并且位置小于3 integer % 2 == 0 && index < 3 }) //4 开始订阅 .subscribe(onNext: { print($0) }) .addDisposableTo(disposeBag) } /* --- Example of: takeWhileWithIndex --- 2 2 4 */
example(of: "takeUntil") { let disposeBag = DisposeBag() //1 let sourceSequence = PublishSubject<String>() let referenceSequence = PublishSubject<String>() //2 sourceSequence.takeUntil(referenceSequence) .subscribe(onNext: {print($0)}) .addDisposableTo(disposeBag) //3 sourceSequence.onNext("1") sourceSequence.onNext("2") sourceSequence.onNext("3") //4 referenceSequence.onNext("stop!") sourceSequence.onNext("4") sourceSequence.onNext("5") sourceSequence.onNext("6") } 执行结果和分析 --- Example of: takeUntil --- 1 2 3 1:创建原观察者序列(sourceSequence),引用观察者序列(referenceSequence) 2:使用takeUntil,传入referenceSequence,这将导致referenceSequence发送消息,那么sourceSequence再发送消息将接收不到。 3:sourceSequence发送消息,订阅者接受消息 4:这里已经停止了sourceSequence获取元素,所以后续的发送消息将无效
example(of: "takeLast") { let disposeBag = DisposeBag() Observable.of("1", "2", "3", "4", "5") .takeLast(3) .subscribe(onNext: {print($0) }) .addDisposableTo(disposeBag) Observable.of("a", "b", "c", "d") .takeLast(2) .subscribe(onNext: {print($0)}) .addDisposableTo(disposeBag) }
执行结果
--- Example of: takeLast ---
3
4
5
c
d
example(of: "distinctUntilChanged") { let disposeBag = DisposeBag() Observable.of("a", "a", "b", "b", "a") .distinctUntilChanged() .subscribe(onNext: {print($0)}) .addDisposableTo(disposeBag) } /* --- Example of: distinctUntilChanged --- a b a 注意:是连续重复的元素,所以最后的a是可以打印的 */
example(of: "single") { let disposeBag = DisposeBag() Observable.of("1", "a", "2", "b") .single() .subscribe(onNext: {print($0)}) .addDisposableTo(disposeBag) Observable.of("right") .single() .subscribe(onNext: {print($0)}) .addDisposableTo(disposeBag) }
执行结果
--- Example of: single ---
1
Received unhandled error: /var/folders/wl/c1df1cbx5_l2cf3p0qwp7lqh0000gn/T/./lldb/23529/playground632.swift:45:__lldb_expr_632 -> Sequence contains more than one element.
right
single还可以拥有一个条件闭包,找需要的元素
example(of: "single with conditions") { let disposeBag = DisposeBag() Observable.of("1", "2", "3", "4", "5", "6") .single{$0 == "3"} .subscribe{print($0)} .addDisposableTo(disposeBag) /* next(3) completed */ Observable.of("a", "b", "c","b") .single { $0 == "b" } .subscribe { print($0) } .addDisposableTo(disposeBag) /* next(b) error(Sequence contains more than one element.) */ Observable.of("A", "B", "C") .single { $0 == "a" } .subscribe { print($0) } .addDisposableTo(disposeBag) /* error(Sequence doesn't contain any elements.) */ }
example(of: "share") { let disposeBag = DisposeBag() let numbers = Observable<Int>.create { observer in observer.onNext(1) observer.onNext(2) observer.onCompleted() return Disposables.create() } //订阅者1 numbers.subscribe(onNext: { print("element [\($0)]") }, onCompleted: { print("completed!") }) .addDisposableTo(disposeBag) //订阅者2 numbers.subscribe(onNext: { print("element [\($0)]") },onCompleted: { print("completed!") }) .addDisposableTo(disposeBag) } 执行结果为:
--- Example of: share --- element [1] element [2] completed! element [1] element [2] completed! 由上面结果,不知道有没有发现一个问题,就是两次订阅得到相同的结果,而且创建了不同的观察者序列。即每次调用subscribe,都为订阅者创建了一个新的观察者序列(Observable)。事实上,这是没有必要。因为这会导致很多重复的元素,执行不必要的操作。特别是网络请求,肯定希望共享订阅者。所以为了共享订阅,我们可以使用share()操作符。避免多个订阅者订阅相同的观察者序列重复执行不必要的操作。所以改写例子中的创建观察者序列部分,添加share()操作符,更多相关信息,看这里:
let numbers = Observable<Int>.create { observer in observer.onNext(1) observer.onNext(2) observer.onCompleted() return Disposables.create() }.share() 执行结果为:
--- Example of: share --- element [1] element [2] completed! completed!
example(of: "shareReplay(1)") { let disposeBag = DisposeBag() let sequence = Observable.of(1,2) .map{ i -> Int in print("map---\(i)") return i * 2 }.shareReplay(1) sequence.subscribe(onNext: { print("subscribe1 = \($0)")}).addDisposableTo(disposeBag) sequence.subscribe(onNext: { print("subscribe2 = \($0)")}).addDisposableTo(disposeBag) } 执行结果和分析: --- Example of: shareReplay(1) --- map---1 subscribe1 = 2 map---2 subscribe1 = 4 subscribe2 = 4 因为我们使用了shareReplay(1),所以原观察者序列被共享,并且缓存一个元素。所以在第二次订阅观察者序列的时候,会发送上一次缓存的最后一个元素数据,例子中为4.如果我们将shareReplay(1)改为shareReplay(2),结果如下: --- Example of: shareReplay(2) --- map---1 subscribe1 = 2 map---2 subscribe1 = 4 subscribe2 = 2 subscribe2 = 4 即缓存了2个元素数据,重发2个元素。如果发送的消息元素个数小于指定缓存大小,发送实际元素个数。 实际使用场景: 比如,使用URLSession.rx.response(request:) 发送你的请求到服务器,一旦接受到返回数据就会发送.next事件,返回对应的数据,并标记完成。在这种情况,如果观察者序列(observable)完成了,这时你再一次订阅它,又将重新创建一个新的订阅并且启动另外一个相同的请求服务端数据操作。为了防止这种情况,我们就可以使用shareReplay(_),缓存上一个发送的元素(即服务端返回的数据),一旦有新的订阅,shareReplay操作符会立马从缓存中获取来自服务端返回的数据。
example(of: "debounce") { let disposeBag = DisposeBag() Observable.of(1,2,3,4,5) .debounce(1, scheduler: MainScheduler.instance) .subscribe(onNext: {print($0)}) .addDisposableTo(disposeBag) } 执行结果: --- Example of: debounce --- 5 可以看到只发射了一个5 ,因为这个序列发射速度是非常快的,所以过滤掉了前面发射的值。看几个经常使用的情况: 1)根据searchBar或者textField输入的文本内容进行搜索相应的结果,我们会有如下代码:
searchBar.rx .text // Observable property thanks to RxCocoa .orEmpty // Make it non-optional .debounce(0.5, scheduler: MainScheduler.instance) // Wait 0.5 for changes .subscribe(onNext: { [unowned self] text in self.search(query: text) // 搜索结果 }).addDisposableTo(disposeBag) 上面使用了debounce(0.5, scheduler: MainScheduler.instance),所以debounce会告诉Rx,当searchBar中的文本内容在上一个0.5秒文本内容没有发生改变,我们想接收通知。这意味着,如果我们的用户一直在不断的输入内容,在停止0.5秒之后会立马发送事件,进行后续查询操作。主要是为了解决,我们并不想每次用户输入一点内容就调用查询API,仅仅是用户输入完成再进行查询。 2)防止按钮多次点击操作
button.rx.tap .debounce(0.3, scheduler: MainScheduler.instance) .subscribe(onNext: { [unowned self] in if self.isFollowedByMe() { self.follow() } else { self.unfollow() } }).addDisposableTo(disposeBag) 比如:我们有一个点赞的按钮,当用户觉得内容不错,想进行点赞的时候,会点击按钮,但是也有可能有些用户会连续点多次点击按钮,通过使用debounce,能够帮助我们避免发送多次请求。这还是很有必要的。