前言: 现在面试很多都会问RxJava的源码,直接讲RxJava的源码,估计大家都不太会看下去,我们先看个小考题,然后再去看相关的源码。 正文:
问题: 我们用二个Customer分别去订阅一个发送对象的Observable,这时候我们的Log.v(“TAG”,data.name);输出内容是什么。
这时候我们的Log.v(“TAG”,”d:”+d);输出内容是什么。
答案是: 第一个输出是cccc,cccc;第二个是1,2。不知道大家做对了没有。 如果没有做对题目的,我们就来一起来分析代码。
问题分析: 我们先看第一个情形的代码:
Observable.just( new Data("aaaa"), new Data( "bbbb") );just源码 :
public static <T> Observable<T> just(T item1, T item2) { ObjectHelper.requireNonNull(item1, "The first item is null"); ObjectHelper.requireNonNull(item2, "The second item is null"); return fromArray(item1, item2); }前面的二行是检查是否为null。主要是第三行通过fromeArray返回了一个Observable。
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> fromArray(T... items) { ObjectHelper.requireNonNull(items, "items is null"); if (items.length == 0) { return empty(); } else if (items.length == 1) { return just(items[0]); } return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items)); }我们可以看到根据用户传的个数,返回不同的Observable,比如0个的时候返回empty();,一个的时候返回just(items[0]);,其他都返回RxJavaPlugins.onAssembly(new ObservableFromArray(items));,但其实本质都差不多。为什么这么说: empty()源码:
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) @SuppressWarnings("unchecked") public static <T> Observable<T> empty() { return RxJavaPlugins.onAssembly((Observable<T>) ObservableEmpty.INSTANCE); }我们可以看到也是调用了RxJavaPlugins.onAssembly方法。 just(items[0])源码:
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> just(T item) { ObjectHelper.requireNonNull(item, "The item is null"); return RxJavaPlugins.onAssembly(new ObservableJust<T>(item)); }其实也是调用了RxJavaPlugins.onAssembly方法。 所以三者虽然是不同的Observable,但是都是调用RxJavaPlugins.onAssembly方法,然后传入不同的对象参数而已。
PS : 对于我们平时见到的什么Observable.create方法,或者Observable.interval方法等,都是最终调用RxJavaPlugins.onAssembly方法,只是一个调用了RxJavaPlugins.onAssembly(new ObservableCreate(source));,一个调用了RxJavaPlugins.onAssembly(new ObservableInterval(xxx,xxxx,xxxx))方法。
因为我们的情形一种是发射了二个对象,那我们就重点来看一下RxJavaPlugins.onAssembly(new ObservableFromArray(items));方法: 我们可以看到RxJavaPlugins.onAssembly方法:
@SuppressWarnings({ "rawtypes", "unchecked" }) @NonNull public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; }我们可以看到最后返回了传入的Observable source,所以也就是我们传入的new ObservableFromArray(items)。 所以最终我们拿到的Observable是new ObservableFromArray(items),所以我们一般接下去就是
//本质就是传了一个 new ObservableFromArray<T>(items) Observable observable = Observable.just( new Data("aaaa"), new Data( "bbbb") ); //比如我们用一个Observer对象来订阅 observable .subscribe(new Observer<Data>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Data data) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });我们先来看subscribe方法:
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); //最终调用了Observable的subscribeActual方法 subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }我们可以看到代码里面的加的备注,说明最终我们的 observable.subscribe(observer)最终执行了变为:observable.subscribeActual(observer);,因为我们说过我们的observable具体是ObservableFromArray的实例,所以我们直接看相关源码。 ObservableFromArray.class:
public final class ObservableFromArray<T> extends Observable<T> { final T[] array; //实例化对象的调用的构造函数,同时传入我们要发送的数组 public ObservableFromArray(T[] array) { this.array = array; } //最终订阅的时候调用这个方法 @Override public void subscribeActual(Observer<? super T> s) { //new 一个我们平时用来取消订阅的Disposable,这里具体是FromArrayDisposable FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array); //也就是我们Observer复写的onSubscribe方法,并把Disposable对象传入 s.onSubscribe(d); if (d.fusionMode) { return; } //然后执行了FromArrayDisposable对象的run方法 d.run(); } static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> { final Observer<? super T> actual; final T[] array; int index; boolean fusionMode; volatile boolean disposed; //构造函数,传入了Observer 和我们要传的数组 FromArrayDisposable(Observer<? super T> actual, T[] array) { this.actual = actual; this.array = array; } @Override public int requestFusion(int mode) { if ((mode & SYNC) != 0) { fusionMode = true; return SYNC; } return NONE; } @Nullable @Override public T poll() { int i = index; T[] a = array; if (i != a.length) { index = i + 1; return ObjectHelper.requireNonNull(a[i], "The array element is null"); } return null; } @Override public boolean isEmpty() { return index == array.length; } @Override public void clear() { index = array.length; } //我们用来取消订阅的方法 @Override public void dispose() { disposed = true; } //用来判断是否取消了订阅 @Override public boolean isDisposed() { return disposed; } //当订阅的时候,真正执行的是Disposable的run方法。 void run() { T[] a = array; int n = a.length; /*遍历我们要传的数组,然后并且判断isDisposed()的disposed值 所以我们就知道了为啥我们取消订阅只要执行Disposable.dispose()方法了 因为这时候会把disposed返回true,然后这里的for循环判断就会退出循环。 */ for (int i = 0; i < n && !isDisposed(); i++) { T value = a[i]; /*我们知道在RxJava 1的时候我们发送一个null值是可以的, 但是RxJava2就不行了,因为做了一个判空操作。 就会执行Observer的onError方法 */ if (value == null) { actual.onError(new NullPointerException("The " + i + "th element is null")); return; } //执行Observer的onNext方法,并且把值一个个传过去 actual.onNext(value); } /*如果用户在onNext都运行完后,并且没有执行dispose()方法, 则if里面为true,就会执行Observer的onComplete()方法。 */ if (!isDisposed()) { actual.onComplete(); } } } }我们通过代码可以看到,其实Observable在生成实例后,里面发送的数组是同一个数组,并且发送的数据也是同一个数据,所以虽然我们用多个Observer去订阅的时候,收到的Data对象是同一个,但是因为第一个Observer对这个对象里面的属性修改了,所以第二个Observer对获取同个对象的时候,获取的属性值也就变了。 简单可以这么理解:
Data data = new Data("aaaa"); Log.v("TAG",data.name); change(data); Log.v("TAG",data.name) public void change(Data data){ data.name = "bbbb"; }其实我们可以这么理解。虽然都是打印同一个对象,但是属性被更改了。 所以我们的情形一的代码结果是不是已经能理解了呢,各位。 而情形二其实不是考验RxJava的源码基础,而是考验 Java基础。因为情形二我们发送的是(1,2);相当于:
int data = 1; Log.v("TAG","data:"+data); change(data); Log.v("TAG","data:"+data); public void change(int data){ data = 2; }你就会发现其实二个Log打印的内容是一样的,都是1。
与其他语言不同,Java不允许程序员选择按值传递还是按引用传递各个参数,基本类型(byte–short–int–long–float–double–boolean–char)的变量总是按值传递。就对象而言,不是将对象本身传递给方法,而是将对象的的引用或者说对象的首地址传递给方法,引用本身是按值传递的———–也就是说,讲引用的副本传递给方法(副本就是说明对象此时有两个引用了),通过对象的引用,方法可以直接操作该对象(当操作该对象时才能改变该对象,而操作引用时源对象是没有改变的)。
所以本章我们更多地看了Rxjava的Observable生成及Observer订阅时候的部分源码及Java值传递等相关知识。