开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

花一个周末,掌握 SpringCloud Ribbon 核心

发表于 2021-02-27

前言

继 SpringCloud Feign 之后的第二篇分布式框架文章,同样秉承单周末一个 SpringCloud 组件的大目标为原则

如果想看 Feign 的小伙伴,猛戳这里,Feign 核心原理与你不期而遇

在平常使用 SpringCloud 中,一般会使用 Feign,因为 Feign 内部集成了 Ribbon

但是 Ribbon 又是一个不可忽视的知识点,并且比 Feign 要难很多。列举文章大纲主题

  1. 如何获取注册中心服务实例
  2. 非健康服务实例如何下线
  3. Ribbon 底层原理实现
  4. 自定义 Ribbon 负载均衡策略

文章使用 SpringCloud Ribbon 源代码 Hoxton.SR9 版本:2.2.6.RELEASE

另外在文章结尾,说了一些看源码过程中的感想,以及 Ribbon 中笔者实现不合理的流程说明

概念小贴士

负载均衡

负载均衡是指通过负载均衡策略分配到多个执行单元上,常见的负载均衡方式有两种

  • 独立进程单元,通过负载均衡策略,将请求进行分发到不同执行上,类似于 Nginx
  • 客户端行为,将负载均衡的策略绑定到客户端上,客户端会维护一份服务提供者列表,通过客户端负载均衡策略分发到不同的服务提供者

Ribbon

Ribbon 是 Netflix 公司开源的一款负载均衡组件,负载均衡的行为在客户端发生,所以属于上述第二种

一般而言,SpringCloud 构建以及使用时,会使用 Ribbon 作为客户端负载均衡工具。但是不会独立使用,而是结合 RestTemplate 以及 Feign 使用,Feign 底层集成了 Ribbon,不用额外的配置,开箱即用

文章为了更贴切 Ribbon 主题,所以使用 RestTemplate 充当网络调用工具

RestTemplate 是 Spring Web 下提供访问第三方 RESTFul Http 接口的网络框架

环境准备

注册中心选用阿里 Nacos,创建两个服务,生产者集群启动,消费者使用 RestTemplate + Ribbon 调用,调用总体结构如下

生产者代码如下,将服务注册 Nacos,并对外暴露 Http Get 服务

消费者代码如下,将服务注册 Nacos,通过 RestTemplate + Ribbon 发起远程负载均衡调用

RestTemplate 默认是没有负载均衡的,所以需要添加 @LoadBalanced

启动三个生产者实例注册 Nacos,启动并且注册成功如下所示

想要按严格的先后顺序介绍框架原理,而不超前引用尚未介绍过的术语,这几乎是不可能的,笔者尽可能介绍明白

如何获取注册中心服务实例

先来看一下 Ribbon 是如何在客户端获取到注册中心运行实例的,这个点在之前是我比较疑惑的内容

服务注册相关的知识点,会放到 Nacos 源码解析说明

先来举个例子,当我们执行一个请求时,肯定要进行负载均衡对吧,这个时候代码跟到负载均衡获取服务列表源码的地方

解释一下上面标黄色框框的地方:

  • RibbonLoadBalancerClient:负责负载均衡的请求处理
  • ILoadBalancer:接口中定义了一系列实现负载均衡的方法,相当于一个路由的作用,Ribbon 中默认实现类 ZoneAwareLoadBalancer
  • unknown:ZoneAwareLoadBalancer 是多区域负载均衡器,这个 unkonwn 代表默认区域的意思
  • allServerList:代表了从 Nacos 注册中心获取的接口服务实例,upServerList 代表了健康实例

现在想要知道 Ribbon 是如何获取服务实例的就需要跟进 getLoadBalancer()

getLoadBalancer

首先声明一点,getLoadBalancer() 方法的语意是从 Ribbon 父子上下文容器中获取名称为 ribbon-produce,类型为 ILoadBalancer.class 的 Spring Bean

之前在讲 Feign 的时候说过,Ribbon 会为每一个服务提供者创建一个 Spring 父子上下文,这里会从子上下文中获取 Bean

看到这里并没有解决我们的疑惑,以为方法里会有拉取服务列表的代码,然鹅只是返回一个包含了服务实例的 Bean,所以我们只能去跟下这个 Bean 的上下文

我们需要从负载均衡客户端着手,因为默认是 ZoneAwareLoadBalancer,那我们需要跟进它何时被创建,初始化都做了什么事情

ZoneAwareLoadBalancer

ZoneAwareLoadBalancer 是一个根据区域(Zone)来进行负载均衡器,因为如果不同机房跨区域部署服务列表,跨区域的方式访问会产生更高的延迟,ZoneAwareLoadBalancer 就是为了解决此类问题,不过默认都是同一区域

ZoneAwareLoadBalancer 很重要,或者说它代表的 负载均衡路由角色 很重要。进行服务调用前,会使用该类根据负载均衡算法获取可用 Server 进行远程调用,所以我们要掌握创建这个负载均衡客户端时都做了哪些

ZoneAwareLoadBalancer 是在服务第一次被调用时通过子容器创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@Bean @ConditionalOnMissingBean  // RibbonClientConfiguration 被加载,从 IOC 容器中获取对应实例填充到 ZoneAwareLoadBalancer
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
...
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}

public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
// 调用父类构造方法
super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}

在 DynamicServerListLoadBalancer 中调用了父类 BaseLoadBalancer 初始化了一部分配置以及方法,另外自己也初始化了 Server 服务列表等元数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
// 调用父类 BaseLoadBalancer 初始化一些配置,包括 Ping(检查服务是否可用)Rule(负载均衡规则)
super(clientConfig, rule, ping);
// 较重要,获取注册中心服务的接口
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
// 初始化步骤分了两步走,第一步在上面,这一步就是其余的初始化
restOfInit(clientConfig);
}

先来说一下 BaseLoadBalancer 中初始化的方法,这里主要对一些重要参数以及 Ping、Rule 赋值,另外根据 IPing 实现类执行定时器,下面介绍 Ping 和 Rule 是什么

方法大致做了以下几件事情:

  1. 设置客户端配置对象、名称等关键参数
  2. 获取每次 Ping 的间隔以及 Ping 的最大时间
  3. 设置具体负载均衡规则 IRule,默认 ZoneAvoidanceRule,根据 server 和 zone 区域来轮询
  4. 设置具体 Ping 的方式,默认 DummyPing,直接返回 True
  5. 根据 Ping 的具体实现,执行定时任务 Ping Server

这里会介绍被填入的 IPing 和 IRule 是什么东东,并且都有哪些实现

IPing 服务探测

IPing 接口负责向 Server 实例发送 ping 请求,判断 Server 是否有响应,以此来判断 Server 是否可用

接口只有一个方法 isAlive,通过实现类完成探测 ping 功能

1
2
3
java复制代码public interface IPing {
public boolean isAlive(Server server);
}

IPing 实现类如下:

  • PingUrl:通过 ping 的方式,发起网络调用来判断 Server 是否可用(一般而言创建 PingUrl 需要指定路径,默认是 IP + Port)
  • PingConstant:固定返回某服务是否可用,默认返回 True,表示可用
  • NoOpPing:没有任何操作,直接返回 True,表示可用
  • DummyPing:默认的类,直接返回 True,实现了 initWithNiwsConfig 方法

IRule 负载均衡

IRule 接口负责根据不用的算法和逻辑处理负载均衡的策略,自带的策略有 7 种,默认 ZoneAvoidanceRule

  1. BestAvailableRule:选择服务列表中最小请求量的 Server
  2. RandomRule:服务列表中随机选择 Server
  3. RetryRule:根据轮询的方式重试 Server
  4. ZoneAvoidanceRule:根据 Server 的 Zone 区域和可用性轮询选择 Server
  5. …

上面说过,会有两个初始化步骤,刚才只说了一个,接下来说一下 这个其余初始化方法 restOfInit,虽然取名叫其余初始化,但是就重要性而言,那是相当重要

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
// 初始化服务列表,并启用定时器,对服务列表作出更新
enableAndInitLearnNewServersFeature();
// 更新服务列表,enableAndInitLearnNewServersFeature 中定时器的执行的就是此方法
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}

获取服务列表以及定时更新服务列表的代码都在此处,值得仔细看着源码。关注其中更新服务列表方法就阔以了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
// 获取服务列表数据
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);

if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
// 更新所有服务列表
updateAllServerList(servers);
}

第一个问题兜兜转转,终于要找到如何获取的服务列表了,serverListImpl 实现自 ServerList,因为我们使用的 Nacos 注册中心,所以 ServerList 的具体实现就是 NacosServerList

1
2
3
4
java复制代码public interface ServerList<T extends Server> {
public List<T> getInitialListOfServers();
public List<T> getUpdatedListOfServers();
}

ServerList 中只有两个接口方法,分别是 获取初始化服务列表集合、获取更新的服务列表集合,Nacos 实现中两个调用都是一个实现方法,可能设计如此

相当于 Ribbon 提供出接口 ServerList,注册中心开发者们谁想和 Ribbon 集成,那你就实现这个接口吧,到时候 Ribbon 负责调用 ServerList 实现类中的方法实现

Ribbon 和各服务注册中心之间,这种实现方式和 JDBC 与各数据库之间很像

兜兜转转中问题已经明朗,一起总结下注册中心获取服务实例这块内容

  1. 负载均衡客户端在初始化时向 Nacos 注册中心获取服务注册列表信息
  2. 根据不同的 IPing 实现,向获取到的服务列表 串行发送 ping,以此来判断服务的可用性。没错,就是串行,如果你的实例很多,可以 考虑重写 ping 这一块的逻辑
  3. 如果服务的可用性 发生了改变或者被人为下线,那么重新拉取或更新服务列表
  4. 当负载均衡客户端有了这些服务注册类列表,自然就可以进行 IRule 负载均衡策略

非健康服务实例如何下线

首先笔者做了两个 “大胆” 的实验,第一次是对生产者 SpringBoot 项目执行关闭流程,这时候 Nacos 注册中心是 实时感知到并将此服务下该实例删除

证明 Nacos 客户端是有 类似于钩子函数的存在,感知项目停止就会向 Nacos 服务端注销实例。但是这个时候要考虑一件事情,那就是在 暴力 Kill 或者执行关闭操作 的情况下,存在于 Ribbon 客户端服务列表缓存能不能感知

第二次我这边测试流程是这样的,可以极大程度还原生产上使用 Ribbon 可能会遇到的问题

  1. 改变客户端负载均衡策略为 随机负载 RandomRule,大家自己可以进行测试,不固定负载规则
  2. 注册三个生产者服务实例到 Nacos 上,检查 确保服务组下实例正常注册
  3. 操作重点来了,先通过消费方实例请求下对应的生产者接口,保证 Ribbon 将对应 Server 缓存到客户端
  4. 停掉一个生产者服务,此时 马上使用 Jmeter 调用,Jmeter 线程组发起请求 100 次(一定要赶到更新 Server 缓存之前发起 Jmeter 请求)
  5. 这时就会看到会发生随机失败,也就是说停掉一个服务后,最坏结果会有 30 秒的生产服务不可用,这个时间可配置,后面会讲到为什么 30 秒

服务列表定时维护

针对于服务列表的维护,在 Ribbon 中有两种方式,都是通过定时任务的形式维护客户端列表缓存

  1. 使用 IPing 的实现类 PingUrl,每隔 10 秒会去 Ping 服务地址,如果返回状态不是 200,那么默认该实例下线
  2. Ribbon 客户端内置的扫描,默认每隔 30 秒去拉取 Nacos 也就是注册中心的服务实例,如果已下线实例会在客户端缓存中剔除

这一块源码都不贴了,放两个源代码位置,感兴趣自己看看就行了

1
2
diff复制代码+ DynamicServerListLoadBalancer#enableAndInitLearnNewServersFeature
+ BaseLoadBalancer#setupPingTask

如果你面试的时候,面试官问了本小节相关内容,把这两个点都能答出来,基本上 SpringCloud 源码就差不多了

Ribbon 底层原理实现

底层原理实现这一块内容,会先说明使用 Ribbon 负载均衡调用远端请求的全过程,然后着重看一下 RandomRule 负载策略底层是如何实现

  1. 创建 ILoadBalancer 负载均衡客户端,初始化 Ribbon 中所需的 定时器和注册中心上服务实例列表
  2. 从 ILoadBalancer 中,通过 负载均衡选择出健康服务列表中的一个 Server
  3. 将服务名(ribbon-produce)替换为 Server 中的 IP + Port,然后生成 HTTP 请求进行调用并返回数据

上面已经说过,ILoadBalancer 是负责负载均衡路由的,内部会使用 IRule 实现类进行负载调用

1
2
3
4
java复制代码public interface ILoadBalancer {
public Server chooseServer(Object key);
...
}

chooseServer 流程中调用的就是 IRule 负载策略中的 choose 方法,在方法内部获取一个健康 Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码public Server choose(ILoadBalancer lb, Object key) {
...
Server server = null;
while (server == null) {
...
List<Server> upList = lb.getReachableServers(); // 获取服务列表健康实例
List<Server> allList = lb.getAllServers(); // 获取服务列表全部实例
int serverCount = allList.size(); // 全部实例数量
if (serverCount == 0) { // 全部实例数量为空,返回 null,相当于错误返回
return null;
}
int index = chooseRandomInt(serverCount); // 考虑到效率问题,使用多线程 ThreadLocalRandom 获取随机数
server = upList.get(index); // 获取健康实例
if (server == null) {
// 作者认为出现获取 server 为空,证明服务列表正在调整,但是!这只是暂时的,所以当前释放出了 CPU
Thread.yield();
continue;
}
if (server.isAlive()) { // 服务为健康,返回
return (server);
}
...
}
return server;
}

简单说一下随机策略 choose 中流程

  1. 获取到全部服务、健康服务列表,判断全部实例数量是否等于 0,是则返回 null,相当于发生了错误
  2. 从全部服务列表里获取下标索引,然后去 健康实例列表获取 Server
  3. 如果获取到的 Server 为空会放弃 CPU,然后再来一遍上面的流程,相当于一种重试机制
  4. 如果获取到的 Server 不健康,设置 Server 等于空,再歇一会,继续走一遍上面的流程

比较简单,有小伙伴可能就问了,如果健康实例小于全部实例怎么办?这种情况下存在两种可能

  1. 运气比较好,从全部实例数量中随机了比较小的数,刚好健康实例列表有这个数,那么返回 Server
  2. 运气比较背,从全部实例数量中随机了某个数,健康实例列表数量为空或者小于这个数,直接会下标越界异常

留下一个思考题:

为什么不直接从健康实例中选择实例呢

如果直接从健康实例列表选择,就能规避下标越界异常,为什么作者要先从全部实例中获取 Server 下标?

自定义 Ribbon 负载均衡策略

这种自定义策略,在框架中都支持的比较友好,根据上面提的问题,我们自定义一款策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码@Slf4j
public class MyRule extends AbstractLoadBalancerRule {
@Override
public Server choose(Object key) {
ILoadBalancer loadBalancer = getLoadBalancer();
while (true && ) {
Server server = null;
// 获取已启动并且可访问的服务列表
List<Server> reachableServers = loadBalancer.getReachableServers();
if (CollectionUtils.isEmpty(reachableServers)) return null;
int idx = ThreadLocalRandom.current().nextInt(reachableServers.size());
server = reachableServers.get(idx);
if (server == null || server.isAlive()) {
log.warn("Ribbon 服务实例异常, 获取为空 || 状态不健康");
Thread.yield();
continue;
}
return server;
}
}

... initWithNiwsConfig 不用实现
}

说一下我们自己实现的 MyRule 负载的逻辑:

  1. IRule 获取服务列表没有在调用方实现,而是抽象 AbstractLoadBalancerRule,所以我们要获取服务列表继承就好了
  2. 和随机负载规则大致相似,只不过这里简化了流程,直接从健康的服务实例列表获取 Server 实例
  3. 确定 Server 不为空并且节点健康后返回,如果不符合则打印日志,睡一会再重复
  4. 如果保险起见,最好在 while 中加入一个循环次数的条件,避免死循环

然后把 MyRule 注册到 SPring IOC 容器中就好了,在初始化时就会代替默认的 Rule 负载规则

关于 IPing 的思考

在阅读 Ribbon Ping 这一块源代码时,发现了两处个人认为不太合理的地方

  1. setPingInterval 设置 Ping 间隔时执行设置 Ping 任务无意义
  2. BaseLoadBalancer 构造函数中 ping 为 null,又再次调用了 setPingInterval,结果只会返回空

setPingInterval 和 setPing 两个方法发生在 BaseLoadBalancer 初始化时,相当于接着上面逻辑继续。先说明执行逻辑,再看下不合理的地方

setupPingTask 用于定期执行对 Server 的 ping 任务,也就是检测 Server 是否可用

个人觉得在 setPingInterval 中没有必要执行 setupPingTask 方法

作出上述结论有以下依据:

  1. 第一次执行 setPingInterval 时,ping 必然为空,那么会在 canSkipPing 中返回 True,继而直接结束 setPingInterval 方法
  2. 后来想了下,会不会在别的地方引用,需要强制刷新,然鹅全局搜索了下引用,只有在此次初始化时调用,当然不排除别的依赖包会使用此方法
  3. 综上所述,setPingInterval 执行设置 Ping 任务的方法无意义

另外还有一点,作者感觉代码中调用的方法没有实际意义。和上述类似,都是在 ping 为空时执行了 setPingInterval 方法

以上这两点是笔者跟源码时,发现不妥当的地方,所以在这里占了一些篇幅说明,主要是想表达两点自己的想法给读者朋友

  1. 不要对源码有敬畏之心,应该有敬畏之心的是生产环境! 不要感觉看框架源码是一件高不可攀的事情,其实有时候你理解不了的代码,可能只是多人维护后,混乱的产物,条件允许的情况下,还是要多跟进源码去看一看
  2. 直言说出自己的见解,如果只有自己去想,那么很可能没有答案,通过文章间接的方式让更多小伙伴看到,指正错误言论亦或者得到肯定

结言

整体来看,文章更 注重表达设计思想以及源码分析,所以阅读文章需要一定的源码功底。同时文章是针对问题而展开叙述,哪怕源码不理解也能有所收获

Ribbon 这块内容从初始化负载均衡客户端 ILoadBalancer 说起,讲述了初始化过程中具体的内容,包括如何开启 IPing 定时器以及服务列表更新定时器

另外通过源码查看到 Ribbon 的服务列表其实是向 Nacos 提供的接口发起服务调用 获取并保存到本地缓存,继而牵引出如何保证不健康实例下线:IPing 定时器和服务更新定时器

文末章节说了下请求 Ribbon 负载均衡的全链路以及如何自己定义一个负载均衡算法。最最后面也说了下自己看源码过程中对 SpringCloud IPing 感觉无意义的代码,当然,不排除是为了别的包集成而留下的

微信搜索【源码兴趣圈】,关注公众号后回复 123 领取内容涵盖 GO、Netty、Seata、SpringCloud Alibaba、开发规范、面试宝典、数据结构等学习资料!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

rabbitMq消息投递不丢失,保证幂等性 rabbitMq

发表于 2021-02-26

rabbitMq消息投递不丢失,保证幂等性

MQ消息投递,MQ服务器宕机导致丢失,rabbitMq通过durable参数持久化,也会概率上产生丢失。

图片

RabbitMQ 如何保证消息不丢失?

1.丢数据场景

  • 生产端丢数据场景,例如生产者将数据推送RabbitMq时,因网络原因导致数据丢失
  • rabbitmq丢数据,例如没有开启持久化,rabbitmq重启导致丢数据。或者开启持久化,在持久化到磁盘过程中挂了。
  • 消费端丢数据场景,例如消费端消费过程中挂了,rabbitmq认为消费了并删除,导致丢数据。

2.解决方案

2.1.消息持久化

将queue、exchange、message都持久化,但不能保证100%不丢失数据,消息持久化解决因为服务器异常奔溃导致的消息丢失。

queue持久化

1
2
3
4
5
ini复制代码//durable=true,实现queue的持久化

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("queue.persistent.name", true, false, false, null);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
typescript复制代码//Channel类中queueDeclare的完整定义如下:
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue(队列名称)the name of the queue
* @param durable(持久化)true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive(排他队列) true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete(自动删除) true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;

exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。
这里需要注意三点:
1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;
2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;
3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。

autoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。

exchange持久化

如果不设置exchange的持久化对消息的可靠性来说没有什么影响,但是同样如果exchange不设置持久化,那么当broker服务重启之后,exchange将不复存在,那么既而发送方rabbitmq producer就无法正常发送消息。

1
2
bash复制代码//durable=true,持久化
channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
typescript复制代码//exchangeDeclare的完整定义如下:
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
void exchangeDeclareNoWait(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;

message持久化

queue队列持久化为true,但message没有持久化,重启后message还是会丢失。
需要queue和message都设置持久化,broker服务重启后,队列存在,消息也存在。

1
2
less复制代码//MessageProperties.PERSISTENT_TEXT_PLAIN 为消息持久化
channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
typescript复制代码//basicPublish完整定义如下:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;

exchange表示exchange的名称
routingKey表示routingKey的名称
body代表发送的消息体

//BasicProperties定义如下,deliveryMode=1代表不持久化,deliveryMode=2代表持久化
public BasicProperties(
String contentType,//消息类型如:text/plain
String contentEncoding,//编码
Map<String,Object> headers,
Integer deliveryMode,//1:nonpersistent 2:persistent
Integer priority,//优先级
String correlationId,
String replyTo,//反馈队列
String expiration,//expiration到期时间
String messageId,
Date timestamp,
String type,
String userId,
String appId,
String clusterId)

//MessageProperties.PERSISTENT_TEXT_PLAIN定义如下:其中deliveryMode=2表示持久化
public static final BasicProperties PERSISTENT_TEXT_PLAIN =new BasicProperties("text/plain",null,null,2,0, null, null, null,null, null, null, null,null, null);

消息推送到rabbitmq后,先保存到cache中,然后异步刷入到磁盘中。

消息什么时候刷到磁盘?

写入文件前会有一个Buffer,大小为1M,数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘)。
有个固定的刷盘时间:25ms,也就是不管Buffer满不满,每个25ms,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘。
每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作。

2.2.事务+confirm

在producer引入事务机制或者Confirm机制来确保消息已经正确的发送至broker端。
RabbitMQ的可靠性涉及producer端的确认机制、broker端的镜像队列的配置以及consumer端的确认机制,要想确保消息的可靠性越高,那么性能也会随之而降,鱼和熊掌不可兼得,关键在于选择和取舍。

消息持久化解决因为服务器异常奔溃导致的消息丢失,但不能解决发布者将消息发送之后,消息有没有正确到达broker代理服务器。如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎么解决呢?

这时RabbitMQ为我们提供了两种方式:

  • 通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;
  • 通过将channel设置成confirm模式来实现;
2.2.1.事务机制

rabbitMQ事务机制三个方法:

  • txSelect()用于将当前channel设置成transaction模式
  • txCommit用于提交事务
  • txRollback用于回滚事务

在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。

1
2
3
4
5
6
7
8
9
ini复制代码try {
channel.txSelect();
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
int result = 1 / 0;
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
channel.txRollback();
}

使用事务机制的话会降低RabbitMQ的性能,但是RabbitMQ提供了一个更好的方案,即将channel信道设置成confirm模式。

2.2.2.confirm模式

producer端confirm模式的实现原理

生产者将信道设置成confirm模式,在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。

在channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack) 或者被nack一次。但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm又被nack 。

已经在transaction事务模式的channel是不能再设置成confirm模式的,即这两种模式是不能共存的。

编程模式

客户端实现生产者confirm有三种编程方式:

  1. 普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。
  2. 批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。
  3. 异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。

第1种

1
2
3
4
5
6
scss复制代码\\普通confirm模式最简单,publish一条消息后,等待服务器端confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传。

channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
if(!channel.waitForConfirms()){
System.out.println("send message failed.");
}

第二种

1
2
3
4
5
6
7
scss复制代码channel.confirmSelect();
for(int i=0;i<batchCount;i++){
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
}
if(!channel.waitForConfirms()){
System.out.println("send message failed.");
}

第三种

异步confirm模式的编程实现最复杂,Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms()方法也是通过SortedSet维护消息序号的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
csharp复制代码SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});

while (true) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
confirmSet.add(nextSeqNo);
}

性能对比

性能由低到高:事务模式(tx) < 普通confirm模式 < 批量confirm模式 < 异步confirm模式

2.2.3.消息确认(Consumer端)

为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。

采用消息确认机制后,只要令noAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。

当noAck=false时,对于RabbitMQ服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者ack信号的消息。如果服务器端一直没有收到消费者的ack信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。

RabbitMQ不会为未ack的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。

RabbitMQ管理平台界面上可以看到当前队列中Ready状态和Unacknowledged状态的消息数,分别对应上文中的等待投递给消费者的消息数和已经投递给消费者但是未收到ack信号的消息数。也可以通过命令行来查看上述信息:
img

代码示例(关闭自动消息确认,进行手动ack):

1
2
3
4
5
6
7
8
9
scss复制代码   QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(ConfirmConfig.queueName, false, consumer);

while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
// do something with msg.
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

broker将在下面的情况中对消息进行confirm:

  • broker发现当前消息无法被路由到指定的queues中(如果设置了mandatory属性,则broker会发送basic.return)
  • 非持久属性的消息到达了其所应该到达的所有queue中(和镜像queue中)
  • 持久消息到达了其所应该到达的所有queue中(和镜像中),并被持久化到了磁盘(fsync)
  • 持久消息从其所在的所有queue中被consume了(如果必要则会被ack)

basicRecover:是路由不成功的消息可以使用recovery重新发送到队列中。

basicReject:是接收端告诉服务器这个消息我拒绝接收,不处理,可以设置是否放回到队列中还是丢掉,而且只能一次拒绝一个消息,官网中有明确说明不能批量拒绝消息,为解决批量拒绝消息才有了basicNack。

basicNack:可以一次拒绝N条消息,客户端可以设置basicNack方法的multiple参数为true,服务器会拒绝指定了delivery_tag的所有未确认的消息(tag是一个64位的long值,最大值是9223372036854775807)。

2.3.设置集群镜像模式

RabbitMQ的mirrored-queue即镜像队列,这个相当于配置了副本,当master在此特殊时间内crash掉,可以自动切换到slave,这样有效的保障了HA, 除非整个集群都挂掉,这样也不能完全的100%保障RabbitMQ不丢消息,但比没有mirrored-queue的要好很多,很多现实生产环境下都是配置了mirrored-queue的。

2.4.消息补偿机制

消息提前持久化+定时任务

图片

上图流程:

(1)订单服务生产者在投递消息之前,先把消息持久化到Redis或DB中,建议Redis,高性能。消息的状态为发送中。

(2)confirm机制监听消息是否发送成功?如ack成功消息,删除Redis中此消息。

(3)如果nack不成功的消息,这个可以根据自身的业务选择是否重发此消息。也可以删除此消息,由自己的业务决定。

(4)这边加了个定时任务,来拉取隔一定时间了,消息状态还是为发送中的,这个状态就表明,订单服务是没有收到ack成功消息。

(5)定时任务会作补偿性的投递消息。这个时候如果MQ回调ack成功接收了,再把Redis中此消息删除。

幂等性保证

1.唯一性索引

根据业务规则,设置表字段唯一性。

2.乐观锁方案

借鉴数据库的乐观锁机制,根据version版本,也就是在操作库存前先获取当前商品的version版本号,然后操作的时候带上此version号。我们梳理下,我们第一次操作库存时,得到version为1,调用库存服务version变成了2;但返回给订单服务出现了问题,订单服务又一次发起调用库存服务,当订单服务传递的version还是1,再执行上面的sql语句时,就不会执行;因为version已经变为2了,where条件就不成立。这样就保证了不管调用几次,只会真正的处理一次。

1
sql复制代码update table set count=count-1,version=version+1 where id=2 and version=1

3.分布式锁

若是是分布是系统,构建全局惟一索引比较困难,例如惟一性的字段无法肯定,这时候能够引入分布式锁,经过第三方的系统(redis或zookeeper),在业务系统插入数据或者更新数据,获取分布式锁,而后作操做,以后释放锁,这样实际上是把多线程并发的锁的思路,引入多多个系统,也就是分布式系统中得解决思路。要点:某个长流程处理过程要求不能并发执行,能够在流程执行以前根据某个标志(用户ID+后缀等)获取分布式锁,其余流程执行时获取锁就会失败,也就是同一时间该流程只能有一个能执行成功,执行完成后,释放分布式锁(分布式锁要第三方系统提供);

2.唯一ID+指纹码机制

唯一ID:如数据库的主键id

指纹码:业务规则标识唯一的。如时间戳+银行返回的唯一码。需要注意的是,这个指纹码不一定就是我们系统生产的,可能是我们自己业务规则或者是外部返回的一些规则经过拼接后的东西。其目的:就是为了保障此次操作达到绝对唯一的。

唯一ID+指纹码机制,利用数据库主键去重。如:

1
sql复制代码Select count(id) from table where id = 唯一ID+指纹码

好处:实现简单,就一个拼接,而后查询判断是否重复。

坏处:高并发下若是是单个数据库就会有写入性能瓶颈

解决方案:根据 ID 进行分库分表,对 id 进行算法路由,落到一个具体的数据库,而后当这个 id 第二次来又会落到这个数据库,这时候就像我单库时的查重同样了。利用算法路由把单库的幂等变成多库的幂等,分摊数据流量压力,提升性能。

3.2.Redis 原子性

相信你们都知道 redis 的原子性操做,我这里就不须要过多介绍了。性能

使用 redis 的原子性去实现须要考虑两个点

一是 是否 要进行数据落库,若是落库的话,关键解决的问题是数据库和缓存如何作到原子性? 数据库与缓存进行同步确定要进行写操做,到底先写 redis 仍是先写数据库,这是个问题,涉及到缓存更新与淘汰的问题

二是若是不落库,那么都存储到缓存中,如何设置定时同步的策略? 不入库的话,可使用双重缓存等策略,保障一个消息副本,具体同步可使用相似 databus 这种同步工具。

摘抄

1.honeypps.com/mq/rabbitmq…

2.mp.weixin.qq.com/s/f5DKk_alX…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

数据结构与算法

发表于 2021-02-26

⭐️ 本文已收录到 AndroidFamily,技术和职场问题,请关注公众号 [彭旭锐] 和 [BaguTree Pro] 知识星球提问。

学习数据结构与算法的关键在于掌握问题背后的算法思维框架,你的思考越抽象,它能覆盖的问题域就越广,理解难度也更复杂。在这个专栏里,小彭将基于 Java / Kotlin 语言,为你分享常见的数据结构与算法问题,及其解题框架思路。

本文是数据结构与算法系列的第 7 篇文章,完整文章目录请移步到文章末尾~

前言

二分查找也称折半查找(Binary Search),是一种效率较高的查找方法(对数时间复杂度),也是面试中经常考到的问题。虽然它的思想很简单,但据《编程珠玑》所述,二分查找算法的实现是极易犯错的,典型的 “一听就懂,一写就错”。 在算法面试中,如果能表现出迅速将自己的思考转变为代码并清晰写在白板上的能力,你的表现会优于只会夸夸其谈而写不出代码的人。


目录


  1. 二分查找基础

1.1 问题描述

二分查找的基本问题是:给定一个无重复的有序整型数组,数组大小在 [1,10000][1,10000][1,10000] 之间,查找目标数 t 在数组中的索引,不存在则返回 -1。

1.2 算法描述

二分查找算法的核心思想是:减治,即逐渐缩小包含目标数 t 的数组范围(缩小问题规模)来解决问题。

  • 1、一开始,数组范围是整个原数组;
  • 2、将目标数与数组的中位数比较,如果中位数大于目标数,则抛弃数组的后半部分,反之抛弃前半部分;
  • 3、重复这个过程,直到找到目标数 t 或者数组范围为空为止。

例如,下图演示了在一组数据中查找目标数 7 的过程:

图片引用自维基百科

1.3 二分查找的优势

  • 1、最省内存

二分查找算法基于已排序的原数组,属于本地查找算法。而基于二叉堆 / 散列表的查找算法还需要使用额外空间。

  • 2、对数时间复杂度

二分查找的时间复杂度仅为O(lgn)O(lgn)O(lgn)。

1.4 二分查找的局限性

  • 1、依赖于顺序表

二分查找算法适用于顺序表,而不适用与链表。这是因为顺序表随机访问元素的时间复杂度为O(1)O(1)O(1),而链表随机访问的时间复杂度为O(n)O(n)O(n),后者实现二分查找的时间复杂度为O(nlgn)O(nlgn)O(nlgn),这比O(n)O(n)O(n)顺序遍历链表还慢。

  • 2、依赖于数据有序

二分查找的必要条件之一是数据有序,否则最低需要O(nlgn)O(nlgn)O(nlgn)的时间复杂度进行预先排序(快速排序)。如果插入 / 删除操作不频繁,那么排序操作的时间成本可以被多次查找操作的成本均摊。这意味着二分查找适合静态有序的数据类型,或者插入 / 删除不频繁的动态数据数据。否则,应该采用二叉堆等动态数据类型。

  • 3、不适用数据量太大的场景

二分查找依赖于顺序表,意味着存储数据就需要一块连续内存。如果程序的内存不足以分配这样一块连续的数组,那么就无法使用二分查找。


  1. 二分查找解题框架

前面的内容相信大家很快就能理解了,我们直接看二分查找的原始题目 704 二分查找 【题解】,并根据这个例子来讨论二分查找的解题框架。

1
2
3
4
5
6
7
css复制代码给定一个 n 个元素有序的(升序)整型数组 nums 和一个目标值 target  
写一个函数搜索 nums 中的 target,如果目标值存在返回下标,否则返回 -1。

提示:
你可以假设 nums 中的所有元素是不重复的。
n 将在 [1, 10000]之间。
nums 的每个元素都将在 [-9999, 9999]之间。

2.1 解题框架三要素

二分查找解题框架由三个主要部分组成:

  • 1、预处理

这个步骤主要处理特殊用例与数据预处理,对于特殊用例可以直接返回结果。而如果数据未排序,则先进行排序。排序过程一般使用快速排序,时间复杂度O(nlgn)O(nlgn)O(nlgn),空间复杂度O(1)O(1)O(1)。

  • 2、二分查找

主要思路是做 「排除法」,即:对于闭区间[left,right][left , right][left,right],每次观察中位数,根据中位数的值,把区间划分为两个区间:

  • 一定不包含解的区间(抛弃)
  • 可能包含解的区间

下一次遍历时,抛弃「一定不包含解的区间」,而在「可能包含解的区间」继续搜索。这里会存在两种写法,两种写法划分的区别是「区间范围是不一样」。

  • 写法 1:尝试排除左区间

这种写法尝试判断「左区间」是否存在解,不存在则抛弃。此时,[left,right][left , right][left,right]应该划分为:

1
2
csharp复制代码[left , mid - 1]
[mid , right]

当左区间存在解时,下次搜索区间就是 [left , mid - 1]
当左区间不存在解时,中位数却可能是解,所以下次搜索的区间需要覆盖 mid,即 [mid , right]

  • 写法 2:尝试排除右区间

这种写法尝试判断「右区间」是否存在解,不存在则抛弃。此时,[left,right][left , right][left,right]应该划分为:

1
2
csharp复制代码[left , mid]
[mid + 1, right]

当右区间存在解时,下次搜索区间就是 [mid + 1, right]
当右区间不存在解时,中位数却可能是解,所以下次搜索的区间需要覆盖 mid,即 [left , mid]

一定需要定义闭区间吗?

其实不是。其他一些解题模板定义了左闭右开的区间,不过这样反而增加了理解的难度。要知道一个左闭右开的区间[left,right)[left , right)[left,right)一定存在一个等价的闭区间[left,right−1][left , right - 1][left,right−1],所以在我的模板里,就统一采用了闭区间啦。

  • 3、后处理

在退出循环以后,只剩下 1 个数未检查,如果该元素满足条件则是目标解,否则题目无解。

2.2 解题框架

上一节所述题目参考代码如下:

参考代码 1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
kotlin复制代码fun search(nums: IntArray, target: Int): Int {
if (nums.size == 0) {
return -1
}
var left = 0
var right = nums.size - 1
while (left < right) {
写法 1:尝试排除左区间
val mid = (left + right) ushr 1
if (nums[mid] < target) {
// [mid]严格小于目标值,不是解
// 那么,下次搜索区间为[mid + 1,right]
left = mid + 1
} else {
// [mid]可能是解
// 那么,下次搜索区间为[left,mid]
right = mid
}
}
return if (nums[left] == target) left else -1
}

参考代码 2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
kotlin复制代码fun search(nums: IntArray, target: Int): Int {
if (nums.size == 0) {
return -1
}
var left = 0
var right = nums.size - 1
while (left < right) {
写法 2:尝试排除右区间
val mid = (left + right + 1) ushr 1
if (nums[mid] > target) {
// [mid]严格大于目标值,不是解
// 那么,下次搜索区间为[left,mid - 1]
right = mid - 1
} else {
// [mid]可能是解
// 那么,下次搜索区间为[mid,right]
left = mid
}
}
return if (nums[left] == target) left else -1
}

提示: 以上为 Kotlin 代码,Kotlin 中 shr 和 ushr 是移位运算,shr 是有符号右移,ushr 是无符号右移。

下面,我依次分析模板中需要注意的地方:

  • 细节 1:left = 0,right = nums.size - 1

在 第 2.1 节 中,我们定义了问题为闭区间[left,right][left , right][left,right],因此 left 的初值为 0,而 right 的初始值为数组长度 - 1。这相对于其他一些解题模板 right = nums.size 更容易理解,因为我们 left 和 right 永远指向我们关心的区间。

  • 细节 2:while(left < right)

表示二分查找的逻辑只处理区间长度大于 1 的情况,当区间只剩下一个元素的时候,退出循环执行后处理(如果最后一个元素满足条件则是目标解,否则,题目无解)。

  • 细节 3:取中位数

取中位数的代码是mid = (left + right) / 2,这个写法是不严谨的。因为在 left 或 right 很大的时候,left + right 有可能发生溢出,所有较严谨的写法是:

1
css复制代码mid = left + (right - left) / 2

另外,/ 2也可以用 移位操作 代替:

1
css复制代码mid = (left + right) ushr 1

提示: 可以在 JDK Arrays.java 中看到类似的写法:

1
2
3
> ini复制代码int mid = (low + high) >>> 1;
>
>
  • 细节 4:区间的划分

在 第 2.1 节 中,我们介绍了二分查找的两种写法,要特别理解两种写法中区分的划分方法。

尝试排除左区间的写法:

1
2
3
4
5
6
ini复制代码[left , mid - 1] 尝试抛弃
[mid , right]

即:
left = mid
right = mid - 1

尝试排除右区间的写法:

1
2
3
4
5
6
ini复制代码[left , mid]
[mid + 1, right] 尝试抛弃

即:
left = mid + 1
right = mid
  • 细节 5:向上取整与向下取整

当区间中含有奇数个元素时,中位数只有一个,例如,对于区间[1,2,3][1,2,3][1,2,3],中位数是222。而当区间中含有偶数个元素时,中位数其实有两个。例如,对于区间[1,2,3,4][1,2,3,4][1,2,3,4],中位数是222或者444。

此时,取前一个中位数222称为 向下取整,取后一个中位数444称为 向上取整(奇数区间向上取整和向下取整是同一个数):

1
2
3
4
5
6
css复制代码向下取整:
(left + right) ushr 1
left + (right - left) / 2
向上取整:
(left + right + 1) ushr 1
left + (right - left + 1) / 2

那么,我们应该选择哪个中位数呢,选择不同中位数的结果一样吗?其实是不一样的。 这取决于我们在 细节 4 中采用的写法:

尝试排除左区间的写法:当搜索区间只剩下两个元素时,应该采用向上取整。

「反证法」:因为[left,right][left , right][left,right]区间被划分为:[left,mid−1][left , mid - 1][left,mid−1] 和 [mid,right][mid , right][mid,right],如果选择向下取整(取前一个中位数,mid 的值等于 left),那么在左区间 无法 排除时,会进入left = mid的分支。此时,left 和 right 的值没有改变,出现区间不会缩小的情况,进入死循环。

尝试排除右区间的写法:当搜索区间只剩下两个元素时,应该采用向下取整。

「反证法」:因为[left,right][left , right][left,right]区间被划分为:[left,mid][left , mid][left,mid] 和 [mid+1,right][mid + 1 , right][mid+1,right],如果选择向上取整(取后一个中位数,mid 的值等于 right),那么在右区间 无法 排除时,会进入right = mid的分支。此时,left 和 right 的值没有改变,出现区间不会缩小的情况,进入死循环。

  • 细节 6:if (nums[left] == target) left else -1

执行后处理:在退出循环以后,只剩下 1 个数未检查,如果该元素满足条件则是目标解,否则题目无解。


  1. 举一反三

让我们回顾前面讲的二分查找原始题目,你能找出题目中的关键词吗?题目中最关键的信息是:查找无重复升序数组中的目标数,即:

关键信息 描述
顺序表 必要条件
有序(或单调性) 必要条件
数据量不大 必要条件
无重复 /
一个目标数 /

其中,「无重复」和「一个目标数」不是必要条件,修改这两个因素可以延伸出更多题目。

提示: 有序数组的本质是数组值符合单调性,详见 第 3.5 节。

3.1 搜索满足条件的目标数

35. Search Insert Position 【题解】
69. Sqrt(x) 【题解】
162. Find Peak Element (Medium) 【题解】
287. Find the Duplicate Number
374. Guess Number Higher or Lower 【题解】

相对于【题 704 二分查找】,这类题最大的不同是目标数是 「未知数」,要求我们找出这个目标数。这类题目的差不太大,我们要修正的只不过是排除区间的判断条件,使得下一次搜索的区间能够包含满足条件的目标数即可。

【35. 题解】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
kotlin复制代码class Solution {
fun searchInsert(nums: IntArray, target: Int): Int {
if (nums.isEmpty()) {
return 0
}
// 如果中位数严格小于 target,那么左区间一定不是插入位置
var left = 0
var right = nums.size - 1
while (left < right) {
val mid = (left + right) ushr 1
if (nums[mid] < target) {
left = mid + 1
} else {
right = mid
}
}
return if (nums[left] < target) left + 1 else left
}
}

【69. 题解】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
kotlin复制代码class Solution {
fun mySqrt(x: Int): Int {
if (x < 0) {
return -1
}
if (x <= 1) {
return x
}
// 中位数平方严格大于 x,那么右区间一定不是解
var left: Long = 0L
var right: Long = (x ushr 1).toLong()
while (left < right) {
val mid: Long = ((left + right + 1) ushr 1).toLong()
if (mid * mid > x) {
right = mid - 1
} else {
left = mid
}
}
return left.toInt()
}
}

【162. 题解】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
kotlin复制代码class Solution {
fun findPeakElement(nums: IntArray): Int {
if (nums.isEmpty()) {
return -1
}
if (nums.size == 1) {
return 0
}
var left = 0
var right = nums.size - 1

while (left < right) {
val mid = (left + right + 1) ushr 1
// 如果中位数严格小于前驱,那么右区间一定不是解
if (nums[mid] < nums[mid - 1]) {
right = mid - 1
} else {
left = mid
}
}
return left
}




}
`}`

【374. 题解】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
kotlin复制代码class Solution : GuessGame() {
override fun guessNumber(n: Int): Int {
// 中位数严格小于目标数,那么左区间一定不是解
var left = 0
var right = n
while (left < right) {
val mid = (left + right) ushr 1
if (1 == guess(mid)) {
left = mid + 1
} else {
right = mid
}
}
return left
}
}

3.2 搜索目标数的边界

34. Find First and Last Position of Element in Sorted Array (Medium) 【题解】

相对于【题 704 二分查找】,这类题最大的不同是数组存在 「重复」,要求我们找出目标数的开始位置和结束位置。我们依旧可以通过判断中位数与目标值的关系,来确定下一轮的搜索区间:

  • 中位数严格小于目标数,那么一定不是开始位置
  • 中位数严格大于目标数,那么一定不是结束位置

注意,比较容易出错的是:当我们找到一个与目标值相同的位置,再线性地向左和向右搜索目标值的边界,时间复杂度是 O(n)O(n)O(n),而不再是O(lgn)O(lgn)O(lgn)。

【34. 题解】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
kotlin复制代码class Solution {
fun searchRange(nums: IntArray, target: Int): IntArray {
if (nums.isEmpty()) {
return intArrayOf(-1, -1)
}

val leftIndex = nums.firstOf(target)
if (-1 == leftIndex) {
return intArrayOf(-1, -1)
}
val rightIndex = nums.lastOf(target)
return intArrayOf(leftIndex, rightIndex)
}

private fun IntArray.firstOf(target: Int): Int {
var left = 0
var right = size - 1
while (left < right) {
val mid = (left + right) ushr 1
// 中位数严格小于目标数,那么一定不是开始位置
if (this[mid] < target) {
left = mid + 1
} else {
right = mid
}
}
return if (this[left] == target) left else -1
}

private fun IntArray.lastOf(target: Int): Int {
var left = 0
var right = size - 1
while (left < right) {
val mid = (left + right + 1) ushr 1
// 中位数严格大于目标数,那么一定不是结束位置
if (this[mid] > target) {
right = mid - 1
} else {
left = mid
}
}
return left
}




}
`}`

3.3 搜索旋转排序数组

33. Search in Rotated Sorted Array (Medium) 【题解】
81. Search in Rotated Sorted Array II (Medium) 【题解】

相对于【题 704 二分查找】,这类题最大的不同是数组是 「旋转排序数组」,有序数组经过旋转后就不再是有序的,我们不能够直接对数组进行二分查找。但如果把数组看为左右两个有序数组拼接起来的,这两部分内部依旧是有序的,可以使用二分查找。所以我们的解题思路是:判断当前中位数是的位置:

  • 位于左半部分:左边的元素严格有序,可以采用尝试抛弃左区间的写法
  • 位于右半部分:右边的元素严格有序,可以采用尝试抛弃右区间的写法

两种写法的模板我们已经很熟悉了,但是需要注意到两种写法需要用到同一个中位数。而在前面的介绍中我们知道:抛弃左区间的写法使用前中位数,抛弃右区间的写法使用后中位数,该如何处理呢?其实我们可以观察到,在左半部分,区间 [left,mid] 是严格升序的,等于有 [left,mid - 1]也是升序的,所以可以直接使用 mid - 1 对应的前中位数。

更进一步,当数组中存在重复数字,如果中位数和左端点的数字相同,那么我们无法确定目标数是在左区间完全相同,还是右区间完全相同。此时,我们将 left++ 或者 right–,相当于去掉一个重复的干扰项。

图片引用自 LeetCode

【33. 题解】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
kotlin复制代码class Solution {
fun search(nums: IntArray, target: Int): Int {
if (nums.isEmpty()) {
return -1
}
var left = 0
var right = nums.size - 1
while (left < right) {
val mid = (left + right + 1) ushr 1
if (nums[0] < nums[mid]) {
// 区间 [left,mid] 严格升序,尝试抛弃左区间
// 有 [left,mid - 1] 也是升序的,所以可以直接使用 mid - 1 对应的前中位数
val mid2 = mid - 1
if (nums[mid2] < target || nums[0] > target) {
left = mid2 + 1 // 下次搜索[mid2,right]
} else {
right = mid2 // 下次搜索[left,mid2]
}
} else {
// 区间 [mid,right] 严格升序,尝试抛弃右区间
if (nums[mid] > target || nums[nums.size - 1] < target) { // nums[0] < target 在 [3,1] 1 出错
right = mid - 1 // 下次搜索[left,mid-1]
} else {
left = mid // 下次搜索[mid,right]
}
}
}
return if (nums[left] == target) left else -1
}
}

【81. 题解】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
kotlin复制代码class Solution {
fun search(nums: IntArray, target: Int): Int {
if (nums.isEmpty()) {
return -1
}
var left = 0
var right = nums.size - 1
while (left < right) {
val mid = (left + right + 1) ushr 1
// 因为数组存在重复数字,如果中点和左端的数字相同,我们并不能确定是左区间全部相同,还是右区间完全相同。
// 此时,我们将 left++ ,相当于去掉一个重复的干扰项
if(nums[left] == nums[mid]){
left ++
continue
}
if (nums[0] < nums[mid]) {
// 区间 [left,mid] 严格升序,尝试抛弃左区间
// 有 [left,mid - 1] 也是升序的,所以可以直接使用 mid - 1 对应的前中位数
val mid2 = mid - 1
if (nums[mid2] < target || nums[0] > target) {
left = mid2 + 1 // 下次搜索[mid2,right]
} else {
right = mid2 // 下次搜索[left,mid2]
}
} else {
// 区间 [mid,right] 严格升序,尝试抛弃右区间
if (nums[mid] > target || nums[nums.size - 1] < target) { // nums[0] < target 在 [3,1] 1 出错
right = mid - 1 // 下次搜索[left,mid-1]
} else {
left = mid // 下次搜索[mid,right]
}
}
}
return if (nums[left] == target) left else -1
}
}
153. Find Minimum in Rotated Sorted Array (Medium) 【题解】
154. Find Minimum in Rotated Sorted Array II (Hard) 【题解】

这道题不是寻找目标值,而是寻找旋转排序数组中的**「最小值」**。用排除法,我们要做的就是排除一定不存在最小值的区间:

  • 位于左半部分:元素都比区间最后一个元素大,抛弃左区间
  • 位于右半部分:中位数比区间最后一个元素大,抛弃右区间

更进一步,当数组中存在重复数组,如果中位数和左端点的数字相同,那么我们无法确定是分界点是在左区间还是右区间。与上一个问题类似,我们依旧可以将 left++ 或者 right–,从而减少一个重复的干扰项。

【153. 题解】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
kotlin复制代码class Solution {
fun findMin(nums: IntArray): Int {
if (nums.isEmpty()) {
return -1
}
if (nums.size == 1) {
return nums[0]
}
// 寻找比左边小的点
var left = 0
var right = nums.size - 1
while (left < right) {
val mid = (left + right) ushr 1
if (nums[mid] > nums[right]) { // nums[mid] > nums[nums.size - 1] 在 [3,1,3] 错
left = mid + 1
} else {
right = mid
}
}
return nums[left]
}
}

【154. 题解】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
kotlin复制代码class Solution {
fun findMin(nums: IntArray): Int {
if (nums.isEmpty()) {
return -1
}
if (nums.size == 1) {
return nums[0]
}
// 寻找比左边小的点
var left = 0
var right = nums.size - 1
while (left < right) {
val mid = (left + right) ushr 1
// 排除一个元素
if (nums[right] == nums[mid]) {
right--
continue
}

if (nums[mid] > nums[right]) { // nums[mid] > nums[nums.size - 1] 在 [3,1,3] 错
left = mid + 1
} else {
right = mid
}
}
return nums[left]
}




}
`}`

3.4 搜索山脉数组

1095. Find in Mountain Array (Hard) 【题解】

这道题与搜索旋转排序数组类似的地方在于:原数组整体是无序的,但组成数组的两部分内部是有序的。不同之处在于:搜索旋转排序数组可以通过比较中位数的值与的关系来判断当前所处的区间,但是山脉数组不再成立。

所以我们需要先找到山脉数组的峰值,再分别在两个区间内搜索目标值。

【1095. 题解】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
kotlin复制代码class Solution {
fun findInMountainArray(target: Int, mountainArr: MountainArray): Int {
if (0 == mountainArr.length()) {
return -1
}
// 先找到山脉数组的峰值,再分别在两个区间内搜索目标值
val topIndex = findTopIndex(mountainArr, target)
if (mountainArr.get(topIndex) == target) {
return topIndex
}
val targetIndex = findInAsc(mountainArr, target, 0, topIndex)
if (-1 != targetIndex) {
return targetIndex
}
return findInDesc(mountainArr, target, topIndex, mountainArr.length() - 1)
}

private fun findInAsc(mountainArr: MountainArray, target: Int, start: Int, end: Int): Int {
// 中位数严格小于目标数,那么左区间一定不是解
var left = 0
var right = end
while (left < right) {
val mid = (left + right) ushr 1
if (mountainArr.get(mid) < target) {
left = mid + 1
} else {
right = mid
}
}
return if (mountainArr.get(left) == target) left else -1
}

private fun findInDesc(mountainArr: MountainArray, target: Int, start: Int, end: Int): Int {
// 中位数严格小于目标数,那么右区间一定不是解
var left = 0
var right = end
while (left < right) {
val mid = (left + right + 1) ushr 1
if (mountainArr.get(mid) < target) {
right = mid - 1
} else {
left = mid
}
}
return if (mountainArr.get(left) == target) left else -1
}

private fun findTopIndex(mountainArr: MountainArray, target: Int): Int {
// 如果中位数严格小于前驱,那么右区间一定不是解
var left = 0
var right = mountainArr.length()
while (left < right) {
val mid = (left + right + 1) ushr 1
if (mountainArr.get(mid) < mountainArr.get(mid - 1)) {
right = mid - 1
} else {
left = mid
}
}
return left
}




}
`}`

3.5 最大值极小化

410. Split Array Largest Sum 【题解】
875. Koko Eating Bananas 【题解】
1300. Sum of Mutated Array Closest to Target 【题解】
1482. Minimum Number of Days to Make m Bouquets 【题解】
1552. Magnetic Force Between Two Balls
LCP 12. 小张刷题计划 【题解】

这类问题也许是二分查找问题中最难的,相对于【题 704 二分查找】,这类题最大的不同是 「没有明显的排序数组」「没有明显的目标数的条件」,灵活性非常大,对于没有经验的同学会很难联想到二分查找算法。

对于这类题目,最重要的点是挖掘题目中隐含的 「单调性」。在前面的题目中大多用到「排序数组」,其实排序数组就隐含了一种单调性,即随着下标 x 的增大,数组值单调递增或者单调递减。

单调性

单调性(monotonicity)也可以叫做增减性,可以定性地描述两个变量之间的关系。当变量xxx在其定义区间内增大时,函数y=f(x)y= f(x)y=f(x)随着增大(或减小),则称函数 y 在该区间单调递增(或单调递减)。

【题 410. 木棍切割问题】 是最大值最小化题目的经典问题:

1
2
3
复制代码给定一个非负整数数组 nums 和一个整数 m ,你需要将这个数组分成 m 个非空的连续子数组。

设计一个算法使得这 m 个子数组各自和的最大值最小。

当你看到 “最大值最小化” 类似的字样时,你应该想一想是不是可以用二分查找解决。那么,怎么挖掘题目中的两个变量之间的单调性呢?

这里有一个小套路:

  • 1、分析题目中存在那几个变量;
  • 2、找出题目中给出的某个固定值(例如此题的整数 m),这个固定值其实可以看作一个限制条件;
  • 3、挖掘变量间的单调性:当变量 1 在区间内递增时,变量 2 单调递增或递减;
  • 4、在变量 1 的区间内执行二分搜索,找出满足限制条件的最小值。

例如此题,经过分析可以看出题目中存在以下几个变量:

  • 分割数 count
  • 最大子数组和 maxSum

这两个变量的单调性是:当「最大子数组和」减小时,那么我们就需要多分割几次,「分割数」单调递增;而当「最大子数组和」增大时,那么我们就不需要划分那么多次了,「分割数」单调递减。

至于二分查找的步骤,我们要做的是在「最大子数组和」的取值区间内执行二分查找,找到分割数等于 m 的「目标数」。接下来就是编码啦,按照解题框架编写即可。

【410. 题解】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
kotlin复制代码class Solution {
fun splitArray(nums: IntArray, m: Int): Int {
// 最大数组和决定分隔数组个数,并符合单调性,使用二分法找出最小数组和和最大数组和的区间内,满足数组个数 m 的最小值
// 最大数组和:整个数组和
// 最小数组和:数组中最大的数
var maxSum = 0
var minSum = Integer.MIN_VALUE
for (num in nums) {
maxSum += num
minSum = Math.max(minSum, num)
}
// 中位数对应的分隔数组大于严格小于 m,那么左区间一定不是解(需要调大最大数组和)
var left = minSum
var right = maxSum
while (left < right) {
val mid = (left + right) ushr 1
if (nums.split(mid) > m) {
left = mid + 1
} else {
right = mid
}
}
return left
}

private fun IntArray.split(maxSum: Int): Int {
var count = 1 // 最少分割为 1 个数组
var curSum = 0
for (num in this) {
if (curSum + num > maxSum) {
count++
curSum = 0
}
curSum += num
}
return count
}




}
`}`

【875. 题解】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
kotlin复制代码class Solution {
fun minEatingSpeed(piles: IntArray, H: Int): Int {
if (piles.size > H) {
return H
}
// 速度决定时长,并符合单调性,使用二分法找出最小速度和最大速度区间内,满足时长 H 的最小值
// 最大速度:香蕉最多的堆
// 最小速度:1
val minSpeed = 1
var maxSpeed = piles.let {
var max = 1
for (pile in it) {
max = Math.max(max, pile)
}
max
}
var left = minSpeed
var right = maxSpeed
while (left < right) {
// 中位数时长大于 H,那么左区间一定不是解
val mid = (left + right) ushr 1
val hour = piles.getHour(mid)
if (hour > H) {
left = mid + 1
} else {
right = mid
}
}
return left
}

/**
* 获取当前速度需要的时间
*/
private fun IntArray.getHour(speed: Int): Int {
var hour = 0
for (pile in this) {
// hour += (pile - 1) / speed + 1 // 向上取整
hour += (pile + speed - 1) / speed // 向上取整
}
return hour
}




}
`}`

【1300. 题解】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
kotlin复制代码class Solution {
fun findBestValue(arr: IntArray, target: Int): Int {
// value 决定数组和,并符合单调性,使用二分法找出 value 和 value 区间内,最接近 target 的最小值
// 最大 value:数组的最大值
// 最小 value:0
val minValue = 0
val maxValue = arr.let {
var max = Integer.MIN_VALUE
for (num in arr) {
max = Math.max(max, num)
}
max
}
// “最接近”的判断条件不好写,因为选定了一个 value 求和以后,value 变小,接近程度可能变大也可能变小,所以需要寻找别的方案。
// 如果选择一个 value ,对应的数组和是第一个大于等于 target 的,那么目标值可能在 value 也可能在 value - 1(也可以找最后一个小于等于target的)

// 如果中位数对应的数组和严格小于 target,那么左区间一定不是解
var left = minValue
var right = maxValue
while (left < right) {
val mid = (left + right) ushr 1
if (arr.sum(mid, target) < target) {
left = mid + 1
} else {
right = mid
}
}
val diff1 = Math.abs(arr.sum(left - 1, target) - target)
val diff2 = Math.abs(arr.sum(left, target) - target)
return if (diff1 <= diff2) left - 1 else left
}

private fun IntArray.sum(value: Int, target: Int): Int {
var sum = 0
for (num in this) {
sum += Math.min(value, num)
}
return sum
}




}
`}`

【1482. 题解】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
kotlin复制代码class Solution {
fun minDays(bloomDay: IntArray, m: Int, k: Int): Int {
// 天数决定花束,并符合单调性,使用二分法找出最小天数和最大天数区间内,满足花束 m 的最小值
// 最大天数:最后盛开的花
// 最小天数:1
val minDay = 1
val maxDay = bloomDay.let {
var max = 0
for (bloom in bloomDay) {
max = Math.max(max, bloom)
}
max
}

// 如果中位数可以采集的花束严格小于 m,那么左区间一定不是解
var left = minDay
var right = maxDay
while (left < right) {
val mid = (left + right) ushr 1
if (bloomDay.collect(mid, k) < m) {
left = mid + 1
} else {
right = mid
}
}
return if (bloomDay.collect(left, k) >= m) left else -1
}

private fun IntArray.collect(day: Int, k: Int): Int {
var bouquets = 0
var collected = 0
var index = 0

while (index < size) {
if (this[index] <= day) {
if (++collected == k) {
// 完成一束
bouquets++
collected = 0
}
} else {
collected = 0
}
index++
}
return bouquets
}




}
`}`

【LCP12. 题解】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
kotlin复制代码class Solution {
fun minTime(time: IntArray, m: Int): Int {
// 一天做题时间决定完成天数,并符合单调性,使用二分法找出最小做题时间和最大做题时间区间内,满足 m 天的最小值
// 最大做题时间:所有题目的和
// 最小做题时间:每天都将题目交给小杨,0(如果不能求助,就是最难题目的完成题目时间)

val minTime = 0
val maxTime = time.let {
var sum = 0
for (theTime in time) {
sum += theTime
}
sum
}

fun IntArray.puzzle(dayTime: Int): Int {
var count = 1 // 最少需要花费一天
var today = 0 // 大于 dayTime 隔一天
var todayDifficult = -1 // 找出当天内最难的题目交给小杨
var isTodayHelped = false // 当前是否已经求助过
var index = 0
while (index < time.size) {
// 最难一题
todayDifficult = if (-1 == todayDifficult || time[index] >= time[todayDifficult]) index else todayDifficult
if (today + time[index] > dayTime) {
if (isTodayHelped) {
count++
isTodayHelped = false
todayDifficult = -1
today = 0
} else {
isTodayHelped = true
today -= time[todayDifficult]
}
} else {
today += time[index++]
}
}
return count
}

// 如果中位数对应的完成天数严格大于 m,那么只有增大做题时间,左区间一定不是解
var left = minTime
var right = maxTime
while (left < right) {
val mid = (left + right) ushr 1
if (time.puzzle(mid) > m) {
left = mid + 1
} else {
right = mid
}
}
return if (time.puzzle(left) <= m) left else 0
}




}
`}`

  1. 总结

如果你也遇到二分查找算法一直写不对的问题,希望今天介绍的解题框架能对你有所启发。需要注意,一定要配合练习才能融会贯通,不要背解题框架。文章 第 3 节 整理了大量典型例题,应该能覆盖常见的题型,多去看看。最后,欢迎各位大佬们点赞、留言、转发!


参考资料

  • 《二分查找词条》 —— 维基百科
  • 《二分查找详解》 —— labuladong 著
  • 《数据结构与算法之美》(第15、16课) —— 王争 jiang,极客时间 出品
  • 《用「排除法」(减治思想)写二分查找问题、与其它二分查找模板的比较》 —— liweiwei1419 著
  • 《编程之法·面试和算法心得》(第4章) —— July 著
  • 《编程珠玑》(第2、4、9章) —— [美] Jon Bentley 著

推荐阅读

数据结构与算法系列完整目录如下(2023/07/11 更新):

  • #1 链表问题总结
  • #2 链表相交 & 成环问题总结
  • #3 计算器与逆波兰表达式总结
  • #4 高楼丢鸡蛋问题总结
  • #5 为什么你学不会递归?谈谈我的经验
  • #6 回溯算法解题框架总结
  • #7 下次面试遇到二分查找,别再写错了
  • #8 什么是二叉树?
  • #9 什么是二叉堆 & Top K 问题
  • #10 使用前缀和数组解决 “区间和查询” 问题
  • #11 面试遇到线段树,已经这么卷了吗?
  • #12 使用单调队列解决 “滑动窗口最大值” 问题
  • #13 使用单调栈解决 “下一个更大元素” 问题
  • #14 使用并查集解决 “朋友圈” 问题
  • #15 如何实现一个优秀的 HashTable 散列表
  • #16 简答一波 HashMap 常见面试题
  • #17 二叉树高频题型汇总
  • #18 下跳棋,极富想象力的同向双指针模拟

Java & Android 集合框架系列文章: 跳转阅读

LeetCode 上分之旅系列文章:跳转阅读

⭐️ 永远相信美好的事情即将发生,欢迎加入小彭的 Android 交流社群~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

使用Dom4j操作XML

发表于 2021-02-26

自建博客地址:bytelife.net,欢迎访问! 本文为博客自动同步文章,为了更好的阅读体验,建议您移步至我的博客👇

本文作者: Jeffrey
本文链接: bytelife.net/articles/47…
版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!


引言:XML(可扩展标记语言)在软件开发工程中取得了广泛的应用。在Java语言中操作XML有许多方法,最常用的方法就是使用JDom、Dom4j等第三方组件。本文将简单介绍使用Dom4j操作XML的基本方法。

本文采用的Dom4j版本为1.6.1,下载地址见文章结尾。 废话不多说,先来看一下本文使用的XML文件内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<class id="1">
<student>
<num>0001</num>
<name>张三</name>
<age>19</age>
</student>

<student>
<num>0002</num>
<name>李四</name>
<age>21</age>
<hobby>
<name>足球</name>
<name>篮球</name>
</hobby>
</student>

<teacher>
<name>王老师</name>
<age>40</age>
<course>Java</course>
</teacher>
</class>

在这个XML文件中,可以看到根节点为class,它有student和teacher子节点,而student子节点中又包含num、name、age、hobby等孙子辈节点,teacher子节点中包含name、age、course等孙子辈节点。 下面就使用dom4j来操作这个xml文件。

解析XML

当使用dom4j操作xml时,你想做的第一件事可能就是解析一个Xml文档,这个操作在dom4j中十分容易,使用下面的代码即可以轻松的解析xml文件并返回一个Document对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码package cn.javacodes.dom4j;

import java.io.File;

import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;

public class TestDom4j {

public static void main(String[] args) throws Exception {
// 获取SAX阅读器
SAXReader reader = new SAXReader();
// 获取Document对象
Document doc = reader.read(new File("d:/DemoXML.xml"));
// 获取根节点
Element root = doc.getRootElement();
// 输出测试
System.out.println("根节点:" + root.getName() + ",id="
\+ root.attributeValue("id"));
}

}

输出结果:

根节点:class,id=1

使用迭代器Iterator

一个Element对象可以通过几个方法来返回一个标准的Java迭代器: (1)迭代所有子元素

1
2
3
4
5
java复制代码// 迭代root元素的所有子元素
for (Iterator i = root.elementIterator(); i.hasNext(); ) {
Element element = (Element) i.next();
System.out.println(element.getName());
}

输出结果:

student student teacher

(2)通过元素名称迭代

1
2
3
4
5
java复制代码// 通过元素名称“student”迭代子元素
for ( Iterator i = root.elementIterator( "student" ); i.hasNext(); ) {
Element foo = (Element) i.next();
System.out.println(foo.getName());
}

输出结果:

student student

(3)迭代所有属性

1
2
3
4
5
java复制代码// 迭代root元素的所有属性
for ( Iterator i = root.attributeIterator(); i.hasNext(); ) {
Attribute attribute = (Attribute) i.next();
System.out.println(attribute.getName() + ":" + attribute.getValue());
}

输出结果:

id:1

获取元素值

通常我们都需要获取xml元素标签内部的文本,也就是元素值,下面一个简单的例子递归显示所有的元素值:

1
2
3
4
5
6
7
8
9
10
java复制代码public  static void showAllElementText(Element e){
for (Iterator i = e.elementIterator(); i.hasNext(); ) {
Element element = (Element) i.next();
if (!element.elements().isEmpty()) {
showAllElementText(element);
} else {
System.out.println(element.getName()+"="+element.getTextTrim());
}
}
}

输出结果

num=0001 name=张三 age=19 num=0002 name=李四 age=21 name=足球 name=篮球 name=王老师 age=40 course=Java

使用XPath表达式

在Dom4j中使用XPath表达式可以更加轻松的操作XML文档,使用XPath表达式可以使用仅一行代码来进行复杂的操作,在Dom4j中使用XPath的几个简单示例代码如下: (1)查询单个节点(默认查找第一个):

1
2
3
4
5
6
7
8
java复制代码// 获取SAX阅读器
SAXReader reader = new SAXReader();
// 获取Document对象
Document doc = reader.read(new File("d:/DemoXML.xml"));
// 获取student元素的name节点
Node node = doc.selectSingleNode("//student/name");
// 输出测试
System.out.println(node.getName() + "=" + node.getText());

(2)查询多个节点

1
2
3
4
5
6
7
java复制代码// 获取所有student元素的name节点
List<Node> list = doc.selectNodes("//student");
// 输出测试
for (Node node : list) {
    System.out.println(node.getName()
\+ ":" \+ node.valueOf("name"));
}

以上为两种经常使用的方法,另外如果你想在一个XHTML文档中查找到所有的超文本链接,可以使用下面这个窍门轻松实现:

1
2
3
4
5
6
7
java复制代码    public void findLinks(Document document) throws DocumentException {
List list = document.selectNodes( "//a/@href" );
for (Iterator iter = list.iterator(); iter.hasNext(); ) {
Attribute attribute = (Attribute) iter.next();
String url = attribute.getValue();
}
}

如果你需要任何有关学习XPah表达式语言的帮助,你可以访问Zvon tutorial进行学习,这里可以通过各种各样的例子帮助你学习。

快速循环

如果你需要操作一个十分庞大的XML文档,那么你应该使用快速循环的方法以避免在每次循环都创建Iterator对象,下面是一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码    public void treeWalk(Document document) {
treeWalk( document.getRootElement() );
}

public void treeWalk(Element element) {
for ( int i = 0, size = element.nodeCount(); i < size; i++ ) {
Node node = element.node(i);
if ( node instanceof Element ) {
treeWalk( (Element) node );
}
else {
// 这里写你想要做的操作
}
}
}

创建XML Document对象

在使用Dom4j时经常需要创建一个新的document,下面是一个简单的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;

public class Foo {

public Document createDocument() {
Document document = DocumentHelper.createDocument();
Element root = document.addElement( "root" );

Element author1 = root.addElement( "author" )
.addAttribute( "name", "James" )
.addAttribute( "location", "UK" )
.addText( "James Strachan" );

Element author2 = root.addElement( "author" )
.addAttribute( "name", "Bob" )
.addAttribute( "location", "US" )
.addText( "Bob McWhirter" );

return document;
}
}

写入XML文件

使用Dom4j将Document对象写入到XML文件十分简单,你只需要1行代码即可解决:

1
java复制代码document.write( new FileWriter( "foo.xml" ));

如果你想修改输出的格式,例如更易读的排版或者压缩(紧凑)的排版,再或者你想通过Writer或OutputStream进行输出,那么你可以使用XMLWriter类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码import org.dom4j.Document;
import org.dom4j.io.OutputFormat;
import org.dom4j.io.XMLWriter;

public class Foo {

public void write(Document document) throws IOException {

// 写入到一个文件
XMLWriter writer = new XMLWriter(
new FileWriter( "output.xml" )
);
writer.write( document );
writer.close();


// 更加美观的排版
OutputFormat format = OutputFormat.createPrettyPrint();
writer = new XMLWriter( System.out, format );
writer.write( document );

// 更加紧凑的排版
format = OutputFormat.createCompactFormat();
writer = new XMLWriter( System.out, format );
writer.write( document );
}
}

Document对象与XML代码互转

如果你想通过一个Document对象或其他任何节点对象(例如Attribute或Element),你可以通过asXML()方法将它转换为XML文本字符串,例如:

1
2
java复制代码        Document document = ...;
String text = document.asXML();

如果你想从一个XML文本字符串转为一个Document对象,你可以使用DocumentHelper.parseText()方法进行解析:

1
2
java复制代码        String text = "<person> <name>James</name> </person>";
Document document = DocumentHelper.parseText(text);

XSLT

通过Sum公司提供的JAXP API在一个Document上应用XSLT十分简单。这里有一个使用JAXP创建一个transformer并应用到Document上的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;

import org.dom4j.Document;
import org.dom4j.io.DocumentResult;
import org.dom4j.io.DocumentSource;

public class Foo {

public Document styleDocument(
Document document,
String stylesheet
) throws Exception {

// 使用JAXP加载transformer
TransformerFactory factory = TransformerFactory.newInstance();
Transformer transformer = factory.newTransformer(
new StreamSource( stylesheet )
);

// 样式化document
DocumentSource source = new DocumentSource( document );
DocumentResult result = new DocumentResult();
transformer.transform( source, result );

// 返回转换后的document
Document transformedDoc = result.getDocument();
return transformedDoc;
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

JVM相关 - 深入理解 Systemgc()

发表于 2021-02-25

本文基于 Java 17-ea,但是相关设计在 Java 11 之后是大致一样的

我们经常在面试中询问 System.gc() 究竟会不会立刻触发 Full GC,网上也有很多人给出了答案,但是这些答案都有些过时了。本文基于最新的 Java 的下一个即将发布的 LTS 版本 Java 17(ea)的源代码,深入解析 System.gc() 背后的故事。

为什么需要System.gc()

1. 使用并管理堆外内存的框架,需要 Full GC 的机制触发堆外内存回收

JVM 的内存,不止堆内存,还有其他很多块,通过 Native Memory Tracking 可以看到:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
ini复制代码Native Memory Tracking:

Total: reserved=6308603KB, committed=4822083KB
- Java Heap (reserved=4194304KB, committed=4194304KB)
(mmap: reserved=4194304KB, committed=4194304KB)

- Class (reserved=1161041KB, committed=126673KB)
(classes #21662)
( instance classes #20542, array classes #1120)
(malloc=3921KB #64030)
(mmap: reserved=1157120KB, committed=122752KB)
( Metadata: )
( reserved=108544KB, committed=107520KB)
( used=105411KB)
( free=2109KB)
( waste=0KB =0.00%)
( Class space:)
( reserved=1048576KB, committed=15232KB)
( used=13918KB)
( free=1314KB)
( waste=0KB =0.00%)

- Thread (reserved=355251KB, committed=86023KB)
(thread #673)
(stack: reserved=353372KB, committed=84144KB)
(malloc=1090KB #4039)
(arena=789KB #1344)

- Code (reserved=252395KB, committed=69471KB)
(malloc=4707KB #17917)
(mmap: reserved=247688KB, committed=64764KB)

- GC (reserved=199635KB, committed=199635KB)
(malloc=11079KB #29639)
(mmap: reserved=188556KB, committed=188556KB)

- Compiler (reserved=2605KB, committed=2605KB)
(malloc=2474KB #2357)
(arena=131KB #5)

- Internal (reserved=3643KB, committed=3643KB)
(malloc=3611KB #8683)
(mmap: reserved=32KB, committed=32KB)

- Other (reserved=67891KB, committed=67891KB)
(malloc=67891KB #2859)

- Symbol (reserved=26220KB, committed=26220KB)
(malloc=22664KB #292684)
(arena=3556KB #1)

- Native Memory Tracking (reserved=7616KB, committed=7616KB)
(malloc=585KB #8238)
(tracking overhead=7031KB)

- Arena Chunk (reserved=10911KB, committed=10911KB)
(malloc=10911KB)

- Tracing (reserved=25937KB, committed=25937KB)
(malloc=25937KB #8666)

- Logging (reserved=5KB, committed=5KB)
(malloc=5KB #196)

- Arguments (reserved=18KB, committed=18KB)
(malloc=18KB #486)

- Module (reserved=532KB, committed=532KB)
(malloc=532KB #3579)

- Synchronizer (reserved=591KB, committed=591KB)
(malloc=591KB #4777)

- Safepoint (reserved=8KB, committed=8KB)
(mmap: reserved=8KB, committed=8KB)
  • Java Heap: 堆内存,即-Xmx限制的最大堆大小的内存。
  • Class:加载的类与方法信息,其实就是 metaspace,包含两部分: 一是 metadata,被-XX:MaxMetaspaceSize限制最大大小,另外是 class space,被-XX:CompressedClassSpaceSize限制最大大小
  • Thread:线程与线程栈占用内存,每个线程栈占用大小受-Xss限制,但是总大小没有限制。
  • Code:JIT 即时编译后(C1 C2 编译器优化)的代码占用内存,受-XX:ReservedCodeCacheSize限制
  • GC:垃圾回收占用内存,例如垃圾回收需要的 CardTable,标记数,区域划分记录,还有标记 GC Root 等等,都需要内存。这个不受限制,一般不会很大的。
  • Compiler:C1 C2 编译器本身的代码和标记占用的内存,这个不受限制,一般不会很大的
  • Internal:命令行解析,JVMTI 使用的内存,这个不受限制,一般不会很大的
  • Symbol: 常量池占用的大小,字符串常量池受-XX:StringTableSize个数限制,总内存大小不受限制
  • Native Memory Tracking:内存采集本身占用的内存大小,如果没有打开采集(那就看不到这个了,哈哈),就不会占用,这个不受限制,一般不会很大的
  • Arena Chunk:所有通过 arena 方式分配的内存,这个不受限制,一般不会很大的
  • Tracing:所有采集占用的内存,如果开启了 JFR 则主要是 JFR 占用的内存。这个不受限制,一般不会很大的
  • Logging,Arguments,Module,Synchronizer,Safepoint,Other,这些一般我们不会关心。

除了 Native Memory Tracking 记录的内存使用,还有两种内存 Native Memory Tracking 没有记录,那就是:

  • Direct Buffer:直接内存
  • MMap Buffer:文件映射内存

针对除了堆内存以外,其他的内存,有些也是需要 GC 的。例如:MetaSpace,CodeCache,Direct Buffer,MMap Buffer 等等。早期在 Java 8 之前的 JVM,对于这些内存回收的机制并不完善,很多情况下都需要 FullGC 扫描整个堆才能确定这些区域中哪些内存可以回收。

有一些框架,大量使用并管理了这些堆外空间。例如 netty 使用了 Direct Buffer,Kafka 和 RocketMQ 使用了 Direct Buffer 和 MMap Buffer。他们都是提前从系统申请好一块内存,之后管理起来并使用。在空间不足时,继续向系统申请,并且也会有缩容。例如 netty,在使用的 Direct Buffer 达到-XX:MaxDirectMemorySize的限制之后,则会先尝试将不可达的Reference对象加入Reference链表中,依赖Reference的内部守护线程触发可以被回收DirectByteBuffer关联的Cleaner的run()方法。如果内存还是不足, 则执行System.gc(),期望触发full gc,来回收堆内存中的DirectByteBuffer对象来触发堆外内存回收,如果还是超过限制,则抛出java.lang.OutOfMemoryError.

2. 使用了 WeakReference, SoftReference 的程序,需要相应的 GC 回收。

对于 WeakReference,只要发生 GC,无论是 Young GC 还是 FullGC 就会被回收。SoftReference 只有在 FullGC 的时候才会被回收。当我们程序想主动对于这些引用进行回收的时候,需要能触发 GC 的方法,这就用到了System.gc()。

3. 测试,学习 JVM 机制的时候

有些时候,我们为了测试,学习 JVM 的某些机制,需要让 JVM 做一次 GC 之后开始,这也会用到System.gc()。但是其实有更好的方法,后面你会看到。

System.gc() 背后的原理

System.gc()实际上调用的是RunTime.getRunTime().gc():

1
2
3
csharp复制代码public static void gc() {
Runtime.getRuntime().gc();
}

这个方法是一个 native 方法:

1
csharp复制代码public native void gc();

对应 JVM 源码:

1
2
3
4
5
6
7
scss复制代码JVM_ENTRY_NO_ENV(void, JVM_GC(void))
JVMWrapper("JVM_GC");
//如果没有将JVM启动参数 DisableExplicitGC 设置为 false,则执行 GC,GC 原因是 System.gc 触发,对应 GCCause::_java_lang_system_gc
if (!DisableExplicitGC) {
Universe::heap()->collect(GCCause::_java_lang_system_gc);
}
JVM_END

首先,根据 DisableExplicitGC 这个 JVM 启动参数的状态,确定是否会 GC,如果需要 GC,不同 GC 会有不同的处理。

1. G1 GC 的处理

如果是 System.gc() 触发的 GC,G1 GC 会根据 ExplicitGCInvokesConcurrent 这个 JVM 参数决定是默认 GC (轻量 GC,YoungGC)还是 FullGC。

参考代码g1CollectedHeap.cpp:

1
2
3
4
5
6
7
8
9
10
11
12
scss复制代码//是否应该并行 GC,也就是较为轻量的 GC,对于 GCCause::_java_lang_system_gc,这里就是判断 ExplicitGCInvokesConcurrent 这个 JVM 是否为 true
if (should_do_concurrent_full_gc(cause)) {
return try_collect_concurrently(cause,
gc_count_before,
old_marking_started_before);
}// 省略其他这里我们不关心的判断分支
else {
//否则进入 full GC
VM_G1CollectFull op(gc_count_before, full_gc_count_before, cause);
VMThread::execute(&op);
return op.gc_succeeded();
}

2. ZGC 的处理

直接不处理,不支持通过 System.gc() 触发 GC。

参考源码:zDriver.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
php复制代码void ZDriver::collect(GCCause::Cause cause) {
switch (cause) {
//注意这里的 _wb 开头的 GC 原因,这代表是 WhiteBox 触发的,后面我们会用到,这里先记一下
case GCCause::_wb_young_gc:
case GCCause::_wb_conc_mark:
case GCCause::_wb_full_gc:
case GCCause::_dcmd_gc_run:
case GCCause::_java_lang_system_gc:
case GCCause::_full_gc_alot:
case GCCause::_scavenge_alot:
case GCCause::_jvmti_force_gc:
case GCCause::_metadata_GC_clear_soft_refs:
// Start synchronous GC
_gc_cycle_port.send_sync(cause);
break;

case GCCause::_z_timer:
case GCCause::_z_warmup:
case GCCause::_z_allocation_rate:
case GCCause::_z_allocation_stall:
case GCCause::_z_proactive:
case GCCause::_z_high_usage:
case GCCause::_metadata_GC_threshold:
// Start asynchronous GC
_gc_cycle_port.send_async(cause);
break;

case GCCause::_gc_locker:
// Restart VM operation previously blocked by the GC locker
_gc_locker_port.signal();
break;

case GCCause::_wb_breakpoint:
ZBreakpoint::start_gc();
_gc_cycle_port.send_async(cause);
break;

//对于其他原因,不触发GC,GCCause::_java_lang_system_gc 会走到这里
default:
// Other causes not supported
fatal("Unsupported GC cause (%s)", GCCause::to_string(cause));
break;
}
}

3. Shenandoah GC 的处理

Shenandoah 的处理和 G1 GC 的类似,先判断是不是用户明确触发的 GC,然后通过 DisableExplicitGC 这个 JVM 参数判断是否可以 GC(其实这个是多余的,可以去掉,因为外层JVM_ENTRY_NO_ENV(void, JVM_GC(void))已经处理这个状态位了)。如果可以,则请求 GC,阻塞等待 GC 请求被处理。然后根据 ExplicitGCInvokesConcurrent 这个 JVM 参数决定是默认 GC (轻量并行 GC,YoungGC)还是 FullGC。

参考源码shenandoahControlThread.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ruby复制代码void ShenandoahControlThread::request_gc(GCCause::Cause cause) {
assert(GCCause::is_user_requested_gc(cause) ||
GCCause::is_serviceability_requested_gc(cause) ||
cause == GCCause::_metadata_GC_clear_soft_refs ||
cause == GCCause::_full_gc_alot ||
cause == GCCause::_wb_full_gc ||
cause == GCCause::_scavenge_alot,
"only requested GCs here");
//如果是显式GC(即如果是GCCause::_java_lang_system_gc,GCCause::_dcmd_gc_run,GCCause::_jvmti_force_gc,GCCause::_heap_inspection,GCCause::_heap_dump中的任何一个)
if (is_explicit_gc(cause)) {
//如果没有关闭显式GC,也就是 DisableExplicitGC 为 false
if (!DisableExplicitGC) {
//请求 GC
handle_requested_gc(cause);
}
} else {
handle_requested_gc(cause);
}
}

请求 GC 的代码流程是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
scss复制代码void ShenandoahControlThread::handle_requested_gc(GCCause::Cause cause) {
MonitorLocker ml(&_gc_waiters_lock);
//获取当前全局 GC id
size_t current_gc_id = get_gc_id();
//因为要进行 GC ,所以将id + 1
size_t required_gc_id = current_gc_id + 1;
//直到当前全局 GC id + 1 为止,代表 GC 执行了
while (current_gc_id < required_gc_id) {
//设置 gc 状态位,会有其他线程扫描执行 gc
_gc_requested.set();
//记录 gc 原因,根据不同原因有不同的处理策略,我们这里是 GCCause::_java_lang_system_gc
_requested_gc_cause = cause;
//等待 gc 锁对象 notify,代表 gc 被执行并完成
ml.wait();
current_gc_id = get_gc_id();
}
}

对于GCCause::_java_lang_system_gc,GC 的执行流程大概是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
scss复制代码bool explicit_gc_requested = _gc_requested.is_set() &&  is_explicit_gc(_requested_gc_cause);

//省略一些代码

else if (explicit_gc_requested) {
cause = _requested_gc_cause;
log_info(gc)("Trigger: Explicit GC request (%s)", GCCause::to_string(cause));

heuristics->record_requested_gc();
// 如果 JVM 参数 ExplicitGCInvokesConcurrent 为 true,则走默认轻量 GC
if (ExplicitGCInvokesConcurrent) {
policy->record_explicit_to_concurrent();
mode = default_mode;
// Unload and clean up everything
heap->set_unload_classes(heuristics->can_unload_classes());
} else {
//否则,执行 FullGC
policy->record_explicit_to_full();
mode = stw_full;
}
}

System.gc() 相关的 JVM 参数

1. DisableExplicitGC

说明:是否禁用显式 GC,默认是不禁用的。对于 Shenandoah GC,显式 GC 包括:GCCause::_java_lang_system_gc,GCCause::_dcmd_gc_run,GCCause::_jvmti_force_gc,GCCause::_heap_inspection,GCCause::_heap_dump,对于其他 GC,仅仅限制GCCause::_java_lang_system_gc

默认:false

举例:如果想禁用显式 GC:-XX:+DisableExplicitGC

2. ExplicitGCInvokesConcurrent

说明:对于显式 GC,是执行轻量并行 GC (YoungGC)还是 FullGC,如果为 true 则是执行轻量并行 GC (YoungGC),false 则是执行 FullGC

默认:false

举例:启用的话指定:-XX:+ExplicitGCInvokesConcurrent

其实,在设计上有人提出(参考链接)想将 ExplicitGCInvokesConcurrent 改为 true。但是目前并不是所有的 GC 都可以在轻量并行 GC 对 Java 所有内存区域进行回收,有些时候必须通过 FullGC。所以,目前这个参数还是默认为 false

3. 已过期的 ExplicitGCInvokesConcurrentAndUnloads 和使用 ClassUnloadingWithConcurrentMark 替代

如果显式 GC采用轻量并行 GC,那么无法执行 Class Unloading(类卸载),如果启用了类卸载功能,可能会有异常。所以通过这个状态位来标记在显式 GC时,即使采用轻量并行 GC,也要扫描进行类卸载。 ExplicitGCInvokesConcurrentAndUnloads目前已经过期了,用ClassUnloadingWithConcurrentMark替代

参考BUG-JDK-8170388

如何灵活可控的主动触发各种 GC?

答案是通过 WhiteBox API。但是这个不要在生产上面执行,仅仅用来测试 JVM 还有学习 JVM 使用。WhiteBox API 是 HotSpot VM 自带的白盒测试工具,将内部的很多核心机制的 API 暴露出来,用于白盒测试 JVM,压测 JVM 特性,以及辅助学习理解 JVM 并调优参数。WhiteBox API 是 Java 7 引入的,目前 Java 8 LTS 以及 Java 11 LTS(其实是 Java 9+ 以后的所有版本,这里只关心 LTS 版本,Java 9 引入了模块化所以 WhiteBox API 有所变化)都是有的。但是默认这个 API 并没有编译在 JDK 之中,但是他的实现是编译在了 JDK 里面了。所以如果想用这个 API,需要用户自己编译需要的 API,并加入 Java 的 BootClassPath 并启用 WhiteBox API。下面我们来用 WhiteBox API 来主动触发各种 GC。

1. 编译 WhiteBox API

将https://github.com/openjdk/jdk/tree/master/test/lib路径下的sun目录取出,编译成一个 jar 包,名字假设是 whitebox.jar

2. 编写测试程序

将 whitebox.jar 添加到你的项目依赖,之后写代码

1
2
3
4
5
6
7
8
9
10
11
scss复制代码public static void main(String[] args) throws Exception {
WhiteBox whiteBox = WhiteBox.getWhiteBox();
//执行young GC
whiteBox.youngGC();
System.out.println("---------------------------------");
whiteBox.fullGC();
//执行full GC
whiteBox.fullGC();
//保持进程不退出,保证日志打印完整
Thread.currentThread().join();
}

3. 启动程序查看效果

使用启动参数 -Xbootclasspath/a:/home/project/whitebox.jar -XX:+UnlockDiagnosticVMOptions -XX:+WhiteBoxAPI -Xlog:gc 启动程序。其中前三个 Flag 表示启用 WhiteBox API,最后一个表示打印 GC info 级别的日志到控制台。

我的输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
ini复制代码[0.036s][info][gc] Using G1
[0.048s][info][gc,init] Version: 17-internal+0-adhoc.Administrator.jdk (fastdebug)
[0.048s][info][gc,init] CPUs: 16 total, 16 available
[0.048s][info][gc,init] Memory: 16304M
[0.048s][info][gc,init] Large Page Support: Disabled
[0.048s][info][gc,init] NUMA Support: Disabled
[0.048s][info][gc,init] Compressed Oops: Enabled (32-bit)
[0.048s][info][gc,init] Heap Region Size: 1M
[0.048s][info][gc,init] Heap Min Capacity: 512M
[0.048s][info][gc,init] Heap Initial Capacity: 512M
[0.048s][info][gc,init] Heap Max Capacity: 512M
[0.048s][info][gc,init] Pre-touch: Disabled
[0.048s][info][gc,init] Parallel Workers: 13
[0.048s][info][gc,init] Concurrent Workers: 3
[0.048s][info][gc,init] Concurrent Refinement Workers: 13
[0.048s][info][gc,init] Periodic GC: Disabled
[0.049s][info][gc,metaspace] CDS disabled.
[0.049s][info][gc,metaspace] Compressed class space mapped at: 0x0000000100000000-0x0000000140000000, reserved size: 1073741824
[0.049s][info][gc,metaspace] Narrow klass base: 0x0000000000000000, Narrow klass shift: 3, Narrow klass range: 0x140000000
[1.081s][info][gc,start ] GC(0) Pause Young (Normal) (WhiteBox Initiated Young GC)
[1.082s][info][gc,task ] GC(0) Using 12 workers of 13 for evacuation
[1.089s][info][gc,phases ] GC(0) Pre Evacuate Collection Set: 0.5ms
[1.089s][info][gc,phases ] GC(0) Merge Heap Roots: 0.1ms
[1.089s][info][gc,phases ] GC(0) Evacuate Collection Set: 3.4ms
[1.089s][info][gc,phases ] GC(0) Post Evacuate Collection Set: 1.6ms
[1.089s][info][gc,phases ] GC(0) Other: 1.3ms
[1.089s][info][gc,heap ] GC(0) Eden regions: 8->0(23)
[1.089s][info][gc,heap ] GC(0) Survivor regions: 0->2(4)
[1.089s][info][gc,heap ] GC(0) Old regions: 0->0
[1.089s][info][gc,heap ] GC(0) Archive regions: 0->0
[1.089s][info][gc,heap ] GC(0) Humongous regions: 0->0
[1.089s][info][gc,metaspace] GC(0) Metaspace: 6891K(7104K)->6891K(7104K) NonClass: 6320K(6400K)->6320K(6400K) Class: 571K(704K)->571K(704K)
[1.089s][info][gc ] GC(0) Pause Young (Normal) (WhiteBox Initiated Young GC) 7M->1M(512M) 7.864ms
[1.089s][info][gc,cpu ] GC(0) User=0.00s Sys=0.00s Real=0.01s
---------------------------------
[1.091s][info][gc,task ] GC(1) Using 12 workers of 13 for full compaction
[1.108s][info][gc,start ] GC(1) Pause Full (WhiteBox Initiated Full GC)
[1.108s][info][gc,phases,start] GC(1) Phase 1: Mark live objects
[1.117s][info][gc,phases ] GC(1) Phase 1: Mark live objects 8.409ms
[1.117s][info][gc,phases,start] GC(1) Phase 2: Prepare for compaction
[1.120s][info][gc,phases ] GC(1) Phase 2: Prepare for compaction 3.031ms
[1.120s][info][gc,phases,start] GC(1) Phase 3: Adjust pointers
[1.126s][info][gc,phases ] GC(1) Phase 3: Adjust pointers 5.806ms
[1.126s][info][gc,phases,start] GC(1) Phase 4: Compact heap
[1.190s][info][gc,phases ] GC(1) Phase 4: Compact heap 63.812ms
[1.193s][info][gc,heap ] GC(1) Eden regions: 1->0(25)
[1.193s][info][gc,heap ] GC(1) Survivor regions: 2->0(4)
[1.193s][info][gc,heap ] GC(1) Old regions: 0->3
[1.193s][info][gc,heap ] GC(1) Archive regions: 0->0
[1.193s][info][gc,heap ] GC(1) Humongous regions: 0->0
[1.193s][info][gc,metaspace ] GC(1) Metaspace: 6895K(7104K)->6895K(7104K) NonClass: 6323K(6400K)->6323K(6400K) Class: 571K(704K)->571K(704K)
[1.193s][info][gc ] GC(1) Pause Full (WhiteBox Initiated Full GC) 1M->0M(512M) 84.846ms
[1.202s][info][gc,cpu ] GC(1) User=0.19s Sys=0.63s Real=0.11s

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

我用kafka两年踩过的一些非比寻常的坑

发表于 2021-02-25

前言

我的上家公司是做餐饮系统的,每天中午和晚上用餐高峰期,系统的并发量不容小觑。为了保险起见,公司规定各部门都要在吃饭的时间轮流值班,防止出现线上问题时能够及时处理。

我当时在后厨显示系统团队,该系统属于订单的下游业务。用户点完菜下单后,订单系统会通过发kafka消息给我们系统,系统读取消息后,做业务逻辑处理,持久化订单和菜品数据,然后展示到划菜客户端。这样厨师就知道哪个订单要做哪些菜,有些菜做好了,就可以通过该系统出菜。系统自动通知服务员上菜,如果服务员上完菜,修改菜品上菜状态,用户就知道哪些菜已经上了,哪些还没有上。这个系统可以大大提高后厨到用户的效率。


事实证明,这一切的关键是消息中间件:kafka,如果它有问题,将会直接影响到后厨显示系统的功能。

最近无意间获得一份BAT大厂大佬写的刷题笔记,一下子打通了我的任督二脉,越来越觉得算法没有想象中那么难了。
BAT大佬写的刷题笔记,让我offer拿到手软

接下来,我跟大家一起聊聊使用kafka两年时间踩过哪些坑?

顺序问题

1. 为什么要保证消息的顺序?

刚开始我们系统的商户很少,为了快速实现功能,我们没想太多。既然是走消息中间件kafka通信,订单系统发消息时将订单详细数据放在消息体,我们后厨显示系统只要订阅topic,就能获取相关消息数据,然后处理自己的业务即可。

不过这套方案有个关键因素:要保证消息的顺序。

为什么呢?

订单有很多状态,比如:下单、支付、完成、撤销等,不可能下单的消息都没读取到,就先读取支付或撤销的消息吧,如果真的这样,数据不是会产生错乱?

好吧,看来保证消息顺序是有必要的。

2.如何保证消息顺序?

我们都知道kafka的topic是无序的,但是一个topic包含多个partition,每个partition内部是有序的。

如此一来,思路就变得清晰了:只要保证生产者写消息时,按照一定的规则写到同一个partition,不同的消费者读不同的partition的消息,就能保证生产和消费者消息的顺序。

我们刚开始就是这么做的,同一个商户编号的消息写到同一个partition,topic中创建了4个partition,然后部署了4个消费者节点,构成消费者组,一个partition对应一个消费者节点。从理论上说,这套方案是能够保证消息顺序的。

一切规划得看似“天衣无缝”,我们就这样”顺利“上线了。

3.出现意外

该功能上线了一段时间,刚开始还是比较正常的。

但是,好景不长,很快就收到用户投诉,说在划菜客户端有些订单和菜品一直看不到,无法划菜。

我定位到了原因,公司在那段时间网络经常不稳定,业务接口时不时报超时,业务请求时不时会连不上数据库。

这种情况对顺序消息的打击,可以说是毁灭性的。

为什么这么说?

假设订单系统发了:”下单“、”支付“、”完成“ 三条消息。

而”下单“消息由于网络原因我们系统处理失败了,而后面的两条消息的数据是无法入库的,因为只有”下单“消息的数据才是完整的数据,其他类型的消息只会更新状态。

加上,我们当时没有做失败重试机制,使得这个问题被放大了。问题变成:一旦”下单“消息的数据入库失败,用户就永远看不到这个订单和菜品了。

那么这个紧急的问题要如何解决呢?
最近我建了新的技术交流群,打算将它打造成高质量的活跃群,欢迎小伙伴们加入。

我以往的技术群里技术氛围非常不错,大佬很多。

image.png

加微信:su_san_java,备注:加群,即可加入该群。

4.解决过程

最开始我们的想法是:在消费者处理消息时,如果处理失败了,立马重试3-5次。但如果有些请求要第6次才能成功怎么办?不可能一直重试呀,这种同步重试机制,会阻塞其他商户订单消息的读取。

显然用上面的这种同步重试机制在出现异常的情况,会严重影响消息消费者的消费速度,降低它的吞吐量。

如此看来,我们不得不用异步重试机制了。

如果用异步重试机制,处理失败的消息就得保存到重试表下来。

但有个新问题立马出现:只存一条消息如何保证顺序?

存一条消息的确无法保证顺序,假如:”下单“消息失败了,还没来得及异步重试。此时,”支付“消息被消费了,它肯定是不能被正常消费的。

此时,”支付“消息该一直等着,每隔一段时间判断一次,它前面的消息都有没有被消费?

如果真的这么做,会出现两个问题:

  1. ”支付“消息前面只有”下单“消息,这种情况比较简单。但如果某种类型的消息,前面有N多种消息,需要判断多少次呀,这种判断跟订单系统的耦合性太强了,相当于要把他们系统的逻辑搬一部分到我们系统。
  2. 影响消费者的消费速度

这时有种更简单的方案浮出水面:消费者在处理消息时,先判断该订单号在重试表有没有数据,如果有则直接把当前消息保存到重试表。如果没有,则进行业务处理,如果出现异常,把该消息保存到重试表。

后来我们用elastic-job建立了失败重试机制,如果重试7次后还是失败,则将该消息的状态标记为失败,发邮件通知开发人员。

终于由于网络不稳定,导致用户在划菜客户端有些订单和菜品一直看不到的问题被解决了。现在商户顶多偶尔延迟看到菜品,比一直看不菜品好太多。

消息积压

随着销售团队的市场推广,我们系统的商户越来越多。随之而来的是消息的数量越来越大,导致消费者处理不过来,经常出现消息积压的情况。对商户的影响非常直观,划菜客户端上的订单和菜品可能半个小时后才能看到。一两分钟还能忍,半个消息的延迟,对有些暴脾气的商户哪里忍得了,马上投诉过来了。我们那段时间经常接到商户投诉说订单和菜品有延迟。

虽说,加服务器节点就能解决问题,但是按照公司为了省钱的惯例,要先做系统优化,所以我们开始了消息积压问题解决之旅。

1. 消息体过大

虽说kafka号称支持百万级的TPS,但从producer发送消息到broker需要一次网络IO,broker写数据到磁盘需要一次磁盘IO(写操作),consumer从broker获取消息先经过一次磁盘IO(读操作),再经过一次网络IO。

一次简单的消息从生产到消费过程,需要经过2次网络IO和2次磁盘IO。如果消息体过大,势必会增加IO的耗时,进而影响kafka生产和消费的速度。消费者速度太慢的结果,就会出现消息积压情况。

除了上面的问题之外,消息体过大,还会浪费服务器的磁盘空间,稍不注意,可能会出现磁盘空间不足的情况。

此时,我们已经到了需要优化消息体过大问题的时候。

如何优化呢?

我们重新梳理了一下业务,没有必要知道订单的中间状态,只需知道一个最终状态就可以了。

如此甚好,我们就可以这样设计了:

  1. 订单系统发送的消息体只用包含:id和状态等关键信息。
  2. 后厨显示系统消费消息后,通过id调用订单系统的订单详情查询接口获取数据。
  3. 后厨显示系统判断数据库中是否有该订单的数据,如果没有则入库,有则更新。

果然这样调整之后,消息积压问题很长一段时间都没再出现。

2. 路由规则不合理

还真别高兴的太早,有天中午又有商户投诉说订单和菜品有延迟。我们一查kafka的topic竟然又出现了消息积压。

但这次有点诡异,不是所有partition上的消息都有积压,而是只有一个。

刚开始,我以为是消费那个partition消息的节点出了什么问题导致的。但是经过排查,没有发现任何异常。

这就奇怪了,到底哪里有问题呢?

后来,我查日志和数据库发现,有几个商户的订单量特别大,刚好这几个商户被分到同一个partition,使得该partition的消息量比其他partition要多很多。

这时我们才意识到,发消息时按商户编号路由partition的规则不合理,可能会导致有些partition消息太多,消费者处理不过来,而有些partition却因为消息太少,消费者出现空闲的情况。

为了避免出现这种分配不均匀的情况,我们需要对发消息的路由规则做一下调整。

我们思考了一下,用订单号做路由相对更均匀,不会出现单个订单发消息次数特别多的情况。除非是遇到某个人一直加菜的情况,但是加菜是需要花钱的,所以其实同一个订单的消息数量并不多。

调整后按订单号路由到不同的partition,同一个订单号的消息,每次到发到同一个partition。


调整后,消息积压的问题又有很长一段时间都没有再出现。我们的商户数量在这段时间,增长的非常快,越来越多了。

3. 批量操作引起的连锁反应

在高并发的场景中,消息积压问题,可以说如影随形,真的没办法从根本上解决。表面上看,已经解决了,但后面不知道什么时候,就会冒出一次,比如这次:

有天下午,产品过来说:有几个商户投诉过来了,他们说菜品有延迟,快查一下原因。

这次问题出现得有点奇怪。

为什么这么说?

首先这个时间点就有点奇怪,平常出问题,不都是中午或者晚上用餐高峰期吗?怎么这次问题出现在下午?

根据以往积累的经验,我直接看了kafka的topic的数据,果然上面消息有积压,但这次每个partition都积压了十几万的消息没有消费,比以往加压的消息数量增加了几百倍。这次消息积压得极不寻常。

我赶紧查服务监控看看消费者挂了没,还好没挂。又查服务日志没有发现异常。这时我有点迷茫,碰运气问了问订单组下午发生了什么事情没?他们说下午有个促销活动,跑了一个JOB批量更新过有些商户的订单信息。

这时,我一下子如梦初醒,是他们在JOB中批量发消息导致的问题。怎么没有通知我们呢?实在太坑了。

虽说知道问题的原因了,倒是眼前积压的这十几万的消息该如何处理呢?

此时,如果直接调大partition数量是不行的,历史消息已经存储到4个固定的partition,只有新增的消息才会到新的partition。我们重点需要处理的是已有的partition。

直接加服务节点也不行,因为kafka允许多个partition被同组的一个consumer消费,但不允许一个partition被同组的多个consumer消费,可能会造成资源浪费。

看来只有用多线程处理了。

为了紧急解决问题,我改成了用线程池处理消息,核心线程和最大线程数都配置成了50。

调整之后,果然,消息积压数量不断减少。

但此时有个更严重的问题出现:我收到了报警邮件,有两个订单系统的节点down机了。

不久,订单组的同事过来找我说,我们系统调用他们订单查询接口的并发量突增,超过了预计的好几倍,导致有2个服务节点挂了。他们把查询功能单独整成了一个服务,部署了6个节点,挂了2个节点,再不处理,另外4个节点也会挂。订单服务可以说是公司最核心的服务,它挂了公司损失会很大,情况万分紧急。

为了解决这个问题,只能先把线程数调小。

幸好,线程数是可以通过zookeeper动态调整的,我把核心线程数调成了8个,核心线程数改成了10个。

后面,运维把订单服务挂的2个节点重启后恢复正常了,以防万一,再多加了2个节点。为了确保订单服务不会出现问题,就保持目前的消费速度,后厨显示系统的消息积压问题,1小时候后也恢复正常了。

后来,我们开了一次复盘会,得出的结论是:

  1. 订单系统的批量操作一定提前通知下游系统团队。
  2. 下游系统团队多线程调用订单查询接口一定要做压测。
  3. 这次给订单查询服务敲响了警钟,它作为公司的核心服务,应对高并发场景做的不够好,需要做优化。
  4. 对消息积压情况加监控

顺便说一下,对于要求严格保证消息顺序的场景,可以将线程池改成多个队列,每个队列用单线程处理。

4. 表过大

为了防止后面再次出现消息积压问题,消费者后面就一直用多线程处理消息。

但有天中午我们还是收到很多报警邮件,提醒我们kafka的topic消息有积压。我们正在查原因,此时产品跑过来说:又有商户投诉说菜品有延迟,赶紧看看。这次她看起来有些不耐烦,确实优化了很多次,还是出现了同样的问题。

在外行看来:为什么同一个问题一直解决不了?

其实技术心里的苦他们是不知道的。

表面上问题的症状是一样的,都是出现了菜品延迟,他们知道的是因为消息积压导致的。但是他们不知道深层次的原因,导致消息积压的原因其实有很多种。这也许是使用消息中间件的通病吧。

我沉默不语,只能硬着头皮定位原因了。

后来我查日志发现消费者消费一条消息的耗时长达2秒。以前是500毫秒,现在怎么会变成2秒呢?

奇怪了,消费者的代码也没有做大的调整,为什么会出现这种情况呢?

查了一下线上菜品表,单表数据量竟然到了几千万,其他的划菜表也是一样,现在单表保存的数据太多了。

我们组梳理了一下业务,其实菜品在客户端只展示最近3天的即可。

这就好办了,我们服务端存着多余的数据,不如把表中多余的数据归档。于是,DBA帮我们把数据做了归档,只保留最近7天的数据。

如此调整后,消息积压问题被解决了,又恢复了往日的平静。

主键冲突

别高兴得太早了,还有其他的问题,比如:报警邮件经常报出数据库异常: Duplicate entry '6' for key 'PRIMARY',说主键冲突。

出现这种问题一般是由于有两个以上相同主键的sql,同时插入数据,第一个插入成功后,第二个插入的时候会报主键冲突。表的主键是唯一的,不允许重复。

我仔细检查了代码,发现代码逻辑会先根据主键从表中查询订单是否存在,如果存在则更新状态,不存在才插入数据,没得问题。

这种判断在并发量不大时,是有用的。但是如果在高并发的场景下,两个请求同一时刻都查到订单不存在,一个请求先插入数据,另一个请求再插入数据时就会出现主键冲突的异常。

解决这个问题最常规的做法是:加锁。

我刚开始也是这样想的,加数据库悲观锁肯定是不行的,太影响性能。加数据库乐观锁,基于版本号判断,一般用于更新操作,像这种插入操作基本上不会用。

剩下的只能用分布式锁了,我们系统在用redis,可以加基于redis的分布式锁,锁定订单号。

但后面仔细思考了一下:

  1. 加分布式锁也可能会影响消费者的消息处理速度。
  2. 消费者依赖于redis,如果redis出现网络超时,我们的服务就悲剧了。

所以,我也不打算用分布式锁。

而是选择使用mysql的INSERT INTO ...ON DUPLICATE KEY UPDATE语法:

1
2
3
4
5
6
sql复制代码INSERT INTO table (column_list)
VALUES (value_list)
ON DUPLICATE KEY UPDATE
c1 = v1,
c2 = v2,
...;

它会先尝试把数据插入表,如果主键冲突的话那么更新字段。

把以前的insert语句改造之后,就没再出现过主键冲突问题。
最近我建了新的技术交流群,打算将它打造成高质量的活跃群,欢迎小伙伴们加入。

我以往的技术群里技术氛围非常不错,大佬很多。

image.png

加微信:su_san_java,备注:加群,即可加入该群。

数据库主从延迟

不久之后的某天,又收到商户投诉说下单后,在划菜客户端上看得到订单,但是看到的菜品不全,有时甚至订单和菜品数据都看不到。

这个问题跟以往的都不一样,根据以往的经验先看kafka的topic中消息有没有积压,但这次并没有积压。

再查了服务日志,发现订单系统接口返回的数据有些为空,有些只返回了订单数据,没返回菜品数据。

这就非常奇怪了,我直接过去找订单组的同事。他们仔细排查服务,没有发现问题。这时我们不约而同的想到,会不会是数据库出问题了,一起去找DBA。果然,DBA发现数据库的主库同步数据到从库,由于网络原因偶尔有延迟,有时延迟有3秒。

如果我们的业务流程从发消息到消费消息耗时小于3秒,调用订单详情查询接口时,可能会查不到数据,或者查到的不是最新的数据。

这个问题非常严重,会导致直接我们的数据错误。

为了解决这个问题,我们也加了重试机制。调用接口查询数据时,如果返回数据为空,或者只返回了订单没有菜品,则加入重试表。

调整后,商户投诉的问题被解决了。

重复消费

kafka消费消息时支持三种模式:

  • at most once模式
    最多一次。保证每一条消息commit成功之后,再进行消费处理。消息可能会丢失,但不会重复。
  • at least once模式
    至少一次。保证每一条消息处理成功之后,再进行commit。消息不会丢失,但可能会重复。
  • exactly once模式
    精确传递一次。将offset作为唯一id与消息同时处理,并且保证处理的原子性。消息只会处理一次,不丢失也不会重复。但这种方式很难做到。

kafka默认的模式是at least once,但这种模式可能会产生重复消费的问题,所以我们的业务逻辑必须做幂等设计,否则可能会产生重复数据。

而我们的业务场景保存数据时使用了INSERT INTO ...ON DUPLICATE KEY UPDATE语法,不存在时插入,存在时更新,是天然支持幂等性的。

多环境消费问题

我们当时线上环境分为:pre(预发布环境) 和 prod(生产环境),两个环境共用同一个数据库,并且共用同一个kafka集群。

需要注意的是,在配置kafka的topic的时候,要加前缀用于区分不同环境。pre环境的以pre_开头,比如:pre_order,生产环境以prod_开头,比如:prod_order,防止消息在不同环境中串了。

但有次运维在pre环境切换节点,配置topic的时候,配错了,配成了prod的topic。刚好那天,我们有新功能上pre环境。结果悲剧了,prod的有些消息被pre环境的consumer消费了,而由于消息体做了调整,导致pre环境的consumer处理消息一直失败。

其结果是生产环境丢了部分消息。不过还好,最后生产环境消费者通过重置offset,重新读取了那一部分消息解决了问题,没有造成太大损失。

后记

除了上述问题之外,我还遇到过:

  • kafka的consumer使用自动确认机制,导致cpu使用率100%。
  • kafka集群中的一个broker节点挂了,重启后又一直挂。

最近无意间获得一份BAT大厂大佬写的刷题笔记,一下子打通了我的任督二脉,越来越觉得算法没有想象中那么难了。
BAT大佬写的刷题笔记,让我offer拿到手软

这两个问题说起来有些复杂,我就不一一列举了,有兴趣的朋友可以关注我的公众号,加我的微信找我私聊。

非常感谢那两年使用消息中间件kafka的经历,虽说遇到过挺多问题,踩了很多坑,也走了很多弯路,但是实打实的让我积累了很多宝贵的经验,快速成长了。

其实kafka是一个非常优秀的消息中间件,我所遇到的绝大多数问题,都并非kafka自身的问题(除了cpu使用率100%是它的一个bug导致的之外)。
最近我建了新的技术交流群,打算将它打造成高质量的活跃群,欢迎小伙伴们加入。

我以往的技术群里技术氛围非常不错,大佬很多。

image.png

加微信:su_san_java,备注:加群,即可加入该群。

最后说一句(求关注,别白嫖我)

如果这篇文章对您有所帮助,或者有所启发的话,帮忙扫描下发二维码关注一下,您的支持是我坚持写作最大的动力。

求一键三连:点赞、转发、在看。

关注公众号:【苏三说技术】,在公众号中回复:面试、代码神器、开发手册、时间管理有超赞的粉丝福利,另外回复:加群,可以跟很多BAT大厂的前辈交流和学习。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

体验Java8的流式编程 介绍 流创建 中间操作 终端操作

发表于 2021-02-25

介绍

JDK 8 不止新增了 Lambda 表达式,还有 Stream 流 ,程序员通过 Stream 流来简化对数据的处理。其本质就是计算。

可以这么理解:流就是数据通道,用于操作数据源所生成的元素序列。

我们来熟悉一下 Stream 流:

1
2
3
4
5
6
7
8
9
10
11
java复制代码public class StringSorting {
public static void main(String[] args) {
Stream.of("Java", "Python", "C++",
"C", "Shell", "Ruby",
"Scala", "Groovy", "Kotlin",
"Clojure", "Jython", "C#",
"JavaScript", "SQL")
.sorted() // 自然序排序
.forEach(s -> System.out.printf("%s ", s)); // 打印输出
}
}

输出结果:

1
shell复制代码C C# C++ Clojure Groovy Java JavaScript Jython Kotlin Python Ruby SQL Scala Shell

由上面的例子,我们可以得知使用 Stream 的步骤:

  1. 创建流。例如上面的 Stream.of()
  2. 对流的中间操作。例如上面的 sorted(),在流中对数据进行排序。
  3. 最终操作。例如上面的 forEach(),将流通过该方法进行打印。

流创建

流的创建主要有以下几种方式:

以下几种方式可以创建 Stream 流:

  • Arrays 数组工具类提供的 stream() 静态方法。
  • Stream 类中 of()、generate()、iterate()和 empty() 静态方法。
  • Stream 类中的 builder() 方法。
  • Collection 集合提供的 stream() 方法与 parallelStream() 方法。

还有其它的一些产生流的方法就不一一列举了。下面讲解一下上述创建 Stream 流的方法

Arrays.stream() 静态方法

以下是 Arrays 关于产生流的方法 stream() 及重载方法:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public class Arrays {
...省略...
public static <T> Stream<T> stream(T[] array) {...}
public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {...}
public static IntStream stream(int[] array) {...}
public static IntStream stream(int[] array, int startInclusive, int endExclusive) {...}
public static LongStream stream(long[] array) {...}
public static LongStream stream(long[] array, int startInclusive, int endExclusive) {...}
public static DoubleStream stream(double[] array) {...}
public static DoubleStream stream(double[] array, int startInclusive, int endExclusive) {...}
...省略...
}

由以上方法可知,通过重载形式,可以满足我们调用的基本类型与泛型的数组转化为 Stream 流。

下面是使用 Arrays.stream() 来将数组转换为流的例子:

1
2
3
4
5
6
java复制代码public class ArraysStream {
public static void main(String[] args) {
Arrays.stream(new double[] {3.1415926, 9.8, 3.3333})
.forEach(System.out::println);
}
}

Stream 的静态方法

Stream 流中共有 of()、generate() 和 iterate() 三个静态方法。

我们先看一下 Stream.of() 静态方法的源码:

1
2
3
4
5
6
7
8
9
java复制代码public interface Stream<T> extends BaseStream<T, Stream<T>> {
...省略...
@SafeVarargs
@SuppressWarnings("varargs") // Creating a stream from an array is safe
public static<T> Stream<T> of(T... values) {
return Arrays.stream(values);
}
...省略...
}

由以上源码可知,使用 Stream.of() 静态方法,是将可变元素传递到 Arrays 数组工具类的静态方法 stream() 去转换为流,其实质还是调用 Arrays.stream() 方法。

下面我们来看一下,使用 Stream.of() 来将可变参数数组转化为流的例子:

1
2
3
4
5
6
7
java复制代码public class StreamOf {
public static void main(String[] args) {
// 将一组数字转换为流
Stream.of(9.8, 3.1415926, 3.33333)
.forEach(System.out::println);
}
}

Stream.of() 静态方法的源码和例子如上所述。·

Stream 还有产生无限元素的静态方法用于产生流,它就是generate() 静态方法:

1
2
3
4
5
java复制代码public interface Stream<T> extends BaseStream<T, Stream<T>> {
...省略...
public static<T> Stream<T> generate(Supplier<? extends T> s) {...}
...省略...
}

使用 generate() 方法创建的流,长度是无限的,它的参数是 Supplier 函数接口。

1
2
3
4
5
6
7
java复制代码public class StreamGenerate {
public static void main(String[] args) {
Stream.generate(Math::random)
.limit(10)
.forEach(i -> System.out.printf("%f ", i));
}
}

输出结果:

1
shell复制代码0.792192 0.625073 0.983115 0.372405 0.294238 0.790635 0.688823 0.238186 0.096708 0.434963

我们在看另一个创建无限流的静态方法 Stream.iterate() :

1
2
3
4
5
6
7
java复制代码public interface Stream<T> extends BaseStream<T, Stream<T>> {
...省略...
public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) {...}

public static<T> Stream<T> iterate(T seed, Predicate<? super T> hasNext, UnaryOperator<T> next) {}
...省略...
}

上述两个静态方法都是 iterate 用来创建无限流的,与 generate 方法不同的是,它是通过函数f迭代给指定的种子而产生无限连续有序的Stream。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public class StreamIterate {
int x = 1;
Stream<Integer> numbers() {
return Stream.iterate(0, i -> {
int result = x + i;
x = i;
return result;
});
}
public static void main(String[] args) {
new StreamIterate().numbers()
.skip(20) // 过滤前 20 个
.limit(10) // 然后取 10 个
.forEach(i -> System.out.printf("%d ", i));
}
}

输出结果:

1
shell复制代码6765 10946 17711 28657 46368 75025 121393 196418 317811 514229

Stream 类中还有一个创建不含任何元素的 Stream 流:

1
2
3
4
5
java复制代码public interface Stream<T> extends BaseStream<T, Stream<T>> {
...省略...
public static<T> Stream<T> empty() {...}
...省略...
}

创建空流的例子如下:

1
2
3
4
5
java复制代码public class StreamEmpty {
public static void main(String[] args) {
Stream<Object> empty = Stream.empty();
}
}

Stream的构造器模式

Stream 类还可以使用构建器 builder() 方法创建流。

1
2
3
4
5
6
7
8
9
java复制代码public class StreamBuilder {
public static void main(String[] args) {
Stream.builder()
.add("Jan").add("Feb").add("Mar")
.add("Apr").add("May").add("Jun")
.build()
.forEach(s -> System.out.printf("%s ", s));
}
}

输出结果:

1
shell复制代码Jan Feb Mar Apr May Jun

Collection系列

Java 8 中,扩展了 Collection 接口,添加了两个默认方法,来创建流:

1
2
3
4
5
java复制代码public interface Collection<E> extends Iterable<E> {
...省略...
default Stream<E> stream() {...}
default Stream<E> parallelStream() {...}
}

上面的两个默认方法,stream() 是转化为顺序流,paralletlStream() 转换为并行流。

下面我们看一下两个默认方法的使用例子:

1
2
3
4
5
6
7
8
9
java复制代码public class ListStream {
public static void main(String[] args) {
String[] months = {"Jan", "Feb", "Mar", "Apr", "May",
"Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec", "Jun", "Jun", "Feb", "Jan"};
// 集合创建
List<String> list = Arrays.asList(months);
list.stream().forEach(s -> System.out.format("%s ", s));
}
}

输出结果:

1
shell复制代码Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec

但将 List 对象的 stream() 方法变换为 parallelStream() 方法后,

输出结果:

1
java复制代码Aug Jul Jun Feb Dec Oct Nov Apr Mar May Jan Sep

从上面我们可以看出,stream() 与 parallelStream() 的区别:

stream() 是转化为顺序流,它使用主线程,是单线程的;parallelStream() 是转化为并行流,它是多个线程同时运行的。因此,stream() 是按顺序输出的,而parallelStream() 不是。

那我们如何将 Map 关系映射表转化为流呢?

看下面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public class MapStream {
public static void main(String[] args) {
Map<String, Integer> users = new HashMap<>();
users.put("小赵", 18);
users.put("小钱", 29);
users.put("小孙", 20);
users.put("小李", 29);

users.entrySet().stream().forEach(entry -> System.out.format("%s ", entry));
System.out.println();
users.keySet().stream().forEach(key -> System.out.format("%s ", key));
System.out.println();
users.values().stream().forEach(value -> System.out.format("%d ", value));
}
}

输出结果:

1
2
3
shell复制代码小孙=20 小李=29 小钱=29 小赵=18 
小孙 小李 小钱 小赵
20 29 29 18

从上面的例子就可得知,我们得到 Map 的 Entity 集合,Key 的集合和Value的集合,在将它们转化为流去处理。

中间操作

中间操作就是对上述创建的流进行处理,处理完返回的还是流,以便于其它操作。

我们来看一下都有什么中间操作:peek()、sorted()、unordered()、distinct()、filter()、map()、flatMap()、limit()、skip() 等。下面我们来详细介绍这些操作。

peek

peek 操作的方法如下:

1
java复制代码Stream<T> peek(Consumer<? super T> action);

主要目的是接收一个 Consumer 函数提供的逻辑去对流中的元素进行操作。

peek() 操作的目的是帮助调试。它允许你无修改地查看流中的元素。代码示例:

1
2
3
4
5
6
7
java复制代码public class Peeking {
public static void main(String[] args) {
Stream.of("Hello", "Hello World!", "I'm hello")
.peek(System.out::println)
.collect(Collectors.toList());
}
}

输出结果:

1
2
3
shell复制代码Hello
Hello World!
I'm hello

再看下一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public class Peeking {
public static void main(String[] args) {
List<String> list = Stream.of("Hello", "Hello World!", "I'm hello")
.map(String::toUpperCase)
.collect(Collectors.toList());
System.out.println(list);

list = Stream.of("Hello", "Hello World!", "I'm hello")
.peek(String::toUpperCase)
.collect(Collectors.toList());
System.out.println(list);
}
}

输出结果:

1
2
shell复制代码[HELLO, HELLO WORLD!, I'M HELLO]
[Hello, Hello World!, I'm hello]

可以看出使用map() 操作对元素进行了转换,而使用 peek() 并没有对元素进行转换。

因此,peek() 一般用于不想改变流中元素本身的类型或者只想操作元素的内部状态时;而 map() 则用于改变流中元素本身类型,即从元素中派生出另一种类型的操作。

peek() 适用于对 Stream<T> 中 泛型的某些属性进行批处理的时候使用。

sorted

sorted 操作的方法如下:

1
2
3
java复制代码Stream<T> sorted();
// 或者
Stream<T> sorted(Comparator<? super T> comparator);

sorted() 可以使用无参方法,也可以传入 Comparator 参数。代码示例:

1
2
3
4
5
6
7
8
9
10
java复制代码public class SortedComparator {
public static void main(String[] args) throws Exception {
FileToWords.stream("Cheese.dat")
.skip(10)
.limit(10)
.sorted(Comparator.reverseOrder())
.map(w -> w + " ")
.forEach(System.out::print);
}
}

输出结果:

1
shell复制代码you what to the that sir leads in district And

sorted() 预设了一些默认的比较器。这里我们使用的是反转 “自然排序”。当然你也可以把 Lambda 函数作为参数传递给 sorted()。

distinct

distinct 的方法如下:

1
java复制代码Stream<T> distinct();

它的作用主要是去重:

1
2
3
4
5
6
java复制代码public class DistinctOperator {
public static void main(String[] args) {
Stream.of(45, 31, 21, 98, 31, 55, 982, 45, 54)
.distinct().forEach(n -> System.out.format("%d ", n));
}
}

输出结果:

1
shell复制代码45 31 21 98 55 982 54

filter

filter 的方法如下:

1
java复制代码Stream<T> filter(Predicate<? super T> predicate);

该方法是过滤操作,把不想要的数据过滤掉,留下想要的的数据。会根据传入的 Predicate 函数,将不符合的数据过滤掉,留下符合 Predicate 函数的数据。

我们看下面的例子来熟悉一下 filter() 方法的使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码class Month {
private String name;
private int id;
public Month(String name, int id) { this.name = name;this.id = id; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public int getId() { return id; }
public void setId(int id) { this.id = id; }
@Override
public String toString() {
return "Month{" + "name='" + name + '\'' + ", id=" + id + '}';
}
}
public class FilterOperator {
public static void main(String[] args) {
String[] strings = new String[]{"Jan", "Feb", "Mar", "Apr", "May",
"Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"};
Month[] months = new Month[strings.length];
for (int i = 0; i < strings.length; i++) {
months[i] = new Month(strings[i], i+1);
}
Stream.of(months)
.filter(s -> "J".equals(s.getName().substring(0, 1)))
.forEach(System.out::println);
}
}

输出结果:

1
2
3
shell复制代码Month{name='Jan', id=1}
Month{name='Jun', id=6}
Month{name='Jul', id=7}

从结果中可以看到,凡是首字母与 J 相等的对象都被打印出来了。

map

map 的方法如下:

1
java复制代码<R> Stream<R> map(Function<? super T, ? extends R> mapper);

它接收一个 Function 函数,作用是将一个类型的对象转换为另一个类型的对象。

下面我们编写一个例子来看一下它的操作方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码class Student {
private String name;
private int age;
public Student(String name, int age) { this.name = name;this.age = age; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public int getAge() { return age; }
public void setAge(int age) { this.age = age; }
@Override
public String toString() {
return "Student{" + "name='" + name + '\'' + ", age=" + age + '}';
}
}
public class MapOperator {
public static void main(String[] args) {
String[] strings = new String[]{"小赵", "小钱", "小孙",
"小李", "小周", "小吴", "小郑", "小王"};
int[] ints = new int[] {20 , 18, 31, 28, 20, 25, 31, 20};
Student[] students = new Student[strings.length];
for (int i = 0; i < strings.length; i++) {
students[i] = new Student(strings[i], ints[i]);
}
Stream.of(students).map(Student::getName).forEach(s -> System.out.printf("%s ", s));
System.out.println();
Stream.of(students).mapToInt(Student::getAge).forEach(i -> System.out.printf("%d ", i));
System.out.println();
}
}

输出结果:

1
2
shell复制代码小赵 小钱 小孙 小李 小周 小吴 小郑 小王 
20 18 31 28 20 25 31 20

从上面的例子可知,第一个输出是从 Student 对象数组中将所有学生的姓名都获取出来并打印,第二个是打印年龄。

从第二个可以看出,源码已经封装好了转换为基本类型的 mapToInt 方法,返回的结果为 IntStream,其它所有关于 map 操作的源码如下:

1
2
3
4
5
6
java复制代码// 根据 `ToIntFunction` 函数,获取 `IntStream` 流
IntStream mapToInt(ToIntFunction<? super T> mapper);
// 根据 `ToLongFunction` 函数,获取 `LongStream` 流
LongStream mapToLong(ToLongFunction<? super T> mapper);
// 根据 `ToDoubleFunction` 函数,获取 `DoubleStream` 流
DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);

flatMap

flatMap 的方法如下:

1
java复制代码<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);

该方法接收一个 Function 函数作为参数,将流中的每个值都转换成另一个流,然后把所有流连接成一个流,是扁平化操作。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码class Employee {
private int id;
private String name;
private double salary;
public Employee(int id, String name, double salary) { this.id = id;this.name = name;this.salary = salary; }
public int getId() { return id; }
public void setId(int id) { this.id = id; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public double getSalary() { return salary; }
public void setSalary(double salary) { this.salary = salary; }
@Override
public String toString() {
return "Employee{" + "id=" + id + ", name='" + name + '\'' + ", salary=" + salary + '}';
}
}
public class FlatMapOperator {
public static void main(String[] args) {
Employee[] employees1 = new Employee[] {
new Employee(1, "小赵", 1500),
new Employee(2, "小钱", 3500),
new Employee(3, "小孙", 500),
new Employee(4, "小李", 4500),
};
Employee[] employees2 = new Employee[] {
new Employee(5, "小周", 9000),
new Employee(6, "小吴", 6200),
new Employee(7, "小郑", 1850),
new Employee(8, "小王", 3210),
};
Stream.of(employees1, employees2).flatMap(e -> Arrays.stream(e)).forEach(System.out::println);
}
}

输出结果:

1
2
3
4
5
6
7
8
shell复制代码Employee{id=1, name='小赵', salary=1500.0}
Employee{id=2, name='小钱', salary=3500.0}
Employee{id=3, name='小孙', salary=500.0}
Employee{id=4, name='小李', salary=4500.0}
Employee{id=5, name='小周', salary=9000.0}
Employee{id=6, name='小吴', salary=6200.0}
Employee{id=7, name='小郑', salary=1850.0}
Employee{id=8, name='小王', salary=3210.0}

从结果上看,flatMap() 将两个数组转换成流并合并在一起。

flatMap() 也有封装好的具体类型的方法,源码如下:

1
2
3
java复制代码IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper);
LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper);
DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper);

limit

limit 的方法如下:

1
java复制代码Stream<T> limit(long maxSize);

它的作用是根据 maxSize 参数的大小,截取流,使其元素个数不超过 maxSize。

我们给一个例子来熟悉 limit() 的使用:

1
2
3
4
5
6
7
java复制代码public class LimitOperator {
public static void main(String[] args) {
String[] strings = new String[]{"Jan", "Feb", "Mar", "Apr", "May",
"Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"};
Stream.of(strings).limit(5).forEach(s -> System.out.printf("%s ", s));
}
}

输出结果:

1
shell复制代码Jan Feb Mar Apr May

skip

skip 的方法如下:

1
java复制代码Stream<T> skip(long n);

跳过元素,返回一个扔掉了前 n 个元素的流。若流中元素不足 n 个,则返回一个空流。与 limit 互补。

1
2
3
4
5
6
7
java复制代码public class SkipOperator {
public static void main(String[] args) {
String[] strings = new String[]{"Jan", "Feb", "Mar", "Apr", "May",
"Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"};
Stream.of(strings).skip(10).forEach(s -> System.out.printf("%s ", s));
}
}

输出结果:

1
java复制代码Nov Dec

终端操作

终端操作就是为了结束流的中间操作,并返回结果。至此无法再继续往后传递流。

终端操作都有哪些操作呢?我们简单来看下:forEach()、forEachOrdered()、toArray()、collect()、reduce()、min()、max()、count()、anyMatch()、allMatch()、noneMatch()、findFirst()、findAny()等。下面详细介绍一下这些终端操作的使用。

forEach

forEach 的方法如下:

1
java复制代码void forEach(Consumer<? super T> action);

主要是进行编译操作,我们传递进去一个 Consumer 函数,Stream 在内部进行迭代。

1
2
3
4
5
6
7
java复制代码public class ForEachOperator {
public static void main(String[] args) {
String[] strings = new String[]{"Jan", "Feb", "Mar", "Apr", "May",
"Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"};
Stream.of(strings).forEach(s -> System.out.printf("%s ", s));
}
}

forEachOrdered

forEachOrdered 的方法如下:

1
java复制代码void forEachOrdered(Consumer<? super T> action);

它的作用与 forEach() 一样,但是在并行流上的区别较大

1
2
3
4
5
6
7
8
9
java复制代码public class ForEachOrderedOperator {
public static void main(String[] args) {
String[] strings = new String[]{"Jan", "Feb", "Mar", "Apr", "May",
"Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"};
Stream.of(strings).parallel().forEach(s -> System.out.printf("%s ", s));
System.out.println();
Stream.of(strings).parallel().forEachOrdered(s -> System.out.printf("%s ", s));
}
}

输出结果:

1
2
java复制代码Aug May Dec Oct Jun Feb Apr Nov Mar Jan Jul Sep 
Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec

从输出结果可知,使用并行流进行操作时,forEachOrdered() 会严格按照顺序进行输出,而forEach() 不会。

toArray

toArray 的方法如下:

1
2
java复制代码Object[] toArray();
<A> A[] toArray(IntFunction<A[]> generator);

toArray 有两个方法,一个无参,一个传入

我们给出一个例子,来熟悉使用 toArray() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public class ToArrayOperator {
public static void main(String[] args) {
List<String> strings = Arrays.asList("Jan", "Feb", "Mar", "Apr", "May",
"Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec");

// 使用 toArray() 将流转化为数组
Object[] objects = strings.stream()
.filter(s -> "J".equals(s.substring(0, 1))).toArray();
System.out.println(Arrays.toString(objects));

// 使用 toArray() 将流转化为特定的数组
String[] strings1 = strings.stream()
.filter(s -> "A".equals(s.substring(0, 1))).toArray(size -> {
return new String[size];
});
System.out.println(Arrays.toString(strings1));

// 上面的写法可以使用方法引用
String[] strings3 = strings.stream()
.filter(s -> "M".equals(s.substring(0, 1))).toArray(String[]::new);
System.out.println(Arrays.toString(strings3));
}
}

输出结果:

1
2
3
shell复制代码[Jan, Jun, Jul]
[Apr, Aug]
[Mar, May]

从结果上看,我们知道 toArray() 方法就是将流转换为数组。

collect

collect 的方法如下:

1
2
3
4
5
java复制代码<R, A> R collect(Collector<? super T, A, R> collector);

<R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);

collect 是收集操作。通过流将其转换为其它常用的数据结构并收集,比如转换成 List、Map 等操作。

从上述源码中,看出 collect 方法一共有两个方法。

第一个方法从接收的参数可以看出,collect 方法是通过 java.util.stream.Collector 类来收集流元素到结果集中的。

1
2
3
4
5
6
7
8
9
java复制代码public class CollectOperator {
public static void main(String[] args) {
List<String> months1 = Stream.of("Jan", "Feb", "Mar", "Apr", "May",
"Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec", "Jun", "Jun", "Feb", "Jan")
.filter(s -> "J".equals(s.substring(0, 1)))
.collect(Collectors.toList());
System.out.println(months1);
}
}

输出结果:

1
shell复制代码[Jan, Jun, Jul, Jun, Jun, Jan]

从结果中看到,我们通过 filter 过滤掉首字母不是 J 的字符串,并通过 collect 转换为 List 收集起来,但是收集的集合中有重复的元素,因此,我们将其转换成 Set 在收集。

1
2
3
4
5
6
7
8
9
java复制代码public class CollectOperator {
public static void main(String[] args) {
Set<String> months = Stream.of("Jan", "Feb", "Mar", "Apr", "May",
"Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec", "Jun", "Jun", "Feb", "Jan")
.filter(s -> "J".equals(s.substring(0, 1)))
.collect(Collectors.toSet());
System.out.println(months);
}
}

输出结果:

1
shell复制代码[Jul, Jun, Jan]

输出的结果不是有序的,为了保证元素的有序,我们将元素存储在 TreeSet 中。Collectors 中没有 toTreeSet() ,因此我们通过 Collectors.toCollection(Supplier<C> collectionFactory) 来构建我们需要的集合类型。

1
2
3
4
5
java复制代码Stream.of("Jan", "Feb", "Mar", "Apr", "May",
"Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
"Jun", "Jun", "Feb", "Jan")
.filter(s -> "J".equals(s.substring(0, 1)))
.collect(Collectors.toCollection(TreeSet::new));

输出结果:

1
shell复制代码[Jan, Jul, Jun]

collect 也可以生成 Map。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码class Pair {
private final Character c;
private final Integer i;
Pair(Character c) { this.c = c;this.i = (int) c; }
public Character getC() { return c; }
public Integer getI() { return i; }
@Override
public String toString() { return "Pair{" + "c=" + c + ", i=" + i + '}'; }
}
public class CollectOperator {
public static void main(String[] args) {
Map<Character, Integer> charTables = Stream.of('c', 'a', 'z', 'o', 'y')
.map(c -> new Pair(c)).collect(Collectors.toMap(Pair::getC, Pair::getI));
System.out.println(charTables.toString());
}
}

输出结果:

1
shell复制代码{a=97, c=99, y=121, z=122, o=111}

对于第二个方法,参数 Supplier 是一个生成目标类型实例的方法,代表着存储类型的对象;BiConsumer<R, ? super T> accumulator 参数是将操作目标数据填充到 Supplier 生成的目标类型的实例中去的方法,代表着如何将元素添加到容器中;而 BiConsumer<R, R> combiner 是将多个 Supplier 生成的实例整合到一起的方法,代表着规约操作,将多个结果合并。

下面给出一个简单的例子,统计月份出现的频次:

1
2
3
4
5
6
7
8
9
10
11
java复制代码public class CollectOperator {
public static void main(String[] args) {
Map<String, Integer> wordCount = Stream.of("Jan", "Feb", "Mar", "Apr", "May",
"Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec", "Jun", "Jun", "Feb", "Jan")
.collect(TreeMap::new,
(map, word) ->
map.merge(word, 1, Integer::sum),
Map::putAll);
System.out.println(wordCount);
}
}

输出结果:

1
shell复制代码{Apr=1, Aug=1, Dec=1, Feb=2, Jan=2, Jul=1, Jun=3, Mar=1, May=1, Nov=1, Oct=1, Sep=1}

reduce

reduce 的方法如下:

1
2
3
4
5
java复制代码T reduce(T identity, BinaryOperator<T> accumulator);
Optional<T> reduce(BinaryOperator<T> accumulator);
<U> U reduce(U identity,
BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner);

reduce 是通过反复的对一个输入序列的元素进行某种组合操作(求和、最大值或将所有元素都放入一个列表),最终将其组合为一个单一的概要信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public class ReduceOperator {
public static void main(String[] args) {
String concat = Stream.of("Jan", "Feb", "Mar", "Apr", "May",
"Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
.filter(s -> "J".equals(s.substring(0, 1)))
.reduce("", (a, b) -> {
if (!"".equals(a)) {
return a + "," + b;
}
return b;
});
System.out.println(concat);
}
}

输出结果:

1
shell复制代码Jan,Jun,Jul

Stream中关于 min、max、count 的源码如下:

1
2
3
java复制代码Optional<T> min(Comparator<? super T> comparator);
Optional<T> max(Comparator<? super T> comparator);
long count();

count 的作用是求流中元素个数。

min 的作用是根据传入的 Comparator 来求 “最小” 的元素。

max 的作用是根据传入的 Comparator 来求 “最小” 的元素。

它们的底层都是依赖 reduce 实现的。

现在看一下例子代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public class InfoReduceOperator {
public static void main(String[] args) {
String[] months = {"Jan", "Feb", "Mar", "Apr", "May",
"Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"};
long count = Stream.of(months)
.filter(s -> "J".equals(s.substring(0, 1)))
.count();
System.out.printf("计算首字母是`J`的字符串的个数是:%d个",count);
System.out.println();
String min = Stream.of(months).min(Comparator.comparing(Function.identity())).get();
System.out.printf("字符串数组中最小的字符串是:%s", min);
System.out.println();
String max = Stream.of(months).max(String::compareTo).get();
System.out.printf("字符串数组中最大的字符串是:%s", max);
}
}

输出结果:

1
2
3
shell复制代码计算首字母是`J`的字符串的个数是:3个
字符串数组中最小的字符串是:Apr
字符串数组中最大的字符串是:Sep

match

match 是匹配操作,它总共有三类方法:anyMatch、allMatch 和 noneMatch 。

anyMatch 用于检查所有元素中至少有一个是匹配的。它的方法如下:

1
java复制代码boolean anyMatch(Predicate<? super T> predicate);

allMatch 用于检查所有元素是否都匹配。方法如下:

1
java复制代码boolean allMatch(Predicate<? super T> predicate);

noneMatch 用于检查所有元素都没有匹配的。方法如下:

1
java复制代码boolean noneMatch(Predicate<? super T> predicate);

下面看一下关于上面方法的使用例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public class MatchOperator {
public static void main(String[] args) {
String[] months = {"Jan", "Feb", "Mar", "Apr", "May",
"Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"};
boolean isAnyMatch = Stream.of(months).anyMatch(s -> "A".equals(s.substring(0, 1)));
System.out.printf("判断months数组中是否存在首字母为`A`:%s",isAnyMatch);
System.out.println();
boolean isAllMath = Stream.of(months).allMatch(s -> "A".equals(s.substring(0, 1)));
System.out.printf("判断months数组中是否所有元素的首字母都为`A`:%s", isAllMath);
System.out.println();
boolean isNoneMath = Stream.of(months).noneMatch(s -> "A".equals(s.substring(0, 1)));
System.out.printf("判断months数组中的首字母都不为`A`:%s", isNoneMath);
}
}

输出结果:

1
2
3
shell复制代码判断months数组中是否存在首字母为`A`:true
判断months数组中是否所有元素的首字母都为`A`:false
判断months数组中的首字母都不为`A`:false

find

find 是查找操作,它总共有三类方法:findFirst 和 findAny。

findFirst 的作用是返回当前流中的第一个元素。方法如下:

1
java复制代码Optional<T> findFirst();

findAny 的作用是返回当前流中的任意元素。方法如下:

1
java复制代码Optional<T> findAny();

我们来一下它们的使用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public class FindOperator {
public static void main(String[] args) {
String[] months = {"Jan", "Feb", "Mar", "Apr", "May",
"Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"};
String first = Stream.of(months).parallel()
.filter(s -> "J".equals(s.substring(0, 1)))
.findFirst().get();
System.out.printf("获取流中第一个元素:%s", first);
System.out.println();
String any = Stream.of(months).parallel()
.filter(s -> "M".equals(s.substring(0, 1)))
.findAny().get();
System.out.printf("获取流中任意一个元素:%s", any);
}
}

输出结果:

1
2
shell复制代码获取流中第一个元素:Jan
获取流中任意一个元素:Mar

findFirst() 无论流是否是并行化,总是选择流中的第一个元素。

对于非并行流,findAny() 会选择流中的第一个元素。但当我们使用 parallel() 来并行化后,findAny()才实现了选择任意元素。

如果必须选择流中最后一个元素,那就使用 reduce()。

1
2
3
java复制代码String last = Stream.of(months).parallel()
.filter(s -> "J".equals(s.substring(0, 1)))
.reduce((v1, v2) -> v2).get();

reduce() 的参数只是用最后一个元素替换了最后两个元素,最终只生成最后一个元素。

小结

使用 Stream 流对于Java是一个极大的提升,使用它编写代码看起来较为简洁优雅,并且Stream流的并行化极大的发挥了现在多核时代的优势。


公众号 「海人为记」,期待你的关注!

回复「资源」即可获得免费学习资源!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

面试官留步!听我跟你侃会儿Docker原理 1 Docker

发表于 2021-02-25

开发人员开发完一个电商项目,该 Jar 项目包含 Redis、MySQL、ES、Haddop等若干组件。开发人员自测无误后提交给测试进行预生产测试了。

测试:你的这个服务,我在进行单元测试跟数据核对的时候总是出现不知名的bug!你要不要来看下啊?

开发:你咋测试的?是按照操作文档一步步来的么?

测试:绝对是按照文档来的啊!

开发:你重启了吗?清缓存了吗?代码是最新版吗?你用的是Chrome浏览器? 你是不是动啥东西了?

测试:这.. 这.. 这.. 我啥也没干啊!

至此,开发跟测试之间的爱恨情仇正式开始!

1 Docker 简介

1.1 Docker 由来

Docker 是基于 Go 语言开发的一个容器引擎,Docker是应用程序与系统之间的隔离层。通常应用程序对安装的系统环境会有各种严格要求,当服务器很多时部署时系统环境的配置工作是非常繁琐的。Docker让应用程序不必再关心主机环境,各个应用安装在Docker镜像里,Docker引擎负责运行包裹了应用程序的docker镜像。

Docker的理念是让开发人员可以简单地把应用程序及依赖装载到容器中,然后轻松地部署到任何地方,Docker具有如下特性。

  1. Docker容器是轻量级的虚拟技术,占用更少系统资源。
  2. 使用 Docker容器,不同团队(如开发、测试,运维)之间更容易合作。
  3. 可以在任何地方部署 Docker 容器,比如在任何物理和虚拟机上,甚至在云上。
  4. 由于Docker容器非常轻量级,因此可扩展性很强。

1.2 Docker 基本组成

镜像(image):

Docker 镜像就好比是一个目标,可以通过这个目标来创建容器服务,可以简单的理解为编程语言中的类。

容器(container):

Docker 利用容器技术,独立运行一个或者一组应用,容器是通过镜像来创建的,在容器中可执行启动、停止、删除等基本命令,最终服务运行或者项目运行就是在容器中的,可理解为是类的实例。

仓库(repository):

仓库就是存放镜像的地方!仓库分为公有仓库和私有仓库,类似Git。一般我们用的时候都是用国内docker镜像来加速。

1.3 VM 跟 Docker

****

虚拟机:

传统的虚拟机需要模拟整台机器包括硬件,每台虚拟机都需要有自己的操作系统,虚拟机一旦被开启,预分配给他的资源将全部被占用。每一个虚拟机包括应用,必要的二进制和库,以及一个完整的用户操作系统。

Docker:

容器技术是和我们的宿主机共享硬件资源及操作系统可以实现资源的动态分配。容器包含应用和其所有的依赖包,但是与其他容器共享内核。容器在宿主机操作系统中,在用户空间以分离的进程运行。

比对项 Container(容器) VM(虚拟机)
启动速度 秒级 分钟级
运行性能 接近原生 有所损失
磁盘占用 MB GB
数量 成百上千 一般几十台
隔离性 进程级别 系统级别
操作系统 只支持Linux 几乎所有
封装程度 只打包项目代码和依赖关系 共享宿主机内核 完整的操作系统

1.4 Docker 跟 DevOps

DevOps 是一组过程、方法与系统的统称,用于促进开发(应用程序/软件工程)、技术运营和质量保障(QA)部门之间的沟通、协作与整合。

DevOps 是两个传统角色 Dev(Development) 和 Ops(Operations) 的结合,Dev 负责开发,Ops 负责部署上线,但 Ops 对 Dev 开发的应用缺少足够的了解,而 Dev 来负责上线,很多服务软件不知如何部署运行,二者中间有一道明显的鸿沟,DevOps 就是为了弥补这道鸿沟。DevOps 要做的事,是偏 Ops 的;但是做这个事的人,是偏 Dev 的, 说白了就是要有一个了解 Dev 的人能把 Ops 的事干了。而Docker 是适合 DevOps 的。

1.5 Docker 跟 k8s

k8s 的全称是 kubernetes,它是基于容器的集群管理平台,是管理应用的全生命周期的一个工具,从创建应用、应用的部署、应用提供服务、扩容缩容应用、应用更新、都非常的方便,而且可以做到故障自愈,例如一个服务器挂了,可以自动将这个服务器上的服务调度到另外一个主机上进行运行,无需进行人工干涉。k8s 依托于Google自家的强大实践应用,目前市场占有率已经超过Docker自带的Swarm了。

如果你有很多 Docker 容器要启动、维护、监控,那就上k8s吧!

1.6 hello world

docker run hello-world 的大致流程图如下:

2 Docker 常见指令

官方文档:

docs.docker.com/engine/refe…

3 Docker 运行原理

Docker 只提供一个运行环境,他跟 VM 不一样,是不需要运行一个独立的 OS,容器中的系统内核跟宿主机的内核是公用的。docker容器本质上是宿主机的进程。对 Docker 项目来说,它最核心的原理实际上就是为待创建的用户进程做如下操作:

  1. 启用 Linux Namespace 配置。
  2. 设置指定的 Cgroups 参数。
  3. 切换进程的根目录(Change Root),优先使用 pivot_root 系统调用,如果系统不支持,才会使用 chroot。

3.1 namespace 进程隔离

Linux Namespaces 机制提供一种进程资源隔离方案。PID、IPC、Network 等系统资源不再是全局性的,而是属于某个特定的Namespace。每个namespace下的资源对于其他 namespace 下的资源都是透明,不可见的。系统中可以同时存在两个进程号为0、1、2的进程,由于属于不同的namespace,所以它们之间并不冲突。

PS:Linux 内核提拱了6种 namespace 隔离的系统调用,如下图所示。

3.2 CGroup 分配资源

Docker 通过 Cgroup 来控制容器使用的资源配额,一旦超过这个配额就发出OOM。配额主要包括 CPU、内存、磁盘三大方面, 基本覆盖了常见的资源配额和使用量控制。

Cgroup 是 Control Groups 的缩写,是Linux 内核提供的一种可以限制、记录、隔离进程组所使用的物理资源(如 CPU、内存、磁盘 IO 等等)的机制,被 LXC(Linux container)、Docker 等很多项目用于实现进程资源控制。Cgroup 本身是提供将进程进行分组化管理的功能和接口的基础结构,I/O 或内存的分配控制等具体的资源管理是通过该功能来实现的,这些具体的资源 管理功能称为 Cgroup 子系统。

3.3 chroot 跟 pivot_root 文件系统

chroot(change root file system)命令的功能是 改变进程的根目录到指定的位置。比如我们现在有一个$HOME/test目录,想要把它作为一个 /bin/bash 进程的根目录。

  1. 首先,创建一个 HOME/test/{bin,lib64,lib}
  2. 把bash命令拷贝到test目录对应的bin路径下 cp -v /bin/{bash,ls} $HOME/test/bin
  3. 把bash命令需要的所有so文件,也拷贝到test目录对应的lib路径下
  4. 执行chroot命令,告诉操作系统,我们将使用HOME/test /bin/bash

被chroot的进程此时执行 ls / 返回的都是$HOME/test目录下面的内容,Docker就是这样实现容器根目录的。为了能够让容器的这个根目录看起来更真实,一般在容器的根目录下挂载一个完整操作系统的文件系统,比如Ubuntu16.04的ISO。这样在容器启动之后,容器里执行ls /查看到的就是Ubuntu 16.04的所有目录和文件。

而挂载在容器根目录上、用来为容器进程提供隔离后执行环境的文件系统,就是所谓的容器镜像。更专业的名字叫作:rootfs(根文件系统)。所以一个最常见的 rootfs 会包括如下所示的一些目录和文件:

1
2
go复制代码$ ls /
bin dev etc home lib lib64 mnt opt proc root run sbin sys tmp usr var

chroot 只改变当前进程的 /,pivot_root改变当前 mount namespace的 / 。pivot_root 可以认为是 chroot 的改良版。

3.4 一致性

由于 rootfs 里打包的不只是应用,而是整个操作系统的文件和目录,也就意味着应用以及它运行所需要的所有依赖都被封装在了一起。有了容器镜像打包操作系统的能力,这个最基础的依赖环境也终于变成了应用沙盒的一部分。这就赋予了容器所谓的一致性:

无论在本地、云端,还是在一台任何地方的机器上,用户只需要解压打包好的容器镜像,那么这个应用运行所需要的完整的执行环境就被重现出来了。

3.5 UnionFS 联合文件系统

如何实现rootfs的高效可重复利用呢?Docker在镜像的设计中引入了层(layer)的概念。也就是说用户制作镜像的每一步操作都会生成一个层,也就是一个增量rootfs。介绍分层前我们先说个重要知识点,联合文件系统。

联合文件系统(UnionFS)是一种分层、轻量级并且高性能的文件系统,它支持对文件系统的修改作为一次提交来一层层的叠加,同时可以将不同目录挂载到同一个虚拟文件系统下。比如现在有水果fruits、蔬菜vegetables两个目录,其中水果中有苹果和蕃茄,蔬菜有胡萝卜和蕃茄:

1
2
3
4
5
6
7
8
go复制代码$ tree
.
├── fruits
│  ├── apple
│  └── tomato
└── vegetables
  ├── carrots
  └── tomato

然后使用联合挂载的方式将这两个目录挂载到一个公共的目录 mnt 上:

1
2
go复制代码$ mkdir mnt
$ sudo mount -t aufs -o dirs=./fruits:./vegetables none ./mnt

这时再查看目录 mnt 的内容,就能看到目录 fruits 和 vegetables 下的文件被合并到了一起:

1
2
3
4
5
go复制代码$ tree ./mnt
./mnt
├── apple
├── carrots
└── tomato

可以看到在 mnt 目录下有三个文件,苹果apple、胡萝卜carrots和蕃茄tomato。水果和蔬菜的目录被union到了 mnt 目录下了。

1
2
3
4
5
go复制代码 $ echo mnt > ./mnt/apple
 $ cat ./mnt/apple
 mnt
 $ cat ./fruits/apple
 mnt

可以看到./mnt/apple的内容改了,./fruits/apple的内容也改了。

1
2
3
4
5
go复制代码 $ echo mnt_carrots > ./mnt/carrots
 $ cat ./vegetables/carrots
 old
 $ cat ./fruits/carrots
 mnt_carrots

./vegetables/carrots 并没有变化,反而是 ./fruits/carrots 的目录中出现了 carrots 文件,其内容是我们在 ./mnt/carrots 里的内容。

结论:

在mount aufs命令时候,没有对 vegetables 跟 fruits 设置权限,默认命令行上第一个的目录是可读可写的,后面的全都是只读的。有重复的文件名,在mount命令行上,越往前的被操作的优先级越高。

3.6 layer 分层

说完联合文件系统后我们再说下Docker中的分层,镜像可以通过分层来进行继承,基于基础镜像(没有父镜像)用户可以制作各种具体的应用镜像。不同 Docker 容器可以共享一些基础的文件系统层,同时再加上自己独有的改动层,大大提高了存储的效率。

Docker 中使用一种叫 AUFS(Anothe rUnionFS)的联合文件系统。AUFS 支持为每一个成员目录设定不同的读写权限。

  1. rw 表示可写可读read-write。
  2. ro 表示read-only,如果你不指权限,那么除了第一个外,ro是默认值,对于ro分支,其永远不会收到写操作,也不会收到查找whiteout的操作。
  3. rr 表示 real-read-only,与read-only不同的是,rr 标记的是天生就是只读的分支,这样,AUFS可以提高性能,比如不再设置inotify来检查文件变动通知。

当我们想修改ro层的文件时咋办?因为ro是不允许修改的啊!Docker中一般ro层还带个wh的能力。我们就需要对这个ro目录里的文件作whiteout。AUFS的whiteout的实现是通过在上层的可写的目录下建立对应的whiteout隐藏文件来实现的。比如我们有三个目录和文件如下所示:

1
2
3
4
5
6
7
8
9
go复制代码$ tree
.
├── fruits
│   ├── apple
│   └── tomato
├── test #目录为空
└── vegetables
    ├── carrots
    └── tomato

执行如下:

1
2
3
4
go复制代码 $ mkdir mnt
 $ mount -t aufs -o dirs=./test=rw:./fruits=ro:./vegetables=ro none ./mnt
 $ ls ./mnt/
 apple  carrots  tomato

在权限为 rw 的 test 目录下建个 whiteout 的隐藏文件 .wh.apple,你就会发现 ./mnt/apple 这个文件就消失了,跟执行了 rm ./mnt/apple 是一样的结果:

1
2
3
go复制代码 $ touch ./test/.wh.apple
 $ ls ./mnt
 carrots  tomato

对于AUFS来说镜像的若干基础层放置在/var/lib/docker/aufs/diff目录下,然后通过查询/sys/fs/aufs 查看被联合挂载在一起的各个层的信息,多个基础层最终被联合挂载在/var/lib/docker/aufs/mnt里面,这里面存储的就是一个成品。

Docker 目前支持的联合文件系统包括 OverlayFS, AUFS, Btrfs, VFS, ZFS 和 Device Mapper。推荐使用 overlay2 存储驱动,overlay2 是目前 Docker 默认的存储驱动,以前则是 AUFS。

3.6.1 只读层

我们以Ubuntu为例,当执行docker image inspect ubuntu:latest 会发现容器的rootfs最下面的四层,对应的正是ubuntu:latest镜像的四层。它们的挂载方式都是只读的(ro+wh),都以增量的方式分别包含了Ubuntu操作系统的一部分,四层联合起来组成了一个成品。

3.6.2 可读写层

rootfs 最上层的操作权限为 rw, 在没有写入文件之前,这个目录是空的。而一旦在容器里做了写操作,你修改产生的内容就会以增量的方式出现在这个层中。如果你想删除只读层里的文件,咋办呢?这个问题上面已经讲解过了。

最上面这个可读写层就是专门用来存放修改 rootfs 后产生的增量,无论是增、删、改,都发生在这里。而当我们使用完了这个被修改过的容器之后,还可以使用 docker commit 和 push 指令,保存这个被修改过的可读写层,并上传到 Docker Hub上,供其他人使用。并且原先的只读层里的内容则不会有任何变化,这就是增量 rootfs 的好处。

3.6.3 init 层

它是一个以-init结尾的层,夹在只读层和读写层之间。Init层是Docker项目单独生成的一个内部层,专门用来存放 /etc/hosts 等信息。

需要这样一层的原因是这些文件本来属于只读的Ubuntu镜像的一部分,但是用户往往需要在启动容器时写入一些指定的值比如 hostname,那就需要在可读写层对它们进行修改。可是,这些修改往往只对当前的容器有效,我们并不希望执行 docker commit 时,把 init 层的信息连同可读写层一起提交。

最后这6层被组合起来形成了一个完整的 Ubuntu 操作系统供容器使用。

4 Docker 网络

由上面的 Docker 原理可知 Docker 使用了 Linux 的 Namespaces 技术来进行资源隔离,如 PID Namespace 隔离进程,Mount Namespace 隔离文件系统,Network Namespace 隔离网络等。一个Network Namespace 提供了一份独立的网络环境(包括网卡、路由、Iptable规则)与其他的Network Namespace隔离,一个Docker容器一般会分配一个独立的Network Namespace。

当你安装Docker时,执行docker network ls会发现它会自动创建三个网络。

1
2
3
4
5
go复制代码[root@server1 ~]$ docker network ls
NETWORK ID          NAME                DRIVER              SCOPE
0147b8d16c64        bridge              bridge              local
2da931af3f0b        host                host                local
63d31338bcd9        none                null                local

我们在使用docker run创建Docker容器时,可以用 –net 选项指定容器的网络模式,Docker可以有以下4种网络模式:

网络模式 使用注意
host 和宿主机共享网络
none 不配置网络
bridge docker默认,也可自创
container 容器网络连通,容器直接互联,用的很少

4.1 Host 模式

等价于Vmware中的桥接模式,当启动容器的时候用host模式,容器将不会虚拟出自己的网卡,配置自己的IP等,而是使用宿主机的IP和端口。但是容器的其他方面,如文件系统、进程列表等还是和宿主机隔离的。

4.2 Container 模式

Container 模式指定新创建的容器和已经存在的一个容器共享一个 Network Namespace,而不是和宿主机共享。新创建的容器不会创建自己的网卡,配置自己的IP,而是和一个指定的容器共享IP、端口范围等。同样,两个容器除了网络方面,其他的如文件系统、进程列表等还是隔离的。两个容器的进程可以通过lo网卡设备通信。

4.3 None 模式

None 模式将容器放置在它自己的网络栈中,并不进行任何配置。实际上,该模式关闭了容器的网络功能,该模式下容器并不需要网络(例如只需要写磁盘卷的批处理任务)。

4.4 Bridge 模式

Bridge 模式是 Docker 默认的网络设置,此模式会为每一个容器分配 Network Namespace、设置IP等。当Docker Server启动时,会在主机上创建一个名为docker0的虚拟网桥,此主机上启动的Docker容器会连接到这个虚拟网桥上。虚拟网桥的工作方式和物理交换机类似,主机上的所有容器就通过交换机连在了一个二层网络中。

Docker 会从RFC1918所定义的私有IP网段中,选择一个和宿主机不同的IP地址和子网分配给docker0,连接到 docker0 的容器从子网中选择个未占用 IP 使用。一般 Docker 会用 172.17.0.0/16 这个网段,并将172.17.0.1/16 分配给 docker0 网桥(在主机上使用ifconfig命令是可以看到docker0的,可以认为它是网桥的管理接口,在宿主机上作为一块虚拟网卡使用)。

网络配置的过程大致3步:

  1. 在主机上创建一对虚拟网卡 veth pair 设备。veth设备总是成对出现的,它们组成了一个数据的通道,数据从一个设备进入,就会从另一个设备出来。因此veth设备常用来连接两个网络设备。
  2. Docker 将 veth pair 设备的一端放在新创建的容器中,并命名为eth0。另一端放在主机中,以veth65f9 这样类似的名字命名,并将这个网络设备加入到docker0网桥中,可以通过brctl show命令查看。
  3. 从 docker0 子网中分配一个IP给容器使用,并设置 docker0 的IP地址为容器的默认网关。

Bridge 模式下容器的通信

  1. 容器访问外部

假设主机网卡为eth0,IP地址10.10.101.105/24,网关10.10.101.254。从主机上一个IP为172.17.0.1/16 的容器中ping百度(180.76.3.151)。首先IP包从容器发往自己的默认网关 docker0,包到达docker0后,会查询主机的路由表,发现包应该从主机的 eth0 发往主机的网关10.10.105.254/24。接着包会转发给eth0,并从eth0发出去。这时Iptable规则就会起作用,将源地址换为 eth0 的地址。这样,在外界看来,这个包就是从10.10.101.105上发出来的,Docker容器对外是不可见的。

  1. 外部访问容器

创建容器并将容器的80端口映射到主机的80端口。当我们对主机 eth0 收到的目的端口为80的访问时候,Iptable规则会进行DNAT转换,将流量发往172.17.0.2:80,也就是我们上面创建的Docker容器。所以,外界只需访问10.10.101.105:80就可以访问到容器中的服务。

4.5 –link

容器创建后我们想通过容器名字来ping。此时需要用到–link,如下:

1
2
3
go复制代码docker run -d -P --name linux03 --link linux02 linux
docker exec -it linux03 ping linux02 可ping通。
docker exec -it linux02 ping linux03 不可ping通。

追本溯源 看下 linux03 的 /etc/hosts 会发现本质只是做了个host映射。

1
go复制代码172.17.0.3 linux03 12ft4tesa # 跟Windows的host文件一样,只是做了地址绑定

4.6 自建Bridge

我们之前直接启动的命令 (默认是使用–net bridge,可省),这个bridge就是我们的docker0。下面俩是等价的。

1
2
go复制代码docker run -d -P --name linux01 LinuxSelf
docker run -d -P --name linux01 --net bridge LinuxSelf

docker0默认不支持域名访问 , 只能用 –link 打通连接。如果我们使用自定义的网络时,docker底层已经帮我们维护好了对应关系,可以实现域名访问。

1
2
3
4
go复制代码# --driver bridge 网络模式定义为 :桥接 
# --subnet 192.168.0.0/16 定义子网 ,范围为:192.168.0.2 ~ 192.168.255.255 
# --gateway 192.168.0.1 子网网关设为: 192.168.0.1 
docker network create --driver bridge --subnet 192.168.0.0/16 --gateway 192.168.0.1 mynet

接下来

1
2
3
4
go复制代码docker run -d -P --name linux-net-01 --net mynet LinuxSelf
docker run -d -P --name linux-net-02 --net mynet LinuxSelf
docker exec -it linux-net-01 ping linux-net-02的IP  # 结果OK
docker exec -it linux-net-01 ping linux-net-02     # 结果OK

5 可视化界面

5.1 Portainer

Portainer 是 Docker 的图形化管理工具,提供状态显示面板、应用模板快速部署、容器镜像网络数据卷的基本操作(包括上传下载镜像,创建容器等操作)、事件日志显示、容器控制台操作、Swarm集群和服务等集中管理和操作、登录用户管理和控制等功能。功能十分全面,基本能满足中小型单位对容器管理的全部需求。

5.2 DockerUI

DockerUI基于Docker API,提供等同Docker命令行的大部分功能,支持container管理,image管理。不过DockerUI一个致命的缺点就是 不支持多主机。

5.3 Shipyard

Shipyard 是一个集成管理docker容器、镜像、Registries的系统,它可以简化对横跨多个主机的Docker容器集群进行管理. 通过Web用户界面,你可以大致浏览相关信息,比如你的容器在使用多少处理器和内存资源、在运行哪些容器,还可以检查所有集群上的事件日志。

6 Docker 学习指南

本来也想写常见指令、Dockerfile、Docker Compose、Docker Swarm的,不过感觉还是拉闸吧,官方文档他不香啊!推荐几个学习指南。

  1. 官方文档:docs.docker.com/engine/refe…
  2. 从入门到实践:github.com/yeasy/docke…
  3. 在线教程:vuepress.mirror.docker-practice.com
  4. PDF:公众号回复 1412

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

spring-boot记录每次请求的sql总耗时和总次数

发表于 2021-02-25

目标记录每次请求内的http、es、mysql耗时,本篇讨论mysql部分

为什么说要探索,这不是很简单的事么?但是能满足以下几点么?

  • 能记录limit等参数
  • 能将参数和sql写一起,能直接使用
  • 能记录耗时
  • 能计数累加,统计一次请求中sql执行的总数和总耗时

spring原生能力

1
2
ini复制代码logging.level.org.hibernate.SQL=debug
logging.level.org.hibernate.type.descriptor.sql.BasicBinder=trace

通过上面两条配置。

  • ✔️可以显示sql.
  • ❌不能和参数一行显示
  • ❌不能显示limit参数
  • ❌不能计数和记录耗时
1
2
csharp复制代码2021-02-23 19:35:42.932 DEBUG 97586 --- [  restartedMain] org.hibernate.SQL                        : select admin0_.id as id1_0_, admin0_.create_time as create_t2_0_, admin0_.modify_time as modify_t3_0_, admin0_.email as email4_0_, admin0_.password as password5_0_, admin0_.status as status6_0_, admin0_.username as username7_0_ from admin admin0_ where admin0_.username=?
2021-02-23 19:35:42.949 TRACE 97586 --- [ restartedMain] o.h.type.descriptor.sql.BasicBinder : binding parameter [1] as [VARCHAR] - [root]

原生log+org.hibernate.EmptyInterceptor

org.hibernate.EmptyInterceptor提供钩子,hibernate本身提供entity的curd钩子。重写EmptyInterceptor方法,可以实现计数。但是onPrepareStatement方法只是装配sql前的事件,而且不是完整的sql。

  • ✔️ 可以显示sql
  • ❌ 不能和参数一行显示
  • ❌ 不能显示limit参数
  • ✔️ 能计数
  • ❌ 不能记录耗时
1
ini复制代码spring.jpa.properties.hibernate.ejb.interceptor=com.vison.itdoc.config.HibernateInterceptor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public class HibernateInterceptor extends EmptyInterceptor {

@Override
public boolean onLoad(Object entity, Serializable id, Object[] state, String[] propertyNames, Type[] types) {
// Log.info("onload...", entity)
return true;
}

@Override
public String onPrepareStatement(String string) {
// count++
return INSTANCE.onPrepareStatement(string);
}

@Override
public void afterTransactionCompletion(Transaction t) {
INSTANCE.afterTransactionCompletion(t);
Log.info("after trans complete", t);
}

}

log4jdbc

log4jdbc能很好的解决sql完整显示和记录耗时的问题

1
2
3
4
vbnet复制代码2021-02-23 19:59:13.709  INFO 97586 --- [nio-8081-exec-1] jdbc.sqltiming                           : select posts0_.id as id1_2_, posts0_.create_time as create_t2_2_, posts0_.modify_time as modify_t3_2_, 
posts0_.content as content4_2_, posts0_.title as title5_2_ from posts posts0_ where 1=1 order
by posts0_.id asc limit 10 ;
{executed in 1 msec}

还能够定义超过1定时间的执行sql记录为error类型。

1
2
3
4
5
6
xml复制代码        <dependency>
<groupId>com.googlecode.log4jdbc</groupId>
<artifactId>log4jdbc</artifactId>
<version>1.2</version>
<scope>runtime</scope>
</dependency>
1
2
3
4
5
ini复制代码spring.datasource.driver-class-name: net.sf.log4jdbc.DriverSpy
#使用log4jdbc后mysql的url
spring.datasource.url=jdbc:log4jdbc:mysql://localhost:3306/xxxx?useUnicode=true&characterEncoding=UTF-8
#使用log4jdbc后oracle的url
#spring.datasource.url: jdbc:log4jdbc:oracle:thin:@127.0.0.1:1521:orcl

注意需要添加spring.datasource.driver-class-name 和更改 spring.datasource.url 将jdbc改为 jdbc:log4jdbc

log4jdbc.properties可以定义更多配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
ini复制代码#配置为需要记录的包或类匹配路径
#log4jdbc.debug.stack.prefix=com.drp
#log4jdbc加载的drivers (驱动名)
#log4jdbc.drivers=oracle.jdbc.OracleDriver
log4jdbc.auto.load.popular.drivers=true
#在日志中显示warn警告
log4jdbc.statement.warn=true
#毫秒值.执行时间超过该值的SQL语句将被记录为warn级别.
log4jdbc.sqltiming.warn.threshold=2000
#毫秒值.执行时间超过该值的SQL语句将被记录为error级别.
log4jdbc.sqltiming.error.threshold=3000
#是把boolean记录为 'true'/'false' 还是 1/0. 默认设置为false,不启用,为了移植性.
#log4jdbc.dump.booleanastruefalse=true
#输出的sql,一行最大的字符数,默认90. 以后新版可能为0
#log4jdbc.dump.sql.maxlinelength=90
#如果在调试模式下转储,则转储整个堆栈跟踪 默认false
log4jdbc.dump.fulldebugstacktrace=false

#是否记录某些类型的语句,默认true
log4jdbc.dump.sql.select=true
log4jdbc.dump.sql.insert=true
log4jdbc.dump.sql.delete=true
log4jdbc.dump.sql.update=true
log4jdbc.dump.sql.create=true

#输出sql末尾处加入分号,默认false
log4jdbc.dump.sql.addsemicolon=true

#将此设置为false以不修剪已记录的SQL
log4jdbc.trim.sql=true
#将此设置为false不删除额外的空行
log4jdbc.trim.sql.extrablanklines=true

#log4jdbc.suppress.generated.keys.exception=false
  • ✔️ 可以显示sql
  • ✔️ 能和参数一起显示
  • ✔️ 能显示limit参数
  • ✔️ 能计数
  • ✔️ 能记录单个sql耗时
  • ❌ 不能统计总耗时

不足的是,单纯log4jdbc并不能满足所有。理论上log4jdbc+org.hibernate.EmptyInterceptor可以满足需求了

P6Spy

测试完毕,发现P6Spy目前最能满足需求:

  • ✔️ 可以显示sql
  • ✔️ 不能和参数一起显示
  • ✔️ 不能显示limit参数
  • ✔️ 能计数
  • ✔️ 不能记录耗时
  • ✔️ 支持curd事件前后钩子,钩子参数返回sql和执行耗时及异常信息🚀
1
2
3
4
5
xml复制代码        <dependency>
<groupId>p6spy</groupId>
<artifactId>p6spy</artifactId>
<version>3.9.1</version>
</dependency>

同log4jdbc需要改driver和url

1
2
ini复制代码spring.datasource.driver-class-name=com.p6spy.engine.spy.P6SpyDriver
spring.datasource.url=jdbc:p6spy:mysql://localhost:3306/test?useLegacyDatetimeCode=false&serverTimezone=UTC

psy.properties可以定义更多配置

1
2
3
4
5
6
7
ini复制代码#modulelist=com.p6spy.engine.spy.P6SpyFactory,com.p6spy.engine.logging.P6LogFactory,com.p6spy.engine.outage.P6OutageFactory
modulelist=com.vison.itdoc.config.CustomeP6Factory,com.p6spy.engine.logging.P6LogFactory,com.p6spy.engine.outage.P6OutageFactory
#moduelist很关键,我这里使用了自定义的Factory,因为我需要自定义event
appender=com.p6spy.engine.spy.appender.Slf4JLogger
logMessageFormat=com.p6spy.engine.spy.appender.CustomLineFormat
customLogMessageFormat=%(executionTime) ms|%(category)|%(sql)
excludecategories=result,resultset,info,debug

正常使用默认配置就可以显示出sql和耗时信息

1
csharp复制代码 4 ms|statement|select admin0_.id as id1_0_, admin0_.create_time as create_t2_0_, admin0_.modify_time as modify_t3_0_, admin0_.email as email4_0_, admin0_.password as password5_0_, admin0_.status as status6_0_, admin0_.username as username7_0_ from admin admin0_ where admin0_.username='root'

可以看到,耗时信息和实际参数

自定义事件

modulelist=com.p6spy.engine.spy.P6SpyFactory改成自定义Factory

自定义Factory

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public class CustomeP6Factory implements com.p6spy.engine.spy.P6Factory {

@Override
public P6LoadableOptions getOptions(P6OptionsRepository optionsRepository) {
return new P6SpyOptions(optionsRepository);
}

@Override
public JdbcEventListener getJdbcEventListener() {
return new P6spyListener(); //使用自定义Listener
}

}

自定义事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码public class P6spyListener extends JdbcEventListener {

@Override
public void onAfterExecuteQuery(PreparedStatementInformation statementInformation, long timeElapsedNanos, SQLException e) {
App.sqlCount.incrementAndGet();
Log.info("execute query...", statementInformation.getSqlWithValues());
}

@Override
public void onAfterExecuteUpdate(PreparedStatementInformation statementInformation, long timeElapsedNanos, int rowCount, SQLException e) {
App.sqlCount.incrementAndGet();
Log.info("execute update..", statementInformation.getSqlWithValues());
}

@Override
public void onAfterExecute(StatementInformation statementInformation, long timeElapsedNanos, String sql, SQLException e) {
Log.info("execute..", statementInformation.getSqlWithValues());
}

}

可以看到,我在自定义事件中进行了sql计数.于是我可以在请求结束时打印每次请求的总sql执行次数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码public class RequestInitInterceptor implements HandlerInterceptor {

public RequestInitInterceptor() {
}

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {
App._uniq_req_no = UUID.randomUUID().toString();
App.sqlCount = new AtomicInteger(0);
Log.setMsgTraceNo(App._uniq_req_no);
Log.info("request start...", handler);
return true;
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex)
throws Exception {
Log.info(String.format("finish request sql执行次数:%s", App.sqlCount));
}

}

由于事件参数还给出了timeElapsedNanos,最终我们还能统计出所有sql执行的耗时。这样一来我们就能看出一次请求内,最耗时的操作具体是什么。达到类似以下效果:

配合自定义DispatcherServlet可以记录请求数据、返回数据、sql执行数据次数、耗时

1
2
3
4
5
6
7
8
9
10
csharp复制代码2021-03-01 11:19:40.694 || DEBUG || c.v.i.c.LoggableDispatcherServlet ||  || GET "/admin/posts?page=1&rows=10&sort=id&desc=false", parameters={masked}
2021-03-01 11:19:40.698 || INFO || c.v.i.c.R.preHandle:32 || 6ebd23e2-fc32-4f58-a5b8-fd39d496d0d4 || request start... ||
2021-03-01 11:19:40.716 || INFO || c.v.i.c.P.onAfterExecuteQuery:23 || 6ebd23e2-fc32-4f58-a5b8-fd39d496d0d4 || 执行sql || select count(*) as col_0_0_ from posts posts0_ 耗时 2 ms
2021-03-01 11:19:40.734 || DEBUG || c.v.i.c.a.P.get:44 || 6ebd23e2-fc32-4f58-a5b8-fd39d496d0d4 || 获取数量 35
2021-03-01 11:19:40.740 || DEBUG || c.v.i.s.P.query:39 || 6ebd23e2-fc32-4f58-a5b8-fd39d496d0d4 || Page request [number: 0, size 10, sort: id: ASC]
2021-03-01 11:19:40.762 || INFO || c.v.i.c.P.onAfterExecuteQuery:23 || 6ebd23e2-fc32-4f58-a5b8-fd39d496d0d4 || 执行sql || select posts0_.id as id1_2_, posts0_.create_time as create_t2_2_, posts0_.modify_time as modify_t3_2_, posts0_.content as content4_2_, posts0_.title as title5_2_ from posts posts0_ where true=1 order by posts0_.id asc limit 10 耗时 3 ms
2021-03-01 11:19:40.777 || INFO || c.v.i.c.P.onAfterExecuteQuery:23 || 6ebd23e2-fc32-4f58-a5b8-fd39d496d0d4 || 执行sql || select count(posts0_.id) as col_0_0_ from posts posts0_ where true=1 耗时 4 ms
2021-03-01 11:19:40.809 || INFO || c.v.i.c.R.afterCompletion:39 || 6ebd23e2-fc32-4f58-a5b8-fd39d496d0d4 || finish request... || sql执行次数:3 sql总耗时:9 ms
2021-03-01 11:19:40.819 || INFO || c.v.i.c.L.doDispatch:52 || 6ebd23e2-fc32-4f58-a5b8-fd39d496d0d4 || {"uri":"/admin/posts","clientIp":"127.0.0.1","method":"GET","request":{"page":["1"],"rows":["10"],"sort":["id"],"desc":["false"]},"status":200,"response":{"code":0,"msg":"获取成功","data":{"docs":{"content":[{"id":1,"createTime":"2021-02-22T03:53:02.039+0800","modifyTime":"2021-02-22T03:53:02.039+0800","title":"test","content":"# test\n\n## test\n\n> test to"},{"id":2,"createTime":"2021-02-22T09:57:32.837+0800","modifyTime":"2021-02-22T09:57:32.837+0800","title":"test2","content":"test2"},{"id":3,"createTime":"2021-02-22T10:07:37.769+0800","modifyTime":"2021-02-22T10:07:37.769+0800","title":"test3","content":"test3"},{"id":4,"createTime":"2021-02-22T10:13:20.691+0800","modifyTime":"2021-02-22T10:13:20.691+0800","title":"test4","content":"## 二级标题"},{"id":5,"createTime":"2021-02-22T10:14:51.925+0800","modifyTime":"2021-02-22T10:14:51.925+0800","title":"test5","content":"test5"},{"id":6,"createTime":"2021-02-22T18:19:24.530+0800","modifyTime":"2021-02-22T18:19:24.530+0800","title":"test6","content":"test6"},{"id":7,"createTime":"2021-02-22T19:50:56.805+0800","modifyTime":"2021-02-22T19:50:56.805+0800","title":"test7","content":"t12212"},{"id":8,"createTime":"2021-02-23T17:20:37.492+0800","modifyTime":"2021-02-23T17:20:37.492+0800","title":"test","content":"## 二级标题"},{"id":9,"createTime":"2021-02-23T21:02:08.924+0800","modifyTime":"2021-02-23T21:02:08.924+0800","title":"21","content":"12121"},{"id":10,"createTime":"2021-02-24T09:53:08.278+0800","modifyTime":"2021-02-24T09:53:08.278+0800","title":"test insert","content":"test insert"}],"pageable":{"sort":{"unsorted":false,"sorted":true,"empty":false},"offset":0,"pageNumber":0,"pageSize":10,"paged":true,"unpaged":false},"totalPages":4,"totalElements":35,"last":false,"numberOfElements":10,"first":true,"number":0,"sort":{"unsorted":false,"sorted":true,"empty":false},"size":10,"empty":false}},"uniq_req_no":"6ebd23e2-fc32-4f58-a5b8-fd39d496d0d4"}}
2021-03-01 11:19:40.820 || DEBUG || c.v.i.c.LoggableDispatcherServlet || 6ebd23e2-fc32-4f58-a5b8-fd39d496d0d4 || Completed 200 OK

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【程序员不圆1】【狂神说JavaSpring5读书笔记1】

发表于 2021-02-24

【程序员不圆1】【狂神说JavaSpring5读书笔记1】

Spring理念

使现有的技术更好的使用,本身是一个大杂烩,整合了现有的技术框架

Spring是什么

一个轻量级,非入侵的的IOC,AOP框架

SpringBoot是什么

用来构建一切,一个快速开发的脚手架,可以快速开发单个微服务,约定大于配置

SpringCloud是什么

用来协调一切,基于SpringBoot实现,各种微服务,中间件

为什么用IOC

原本

1
java复制代码UserDao userDao  = new UserDaoImpl();

如果想换成UserDaoMysqlImpl()就需要更改代码

怎么实现IOC

1
2
3
4
5
java复制代码private UserDao userDao;

public void setUserDao(UserDao userDao){
this.userDao = userDao;
}

什么是IOC

对象创建与否不再是程序员控制

可以是用户控制或者是框架控制

怎么控制?传入要控制的对象就行

ApplicationContext有什么特殊

假如读取了Beans.xml文件,那么这个文件中的所有的Bean都可以用哪个ApplicationContext直接getBean获取

容器中管理的文件一起初始化

IOC对象的创建

  1. 无参构造+setter
  2. 有参构造+下标赋值
  3. 有参构造+类型赋值
  4. 有参构造+名称赋值

DI是什么

DI只是IOC的一种实现方法

DI有什么方法

注入属性

  1. 构造器注入
  2. settter注入
  3. 拓展注入

注入方法

  1. value注入:value
  2. Bean注入:ref
  3. 数组注入:array
  4. List注入:list
  5. Map注入:map,entry,key,value
  6. Set注入:set,value
  7. null:
  8. property:props,key
  9. p命名空间注入:p:name setter注入 需要导入约束
  10. c命名空间注入:c:name 构造器注入 需要导入约束

Bean的Scope

  • 单例
  • 原型
  • request、session、application、websoket都在web开发中使用

Bean的装配

  1. xml
  2. java
  3. 隐式

Bean的自动装配

  1. xml
    1. byName:上下文中找,比如xml中找名字相同的 id唯一
    2. byType:class唯一,可以不要id
  2. 注解
    1. 要求
      1. 导入约束
      2. 开启注解的支持
    2. Autowired
      1. 可以不用setter
      2. 默认bytype
      3. @nullable可以为空
      4. require=false 可以为空
      5. @Qualifier指定到底注入哪一个
    3. @Resource 默认bytype 如果找不到就byname 如果找不到就报错
    4. @component
      1. 需要开启注解的支持
      2. 衍生
        1. @Repository
        2. @Service
        3. @Controller
  3. java注入
    1. @Configuration
      1. 本身也是个Component
    2. @Bean

注入的最佳实践

xml创建bean

注解属性的注入

静态代理

自己写一个Proxy

静态代理代理模式缺点

代码量翻倍

动态代理

自动生成代理类

原理:反射

方法

  • 基于接口:默认 JDK动态代理 Proxy.newProxyInstance 代理的是接口 返回的是接口不是实现类
  • 基于类:cglib 如果目标对象没有实现了接口,必须采用CGLIB库
  • java字节码

动态代理好处

  • 一个代理类可以代理一个业务 不用一个类一个代理类

AOP实现

  1. 原生Spring API:xml 导入aop约束 切入点 advisor
  2. 自定义类: xml 主要是切面定义
  3. 注解:

AOP概念

切面:类 需要执行代理的类

通知:方法 切面必须完成的工作

目标:通知的对象

代理:创建的对象

切入点:通知执行的地点的定义?哪些方法需要执行(“execution(* com.xx.xx.xxxImpl.(..))”)

连接点:切入点匹配的执行点

Mybatis整合

  1. 方法1
1. 实体类
2. 核心配置文件
    1. Datasource
    2. sqlSessionFactory
    3. sqlSessionTemplate
3. 接口
4. Mapper.xml
5. 测试
  1. 方法2
1. sqlSessionDaoSupport

声明式事务

保证原子性

可以使用AOP编入

传播特性

针对事务来说的

一般是required

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…715716717…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%