RxJava小考题

xiaoxiao2021-03-01  6

前言: 现在面试很多都会问RxJava的源码,直接讲RxJava的源码,估计大家都不太会看下去,我们先看个小考题,然后再去看相关的源码。 正文:

问题一:
//对象类 class Data { public String name; public Data(String name) { this.name = name; } } //比如我们使用just操作符来发送二个Data对象 Observable<Data> data = Observable.just( new Data("aaaa"), new Data( "bbbb") ); //比如我们用一个Consumer对象来订阅 data.subscribe(new Consumer<Data>() { @Override public void accept(Data data) throws Exception { //里面的内容是把Observable发送过来的对象里面的name属性值改成cccc data.name = "cccc"; } }); //这时候我们在用一个新的Consumer来订阅这个Observable data.subscribe(new Consumer<Data>() { @Override public void accept(Data data) throws Exception { //这时候打印data.name Log.v("TAG",data.name); } });

问题: 我们用二个Customer分别去订阅一个发送对象的Observable,这时候我们的Log.v(“TAG”,data.name);输出内容是什么。

问题二:

Observable<Integer> data1 = Observable.just( 1,2 ); data1.subscribe(new Consumer<Integer>() { @Override public void accept(Integer d) throws Exception{ d++; } }); data1.subscribe(new Consumer<Integer>() { @Override public void accept(Integer d) throws Exception{ Log.v("TAG","d:"+d); } });
问题:

这时候我们的Log.v(“TAG”,”d:”+d);输出内容是什么。

答案是: 第一个输出是cccc,cccc;第二个是1,2。不知道大家做对了没有。 如果没有做对题目的,我们就来一起来分析代码。

问题分析: 我们先看第一个情形的代码:

Observable.just( new Data("aaaa"), new Data( "bbbb") );

just源码 :

public static <T> Observable<T> just(T item1, T item2) { ObjectHelper.requireNonNull(item1, "The first item is null"); ObjectHelper.requireNonNull(item2, "The second item is null"); return fromArray(item1, item2); }

前面的二行是检查是否为null。主要是第三行通过fromeArray返回了一个Observable。

@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> fromArray(T... items) { ObjectHelper.requireNonNull(items, "items is null"); if (items.length == 0) { return empty(); } else if (items.length == 1) { return just(items[0]); } return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items)); }

我们可以看到根据用户传的个数,返回不同的Observable,比如0个的时候返回empty();,一个的时候返回just(items[0]);,其他都返回RxJavaPlugins.onAssembly(new ObservableFromArray(items));,但其实本质都差不多。为什么这么说: empty()源码:

@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) @SuppressWarnings("unchecked") public static <T> Observable<T> empty() { return RxJavaPlugins.onAssembly((Observable<T>) ObservableEmpty.INSTANCE); }

我们可以看到也是调用了RxJavaPlugins.onAssembly方法。 just(items[0])源码:

@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> just(T item) { ObjectHelper.requireNonNull(item, "The item is null"); return RxJavaPlugins.onAssembly(new ObservableJust<T>(item)); }

其实也是调用了RxJavaPlugins.onAssembly方法。 所以三者虽然是不同的Observable,但是都是调用RxJavaPlugins.onAssembly方法,然后传入不同的对象参数而已。

PS : 对于我们平时见到的什么Observable.create方法,或者Observable.interval方法等,都是最终调用RxJavaPlugins.onAssembly方法,只是一个调用了RxJavaPlugins.onAssembly(new ObservableCreate(source));,一个调用了RxJavaPlugins.onAssembly(new ObservableInterval(xxx,xxxx,xxxx))方法。

因为我们的情形一种是发射了二个对象,那我们就重点来看一下RxJavaPlugins.onAssembly(new ObservableFromArray(items));方法: 我们可以看到RxJavaPlugins.onAssembly方法:

@SuppressWarnings({ "rawtypes", "unchecked" }) @NonNull public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; }

我们可以看到最后返回了传入的Observable source,所以也就是我们传入的new ObservableFromArray(items)。 所以最终我们拿到的Observable是new ObservableFromArray(items),所以我们一般接下去就是

//本质就是传了一个 new ObservableFromArray<T>(items) Observable observable = Observable.just( new Data("aaaa"), new Data( "bbbb") ); //比如我们用一个Observer对象来订阅 observable .subscribe(new Observer<Data>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Data data) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });

我们先来看subscribe方法:

@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); //最终调用了Observable的subscribeActual方法 subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }

我们可以看到代码里面的加的备注,说明最终我们的 observable.subscribe(observer)最终执行了变为:observable.subscribeActual(observer);,因为我们说过我们的observable具体是ObservableFromArray的实例,所以我们直接看相关源码。 ObservableFromArray.class:

public final class ObservableFromArray<T> extends Observable<T> { final T[] array; //实例化对象的调用的构造函数,同时传入我们要发送的数组 public ObservableFromArray(T[] array) { this.array = array; } //最终订阅的时候调用这个方法 @Override public void subscribeActual(Observer<? super T> s) { //new 一个我们平时用来取消订阅的Disposable,这里具体是FromArrayDisposable FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array); //也就是我们Observer复写的onSubscribe方法,并把Disposable对象传入 s.onSubscribe(d); if (d.fusionMode) { return; } //然后执行了FromArrayDisposable对象的run方法 d.run(); } static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> { final Observer<? super T> actual; final T[] array; int index; boolean fusionMode; volatile boolean disposed; //构造函数,传入了Observer 和我们要传的数组 FromArrayDisposable(Observer<? super T> actual, T[] array) { this.actual = actual; this.array = array; } @Override public int requestFusion(int mode) { if ((mode & SYNC) != 0) { fusionMode = true; return SYNC; } return NONE; } @Nullable @Override public T poll() { int i = index; T[] a = array; if (i != a.length) { index = i + 1; return ObjectHelper.requireNonNull(a[i], "The array element is null"); } return null; } @Override public boolean isEmpty() { return index == array.length; } @Override public void clear() { index = array.length; } //我们用来取消订阅的方法 @Override public void dispose() { disposed = true; } //用来判断是否取消了订阅 @Override public boolean isDisposed() { return disposed; } //当订阅的时候,真正执行的是Disposable的run方法。 void run() { T[] a = array; int n = a.length; /*遍历我们要传的数组,然后并且判断isDisposed()的disposed值 所以我们就知道了为啥我们取消订阅只要执行Disposable.dispose()方法了 因为这时候会把disposed返回true,然后这里的for循环判断就会退出循环。 */ for (int i = 0; i < n && !isDisposed(); i++) { T value = a[i]; /*我们知道在RxJava 1的时候我们发送一个null值是可以的, 但是RxJava2就不行了,因为做了一个判空操作。 就会执行Observer的onError方法 */ if (value == null) { actual.onError(new NullPointerException("The " + i + "th element is null")); return; } //执行Observer的onNext方法,并且把值一个个传过去 actual.onNext(value); } /*如果用户在onNext都运行完后,并且没有执行dispose()方法, 则if里面为true,就会执行Observer的onComplete()方法。 */ if (!isDisposed()) { actual.onComplete(); } } } }

我们通过代码可以看到,其实Observable在生成实例后,里面发送的数组是同一个数组,并且发送的数据也是同一个数据,所以虽然我们用多个Observer去订阅的时候,收到的Data对象是同一个,但是因为第一个Observer对这个对象里面的属性修改了,所以第二个Observer对获取同个对象的时候,获取的属性值也就变了。 简单可以这么理解:

Data data = new Data("aaaa"); Log.v("TAG",data.name); change(data); Log.v("TAG",data.name) public void change(Data data){ data.name = "bbbb"; }

其实我们可以这么理解。虽然都是打印同一个对象,但是属性被更改了。 所以我们的情形一的代码结果是不是已经能理解了呢,各位。 而情形二其实不是考验RxJava的源码基础,而是考验 Java基础。因为情形二我们发送的是(1,2);相当于:

int data = 1; Log.v("TAG","data:"+data); change(data); Log.v("TAG","data:"+data); public void change(int data){ data = 2; }

你就会发现其实二个Log打印的内容是一样的,都是1。

与其他语言不同,Java不允许程序员选择按值传递还是按引用传递各个参数,基本类型(byte–short–int–long–float–double–boolean–char)的变量总是按值传递。就对象而言,不是将对象本身传递给方法,而是将对象的的引用或者说对象的首地址传递给方法,引用本身是按值传递的———–也就是说,讲引用的副本传递给方法(副本就是说明对象此时有两个引用了),通过对象的引用,方法可以直接操作该对象(当操作该对象时才能改变该对象,而操作引用时源对象是没有改变的)。

结语:

所以本章我们更多地看了Rxjava的Observable生成及Observer订阅时候的部分源码及Java值传递等相关知识。

转载请注明原文地址: https://www.6miu.com/read-3650297.html

最新回复(0)