RxJava 及响应式编程


介绍

这篇文章主要讲了 RxJava 在异步场景下和 Future、callback、compelableFuture 等的区别,然后介绍了 RxJava 的 API,最后通过具体的业务使用示例了 RxJava 在项目中的应用。

Java 9's Reactive Streams aka Flow API are a set of Interfaces implemented by various reactive streams libraries such as RxJava 2, Akka Streams, and Vertx. They allow these reactive libraries to interconnect, while preserving the all important back-pressure.

举个栗子

通过一个例子对比一下分别使用 future、callback、completableFuture、RxJava 不同方式来实现下面这个调用流程(或者说是一个 DAG 图)。

Future

我们知道 Future.get() 本质还是阻塞的,并不能达到非阻塞的目的。

Callback

上面只是部分实现,callback 看上去可以实现完全非阻塞,但是代码就变成了层层嵌套的回调地狱,很丑难维护

CompletableFuture

显然 CompletableFuture 很适合这种场景,利用其各种 then/when/combile 等接口可以容易组装成一个DAG执行拓扑实现纯异步执行。代码这里就不写了。

RxJava

再来看看 RxJava,和 CompletableFuture 的用法很类似,所有的任务都可以在调用方同一个层次上获取引用,并且可以随意组合、变换等,堪称信手拈来。就跟搭积木一样!

那么问题来了,通过以上对比,显然决赛选手是 RxJava 和 CompletableFuture ,那么他们有啥区别呢?

RxJava

Reactive Extension for Java,是最开始根据微软的 http://Rx.Net 为基础,由 Netflix 主导做出的提供在 JVM 上实现 Reactive Programming 的一种方式。同类的库还有 Project Reactor、Akka 和 Google 的 Agera 等等。但是目前网上分享几乎都是 Android 端的使用,很少有后端使用案例??

下面重点介绍一下 RxJava,RxJava 是 ReactiveX 家族的一份子,而 ReactiveX 致力于为反应式编程提供工具库。目前的 ReactiveX 家族已经很庞大了如下图所示。

ReactiveX = 观察者 + 迭代器 + 函数式,这三个词很好地概括了 ReactiveX 的核心:函数式异步数据流 Observable

Observable可以认为是异步多值的数据结构,它与我们常见的Iterable类似,我们在日常的业务开发中,也更多地是将 Observable 当作增强版的 Future 来使用。

操作 Iterable(pull) Observable(push)
查询 T next() onNext(T)
错误 throws Exception onError(Exception)
完成 !hasNext() onCompleted

Observable 生命周期

和 CompletableFuture 或者 Stream 类似都分为创建、组合/转换、消费三个环节

创建阶段 API

方式 示例 说明
现有数据 Observable.just(100)Observable.from(Arrays.asList(1, 2, 3))Observable.from(future) RxJava 2.x版本API会有变化这些方法用于将现有数据装箱为Observable类型,目的是为了后续的转换与组合
自定义逻辑 Observable.create(...) 最佳实践:可以自己封装helper方法,将Observable当成增强版的Future使用

转换组合阶段API

模式 说明 代码 示意图
依赖关系 Observable.flatMap例如f3依赖于f1的结果
并行关系 Observable.zip例如f3、f4、f5并行,并组成最终结果
分块处理 化大为小,分而治之Observable.from(list) .buffer(n) .flatMap(subList -> subResult) .reduce((acc, subResult) -> result)

监听/消费 阶段API

方式 说明 代码
非阻塞方式 Observable.subscribe回调函数中执行的代码是异步的,有可能主线程完成了,异步线程代码还未执行完成
阻塞方式 Observable.toBlockingfirst/firstOrDefaultlast/lastOrDefaultsingleforEachtoFuture/toIterable

错误处理

方式 说明 代码
doOnError(Throwable) 一般用于打log或者发告警,无法影响错误的产生及传播
onErrorReturn(Throwable, T) 出错时返回降级默认值
onErrorResumeNext(Observable) 出错后执行异步逻辑,返回降级异步值

线程池

虽然 RxJava 是异步编程的利器,但是如果我们只是按照前文所述一顿操作的话,默认是单线程跑的。我们需要通过Scheduler指定调度代码的执行线程池。实际使用过程中,调用Observable.subscribeOn(Scheduler)即可指定该Observable产值和通知的线程池,也就是我们业务开发中的耗时操作(例如RPC调用、IO操作等)对应的线程池。

调度器 对应线程池特点
Schedulers.io() CachedThreadPool无上界不排队的线程池
Schedulers.computation() 计算线程池,线程数=CPU核数
Schedulers.immediate() 使用当前线程
Schedulers.from(Executor) 通过JDK Executor自定义线程池,可根据业务特点自行定义

到底什么是响应式编程(Reactive Programming)

关于响应式编程(Reactive Programming),看了上面对 RxJava 的介绍,你可能有过这样的疑问:我们已经有了 Java8 的 Stream, CompletableFuture, 以及 Optional,为什么还必要存在 RxJava 和 Reactor?

八个层面比较 Java 8, RxJava, Reactor 这篇文章值得一看,从八个层面对比了七个不同的工具,帮助我们理解标准特性与这些库之间的区别。

八个层面分别是:

  1. Composable(可组合)
  2. Lazy(惰性执行)
  3. Reusable(可复用)
  4. Asynchronous(异步)
  5. Cacheable(可缓存)
  6. Push or Pull(推拉模型)
  7. Backpressure(回压)
  8. Operator fusion(操作融合)

对比的七个工具分别为

  1. CompletableFuture(Java 8)
  2. Stream(Java 8)
  3. Optional(Java 8)
  4. Observable (RxJava 1)
  5. Observable (RxJava 2)
  6. Flowable (RxJava 2)
  7. Flux (Reactor Core)

结论汇总为一张图如下

Copyright © jverson.com 2019 all right reserved,powered by Gitbook 20:27

results matching ""

    No results matching ""