查看原文
其他

关于RxJava2.0你不知道的事(一)

2017-02-13 一叶飘舟 开发者技术前线

开始

2017年的首篇文章,本次依旧带来一叶飘舟的开年之作,新的一年祝大家事业有成,爱情美满!

Rxjava 已经于2016年11月12日正式发布了2.0.1版本。

RxJava 2.0 已经按照Reactive-Streams specification规范完全的重写了。RxJava2.0 已经独立于RxJava 1.x而存在。

RxJava2.0相比RxJava1.x,它的改动还是很大的,下面我将带大家了解这些改动。

RxJava2.0与1.x的区别

Maven地址

为了让 RxJava 1.x 和 RxJava 2.x 相互独立,我们把RxJava 2.x 被放在了maven io.reactivex.rxjava2:rxjava:2.x.y 下,类放在了 io.reactivex 包下用户从 1.x 切换到 2.x 时需要导入的相应的包,但注意不要把1.x和2.x混淆了。

接口变化

RxJava2.0 是遵循 Reactive Streams Specification 的规范完成的,新的特性依赖其提供的4个基础接口。分别是:

  • Publisher

  • Subscriber

  • Subscription

  • Processor

在后边的介绍中我们会涉及到。

Javadoc文档

官方2.0的 Java 文档 

添加依赖

端使用RxJava需要依赖新的包名:

//RxJava的依赖包 compile 'io.reactivex.rxjava2:rxjava:2.0.3' //RxAndroid的依赖包 compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

Nulls

RxJava1.x中,支持 null 值,如下代码所示:

Observable.just(null); Single.just(null);

RxJava 2.0不再支持 null 值,如果传入一个null会抛出 NullPointerException

Observable and Flowable

在本节开始之前,我们先了解下RxJava背压(Backpressure)机制的问题。

什么是背压(Backpressure)

在RxJava中,可以通过对Observable连续调用多个Operator组成一个调用链,其中数据从上游向下游传递。当上游发送数据的速度大于下游处理数据的速度时,就需要进行Flow Control了。如果不进行Flow Control,就会抛出MissingBackpressureException异常。

这就像小学做的那道数学题:一个水池,有一个进水管和一个出水管。如果进水管水流更大,过一段时间水池就会满(溢出)。这就是没有Flow Control导致的结果。

再举个例子,在 RxJava1.x 中的 observeOn, 因为是切换了消费者的线程,因此内部实现用队列存储事件。在 Android 中默认的 buffersize 大小是16,因此当消费比生产慢时, 队列中的数目积累到超过16个,就会抛出MissingBackpressureException。

如果你想了解更多关于背压的知识,请参考:

 

下面我们通过一段代码来“感受”一下背压。

Observable.interval(1, TimeUnit.MILLISECONDS)      
//将观察者的工作放在新线程环境中       .observeOn(Schedulers.newThread())      
//观察者处理每1000ms才处理一个事件       .subscribe(new Subscriber<Long>() {           @Override          
          public void onCompleted() {           }           @Override          
          public void onError(Throwable e) {                       }           @Override          
          public void onNext(Long value) {            
             try {                   Thread.sleep(1000);               } catch (InterruptedException e) {                   e.printStackTrace();               }           }       });


Flow Control有哪些思路呢?大概是有四种:

  1. 背压(Backpressure);

  2. 节流(Throttling);

  3. 打包处理;

  4. 调用栈阻塞(Callstack blocking)。

这里限于篇幅的问题,我们就不再一一介绍了,请移步:

如何让Observable支持Backpressure?

在RxJava 1.x中,有些Observable是支持Backpressure的,而有些不支持。但不支持Backpressure的Observable可以通过一些operator来转化成支持Backpressure的Observable。这些operator包括:

  • onBackpressureBuffer

  • onBackpressureDrop

  • onBackpressureLatest

  • onBackpressureBlock(已过期)

它们转化成的Observable分别具有不同的Backpressure策略。

而在RxJava2.0 中,Observable 不再支持背压,而是改用Flowable 支持非阻塞式的背压。Flowable是RxJava2.0中专门用于应对背压(Backpressure)问题而新增的(抽象)类。其中,Flowable默认队列大小为128。并且规范要求,所有的操作符强制支持背压。幸运的是, Flowable 中的操作符大多与旧有的 Observable 类似。

上面提到的四种operator的前三种分别对应Flowable的三种Backpressure策略:

  • BackpressureStrategy.BUFFER

  • BackpressureStrategy.DROP

  • BackpressureStrategy.LATEST

onBackpressureBuffer是不丢弃数据的处理方式。把上游收到的全部缓存下来,等下游来请求再发给下游。相当于一个水库。但上游太快,水库(buffer)就会溢出。

onBackpressureDrop和onBackpressureLatest比较类似,都会丢弃数据。这两种策略相当于一种令牌机制(或者配额机制),下游通过request请求产生令牌(配额)给上游,上游接到多少令牌,就给下游发送多少数据。当令牌数消耗到0的时候,上游开始丢弃数据。但这两种策略在令牌数为0的时候有一点微妙的区别:onBackpressureDrop直接丢弃数据,不缓存任何数据;而onBackpressureLatest则缓存最新的一条数据,这样当上游接到新令牌的时候,它就先把缓存的上一条“最新”数据发送给下游。可以结合下面两幅图来理解。

onBackpressureBlock是看下游有没有需求,有需求就发给下游,下游没有需求,不丢弃,但试图堵住上游的入口(能不能真堵得住还得看上游的情况了),自己并不缓存。这种策略已经废弃不用。

注意:在RxJava2.0中,旧的Observable也保留了,你还可以像以前那样使用,同时要注意接口的变化。

需要说明的是,RxJava2.0中,Flowable是对Observable的补充(而不是替代),也可以这么说,Flowable是能够支持Backpressure的Observable。

何时用Observable

  1. 当上游在一段时间发送的数据量不大(以1000为界限)的时候优先选择使用Observable;

  2. 在处理GUI相关的事件,比如鼠标移动或触摸事件,这种情况下很少会出现backpressured的问题,用Observable就足以满足需求;

  3. 获取数据操作是同步的,但你的平台不支持Java流或者相关特性。使用Observable的开销低于Flowable。

何时用Flowable

  1. 当上游在一段时间发送的数据量过大的时候(这个量我们往往无法预计),此时就要使用Flowable以限制它所产生的量的元素10K +处理。

  2. 当你从本地磁盘某个文件或者数据库读取数据时(这个数据量往往也很大),应当使用Flowable,这样下游可以根据需求自己控制一次读取多少数据;

  3. 以读取数据为主且有阻塞线程的可能时用Flowable,下游可以根据某种条件自己主动读取数据。

Single、Completable

Single 与 Completable 都基于新的 Reactive Streams 的思想重新设计了接口,主要是消费者的接口, 现在他们是这样的:

interface SingleObserver<T> {      void onSubscribe(Disposable d);  
   void onSuccess(T value);    
   void onError(Throwable error); }
   interface CompletableObserver<T> {      void onSubscribe(Disposable d);  
   void onComplete();    
   void onError(Throwable error); }


Subscriber

对比一下 Subscriber :

public interface Subscriber<T> {      public void onSubscribe(Subscription s);    
   public void onNext(T t);    
   public void onError(Throwable t);    
   public void onComplete(); }


我们会发现和以前不一样的是多了一个 onSubscribe 的方法, Subscription 如下:

Subscription

public interface Subscription {      public void request(long n);    
   public void cancel(); }


熟悉 RxJava 1.x 的朋友能发现, 新的 Subscription 更像是综合了旧的 Producer 与 Subscription 的综合体。他既可以向上游请求(request)数据,又可以打断并释放(cancel)资源。而旧的 Subscription 在这里因为名字被占,而被重新命名成了 Disposable。

注意:Subscription 不再有订阅subcribe和unSubcribe的概念。

Disposable

public interface Disposable {      void dispose();    boolean isDisposed(); }


这里最大的不同就是这个 onSubscribe ,根据 Specification, 这个函数一定是第一个被调用的, 然后就会传给调用方一个 Subscription ,通过这种方式组织新的背压关系。当我们消费数据时,可以通过 Subscription 对象,自己决定请求数据。

这里就可以解释上面的非阻塞的背压。旧的阻塞式的背压,就是根据下游的消费速度,中游可以选择阻塞住等待下游的消费,随后向上游请求数据。而新的非阻塞就不在有中间阻塞的过程,由下游自己决定取多少,还有背压策略,如抛弃最新、抛弃最旧、缓存、抛异常等。

而新的接口带来的新的调用方式与旧的也不太一样, subscribe 后不再会有 Subscription 也就是如今的 Disposable,为了保持向后的兼容, Flowable 提供了 subscribeWith方法 返回当前的 Subscriber 对象, 并且同时提供了 DefaultSubscriber , ResourceSubscriber , DisposableSubscriber ,让他们提供了 Disposable 接口,并且可以从外面取消 dispose()。 现在也可以完成和以前类似的代码:

ResourceSubscriber<Integer> subscriber = new ResourceSubscriber<Integer>() {      @Override    public void onStart() {        request(Long.MAX_VALUE);    }    @Override    public void onNext(Integer t) {        System.out.println(t);    }    @Override    public void onError(Throwable t) {        t.printStackTrace();    }    @Override    public void onComplete() {        System.out.println("Done");    } }; Flowable.range(1, 10).delay(1, TimeUnit.SECONDS).subscribe(subscriber); subscriber.dispose();


注意,由于Reactive-Streams的兼容性,方法onCompleted被重命名为onComplete。另外注意dispose()方法,这个方法允许你释放资源。

RxJava2.x中提供了几个Subcriber对象,如下所示:

  • DefaultSubscriber:通过实现Subscriber接口,可以通过调用request(long n)方法请求或者cancel()方法取消订阅(同步请求)

public abstract class DefaultSubscriber<T> implements Subscriber<T>


  • DisposableSubscriber:通过实现Desposable异步删除。

public abstract class DisposableSubscriber<T> implements Subscriber<T>, Disposable

  • ResourceSubscriber:允许异步取消其订阅相关资源,节省内存而且是线程安全。

public abstract class ResourceSubscriber<T> implements Subscriber<T>, Disposable


  • SafeSubscriber:包装另一个订阅者,并确保所有onXXX方法遵守协议(序列化要求访问除外)。

public final class SafeSubscriber<T> implements Subscriber<T>, Subscription


  • SerializedSubscriber:序列化访问另一个订阅者的onNext,onError和onComplete方法。

public final class SerializedSubscriber<T> implements Subscriber<T>, Subscription


在onSubscribe/onStart中调用request

注意,在Subscriber.onSubscribe或ResourceSubscriber.onStart中调用request(n)将会立即调用onNext,实例代码如下:

Flowable.range(1, 3).subscribe(new Subscriber<Integer>() {    @Override    public void onSubscribe(Subscription s) {        System.out.println("OnSubscribe start");        s.request(Long.MAX_VALUE);        System.out.println("OnSubscribe end");    }    @Override    public void onNext(Integer v) {        System.out.println(v);    }    @Override    public void onError(Throwable e) {        e.printStackTrace();    }    @Override    public void onComplete() {        System.out.println("Done");    } });


输出结果如下:

OnSubscribe start123Done OnSubscribe end


当你在onSubscribe/onStart中做了一些初始化的工作,而这些工作是在request后面时,会出现一些问题,在onNext执行时,你的初始化工作的那部分代码还没有执行。为了避免这种情况,请确保你调用request时,已经把所有初始化工作做完了。

这个行为不同于1.x中的 request要经过延迟的逻辑直到上游的Producer到达时。在2.0中,总是Subscription先传递下来,90%的情况下没有延迟请求的必要。

Subscription

在RxJava 1.x中,接口rx.Subscription负责流和资源的生命周期管理,即退订和释放资源,例如scheduled tasks。Reactive-Streams规范用这个名称指定source和consumer之间的关系: org.reactivestreams.Subscription 允许从上游请求一个正数,并支持取消。

为了避免名字冲突,1.x的rx.Subscription被改成了 io.reactivex.Disposable。

因为Reactive-Streams的基础接口org.reactivestreams.Publisher 定义subscribe()为无返回值,Flowable.subscribe(Subscriber)不再返回任何Subscription。其他的基础类型也遵循这种规律。 
在2.x中其他的subscribe的重载方法返回Disposable。

原始的Subscription容器类型已经被重命名和修改。

  • CompositeSubscription 改成 CompositeDisposable,

  • SerialSubscription 和MultipleAssignmentSubscription 被合并到了 SerialDisposable。 set() 方法取消了旧值,而replace()方法没有。

  • RefCountSubscription 已被删除。

收回 create 方法权限

在RxJava 1.x 最明显的问题就是由于 create 的太过开放,导致其被开发者滥用,而不是学习使用提供的操作符。并且用户对 RxJava 不够了解,导致各种各样的问题,如背压、异常处理等。

由于规范要求所有的操作符强制支持背压,因此新的 create 采用了保守的设计,让用户实现 FlowableOnSubscribe 接口,并选取背压策略,然后在内部实现封装支持背压,简单的例子如下:

Flowable.create(new FlowableOnSubscribe<Integer>() {  
 @Override  public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {      emitter.onNext(1);      emitter.onNext(2);      emitter.onComplete();    } },
BackpressureStrategy.BUFFER);

Functions可以抛出异常

不同于RxJava1.x,RxJava2.x中没有了一系列的Action/Func接口,取而代之的是与Java8命名类似的函数式接口,如下图:

而Consumer即消费者,用于接收单个值,BiConsumer则是接收两个值,Function用于变换对象,Predicate用于判断。这些接口命名大多参照了Java8,熟悉Java8新特性的应该都知道意思,这里也就不再赘述了。

public interface Consumer<T> {    
  void accept(T t) throws Exception; }


新的ActionX、FunctionX的方法声明都增加了一个throws Exception,这带来了显而易见的好处,现在我们可以这样写:

Flowable.just("qq.txt")    
.map(new Function<String, Integer>() {    @Override        
   public Integer apply(String value) throws Exception {            File file = new File(value);            file.createNewFile();          
    return 99;        }    });


而createNewFile方法显式的抛出了一个IOException,而在以前是不可以这样写的。

Schedulers

在2.0的API中仍然支持主要的默认scheduler: computation, io, newThread 和 trampoline,可以通过io.reactivex.schedulers.Schedulers这个实用的工具类来调度。

2.0中不存在immediate 调度器。 它被频繁的误用,并没有正常的实现 Scheduler 规范;它包含用于延迟动作的阻塞睡眠,并且不支持递归调度。你可以使用Schedulers.trampoline()来代替它。

Schedulers.test()已经被移除,这样避免了默认调度器休息的概念差异。那些返回一个”global”的调度器实例是鉴于test()总是返回一个新的TestScheduler实例。现在我们鼓励测试人员使用这样简单的代码new TestScheduler()。

io.reactivex.Scheduler抽象类现在支持直接调度任务,不需要先创建然后通过Worker调度。

操作符的差别

2.0中大部分操作符仍然被保留,实际上大部分行为和1.x一样。

关于操作符,引用JakeWharton的总结就是:

All the same operators(you konw and love or hate and despise) are still there.

Transformer

RxJava 1.x 中Transformer实际上就是Func1<Observable,Observable>,换句话说就是提供给他一个Observable它会返回给你另一个Observable,这和内联一系列操作符有着同等功效。

相关API如下:

public interface Transformer<T, R> extends Func1<Observable<T>, Observable<R>> {   // cover for generics insanity

}
public interface Func1<T, R> extends Function {    R call(T t); }


实际操作下,写个方法,创建一个Transformer调度器:

//子线程运行,主线程回调
public Observable.Transformer<T, T> io_main(final RxAppCompatActivity context) {        
 return new Observable.Transformer<T, T>() {            
         @Override          public Observable<T> call(Observable<T> tObservable) {              Observable<T> observable = (Observable<T>) tObservable                       .subscribeOn(Schedulers.io())                       .doOnSubscribe(new Ation0() {                            
                          @Override                           public void call() {                                DialogHelper.showProgressDlg(context, mMessage);                            }                        })                       .subscribeOn(AndroidSchedulers.mainThread())                       .observeOn(AndroidSchedulers.mainThread())                       .compose(RxLifecycle.bindUntilEvent(context.lifecycle(), ActivityEvent.STOP));                
                          return observable;            }        };    
}


上面这个方法出自本人的,用法和源码详见:

在实际应用中,Transformer 经常和 Observable.compose() 一起使用。本人的也有使用,这里就不多介绍了。

在RxJava2.0中,Transformer划分的更加细致了,每一种“Observable”都对应的有自己的Transformer,相关API如下所示:

public interface ObservableTransformer<Upstream, Downstream> {    ObservableSource<Downstream> apply(Observable<Upstream> upstream); }
public interface CompletableTransformer {    CompletableSource apply(Completable upstream); }
public interface FlowableTransformer<Upstream, Downstream> {    Publisher<Downstream> apply(Flowable<Upstream> upstream); }
public interface MaybeTransformer<Upstream, Downstream> {    MaybeSource<Downstream> apply(Maybe<Upstream> upstream); }
public interface SingleTransformer<Upstream, Downstream> {    SingleSource<Downstream> apply(Single<Upstream> upstream);


这里以FlowableTransformer为例,创建一个Transformer调度器:

//子线程运行,主线程回调

public FlowableTransformer<T, T> io_main(
 final RxAppCompatActivity context) {        
 return new FlowableTransformer<T, T>() {            
 @Override  
 public Publisher<T> apply(Flowable<T> flowable) {                
      return flowable    .subscribeOn(Schedulers.io())    .doOnSubscribe(new Consumer<Subscrition>() {                            
    @Override     public void accept(Subscription subscription) throws Exception {  
   DialogHelper.showProgressDlg(context, mMessage);                            }                        })             .subscribeOn(AndroidSchedulers.mainThread())             .observeOn(AndroidSchedulers.mainThread())             .compose(
         RxLifecycle.<T, ActivityEvent>bindUntilEvent(context.lifecycle(), ActivityEvent.DESTROY));            }        };    }


上面这个方法出自一叶飘舟的,用法和源码详见:

其他改变请看下篇!

---我是分割线---



Tamic开发社区

非专业的移动社区

不只是干货,还有人生

长按二维码关注我们


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存