开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Netty心跳检测机制原理分析

发表于 2021-11-29

这是我参与11月更文挑战的第29天,活动详情查看:2021最后一次更文挑战

前言

所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性.

image.png

这三个参数的含义:

  • readerIdleTimeSeconds
    读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的IdleStateEvent 事件.
  • writerIdleTimeSeconds
    写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的IdleStateEvent 事件.
  • allIdleTimeSeconds
    读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.

注:这三个参数默认的时间单位是秒。若需要指定其他时间单位,可以使用另一个构造方法:
IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)

源码分析

要实现Netty服务端心跳检测机制需要在服务器端的ChannelInitializer中加入如下的代码:

1
java复制代码pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));

1. IdleStateHandler#channelRead

IdleStateHandler#channelRead方法,msg消息透传,不做任何业务了逻辑处理,让channelPipe中的下一个handler处理channelRead方法。

image.png

2.IdleStateHandler#channelActive

channelActive方法, 1. 初始化 2. channelActive 事件
image.png

initialize方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
switch (state) {
case 1:
case 2:
return;
default:
break;
}

state = 1;
initOutputChanged(ctx);

lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}

Netty心跳检测案例

HeartBeatServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码public class HeartBeatServer {

public static void main(String[] args) throws Exception {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
//IdleStateHandler的readerIdleTime参数指定超过3秒还没收到客户端的连接,
//会触发IdleStateEvent事件并且交给下一个handler处理,下一个handler必须
//实现userEventTriggered方法处理对应事件
pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
pipeline.addLast(new HeartBeatServerHandler());
}
});
System.out.println("netty server start。。");
ChannelFuture future = bootstrap.bind(9000).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
}

HeartBeatClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码public class HeartBeatClient {
public static void main(String[] args) throws Exception {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new HeartBeatClientHandler());
}
});

System.out.println("netty client start。。");
Channel channel = bootstrap.connect("127.0.0.1", 9000).sync().channel();
String text = "Heartbeat Packet";
Random random = new Random();
while (channel.isActive()) {
int num = random.nextInt(10);
Thread.sleep(2 * 1000);
channel.writeAndFlush(text);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}

static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(" client received :" + msg);
if (msg != null && msg.equals("idle close")) {
System.out.println(" 服务端关闭连接,客户端也关闭");
ctx.channel().closeFuture();
}
}
}
}

HeartBeatServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
java复制代码public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {

int readIdleTimes = 0;

@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
System.out.println(" ====== > [server] message received : " + s);
if ("Heartbeat Packet".equals(s)) {
ctx.channel().writeAndFlush("ok");
} else {
System.out.println(" 其他信息处理 ... ");
}
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;

String eventType = null;
switch (event.state()) {
case READER_IDLE:
eventType = "读空闲";
readIdleTimes++; // 读空闲的计数加1
break;
case WRITER_IDLE:
eventType = "写空闲";
// 不处理
break;
case ALL_IDLE:
eventType = "读写空闲";
// 不处理
break;
}



System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType);
if (readIdleTimes > 3) {
System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源");
ctx.channel().writeAndFlush("idle close");
ctx.channel().close();
}
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
}

演示结果:

image.png

常见的心跳检测

比如说Nacos注册中心, Eureka注册中心等等,都有自己的心跳检测机制。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

LeetCode643 子数组最大平均数 I

发表于 2021-11-29

这是我参与11月更文挑战的第28天,活动详情查看:2021最后一次更文挑战。

题目描述:

643. 子数组最大平均数 I - 力扣(LeetCode) (leetcode-cn.com)

给你一个由 n 个元素组成的整数数组 nums 和一个整数 k 。

请你找出平均数最大且 长度为 k 的连续子数组,并输出该最大平均数。

任何误差小于 10^-5的答案都将被视为正确答案。

示例一

1
2
3
ini复制代码输入:nums = [1,12,-5,-6,50,3], k = 4
输出:12.75
解释:最大平均数 (12-5-6+50)/4 = 51/4 = 12.75

示例二

1
2
ini复制代码输入: nums = [5], k = 1
输出: 5.00000

提示:

  • n == nums.length
  • 1 <= k <= n <= 10^5
  • -10^4 <= nums[i] <= 10^4

思路分析

滑动窗口

根据题意,我们的长度为固定的K,既然长度固定,那么平均值最大即代表和最大,我们只需要一个长度为k的窗口,每次求出这个窗口内的k个数的和,同时记录,等下一次与这次比较,两者取其大

这有个小小的优化点就是,我们每向右边移动一步,其实不必再重新求和,这样如果k很大的话,会造成很多的浪费,每次滑动我们只需要将上次的和减去滑出去(头)再加上滑进来(尾)的即可得到本次的和。

AC代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Kotlin复制代码class Solution {
fun findMaxAverage(nums: IntArray, k: Int): Double {
var sum = 0
for(i in 0 until k) {
sum += nums[i]
}
var maxSum = sum
for(i in k until nums.size) {
sum = sum - nums[i-k] + nums[i]
maxSum = Math.max(maxSum,sum)
}
return maxSum.toDouble() / k
}
}

总结

滑动窗口类的题还是很容易理解的,知名见意,我们就当手上拿了一个长度为k的框框套在数组上即可。

参考

子数组最大平均数 I - 子数组最大平均数 I - 力扣(LeetCode) (leetcode-cn.com)

643. 子数组最大平均数 I - 子数组最大平均数 I - 力扣(LeetCode) (leetcode-cn.com)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

聊聊 Kafka: Consumer 源码解析之 Consu

发表于 2021-11-29

一、Consumer 的使用

Consumer 的源码解析主要来看 KafkaConsumer,KafkaConsumer 是 Consumer 接口的实现类。KafkaConsumer 提供了一套封装良好的 API,开发人员可以基于这套 API 轻松实现从 Kafka 服务端拉取消息的功能,这样开发人员根本不用关心与 Kafka 服务端之间网络连接的管理、心跳检测、请求超时重试等底层操作,也不必关心订阅 Topic 的分区数量、分区副本的网络拓扑以及 Consumer Group 的 Rebalance 等 Kafka 具体细节,KafkaConsumer 中还提供了自动提交 offset 的功能,使的开发人员更加关注业务逻辑,提高了开发效率。

下面我们来看一个 KafkaConsumer 的示例程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码/**
* @author: 微信公众号【老周聊架构】
*/
public class KafkaConsumerTest {
public static void main(String[] args) {
Properties props = new Properties();

// kafka地址,列表格式为host1:port1,host2:port2,...,无需添加所有的集群地址,kafka会根据提供的地址发现其他的地址(建议多提供几个,以防提供的服务器关闭) 必须设置
props.put("bootstrap.servers", "localhost:9092");
// key序列化方式 必须设置
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value序列化方式 必须设置
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("group.id", "consumer_riemann_test");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 可消费多个topic,组成一个list
String topic = "riemann_kafka_test";
consumer.subscribe(Arrays.asList(topic));

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

从示例中可以看出 KafkaConsumer 的核心方法是 poll(),它负责从 Kafka 服务端拉取消息。核心方法的具体细节我想放在下一篇再细讲,关乎消费侧的客户端与 Kafka 服务端的通信模型。这一篇我们主要从宏观的角度来剖析下 Consumer 消费端的源码。

二、KafkaConsumer 分析

我们先来看下 Consumer 接口,该接口定义了 KafkaConsumer 对外的 API,其核心方法可以分为以下六类:

  • subscribe() 方法:订阅指定的 Topic,并为消费者自动分配分区。
  • assign() 方法:用户手动订阅指定的 Topic,并且指定消费的分区,此方法 subscribe() 方法互斥。
  • poll() 方法:负责从服务端获取消息。
  • commit*() 方法:提交消费者已经消费完成的 offset。
  • seek*() 方法:指定消费者起始消费的位置。
  • pause()、resume() 方法:暂停、继续 Consumer,暂停后 poll() 方法会返回空。

我们先来看下 KafkaConsumer 的重要属性以及 UML 结构图。
在这里插入图片描述
在这里插入图片描述

  • clientId:Consumer 的唯一标识。
  • groupId:消费者组的唯一标识。
  • coordinator:控制着 Consumer 与服务端 GroupCoordinator 之间的通信逻辑,读者可以理解为 Consumer 与服务端 GroupCoordinator 通信的门面。
  • keyDeserializer、valueDeserializer:key 和 value 的反序列化器。
  • fetcher:负责从服务端获取消息。
  • interceptors:ConsumerInterceptors 集合,ConsumerInterceptors.onConsumer() 方法可以在消息通过 poll() 方法返回给用户之前对其进行拦截或修改;ConsumerInterceptors.onCommit() 方法也可以在服务端返回提交 offset 成功的响应进行拦截或修改。
  • client:ConsumerNetworkClient 负责消费者与 Kafka 服务端的网络通信。
  • subscriptions:SubscriptionState 维护了消费者的消费状态。
  • metadata:ConsumerMetadata 记录了整个 Kafka 集群的元信息。
  • currentThread、refcount:分别记录的 KafkaConsumer 的线程 id 和重入次数

三、ConsumerNetworkClient

ConsumerNetworkClient 在 NetworkClient 之上进行了封装,提供了更高级的功能和更易用的 API。

我们先来看下 ConsumerNetworkClient 的重要属性以及 UML 结构图。

在这里插入图片描述
在这里插入图片描述

  • client:NetworkClient 对象。
  • unsent:缓冲队列。UnsentRequests 对象,该对象内部维护了一个 unsent 属性,该属性是 ConcurrentMap<Node, ConcurrentLinkedQueue<ClientRequest>>,key 是 Node 节点,value 是 ConcurrentLinkedQueue<ClientRequest>。
  • metadata:用于管理 Kafka 集群元数据。
  • retryBackoffMs:在尝试重试对给定主题分区的失败请求之前等待的时间量,这避免了在某些故障情况下在紧密循环中重复发送请求。对应 retry.backoff.ms 配置,默认 100 ms。
  • maxPollTimeoutMs:使用 Kafka 的组管理工具时,消费者协调器的心跳之间的预期时间。心跳用于确保消费者的会话保持活跃,并在新消费者加入或离开组时促进重新平衡。该值必须设置为低于 session.timeout.ms,但通常不应设置为高于该值的 1/3。它可以调整得更低,以控制正常重新平衡的预期时间。对应 heartbeat.interval.ms 配置,默认 3000 ms。构造函数中,maxPollTimeoutMs 取的是 maxPollTimeoutMs 与 MAX_POLL_TIMEOUT_MS 的最小值,MAX_POLL_TIMEOUT_MS 默认为 5000 ms。
  • requestTimeoutMs:配置控制客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试用尽,则请求失败。对应 request.timeout.ms 配置,默认 305000 ms。
  • wakeupDisabled:由调用 KafkaConsumer 对象的消费者线程之外的其它线程设置,表示要中断 KafkaConsumer 线程。
  • lock:我们不需要高吞吐量,所以使用公平锁来尽量避免饥饿。
  • pendingCompletion:当请求完成时,它们在调用之前被转移到这个队列。目的是避免在持有此对象的监视器时调用它们,这可能会为死锁打开门。
  • pendingDisconnects:断开与协调器连接节点的队列。
  • wakeup:这个标志允许客户端被安全唤醒而无需等待上面的锁。为了同时启用它,避免需要获取上面的锁是原子的。

ConsumerNetworkClient 的核心方法是 poll() 方法,poll() 方法有很多重载方法,最终会调用 poll(Timer timer, PollCondition pollCondition, boolean disableWakeup) 方法,这三个参数含义是:timer 表示定时器限制此方法可以阻塞多长时间;pollCondition 表示可空阻塞条件;disableWakeup 表示如果 true 禁用触发唤醒。

我们来简单回顾下 ConsumerNetworkClient 的功能:

3.1 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#trySend

循环处理 unsent 中缓存的请求,对每个 Node 节点,循环遍历其 ClientRequest 链表,每次循环都调用 NetworkClient.ready() 方法检测消费者与此节点之间的连接,以及发送请求的条件。若符合条件,则调用 NetworkClient.send() 方法将请求放入 InFlightRequest 中等待响应,也放入 KafkaChannel 中的 send 字段等待发送,并将消息从列表中删除。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码long trySend(long now) {
long pollDelayMs = maxPollTimeoutMs;

// send any requests that can be sent now
// 遍历 unsent 集合
for (Node node : unsent.nodes()) {
Iterator<ClientRequest> iterator = unsent.requestIterator(node);
if (iterator.hasNext())
pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));

while (iterator.hasNext()) {
ClientRequest request = iterator.next();
// 调用 NetworkClient.ready()检查是否可以发送请求
if (client.ready(node, now)) {
// 调用 NetworkClient.send()方法,等待发送请求。
client.send(request, now);
// 从 unsent 集合中删除此请求
iterator.remove();
} else {
// try next node when current node is not ready
break;
}
}
}
return pollDelayMs;
}

3.2 计算超时时间

如果没有请求在进行中,则阻塞时间不要超过重试退避时间。

3.3 org.apache.kafka.clients.NetworkClient#poll

  • 判断是否需要更新 metadata 元数据
  • 调用 Selector.poll() 进行 socket 相关的 IO 操作
  • 处理完成后的操作(处理一系列 handle*() 方法处理请求响应、连接断开、超时等情况,并调用每个请求的回调函数)

3.4 调用 checkDisconnects() 方法检测连接状态

调用 checkDisconnects() 方法检测连接状态。检测消费者与每个 Node 之间的连接状态,当检测到连接断开的 Node 时,会将其在 unsent 集合中对应的全部 ClientRequest 对象清除掉,之后调用这些ClientRequest 的回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码private void checkDisconnects(long now) {
// any disconnects affecting requests that have already been transmitted will be handled
// by NetworkClient, so we just need to check whether connections for any of the unsent
// requests have been disconnected; if they have, then we complete the corresponding future
// and set the disconnect flag in the ClientResponse
for (Node node : unsent.nodes()) {
// 检测消费者与每个 Node 之间的连接状态
if (client.connectionFailed(node)) {
// Remove entry before invoking request callback to avoid callbacks handling
// coordinator failures traversing the unsent list again.
// 在调用请求回调之前删除条目以避免回调处理再次遍历未发送列表的协调器故障。
Collection<ClientRequest> requests = unsent.remove(node);
for (ClientRequest request : requests) {
RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
AuthenticationException authenticationException = client.authenticationException(node);
// 调用 ClientRequest 的回调函数
handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().latestAllowedVersion()),
request.callback(), request.destination(), request.createdTimeMs(), now, true,
null, authenticationException, null));
}
}
}
}

3.5 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#maybeTriggerWakeup

检查 wakeupDisabled 和 wakeup,查看是否有其它线程中断。如果有中断请求,则抛出 WakeupException 异常,中断当前 ConsumerNetworkClient.poll() 方法。

1
2
3
4
5
6
7
8
9
java复制代码public void maybeTriggerWakeup() {
// 通过 wakeupDisabled 检测是否在执行不可中断的方法,通过 wakeup 检测是否有中断请求。
if (!wakeupDisabled.get() && wakeup.get()) {
log.debug("Raising WakeupException in response to user wakeup");
// 重置中断标志
wakeup.set(false);
throw new WakeupException();
}
}

3.6 再次调用 trySend() 方法

再次调用 trySend() 方法。在步骤 2.1.3 中调用了 NetworkClient.poll() 方法,在其中可能已经将 KafkaChannel.send 字段上的请求发送出去了,也可能已经新建了与某些 Node 的网络连接,所以这里再次尝试调用 trySend() 方法。

3.7 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#failExpiredRequests

处理 unsent 中超时请求。它会循环遍历整个 unsent 集合,检测每个 ClientRequest 是否超时,将过期请求加入到 expiredRequests 集合,并将其从 unsent 集合中删除。调用超时 ClientRequest 的回调函数 onFailure()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码private void failExpiredRequests(long now) {
// clear all expired unsent requests and fail their corresponding futures
// 清除所有过期的未发送请求并使其相应的 futures 失败
Collection<ClientRequest> expiredRequests = unsent.removeExpiredRequests(now);
for (ClientRequest request : expiredRequests) {
RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
// 调用回调函数
handler.onFailure(new TimeoutException("Failed to send request after " + request.requestTimeoutMs() + " ms."));
}
}

private Collection<ClientRequest> removeExpiredRequests(long now) {
List<ClientRequest> expiredRequests = new ArrayList<>();
for (ConcurrentLinkedQueue<ClientRequest> requests : unsent.values()) {
Iterator<ClientRequest> requestIterator = requests.iterator();
while (requestIterator.hasNext()) {
ClientRequest request = requestIterator.next();
// 检查是否超时
long elapsedMs = Math.max(0, now - request.createdTimeMs());
if (elapsedMs > request.requestTimeoutMs()) {
// 将过期请求加入到 expiredRequests 集合
expiredRequests.add(request);
requestIterator.remove();
} else
break;
}
}
return expiredRequests;
}

四、RequestFutureCompletionHandler

说 RequestFutureCompletionHandler 之前,我们先来看下 ConsumerNetworkClient.send() 方法。里面的逻辑会将待发送的请求封装成 ClientRequest,然后保存到 unsent 集合中等待发送,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public RequestFuture<ClientResponse> send(Node node,
AbstractRequest.Builder<?> requestBuilder,
int requestTimeoutMs) {
long now = time.milliseconds();
RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true,
requestTimeoutMs, completionHandler);
// 创建 clientRequest 对象,并保存到 unsent 集合中。
unsent.put(node, clientRequest);

// wakeup the client in case it is blocking in poll so that we can send the queued request
// 唤醒客户端以防它在轮询中阻塞,以便我们可以发送排队的请求。
client.wakeup();
return completionHandler.future;
}

我们重点来关注一下 ConsumerNetworkClient 中使用的回调对象——RequestFutureCompletionHandler。其继承关系如下:

在这里插入图片描述
从 RequestFutureCompletionHandler 继承关系图我们可以知道,它不仅实现了 RequestCompletionHandler 接口,还组合了 RequestFuture 类,RequestFuture 是一个泛型类,其核心字段与方法如下:

  • listeners:RequestFutureListener 队列,用来监听请求完成的情况。RequestFutureListener 接口有 onSuccess() 和 onFailure () 两个方法,对应于请求正常完成和出现异常两种情况。
  • isDone():表示当前请求是否已经完成,不管正常完成还是出现异常,此字段都会被设置为 true。
  • value():记录请求正常完成时收到的响应,与 exception() 方法互斥。此字段非空表示正常完成,反之表示出现异常。
  • exception():记录导致请求异常完成的异常类,与 value() 互斥。此字段非空则表示出现异常,反之则表示正常完成。

我们之所以要分析源码,是因为源码中有很多设计模式可以借鉴,应用到你自己的工作中。RequestFuture 中有两处典型的设计模式的使用,我们来看一下:

  • compose() 方法:使用了适配器模式。
  • chain() 方法:使用了责任链模式。

4.1 RequestFuture.compose()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
java复制代码/**
* 适配器
* Adapt from a request future of one type to another.
*
* @param <F> Type to adapt from
* @param <T> Type to adapt to
*/
public abstract class RequestFutureAdapter<F, T> {
public abstract void onSuccess(F value, RequestFuture<T> future);

public void onFailure(RuntimeException e, RequestFuture<T> future) {
future.raise(e);
}
}

/**
* RequestFuture<T> 适配成 RequestFuture<S>
* Convert from a request future of one type to another type
* @param adapter The adapter which does the conversion
* @param <S> The type of the future adapted to
* @return The new future
*/
public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
// 适配之后的结果
final RequestFuture<S> adapted = new RequestFuture<>();
// 在当前 RequestFuture 上添加监听器
addListener(new RequestFutureListener<T>() {
@Override
public void onSuccess(T value) {
adapter.onSuccess(value, adapted);
}

@Override
public void onFailure(RuntimeException e) {
adapter.onFailure(e, adapted);
}
});
return adapted;
}

使用 compose() 方法进行适配后,回调时的调用过程,也可以认为是请求完成的事件传播流程。当调用 RequestFuture<T> 对象的 complete() 或 raise() 方法时,会调用 RequestFutureListener<T> 的 onSuccess() 或 onFailure() 方法,然后调用 RequestFutureAdapter<T, S> 的对应方法,最终调用RequestFuture<S> 对象的对应方法。

在这里插入图片描述

4.2 RequestFuture.chain()

chain() 方法与 compose() 方法类似,也是通过 RequestFutureListener 在多个 RequestFuture 之间传递事件。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public void chain(final RequestFuture<T> future) {
// 添加监听器
addListener(new RequestFutureListener<T>() {
@Override
public void onSuccess(T value) {
// 通过监听器将 value 传递给下一个 RequestFuture 对象
future.complete(value);
}

@Override
public void onFailure(RuntimeException e) {
// 通过监听器将异常传递给下一个 RequestFuture 对象
future.raise(e);
}
});
}

好了,ConsumerNetworkClient 的源码分析告一段落了,希望文章对你有帮助,我们下期再见。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot集成Swagger(十)常用注解介绍

发表于 2021-11-29

「这是我参与11月更文挑战的第27天,活动详情查看:2021最后一次更文挑战」


相关文章

Java随笔记:Java随笔记


前言

  • 突然发现自己漏东西了,文档描述的注解好像只讲了两个。。
  • 在此补上,方便大家还有我自己回头的查看。

一、Api

  • @Api标注在类上,主要是说明类的作用。

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    less复制代码@RestController
    @Api(tags = "Swagger测试类")
    public class SwaggerTestController {
    ​
       @RequestMapping(value = "test-swagger",method = RequestMethod.GET)
       public StudentResponse dyTest(){
           return new StudentResponse();
      }
    }
  • 重启后效果如下:

  • image-20211129214641293.png

二、ApiModel

  • @ApiModel用在类上,表示对类进行说明,用于实体类中的参数接收说明

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    less复制代码@Data
    @ApiModel(value = "StudentResponse",description = "测试类返回结果")
    public class StudentResponse{
    ​
       @ApiModelProperty(value = "姓名")
       private String name;
       @ApiModelProperty(value = "年龄")
       private int age;
       @ApiModelProperty(value = "爱好")
       private String like;
    }
  • 效果如下:

  • image-20211129215142056.png

三、ApiModelProperty

  • @ApiModelProperty() 用于字段,表示对 model 属性的说明。

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    less复制代码@Data
    @ApiModel(value = "学生类",description = "这是类的详细描述信息呀")
    public class StudentResponse{
    ​
       @ApiModelProperty(value = "姓名")
       private String name;
       @ApiModelProperty(value = "年龄")
       private int age;
       @ApiModelProperty(value = "爱好")
       private String like;
    }
  • 效果如下:

  • image-20211129215210243.png

四、ApiParam

  • @ApiParam 用于 Controller 中方法的参数说明。

  • 新建添加实体类

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    less复制代码@Data
    @ApiModel(value = "StudentRequest",description = "添加学生实体类")
    public class StudentRequest {
    ​
       @ApiModelProperty(value = "姓名")
       private String name;
       @ApiModelProperty(value = "年龄")
       private int age;
       @ApiModelProperty(value = "爱好")
       private String like;
    }
    ​
  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    less复制代码@RestController
    @Api(tags = "Swagger测试类")
    public class SwaggerTestController {
       @PostMapping("Student")
       @ApiResponses({@ApiResponse(code = 200,message = "success", response = StudentResponse.class),
               @ApiResponse(code = 500,message = "failed", response = StudentResponse.class)})
       @ApiOperation(value = "新增学生信息")
       public void addStudent(@ApiParam(value = "学生类", required = true) StudentRequest studentRequest){
      }
    }
  • 效果如下:

  • image-20211129223037073.png

五、ApiOperation

  • @ApiOperation 用在 Controller 里的方法上,说明方法的作用。
  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    less复制代码@RestController
    @Api(tags = "Swagger测试类")
    public class SwaggerTestController {
       @RequestMapping(value = "test-swagger",method = RequestMethod.GET)
       @ApiOperation(value = "查询学生详情",notes = "参数为id")
       public StudentResponse dyTest(){
           return new StudentResponse();
      }
    }
  • 效果如下:

  • image-20211129220031536.png

总结

  • 这几个注解,也是我们平时使用最多的注解。
  • 莫急,未完待续,明儿我们再讲解几个常用注解。
  • 本篇文章没有像之前的讲的那么细,我认为只要将方法教了,具体的学习方式每个人都是不同的。
  • 感兴趣的小伙伴们可以自己点进去看看都有哪些参数,可以自己每个都试试,玩玩!
  • 希望对你们有所帮助!谢谢!

路漫漫其修远兮,吾必将上下求索~

如果你认为i博主写的不错!写作不易,请点赞、关注、评论给博主一个鼓励吧~hahah

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java 8中 lambda 表达式详解

发表于 2021-11-29

lambda 表达式,相信大家都不陌生,就算没有用过,那应该也听说过。我也是一样,在使用新特性 stream 流处理集合相关的代码时接触到这种语法,其他地方倒是不经常使用。所以也是仅仅知道一些皮毛,对于其中的原理什么的也不怎么清楚。

今天准备系统的学习一番,话不多说,接下来就开始我们的学习。

lambda 表达式介绍

lambda 表达式是 Java 8 的一个新特性,可以取代大部分的匿名内部类,简化了匿名委托的使用,让你让代码更加简洁,优雅。

比较官方的定义是这样的:

lambda 表达式是一个可传递的代码块(或者匿名函数),可以在以后执行一次或多次。

这个匿名函数没有名称,但它有参数列表、函数主体、返回类型,可能还有一个可以抛出的异常列表。lambda 表达式也可称为闭包。

在 Java 中传递一个代码段并不容易,你不能直接传递代码段。Java 是一种面向对象语言,所以必须构造一个对象,这个对象的类需要有一个方法包含所需的代码。接下来就看看 Java 是怎么来处理代码块的。

lambda 表达式的语法

Java 中有一个 Comparator 接口用来排序。这是 Java 8 以前的代码形式:

1
2
3
4
5
6
java复制代码public class LengthComparator implements Comparator<String> {
@Override
public int compare(String a, String b) {
return a.length() - b.length();
}
}
1
2
java复制代码String[] strArr = new String[]{"abcde", "qwer"};
Arrays.sort(strArr, new LengthComparator());

我们需要定义一个实现了 Comparator 接口的类,并实现里面的 compare() 方法,然后把这个类当做参数传给 sort 方法。

而我们使用 lambda 表达式就可以这样来写:

1
java复制代码Arrays.sort(strArr, (String a, String b) -> a.length() - b.length());

其中的 (String a, String b) -> a.length() - b.length() 就是一个 lambda 表达式。

lambda 表达式就是一个代码块,以及必须传入代码的变量规范

lambda 表达式的一些例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码// 1. 不需要参数,返回值为 5  
() -> 5

// 2. 接收一个参数(数字类型),返回其2倍的值
x -> 2 * x

// 3. 接受2个参数(数字),并返回他们的差值
(x, y) -> x – y

// 4. 接收2个int型整数,返回他们的和
(int x, int y) -> x + y

// 5. 接受一个 string 对象,并在控制台打印,不返回任何值(看起来像是返回void)
(String s) -> System.out.print(s)

再看一个例子加深理解:

1
2
3
4
5
6
7
8
9
10
11
java复制代码// 用匿名内部类的方式来创建线程 
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("hello world");
}
});


// 使用Lambda来创建线程
new Thread(() -> System.out.println("hello world"));

注意:

如果一个 lambda 表达式只在某些分支返回一个值,而另外一些分支不返回值,这是不合法的。

例如,(int x) -> { if (x>= 0) return 1; } 就不合法。

函数式接口

Java 中有很多封装代码块的接口,比如上面的 Comparator 或 ActionListener,lambda 表达式与这些接口是兼容的。

但并不是所有的接口都可以使用 lambda 表达式来实现。lambda 规定接口中只能有一个需要被实现的方法(只包含一个抽象方法),不是规定接口中只能有一个方法。 这种接口就称为函数式接口。

Java 8 中有另一个新特性:default, 被 default 修饰的方法会有默认实现,不是必须被实现的方法,所以不影响 Lambda 表达式的使用。

上面的 Comparator 和 ActionListener,包括 Runnable 就是只有一个需要被实现的方法的接口。即函数式接口。

1
2
3
4
5
6
7
java复制代码@FunctionalInterface
public interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used...
*/
public abstract void run();
}

我们来观察下 Runnable 接口,接口上面有一个注解 @FunctionalInterface。

通过观察 @FunctionalInterface 这个注解的源码,可以知道这个注解有以下特点:

  1. 该注解只能标记在有且仅有一个抽象方法的接口上。
  2. JDK8 接口中的静态方法和默认方法,都不算是抽象方法。
  3. 接口默认继承 java.lang.Object,所以如果接口显示声明覆盖了 Object 中方法,那么也不算抽象方法。
  4. 该注解不是必须的,如果一个接口符合”函数式接口”定义,那么加不加该注解都没有影响。加上该注解能够更好地让编译器进行检查。如果编写的不是函数式接口,但是加上了@FunctionInterface,那么编译器会报错。

我们再来看一下 Comparator 接口的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码@FunctionalInterface
public interface Comparator<T> {

int compare(T o1, T o2);

boolean equals(Object obj);

default Comparator<T> reversed() {
return Collections.reverseOrder(this);
}

default Comparator<T> thenComparing(Comparator<? super T> other) {
Objects.requireNonNull(other);
return (Comparator<T> & Serializable) (c1, c2) -> {
int res = compare(c1, c2);
return (res != 0) ? res : other.compare(c1, c2);
};
}

public static <T> Comparator<T> nullsFirst(Comparator<? super T> comparator) {
return new Comparators.NullComparator<>(true, comparator);
}
}

这里只贴出来了部分代码,可以看到排除掉接口的中的静态方法、默认方法和覆盖的 Object 中的方法之后,就剩下一个抽象方法 int compare(T o1, T o2);, 符合 lambda 函数式接口的规范。

JDK 中提供一些其他的函数接口如下:

image.png

这里也不展开讲了,展开讲又是一篇新的文章了。

方法引用

Java awt 包中有一个 Timer 类,作用是经过一段时间就执行一次。
用 lambda 表达式来处理:

1
java复制代码Timer timer = new Timer(1000, event -> System.out.println("this time is " + new Date()));

这里面的 lambda 表达式可以这样表示:

1
java复制代码Timer timer = new Timer(1000, System.out::println);

表达式 System.out::println 就是一个方法引用(method reference),它指示编译器生成一个函数式接口的实例,覆盖这个接口的抽象方法来调用给定的方法。

方法引用需要用 ::运算符分隔方法名与对象或类名。主要有3种情况:

1
2
3
java复制代码1. object::instanceMethod
2. Class::instanceMethod
3. Class::staticMethod

具体解释这里不再叙述,有兴趣的可以看看《Java 核心技术卷1》。

注意:

只有当 lambda 表达式的体只调用一个方法而不做其他操作时,才能把 lambda 表达式重写为方法引用

构造器引用

构造器引用与方法引用很类似,只不过方法名 new。例如,Person::new 是 Person 构造器的一个引用。

假如有一个字符串列表。可以把它转换为一个 Person 对象数组,为此要在各个字符串上调用构造器:

1
2
3
java复制代码ArrayList<String> names = ... ;
Stream<Persion> stream = names.stream().map(Person::new);
List<Person> people = stream.collect(Collectors.toList());

其中,map 方法会为各个列表元素调用 Person(String) 构造器。

这里的 stream 和 map 会在下一篇博客中学习,这篇暂不讨论。

变量作用域

看下面这个例子:

1
2
3
4
5
6
7
8
9
10
java复制代码public static void repeatMessage(String text, int delay){
ActionListener listener = event ->
{
System.out.printLn(text);
};
new Timer(delay, listener).start();
}

// 调用
repeatMessage("Hello", 1000);

可以看到, lambda 表达式可以捕获外围作用域中变量的值。在 Java 中,要确保所捕获的值是明确定义的,这里有一个重要的限制。在 lambda 表达式中,只能引用值不会改变的变量。这是为了保证并发执行过程的安全。

lambda 表达式中捕获的变量必须实际上是事实最终变量。就是这个变量初始化之后就不会再为它赋新值。

lambda 表达式与匿名类的区别

使用匿名类与 Lambda 表达式的一大区别在于关键词的使用。对于匿名类,关键词 this 解读为匿名类,而对于 Lambda 表达式,关键词 this 解读为写就 Lambda 的外部类。也就是说,Lambda 表达式主体内使用的 this 关键字和其所在的类实例相同。

Lambda 表达式与匿名类的另一不同在于两者的编译方法。Java 编译器编译 Lambda 表达式并将他们转化为类里面的私有函数。

  • 匿名内部类可以为任意接口创建实例——不管接口包含多少个抽象方法,只要匿名内部类实现所有的抽象方法即可;但 Lambda 表达式只能为函数式接口创建实例。
  • 匿名内部类可以为抽象类甚至普通类创建实例;但 Lambda 表达式只能为函数式接口创建实例。
  • 匿名内部类实现的抽象方法的方法体允许调用接口中定义的默认方法;但 Lambda 表达式的代码块不允许调用接口中定义的默认方法。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

记录一次部署微信小程序的过程并上线微信小程序

发表于 2021-11-29

「这是我参与11月更文挑战的第28天,活动详情查看:2021最后一次更文挑战」

前言

  上一章已经简单介绍了基于Spring Boot创建一个微信小程序的过程。今天将简单介绍将项目部署到服务器中,并将前端代码提交到微信待审核。下面直接开始正题。
上一章的介绍#【Spring Boot 快速入门】二十一、基于Spring Boot 开发一个微信小程序

启动项目

  后端基于SpringBoot的微信小程序服务的所有接口已经开发完成,那么现在开始将服务部署到云服务器中,登录服务器。
图片.png
使用Maven将项目打包完成,

图片.png
将项目的文件打包上传到服务器中
图片.png

  通过命令 ls 可以看到已经将文件上传到服务器了。使用命令:java -jar BootWxTool-0.0.1-SNAPSHOT.jar 启动Spring Boot项目。启动过程如下图。

1
js复制代码java -jar BootWxTool-0.0.1-SNAPSHOT.jar

图片.png
  项目启动完成之后,无法访问服务,可能是安全组没有配置,到服务器一看,没有配置安全组,将项目中使用到的端口在服务器安全组中进行配置。

图片.png

配置Nginx

将安全组配置之后,还需要配置Nginx进行转发,本次使用的端口不是80端口。找到Nginx配置文件所在位置:

1
js复制代码/www/server/nginx/conf

通过命令vim nginx.conf查看配置文件的路径为:因此需要到这个路径下对服务进行配置。

1
js复制代码include /www/server/panel/vhost/nginx/*.conf;

图片.png

进入到配置文件所在的文件夹

1
js复制代码cd /www/server/panel/vhost/nginx/

新建一个文件

1
js复制代码mkdir wxtool.conf

  输入如下信息,listen后面是需要监听的端口信息,server_name是需要配置的域名信息。root后面跟着的是项目所在的路径信息。需要注意的是要加入代理信息。否则可能无法访问到项目路径。proxy_***之后的信息就是需要配置的代理信息了。ssl_certificate和ssl_certificate_key是申请的SSL文件的路径信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
js复制代码server
{
listen 80;
listen 443 ssl http2;
server_name weixin.******.com;
index index.php index.html index.htm default.php default.htm default.html;
root /java/springboot;
if ($server_port !~ 443){
rewrite ^(/.*)$ https://$host$1 permanent;
}

location / {
proxy_pass http://127.0.0.1:8080;
proxy_set_header Host $host:80;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
#HTTP_TO_HTTPS_END
ssl_certificate /etc/letsencrypt/live/weixin/6199589_weixin.ruankaoti.com.pem;
ssl_certificate_key /etc/letsencrypt/live/weixin/6199589_weixin.ruankaoti.com.key;
ssl_protocols TLSv1 TLSv1.1 TLSv1.2;
ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:HIGH:!aNULL:!MD5:!RC4:!DHE;
ssl_prefer_server_ciphers on;
ssl_session_cache shared:SSL:10m;
ssl_session_timeout 10m;
error_page 497 https://$host$request_uri;

#SSL-END


error_page 404 /404.html;
error_page 502 /502.html;
#ERROR-PAGE-END


#禁止访问的文件或目录
location ~ ^/(\.user.ini|\.htaccess|\.git|\.svn|\.project|LICENSE|README.md)
{
return 404;
}

location ~ .*\.(gif|jpg|jpeg|png|bmp|swf)$
{
expires 30d;
error_log off;
access_log off;
}

location ~ .*\.(js|css)?$
{
expires 12h;
error_log off;
access_log off;
}
access_log /www/wwwlogs/weixin.*****.com.log;
error_log /www/wwwlogs/weixin.*****.com.error.log;
}

当配置文件和申请的SSL证书都配置完成之后,在宝塔页面Nginx管理台中点击重载配置即可。

图片.png

测试项目

在浏览器中输入我们配置的域名信息https://weixin.****.com 可以看到已经可以正常访问网页了。

图片.png

打开微信开发者工具,将域名信息配置到文件中,进行测试可以正常访问,截图如下:

图片.png
  然后将前端的代码上传到测试版本,然后递交审核之后就可以正常发布了。整个审核流程预计一天左右。审核完成之后,需要管理人员登录到微信公众平台得后台进行上线,上线以后,大家就可以在小程序中访问项目了。给大家上一张已经发布上线的项目截图。

图片.png

结语

  好了,以上就是记录一次部署微信小程序的过程并上线微信小程序,感谢您的阅读,希望您喜欢,如对您有帮助,欢迎点赞收藏。如有不足之处,欢迎评论指正。下次见。

  作者介绍:【小阿杰】一个爱鼓捣的程序猿,JAVA开发者和爱好者。公众号【Java全栈架构师】维护者,欢迎关注阅读交流。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

日志系统的更新语句执行流程~

发表于 2021-11-29

「这是我参与11月更文挑战的第28天,活动详情查看:2021最后一次更文挑战」

MySQL可以恢复某时间段内任意一秒的状态。

  • 例如游戏回档。此功能主要得益于mysql的日志系统。

当执行一条更新语句:

1
2
sql复制代码    update T set c=c+1 where ID=1 
-- ID为索引

在一个表上有更新的时候,跟这个表有关的查询缓存会失效,所以这条语句就会把表T上所有缓存结果都清空。这也就是一般不建议使用查询缓存的原因。

接下来,分析器会通过词法和语法解析知道这是一条更新语句。优化器决定要使用ID这个索引。然后,执行器负责具体执行,找到这一行,然后更新。

与查询流程不一样的是,更新流程还涉及两个重要的日志模块:redo log(重做日志)和 binlog(归档日志)。

redo log

redo log时InnoDB 特有的日志系统。

《孔乙己》一文中,酒店掌柜有一个粉板,专门用来记录客人的赊账记录。如果赊账的人不多,那么他可以把顾客名和账目写在板上。但如果赊账的人多了,粉板总会有记不下的时候,这个时候掌柜一定还有一个专门记录赊账的账本。

如果有人要赊账或者还账的话,掌柜一般有两种做法:

  • 一种做法是直接把账本翻出来,把这次赊的账加上去或者扣除掉;
  • 另一种做法是先在粉板上记下这次的账,等打烊以后再把账本翻出来核算。

在生意红火柜台很忙时,掌柜一定会选择后者,因为前者操作实在是太麻烦了。首先,得找到这个人的赊账总额那条记录。密密麻麻几十页,掌柜要找到那个名字,找到之后再拿出算盘计算,最后再将结果写回到账本上。

相比之下,还是先在粉板上记一下方便。

同样,在MySQL里也有这个问题,如果每一次的更新操作都需要写进磁盘,然后磁盘也要找到对应的那条记录,然后再更新,整个过程IO成本、查找成本都很高。为了解决这个问题,MySQL的设计者就用了类似酒店掌柜粉板的思路来提升更新效率。

而粉板和账本配合的整个过程,其实就是MySQL里经常说到的WAL技术,WAL的全称是Write-Ahead Logging,它的关键点就是先写日志,再写磁盘,也就是先写粉板,等不忙的时候再写账本。

具体来说,当有一条记录需要更新的时候,InnoDB引擎就会先把记录写到redo log(粉板)里面,并更新内存,这个时候更新就算完成了。同时,InnoDB引擎会在适当的时候,将这个操作记录更新到磁盘里面,而这个更新往往是在系统比较空闲的时候做,这就像打烊以后掌柜做的事。

如果今天赊账的不多,掌柜可以等打烊后再整理。但如果某天赊账的特别多,粉板写满了,又怎么办呢?这个时候掌柜只好放下手中的活儿,把粉板中的一部分赊账记录更新到账本中,然后把这些记录从粉板上擦掉,为记新账腾出空间。

与此类似,InnoDB的redo log是固定大小的,比如可以配置为一组4个文件,每个文件的大小是1GB,那么这块“粉板”总共就可以记录4GB的操作。从头开始写,写到末尾就又回到开头循环写,如下面这个图所示。

image.png

write pos是当前记录的位置,一边写一边后移,写到第3号文件末尾后就回到0号文件开头。checkpoint是当前要擦除的位置,也是往后推移并且循环的,擦除记录前要把记录更新到数据文件。

write pos和checkpoint之间的是“粉板”上还空着的部分,可以用来记录新的操作。如果write pos追上checkpoint,表示“粉板”满了,这时候不能再执行新的更新,得停下来先擦掉一些记录,把checkpoint推进一下。

有了redo log,InnoDB就可以保证即使数据库发生异常重启,之前提交的记录都不会丢失,这个能力称为crash-safe。

要理解crash-safe这个概念,可以想想前面赊账记录的例子。只要赊账记录记在了粉板上或写在了账本上,之后即使掌柜忘记了,比如突然停业几天,恢复生意后依然可以通过账本和粉板上的数据明确赊账账目。

binlog

redo log是InnoDB引擎特有的日志,而Server层也有自己的日志,称为binlog(归档日志)。

为什么会有两份日志?

因为最开始MySQL里并没有InnoDB引擎。MySQL自带的引擎是MyISAM,但是MyISAM没有crash-safe的能力,binlog日志只能用于归档。而InnoDB是另一个公司以插件形式引入MySQL的,既然只依靠binlog是没有crash-safe能力的,所以InnoDB使用另外一套日志系统——也就是redo log来实现crash-safe能力。

这两种日志有以下三点不同。

  1. redo log是InnoDB引擎特有的;binlog是MySQL的Server层实现的,所有引擎都可以使用。
  2. redo log是物理日志,记录的是“修改之后的值”;binlog是逻辑日志,记录的是这个语句的原始逻辑,比如“给ID=2这一行的c字段加1 ”。
  3. redo log是循环写的,空间固定会用完;binlog是可以追加写入的。“追加写”是指binlog文件写到一定大小后会切换到下一个,并不会覆盖以前的日志。

update语句执行流程

  1. 执行器先找引擎取ID=2这一行。如果ID=2这一行所在的数据页本来就在内存中,就直接返回给执行器;否则,需要先从磁盘读入内存,然后再返回。
  2. 执行器拿到引擎给的行数据,把这个值加上1,比如原来是N,现在就是N+1,得到新的一行数据,再调用引擎接口写入这行新数据。
  3. 引擎将这行新数据更新到内存中,同时将这个更新操作记录到redo log里面,此时redo log处于prepare状态。然后告知执行器执行完成了,随时可以提交事务。
  4. 执行器生成这个操作的binlog,并把binlog写入磁盘。
  5. 执行器调用引擎的提交事务接口,引擎把刚刚写入的redo log改成提交(commit)状态,更新完成。

image.png

redo log的写入拆成了两个步骤(倒数第三步、倒数第一步):prepare和commit,这就是”两阶段提交”。

两阶段提交

为什么必须有“两阶段提交”呢?这是为了让两份日志之间的逻辑一致。

怎样让数据库恢复到半个月内任意一秒的状态?

binlog会记录所有的逻辑操作,并且是采用“追加写”的形式。备份系统中一般会保存最近半个月(也可能更久)的所有binlog,同时系统会定期做整库备份。这里的“定期”取决于系统的重要性,可以是一天一备,也可以是一周一备。

当需要恢复到指定的某一秒时,比如某天下午两点发现中午十二点有一次误删表,需要找回数据,那可以这么做:

  • 首先,找到最近的一次全量备份,从这个备份恢复到临时库;
  • 然后,从备份的时间点开始,将备份的binlog依次取出来,重放到中午误删表之前的那个时刻。

这样你的临时库就跟误删之前的线上库一样了,然后你可以把表数据从临时库取出来,按需要恢复到线上库去。

为什么日志需要“两阶段提交”?这里不妨用反证法来进行解释。

由于redo log和binlog是两个独立的逻辑,如果不用两阶段提交,要么就是先写完redo log再写binlog,或者采用反过来的顺序。看看这两种方式会有什么问题。

仍然用前面的update语句来做例子。假设当前ID=2的行,字段c的值是0,再假设执行update语句过程中在写完第一个日志后,第二个日志还没有写完期间发生了crash,会出现什么情况呢?

  1. 先写redo log后写binlog。 假设在redo log写完,binlog还没有写完的时候,MySQL进程异常重启。由于redo log写完之后,系统即使崩溃,仍然能够把数据恢复回来,所以恢复后这一行c的值是1。但是由于binlog没写完就crash了,这时候binlog里面就没有记录这个语句。因此,之后备份日志的时候,存起来的binlog里面就没有这条语句。然后会发现,如果需要用这个binlog来恢复临时库的话,由于这个语句的binlog丢失,这个临时库就会少了这一次更新,恢复出来的这一行c的值就是0,与原库的值不同。
  2. 先写binlog后写redo log。 如果在binlog写完之后crash,由于redo log还没写,崩溃恢复以后这个事务无效,所以这一行c的值是0。但是binlog里面已经记录了“把c从0改成1”这个日志。所以,在之后用binlog来恢复的时候就多了一个事务出来,恢复出来的这一行c的值就是1,与原库的值不同。

可以看到,如果不使用“两阶段提交”,那么数据库的状态就有可能和用它的日志恢复出来的库的状态不一致。

简单说,redo log和binlog都可以用于表示事务的提交状态,而两阶段提交就是让这两个状态保持逻辑上的一致。

本文至此结束,希望对你有所帮助!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【面试必刷】Mysql编程题:第六期

发表于 2021-11-29

「这是我参与11月更文挑战的第29天,活动详情查看:2021最后一次更文挑战」。

Test 1

有一个薪水表 salaries 简况如下:

image.png
请你获取薪水第二多的员工的 emp_no 以及其对应的薪水 salary。
image.png

考察知识点

INNER JOIN,ORDER BY,FROM 中嵌套查询结果。

解题思路

:one: 先获取第二高薪水

1
2
3
4
sql复制代码SELECT DISTINCT(s2.salary)
FROM salaries AS s2
ORDER BY s2.salary DESC
LIMIT 1, 1

:two: 再查找薪资与第二高薪水相等的员工相关信息。

1
2
3
4
5
6
sql复制代码SELECT emp_no, s1.salary
FROM salaries AS s1
WHERE s1.salary = (SELECT DISTINCT(s2.salary)
FROM salaries AS s2
ORDER BY s2.salary DESC
LIMIT 1, 1)

Test 2

有一个员工表 employees 简况如下:

image.png
有一个薪水表 salaries 简况如下:

image.png
请你查找薪水排名第二多的员工编号 emp_no、薪水 salary 、last_name 以及 first_name ,不能使用 order by 完成,以上例子输出为:
(温馨提示: sqlite 通过的代码不一定能通过 mysql ,因为 SQL 语法规定,使用聚合函数时,select 子句中一般只能存在以下三种元素:常数、聚合函数,group by 指定的列名。如果使用非 group by 的列名,sqlite 的结果和 mysql 可能不一样)
image.png

考察知识点

INNER JOIN,MAX,子查询。

解题思路

先利用 MAX() 函数找出 salaries 中当前薪水最高者,再找出小于最高薪水的 MAX(salary) ,即为第二高薪水。然后在内连接而成的表中根据题中条件筛选即可。

1
2
3
4
5
6
7
sql复制代码SELECT e.emp_no, salary, last_name, first_name
FROM employees AS e INNER JOIN salaries AS s
ON e.emp_no = s.emp_no
AND s.salary = (SELECT max(salary)
FROM salaries
WHERE salary < (SELECT MAX(salary)
FROM salaries))

题目来源:牛客网-SQL数据库实战题

每日打卡,❤ 点个赞再走吧!!!❤

在这里插入图片描述
后续会继续分享 Mysql 方面的文章,如果感兴趣的话可以点个关注不迷路哦~。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

性能优化反思:减少DB查询,合理使用成员变量。 前言 举个栗

发表于 2021-11-29

前言

之前整理了一篇性能优化反思的文章:不要在for循环中操作DB,反响还不错,今天继续以性能优化为主题,分享最近的总结思考:减少DB查询次数,合理使用成员变量

高内聚,低耦合是非常深入人心的设计思想,在做到高内聚低耦合的同时,我们也要考虑到值传递的问题:要避免在抽取函数,封装代码时不合理的值传递,避免在多个函数内部重复查询相同的DB。

796e4519341baf05f5edf63bec328d2a.jpeg

举个栗子

需求描述

  1. 我们的项目是交友类APP,有划卡片喜欢、不喜欢、超级喜欢的动作,也有赠送礼物、邀请约会等动作
  2. 上述动作有各种判断,比如每天有喜欢的上限,又区分是不是会员;赠送礼物时又要判断是否开通了会员,扣费的话要不要使用会员价;邀请约会的时候要判断是不是好友等等;各种看起来平平无奇的场景融合在一起就让代码结构变得异常复杂了。程序设计变得异常重要。
  3. 想不用着急往下看,可以先想一想如果让你来做程序设计,你会怎么实现呢?

…

动脑筋想一想吧,才能更好的体会下面的设计思路,或者在评论区输入您的解决方案

…

需求分析

  1. 基于上述需求,我们定义了几个概念:动作、消费、记录;
  2. 把动作进行拆分:基础动作、付费动作、全局动作;把消费进行拆分:充值、消费、领券等;
  3. 根据拆分的概念实现逻辑代码,比如基础动作抽象为一个类,付费动作和全局动作继承这个类:付费的动作单独抽象为一个类,把付费相关的动作放到这里;全局动作指的是不管是不是会员,不换是不是消费,都能进行的动作,也抽象为一个单独的类。

(在项目开发第一版,我们是没做这种抽象的,在一个类里面实现了各种动作、消费、记录,随着项目的推进,变得非常臃肿混乱。)

上面讲的可能有些抽象,看下面的代码示例会清晰很多哦~

代码示例

为了行为紧凑,方便大家理解,我不直接粘贴我们的逻辑代码,而是把关键代码段拿出来分析。

整体结构

  1. 下述代码是上面提到的全局动作类,它继承了基础动作类,所有的动作在基础动作类中定义
  2. 规范了输入参数和输出参数这些成员变量
  3. 构造方法传入当前用户id和对方id,所有的动作肯定是有双方的
  4. 规范了setAction()设置动作、getActionResult()获得动作的执行结果,统一输入输出的格式和规范,不管在哪里调用,都遵守统一的规范
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
php复制代码class UserUniversalAction extends BaseUserAction
{
//输入参数
protected $_userid; //当前用户ID
protected $_actionId; //动作ID
protected $_extra = []; //动作携带的额外参数
protected $_otherUserid; //对方用户ID
protected $_data = []; //中间存储
protected $_houseOpen = [];
protected $_userConsume = null;
protected $_now;

//输出参数
protected $_pass = true; //财务性约束,有没有消费通过
protected $_rechargeType = null; //消费的类型
protected $_propCount = 0;
protected $_consumeSuccess = false; //消费成功-默认false
protected $_canInvite = false; //
protected $_errorCode = 0; //非财务行约束,动作的关系执行条件
protected $_needAfterAction = true;
//protected $_out = []; //动作执行完成后,输出结果

public $hasRight = false;

public function __construct($userid, $otherUserid = '')
{
parent::__construct($userid, 0);
$this->_otherUserid = $otherUserid;
//初始化参数
$this->_houseOpen = HouseOpen::getCurrentOpen();
//消费初始化
$this->_userConsume = new UserConsume($this->_userid);
$this->_now = time();
}

//设置执行动作
public function setAction($actionId, $extra = [])
{
//设置动作参数
$this->_actionId = $actionId;
$this->_extra = $extra;

//前置动作,执行权限校验等
$this->_beforeAction();
//只有非财务性约束和财务性约束通过后,才能继续执行
if ($this->_errorCode == 0 && $this->_pass) {
//执行动作
$this->_actionExecute();
if ($this->_errorCode == 0) {
//后置动作
$this->_afterAction();
}
}
}

//获取动作执行结果
public function getActionResult()
{
return [
//财务性限制
'pass' => $this->_pass,
'rechargeType' => $this->_rechargeType,
'propCount' => $this->_propCount,
'canInvite' => $this->_canInvite,
//业务性限制
'errorCode' => $this->_errorCode,
'out' => $this->_data
];
}

设置执行动作详解

细心的同学可能已经发现了:我们又对setAction()做了进一步的拆解,拆分为:

_beforeAction:前置动作,执行权限校验等,比如只有开通了会员才允许超级喜欢,只有成为了好友才允许邀请约会。

_actionExecute:执行动作,比如触发了超级喜欢,比如向对方发起了约会邀请。

_afterAction:后置动作,比如发起约会时给对方发通知栏消息告知对方;比如约会结束之后发推送消息告知对方的约会感受。

这样做的好处是异常清晰,几乎所有的动作都可以理解为三步:动作前、动作中、动作后。

另外一个比较硬核的地方是传入的第二个参数 $extra = []:

传入的第一个参数很好理解:$actionId 就是我们定义的动作id,我们根据动作id判断要执行哪些动作。

第二个参数$extra = [],extra是扩展参数、可变参数的概念。就和我开篇提到的减少DB查询,合理使用成员变量 呼应上了:

把需要在多处使用到的参数传入,而不是每次都通过查询DB的方式获得。我们以参数形式传入的数据可以赋值给成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
php复制代码    //设置执行动作
public function setAction($actionId, $extra = [])
{
//设置动作参数
$this->_actionId = $actionId;
$this->_extra = $extra;

//前置动作,执行权限校验等
$this->_beforeAction();
//只有非财务性约束和财务性约束通过后,才能继续执行
if ($this->_errorCode == 0 && $this->_pass) {
//执行动作
$this->_actionExecute();
if ($this->_errorCode == 0) {
//后置动作
$this->_afterAction();
}
}
}

硬核部分,注意看

b4f752eebd5491b6ccd0990d8c697134.jpeg

小悟空友情提示:文章有点长,硬核部分开始了,请同学们坚持一会,认真看。

下面的示例代码能让大家更好的理解如何合理的使用成员变量

老规矩先说需求:在约会结束时进行判断,如果线上语音约会时间小于1分钟则补偿给用户约会券(我们认为约会时间小于1分钟的就是体验不好的约会,不能让用户白花钱,要给予优惠券补偿)

如果是常规设计:我们需要至少查询3次DB,即:

  1. 触发结束约会时修改状态,进行一系列读写操作,返回给客户端最新的数据状态
  2. 在_afterAppointmentFinish中查询语音房是否是开放的状态(我们产品是有营业概念的,只有营业中可执行约会动作)
  3. 在_afterAppointmentFinish中根据约会id,查询双方约会时长等信息

通过成员变量传参的方式,只需要1次查询DB,即:

  1. 触发结束约会时修改状态,进行一系列操作,返回给客户端最新的数据状态的同时,通过$this->_data = $appointmentModel->toArray();赋值给成员变量; _afterAppointmentFinish()中通过$this->_data取值就可以了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
php复制代码   protected function _actionExecute()
{
//执行权限校验
switch ($this->_actionId) {
.
.
.
case self::TYPE_ACTION_END:
$this->_doAppointmentEndActionExecute();
break;
}
}

protected function _afterAction()
{
//动作后置操作
switch ($this->_actionId) {
.
.
.
case self::TYPE_ACTION_END:
$this->_afterAppointmentFinish();
break;
default:
}
}

protected function _doAppointmentEndActionExecute()
{
$appointmentModel = AppointmentInfo::query()->selectRaw('id,userid,"inviteeUserid",status,endtime,"callStartTimestamp","callDuration","isConsume","appointmentOpenId"')
->where('id', $this->_extra['appointmentId'])->first();
.
.
.
$appointmentModel->endtime = time();
$appointmentModel->status = AppointmentInfo::TYPE_STATUS_END;
$appointmentModel->save();
$this->_data = $appointmentModel->toArray();
.
.
.
}

protected function _afterAppointmentFinish()
{
$houseOpen = $this->_houseOpen; //减少1次DB查询
if ($houseOpen['status'] != HouseOpen::HOUSE_STATUS_OPEN) {
return false;
}
//减少2次DB查询
if (isset($this->_data)
&& $this->_data['isConsume'] == AppointmentInfo::TYPE_IS_CONSUME
&& $this->_data['appointmentOpenId'] == $houseOpen['currentAppointmentOpenId']
&& $this->_data['callDuration'] < 60) { //约会时长不足1分钟,花了多少补偿多少精酿券
.
.
.
}
}
}

上面只是一个简单的栗子,随着项目推进,应用场景增多,合理使用成员变量会体现出更高的价值。

回顾

大家再回顾一下我开篇提到的输入参数,这些都是成员变量,其中 _extra ,_data ,_houseOpen 都是易于扩展的数组类型,我们可以通过合理的使用成员变量,减少冗余的DB查询,提高程序的运行效率。

1
2
3
4
5
6
7
8
9
php复制代码    //输入参数
protected $_userid; //当前用户ID
protected $_actionId; //动作ID
protected $_extra = []; //动作携带的额外参数
protected $_otherUserid; //对方用户ID
protected $_data = []; //中间存储
protected $_houseOpen = [];
protected $_userConsume = null;
protected $_now;

总结

要知道每次DB查询都是有网络耗时的;我们把数据存到成员变量,从内存中读取数据的耗时是可以忽略不计的。

欢迎互动

时间过得可真快,转眼今天已经29号了,我也终于在今天完成了本月更文28天的挑战。

希望以后能更多的写出如今天一样的文章。这篇收尾的文章比较硬核,起码没水,也算给11月的更文挑战画上完美的句号了。

感谢大家的支持!!!欢迎三连一波~

1fda3718df4635061fe8bca8aec99870.jpeg

硬核文章推荐

PHP转Go 2021年年中总结

如何第一时间收到接口报错?不用测试妹子再质疑你是不是接口挂了。

Git使用实战:多人协同开发,紧急修复线上bug的Git操作指南。

性能优化反思:不要在for循环中操作DB

性能优化反思:不要在for循环中操作DB 进阶版

一起学习

公众号:程序员升级打怪之旅

微信号:wangzhongyang1993

福利🧧:点这里–>半价买掘金小册,额外领红包

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【实战】SpringBoot+Redis实现发布订阅

发表于 2021-11-29

「这是我参与11月更文挑战的第28天,活动详情查看:2021最后一次更文挑战」

发布订阅机制

发布订阅:消息发布者发布消息 和 消息订阅者接收消息,两者之间通过某种媒介联系起来

image.png

常见场景:1. 微博动态:在微博关注了小A,当小A发微博过后,小A的粉丝都会收到小A发的动态 2. APP站内消息,当有新消息的时候,都会类似消息框的模式通知用户

  1. 当一个客户端通过 PUBLISH 命令向订阅者发布消息的时候,称这个客户端为发布者publisher
  2. 当一个客户端通过subscribe 或者 PSUBSCRIBE 接收消息时,称这个客户端为 订阅者 subscriber

为了解耦发布者和订阅者之间的关系,Redis使用了频道channel(频道)作为两者之间的中介,发布者直接把消息发送给channel(频道),而channel负责把消息发送给订阅者,发布者和订阅者之间没有直接的联系,都不知道对方的存在

客户端发到频道的消息,将会被推送到所有订阅此频道的客户端

客户端不需要主动去获取消息,只需要订阅频道,这个频道的内容就会被推送过来

eg: 有订阅者1,2,3 订阅了频道channel,当有消息发布给频道时,这个消息就会被发送到三个订阅者客户端

image.png

实现步骤

  1. 引入依赖
1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
  1. 添加配置文件
1
2
3
4
5
6
yaml复制代码spring:
redis:
host: 127.0.0.1
database: 5
password:
port: 6379
  1. 新建一个监听类,用于监听消息
1
2
3
4
5
6
7
8
9
java复制代码@Component
public class ReceiveListener implements MessageListener {

@Override
public void onMessage(Message message, byte[] bytes) {
System.out.println("我是监听者小A,我监听到的消息是 " + message.toString());
}

}
  1. 创建一个监听容器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
scala复制代码@Configuration
public class ReceiveListenerConfig extends CachingConfigurerSupport {

/**
* 消息监听容器
*
* @param factory
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory factory){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
//订阅一个通道 该处的通道名是发布消息时的名称
container.addMessageListener(catAdapter(),new PatternTopic("topic"));
return container;
}

/**
* 消息监听适配器,绑定消息处理器
*
* @return
*/
@Bean
MessageListenerAdapter catAdapter(){
return new MessageListenerAdapter(new ReceiveListener());
}

}

一切就绪,新建一个测试类,测试发布订阅是否成

image.png

查看运行结果,成功监听到了发布者发布的消息

image.png

适用场景

redis 的发布订阅的消息是有缺点的无法实现持久化,所以存在丢失的风险,因此Redis的发布订阅适用于实时但是可靠性要求不高的场景

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…115116117…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%