【翻译】Reactor Netty参考指南 - 4TCP客

Reactor Netty参考指南目录


原文地址

Reactor Netty提供了易于使用、易于配置的TcpClient。它隐藏了创建TCP客户端所需的大部分Netty的功能,并增加了Reactive Streams背压。

4.1.连接和断开

要将TCP客户端连接到给定的端点,您必须创建并且配置一个TcpClient实例。默认情况下,hostlocalhostpost12012。下面是创建一个TcpClient的例子:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
> java复制代码import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create() //<1>
> .connectNow(); //<2>
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 创建一个TcpClient实例用来进行的配置操作。

<2> 用阻塞的方式进行连接操作,并且等待它初始化完成。

返回的Connection对象提供了简单的连接相关的API,包括disposeNow(),调用这个方法会以阻塞的方式关闭客户端。

4.1.1.Host和Port

想要连接特定的hostport,您可以使用以下方式来配置TCP客户端。示例如下:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
> java复制代码import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .host("example.com") //<1>
> .port(80) //<2>
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 配置TCP的host

<2> 配置TCP的port

4.2.预先初始化

默认情况下,TcpClient初始化资源的操作在需要使用的时候才进行。这意味着初始化加载的时候connect operation会占用额外的时间:

  • 事件循环组
  • 主机名解析器
  • native传输库(当使用了native传输的时候)
  • 用于安全性的native库(使用了OpenSsl的时候)

当您需要预加载这些资源的时候,您可以按照以下方式来配置TcpClient

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
> java复制代码import reactor.core.publisher.Mono;
> import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> TcpClient tcpClient =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .handle((inbound, outbound) -> outbound.sendString(Mono.just("hello")));
>
> tcpClient.warmup() //<1>
> .block();
>
> Connection connection = tcpClient.connectNow(); //<2>
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 初始化和加载事件循环组,主机名解析器,native传输库和用于安全性的native库

<2> 在连接远程节点的时候会进行主机名解析

4.3.写出数据

如果要发送数据到一个已有的端点,您必须添加一个I/O处理器。这个I/O处理器可以通过NettyOutbound来写出数据。

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
> java复制代码import reactor.core.publisher.Mono;
> import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .handle((inbound, outbound) -> outbound.sendString(Mono.just("hello"))) //<1>
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 发送hello字符串给这个端点。

4.4.消费数据

如果要接收从已有端点发过来的数据,您必须添加一个I/O处理器。这个I/O处理器可以通过NettyInbound来读取数据。示例如下:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
> java复制代码import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .handle((inbound, outbound) -> inbound.receive().then()) //<1>
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 接收从已有端点发送过来的数据

4.5.生命周期回调

下面的生命周期回调用参数是提供给您用来扩展TcpClient的:

Callback Description
doAfterResolve 在成功解析远程地址之后调用。
doOnChannelInit 在初始化channel的时候调用。
doOnConnect 当channel将要连接的时候调用。
doOnConnected 当channel已经连接上的时候调用。
doOnDisconnected 当channel断开的时候被调用。
doOnResolve 当远程地址将要被解析的时候被调用。
doOnResolveError 在远程地址解析失败的情况下被调用。

下面是使用doOnConnecteddoOnChannelInit回调的例子:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
> java复制代码import io.netty.handler.logging.LoggingHandler;
> import io.netty.handler.timeout.ReadTimeoutHandler;
> import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
> import java.util.concurrent.TimeUnit;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .doOnConnected(conn ->
> conn.addHandler(new ReadTimeoutHandler(10, TimeUnit.SECONDS))) //<1>
> .doOnChannelInit((observer, channel, remoteAddress) ->
> channel.pipeline()
> .addFirst(new LoggingHandler("reactor.netty.examples")))//<2>
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 当一个channel连接上之后添加了一个ReadTimeoutHandlerNetty pipeline。

<2> 当初始化channel的时候添加了一个LoggingHandlerNetty pipeline。

4.6.TCP层的配置

这一章节描述了三种TCP层的配置方式:

4.6.1.Channel Options

默认情况下,TCP客户端配置了以下options:

./../../reactor-netty-core/src/main/java/reactor/netty/tcp/TcpClientConnect.java

1
2
3
4
5
6
7
8
> java复制代码TcpClientConnect(ConnectionProvider provider) {
> this.config = new TcpClientConfig(
> provider,
> Collections.singletonMap(ChannelOption.AUTO_READ, false),
> () -> AddressUtils.createUnresolved(NetUtil.LOCALHOST.getHostAddress(), DEFAULT_PORT));
> }
>
>

如果需要添加新的option或者修改已有的option,您可以使用如下的方式:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
> java复制代码import io.netty.channel.ChannelOption;
> import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

您可以通过以下的链接找到更多关于Nettychannel options的信息:

4.6.2.Wire Logger

Reactor Netty提供了线路记录(wire logging)用来检查点对点的流量。默认情况下,线路记录是关闭的。如果想要开启它,您必须将日志reactor.netty.tcp.TcpClient的设置为DEBUG等级并且按如下方式进行配置:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
> java复制代码import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .wiretap(true) //<1>
> .host("example.com")
> .port(80)
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 开启线路记录

默认情况下,线路记录在输出内容的时候会使用AdvancedByteBufFormat#HEX_DUMP。您也可以通过配置TcpClient改为AdvancedByteBufFormat#SIMPLE或者AdvancedByteBufFormat#TEXTUAL

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
> java复制代码import io.netty.handler.logging.LogLevel;
> import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
> import reactor.netty.transport.logging.AdvancedByteBufFormat;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) //<1>
> .host("example.com")
> .port(80)
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 开启线路记录并使用AdvancedByteBufFormat#TEXTUAL来输出内容。

4.6.3.Event Loop Group

默认情况下,TCP客户端使用一个”Event Loop Group”,工作线程数等于初始化的时候可以用的处理器数量(但最小是4)。您也可以使用LoopResource#create其中的一个方法来修改配置。

默认的Event Loop Group配置如下:

./../../reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
> java复制代码/**
> * Default worker thread count, fallback to available processor
> * (but with a minimum value of 4)
> */
> public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
> /**
> * Default selector thread count, fallback to -1 (no selector thread)
> */
> public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
> /**
> * Default worker thread count for UDP, fallback to available processor
> * (but with a minimum value of 4)
> */
> public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
> /**
> * Default quiet period that guarantees that the disposal of the underlying LoopResources
> * will not happen, fallback to 2 seconds.
> */
> public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
> /**
> * Default maximum amount of time to wait until the disposal of the underlying LoopResources
> * regardless if a task was submitted during the quiet period, fallback to 15 seconds.
> */
> public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";
>
> /**
> * Default value whether the native transport (epoll, kqueue) will be preferred,
> * fallback it will be preferred when available
> */
> public static final String NATIVE = "reactor.netty.native";
>
>

如果需要修改这些设置,您也可以通过如下方式进行配置:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
> java复制代码import reactor.netty.Connection;
> import reactor.netty.resources.LoopResources;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> LoopResources loop = LoopResources.create("event-loop", 1, 4, true);
>
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .runOn(loop)
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

4.7.连接池

默认情况下,TCP客户端使用一个”固定的”的连接池,最大的channel数量是500,待处理队列中最大的获取注册请求数为1000(其余配置请查看下面的系统属性)。这意味着,如果有人尝试从池中获取一个channel,但是池中没有可用的channel则会创建一个新的channel。当池中的channel数量达到了最大值时,新的获取channel的操作会被延迟,直到一个可用的channel再次返回到池中。

./../../reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
> java复制代码/**
> * Default max connections. Fallback to
> * available number of processors (but with a minimum value of 16)
> */
> public static final String POOL_MAX_CONNECTIONS = "reactor.netty.pool.maxConnections";
> /**
> * Default acquisition timeout (milliseconds) before error. If -1 will never wait to
> * acquire before opening a new
> * connection in an unbounded fashion. Fallback 45 seconds
> */
> public static final String POOL_ACQUIRE_TIMEOUT = "reactor.netty.pool.acquireTimeout";
> /**
> * Default max idle time, fallback - max idle time is not specified.
> */
> public static final String POOL_MAX_IDLE_TIME = "reactor.netty.pool.maxIdleTime";
> /**
> * Default max life time, fallback - max life time is not specified.
> */
> public static final String POOL_MAX_LIFE_TIME = "reactor.netty.pool.maxLifeTime";
> /**
> * Default leasing strategy (fifo, lifo), fallback to fifo.
> * <ul>
> * <li>fifo - The connection selection is first in, first out</li>
> * <li>lifo - The connection selection is last in, first out</li>
> * </ul>
> */
> public static final String POOL_LEASING_STRATEGY = "reactor.netty.pool.leasingStrategy";
> /**
> * Default {@code getPermitsSamplingRate} (between 0d and 1d (percentage))
> * to be used with a {@link SamplingAllocationStrategy}.
> * This strategy wraps a {@link PoolBuilder#sizeBetween(int, int) sizeBetween} {@link AllocationStrategy}
> * and samples calls to {@link AllocationStrategy#getPermits(int)}.
> * Fallback - sampling is not enabled.
> */
> public static final String POOL_GET_PERMITS_SAMPLING_RATE = "reactor.netty.pool.getPermitsSamplingRate";
> /**
> * Default {@code returnPermitsSamplingRate} (between 0d and 1d (percentage))
> * to be used with a {@link SamplingAllocationStrategy}.
> * This strategy wraps a {@link PoolBuilder#sizeBetween(int, int) sizeBetween} {@link AllocationStrategy}
> * and samples calls to {@link AllocationStrategy#returnPermits(int)}.
> * Fallback - sampling is not enabled.
> */
> public static final String POOL_RETURN_PERMITS_SAMPLING_RATE = "reactor.netty.pool.returnPermitsSamplingRate";
>
>

如果您需要禁用连接池,您可以使用如下配置:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
> java复制代码import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.newConnection()
> .host("example.com")
> .port(80)
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

如果您需要为连接池中的channel设置一个特定的空闲时间,您可以使用如下配置:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
> java复制代码import reactor.netty.Connection;
> import reactor.netty.resources.ConnectionProvider;
> import reactor.netty.tcp.TcpClient;
> import java.time.Duration;
>
> public class Application {
>
> public static void main(String[] args) {
> ConnectionProvider provider =
> ConnectionProvider.builder("fixed")
> .maxConnections(50)
> .pendingAcquireTimeout(Duration.ofMillis(30000))
> .maxIdleTime(Duration.ofMillis(60))
> .build();
>
> Connection connection =
> TcpClient.create(provider)
> .host("example.com")
> .port(80)
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

当您期望负载很高时,需要谨慎使用一个最大连接数很高的连接池。因为您可以会遇到reactor.netty.http.client.PrematureCloseException异常,其根本原因是太多并发连接的opened/acquired操作导致的”Connect Timeout”。

4.7.1.度量

池化的ConnectionProvider提供了与Micrometer的内建集成。它暴露了所有前缀为reactor.netty.connection.provider的度量。

池化的ConnectionProvider度量

度量名称 类型 描述
reactor.netty.connection.provider.total.connections Gauge 所有连接的数,包括活跃的和空闲的
reactor.netty.connection.provider.active.connections Gauge 已经被成功获取了并且正在使用的连接数
reactor.netty.connection.provider.idle.connections Gauge 空闲的连接数
reactor.netty.connection.provider.pending.connections Gauge 正在等待可用连接的请求数

下面是开启集成的度量的例子:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
> java复制代码import reactor.netty.Connection;
> import reactor.netty.resources.ConnectionProvider;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> ConnectionProvider provider =
> ConnectionProvider.builder("fixed")
> .maxConnections(50)
> .metrics(true) //<1>
> .build();
>
> Connection connection =
> TcpClient.create(provider)
> .host("example.com")
> .port(80)
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 开启内建集成的Micrometer

4.8.SSL和TLS

当您需要使用SSL或者TLS的时候,可以使用下面列出来方式进行配置。默认情况,如果OpenSSL可用的话,则使用SslProvider.OPENSSL。否则使用SslProvider.JDK。可以通过SslContextBuilder或者设置-Dio.netty.handler.ssl.noOpenSsl=true来进行切换。

下面的是使用SslContextBuilder的例子:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
> java复制代码import io.netty.handler.ssl.SslContextBuilder;
> import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
>
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(443)
> .secure(spec -> spec.sslContext(sslContextBuilder))
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

4.8.1.服务器名称标识

默认情况下,TCP客户端将远程主机名作为SNI服务器名发送。当您需要修改默认设置的时候,您可以通过如下方式配置TCP客户端:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
> java复制代码import io.netty.handler.ssl.SslContext;
> import io.netty.handler.ssl.SslContextBuilder;
> import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> import javax.net.ssl.SNIHostName;
>
> public class Application {
>
> public static void main(String[] args) throws Exception {
> SslContext sslContext = SslContextBuilder.forClient().build();
>
> Connection connection =
> TcpClient.create()
> .host("127.0.0.1")
> .port(8080)
> .secure(spec -> spec.sslContext(sslContext)
> .serverNames(new SNIHostName("test.com")))
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

4.9.代理支持

TCP客户端支持Netty提供的代理功能,并通过ProxyProvider构建器提供了一种特定的”非代理主机”的方式。下面是使用ProxyProvider的例子:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
> java复制代码import reactor.netty.Connection;
> import reactor.netty.transport.ProxyProvider;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .proxy(spec -> spec.type(ProxyProvider.Proxy.SOCKS4)
> .host("proxy")
> .port(8080)
> .nonProxyHosts("localhost"))
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

4.10.度量

TCP客户端支持与Micrometer的内置集成。它暴露了所有前缀为reactor.netty.tcp.client的度量。

下面的表格提供了TCP客户端度量的相关信息:

度量名称 类型 描述
reactor.netty.tcp.client.data.received DistributionSummary 收到的数据量,以字节为单位
reactor.netty.tcp.client.data.sent DistributionSummary 发送的数据量,以字节为单位
reactor.netty.tcp.client.errors Counter 发生的错误数量
reactor.netty.tcp.client.tls.handshake.time Timer TLS握手所花费的时间
reactor.netty.tcp.client.connect.time Timer 连接远程地址所花费的时间
reactor.netty.tcp.client.address.resolver Timer 解析远程地址花费的时间

下面额外的度量也是可用的:

池化的ConnectionProvider度量

度量名称 类型 描述
reactor.netty.connection.provider.total.connections Gauge 所有连接的数,包括活跃的和空闲的
reactor.netty.connection.provider.active.connections Gauge 已经被成功获取了并且正在使用的连接数
reactor.netty.connection.provider.idle.connections Gauge 空闲的连接数
reactor.netty.connection.provider.pending.connections Gauge 正在等待可用连接的请求数

ByteBufAllocator度量

度量名称 类型 描述
reactor.netty.bytebuf.allocator.used.heap.memory Gauge 堆内存的字节数
reactor.netty.bytebuf.allocator.used.direct.memory Gauge 堆外内存的字节数
reactor.netty.bytebuf.allocator.used.heap.arenas Gauge 堆内存的个数(当使用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.direct.arenas Gauge 堆外内存的个数(当使用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.threadlocal.caches Gauge threadlocal的缓存数量(当使用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.tiny.cache.size Gauge 微小缓存的大小(当使用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.small.cache.size Gauge 小缓存的大小(当使用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.normal.cache.size Gauge 一般缓存的大小(当使用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.chunk.size Gauge 一个区域的块大小(当使用PooledByteBufAllocator的时候)

下面是开启集成的度量的例子:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
> java复制代码import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .metrics(true) //<1>
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 开启内建集成的Micrometer

如果您想让TCP客户端度量与除了Micrometer之外的系统集成或者想提供自己与Micrometer的集成来添加自己的度量记录器,您可以按如下方式实现:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
> java复制代码import reactor.netty.Connection;
> import reactor.netty.channel.ChannelMetricsRecorder;
> import reactor.netty.tcp.TcpClient;
>
> import java.net.SocketAddress;
> import java.time.Duration;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .metrics(true, CustomChannelMetricsRecorder::new) //<1>
> .connectNow();
>
> connection.onDispose()
> .block();
> }
>
> }
>
>

<1> 开启TCP客户端度量并且提供ChannelMetricsRecorder的实现。

4.11.Unix域套接字

当使用本地传输时,TCP客户端支持Unix域套接字(UDS)。

下面是使用UDS的例子:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
> java复制代码import io.netty.channel.unix.DomainSocketAddress;
> import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .remoteAddress(() -> new DomainSocketAddress("/tmp/test.sock")) //<1>
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 指定将使用的DomainSocketAddress

4.12.主机名解析

默认情况下,TcpClient使用Netty的域名查询机制来异步解析域名。用来替代JVM内置阻塞解析器。

当您需要修改默认设置的时候,您可以像如下通过配置TcpClient来实现:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
> java复制代码import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> import java.time.Duration;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .resolver(spec -> spec.queryTimeout(Duration.ofMillis(500))) //<1>
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 解析器每次执行DNS查询的超时时间设置为500ms。

下面的列表展示了可用的配置:

配置名称 描述
cacheMaxTimeToLive DNS资源记录缓存的最大存活时间(单位:秒)。如果DNS服务器返回的DNS资源记录的存活时间大于这个最大存活时间。该解析器将忽略来自DNS服务器的存活时间,并使用这个最大存活时间。默认为Integer.MAX_VALUE
cacheMinTimeToLive DNS资源记录缓存的最小存活时间(单位:秒)。如果 DNS 服务器返回的 DNS 资源记录的存活时间小于这个最小存活时间,则此解析器将忽略来自DNS服务器的存活时间并使用这个最小存活时间。默认:0。
cacheNegativeTimeToLive DNS查询失败的缓存时间(单位:秒)。默认:0。
disableOptionalRecord 禁用自动包含的可选设置,该设置会试图告诉远程DNS服务器解析器每次响应可以读取多少数据。默认情况下,该设置为启用状态。
disableRecursionDesired 用于指定该解析器在查询DNS的时候会不会带上期望递归查询(RD)的标志,默认情况下,该设置为启用状态。
maxPayloadSize 设置数据报包缓冲区的容量(单位:字节)。 默认:4096
maxQueriesPerResolve 设置解析主机名允许发送的最大DNS查询次数。默认:16
ndots 设置在进行初始化绝对查询的时候,名称中必须出现的点的数量。默认值:-1(用于判断Unix操作系统的值,否则使用1)。
queryTimeout 设置该解析器每次DNS查询的超时时间(单位:毫秒)。默认:5000
resolvedAddressTypes 解析地址的协议族列表。
roundRobinSelection 启用DnsNameResolverAddressResolverGroup,用于当命名服务器提供了多个地址的时候支持随机选择目标地址。参见RoundRobinDnsAddressResolverGroup。默认:DnsAddressResolverGroup
runOn 在给定的LoopResources上执行与DNS服务器的通信。默认情况下,LoopResources只在客户端上被使用。
searchDomains 解析器的搜索域列表。默认情况下,有效搜索域列表是使用的系统DNS搜索域。
trace 在解析器在解析失败时生成详细的跟踪信息时使用的日志记录器和日志级别。

有时候,您或许想切换为JVM内建的解析器。您可以通过如下配置TcpClient的方式来实现:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
> java复制代码import io.netty.resolver.DefaultAddressResolverGroup;
> import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .resolver(DefaultAddressResolverGroup.INSTANCE) //<1>
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 设置为JVM内建的解析器。

Suggest Edit to “TCP Client


Reactor Netty参考指南目录


版权声明:如需转载,请带上本文链接、注明来源和本声明。否则将追究法律责任。https://www.immuthex.com/posts/reactor-netty-reference-guide/tcp-client

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%