Observable 的创建 是用链式的工厂模式创建的 Observable.create
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); //这个是判断参数是否为空, 未空的时候会抛出异常 ,“source is null” return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }rxjava2.0的创建代码传入的接口改了。 这个主要代码逻辑在返回值哪里里 首先看ObservableCreate(source) ObservableCreate 是继承于Observable 的一个final 类 这里创建了该类,把传入接口的实现类作为参数传入去。 接下来的RxJavaPlugins 类他的说明是这么写的:Utility class(工具类) to inject注入 handlers 操作 to certain standard(某个标准) RxJava operations运营. 这个一个用于注入某个Rxjava运作标准操作的工作类 。 onAssembly
/** * Calls the associated(关联) hook function. //调用相关的hook函数 * @param <T> the value type * @param source the hook's input value * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) //对于某些警告保持沉默 public static <T> Observable<T> onAssembly(Observable<T> source) { Function<Observable, Observable> f = onObservableAssembly; //默认应该是null if (f != null) { return apply(f, source); } return source; }首先看到它返回的是一个observable 的类型的参数 形参是上转型过程 所以用create’方法创建的是一个流程为: 先实例化一个ObservableOnSubscribe的接口的类然后吧这个类 封装上转型到成Observable 类。接下里的问题是,新建的ObservableOnSubscribe接口的subscribe 方法在哪里调用 ,首先在子类的方法里寻找因为这个是上转型的产物。可以在这个方法里找到`
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); //这里调用了实现接口的subscribe方法 } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }这个方法找在哪里调用? 从父类的方里面找:
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); //这里调用了子类的subscribeActual方法, } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }果不其然在subscrible 里面找到了这个方法的调用,他在observable里面仅仅是个抽象方法,具体实现在子类的ObservableCreate里面
所以这个Obserrvable的创建流程是: 1. 用工厂模式创建一个Observable的子类ObservableCreate ,该子类实现了Observable的抽象方法 2. 创建子类ObservableCreate 的时候调用构造方法传入ObservableOnSubscribe 接口的实现类作为变量存储在ObservableCreate里面,在实现父类的抽象方法中调用该接口的方法 3. 这个时候Observable的子类ObservableCreate已经创建配置完毕,在用上转型 强制转换成observable。 4. 当你运用Observable.subscirbe 连接observer的时候 调用了子类的subscribeActual 并且吧observer作为形参传入,在然后在子类的subscribeActual 中调用了接口的subscribe方法并传入observer,这个方法我们已经重写了在创建observable的时候,这样就实现了总体的回调机制