Take the simplest call sequence as an example:
```java
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        // Emit two items, then signal completion.
        subscriber.onNext("hello");
        subscriber.onNext("world");
        subscriber.onCompleted();
    }
}).subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(String s) {
    }
});
```

First, look at create():
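```java
public static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f));
}
```

Here RxJavaHooks.onCreate(f) returns the OnSubscribe object (f itself, unless a hook has been installed to replace it), which is then passed to the Observable constructor, and the new Observable is returned.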
```java
// Observable
protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}
```

In other words, create() assigns f to Observable.onSubscribe and returns the Observable object.
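One consequence of create() only storing f is that nothing is emitted at creation time. The sketch below is a stripped-down stand-in, not the real RxJava classes: `CreateSketch`, `MiniObservable` and the `Consumer`-based `OnSubscribe` are hypothetical names used only for illustration, and the hook step is assumed to return f unchanged.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Simplified stand-in for the create() path; these are NOT the real RxJava classes.
class CreateSketch {
    // Hypothetical mirror of Observable.OnSubscribe<T>, reduced to a plain consumer.
    interface OnSubscribe<T> {
        void call(Consumer<? super T> subscriber);
    }

    static final class MiniObservable<T> {
        final OnSubscribe<T> onSubscribe;

        private MiniObservable(OnSubscribe<T> f) {
            this.onSubscribe = f; // same assignment as in the Observable constructor
        }

        static <T> MiniObservable<T> create(OnSubscribe<T> f) {
            // Hook step omitted: assumed to hand back f unchanged.
            return new MiniObservable<T>(f);
        }
    }

    // Returns {call count right after create(), call count after invoking call() once}.
    static int[] demo() {
        AtomicInteger calls = new AtomicInteger();
        MiniObservable<String> source = MiniObservable.create(subscriber -> {
            calls.incrementAndGet();
            subscriber.accept("hello");
        });
        int afterCreate = calls.get();      // create() only stored the callback
        source.onSubscribe.call(s -> { });  // the body runs only when call() is invoked
        int afterCall = calls.get();
        return new int[] { afterCreate, afterCall };
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("after create: " + r[0] + ", after call: " + r[1]);
    }
}
```

The counter stays at zero until call() is invoked, which is why the emitting logic is deferred until something triggers the stored OnSubscribe.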
Next, look at subscribe():
```java
public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    subscriber.onStart();
    //...
    RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
    return RxJavaHooks.onObservableReturn(subscriber);
}
```

This first calls subscriber.onStart(). RxJavaHooks.onObservableStart normally just returns observable.onSubscribe, so the next step is effectively observable.onSubscribe.call(subscriber). Finally, RxJavaHooks.onObservableReturn returns the Subscription object (that is, the subscriber itself, since Subscriber implements the Subscription interface).
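So Observable.subscribe() boils down to two steps: call subscriber.onStart(), then call Observable.OnSubscribe.call(subscriber), which in turn invokes the subscriber's onNext, onError and onCompleted methods. What call actually does differs from one OnSubscribe implementation to another.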
```java
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("hello world");
        subscriber.onNext("tttt");
        subscriber.onCompleted();
    }
})
```
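To make the ordering in subscribe() concrete, here is a minimal self-contained model of the flow described above. It is only a sketch under stated assumptions: `SubscribeSketch`, `MiniObservable` and `MiniSubscriber` are hypothetical names, the RxJavaHooks calls are assumed to pass their arguments through unchanged, and the part of the real method elided as `//...` is left out entirely.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the subscribe() path; these are NOT the real RxJava classes.
class SubscribeSketch {
    interface Subscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    // As with rx.Subscriber, the subscriber doubles as the Subscription returned to the caller.
    abstract static class MiniSubscriber<T> implements Subscription {
        private volatile boolean unsubscribed;
        public void onStart() { }
        public abstract void onNext(T t);
        public abstract void onError(Throwable e);
        public abstract void onCompleted();
        @Override public void unsubscribe() { unsubscribed = true; }
        @Override public boolean isUnsubscribed() { return unsubscribed; }
    }

    interface OnSubscribe<T> {
        void call(MiniSubscriber<? super T> subscriber);
    }

    static final class MiniObservable<T> {
        final OnSubscribe<T> onSubscribe;

        private MiniObservable(OnSubscribe<T> f) { this.onSubscribe = f; }

        static <T> MiniObservable<T> create(OnSubscribe<T> f) {
            return new MiniObservable<T>(f);
        }

        Subscription subscribe(MiniSubscriber<? super T> subscriber) {
            subscriber.onStart();         // step 1: notify the subscriber that it is starting
            onSubscribe.call(subscriber); // step 2: run the stored OnSubscribe (hooks assumed pass-through)
            return subscriber;            // step 3: hand the subscriber back as the Subscription
        }
    }

    // Subscribes a recording subscriber and returns the callbacks in the order they fired.
    static List<String> run() {
        final List<String> events = new ArrayList<String>();
        MiniObservable<String> source = MiniObservable.create(subscriber -> {
            subscriber.onNext("hello");
            subscriber.onNext("world");
            subscriber.onCompleted();
        });
        source.subscribe(new MiniSubscriber<String>() {
            @Override public void onStart() { events.add("onStart"); }
            @Override public void onNext(String s) { events.add("onNext:" + s); }
            @Override public void onError(Throwable e) { events.add("onError"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [onStart, onNext:hello, onNext:world, onCompleted]
    }
}
```

Swapping in a different OnSubscribe changes only what happens in step 2; onStart() always runs first, and the returned object is always the subscriber that was passed in.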