哈哈哈哈哈,题目有点猖狂。但是既然你都来了,那就看看吧,毕竟响应式编程随着高并发对于性能的吃紧,越来越重要了。
哦对了,这是一篇Java文章。
废话不多说,直接步入正题。
响应式编程核心组件
步入正题之前,我希望你对发布者/订阅者模型有一些了解。
直接看图:
Talk is cheap, show you the code!
1 | Java复制代码public class Main { |
输出:
1 | vbnet复制代码run1: 0 |
Flux
Flux是一个多元素的生产者,言外之意,它可以生产多个元素,组成元素序列,供订阅者使用。
Mono
Mono和Flux的区别在于,它只能生产一个元素供生产者订阅,也就是数量的不同。
Mono的一个常见的应用就是Mono
快速创建一个Flux/Mono并订阅它
来看一些官方文档演示的方法。
1 | Java复制代码Flux<String> seq1 = Flux.just("foo", "bar", "foobar"); |
subscribe()方法(Lambda形式)
- subscribe()方法默认接受一个Lambda表达式作为订阅者来使用。它有四个变种形式。
- 在这里说明一下subscribe()第四个参数,指出了当订阅信号到达,初次请求的个数,如果是null则全部请求(Long.MAX_VALUE)
1 | Java复制代码public class FluxIntegerWithSubscribe { |
如果去掉初次请求,那么会请求最大值:
1 | Java复制代码public class FluxIntegerWithSubscribe { |
输出:
1 | arduino复制代码run |
继承BaseSubscriber(非Lambda形式)
- 这种方式更多像是对于Lambda表达式的一种替换表达。
- 对于基于此方法的订阅,有一些注意事项,比如初次订阅时,要至少请求一次。否则会导致程序无法继续获得新的元素。
1 | Java复制代码public class FluxWithBaseSubscriber { |
输出:
1 | Java复制代码开始啦! |
终止订阅:Disposable
- Disposable是一个订阅时返回的接口,里面包含很多可以操作订阅的方法。
- 比如取消订阅。
在这里使用多线程模拟生产者生产的很快,然后立马取消订阅(虽然立刻取消但是由于生产者实在太快了,所以订阅者还是接收到了一些元素)。
其他的方法,比如Disposables.composite()会得到一个Disposable的集合,调用它的dispose()方法会把集合里的所有Disposable的dispose()方法都调用。
1 | Java复制代码public class FluxWithDisposable { |
输出:
1 | 复制代码这里的输出每次调用可能都会不同,因为订阅之后取消了,所以能打印多少取决于那一瞬间CPU的速度。 |
调整发布者发布速率
- 为了缓解订阅者压力,订阅者可以通过负压流回溯进行重塑发布者发布的速率。最典型的用法就是下面这个——通过继承BaseSubscriber来设置自己的请求速率。但是有一点必须明确,就是hookOnSubscribe()方法必须至少请求一次,不然你的发布者可能会“卡住”。
1 | Java复制代码public class FluxWithLimitRate1 { |
- 或者使用limitRate()实例方法进行限制,它返回一个被限制了速率的Flux或Mono。某些上流的操作可以更改下流订阅者的请求速率,有一些操作有一个prefetch整型作为输入,可以获取大于下流订阅者请求的数量的序列元素,这样做是为了处理它们自己的内部序列。这些预获取的操作方法一般默认预获取32个,不过为了优化;每次已经获取了预获取数量的75%的时候,会再获取75%。这叫“补充优化”。
1 | Java复制代码public class FluxWithLimitRate2 { |
程序化地创建一个序列
静态同步方法:generate()
现在到了程序化生成Flux/Mono的时候。首先介绍generate()方法,这是一个同步的方法。言外之意就是,它是线程不安全的,且它的接收器只能一次一个的接受输入来生成Flux/Mono。也就是说,它在任意时刻只能被调用一次且只接受一个输入。
或者这么说,它生成的元素序列的顺序,取决于代码编写的方式。
1 | Java复制代码public class FluxWithGenerate { |
- 通过上述代码不难看到,每次的接收器接受的值来自于上一次生成方法的返回值,也就是state=上一个迭代的返回值(其实称为上一个流才准确,这么说只是为了方便理解)。
- 不过这个state每次都是一个全新的(每次都+1当然是新的),那么有没有什么方法可以做到前后两次迭代的state是同一个引用且还可以更新值呢?答案就是原子类型。也就是上面的第二种方式。
静态异步多线程方法:create()
说完了同步生成,接下来就是异步生成,还是多线程的!让我们有请:create()闪亮登场!!!
- create()方法对外暴露出一个FluxSink对象,通过它我们可以访问并生成需要的序列。除此之外,它还可以触发回调中的多线程事件。
- create另一特性就是很容易把其他的接口与响应式桥接起来。注意,它是异步多线程并不意味着create可以并行化你写的代码或者异步执行;怎么理解呢?就是,create方法里面的Lambda表达式代码还是单线程阻塞的。如果你在创建序列的地方阻塞了代码,那么可能造成订阅者即使请求了数据,也得不到,因为序列被阻塞了,没法生成新的。
- 其实通过上面的现象可以猜测,默认情况下订阅者使用的线程和create使用的是一个线程,当然阻塞create就会导致订阅者没法运行咯!
- 上述问题可以通过Scheduler解决,后面会提到。
1 | Java复制代码public class FluxWithCreate { |
静态异步单线程方法:push()
说完了异步多线程,同步的生成方法,接下来就是异步单线程:push()。
其实说到push和create的对比,我个人理解如下:
- create允许多线程环境下调用.next()方法,只管生成元素,元素序列的顺序取决于…算了,随机的,毕竟多线程;
- 但是push只允许一个线程生产元素,所以是有序的,至于异步指的是在新的线程中也可以,而不必非得在当前线程。
- 顺带一提,push和create都支持onCancel()和onDispose()操作。一般来说,onCancel只响应于cancel操作,而onDispose响应于error,cancel,complete等操作。
1 | Java复制代码public class FluxWithPush { |
同create一样,push也支持负压调节。但是我没写出来,我试过的Demo都是直接请求Long.MAX_VALUE,其实就是通过sink.onRequest(LongConsumer)方法调用来实现负压控制的。原理在这,想深究的请自行探索,鄙人不才,花费一下午没实现。
实例方法:handle()
在Flux的实例方法里,handle类似filter和map的操作。
1 | Java复制代码public class FluxWithHandle { |
线程和调度
Schedulers的那些静态方法
一般来说,响应式框架都不支持并发,P.s. create那个是生产者并发,它本身不是并发的。所以也没有可用的并发库,需要开发者自己实现。
同时,每一个操作一般都是在上一个操作所在的线程里运行,它们不会拥有自己的线程,而最顶的操作则是和subscribe()在同一个线程。比如Flux.create(…).handle(…).subscribe(…)都在主线程运行的。
在响应式框架里,Scheduler决定了操作在哪个线程被怎么执行,它的作用类似于ExecutorService。不过功能稍微多点。如果你想实现一些并发操作,那么可以考虑使用Schedulers提供的静态方法,来看看有哪些可用的:
Schedulers.immediate(): 直接在当前线程提交Runnable任务,并立即执行。
1 | Java复制代码package com.learn.reactor.flux; |
通过上面看得出,immediate()其实就是在执行位置插入需要执行的Runnable来实现的。和直接把代码写在这里没什么区别。
Schedulers.newSingle():保证每次执行的操作都使用的是一个新的线程。
1 | Java复制代码package com.learn.reactor.flux; |
Schedulers.single(),它的作用是为当前操作开辟一个新的线程,但是记住,所有使用这个方法的操作都共用一个线程;
Schedulers.elastic():一个弹性无界线程池。
无界一般意味着不可管理,因为它可能会导致负压问题和过多的线程被创建。所以马上就要提到它的替代方法。
Schedulers.bounededElastic():有界可复用线程池
1 | Java复制代码package com.learn.reactor.flux; |
Schedulers.boundedElastic()是一个更好的选择,因为它可以在需要的时候创建工作线程池,并复用空闲的池;同时,某些池如果空闲时间超过一个限定的数值就会被抛弃。
同时,它还有一个容量限制,一般10倍于CPU核心数,这是它后备线程池的最大容量。最多提交10万条任务,然后会被装进任务队列,等到有可用时再调度,如果是延时调度,那么延时开始时间是在有线程可用时才开始计算。
由此可见Schedulers.boundedElastic()对于阻塞的I/O操作是一个不错的选择,因为它可以让每一个操作都有自己的线程。但是记得,太多的线程会让系统备受压力。
Schedulers.parallel():提供了系统级并行的能力
1 | Java复制代码package com.learn.reactor.flux; |
最后,Schedulers.parallel()提供了并行的能力,它会创建数量等于CPU核心数的线程来实现这一功能。
其他线程操作
顺带一提,还可以通过ExecutorService创建新的Scheduler。当然,Schedulers的一堆newXXX方法也可以。
有一点很重要,就是boundedElastic()方法可以适用于传统阻塞式代码,但是single()和parallel()都不行,如果你非要这么做那就会抛异常。自定义Schedulers可以通过设置ThreadFactory属性来设置接收的线程是否是被NonBlocking接口修饰的Thread实例。
Flux的某些方法会使用默认的Scheduler,比如Flux.interval()方法就默认使用Schedulers.parallel()方法,当然可以通过设置Scheduler来更改这种默认。
在响应式链中,有两种方式可以切换执行上下文,分别是publishOn()和subscribeOn()方法,前者在流式链中的位置很重要。在Reactor中,可以以任意形式添加任意数量的订阅者来满足你的需求,但是,只有在设置了订阅方法后,才能激活这条订阅链上的全部对象。只有这样,请求才会上溯到发布者,进而产生源序列。
在订阅链中切换执行上下文
publishOn()
publishOn()就和普通操作一样,添加在操作链的中间,它会影响在它下面的所有操作的执行上下文。看个例子:
1 | Java复制代码public class FluxWithPublishOnSubscribeOn { |
subscribeOn()
1 | Java复制代码public class FluxWithPublishOnSubscribeOn { |
subscribeOn()方法会把订阅之后的整个订阅链都切换到新的执行上下文中。无论在subscribeOn()哪里,都可以把最前面的订阅之后的订阅序列进行切换,当然了,如果后面还有publishOn(),publishOn()会进行新的切换。
本文转载自: 掘金