RxJava扩展了观察者模式的语义,添加了两个新的操作接口:
onCompleted() 通知观察者,Observable没有更多的数据了
onError() 观察者,有错误出现了
而onNext() 将被观察者生产的事件通知到观察者
其中,Observable和Subject是两个”生产”(被观察者)的实体,Observer和Subscriber是两个”消费”(观察者)的实体。其中Observable、Observer是两个基础角色。
Subscriber实现了Observer,并且还添加了一个onStart(),它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作。
Subject = Observable + Observer
下图描述Subject与Observer的一对多关系:
热的:Observable只要一创建,就会立即开始emit(发射)数据。后续的订阅它的观察者,可能从序列中间的某个位置开始接受数据(有一些数据错过了) 冷的:Observable创建后,一直在等待,直到有观察者订阅它才开始emit数据
Subject,既是一个Observable,也是一个Observer。 可以emit数据,可以subscribe 一个Observable或其它Subject。
RxJava 提供四种不同的 Subject:
PublishSubjectBehaviorSubjectReplaySubjectAsyncSubjectUnicastSubjectSerializedSubject是Subject的一个子类,它通过create()创建实例。它是一个”冷的”Observable。直到触发它的onNext(T t),才开始emit数据,并完成订阅。 可用于实现类似EventBus的RxBus。
是Subject的一个子类,它通过create()创建实例。首先会向它的订阅者发送截止订阅前的最后一条数据流,然后才正常发送订阅后的数据流。
{ // observer will receive all 4 events (including "default"). BehaviorSubject<Object> subject = BehaviorSubject.create("default"); subject.subscribe(observer); subject.onNext("one"); subject.onNext("two"); subject.onNext("three"); // observer will receive the "one", "two" and "three" events, but not "default" and "zero" BehaviorSubject<Object> subject = BehaviorSubject.create("default"); subject.onNext("zero"); subject.onNext("one"); subject.subscribe(observer); subject.onNext("two"); subject.onNext("three"); // observer will receive only onCompleted BehaviorSubject<Object> subject = BehaviorSubject.create("default"); subject.onNext("zero"); subject.onNext("one"); subject.onCompleted(); subject.subscribe(observer); // observer will receive only onError BehaviorSubject<Object> subject = BehaviorSubject.create("default"); subject.onNext("zero"); subject.onNext("one"); subject.onError(new RuntimeException("error")); subject.subscribe(observer); }缓存订阅的数据,重发给订阅它的观察者
ReplaySubject<String> subject = ReplaySubject.create(); subject.onNext("haha1"); subject.onNext("haha2"); subject.onNext("haha3"); subject.subscribe(s -> System.out.println("rss1" + s));仅在Observable完成之后,发送最后一条数据给观察者。
然而如果当Observable因为异常而终止,AsyncSubject将不会释放任何数据,但是会向Observer传递一个异常通知。
AsyncSubject<String> subject = AsyncSubject.create(); subject.onNext("haha1"); subject.onNext("haha2"); subject.onNext("haha3"); subject.onCompleted(); subject.subscribe(s -> System.out.println("ass1" + s));只允许有一个 Subscriber 订阅(内部subscribeActual(),用到了AtomicBoolean 判断,第二次无就抛出异常)
UnicastSubject subject = UnicastSubject.create(); subject.onNext("haha1"); subject.onNext("haha2"); subject.onNext("haha3"); subject.onCompleted(); subject.subscribe(s -> System.out.println("uss1" + s));当我们使用普通的Subject,必须要注意不要在多线程情况下调用onNext方法, 而使用SerializedSubject封装原来的 Subject即可!! 内部使用了SerializedObserver。查看其doc,如果是多线程环境,即有多个线程发射通知时,它们将被按序列执行: 1.允许仅有一个线程,执行一个emit 2.如果另一线程,已经emit,将下一个添加到通知队列 3.在循环emitting时,不持有任何锁或阻塞任何线程
ReplaySubject<String> subject = ReplaySubject.create(); SerializedSubject<String, String> serializedSubject = new SerializedSubject<>(subject); serializedSubject.subscribe(s -> System.out.println("thread-" + Thread.currentThread().getName() + "--" + s)); for (int i = 0; i < 10; i++) { int finalIndex = i; new Thread(() -> serializedSubject.onNext("haha" + finalIndex)).start(); } serializedSubject.onNext("haha20"); serializedSubject.onNext("haha21"); serializedSubject.onNext("haha22");https://github.com/aa86799/RxJavaDemo
