RxJava从源码到应用——RxJava1基本元素源码分析(3-2)

xiaoxiao2021-02-28  5

RxJava1 五大元素

1.Observable

2.Observer

3.Subscription

4.OnSubscribe

5.Subscriber

Subscription tSubscription = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { if (!subscriber.isUnsubscribed()) { subscriber.onNext("test"); subscriber.onCompleted(); } } }).subscribe(new Observer<String>() { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("onNext:" + s); } });

Observable通过create方法创建Create对象,里边传入的onSubscribe对象,其中有个方法是call()方法,里边传入的是一个subscriber实例,之后这个Observable对象去subscribe一个Observer,最后这个方法返回一个Subscription,这是我们上一章看的例子,我们现在就从重点的create()方法,subscribe(),还有这个call()方法,具体的来分析一下。

Subscription:

public interface Subscription { /** * Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription * was received. * <p> * This allows deregistering an {@link Subscriber} before it has finished receiving all events (i.e. before * onCompleted is called). */ void unsubscribe(); /** * Indicates whether this {@code Subscription} is currently unsubscribed. * * @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise */ boolean isUnsubscribed(); }这就是个接口,里边有这两个方法,unsubscribe()接触订阅,isUnsubscription()检测是否订阅。 Observer:

public interface Observer<T> { /** * Notifies the Observer that the {@link Observable} has finished sending push-based notifications. * <p> * The {@link Observable} will not call this method if it calls {@link #onError}. */ void onCompleted(); /** * Notifies the Observer that the {@link Observable} has experienced an error condition. * <p> * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or * {@link #onCompleted}. * * @param e * the exception encountered by the Observable */ void onError(Throwable e); /** * Provides the Observer with a new item to observe. * <p> * The {@link Observable} may call this method 0 or more times. * <p> * The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or * {@link #onError}. * * @param t * the item emitted by the Observable */ void onNext(T t); }

Subscriber:

public abstract class Subscriber<T> implements Observer<T>, Subscription { @Override public final void unsubscribe() { subscriptions.unsubscribe(); } /** * Indicates whether this Subscriber has unsubscribed from its list of subscriptions. * * @return {@code true} if this Subscriber has unsubscribed from its subscriptions, {@code false} otherwise */ @Override public final boolean isUnsubscribed() { return subscriptions.isUnsubscribed(); } } 他是个抽象类实现了Observer和Subscription接口,里边也对他们的接口做了基本的实现。其中Observer的方法并没有去实现,只是对Subscription的方法进行实现,其他的一些方法都是用来解决其它问题的,并不是我们这章分析的重点,这章主要分析这几个基本元素的关系,

OnSubscribe:

public class Observable<T> { public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> { // cover for generics insanity }} onSubscrib是Observable里边的一个接口,他继承了Action1,

public interface Action1<T> extends Action { void call(T t); }

这里有个方法就是call()方法。

Observable:

-createa():

@Deprecated public static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(RxJavaHooks.onCreate(f)); }

createa方法里边传入了OnSubscribe实例,然后new 了一个Observable实例,里边用了一个RxJavaHooks类去调用了onCreate方法,

RxJavaHooks:

public final class RxJavaHooks { public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) { Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate; if (f != null) { return f.call(onSubscribe); } return onSubscribe; }}

它只是把传进来的OnSubscribe实例给返回了出去,所以说在RxJavaHooks.onCreatea(f)接到的就是OnSubscribe实例。

-subscribe():

public final Subscription subscribe(final Observer<? super T> observer) { if (observer instanceof Subscriber) { return subscribe((Subscriber<? super T>)observer); } if (observer == null) { throw new NullPointerException("observer is null"); } return subscribe(new ObserverSubscriber<T>(observer)); }

里边传入了Observer,这个判断我们先不用看,先看他的实质返回了什么,他就是本质的去调用了里边的subscribe()方法, 它把这个observer做了一个包装,

ObserverSubscriber:

public final class ObserverSubscriber<T> extends Subscriber<T> { final Observer<? super T> observer; public ObserverSubscriber(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { observer.onNext(t); } @Override public void onError(Throwable e) { observer.onError(e); } @Override public void onCompleted() { observer.onCompleted(); } }这个包装其实就是Subscriber,所以说这里边实际上就是用一个Subscriber把这个你传入的具体的Observer做一下包装,然后再调用这个Subscribe()方法,

public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); } 所以这个subscribe()方法才是正的,这个subscribe()方法里边其实调用了一个静态方法,

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { // validate and proceed if (subscriber == null) { throw new IllegalArgumentException("subscriber can not be null"); } if (observable.onSubscribe == null) { throw new IllegalStateException("onSubscribe function can not be null."); /* * the subscribe function can also be overridden but generally that's not the appropriate approach * so I won't mention that in the exception */ } // new Subscriber so onStart it subscriber.onStart(); /* * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls * to user code from within an Observer" */ // if not already wrapped if (!(subscriber instanceof SafeSubscriber)) { // assign to `observer` so we return the protected version subscriber = new SafeSubscriber<T>(subscriber); } // The code below is exactly the same an unsafeSubscribe but not used because it would // add a significant depth to already huge call stacks. try { // allow the hook to intercept and/or decorate RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // in case the subscriber can't listen to exceptions anymore if (subscriber.isUnsubscribed()) { RxJavaHooks.onError(RxJavaHooks.onObservableError(e)); } else { // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(RxJavaHooks.onObservableError(e)); } catch (Throwable e2) { Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. RxJavaHooks.onObservableError(r); // TODO why aren't we throwing the hook's return value. throw r; // NOPMD } } return Subscriptions.unsubscribed(); } }

接下来我们重点看这个静态的subscribe()方法,它里边传入两个参数一个是Subscriber,这个Subscriber我们前边看了,就是你传入的Observer, 然后RxJava里边用一个Subscriber做一下包装,然后再传入到这里,后边的参数是一个Observable对象,前边判断就忽略了,后边做了一subscriber特有的一个onStart()方法,重点是

RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber);

这两句话,才是这个方法的核心,通过一个RxJavaHooks的onObservableStart()方法,返回的参数再调用一下call(),我们先来看一下onObservableStart()方法:

public final class RxJavaHooks { public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) { Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart; if (f != null) { return f.call(instance, onSubscribe); } return onSubscribe; }}

返回的是OnSubscribe,所以说RxJavaHooks.OnObservableStart()方法返回的就是onservable的onSubscribe,再去调用call()方法,里边给到上边传入的一个subscriber参数,所以说最后回到例子,这个subscribe调用的就是这个call()方法,所以是调用了

@Override public void call(Subscriber<? super String> subscriber) { if (!subscriber.isUnsubscribed()) { subscriber.onNext("test"); subscriber.onCompleted(); } }

这段代码,才能去向外发射数据,这里

@Override public void onNext(String s) { System.out.println("onNext:" + s); }

才能去接受到数据。

之后接着说,是这个return方法,

return RxJavaHooks.onObservableReturn(subscriber);

这个方法最后要返回一个Subscription,那么这个Subscription到底是谁呢,我们来看一下这里还是用了一个RxJavaHooks,用了它的onSubscribeReturn()里边传入了一个subscriber,我们来看一下这个方法。

public final class RxJavaHooks { public static Subscription onObservableReturn(Subscription subscription) { Func1<Subscription, Subscription> f = onObservableReturn; if (f != null) { return f.call(subscription); } return subscription; }            }

就是把你传入的参数给返回,这里传入的固参的一个限定是Subscription,我们知道subscriber已经实现了Subscription方法,其实最后返回的就是你之前传入的subscriber。分析完了总结一下

Observable

1、观察得到的———被观察者

2、通过Observable创建一个可观察的序列(create方法)

3、通过suscribe去注册一个观察者

Observer

1、用于接收数据的——观察者

2、作为Observable的subscribe方法的参数

Subscription

1、订阅,用于描述被观察者和观察者之间的关系。

2、用于取消订阅和获取当前订阅状态。

OnSubscribe

1、当订阅时会触发此接口调用(call方法)

2、在Observable内部,实际作用是向订阅者发射数据(onNext方法)

Subscriber

1、实现了Observer和Subscription

刚才我们看源码的时候也看到了,最后return的时候就是把你传入的这个subscriber最后返回掉了,所我们说

2、只有自己才能阻止自己

分析

1、现实中的案例——打电话

Observable相当于打电话的人,他subscibe了一个subscriber,给接电话的人打了一个电话。

然后通过OnSubscribe其实就是他们之间这个通信电路,通过这个向接电话人发射数据,最后

这个接电话人可以随时挂掉电话Subscription,这样理解一下他们之间的关系就会简单一点。

下面是UML图

Observable里边有一个onSubscribe实例,还有两个方法,一个是subscribe它里边接受的是Subscriber,另一个也是subscribe接收的是Observer,这个Subscriber有实现了,Observer和Subscription两个接口,实际发送数据的是调用这个call方法,通过里边传入的这个subscriber,onNext方法来去实际的发射数据。

下回分解

1、源码分析RxJava2基本元素

转载请注明原文地址: https://www.6miu.com/read-2350270.html

最新回复(0)