RxJava的简单使用(一)

xiaoxiao2021-02-28  15

1.RxJava的介绍

主要特点有:

易于并发从而更好的利用服务器的能力。易于有条件的异步执行。一种更好的方式来避免回调地狱。一种响应式方式。

2.RxJava的与众不同

有三个重要的功能:

生产者在没有更多的数据可用时能够发出信号通知:onCompleted()事件。生产者在发生错误时能够发出信号通知:onError()事件。RxJava Observables 能够组合而不是嵌套,从而避免开发者陷入回调。 onNext()事件

有三个重要的角色: 1. Observable(被观察者) 2. Observer(观察者) 3. Subjects(订阅者)

三者的关联: Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

3.常用的操作符

create,just,from,empty,never,error

create()方法创建一个Observable(被观察者)。需要传一个OnSubscribe < T >对象,该对象继承Action1,当Observer(观察者)订阅,被Observable(被观察者)时,它作为一个参数传入并执行Call()函数。

源码

/** * Constructs an Observable in an unsafe manner, that is, unsubscription and backpressure handling is the responsibility of the OnSubscribe implementation. * @param <T> the value type emitted * @param f the callback to execute for each individual Subscriber that subscribes to the * returned Observable * @return the new Observable instance * @deprecated 1.2.7 - inherently unsafe, use the other create() methods for basic cases or * see {@link #unsafeCreate(OnSubscribe)} for advanced cases (such as custom operators) * @see #create(SyncOnSubscribe) * @see #create(AsyncOnSubscribe) * @see #create(Action1, rx.Emitter.BackpressureMode) */ @Deprecated public static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(RxJavaHooks.onCreate(f)); } /** * Invoked when Observable.subscribe is called. * @param <T> the output value type */ public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> { // cover for generics insanity } /** * A one-argument action. * @param <T> the first argument type */ public interface Action1<T> extends Action { void call(T t); }

通过以上的源码,看看怎么使用该方法

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> observer) { for (int i = 0; i < 5; i++) { observer.onNext(i); } observer.onCompleted(); } }); Subscription subscription = observable.subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("RxJavaActivity.onCompleted,生产者数据已经走完"); } @Override public void onError(Throwable e) { System.out.println("e = " + e.getMessage().toString()); } @Override public void onNext(Integer integer) { System.out.println("integer = " + integer); } });

打印日志: 06-27 17:40:36.111 6016-6016/test.demo I/System.out: integer = 0 06-27 17:40:36.112 6016-6016/test.demo I/System.out: integer = 1 06-27 17:40:36.112 6016-6016/test.demo I/System.out: integer = 2 06-27 17:40:36.112 6016-6016/test.demo I/System.out: integer = 3 06-27 17:40:36.112 6016-6016/test.demo I/System.out: integer = 4 06-27 17:40:36.112 6016-6016/test.demo I/System.out: RxJavaActivity.onCompleted,生产者数据已经走完

还有一个要说的是,当我们onCompleted()与onError()两个方法调用之后,onNext()该方法就不会在执行。可以自行测试,将代码修改成这样:

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> observer) { observer.onCompleted(); for (int i = 0; i < 5; i++) { observer.onNext(i); } } });

Log日志: 06-27 17:40:36.112 6016-6016/test.demo I/System.out: RxJavaActivity.onCompleted,生产者数据已经走完

所以说,该调用与先后顺序也有一定的关系。我们在看源码:

public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); } static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { // 判断我们subscriber是否为null if (subscriber == null) { throw new IllegalArgumentException("subscriber can not be null"); } if (observable.onSubscribe == null) { throw new IllegalStateException("onSubscribe function can not be null."); // the subscribe function can also be overridden but generally that's not the appropriate approach so I won't mention that in the exception } // 调用订阅者的onStart()方法,由此可以得出,该方法是必走的 subscriber.onStart(); // 将(subscriber不安全)的订阅者转换成(SafeSubscriber安全)的订阅者 if (!(subscriber instanceof SafeSubscriber)) { // assign to `observer` so we return the protected version //返回一个受保护的subscriber subscriber = new SafeSubscriber<T>(subscriber); } // 后面代码不必追究 ....... return Subscriptions.unsubscribed(); } } //我们看SafeSubscriber类的代码 public class SafeSubscriber<T> extends Subscriber<T> { private final Subscriber<? super T> actual; boolean done; //这个标识 ,初始值是false; public SafeSubscriber(Subscriber<? super T> actual) { super(actual); this.actual = actual; } @Override public void onCompleted() { if (!done) { done = true; //这里可以看出,当该方法调用之后,done设置成了 true; try { actual.onCompleted(); } catch (Throwable e) { throw new OnCompletedFailedException(e.getMessage(), e); } finally { // NOPMD try { unsubscribe(); } catch (Throwable e) { RxJavaHooks.onError(e); throw new UnsubscribeFailedException(e.getMessage(), e); } } } } /** @Override public void onError(Throwable e) { Exceptions.throwIfFatal(e); if (!done) { done = true; //这里可以看出,当该方法调用之后,done设置成了 true; _onError(e); } } @Override public void onNext(T t) { try { if (!done) {//当done=true时,跳出该方法 actual.onNext(t); } } catch (Throwable e) { Exceptions.throwOrReport(e, this); } } @SuppressWarnings("deprecation") protected void _onError(Throwable e) { // NOPMD RxJavaPlugins.getInstance().getErrorHandler().handleError(e); try { actual.onError(e); } catch (OnErrorNotImplementedException e2) { // NOPMD try { unsubscribe(); } catch (Throwable unsubscribeException) { RxJavaHooks.onError(unsubscribeException); throw new OnErrorNotImplementedException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException))); // NOPMD } throw e2; } catch (Throwable e2) { RxJavaHooks.onError(e2); try { unsubscribe(); } catch (Throwable unsubscribeException) { RxJavaHooks.onError(unsubscribeException); throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException))); } throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2))); } try { unsubscribe(); } catch (Throwable unsubscribeException) { RxJavaHooks.onError(unsubscribeException); throw new OnErrorFailedException(unsubscribeException); } } public Subscriber<? super T> getActual() { return actual; } }

以上代码,可以看出当调用了onError(),onCompleted()方法时,done=true之后,onNext()方法则不在执行,另外就是onError(),onCompleted()两方法是互斥的,一方法调用之后,则另一方法不在执行。

from()的使用方法我们可以看代码: //from方法传参,只支持数组与集合类型的参数。具体看源码 ArrayList<Integer> list = new ArrayList<>(); list.add(100); list.add(200); list.add(300); list.add(400); String[] strings = new String[]{"1","2","3","4"}; Observable.from(strings) .map(Integer::parseInt)//通过map将类型转成整型 .subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("from.onCompleted"); } @Override public void onError(Throwable e) { System.out.println("e = " + e.getMessage().toString()); } @Override public void onNext(Integer integer) { System.out.println("integer = " + integer); } }); Log日志: 06-28 09:57:59.165 27726-27726/test.demo I/System.out: integer = 1 06-28 09:57:59.690 27726-27726/test.demo I/System.out: integer = 2 06-28 09:58:00.058 27726-27726/test.demo I/System.out: integer = 3 06-28 09:58:00.619 27726-27726/test.demo I/System.out: integer = 4 06-28 09:58:01.572 27726-27726/test.demo I/System.out: from.onCompleted just()方法使用,代码: //just()支持多种类型的传参,而且可以传多个参数。 ArrayList<Integer> list = new ArrayList<>(); list.add(100); list.add(200); list.add(300); list.add(400); //just它只能发射整个列表,不会迭代发射整个列表的每个值 Observable.just(list) .subscribe(new Subscriber<ArrayList<Integer>>() { @Override public void onCompleted() { System.out.println("just.onCompleted"); } @Override public void onError(Throwable e) { System.out.println("e = " + e.getMessage().toString()); } @Override public void onNext(ArrayList<Integer> integers) { for (int i = 0; i < integers.size(); i++) { System.out.println(integers.get(i)); } } }); Observable.never()方法,看源码: public static <T> Observable<T> never() { return NeverObservableHolder.instance();//返回一个单列 } //再看NeverObservableHolder.class源码: public enum NeverObservableHolder implements OnSubscribe<Object> { INSTANCE ; /** The singleton instance. */ static final Observable<Object> NEVER = Observable.unsafeCreate(INSTANCE); /** * Returns a type-corrected singleton instance of the never Observable. * @param <T> the value type * @return a type-corrected singleton instance of the never Observable. */ @SuppressWarnings("unchecked") public static <T> Observable<T> instance() { return (Observable<T>)NEVER; } //因为是实现了OnSubscribe接口,则会生成call方法,可以看到方法内什么都没做,所以调用该方法直接结束了观察者的执行。 @Override public void call(Subscriber<? super Object> child) { // deliberately no op } } Observable.empty()方法,看源码: public static <T> Observable<T> empty() { return EmptyObservableHolder.instance();//返回一个单列 } //再看EmptyObservableHolder.class类 public enum EmptyObservableHolder implements OnSubscribe<Object> { INSTANCE ; /** The singleton instance. */ static final Observable<Object> EMPTY = Observable.unsafeCreate(INSTANCE); /** * Returns a type-corrected singleton instance of the empty Observable. * @param <T> the value type * @return a type-corrected singleton instance of the empty Observable. */ @SuppressWarnings("unchecked") public static <T> Observable<T> instance() { return (Observable<T>)EMPTY; } @Override public void call(Subscriber<? super Object> child) { child.onCompleted();//直接走了已完成的方法,结束其它方法的执行。 } } Observable.error()方法,看源码: public static <T> Observable<T> error(Throwable exception) { return unsafeCreate(new OnSubscribeThrow<T>(exception));//直接创建一个异常的OnSubscribe } //再看OnSubscribeThrow.class类 public final class OnSubscribeThrow<T> implements OnSubscribe<T> { private final Throwable exception; public OnSubscribeThrow(Throwable exception) { this.exception = exception; } /** * Accepts an {@link Observer} and calls its {@link Observer#onError onError} method. * * @param observer * an {@link Observer} of this Observable */ @Override public void call(Subscriber<? super T> observer) { observer.onError(exception); //直接执行异常的方法,其它方法则结束执行 } }
转载请注明原文地址: https://www.6miu.com/read-800104.html

最新回复(0)