查看原文
其他

RxJava2系列之转换型操作符

jzman 躬行之 2022-08-26

上篇文章RxJava系列之创建型操作符介绍了创建型操作符的使用,今天来看一下 RxJava 中转换型操作符的使用,再放一次微信交流群二维码:

常用的转换型操作符如下:

  1. buffer 操作符

  2. window操作符

  3. map操作符

  4. groupBy操作符

  5. cast操作符

  6. scan操作符

  7. To操作符

buffer 操作符

buffer 操作符重载方法比较多,这里选取典型的几个来说明 buffer 操作符的使用,buffer 操作符的使用可以分为如下三类,具体如下:

1//第一类
2public final Observable<List<T>> buffer(int count) 
3public final Observable<List<T>> buffer(int count, int skip) 
4//第二类
5public final Observable<List<T>> buffer(long timespan, TimeUnit unit)
6public final Observable<List<T>> buffer(long timespan, long timeskip, TimeUnit unit) 
7//第三类
8public final <B> Observable<List<T>> buffer(ObservableSource<B> boundary)
9public final <TOpening, TClosing> Observable<List<T>> buffer(
10            ObservableSource<? extends TOpening> openingIndicator,
11            Function<? super TOpening, ? extends ObservableSource<? extends TClosing>> closingIndicator)

buffer(int count)

buffer 操作符将一个 Observable 转换为一个 Observable,这个 Observable 用于收集原来发送的数据,然后发送这些缓存的数据集合,buffer 将发送的单个事件转换成元素集合,下面是针对此种情况的官方示意图:

buffer(int count)

如下面的事件的发送过程,如果不设置 buffer 则需要发送四次,如果使用如下 buffer 进行转换,则只需发送两次,测试代码如下:

1count = 0;
2Observable.just("Event1""Event2""Event3""Event4")
3        .buffer(2)
4        .subscribe(new Consumer<List<String>>() {
5            @Override
6            public void accept(List<String> strings) throws Exception {
7                count++;
8                Log.i(TAG, "第" + count + "次接收...");
9                Log.i(TAG, "accept--->" + strings.size());
10                Log.i(TAG, "接收的数据...");
11                for (String str : strings) {
12                    Log.i(TAG, "accept--->" + strings.size() + "---" + str);
13                }
14            }
15        });

上述代码的执行结果如下:

11次接收...
2accept--->2
3接收的数据...
4accept--->2---Event1
5accept--->2---Event2
62次接收...
7accept--->2
8接收的数据...
9accept--->2---Event3
10accept--->2---Event4

buffer(int count, int skip)

相较 buffer(int count), skip 可以指定下一次由源 Observable 转换的 Observable 收集事件的位置,如果 count 等于 skip,则 buffer(int count,int skip) 等价于 buffer(int count),官方示意图如下:

buffer(int count, int skip)

如下面的事件发送过程,相当于每 3 个事件一组进行发送,但每次收集数据的位置参数 skip 为 2,则每次收集的数据中会有数据重复,测试代码如下:

1count = 0;
2Observable.just("Event1""Event2""Event3""Event4""Event5")
3        .buffer(32)
4        .subscribe(new Consumer<List<String>>() {
5            @Override
6            public void accept(List<String> strings) throws Exception {
7                count++;
8                Log.i(TAG, "第" + count + "次接收...");
9                Log.i(TAG, "accept--->" + strings.size());
10                Log.i(TAG, "接收的数据...");
11                for (String str : strings) {
12                    Log.i(TAG, "accept--->" + strings.size() + "---" + str);
13                }
14            }
15        });

上述代码的执行结果如下:

11次接收...
2accept--->3
3接收的数据...
4accept--->3---Event1
5accept--->3---Event2
6accept--->3---Event3
72次接收...
8accept--->3
9接收的数据...
10accept--->3---Event3
11accept--->3---Event4
12accept--->3---Event5
133次接收...
14accept--->1
15接收的数据...
16accept--->1---Event5

buffer(long timespan, TimeUnit unit)

buffer 操作符会将一个 Observable 转换为一个新的 Observable,timespan 决定新的的 Observsable 在发出缓存的数据的时间间隔,官方示意图如下:

buffer(long timespan, TimeUnit unit)

如下面的事件发送过程,源 Observable 每隔 2 秒发送事件,而 buffer 新生成的 Obsrevable 则以每隔 1 秒的间隔发送缓存的事件集合,当然,这样会在间隔的时间段收集不到数据导致丢失数据,测试代码如下:

1Observable.intervalRange(1,8,0,2, TimeUnit.SECONDS)
2        .buffer(1,TimeUnit.SECONDS)
3        .subscribe(new Consumer<List<Long>>() {
4            @Override
5            public void accept(List<Long> longs) throws Exception {
6                Log.i(TAG, "accept--->" + String.valueOf(longs));
7            }
8        });

上述代码的执行结果如下:

1accept--->[1]
2accept--->[]
3accept--->[2]
4accept--->[]
5accept--->[3]
6accept--->[]
7accept--->[4]
8accept--->[]
9accept--->[5]

buffer(long timespan, long timeskip, TimeUnit unit)

buffer 操作符会将一个 Observable 转换为一个 Observable,timeskip 决定让新生成的 Observable 定期启动一个新的缓冲区,然后新的 Observable 会发出在 timespan 时间间隔内收集的事件集合,官方示意图如下:

buffer(long timespan, long timeskip, TimeUnit unit)

如下面的事件发送过程,源 Observable 会每隔 1 秒发送 1 到 12 的整数,buffer 新生成的 Observable 会每隔 5 秒接收源 Observable 发送的事件,测试代码如下:

1Observable.intervalRange(1,12,0,1, TimeUnit.SECONDS)
2        .buffer(1,5, TimeUnit.SECONDS)
3        .subscribe(new Consumer<List<Long>>() {
4            @Override
5            public void accept(List<Long> longs) throws Exception {
6                Log.i(TAG, "accept--->" + String.valueOf(longs));
7            }
8        });

上述代码的执行结果如下:

1accept--->[1]
2accept--->[6]
3accept--->[11]

buffer(ObservableSource boundar)

buffer(boundary) 会监视一个名叫 boundary 的 Observable,每当这个 Observable 发射了一个事件,它就创建一个新的 List 开始收集来自原始 Observable 的发送的事件并发送收集到的数据,官方示意图如下:

buffer(ObservableSource boundary)

如下面事件的发送过程,收集到的原事件会因为时间间隔的不同最终发送的收集到的事件的个数也不同,测试代码如下:

1Observable.intervalRange(1,10,0,2, TimeUnit.SECONDS)
2        .buffer(Observable.interval(3, TimeUnit.SECONDS))
3        .subscribe(new Consumer<List<Long>>() {
4            @Override
5            public void accept(List<Long> longs) throws Exception {
6                Log.i(TAG, "accept--->" + String.valueOf(longs));
7            }
8        });

上述代码的执行结果如下:

1accept--->[1, 2]
2accept--->[3]
3accept--->[4, 5]
4accept--->[6]
5accept--->[7, 8]
6accept--->[9]
7accept--->[10]

buffer(openingIndicator, closingIndicator)

buffer(openingIndicator, closingIndicator)会监视一个名叫 openingIndicator 的 Observable,这个 Observable 每发射一个事件,它就创建一个 List 收集原始 Observable 发送的数据,并将收集的数据给 closingIndicator,closingIndicator 会返回一个 Observable,这个 buffer 会监视 closingIndicator 返回的Observable,检测到这个 Observable 的数据时,就会关闭这个 List 发射刚才从 openingIndicator 获得数据,也就是名为 openingIndicator 的 Observable 收集的数据,下面是针对此种情况的官方示意图:

buffer(openingIndicator, closingIndicator)

如下面时间发送过程,原始的 Observable 以每个 1 秒的间隔发送 1 到 12 之间的整数,名为 openingIndicator 的 Observable 会每隔 3 秒创建一个 List 手机发送的事件,然后将收集的数据给 closingIndicator,closingIndicator 会延时 1 秒发送从名为 openingIndicator 的 Observable 拿到的数据,下面是测试代码:

1Observable openingIndicator = Observable.interval(3, TimeUnit.SECONDS);
2Observable closingIndicator = Observable.timer(1,TimeUnit.SECONDS);
3Observable.intervalRange(1,12,0,1, TimeUnit.SECONDS)
4        .buffer(openingIndicator, new Function<Object, ObservableSource<?>>() {
5            @Override
6            public ObservableSource<?> apply(Object o) throws Exception {
7                return closingIndicator;
8            }
9        })
10        .subscribe(new Consumer<List<Long>>() {
11            @Override
12            public void accept(List<Long> longs) throws Exception {
13                Log.i(TAG, "accept--->" + String.valueOf(longs));
14            }
15        });

上述代码的执行结果如下:

1accept--->[4, 5]
2accept--->[7]
3accept--->[10]

window操作符

这里就以 window(long count) 为例来介绍 window 操作符的使用,window 操作符的使用和 buffer 使用类似,不同之处是经 buffer 转换成的 Observable 发送的时源 Observable 发送事件的事件集合,而经 window 操作符转换成的 Observable 会依次发送 count 个源 Observable 发送的事件,该操作符官方示意图如下:

window(long count)

测试代码如下:

1Observable.just("Event1""Event2""Event3""Event4")
2        .window(2)
3        .subscribe(new Consumer<Observable<String>>() {
4            @Override
5            public void accept(Observable<String> stringObservable) throws Exception {
6                Log.i(TAG, "accept--Observable->");
7                stringObservable.subscribe(new Consumer<String>() {
8                    @Override
9                    public void accept(String s) throws Exception {
10                        Log.i(TAG, "accept--->" + s);
11                    }
12                });
13            }
14        });

上述代码的执行结果如下:

1accept--Observable->
2accept--->Event1
3accept--->Event2
4accept--Observable->
5accept--->Event3
6accept--->Event4

map操作符

map(mapper)

map 操作符可对发送的数据进行相应的类型转化,map 操作的官方示意图如下:

map(mapper)

如下面的事件发送过程,经过 map 操作符转换,可对源 Observable 发送的事件进行进一步的加工和转换,测试代码如下:

1Observable.just("Event1""Event2""Event3""Event4")
2        .map(new Function<String, String>() {
3            @Override
4            public String apply(String s) throws Exception {
5                return "this is " + s;
6            }
7        }).subscribe(new Consumer<String>() {
8    @Override
9    public void accept(String s) throws Exception {
10        Log.i(TAG, "accept--->" + s);
11    }
12});

上述代码的执行结果如下:

1accept--->this is Event1
2accept--->this is Event2
3accept--->this is Event3
4accept--->this is Event4

flatMap(mapper)

flatMap 操作符使用时,当源 Observable 发出事件会相应的转换为可以发送多个事件的 Observable,这些 Observable 最终汇入同一个 Observable,然后这个 Observable 将这些事件统一发送出去,这里决定不再想上文中一样,每个重载方法都进行说明,这里已常用的 flatMap(mapper) 为例,其官方示意图如下:

                            flatMap(mapper)

如下面的事件发送过程,使用了 flatMap 操作符之后,源 Observable 发送事件时,相应的生成对应的 Observable,最终发送的事件都汇入同一个 Observable,然后将事件结果回调给观察者,测试代码如下:

1final Observable observable = Observable.just("Event5""Event6");
2Observable.just("Event1""Event2""Event3""Event4")
3        .flatMap(new Function<String, Observable<String>>() {
4            @Override
5            public Observable<String> apply(String s) throws Exception {
6                return observable;
7            }
8        }).subscribe(new Consumer<String>() {
9    @Override
10    public void accept(String s) throws Exception {
11        Log.i(TAG, "accept--->" + s);
12    }
13});

上述代码的执行结果如下:

1accept--->Event5
2accept--->Event6
3accept--->Event5
4accept--->Event6
5accept--->Event5
6accept--->Event6
7accept--->Event5
8accept--->Event6

concatMap(mapper)

concatMap 的使用与 flatMap 的使用大致类似,相较flatMap能够保证事件接收的顺序,而 flatMap 不能保证事件接收的顺序,concatMap 操作符的官方示意图如下:

concatMap(mapper)

如下面的事件发送过程,我们在源 Observable 发送整数 1 时延时 3 秒,然后继续发送其他事件,下面是测试代码:

1Observable.intervalRange(1201, TimeUnit.SECONDS)
2        .concatMap(new Function<Long, ObservableSource<Long>>() {
3            @Override
4            public ObservableSource<Long> apply(Long aLong) throws Exception {
5                int delay = 0;
6                if (aLong == 1) {
7                    delay = 3;
8                }
9                return Observable.intervalRange(44, delay, 1, TimeUnit.SECONDS);
10            }
11        }).subscribe(new Consumer<Long>() {
12    @Override
13    public void accept(Long aLong) throws Exception {
14        Log.i(TAG, "accept--->" + aLong);
15    }
16});

使用 concatMap 操作符上述代码的执行结果如下:

1accept--->4
2accept--->5
3accept--->6
4accept--->7
5accept--->4
6accept--->5
7accept--->6
8accept--->7

使用 flatMap 操作符上述代码的执行结果如下:

1accept--->4
2accept--->5
3accept--->6
4accept--->4
5accept--->7
6accept--->5
7accept--->6
8accept--->7

可见,concatMap 相较 flatMap 能够保证事件接收的顺序。

switchMap(mapper)

当源 Observable 发送事件时会相应的转换为可以发送多个事件的 Observable,switchMap 操作符只关心当前这个 Observable,也就是说,源 Observable 每当发送一个新的事件时,就会丢弃前面一个发送多个事件的 Observable,官方示意图如下:

switchMap(mapper)

如下面的事件发送过程,源 Observable 每个 2 秒发送 1 和 2,转换成的可以发送多个事件的 Observable 每个 1 秒发送从 4 开始的整数,使用 switchMap 操作符时,源 Observable 发送一个整数 1 时,这个新的可以发送多个事件的 Observable 只发送两个整数,也就是 4 和 5 之后就停止发送了,因为此时源 Observable 又开始发送事件了,此时会丢弃前一个可发送多个时间的 Observable,开始下一次源 Observable 发送事件的监听,测试代码如下:

1Observable.intervalRange(1202, TimeUnit.SECONDS)
2        .switchMap(new Function<Long, ObservableSource<Long>>() {
3            @Override
4            public ObservableSource<Long> apply(Long aLong) throws Exception {
5                Log.i(TAG, "accept-aLong-->" + aLong);
6                return Observable.intervalRange(4401, TimeUnit.SECONDS);
7            }
8        }).subscribe(new Consumer<Long>() {
9    @Override
10    public void accept(Long aLong) throws Exception {
11        Log.i(TAG, "accept--->" + aLong);
12    }
13});

上述代码执行结果如下:

1accept-aLong-->1
2accept--->4
3accept--->5
4accept-aLong-->2
5accept--->4
6accept--->5
7accept--->6
8accept--->7

此外,还有与之相关的操作符:concatMapDelayError、concatMapEager、concatMapEagerDelayError concatMapIterable、flatMapIterable 、switchMapDelayError,都是上述操作符的扩展,这里就不在介绍了。

groupBy操作符

groupBy 操作符会对接收的数据按照指定的规则进行分类,然后再被 GroupedObservable 等订阅输出,官方示意图如下:

groupBy

如下面的事件发送过程,我们会按照成绩进行分组输出,具体如下:

1List<DataBean> beanList = new ArrayList<>();
2beanList.add(new DataBean("成绩是95分"95));
3beanList.add(new DataBean("成绩是70分"70));
4beanList.add(new DataBean("成绩是56分"56));
5beanList.add(new DataBean("成绩是69分"69));
6beanList.add(new DataBean("成绩是90分"90));
7beanList.add(new DataBean("成绩是46分"46));
8beanList.add(new DataBean("成绩是85分"85));
9
10Observable.fromIterable(beanList)
11        .groupBy(new Function<DataBean, String>() {
12            @Override
13            public String apply(DataBean dataBean) throws Exception {
14                int score = dataBean.getScore();
15                if (score >= 80) {
16                    return "A";
17                }
18
19                if (score >= 60 && score < 80) {
20                    return "B";
21                }
22
23                if (score < 60) {
24                    return "C";
25                }
26                return null;
27            }
28        })
29        .subscribe(new Consumer<GroupedObservable<String, DataBean>>() {
30            @Override
31            public void accept(final GroupedObservable<String, DataBean> groupedObservable) throws Exception {
32                groupedObservable.subscribe(new Consumer<DataBean>() {
33                    @Override
34                    public void accept(DataBean dataBean) throws Exception {
35                        Log.i(TAG, "accept--->"+ groupedObservable.getKey() + "组--->"+dataBean.getDesc());
36                    }
37                });
38            }
39        });

上述代码的执行结果如下:

1accept--->A组--->成绩是95分 
2accept--->B组--->成绩是70分
3accept--->C组--->成绩是56分
4accept--->B组--->成绩是69分
5accept--->A组--->成绩是90分
6accept--->C组--->成绩是46分
7accept--->A组--->成绩是85分

cast操作符

cast 操作符用于类型转化,cast 操作符官方示意图如下:

cast

测试代码如下:

1Observable.just(1,2,3,4,5)
2        .cast(String.class)
3        .subscribe(new Consumer<String>() {
4            @Override
5            public void accept(String String) throws Exception {
6                Log.i(TAG, "accept--->" + String);
7            }
8        });

测试会出现如下异常:

1java.lang.ClassCastExceptionCannot cast java.lang.Integer to java.lang.String

从结果可知,发现不同类型之间转化会出现类型转化异常,cast 操作符并不能进行不同类型之间的转化,但是可以使用 cast 操作来校验发送的事件数据类型是不是指定的类型。

scan操作符

scan 操作符会依次扫描每两个元素,第一个元素没有上一个元素,则第一个元素的上一个元素会被忽略,当扫描第二个元素时,会获取到第一个元素,之后 apply 方法的返回值会作为上一个元素的值参与计算,最终返回转化后的结果,scan 官方示意图如下:

scan(accumulator)

看一下下面的事件发送过程,第一次扫描时,第一个元素是 1,这里相当于 last,第二个元素是 2 ,这里相当于 item,此时 apply 方法返回的结果是 2,这个 2 会作为 last 的值参与下一次扫描计算,则下一次返回的值肯定是 2 * 3,也就是 6,测试代码如下:

1Observable.just(12345)
2        .scan(new BiFunction<Integer, Integer, Integer>() {
3            @Override
4            public Integer apply(Integer last, Integer item) throws Exception {
5                Log.i(TAG, "accept--last->" + last);
6                Log.i(TAG, "accept--item->" + item);
7                return last * item;
8            }
9        })
10        .subscribe(new Consumer<Integer>() {
11            @Override
12            public void accept(Integer integer) throws Exception {
13                Log.i(TAG, "accept--->" + integer);
14            }
15        });

上述代码的执行结果如下:

1accept--->1
2accept--last->1
3accept--item->2
4accept--->2
5accept--last->2
6accept--item->3
7accept--->6
8accept--last->6
9accept--item->4
10accept--->24
11accept--last->24
12accept--item->5
13accept--->120

To操作符

toList()

toList 操作符会将发送的一系列数据转换成 List,然后一次性发送出去,toList 的官方示意图如下:

toList()

测试代码如下:

1Observable.just(1234)
2        .toList()
3        .subscribe(new Consumer<List<Integer>>() {
4            @Override
5            public void accept(List<Integer> integers) throws Exception {
6                Log.i(TAG, "accept--->" + integers);
7            }
8        });

上述代码的执行结果如下:

1accept--->[1, 2, 3, 4]

toMap(keySelector)

toMap操作符会将要发送的事件按照指定的规则转化为 Map 形式,然后一次性发送出去,toMap 操作符官方示意图如下:

toMap(keySelector)

测试代码如下:

1Observable.just(1234)
2        .toMap(new Function<Integer, String>() {
3            @Override
4            public String apply(Integer integer) throws Exception {
5                return "key"+integer;
6            }
7        })
8        .subscribe(new Consumer<Map<String, Integer>>() {
9            @Override
10            public void accept(Map<String, Integer> map) throws Exception {
11                Log.i(TAG, "accept--->" + map);
12            }
13        });

上述代码的执行结果如下:

1accept--->{key2=2, key4=4, key1=1, key3=3}


RxJava 中转换型操作符基本如上,具体使用还是要结合在实际需求中,可以添加微信 jzmanu一起互相交流且拉你进微信交流群。


推荐阅读:

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存