一、前言
在spring cloud gateway中大量使用了spring5才引入的反应式编程(Reactive Programming)。在此背景下,本文介绍下反应式编程相关使用及原理。
二、反应式编程简介
2.1 什么是反应式编程
反应式编程(Reactive programming,Rx)最初来源于函数式语言里面的函数式反应编程(Functional Reactive programming,FRP)。后来随着微软.Net Framework增加了Reactive Extension而在主流语言中流行起来。
反应式编程是一种编程思想、编程方式,是为了简化并发编程而出现的。与传统的处理方式相比,它能够基于数据流中的事件进行反应处理。例如:a+b=c的场景,在传统编程方式下如果a、b发生变化,那么我们需要重新计算a+b来得到c的新值。而反应式编程中,我们不需要重新计算,a、b的变化事件会触发c的值自动更新。这种方式类似于我们在消息中间件中常见的发布/订阅模式。由流发布事件,而我们的代码逻辑作为订阅方基于事件进行处理,并且是异步处理的。
反应式编程中,最基本的处理单元是事件流(事件流是不可变的,对流进行操作只会返回新的流)中的事件。流中的事件包括正常事件(对象代表的数据、数据流结束标识)和异常事件(异常对象,例如Exception)。同时,只有当订阅者第一次发布者,发布者发布的事件流才会被消费,后续的订阅者只能从订阅点开始消费,但是我们可以通过背压、流控等方式控制消费。
2.2 反应式编程的优势
反应式编程的核心是基于事件流、无阻塞、异步的,使用反应式编程不需要编写底层的并发、并行代码。并且由于其声明式编写代码的方式,使得异步代码易读且易维护。
反应式编程主要优点有:
1、整体采用了观察者模式,异步解耦,提高服务器的吞吐量。
2、内部提出了背压(Backpressure)概念,可以控制消费的速度
3、书写方式与迭代器,stream类似,方便使用者理解。
2.3 与非反应式编程对比
2.3.1 与迭代器对比
Iterable | Observable | |
---|---|---|
迭代 | next() | onNext() |
异常 | throws Exception | onError() |
完成 | !hasNext() | onCompleted() |
上面表格中的 Observable 那一列表示了观察者接受到相关事件时触发的动作。如果将迭代器看作是拉模式,那观测者模式便是推模式。被观察者(Subject)主动的推送数据给订阅者(Subscriber),触发 onNext 方法,出现异常时触发onError(),完成后触onCompleted()。
2.3.2 与stream对比
事件 | stream | Observable |
---|---|---|
映射 | map() | map() |
过滤 | filter() | filter() |
与stream对比可以看出,Reactive Programming也是通过类似的数据流方式来处理订阅的数据。不同点在于stream无法控制消息发送速度,而反应式编程中如果 Publisher 发布消息太快,超过了 Subscriber 的处理速度,反应式编程提供了背压机制来控制 Publisher的速度。
三、Reactor 入门
3.1 Reactor中主要的类
Mono 实现了 org.reactivestreams.Publisher 接口,代表0到1个元素的发布者。
Flux 同样实现了 org.reactivestreams.Publisher 接口,代表0到N个元素的发表者。
Subscriber观察者,用来观察Publisher相关动作。
Subscription解耦Subscriber和Publisher。
Mono和Flux可以相互转换。多个Mono可以合并成一个Fulx,一个Flux也可以转化成Mono。
3.2 Reactor中主要的方法
创建
just,根据参数创建数据流
never,创建一个不会发出任何数据的无限运行的数据流
empty,创建一个不包含任何数据的数据流,不会无限运行。
error,创建一个订阅后立刻返回异常的数据流
concact,从多个Mono创建Flux
generate,同步、逐一的创建复杂流。重载方法支持生成状态。在方法内部的lambda中通过调用next和complete、error来指定当前循环返回的流中的元素(并不是return)。
create,支持同步、异步、批量的生成流中的元素。
zip,将多个流合并为一个流,流中的元素一一对应
delay,Mono方法,用于指定流中的第一个元素产生的延迟时间
interval,Flux方法,用于指定流中各个元素产生时间的间隔(包括第一个元素产生时间的延迟),从0开始的Long对象组成的流
justOrEmpty,Mono方法,用于指定当初始化时的值为null时返回空的流
defaultIfEmpty,Mono方法,用于指定当流中元素为空时产生的默认值
range,生成一个范围的Integer队列
转化
map,将流中的数据按照逻辑逐个映射为一个新的数据,当流是通过zip创建时,有一个元组入参,元组内元素代表zip前的各个流中的元素。
flatMap,将流中的数据按照逻辑逐个映射一个新的流,新的流之间是异步的。
take,从流中获取N个元素,有多个扩展方法。
zipMap,将当前流和另一个流合并为一个流,两个流中的元素一一对应。
mergeWith,将当前流和另一个流合并为一个流,两个流中的元素按照生成顺序合并,无对应关系。
join,将当前流和另一个流合并为一个流,流中的元素不是一一对应的关系,而是根据产生时间进行合并。
concactWith,将当前流和另一个流按声明顺序(不是元素的生成时间)链接在一起,保证第一个流消费完后再消费第二流
zipWith,将当前流和另一个流合并为一个新的流,这个流可以通过lambda表达式设定合并逻辑,并且流中元素一一对应
first,对于Mono返回多个流中,第一个产生元素的Mono。对于Flux,返回多个Flux流中第一个产生元素的Flux。
block,Mono和Flux中类似的方法,用于阻塞当前线程直到流中生成元素
toIterable,Flux方法,将Flux生成的元素返回一个迭代器
defer,Flux方法,用于从一个Lambda表达式获取结果来生成Flux,这个Lambda一般是线程阻塞的
buffer相关方法,用于将流中的元素按照时间、逻辑规则分组为多个元素集合,并且这些元素集合组成一个元素类型为集合的新流。
window,与buffer类似,但是window返回的流中元素类型还是流,而不是buffer的集合。
filter,顾名思义,返回负责规则的元素组成的新流
reduce,用于将流中的各个元素与初始值(可以设置)逐一累积,最终得到一个Mono。
其他
doOnXXX,当流发生XXX时间时的回调方法,可以有多个,类似于监听。XXX包括Subscribe、Next、Complete、Error等。
onErrorResume,设置流发生异常时返回的发布者,此方法的lambda是异常对象
onErrorReturn,设置流发生异常时返回的元素,无法捕获异常
then,返回Mono,跳过整个流的消费
ignoreElements,忽略整个流中的元素
subscribeOn,配合Scheduler使用,订阅时的线程模型。
publisherOn,配合Scheduler使用,发布时的线程模型。
retry,订阅者重试次数
3.3 一个栗子
假设有个名单列表,要根据名单获取对应名字的邮箱,并且过滤掉邮箱长度小于10的邮箱,最后再将符合条件的邮箱打印出来。
使用stream编程如下所示。
1 | less复制代码Stream.of("Tom", "Bob", "zhangsan", "lisi") |
使用Reactive编程如下所示。
1 | less复制代码Flux.just("Tom", "Bob", "zhangsan", "lisi") |
通过上述例子可以看出,stream和Reactive在形式上有相似之处,都是先创建数据源,然后经过中间过程处理转换,最后再消费中间处理结果。
1 | arduino复制代码Flux.just("Tom", "Bob", "zhangsan", "lisi") |
Flux.just()创建一个Flux的发布者。除了使用just方法外,还有fromCallable,fromIterable等其他方式用来从不同场景中创建publisher。
1 | perl复制代码map(s -> s.concat("@qq.com")) |
map的含义就是映射,在上一步中创建了一个4个元素序列的发布者,在该步骤中将每个序列元素进行转换,在每个名称后面加上邮箱后缀。
1 | scss复制代码filter(s -> s.length() > 10) |
过滤步骤,将经过映射的4个元素进行过滤,剔除掉长度不大于10的。中间过程书写形式和含义与stream类似。
1 | scss复制代码subscribe(System.out::println); |
该步骤是最终的订阅阶段,之前创建的都是被观察者,该步骤是创建一个观察者subscriber。其中subscriber的具体行为就是System.out::println打印出之前处理过的元素。至此一个订阅发布的过程就结束了。
四、Reactor的工作原理
在之前的章节中已经说过,反应式编程的核心就是一个观察者模式。
Flux和Mono相当于观察者模式中的subject,当Flux或Mono调用subscribe方法时,相当于subject发出了一个Event,从而让订阅此事件的观察者进行消费。那Flux框架具体如何实现这套机制呢,还是以上节中的例子跟踪下它是如何工作的。
1 | less复制代码Flux.just("Tom", "Bob", "zhangsan", "lisi") |
本文基于3.1.9.RELEASE版本。
4.1 申明阶段
4.1.1 Flux.just()
进入just方法,经过若干跳转后,进入如下方法。
1 | php复制代码public static <T> Flux<T> fromArray(T[] array) { |
onAssembly是一个钩子方法,暂时忽略。最终就是new FluxArray<>(array)一个对象创建出了一个FluxArray。点击FluxArray的构造函数中,可以看看到,只是把array赋值给了对象内部的array。
1 | php复制代码final T[] array; |
4.1.2 map
Flux.just方法只是创建了一个FluxArray对象,回到最开始定义的地方,下一步执行的是map方法。定义如下所示。
1 | typescript复制代码public final <V> Flux<V> map(Function<? super T, ? extends V> mapper) { |
上一步创建的FluxArray是一个Fuseable,所以执行if条件里的逻辑,创建一个FluxMapFuseable对象,FluxMapFuseable的构造函数中有两个参数,this和mapper。this就是上一步创建出来的FluxArray,mapper就是我们自定义的Lambda表达式,即:s -> s.concat("@qq.com")。再点击进入FluxMapFuseable的构造函数中。
1 | javascript复制代码FluxMapFuseable(Flux<? extends T> source, |
从这个构造函数可以看出,source是上一步骤just得到的FluxArray,mapper是对应map的Lambda表达式,所以当执行map操作的时候,其实是又将FluxArray进行封装,得到了一个新的FluxMapFuseable对象。
4.1.3 filter
再次回到开始的申明地方,在执行完map操作后,接着执行filter。同理,点击filter方法,可以看到如下代码。
1 | kotlin复制代码public final Flux<T> filter(Predicate<? super T> p) { |
在看过map的操作后,这一步骤其实就相当熟悉了,filter步骤将上一步map操作得到的FluxMapFuseable方法又一次封装成了FluxFilterFuseable对象。
4.1.4 申明总结
从上面的定义可以看出,申明阶段就是一层一层的创建各种Flux对象,并没有实际执行任何操作。通过just,map,filter等操作,将发布者一层一层的封装,从最开始的FluxArray对象,到FluxMapFuseable对象以及最后的FluxFilterFuseable对象。如下图所示。
4.2 订阅阶段
4.2.1 subscribe、onsubscribe
上述例子中,just,map,filter只是创建了一个个的对象。并没有实际执行相关逻辑。当调用被观察者的subscribe方法时,会为被观察者添加相应的观察者,同时触发观察者相关方法,从而使整个观察者模式得以进行下去。接着看下Fulx的subscribe方法。
经过一系类的jump后,最终会调用Flux的subscribe,如下所示。
1 | java复制代码public abstract void subscribe(CoreSubscriber<? super T> actual); |
该方法是一个抽象方法,需要看下子类是如何实现的。还记得上一步骤中filter后产生的对象嘛?FluxFilterFuseable是Flux的一个具体实现,当调用subscribe后,会跳转到FluxFilterFuseable的subscribe方法,代码如下。
1 | typescript复制代码public void subscribe(CoreSubscriber<? super T> actual) { |
传进来的actual是System.out::println,也就是我们最终执行的表达式,它被封装成了一个LambdaSubscriber观察者,predicate为filter指定的表达式s -> s.length() > 10,source为上一步骤中生成的FluxMapFuseable对象。根据对象情况,代码会走到2处。2处的逻辑就是将actual和predicate封装成一个**订阅者**去订阅source也就是FluxMapFuseable对象。
接着代码会去调用source的subscribe方法,也就是FluxMapFuseable对应的subscribe方法。
1 | typescript复制代码public void subscribe(CoreSubscriber<? super R> actual) { |
代码还是会走到2出,这里传入的actual是上一步骤中封装了System.out::println和s -> s.length() > 10的观察者,mapper为s -> s.concat("@qq.com"),从这段代码可以看出,所做的逻辑就是将上一步中的观察者和mapper又封装成了新的观察者。一层一层的套娃。
最后,看下本步骤中的source,也就是FluxArray对象的subscribe方法。
1 | php复制代码public static <T> void subscribe(CoreSubscriber<? super T> s, T[] array) { |
FluxArray是数据的源头,传入的array为我们定义的"tom", "Bob", "zhangsan", "lisi"名字。s为上一步骤中创建的subscriber。在数据的源头可以看出作为观察者模式的触发项,该步骤中触发了观察者的onsubscribe方法。同时为了解耦观察者和被观察者,创建一个ArraySubscription对象。FluxArray的subscribe会执行2处代码,s.onSubscribe(new ArraySubscription<>(s, array)),这里的s是上一步骤中创建的MapFuseableSubscriber中的onSubscribe方法,对应代码如下所示。
1 | kotlin复制代码@Override |
actual是FilterFuseableSubscriber对象,本质就是赋值后,然后调用FilterFuseableSubscriber的onSubscribe方法。FilterFuseableSubscriber对应的onSubscribe方法如下所示。
1 | kotlin复制代码@Override |
和MapFuseableSubscriber类似。actual对应的是LambdaSubscriber,也就是System.out::println。LambdaSubscriber的onsubscribe如下所示。
1 | scss复制代码public final void onSubscribe(Subscription s) { |
1和2代码的最终逻辑都一样,都会执行request方法。背压的原理就是通过这个request来实现的,观察者可以通过request来指定一次性订阅多少数据。
总结一下:一个subscribe方法其实是创建了三个观察者,与创建发布者类似,创建的观察者也是一层一层嵌套。从最外层的subscriber与上一层的操作结合生成一个新的subscriber。再继续向上调用,最终调用到数据源头。然后从数据源头开始一层一层再出发观察者的onsubscribe。
4.2.2 request
1 | scss复制代码public final void onSubscribe(Subscription s) { |
在onsubscribe阶段最终会调用s的request方法。还记得s嘛?s是在解耦观察者和被观察这创建出来的subscription。
1 | php复制代码public static <T> void subscribe(CoreSubscriber<? super T> s, T[] array) { |
就是这里的ArraySubscription对象。看下这个对象的request方法。
1 | ini复制代码@Override |
直接看下fastPath(),代码都贴在了一起。到这里就真正开始消费。通过一个for循环,调用Subscriber的onNext方法,onNext方法执行完毕后,执行Subscriber的onComplete方法。这里的s是MapFuseableConditionalSubscriber,看下它的onNext方法。
1 | kotlin复制代码public void onNext(T t) { |
在1处执行mapper对应的Lambda表达式,在2处执行下一步的Subscriber的onNext方法。下一步是Filter,再下一步是最终的System.out::println。最后onNext都执行换成后,执行s的onComplete方法,道理也是一样的,都是从最开始Subscriber的onComplete方法一层一层执行。至此一个完成的观察者模式的执行情况就完成了。
4.3 总结
1、申明阶段只是创建了一个个的被观察者,把动作包装成对象,其他什么事都没做,直到调用被观察者的subscribe方法,为被观察者添加观察者。
2、添加观察者后,每一个申明步骤都会创建一个新的观察者,观察上一步骤的被观察者,直到最外层被观察者触发onSubscribe,接着按照刚才添加的观察者一层层调用对应的onSubscribe方法,最后触发request方法。
3、当触发到最外层的request后,就执行真正的逻辑,再一层层调用观察者的onNext方法。最后完成后调用onComplete方法。
Q1:反应式编程的背压怎么实现的?
A1:观察者通过request中的传参,控制消费速度,从而实现反应式编程的背压特性,在声明Subscriber时,我们可以重写Subscriber接口,实现里面的request方法,来时控制订阅的速度。
Q2:为什么反应式编程可以提高吞吐量?
A2:从刚才的实现逻辑可以看出,被观察者只是申明了数据操作的定义,实际上什么都没做,几乎不消耗cpu,io等资源。在使用反应式编程处理web请求时,一般会写成如下形式。
1 | typescript复制代码@RequestMapping("/mail") |
只是定义了被观察者,能够很快完成一个请求的处理。
因此在实际应用中,如http请求,短时间如果有大量请求到来,可以快速创建相关请求,增大接口的吞吐量,但是接口的处理速度并不会加快,因为需要处理的请求耗时,都不会减小。
Q3:代码中如何控制订阅的速度?
A3:如果直接使用subecribe(System.out::println),默认走的request(Long.MAX_VALUE)。如果要控制速度有如下几种方式。
1、自己实现Subscriber,上述例子中使用了subecribe(System.out::println),默认创建了一个LambdaSubscriber观察者,在该观察者的onSubscribe方法中执行了s.request(Long.MAX_VALUE);方法,所以可以自己实现一个Subscriber自定义策略给request中传入不同的值,从而控制速度。比如发现当前流量大于500时,限制速度。
1 | arduino复制代码public final void onSubscribe(Subscription s) { |
2、通过flux的limitrate方式实现调整request数量
1 | scss复制代码Flux.range(1,10) |
3、实现现有BaseSubscriber类,重写里面的onSubscribe方法,本质跟1类似。
五、参考文档
cloud.tencent.com/developer/a…
blog.yannxia.top/2018/06/26/…
www.jianshu.com/p/7ee89f70d…
www.jianshu.com/p/df395eb28…
本文转载自: 掘金