查看原文
其他

RxJava系列之创建型操作符

jzman 躬行之 2022-08-26

RxJava 是 ReactiveX 在 Java 上的开源的实现,一个用于通过使用可观察序列来进行异步编程和基于事件的程序的库,这是官网的介绍,主要关注点是异步编程和链式调用以及事件序列。

  1. 引入RxJava

  2. 概念

  3. 基本实现

  4. Just操作符

  5. from操作符

  6. defer操作符

  7. empty操作符

  8. never操作符

  9. timer操作符

  10. interval 操作符

  11. range操作符

  12. 总结

引入RxJava

1implementation "io.reactivex.rxjava2:rxjava:2.2.3"
2implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'

概念

RxJava 中的几个重要概念是:观察者(Observer) 、被观察者(Observable)和事件序列,事件序列完全由被观察者者自己控制,那么被观察者如果在需要时通知观察者呢,这就需要被观察者与观察者之间建立订阅关系。建立订阅关系后,当被观察者发生变化,观察者就能在第一时间接收被观察者的变化。

在 RxJava2 中观察者(Observer) 的事件回调方法有四个:

  • onSubscribe:用于解除订阅关系

  • onNext:发送事件时观察者回调该方法接收发送过来的事件序列

  • onError:发送事件时观察者回调该方法表示发送事件序列异常,将不再允许发送事件

  • onComplete:发送事件时观察者回调该方法表示事件序列发送完毕,允许发送事件

注意

  1. onError 调用后不允许继续发送事件,onComplete 调用后允许发送事件,无论是否可以继续发送事件,两者被调用观察者都不会接收消息;

  2. onError 和 onComplete 互斥只允许调用其中一个,如果你在 onComplete 之后调用 onError 程序必然会崩溃,但是 onError 之后调用 onComplete 不崩溃是因为 onError 之后不允许发送事件,自然不会出错;

  3. 四个回调方法中,观察者和被观察者一旦建立订阅关系 onSubscribe 方法就会被回调,onNext、onError、onComplete 方法的回调完全由被观察者决定是否触发,这里容易产生误解。

基本实现

  1. 创建观察者 Observer ,观察者决定时间发生的时候该如何处理,具体参考如下:

1//观察者
2Observer<String> observer = new Observer<String>() {
3    @Override
4    public void onSubscribe(Disposable d) {
5        //解除订阅
6        Log.i(TAG, "onSubscribe--->");
7    }
8
9    @Override
10    public void onNext(String s) {
11        //发送事件时观察者回调
12        Log.i(TAG, "onNext--->"+s);
13    }
14
15    @Override
16    public void onError(Throwable e) {
17        //发送事件时观察者回调(事件序列发生异常)
18        Log.i(TAG, "onError--->");
19    }
20
21    @Override
22    public void onComplete() {
23        //发送事件时观察者回调(事件序列发送完毕)
24        Log.i(TAG, "onComplete--->");
25    }
26};

  1. 创建被观察者 Observable,被观察者决定什么时出触发事件以及触发何种事件,具体参考如下:

1//被观察者
2Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
3    @Override
4    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
5        emitter.onNext("Event1");
6        emitter.onNext("Event2");
7        emitter.onComplete();
8        emitter.onNext("Event3");
9    }
10});

  1. 建立观察者与被观察之间的订阅关系,具体参考如下:

1//建立观察者与被观察者之间的订阅关系
2observable.subscribe(observer);

上述代码的输出结果参考如下:

1onSubscribe--->
2onNext--->Event1
3onNext--->Event2
4onComplete--->

显然,由于在 发送完 Event2 之后就调用了 onComplete 方法,之后发送的事件 Event3 将不会被观察者收到。

上面代码还可以这样写,结果是一样的,具体参考如下:

1Observable.create(new ObservableOnSubscribe<String>() {
2    @Override
3    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
4        emitter.onNext("Event1");
5        emitter.onNext("Event2");
6        emitter.onComplete();
7        emitter.onNext("Event3");
8    }
9}).subscribe(new Observer<String>() {
10    @Override
11    public void onSubscribe(Disposable d) {
12        Log.i(TAG, "onSubscribe--->");
13    }
14
15    @Override
16    public void onNext(String s) {
17        Log.i(TAG, "onNext--->"+s);
18    }
19
20    @Override
21    public void onError(Throwable e) {
22        Log.i(TAG, "onError--->");
23    }
24
25    @Override
26    public void onComplete() {
27        Log.i(TAG, "onComplete--->");
28    }
29});

上面代码中使用了 Observable 的 create 方法来创建 Observable,并以此来进行相关事件的发送,为帮助理解来看一下官方的关于 create 操作符的示意图:

create

Observable 中还提供了很多的静态方法来创建 Observable,下文将会介绍这些常用方法。

Just 操作符

使用 just 可以创建一个发送指定事件的 Observable,just 发送事件的上限 10,即最多发送 10 个事件,相较 create 在一定程度上简化了处理流程,just 重载的方法如下:

1public static <T> Observable<T> just(T item) 
2public static <T> Observable<T> just(T item1, T item2)
3public static <T> Observable<T> just(T item1, T item2, T item3)
4public static <T> Observable<T> just(T item1, T item2, T item3, T item4)
5public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5)
6public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6)
7public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7)
8public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8)
9public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9)
10public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
11

下面是 just 操作符的简单使用:

1//just操作符的简单使用
2Observable.just("Event1""Event2""Event3")
3        .subscribe(new Observer<String>() {
4            @Override
5            public void onSubscribe(Disposable d) {
6                Log.i(TAG, "onSubscribe--->");
7            }
8
9            @Override
10            public void onNext(String s) {
11                Log.i(TAG, "onNext--->" + s);
12            }
13
14            @Override
15            public void onError(Throwable e) {
16                Log.i(TAG, "onError--->");
17            }
18
19            @Override
20            public void onComplete() {
21                Log.i(TAG, "onComplete--->");
22            }
23        });

上述代码的输出结果如下:

1onSubscribe--->
2onNext--->Event1
3onNext--->Event2
4onNext--->Event3
5onComplete--->

来看一下官方的关于 just 操作符的示意图,下面是 just 发送四个事件的示意图,具体如下:

create

from 操作符

使用 from 相关的操作符可以创建发送数组(array)、集合(Iterable) 以及异步任务(future)的 Observable,可将 from 相关的操作符分为如下几类:

1//数组
2public static <T> Observable<T> fromArray(T... items)
3//集合
4public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
5//异步任务
6public static <T> Observable<T> fromFuture(Future<? extends T> future)
7//异步任务+超时时间
8public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit)
9//异步任务+超时时间+线程调度器
10public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler)
11//异步任务+线程调度器
12public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler)
13//Reactive Streams中的发布者,使用方式类似create操作符,事件的发送由发布者(被观察者)自行决定
14public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)
15

fromArray/fromIterable

下面是 fromArray 的使用方式,具体如下:

1//fromArray操作符的简单使用
2String[] events = {"Event1""Event2""Event3"};
3Observable.fromArray(events).subscribe(new Observer<String>() {
4    @Override
5    public void onSubscribe(Disposable d) {
6        Log.i(TAG, "onSubscribe--->");
7    }
8
9    @Override
10    public void onNext(String s) {
11        Log.i(TAG, "onNext--->" + s);
12    }
13
14    @Override
15    public void onError(Throwable e) {
16        Log.i(TAG, "onError--->");
17    }
18
19    @Override
20    public void onComplete() {
21        Log.i(TAG, "onComplete--->");
22    }
23});

看一下 fromArray 的官方示意图,具体如下:

fromArray

下面是 fromIterable 的使用方式,具体如下:

1//fromIterable操作符的简单使用
2List<String> list = new ArrayList<>();
3list.add("Event1");
4list.add("Event2");
5list.add("Event3");
6Observable.fromIterable(list).subscribe(new Observer<String>() {
7    @Override
8    public void onSubscribe(Disposable d) {
9        Log.i(TAG, "onSubscribe--->");
10    }
11
12    @Override
13    public void onNext(String s) {
14        Log.i(TAG, "onNext--->" + s);
15    }
16
17    @Override
18    public void onError(Throwable e) {
19        Log.i(TAG, "onError--->" + e);
20    }
21
22    @Override
23    public void onComplete() {
24        Log.i(TAG, "onComplete--->");
25    }
26});

看一下 fromIterable 的官方示意图,具体如下:

fromIterable

上述代码的输出参考如下:

1onSubscribe--->
2onNext--->Event1
3onNext--->Event2
4onNext--->Event3
5onComplete--->

fromCallable

Callable 位于 java.util.concurrent 包下,和 Runnable 类似,但是带有返回值,使用 fromCallable 发出的事件是从主线程发出的,如果不订阅则不会执行 call 里面的操作,使用 fromCallable 要注意以下几点:

  1. 涉及耗时任务要使用 subscribeOn 切换订阅线程;

  2. 执行耗时任务是接收 Observable 的发射值要使用 observeOn 切换到 Main 线程接收;

  3. 为避免内存泄漏等问题,在相应的onDestroy方法中取消订阅。

下面是 fromCallable 的简单使用,参考如下:

1//fromCallable操作符的简单使用
2Observable.fromCallable(new Callable<String>() {
3    @Override
4    public String call() throws Exception {
5        //其他操作...
6        return "call";
7    }
8}).subscribe(new Observer<String>() {
9
10    @Override
11    public void onSubscribe(Disposable d) {
12        Log.i(TAG, "onSubscribe--->");
13    }
14
15    @Override
16    public void onNext(String s) {
17        Log.i(TAG, "onNext--->" + s+Thread.currentThread());
18    }
19
20    @Override
21    public void onError(Throwable e) {
22        Log.i(TAG, "onError--->" + e);
23    }
24
25    @Override
26    public void onComplete() {
27        Log.i(TAG, "onComplete--->");
28    }
29});

上述到执行结果如下:

1onSubscribe--->
2onNext--->call
3onComplete--->

看一下 fromCallable 的官方示意图,具体如下:

fromCallable

fromFuture

从上面可知 fromFuture 有四个重载方法,参数中可以指定异步任务、任务超时时间、线程调度器等,先来认识一下 Future 接口,Future 接口位于 java.util.concurrent 包下,主要作用是对 Runnable 和 Callable 的异步任务执行进行任务是否执行的判断、任务结果的获取、具体任务的取消等,而 Runnable 和 Callable 伴随着线程的执行,这就意味着使用 fromFuture 发出的事件是从非 Main 线程发出,如果执行耗时任务要记得使用 subscribeOn 切换订阅线程,下面以 FutureTask 为例来说明 fromFuture 的使用方式。

创建一个 Callable 用来执行异步任务,参考如下:

1//异步任务
2private class MCallable implements Callable<String> {
3    @Override
4    public String call() throws Exception {
5        Log.i(TAG, "任务执行开始--->");
6        Thread.sleep(5000);
7        Log.i(TAG, "任务执行结束--->");
8        return "MCallable";
9    }
10}

然后,创建一个 FutureTask ,参考如下:

1//创建FutureTask
2MCallable mCallable = new MCallable();
3FutureTask<String> mFutureTask = new FutureTask<>(mCallable);

然后,使用 Thread 执行上面创建的 Future,参考如下:

1//执行FutureTask
2new Thread(mFutureTask).start();

最后,使用 fromFuture 创建与之对应的 Observeable 并订阅,参考如下:

1//fromFuture
2Observable.fromFuture(mFutureTask)
3        .subscribeOn(Schedulers.io()) //切换订阅线程
4        .subscribe(new Observer<String>() {
5            @Override
6            public void onSubscribe(Disposable d) {
7                Log.i(TAG, "onSubscribe--->");
8            }
9
10            @Override
11            public void onNext(String s) {
12                Log.i(TAG, "onNext--->" + s+Thread.currentThread());
13            }
14
15            @Override
16            public void onError(Throwable e) {
17                Log.i(TAG, "onError--->" + e);
18            }
19
20            @Override
21            public void onComplete() {
22                Log.i(TAG, "onComplete--->");
23            }
24        });

上述代码的只想结果如下:

1任务执行开始--->
2onSubscribe--->
3任务执行结束--->
4onNext--->MCallable
5onComplete--->

看一下 fromFuture 的官方示意图,下面的示意图是 fromFuture 方法携带一个参数 Future 的示意图,具体如下:

fromFuture

上面的异步任务延时 5 秒,如果使用 fromFuture 的重载方法指定超时时间为 4 秒,参考如下:

1//指定超时时间为4s
2Observable.fromFuture(mFutureTask,4, TimeUnit.SECONDS,Schedulers.io())
3//...

此时,由于异步任务不能在 4 秒内完成,Observer 会相应的被触发 onError 方法,执行结果参考如下:

1任务执行开始--->
2onSubscribe---> 
3onError--->java.util.concurrent.TimeoutException
4任务执行结束--->

那么如何取消这个异步任务呢,这也正是 Future 的优点所在,可以随意的取消这个任务,具体参考如下:

1//异步任务的取消
2public void cancelTask(View view) {
3    if (mFutureTask.isDone()) {
4        Log.i(TAG, "任务已经完成--->");
5    } else {
6        Log.i(TAG, "任务正在执行--->");
7        boolean cancel = mFutureTask.cancel(true);
8        Log.i(TAG, "任务取消是否成功--cancel->" + cancel);
9        Log.i(TAG, "任务取消是否成功--isCancelled->" + mFutureTask.isCancelled());
10    }
11}

下面是在任务执行过程中取消任务的执行结果,参考如下:

1任务执行开始--->
2onSubscribe--->
3任务正在执行--->
4任务取消是否成功--cancel->true
5任务取消是否成功--isCancelled->true
6onError--->java.util.concurrent.CancellationException

这样就取消了正在执行的异步任务,这部分内容更多的是关于 Java Future 相关的知识。

defer 操作符

使用 defer 创建 Observable 时,只有在订阅时去才会创建 Observable 并发送相关的事件,下面是 defer 操作符的使用,参考如下:

1//defer
2defer = "old";
3Observable<String> observable = Observable.defer(new Callable<ObservableSource<String>>() {
4    @Override
5    public ObservableSource<String> call() throws Exception {
6        return Observable.just(defer);
7    }
8});
9
10defer = "new";
11observable.subscribe(new Observer<String>() {
12    @Override
13    public void onSubscribe(Disposable d) {
14        Log.i(TAG, "onSubscribe--->");
15    }
16
17    @Override
18    public void onNext(String s) {
19        Log.i(TAG, "onNext--->"+s);
20    }
21
22    @Override
23    public void onError(Throwable e) {
24        Log.i(TAG, "onError--->"+e);
25    }
26
27    @Override
28    public void onComplete() {
29        Log.i(TAG, "onComplete--->");
30    }
31});

上述代码的执行结果如下:

1onSubscribe--->
2onNext--->new
3onComplete--->

显然,最终在订阅之前 Observable 工厂又创建了最新的 Observable,onNext 中接收的数据也是最新的,为了理解 defer 操作符,来看一下官方 defer 操作符的示意图:

defer

empty 操作符

使用 empty 操作符可以创建一个不发生任何数据但正常终止的 Observable,参考如下:

1//empty
2Observable.empty().subscribe(new Observer<Object>() {
3    @Override
4    public void onSubscribe(Disposable d) {
5        Log.i(TAG, "onSubscribe--->");
6    }
7
8    @Override
9    public void onNext(Object o) {
10        Log.i(TAG, "onNext--->"+o);
11    }
12
13    @Override
14    public void onError(Throwable e) {
15        Log.i(TAG, "onError--->"+e);
16    }
17
18    @Override
19    public void onComplete() {
20        Log.i(TAG, "onComplete--->");
21    }
22});

上述代码的输出结果如下:

1onSubscribe--->
2onComplete--->

为了方便理解 empty 操作符的使用,来看一些 empty 操作符的官方示意图:

empty

never 操作符

使用 never 操作符可以创建一个不发生任何数据也不终止的 Observable,参考如下:

1//never
2Observable.never().subscribe(new Observer<Object>() {
3    @Override
4    public void onSubscribe(Disposable d) {
5        Log.i(TAG, "onSubscribe--->");
6    }
7
8    @Override
9    public void onNext(Object o) {
10        Log.i(TAG, "onNext--->"+o);
11    }
12
13    @Override
14    public void onError(Throwable e) {
15        Log.i(TAG, "onError--->"+e);
16    }
17
18    @Override
19    public void onComplete() {
20        Log.i(TAG, "onComplete--->");
21    }
22});

上述代码的输出结果如下:

1onSubscribe--->

为了方便理解 never 操作符的使用,来看一些 never 操作符的官方示意图:

                                  never

timer 操作符

timer 操作符可以创建一个带延时的发送固定数值 0 的 Observable,还可以指定线程调度器,timer 重载方法如下:

1//延时
2public static Observable<Long> timer(long delay, TimeUnit unit)
3//延时+线程调度器
4public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) 
5

下面是 timer 的使用方式:

1//timer
2Observable.timer(3, TimeUnit.SECONDS, Schedulers.io()).subscribe(new Observer<Long>() {
3    @Override
4    public void onSubscribe(Disposable d) {
5        Log.i(TAG, "onSubscribe--->");
6    }
7
8    @Override
9    public void onNext(Long s) {
10        Log.i(TAG, "onNext--->"+s);
11        Log.i(TAG, "当前线程--->"+Thread.currentThread().getName());
12    }
13
14    @Override
15    public void onError(Throwable e) {
16        Log.i(TAG, "onError--->"+e);
17    }
18
19    @Override
20    public void onComplete() {
21        Log.i(TAG, "onComplete--->");
22    }
23});

上述代码的执行结果如下:

1onSubscribe--->
2//延时3秒收到数据
3onNext--->0
4当前线程--->RxCachedThreadScheduler-1
5onComplete--->

为了方便理解 timer 操作符的使用,来看一些 timer 操作符的官方示意图,下面以 timer 指定延时器和线程调度器的方式为例,具体如下:

timer

interval 操作符

使用 interval 操作符可以创建一个可以以固定时间间隔发送整数值的一个 Observable,interval 可以指定初始延时时间、时间间隔、线程调度器等,interval 重载方法如下:

1//初始延时+时间间隔
2public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) 
3//初始延时+时间间隔+线程调度器
4public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler
5scheduler)
6//时间间隔
7public static Observable<Long> interval(long period, TimeUnit unit)
8//时间间隔+线程调度器
9public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler) 
10

下面是 interval 的使用方式:

1//interval
2Observable.interval(3,TimeUnit.SECONDS).subscribe(new Observer<Long>() {
3    @Override
4    public void onSubscribe(Disposable d) {
5        Log.i(TAG, "onSubscribe--->");
6    }
7
8    @Override
9    public void onNext(Long aLong) {
10        Log.i(TAG, "onNext--->"+aLong);
11    }
12
13    @Override
14    public void onError(Throwable e) {
15        Log.i(TAG, "onError--->"+e);
16    }
17
18    @Override
19    public void onComplete() {
20        Log.i(TAG, "onComplete--->");
21    }
22});

上述代码执行后就会以每个 3 秒持续发送值为整数的事件,执行结果如下:

1onSubscribe--->
2onNext--->0
3onNext--->1
4onNext--->2
5...

为了方便理解 interval 操作符的使用,来看一些 interval 操作符的官方示意图,下面以 interval 指定间隔时间和时间单位的方式为例,具体如下:

interval

range 操作符

使用 range 操作符可以创建一个可以发送指定整数范围值的一个 Observable,range 相关的方法有两个,只是数值的范围表示不同,两个方法声明如下:

1// int
2public static Observable<Integer> range(final int start, final int count)
3// long
4public static Observable<Long> rangeLong(long start, long count)
5

下面是 range 的使用方式,具体参考如下:

1//range
2Observable.range(1,5).subscribe(new Observer<Integer>() {
3    @Override
4    public void onSubscribe(Disposable d) {
5        Log.i(TAG, "onSubscribe--->");
6    }
7
8    @Override
9    public void onNext(Integer integer) {
10        Log.i(TAG, "onNext--->"+integer);
11    }
12
13    @Override
14    public void onError(Throwable e) {
15        Log.i(TAG, "onError--->"+e);
16    }
17
18    @Override
19    public void onComplete() {
20        Log.i(TAG, "onComplete--->");
21    }
22});

上述代码的执行结果如下:

1onSubscribe--->
2onNext--->1
3onNext--->2
4onNext--->3
5onNext--->4
6onNext--->5
7onComplete--->

为了方便理解 range 操作符的使用,来看一些 range 操作符的官方示意图:

                                    range

总结

这篇文章主要介绍了 RxJava2 相关基础知识以及 RxJava2 中创建型操作符的理解和使用,可以添加我的微信jzmanu互相交流,也可以识别下面二维码添加我的Android微信交流群:

推荐阅读:

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存