开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

高可用RabbitMQ集群的搭建及原理分析 前言 Rabbi

发表于 2021-02-11

前言

任何一个服务,如果仅仅是单机部署,那么性能总是有上限的,RabbitMQ 也不例外,当单台 RabbitMQ 服务处理消息的能力到达瓶颈时,可以通过集群来实现高可用和负载均衡。

RabbitMQ 集群知多少

通常情况下,在集群中我们把每一个服务称之为一个节点,在 RabbitMQ 集群中,节点类型可以分为两种:

  • 内存节点:元数据存放于内存中。为了重启后能同步数据,内存节点会将磁盘节点的地址存放于磁盘之中,除此之外,如果消息被持久化了也会存放于磁盘之中,因为内存节点读写速度快,一般客户端会连接内存节点。
  • 磁盘节点:元数据存放于磁盘中(默认节点类型),需要保证至少一个磁盘节点,否则一旦宕机,无法恢复数据,从而也就无法达到集群的高可用目的。

PS:元数据,指的是包括队列名字属性、交换机的类型名字属性、绑定信息、vhost等基础信息,不包括队列中的消息数据。

RabbitMQ 中的集群主要有两种模式:普通集群模式和镜像队列模式。

普通集群模式

在普通集群模式下,集群中各个节点之间只会相互同步元数据,也就是说,消息数据不会被同步。那么问题就来了,假如我们连接到 A 节点,但是消息又存储在 B 节点又怎么办呢?

不论是生产者还是消费者,假如连接到的节点上没有存储队列数据,那么内部会将其转发到存储队列数据的节点上进行存储。虽然说内部可以实现转发,但是因为消息仅仅只是存储在一个节点,那么假如这节点挂了,消息是不是就没有了?这个问题确实存在,所以这种普通集群模式并没有达到高可用的目的。

镜像队列模式

镜像队列模式下,节点之间不仅仅会同步元数据,消息内容也会在镜像节点间同步,可用性更高。这种方案提升了可用性的同时,因为同步数据之间也会带来网络开销从而在一定程度上会影响到性能。

RabbitMQ 集群搭建

接下来让我们一起尝试搭建一个 RabbitMQ 集群:

  1. 假如之前启动过单机版,那么先删除旧数据 rm -rf /var/lib/rabbitmq/mnesia 或者删除安装目录内的 var/lib/rabbitmq/mnesia,我本机是安装在安装目录下,所以执行的是命令 rm -rf /usr/local/rabbitmq_server-3.8.4/var/lib/rabbitmq/mnesia/。
  2. 接下来需要启动以下三个命令来启动三个不同端口号的 RabbitMQ 服务,除了指定 RabbitMQ 服务端口之后还需要额外指定后台管理系统的端口,而且必须指定 node 名的前缀,因为集群中是以节点名来进行通信的,所以节点名必须唯一,默认的节点名是 rabbit@hostname,下面的命令表示指定了前缀:
1
2
3
properties复制代码RABBITMQ_NODE_PORT=5672 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15672}]" RABBITMQ_NODENAME=rabbit1 rabbitmq-server -detached
RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server -detached
RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit3 rabbitmq-server -detached

启动之后进入 /usr/local/rabbitmq_server-3.8.4/var/lib/rabbitmq/mnesia/ 目录查看,发现创建了 3 个节点信息:

另外通过 ps -ef | grep rabbit 也可以发现三个服务进程被启动。

  1. 现在启动的三个服务彼此之间还没有联系,现在我们需要以其中一个节点为主节点,然后其余两个节点需要加入主节点,形成一个集群服务,需要注意的是加入集群之前,需要重置节点信息,即不允许带有数据的节点加入集群。
1
2
3
4
5
6
7
8
9
10
java复制代码//rabbit2 节点重置后加入集群
rabbitmqctl -n rabbit2 stop_app
rabbitmqctl -n rabbit2 reset
rabbitmqctl -n rabbit2 join_cluster --ram rabbit1@`hostname -s` //--ram 表示这是一个内存节点
rabbitmqctl -n rabbit2 start_app

rabbitmqctl -n rabbit3 stop_app
rabbitmqctl -n rabbit3 reset
rabbitmqctl -n rabbit3 join_cluster --disc rabbit1@`hostname -s` //--disc表示磁盘节点(默认也是磁盘节点)
rabbitmqctl -n rabbit3 start_app
  1. 成功之后,执行命令 rabbitmqctl cluster_status 查询节点 rabbit1 的状态,可以看到下图所示,两个磁盘节点一个内存节点:

  1. 需要注意的是,到这里启动的集群只是默认的普通集群,如果想要配置成镜像集群,则需要执行以下命令:
1
java复制代码rabbitmqctl -n rabbit1 set_policy ha-all "^" '{"ha-mode":"all"}'

到这里 RabbitMQ 集群就算搭建完成了,不过需要注意的是,这里因为是单机版本,所以没有考虑 .erlang.cookie 文件保持一致。

基于 HAProxy + Keepalived 高可用集群

假如一个 RabbitMQ 集群中,有多个内存节点,我们应该连接到哪一个节点呢?这个选择的策略如果放在客户端做,那么会有很大的弊端,最严重的的就是每次扩展集群都要修改客户端代码,所以这种方式并不是很可取,所以我们在部署集群的时候就需要一个中间代理组件,这个组件要能够实现服务监控和转发,比如 Redis 中的 Sentinel(哨兵)集群模式,哨兵就可以监听 Redis 节点并实现故障转移。

在 RabbitMQ 集群中,通过 Keepalived 和 HAProxy 两个组件实现了集群的高可用性和负载均衡功能。

HAProxy

HAProxy 是一个开源的、高性能的负载均衡软件,同样可以作为负载均衡软件的还有 nginx,lvs 等。 HAproxy 支持 7 层负载均衡和 4 层负载均衡。

负载均衡

所谓的 7 层负载均衡和 4 层负载均衡针对的是 OSI 模型而言,如下图所示就是一个 OSI 通信模型:

上图中看到,第 7 层对应了应用层,第 4 层对应了传输层。常用的负载均衡软件如 nginx 一般工作在第 7 层,lvs(Linux Virtual Server)一般工作在第 4 层。

  • 4 层负载:

4 层负载使用了 NAT (Network Address Translation)技术,即:网络地址转换。收到客户端请求时,可以通过修改数据包里的源 IP 和端口,然后把数据包转发到对应的目标服务器。4 层负载均衡只能根据报文中目标地址和源地址对请求进行转发,无法判断或者修改请求资源的具体类型。

  • 7 层负载:

根据客户端请求的资源路径,转发到不同的目标服务器。

高可用 HAProxy

HAProxy 虽然实现了负载均衡,但是假如只是部署一个 HAProxy,那么其本身也存在宕机的风险。一旦 HAProxy 宕机,那么就会导致整个集群不可用,所以我们也需要对 HAProxy 也实现集群,那么假如 HAProxy 也实现了集群,客户端应该连接哪一台服务呢?问题似乎又回到了起点,陷入了无限循环中…

Keepalived

为了实现 HAProxy 的高可用,需要再引入一个 Keepalived 组件,Keepalived 组件主要有以下特性:

  • 具有负载功能,可以监控集群中的节点状态,如果集群中某一个节点宕机,可以实现故障转移。
  • 其本身也可以实现集群,但是只能有一个 master 节点。
  • master 节点会对外提供一个虚拟 IP,应用端只需要连接这一个 IP 就行了。可以理解为集群中的 HAProxy 节点会同时争抢这个虚拟 IP,哪个节点争抢到,就由哪个节点来提供服务。

VRRP 协议

VRRP 协议即虚拟路由冗余协议(Virtual Router Redundancy Protocol)。Keepalived 中提供的虚拟 IP 机制就属于 VRRP,它是为了避免路由器出现单点故障的一种容错协议。

总结

本文主要介绍了 RaabbitMQ 集群的相关知识,并对比了普通集群和镜像集群的区别,最后通过实践搭建了一个 RabbitMQ 集群,同时也介绍了普通的集群存在一些不足,可以结合 HAProxy 和 Keepalived 组件来实现真正的高可用分布式集群服务。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

撸完这篇线程池,我快咳血了!

发表于 2021-02-10

我们知道,线程需要的时候要进行创建,不需要的时候需要进行销毁,但是线程的创建和销毁都是一个开销比较大的操作。

为什么开销大呢?

虽然我们程序员创建一个线程很容易,直接使用 new Thread() 创建就可以了,但是操作系统做的工作会多很多,它需要发出 系统调用,陷入内核,调用内核 API 创建线程,为线程分配资源等,这一些操作有很大的开销。

所以,在高并发大流量的情况下,频繁的创建和销毁线程会大大拖慢响应速度,那么有什么能够提高响应速度的方式吗?方式有很多,尽量避免线程的创建和销毁是一种提升性能的方式,也就是把线程 复用 起来,因为性能是我们日常最关注的因素。

本篇文章我们先来通过认识一下 Executor 框架、然后通过描述线程池的基本概念入手、逐步认识线程池的核心类,然后慢慢进入线程池的原理中,带你一步一步理解线程池。

在 Java 中可以通过线程池来达到这样的效果。今天我们就来详细讲解一下 Java 的线程池。

Executor 框架

为什么要先说一下 Executor 呢?因为我认为 Executor 是线程池的一个驱动,我们平常创建并执行线程用的一般都是 new Thread().start() 这个方法,这个方法更多强调 创建一个线程并开始运行。而我们后面讲到创建线程池更多体现在驱动执行上。

Executor 的总体框架如下,我们下面会对 Executor 框架中的每个类进行介绍。


我们首先来认识一下 Executor

Executor 接口

Executor 是 java.util.concurrent 的顶级接口,这个接口只有一个方法,那就是 execute 方法。我们平常创建并启动线程会使用 new Thread().start() ,而 Executor 中的 execute 方法替代了显示创建线程的方式。Executor 的设计初衷就是将任务提交和任务执行细节进行解藕。使用 Executor 框架,你可以使用如下的方式创建线程

1
2
3
java复制代码Executor executor = Executors.xxx // xxx 其实就是 Executor 的实现类,我们后面会说
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

execute方法接收一个 Runnable 实例,它用来执行一个任务,而任务就是一个实现了 Runnable 接口的类,但是 execute 方法不能接收实现了 Callable 接口的类,也就是说,execute 方法不能接收具有返回值的任务。

execute 方法创建的线程是异步执行的,也就是说,你不用等待每个任务执行完毕后再执行下一个任务。

比如下面就是一个简单的使用 Executor 创建并执行线程的示例

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public class RunnableTask implements Runnable{

@Override
public void run() {
System.out.println("running");
}

public static void main(String[] args) {
Executor executor = Executors.newSingleThreadExecutor(); // 你可能不太理解这是什么意思,我们后面会说。
executor.execute(new RunnableTask());
}
}

Executor 就相当于是族长,大佬只发号令,族长让你异步执行你就得异步执行,族长说不用汇报任务你就不用回报,但是这个族长管的事情有点少,所以除了 Executor 之外,我们还需要认识其他管家,比如说管你这个线程啥时候终止,啥时候暂停,判断你这个线程当前的状态等,ExecutorService 就是一位大管家。

ExecutorService 接口

ExecutorService 也是一个接口,它是 Executor 的拓展,提供了一些 Executor 中没有的方法,下面我们来介绍一下这些方法

1
java复制代码void shutdown();

shutdown 方法调用后,ExecutorService 会有序关闭正在执行的任务,但是不接受新任务。如果任务已经关闭,那么这个方法不会产生任何影响。

ExecutorService 还有一个和 shutdown 方法类似的方法是

1
java复制代码List<Runnable> shutdownNow();

shutdownNow 会尝试停止关闭所有正在执行的任务,停止正在等待的任务,并返回正在等待执行的任务列表。

既然 shutdown 和 shutdownNow 这么相似,那么二者有啥区别呢?

  • shutdown 方法只是会将线程池的状态设置为 SHUTWDOWN ,正在执行的任务会继续执行下去,线程池会等待任务的执行完毕,而没有执行的线程则会中断。
  • shutdownNow 方法会将线程池的状态设置为 STOP,正在执行和等待的任务则被停止,返回等待执行的任务列表

ExecutorService 还有三个判断线程状态的方法,分别是

1
2
3
4
java复制代码boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
  • isShutdown 方法表示执行器是否已经关闭,如果已经关闭,返回 true,否则返回 false。
  • isTerminated 方法表示判断所有任务再关闭后是否已完成,如果完成返回 false,这个需要注意一点,除非首先调用 shutdown 或者 shutdownNow 方法,否则 isTerminated 方法永远不会为 true。
  • awaitTermination 方法会阻塞,直到发出调用 shutdown 请求后所有的任务已经完成执行后才会解除。这个方法不是非常容易理解,下面通过一个小例子来看一下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public static ExecutorService executorService = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
});

}

executorService.shutdown();
System.out.println("Waiting...");
boolean isTermination = executorService.awaitTermination(3, TimeUnit.SECONDS);
System.out.println("Waiting...Done");
if(isTermination){
System.out.println("All Thread Done");
}
System.out.println(Thread.currentThread().getName());
}

如果在调用 executorService.shutdown() 之后,所有线程完成任务,isTermination 返回 true,程序才会打印出 All Thread Done ,如果注释掉 executorService.shutdown() 或者在任务没有完成后 awaitTermination 就超时了,那么 isTermination 就会返回 false。

ExecutorService 当大管家还有一个原因是因为它不仅能够包容 Runnable 对象,还能够接纳 Callable 对象。在 ExecutorService 中,submit 方法扮演了这个角色。

1
2
3
java复制代码<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

submit 方法会返回一个 Future对象,<T> 表示范型,它是对 Callable 产生的返回值来说的,submit 方法提交的任务中的 call 方法如果返回 Integer,那么 submit 方法就返回 Future<Integer>,依此类推。

1
2
3
4
5
java复制代码<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

invokeAll 方法用于执行给定的任务结合,执行完成后会返回一个任务列表,任务列表每一项是一个任务,每个任务会包括任务状态和执行结果,同样 invokeAll 方法也会返回 Future 对象。

1
2
3
4
5
java复制代码<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;

invokeAny 会获得最先完成任务的结果,即Callable<T> 接口中的 call 的返回值,在获得结果时,会中断其他正在执行的任务,具有阻塞性。

大管家的职责相对于组长来说标准更多,管的事情也比较宽,但是大管家毕竟也是家族的中流砥柱,他不会做具体的活,他的下面有各个干将,干将是一个家族的核心,他负责完成大管家的工作。

AbstractExecutorService 抽象类

AbstractExecutorService 是一个抽象类,它实现了 ExecutorService 中的部分方法,它相当一个干将,会分析大管家有哪些要做的工作,然后针对大管家的要求做一些具体的规划,然后找他的得力助手 ThreadPoolExecutor 来完成目标。

AbstractExecutorService 这个抽象类主要实现了 invokeAll 和 invokeAny 方法,关于这两个方法的源码分析我们会在后面进行解释。

ScheduledExecutorService 接口

ScheduledExecutorService 也是一个接口,它扩展了 ExecutorService 接口,提供了 ExecutorService 接口所没有的功能,ScheduledExecutorService 顾名思义就是一个定时执行器,定时执行器可以安排命令在一定延迟时间后运行或者定期执行。

它主要有三个接口方法,一个重载方法。下面我们先来看一下这两个重载方法。

1
2
3
4
java复制代码public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);

schedule 方法能够延迟一定时间后执行任务,并且只能执行一次。可以看到,schedule 方法也返回了一个 ScheduledFuture 对象,ScheduledFuture 对象扩展了 Future 和 Delayed 接口,它表示异步延迟计算的结果。schedule 方法支持零延迟和负延迟,这两类值都被视为立即执行任务。

还有一点需要说明的是,schedule 方法能够接收相对的时间和周期作为参数,而不是固定的日期,你可以使用 date.getTime - System.currentTimeMillis() 来得到相对的时间间隔。

1
2
3
4
java复制代码public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);

scheduleAtFixedRate 表示任务会根据固定的速率在时间 initialDelay 后不断地执行。

1
2
3
4
java复制代码public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);

这个方法和上面的方法很类似,它表示的是以固定延迟时间的方式来执行任务。

scheduleAtFixedRate 和 scheduleWithFixedDelay 这两个方法容易混淆,下面我们通过一个示例来说明一下这两个方法的区别。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public class ScheduleTest {

public static void main(String[] args) {
Runnable command = () -> {
long startTime = System.currentTimeMillis();
System.out.println("current timestamp = " + startTime);
try {
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("time spend = " + (System.currentTimeMillis() - startTime));
};

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
scheduledExecutorService.scheduleAtFixedRate(command,100,1000,TimeUnit.MILLISECONDS);
}
}

输出结果大致如下


可以看到,没次打印出来 current timestamp 的时间间隔大约等于 1000 毫秒,所以可以断定 scheduleAtFixedRate 是以恒定的速率来执行任务的。

然后我们再看一下 scheduleWithFixedDelay 方法,和上面测试类一样,只不过我们把 scheduleAtFixedRate 换为了 scheduleWithFixedDelay 。

1
java复制代码scheduledExecutorService.scheduleWithFixedDelay(command,10,1000,TimeUnit.MILLISECONDS);

然后观察一下输出结果


可以看到,两个 current timestamp 之间的间隔大约等于 1000(固定时间) + delay(time spend) 的总和,由此可以确定 scheduleWithFixedDelay 是以固定时延来执行的。

线程池的描述

下面我们先来认识一下什么是线程池,线程池从概念上来看就是一个池子,什么池子呢?是指管理同一组工作线程的池子,也就是说,线程池会统一管理内部的工作线程。

wiki 上说,线程池其实就是一种软件设计模式,这种设计模式用于实现计算机程序中的并发。

![image-20210202200016478](/Users/mr.l/Library/Application Support/typora-user-images/image-20210202200016478.png)

比如下面就是一个简单的线程池概念图。

注意:这个图只是一个概念模型,不是真正的线程池实现,希望读者不要混淆。

可以看到,这种其实也相当于是生产者-消费者模型,任务队列中的线程会进入到线程池中,由线程池进行管理,线程池中的一个个线程就是工作线程,工作线程执行完毕后会放入完成队列中,代表已经完成的任务。

上图有个缺点,那就是队列中的线程执行完毕后就会销毁,销毁就会产生性能损耗,降低响应速度,而我们使用线程池的目的往往是需要把线程重用起来,提高程序性能。

所以我们应该把执行完成后的工作线程重新利用起来,等待下一次使用。

线程池创建

我们上面大概聊了一下什么线程池的基本执行机制,你知道了线程是如何复用的,那么任何事物不可能是凭空出现的,线程也一样,那么它是如何创建出来的呢?下面就不得不提一个工具类,那就是 Executors。

Executors 也是java.util.concurrent 包下的成员,它是一个创建线程池的工厂,可以使用静态工厂方法来创建线程池,下面就是 Executors 所能够创建线程池的具体类型。

  • newFixedThreadPool:newFixedThreadPool 将会创建固定数量的线程池,这个数量可以由程序员通过创建 Executors.newFixedThreadPool(int nThreads)时手动指定,每次提交一个任务就会创建一个线程,在任何时候,nThreads 的值是最多允许活动的线程。如果在所有线程都处于活跃状态时有额外的任务被创建,这些新创建的线程会进入等待队列等待线程调度。如果有任何线程由于执行期间出现意外导致线程终止,那么在执行后续任务时会使用等待队列中的线程进行替代。
  • newWorkStealingPool:newWorkStealingPool 是 JDK1.8 新增加的线程池,它是基于 fork-join 机制的一种线程池实现,使用了 Work-Stealing 算法。newWorkStealingPool 会创建足够的线程来支持并行度,会使用多个队列来减少竞争。work-stealing pool 线程池不会保证提交任务的执行顺序。
  • newSingleThreadExecutor:newSingleThreadExecutor 是一个单线程的执行器,它只会创建单个线程来执行任务,如果这个线程异常结束,则会创建另外一个线程来替代。newSingleThreadExecutor 会确保任务在任务队列中的执行次序,也就是说,任务的执行是 有序的。
  • newCachedThreadPool:newCachedThreadPool 会根据实际需要创建一个可缓存的线程池。如果线程池的线程数量超过实际需要处理的任务,那么 newCachedThreadPool 将会回收多余的线程。如果实际需要处理的线程不能满足任务的数量,则回你添加新的线程到线程池中,线程池中线程的数量不存在任何限制。
  • newSingleThreadScheduledExecutor:newSingleThreadScheduledExecutor 和 newSingleThreadExecutor 很类似,只不过带有 scheduled 的这个执行器哥们能够在一定延迟后执行或者定期执行任务。
  • newScheduledThreadPool:这个线程池和上面的 scheduled 执行器类似,只不过 newSingleThreadScheduledExecutor 比 newScheduledThreadPool 多加了一个 DelegatedScheduledExecutorService 代理,这其实包装器设计模式的体现。

上面这些线程池的底层实现都是由 ThreadPoolExecutor 来提供支持的,所以要理解这些线程池的工作原理,你就需要先把 ThreadPoolExecutor 搞明白,下面我们就来聊一聊 ThreadPoolExecutor。

ThreadPoolExecutor 类

ThreadPoolExecutor 位于 java.util.concurrent 工具类下,可以说它是线程池中最核心的一个类了。如果你要想把线程池理解透彻的话,就要首先了解一下这个类。

如果我们再拿上面家族举例子的话,ThreadPoolExecutor 就是一个家族的骨干人才,家族顶梁柱。ThreadPoolExecutor 做的工作真是太多太多了。

首先,ThreadPoolExecutor 提供了四个构造方法,然而前三个构造方法最终都会调用最后一个构造方法进行初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public class ThreadPoolExecutor extends AbstractExecutorService {
.....
// 1
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
// 2
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
// 3
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
// 4
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}

所以我们直接就来看一波最后这个线程池,看看参数都有啥,如果我没数错的话,应该是有 7 个参数(小学数学水平。。。。。。)

  • 首先,一个非常重要的参数就是 corePoolSize,核心线程池的容量/大小,你叫啥我觉得都没毛病。只不过你得理解这个参数的意义,它和线程池的实现原理有非常密切的关系。你刚开始创建了一个线程池,此时是没有任何线程的,这个很好理解,因为我现在没有任务可以执行啊,创建线程干啥啊?而且创建线程还有开销啊,所以等到任务过来时再创建线程也不晚。但是!我要说但是了,如果调用了 prestartAllCoreThreads 或者 prestartCoreThread 方法,就会在没有任务到来时创建线程,前者是创建 corePoolSize 个线程,后者是只创建一个线程。Lea 爷爷本来想让我们程序员当个懒汉,等任务来了再干;可是你非要当个饿汉,提前完成任务。如果我们想当个懒汉的话,在创建了线程池后,线程池中的线程数为 0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到缓存队列当中。

  • maximumPoolSize :又来一个线程池的容量,只不过这个是线程池的最大容量,也就是线程池所能容纳最大的线程,而上面的 corePoolSize 只是核心线程容量。

我知道你此时会有疑问,那就是不知道如何核心线程的容量和线程最大容量的区别是吧?我们后面会解释这点。

  • keepAliveTime:这个参数是线程池的保活机制,表示线程在没有任务执行的情况下保持多久会终止。在默认情况下,这个参数只在线程数量大于 corePoolSize 时才会生效。当线程数量大于 corePoolSize 时,如果任意一个空闲的线程的等待时间 > keepAliveTime 后,那么这个线程会被剔除,直到线程数量等于 corePoolSize 为止。如果调用了 allowCoreThreadTimeOut 方法,线程数量在 corePoolSize 范围内也会生效,直到线程减为 0。
  • unit :这个参数好说,它就是一个 TimeUnit 的变量,unit 表示的是 keepAliveTime 的时间单位。unit 的类型有下面这几种
1
2
3
4
5
6
7
java复制代码TimeUnit.DAYS;               //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
  • workQueue:这个参数表示的概念就是等待队列,我们上面说过,如果核心线程 > corePoolSize 的话,就会把任务放入等待队列,这个等待队列的选择也是一门学问。Lea 爷爷给我们展示了三种等待队列的选择

+ `SynchronousQueue`: 基于`阻塞队列(BlockingQueue)`的实现,它会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。使用 SynchronousQueue 阻塞队列一般要求maximumPoolSizes 为无界,也就是 Integer.MAX\_VALUE,避免线程拒绝执行操作。
+ `LinkedBlockingQueue`:LinkedBlockingQueue 是一个无界缓存等待队列。当前执行的线程数量达到 corePoolSize 的数量时,剩余的元素会在阻塞队列里等待。
+ `ArrayBlockingQueue`:ArrayBlockingQueue 是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于 corePoolSize 时,多余的元素缓存在 ArrayBlockingQueue 队列中等待有空闲的线程时继续执行,当 ArrayBlockingQueue 已满时,加入 ArrayBlockingQueue 失败,会开启新的线程去执行,当线程数已经达到最大的 maximumPoolSizes 时,再有新的元素尝试加入 ArrayBlockingQueue时会报错
  • threadFactory:线程工厂,这个参数主要用来创建线程;
  • handler :拒绝策略,拒绝策略主要有以下取值

+ `AbortPolicy`:丢弃任务并抛出 RejectedExecutionException 异常。
+ `DiscardPolicy`: 直接丢弃任务,但是不抛出异常。
+ `DiscardOldestPolicy`:直接丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。
+ `CallerRunsPolicy`:由调用线程处理该任务。

深入理解线程池

上面我和你简单聊了一下线程池的基本构造,线程池有几个非常重要的参数可以细细品味,但是哥们醒醒,接下来才是刺激的地方。

线程池状态

首先我们先来聊聊线程池状态,线程池状态是一个非常有趣的设计点,ThreadPoolExecutor 使用 ctl 来存储线程池状态,这些状态也叫做线程池的生命周期。想想也是,线程池作为一个存储管理线程的资源池,它自己也要有这些状态,以及状态之间的变更才能更好的满足我们的需求。ctl 其实就是一个 AtomicInteger 类型的变量,保证原子性。

ctl 除了存储线程池状态之外,它还存储 workerCount 这个概念,workerCount 指示的是有效线程数,workerCount 表示的是已经被允许启动但不允许停止的工作线程数量。workerCount 的值与实际活动线程的数量不同。

ctl 高低位来判断是线程池状态还是工作线程数量,线程池状态位于高位。

这里有个设计点,为什么使用 AtomicInteger 而不是存储上线更大的 AtomicLong 之类的呢?

Lea 并非没有考虑过这个问题,为了表示 int 值,目前 workerCount 的大小是**(2 ^ 29)-1(约 5 亿个线程),而不是(2 ^ 31)-1(20亿个)可表示的线程**。如果将来有问题,可以将该变量更改为 AtomicLong。但是在需要之前,使用 int 可以使此代码更快,更简单,int 存储占用存储空间更小。

runState 具有如下几种状态

1
2
3
4
5
java复制代码private static final int RUNNING    = -1 << COUNT_BITS; 
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

我们先上状态轮转图,然后根据状态轮转图做详细的解释。


这几种状态的解释如下

  • RUNNING: 如果线程池处于 RUNNING 状态下的话,能够接收新任务,也能处理正在运行的任务。可以从 ctl 的初始化得知,线程池一旦创建出来就会处于 RUNNING 状态,并且线程池中的有效线程数为 0。
1
java复制代码private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  • SHUTDOWN: 在调用 shutdown 方法后,线程池的状态会由 RUNNING -> SHUTDOWN 状态,位于 SHUTDOWN 状态的线程池能够处理正在运行的任务,但是不能接受新的任务,这和我们上面说的对与 shutdown 的描述一致。
  • STOP: 和 shutdown 方法类似,在调用 shutdownNow 方法时,程序会从 RUNNING/SHUTDOWN -> STOP 状态,处于 STOP 状态的线程池,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
  • TIDYING:TIDYING 状态有个前置条件,分为两种:一种是是当线程池位于 SHUTDOWN 状态下,阻塞队列和线程池中的线程数量为空时,会由 SHUTDOWN -> TIDYING;另一种是当线程池位于 STOP 状态下时,线程池中的数量为空时,会由 STOP -> TIDYING 状态。转换为 TIDYING 的线程池会调用 terminated这个钩子方法,terminated 在 ThreadPoolExecutor 类中是空实现,若用户想在线程池变为 TIDYING 时,进行相应的处理,可以通过重载 terminated 函数来实现。
  • TERMINATED:TERMINATED 状态是线程池的最后一个状态,线程池处在 TIDYING 状态时,执行完terminated 方法之后,就会由 TIDYING -> TERMINATED 状态。此时表示线程池的彻底终止。

重要变量

下面我们一起来了解一下线程池中的重要变量。

1
java复制代码private final BlockingQueue<Runnable> workQueue;

阻塞队列,这个和我们上面说的阻塞队列的参数是一个意思,因为在构造 ThreadPoolExecutor 时,会把参数的值赋给 this.workQueue。

1
java复制代码private final ReentrantLock mainLock = new ReentrantLock();

线程池的主要状态锁,对线程池的状态(比如线程池大小、运行状态)的改变都需要使用到这个锁

1
java复制代码private final HashSet<Worker> workers = new HashSet<Worker>();

workers 持有线程池中所有线程的集合,只有持有上面 mainLock 的锁才能够访问。

1
java复制代码private final Condition termination = mainLock.newCondition();

等待条件,用来支持 awaitTermination 方法。Condition 和 Lock 一起使用可以实现通知/等待机制。

1
java复制代码private int largestPoolSize;

largestPoolSize 表示线程池中最大池的大小,只有持有 mainLock 才能访问

1
java复制代码private long completedTaskCount;

completedTaskCount 表示任务完成的计数,它仅仅在任务终止时更新,需要持有 mainLock 才能访问。

1
java复制代码private volatile ThreadFactory threadFactory;

threadFactory 是创建线程的工厂,所有的线程都会使用这个工厂,调用 addWorker 方法创建。

1
java复制代码private volatile RejectedExecutionHandler handler;

handler 表示拒绝策略,handler 会在线程饱和或者将要关闭的时候调用。

1
java复制代码private volatile long keepAliveTime;

保活时间,它指的是空闲线程等待工作的超时时间,当存在多个 corePoolSize 或 allowCoreThreadTimeOut 时,线程将使用这个超时时间。

下面是一些其他变量,这些变量比较简单,我就直接给出注释了。

1
2
3
4
5
java复制代码private volatile boolean allowCoreThreadTimeOut;   //是否允许为核心线程设置存活时间
private volatile int corePoolSize; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int maximumPoolSize; //线程池最大能容忍的线程数
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy(); // 默认的拒绝策略

任务提交

现在我们知道了 ThreadPoolExecutor 创建出来就会处于运行状态,此时线程数量为 0 ,等任务到来时,线程池就会创建线程来执行任务,而下面我们的关注点就会放在任务提交这个过程上。

通常情况下,我们会使用

1
java复制代码executor.execute()

来执行任务,我在很多书和博客教程上都看到过这个执行过程,下面是一些书和博客教程所画的 ThreadPoolExecutor 的执行示意图和执行流程图

执行示意图

处理流程图

ThreadPoolExecutor 的执行 execute 的方法分为下面四种情况

  1. 如果当前运行的工作线程少于 corePoolSize 的话,那么会创建新线程来执行任务 ,这一步需要获取 mainLock 全局锁。
  2. 如果运行线程不小于 corePoolSize,则将任务加入 BlockingQueue 阻塞队列。
  3. 如果无法将任务加入 BlockingQueue 中,此时的现象就是队列已满,此时需要创建新的线程来处理任务,这一步同样需呀获取 mainLock 全局锁。
  4. 如果创建新线程会使当前运行的线程超过 maximumPoolSize 的话,任务将被拒绝,并且使用 RejectedExecutionHandler.rejectEExecution() 方法拒绝新的任务。

ThreadPoolExecutor 采取上面的整体设计思路,是为了在执行 execute 方法时,避免获取全局锁,因为频繁获取全局锁会是一个严重的可伸缩瓶颈,所以,几乎所有的 execute 方法调用都是通过执行步骤2。

上面指出了 execute 的运行过程,整体上来说这个执行过程把非常重要的点讲解出来了,但是不够细致,我查阅 ThreadPoolExecute 和部分源码分析文章后,发现这事其实没这么简单,先来看一下 execute 的源码,我已经给出了中文注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取 ctl 的值
int c = ctl.get();
// 判断 ctl 的值是否小于核心线程池的数量
if (workerCountOf(c) < corePoolSize) {
// 如果小于,增加工作队列,command 就是一个个的任务
if (addWorker(command, true))
// 线程创建成功,直接返回
return;
// 线程添加不成功,需要再次判断,每需要一次判断都会获取 ctl 的值
c = ctl.get();
}
// 如果线程池处于运行状态并且能够成功的放入阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
// 再次进行检查
int recheck = ctl.get();
// 如果不是运行态并且成功的从阻塞队列中删除
if (! isRunning(recheck) && remove(command))
// 执行拒绝策略
reject(command);
// worker 线程数量是否为 0
else if (workerCountOf(recheck) == 0)
// 增加工作线程
addWorker(null, false);
}
// 如果不能增加工作线程的数量,就会直接执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}

下面是我根据源码画出的执行流程图

下面我们针对 execute 流程进行分析,可能有点啰嗦,因为几个核心流程上面已经提过了,不过为了流程的完整性,我们再在这里重新提一下。

  1. 如果线程池的核心数量少于 corePoolSize,那么就会使用 addWorker 创建新线程,addworker 的流程我们会在下面进行分析。如果创建成功,那么 execute 方法会直接返回。如果没创建成功,可能是由于线程池已经 shutdown,可能是由于并发情况下 workerCountOf(c) < corePoolSize ,别的线程先创建了 worker 线程,导致 workerCoun t>= corePoolSize。
  2. 如果线程池还在 Running 状态,会将 task 加入阻塞队列,加入成功后会进行 double-check 双重校验,继续下面的步骤,如果加入失败,可能是由于队列线程已满,此时会判断是否能够加入线程池中,如果线程池也满了的话,就会直接执行拒绝策略,如果线程池能加入,execute 方法结束。
  3. 步骤 2 中的 double-check 主要是为了判断进入 workQueue 中的 task 是否能被执行:如果线程池已经不是 Running 状态,则应该拒绝添加任务,从 workQueue 队列中删除任务。如果线程池是 Running,但是从 workQueue 中删除失败了,此时的原因可能是由于其他线程执行了这个任务,此时会直接执行拒绝策略。
  4. 如果线程是 Running 状态,并且不能把任务从队列中移除,进而判断工作线程是否为 0 ,如果不为 0 ,execute 执行完毕,如果工作线程是 0 ,则会使用 addWorker 增加工作线程,execute 执行完毕。

添加 worker 线程

从上面的执行流程可以看出,添加一个 worker 涉及的工作也非常多,这也是一个比价难啃的点,我们一起来分析下,这是 worker 的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
java复制代码private boolean addWorker(Runnable firstTask, boolean core) {
// retry 的用法相当于 goto
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 仅在必要时检查队列是否为空。
// 线程池状态有五种,state 越小越是运行状态
// rs >= SHUTDOWN,表示此时线程池状态可能是 SHUTDOWN、STOP、TIDYING、TERMINATED
// 默认 rs >= SHUTDOWN,如果 rs = SHUTDOWN,直接返回 false
// 默认 rs < SHUTDOWN,是 RUNNING,如果任务不是空,返回 false
// 默认 RUNNING,任务是空,如果工作队列为空,返回 false
//
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;


// 执行循环
for (;;) {
// 统计工作线程数量
int wc = workerCountOf(c);
// 如果 worker 数量>线程池最大上限 CAPACITY(即使用int低29位可以容纳的最大值)
// 或者 worker数量 > corePoolSize 或 worker数量>maximumPoolSize ),即已经超过了给定的边界
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;

// 使用 CAS 增加 worker 数量,增加成功,跳出循环。
if (compareAndIncrementWorkerCount(c))
break retry;

// 检查 ctl
c = ctl.get(); // Re-read ctl
// 如果状态不等于之前获取的 state,跳出内层循环,继续去外层循环判断
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

/*
worker数量+1成功的后续操作
* 添加到 workers Set 集合,并启动 worker 线程
*/
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 包装 Runnable 对象
// 设置 firstTask 的值为 -1
// 赋值给当前任务
// 使用 worker 自身这个 runnable,调用 ThreadFactory 创建一个线程,并设置给worker的成员变量thread
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 在持有锁的时候重新检查
// 如果 ThreadFactory 失败或在获得锁之前关闭,请回退。
int rs = runStateOf(ctl.get());

//如果线程池在运行 running<shutdown 或者 线程池已经 shutdown,且firstTask==null
// (可能是 workQueue 中仍有未执行完成的任务,创建没有初始任务的 worker 线程执行)
//worker 数量 -1 的操作在 addWorkerFailed()
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();

// workers 就是一个 HashSet 集合
workers.add(w);

// 设置最大的池大小 largestPoolSize,workerAdded 设置为true
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
//如果启动线程失败
// worker 数量 -1
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

真长的一个方法,有点想吐血,其实我肝到现在已经肝不动了,但我一想到看这篇文章的读者们能给我一个关注,就算咳出一口老血也值了。

这个方法的执行流程图如下

这里我们就不再文字描述了,但是上面流程图中有一个对象引起了我的注意,那就是 worker 对象,这个对象就代表了线程池中的工作线程,那么这个 worker 对象到底是啥呢?

worker 对象

Worker 位于 ThreadPoolExecutor 内部,它继承了 AQS 类并且实现了 Runnable 接口。Worker 类主要维护了线程运行过程中的中断控制状态。它提供了锁的获取和释放操作。在 worker 的实现中,我们使用了非重入的互斥锁而不是使用重复锁,因为 Lea 觉得我们不应该在调用诸如 setCorePoolSize 之类的控制方法时能够重新获取锁。

worker 对象的源码比较简单和标准,这里我们只说一下 worker 对象的构造方法,也就是

1
2
3
4
5
java复制代码Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

构造一个 worker 对象需要做三步操作:

  • 初始 AQS 状态为 -1,此时不允许中断 interrupt(),只有在 worker 线程启动了,执行了 runWorker() 方法后,将 state 置为0,才能进行中断。
  • 将 firstTask 赋值给为当前类的全局变量
  • 通过 ThreadFactory 创建一个新的线程。

###任务运行

我们前面的流程主要分析了线程池的 execute 方法的执行过程,这个执行过程相当于是任务提交过程,而我们下面要说的是从队列中获取任务并运行的这个工作流程。

一般情况下,我们会从初始任务开始运行,所以我们不需要获取第一个任务。否则,只要线程池还处于 Running 状态,我们会调用 getTask 方法获取任务。getTask 方法可能会返回 null,此时可能是由于线程池状态改变或者是配置参数更改而导致的退出。还有一种情况可能是由于 异常 而引发的,这个我们后面会细说。

下面来看一下 runWorker 方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
java复制代码final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 允许打断
// new Worker() 是 state==-1,此处是调用 Worker 类的 tryRelease() 方法,
// 将 state 置为0
w.unlock();
boolean completedAbruptly = true;
try {
// 调用 getTask() 获取任务
while (task != null || (task = getTask()) != null) {
// 获取全局锁
w.lock();
// 确保只有在线程 STOPING 时,才会被设置中断标志,否则清除中断标志。
// 如果一开始判断线程池状态 < STOPING,但 Thread.interrupted() 为 true,
// 即线程已经被中断,又清除了中断标示,再次判断线程池状态是否 >= stop
// 是,再次设置中断标示,wt.interrupt()
// 否,不做操作,清除中断标示后进行后续步骤
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行前需要调用的方法,交给程序员自己来实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行后需要调用的方法,交给程序员自己来实现
afterExecute(task, thrown);
}
} finally {
// 把 task 置为 null,完成任务数 + 1,并进行解锁
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
// 最后处理 worker 的退出
} finally {
processWorkerExit(w, completedAbruptly);
}
}

下面是 runWorker 的执行流程图


这里需要注意一下最后的 processWorkerExit 方法,这里面其实也做了很多事情,包括判断 completedAbruptly 的布尔值来表示是否完成任务,获取锁,尝试从队列中移除 worker,然后尝试中断,接下来会判断一下中断状态,在线程池当前状态小于 STOP 的情况下会创建一个新的 worker 来替换被销毁的 worker。

任务获取

任务获取就是 getTask 方法的执行过程,这个环节主要用来获取任务和剔除任务。下面进入源码分析环节

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
java复制代码private Runnable getTask() {
// 判断最后一个 poll 是否超时。
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 必要时检查队列是否为空
// 对线程池状态的判断,两种情况会 workerCount-1,并且返回 null
// 线程池状态为 shutdown,且 workQueue 为空(反映了 shutdown 状态的线程池还是要执行 workQueue 中剩余的任务的)
// 线程池状态为 stop(shutdownNow() 会导致变成 STOP)(此时不用考虑 workQueue 的情况)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
// 是否需要定时从 workQueue 中获取
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 如果工作线程的数量大于 maximumPoolSize 会进行线程剔除
// 如果使用了 allowCoreThreadTimeOut ,并且工作线程不为0或者队列有任务的话,会直接进行线程剔除
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

getTask 方法的执行流程图如下

工作线程退出

工作线程退出是 runWorker 的最后一步,这一步会判断工作线程是否突然终止,并且会尝试终止线程,以及是否需要增加线程来替换原工作线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
java复制代码private void processWorkerExit(Worker w, boolean completedAbruptly) {
// worker数量 -1
// completedAbruptly 是 true,突然终止,说明是 task 执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的 worker 线程数量需要-1
// completedAbruptly 是 false 是突然终止,说明是 worker 线程没有 task 可执行了,不用-1,因为已经在 getTask() 方法中-1了
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

// 从 Workers Set 中移除 worker
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

// 尝试终止线程,
tryTerminate();

// 是否需要增加 worker 线程
// 线程池状态是 running 或 shutdown
// 如果当前线程是突然终止的,addWorker()
// 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
// 故如果调用线程池 shutdown(),直到workQueue为空前,线程池都会维持 corePoolSize 个线程,
// 然后再逐渐销毁这 corePoolSize 个线程
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

源码搞的有点头大了,可能一时半会无法理解上面这些源码,不过你可以先把注释粘过去,等有时间了需要反复刺激,加深印象!

其他线程池

下面我们来了解一下其他线程池的构造原理,主要涉及 FixedThreadPool、SingleThreadExecutor、CachedThreadPool。

newFixedThreadPool

newFixedThreadPool 被称为可重用固定线程数的线程池,下面是 newFixedThreadPool 的源码

1
2
3
4
5
java复制代码public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

可以看到,newFixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被设置为创建 FixedThreadPool 时指定的参数 nThreads,也就是说,在 newFiexedThreadPool 中,核心线程数就是最大线程数。

下面是 newFixedThreadPool 的执行示意图


newFixedThreadPool 的工作流程如下

  • 如果当前运行的线程数少于 corePoolSize,则会创建新线程 addworker 来执行任务
  • 如果当前线程的线程数等于 corePoolSize,会将任务直接加入到 LinkedBlockingQueue 无界阻塞队列中,LinkedBlockingQueue 的上限如果没有制定,默认为 Integer.MAX_VALUE 大小。
  • 等到线程池中的任务执行完毕后,newFixedThreadPool 会反复从 LinkedBlockingQueue 中获取任务来执行。

相较于 ThreadPoolExecutor,newFixedThreadPool 主要做了以下改变

  • 核心线程数等于最大线程数,因此 newFixedThreadPool 只有两个最大容量,一个是线程池的线程容量,还有一个是 LinkedBlockingQueue 无界阻塞队列的线程容量。
  • 这里可以看到还有一个变化是 0L,也就是 keepAliveTime = 0L,keepAliveTime 就是到达工作线程最大容量后的线程等待时间,0L 就意味着当线程池中的线程数大于 corePoolsize 时,空余的线程会被立即终止。
  • 由于使用无界队列,运行中的 newFixedThreadPool 不会拒绝任务,也就是不会调用 RejectedExecutionHandler.rejectedExecution 方法。

newSingleThreadExecutor

newSingleThreadExecutor 中只有单个工作线程,也就是说它是一个只有单个 worker 的 Executor。

1
2
3
4
5
6
7
java复制代码public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

可以看到,在 newSingleThreadExecutor 中,corePoolSize 和 maximumPoolSize 都被设置为 1,也不存在超时情况,同样使用了 LinkedBlockingQueue 无界阻塞队列,除了 corePoolSize 和 maximumPoolSize 外,其他几乎和 newFixedThreadPool 一模一样。

下面是 newSingleThreadExecutor 的执行示意图


newSingleThreadExecutor 的执行过程和 newFixedThreadPool 相同,只是 newSingleThreadExecutor 的工作线程数为 1。

newCachedThreadPool

newCachedThreadPool 是一个根据需要创建工作线程的线程池,newCachedThreadPool 线程池最大数量是 Integer.MAX_VALUE,保活时间是 60 秒,使用的是SynchronousQueue 无缓冲阻塞队列。

1
2
3
4
5
6
java复制代码public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}

它的执行示意图如下

  • 首先会先执行 SynchronousQueue.offer 方法,如果当前 maximumPool 中有空闲线程正在执行 SynchronousQueue.poll ,就会把任务交给空闲线程来执行,execute 方法执行完毕,否则的话,继续向下执行。
  • 如果 maximumPool 中没有线程执行 SynchronousQueue.poll 方法,这种情况下 newCachedThreadPool 会创建一个新线程执行任务,execute 方法执行完成。
  • 执行完成的线程将执行 poll 操作,这个 poll 操作会让空闲线程最多在 SynchronousQueue 中等待 60 秒钟。如果 60 秒钟内提交了一个新任务,那么空闲线程会执行这个新提交的任务,否则空闲线程将会终止。

这里的关键点在于 SynchronousQueue 队列,它是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程对应的移除操作。这其实就是一种任务传递,如下图所示

其实还有一个线程池 ScheduledThreadPoolExecutor ,就先不在此篇文章做详细赘述了。

线程池实践考量因素

下面介绍几种在实践过程中使用线程池需要考虑的几个点

  • 避免任务堆积,比如我们上面提到的 newFixedThreadPool,它是创建指定数目的线程,但是工作队列是无界的,这就导致如果工作队列线程太少,导致处理速度跟不上入队速度,这种情况下很可能会导致 OOM,诊断时可以使用 jmap 检查是否有大量任务入队。
  • 生产实践中很可能由于逻辑不严谨或者工作线程不能及时释放导致 线程泄漏,这个时候最好检查一下线程栈
  • 避免死锁等同步问题
  • 尽量避免在使用线程池时操作 ThreadLocal,因为工作线程的生命周期可能会超过任务的生命周期。

线程池大小的设置

线程池大小的设置也是面试官经常会考到的一个点,一般需要根据任务类型来配置线程池大小

  • 如果是 CPU 密集型任务,那么就意味着 CPU 是稀缺资源,这个时候我们通常不能通过增加线程数来提高计算能力,因为线程数量太多,会导致频繁的上下文切换,一般这种情况下,建议合理的线程数值是 N(CPU)数 + 1。
  • 如果是 I/O 密集型任务,就说明需要较多的等待,这个时候可以参考 Brain Goetz 的推荐方法 线程数 = CPU核数 × (1 + 平均等待时间/平均工作时间)。参考值可以是 N(CPU) 核数 * 2。

当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。

后记

这篇文章真的写了很久,因为之前对线程池认识不是很深,所以花了大力气来研究,希望这篇文章对你有所帮助。

另外,添加我的微信 becomecxuan,加入每日一题群,每天一道面试题分享,更多内容请参见我的 Github,成为最好的 bestJavaer,已经收录此篇文章,详情见原文链接。

我自己肝了六本 PDF,微信搜索 程序员cxuan 关注公众号后,在后台回复 cxuan ,领取全部 PDF,这些 PDF 如下

六本 PDF 链接

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SQL子查询总结:相关子查询与非相关子查询有什么区别

发表于 2021-02-10

如果一个select语句能够返回单个值或者一列值,且该select语句嵌套在另一个SQL语句(例如select语句、insert语句、update语句或者delete语句)中,那么该select语句成为“子查询”(也叫内层查询),包含子查询的SQL语句称为“主查询”(也叫外层查询)。为了标记子查询与主查询之间的关系,通常将子查询写在小括号内。子查询一般用在主查询的where子句或having子句中,与比较运算符或者逻辑运算符一起构成where筛选条件或having筛选条件。子查询分为“相关子查询”(Dependent Subquery)与“非相关子查询”。

非相关子查询

如果子查询返回单个值,则可以讲一个表达式的值与子查询的结果进行比较。 例如,检索成绩比学生张三平均分高的所有学生及课程的信息, 可以使用下面的SQL语句, 执行结果如下图。

1
2
3
4
5
6
7
8
9
csharp复制代码mysql> select class_name, student.student_no, student_name, course_name, score
-> from classes join student on student.class_no = classes.class_no
-> join choose on choose.student_no = student.student_no
-> join course on choose.course_no = course.course_no
-> where score > (
-> select avg(score)
-> from student, choose
-> where student.student_no = choose.student_no and student_name = '张三'
-> );

非相关子查询

说明

该示例中的子查询是一个单独的select语句,可以不依赖主查询单独运行。这种不依靠主查询,能够独立运行的子查询称为**“非相关子查询”**。

执行过程

  1. 执行子查询,其结果不被显示,而是传递给外部查询,作为外部查询的条件使用。
  2. 执行外部查询,并显示整个结果。

相关子查询

下面的示例演示了相关子查询,代码第七行标记了两条子查询语句之间的区别(其他SQl代码完全相同),执行结果如下图。

1
2
3
4
5
6
7
8
9
csharp复制代码mysql> select class_name, student.student_no, student_name, course_name, score
-> from classes join student on student.class_no = classes.class_no
-> join choose on choose.student_no = student.student_no
-> join course on choose.course_no = course.course_no
-> where score > (
-> select avg(score)
-> from choose
-> where student.student_no = choose.student_no and student_name = '张三'
-> );

相关子查询

说明

从执行结果可以看到,子查询可以仅仅使用自己定义的数据源,也可以“直接引用”主查询中的数据源,但两者意义完全不同。

  1. 如果子查询中仅仅使用了自己定义的数据源, 这种查询是非相关子查询。 非相关子查询是独立于外部查询的子查询, 子查询总共执行一次, 执行完毕后将值传递给主查询。
  2. 如果子查询中使用了主查询的数据源, 这种查询是相关子查询, 此时主查询的执行与相关子查询的执行相互依赖。

执行过程

  1. 从外层查询中取出一个元组,将元组相关列的值传递给内层查询。
  2. 执行内层查询,得到子查询操作的值。
  3. 外查询根据子查询返回的结果或结果集得到满足条件的行。
  4. 然后外层查询取出下一个元组重复做步骤1-3,直到外层的元组全部处理完毕。

如何区分

说了这么多,那我们该如何快速区分非相关子查询和相关子查询呢?

最简单的办法的就是直接看子查询本身能否执行。比如执行上面的例子中的子查询:

1
2
3
csharp复制代码mysql> select avg(score)
-> from choose
-> where student.student_no = choose.student_no and student_name = '张三';

子查询

会报错:1054 - Unknown column 'student.student_no' in 'where clause' 这样的查询语句构成的子查询便为相关子查询。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

代码通俗易懂的讲解 volatile 禁止指令重排原理 vo

发表于 2021-02-09

volatile 禁止指令重排

本文介绍下volatile禁止指令重排的背景及最终效果。

volatile 可见性保证

我们都知道 volatile 会保证读写修饰的变量时会将对应变量同步给主存。

但是实际上, volatile 修饰的变量不仅保证自身,还会保证其他局部变量的可见性。

以下结合例子讲解下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码
public class MyClass {
private int years;
private int months;
private volatile int days;

// 读主存的时候,会把后边的变量一起顺带读出来
public int totalDays() {
int total = this.days;
total += months * 30;
total += years * 365;
return total;
}

// 写主存的时候, 会把前边的变量一起顺带写进去
public void update(int years, int months, int days){
this.years = years;
this.months = months;
this.days = days;
}
}

Reordinrg 指令重排的挑战

指令重排前的代码

1
2
3
4
5
ini复制代码int a = 1;
int b = 2;

a++;
b++;

可能的重排后代码

1
2
3
4
5
6
ini复制代码// 估计是为了读写操作一起,减少内存缺页率
int a = 1;
a++;

int b = 2;
b++;

对volatile可见性保证的影响

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码
// 指令重排前,写主存的时候, 会把前边的变量一起写进去
public void update(int years, int months, int days){
this.years = years;
this.months = months;
this.days = days;
}

// 指令重排后,写主存的时候, 本来该顺带写的变量不写了
public void update(int years, int months, int days){
this.days = days;
this.years = years;
this.months = months;
}

hanppens-before 发生前保证

为了保证 vaolatile 对其他变量可见性的保证 规则, volatile 的 happens-before 规定对指令重排的限制。

读写其他变量如果本身在写volatile变量之前的, 禁止重排到写volatile变量之后。

读写其他变量如果本身在读volatile变量之后的, 禁止重排到读volatile变量之前。

据说 happens-before 是 JSR-133 的规范质疑,内存屏障是 CPU 的指令。

前者是目的, 后者是实现目的的手段。

本文主要参考

tutorials.jenkov.com/java-concur…

mp.weixin.qq.com/s/DZkGRTan2…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【Spring】Spring的PostProcessor-后

发表于 2021-02-08

PostProcessor

为什么要有这个后置处理器这种接口,了解Spring容器的朋友应该知道,所有的ApplicationContext都需要调用AbstractApplicationContext#refresh方法,这是一个模板方法,既然使用了模板方法设计模式,那么就应该提供hook,也就是钩子函数。而PostProcessor就是为了实现这种定制化的扩展需求而制定的。

UML

  • BeanPostProcessor
    BeanPostProcessor
  • BeanFactoryPostProcessor

BeanFactoryPostProcessor

PostProcessor概览

BeanFactoryPostProcessor

容器级别的后置处理器,其内部仅声明了一个方法。Spring会根据声明的顺序对后置处理器进行调用,而BeanFactoryPostProcessor会在容器初始化期间将容器本身交由接口实现类去处理。获取到了容器,就可以做很多定制化的操作了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@FunctionalInterface
public interface BeanFactoryPostProcessor {

/**
* Modify the application context's internal bean factory after its standard
* initialization. All bean definitions will have been loaded, but no beans
* will have been instantiated yet. This allows for overriding or adding
* properties even to eager-initializing beans.
* @param beanFactory the bean factory used by the application context
* @throws org.springframework.beans.BeansException in case of errors
*/
void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException;

}
BeanDefinitionRegistryPostProcessor

从注解容器解析BeanDefinition的篇章中,你会了解到BeanDefinitionRegistryPostProcessor是一种特殊的BeanFactoryPostProcessor,但是Spring会优先处理这类后置处理器,再处理常规的一些BeanFactoryPostProcessor.可以理解成VIP级别的BeanFactoryPostProcessor,优先于常规BeanFactoryPostProcessor执行。

其内部也声明了一个专用的注册后置处理方法。postProcessBeanDefinitionRegistry

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public interface BeanDefinitionRegistryPostProcessor extends BeanFactoryPostProcessor {

/**
* Modify the application context's internal bean definition registry after its
* standard initialization. All regular bean definitions will have been loaded,
* but no beans will have been instantiated yet. This allows for adding further
* bean definitions before the next post-processing phase kicks in.
* @param registry the bean definition registry used by the application context
* @throws org.springframework.beans.BeansException in case of errors
*/
void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException;

}

BeanPostProcessor

容器创建Bean时的hook实例,你可以通过实现BeanPostProcessor来影响所有Bean的生命周期,包括Initialization的前置与后置处理.在后面学习Spring Bean的生命周期中,会接触BeanPostProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public interface BeanPostProcessor {

@Nullable
default Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}

@Nullable
default Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
return bean;
}

}

使用postProcessor

MyBeanFactoryPostProcessor

  • 基础对象
1
2
3
4
5
6
7
8
9
java复制代码@Data
public class Root {

private String name;

private String description;

private boolean isRoot;
}
  • 通过后置处理器往容器注入BeanDefinition
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
java复制代码package com.xjm.bean.postprocessor;

import com.xjm.model.Root;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.*;
import org.springframework.context.annotation.Configuration;

/**
* @author jaymin
* 2021/1/7 23:10
*/
@Configuration
public class MyBeanFactoryPostProcessor implements BeanDefinitionRegistryPostProcessor {

@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

}

/**
* 标准初始化后,修改应用程序上下文的内部BeanDefiniton注册表。<br>
* 所有常规bean定义都将被加载,但尚未实例化任何bean。 <br>
* 这允许在下一个后处理阶段开始之前添加更多的BeanDeinition。 <br>
* Spring工厂级别的BeanFactoryPostProcessor,优先级别高于常规的BeanFactoryPostProcessor.<br>
* 第三方框架整合时,可以实现这个注册接口进行BeanDefinition注册.如Mybatis.<br>
* 设计模式:责任链模式<br>
* @param registry the bean definition registry used by the application context
* @throws BeansException
*/
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
BeanDefinition beanDefinition = getBeanDefinition(Root.class);
registry.registerBeanDefinition("root",beanDefinition);
System.out.println("Customer BeanPostProcessors execute.");
}

private BeanDefinition getBeanDefinition(Class<?> clazz){
AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder.genericBeanDefinition(clazz).getBeanDefinition();
return beanDefinition;
}
}
  • test

@ComponentScan根据自己的包路径进行修改,此处程序正常运行,说明Root已经注册到容器中了.

1
2
3
4
5
6
7
8
java复制代码@Configuration
@ComponentScan(value = "com.xjm")
public class BeanDefinitionDemoByAnnotation {
public static void main(String[] args) {
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(AnnotationContextDemo.class);
Root root = applicationContext.getBean("root", Root.class);
}
}

MyBeanPostProcessor

简单实现BeanPostProcessor,主要是观察其行为的作用域.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码@Configuration
public class MyBeanPostProcessor implements BeanPostProcessor {

@Override
@Nullable
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
System.out.println(beanName + " invoking MyBeanPostProcessor#postProcessBeforeInitialization");
return bean;
}

/**
* 可以对所有的Bean做统一操作
* @param bean the new bean instance
* @param beanName the name of the bean
* @return
* @throws BeansException
*/
@Override
@Nullable
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
System.out.println(beanName + " invoking MyBeanPostProcessor#postProcessAfterInitialization");
return bean;
}

}
  • result

result

可以看到,实现的MyBeanPostProcessor影响了所有Bean的初始化过程.

总结

  • 后置处理器是Spring留给开发者的扩展性钩子接口。
  • 从后置处理器上可以简略分为:BeanFactoryPostProcessor和BeanPostProcessor.其中,BeanFactoryPostProcessor可以操作容器,BeanPostProcessor会影响所有Bean的生命周期.
  • BeanDefinitionRegistryPostProcessor是一种特殊的BeanFactoryPostProcessor,优于其他BeanFactoryPostProcessor执行.

扩展阅读

Spring系列六:Spring BeanPostProcessor

谈谈Spring中的BeanPostProcessor接口

Spring 执行顺序:PostProcessor 接口

Spring BeanPostProcessor

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

环境安装手册:安装 MySQL8 在 Centos8 上安装

发表于 2021-02-08

在 Centos8 上安装

  1. 首先下载二进制压缩包mysql-8.0.23-linux-glibc2.12-x86_64.tar.xz,解压。
1
2
3
4
shell复制代码mkdir /usr/local/mysql 
mkdir /usr/local/mysql/mysql-8.0.23 # MySQL安装在这个目录
tar -xvf `mysql-8.0.23-linux-glibc2.12-x86_64.tar.xz` /usr/local/mysql/mysql-8.0.23
cd /usr/local/mysql/mysql-8.0.23
  1. 添加MySQL的用户并指定用户组。
1
2
shell复制代码groupadd mysql
useradd -r -g mysql mysql
  1. 在安装路径下创建data目录。
1
2
shell复制代码cd /usr/local/mysql/mysql-8.0.23
mkdir data
  1. 创建一个文件夹用来存放sock文件,并为该文件夹赋予权限。我创建的文件夹就在安装目录下。
1
2
3
4
shell复制代码cd /usr/local/mysql/mysql-8.0.23
mkdir conf
mkdir conf/mysql
chmod 777 conf/mysql

这一步之前看到有人说放在/tmp下,但是/tmp是临时文件夹,数据不会长期保存。所以时间久了,MySQL就无法启动了。所以不建议放在/tmp。亲身经历,要不然也不会重装了。

  1. 在/etc下创建配置文件my.cnf,并配置相关信息。
  • socket的目录为第4步创建的文件夹
  • basedir是安装路径
  • datadir是安装路径下的data文件夹
  • 最后的skip-grant-tables表示待会登录时不需要密码,修改密码后把这行注释
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
properties复制代码[mysql]
default-character-set=utf8
socket=/usr/local/mysql/mysql-8.0.23/conf/mysql/mysql.sock

[mysqld]
port=3306
user=mysql
socket=/usr/local/mysql/mysql-8.0.23/conf/mysql/mysql.sock
basedir=/usr/local/mysql/mysql-8.0.23
datadir=/usr/local/mysql/mysql-8.0.23/data

max_connections=200

# character-set-server=utf8

default-storage-engine=INNODB

max_allowed_packet=16M

default-authentication-plugin=mysql_native_password

transaction_isolation = READ-COMMITTED
character-set-server = utf8mb4
collation-server = utf8mb4_general_ci
lower_case_table_names = 1

skip-grant-tables
  1. 把安装目录的用户和组改为mysql。
1
shell复制代码chown -R mysql:mysql ./
  1. 初始化MySQL,会生成一个随机密码(由于之前设置了不使用密码,所以这个密码不重要,如果没有开启免密,就用这个密码登录),并在data目录下创建一些文件。
1
shell复制代码./bin/mysqld --initialize --user=mysql --basedir=/usr/local/mysql/mysql-8.0.23 --datadir=/usr/local/mysql/mysql-8.0.23/data

注意把basedir,datadir修改为自己的路径。

  1. 在/etc/profile中配置环境变量。
1
2
3
properties复制代码MYSQL_HOME=/usr/local/mysql/mysql-8.0.23
PATH=$PATH:$MYSQL_HOME/bin:$MYSQL_HOME/support-files
export PATH

使配置生效

1
shell复制代码source /etc/profile
  1. 启动服务,并修改密码。
    启动服务使用mysql.server命令,该命令在安装路径下的support-files中,所以配置环境变量时也把这个目录配置了。
1
2
3
4
5
6
bash复制代码mysql.server start # 启动
mysql -uroot -p # 登录

# 修改密码
use mysql;
alter user 'root'@'localhost' identified by 'new passwd';

如果修改密码报错了,先执行flush privileges;,再修改密码。

修改密码后,要把my.cnf中的skip-grant-tables注释掉,再次重启服务mysql.server restart就可以使用密码登录了。

  1. 开启远程登录。
1
2
3
4
5
6
7
shell复制代码use mysq;
create user 'root'@'%' identified by 'root';
grant all privileges on *.* to 'root'@'%' with grant option;
flush privileges;
alter user 'root'@'%' identified with mysql_native_password by 'your passwd';
flush privileges;
exit;

设置后重启服务。

  1. 防火墙开启3306端口。
    先查看防火墙已经放行的端口,
1
shell复制代码firewall-cmd --list-ports

如果没有放行3306,将端口放行。

1
2
shell复制代码firewall-cmd --zone=public --add-port=3306/tcp --permanent
systemctl restart firewalld.service # 重启防火墙服务
  • service firewall start 开启防火墙
  • service firewall stop 关闭防火墙
  • service firewall restart 重启防火墙
  • firewall-cmd --state 查看防火墙状态

到这里就全部安装完成了,赶快连上服务器试试吧!

在 Ubuntu 上安装

Ubuntu 20.04.2 + MySQL8

使用 root 权限,依次执行如下命令:

1
2
3
4
5
shell复制代码wget https://dev.mysql.com/get/mysql-apt-config_0.8.18-1_all.deb
dpkg -i mysql-apt-config_0.8.18-1_all.deb
apt update
apt install mysql-server
[ok.]

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

要想用活Redis,Lua脚本是绕不过去的坎 前言 发布与订

发表于 2021-02-08

前言

Redis 当中提供了许多重要的高级特性,比如发布与订阅,Lua 脚本等。Redis 当中也提供了自增的原子命令,但是假如我们需要同时执行好几个命令的同时又想让这些命令保持原子性,该怎么办呢?这时候就可以使用本文介绍的 Lua 脚本来实现。

发布与订阅

发布与订阅功能理论上来说可以直接通过一个双端链表就可以实现了,然而这种通过普通的双端链表来实现的发布与订阅功能有两个局限性:

  • 如果生产者生产消息的速度远大于消费者消费消息的速度,那么链表中未消费的消息会大量堆积,导致占用大量的内存。
  • 基于链表实现的消息队列,不支持一对多的消息分发。

因为普通双端链表来实现发布与订阅功能有这两个局限性,故而 Redis 当中并没有直接通过双端列表来实现。在 Redis 中的发布与订阅可以分为两种类型:基于频道和基于模式。

基于频道的实现

基于频道的实现方式主要通过以下三个命令:

  • subscribe channel-1 channel-2:订阅一个或者多个频道。
  • unsubscribe channel-1:取消频道的订阅(命令操作界面上无法退订)。
  • publish channel-1 message:向频道 channel-1 发送消息 message。

打开一个客户端 一,输入订阅命令 subscribe music movie,表示当前客户端订阅了 music 和 movie 两个频道的消息:

然后再打开一个客户端二,执行以下发布消息命令:

1
2
3
java复制代码publish movie myCountry //发布消息 myCountry 到 movie 频道
publish music love //发布消息 love 到 music 频道
publish tv myHome //发布消息 myHome 到 tv 频道

前面两个频道发布之后返回 1 就表示当前有 1 个客户端订阅了该频道,消息已经发送到这个客户端。

这时候我们再回到之前的客户端一,就会发现客户端一收到了消息 myCountry 和 love 两条消息,而 myHome 这条消息是属于频道 tv,客户端一并没有订阅,故而不会收到:

同时,还有以下 2 个命令可以查看当前客户端订阅的频道信息:

  • punsub channels [channel_name] :查看当前服务器被订阅的频道。不带参数则返回所有频道,后面的参数可以使用通配符 ? 或者 *。
  • pubsub numsub channel_name [channel_name]:查看指定频道的订阅数(可同时查看多个)。

实现原理分析

客户端与其订阅的频道信息被保存在 redisServer 对象中的 pubsub_channels 属性中。

1
2
3
4
c复制代码struct redisServer {
dict *pubsub_channels;//保存了客户端及其订阅的频道信息
//... 省略其他信息
};

pubsub_channels 属性是一个字典,其 key 值保存的就是频道名,value 是一个链表,链表中保存的就是每个客户端的 id,下图就是基于频道订阅的存储结构示意图:

  • 订阅
    订阅的时候首先会检查字典内是否存在这个频道:如果不存在,则需要为当前频道创建一个字典,同时创建一个链表作为 value,并将当前客户端 id 放入链表;如果存在,则直接将当前客户端 id 放入链表末尾即可。
  • 取消订阅
    取消订阅的时候需要将客户端 id 从对应的链表中移除,如果移除之后链表为空,则需要同时将该频道从字典内删除。
  • 发送消息
    发送消息时首先会去 pubsub_channels 字典内寻找键,如果发现有可以匹配上的键,则会找到对应的链表,进行遍历发送消息。

基于模式的实现

基于模式的发布与订阅实现方式主要通过以下三个命令:

  • psubscribe pattern-1 pattern-2:订阅一个或者多个模式,模式可以通过通配符 ? 和 * 来表示。
  • punsubscribe pattern-1 pattern-1:取消模式的订阅(基于命令操作,界面上无法退订)
  • publish channel-1 message :向频道 channel-1 发送消息 message。这里和上面基于频道命令是一样的。

打开一个客户端 一,输入订阅命令 psubscribe m*,表示当前客户端订阅了所有以 m 开头的频道:

然后再打开一个客户端二,执行一下发布消息命令:

1
2
3
java复制代码publish movie myCountry //发布消息 myCountry 到 movie 频道
publish music love //发布消息 love 到 music 频道
publish tv myHome //发布消息 myHome 到 tv 频道

前面两个频道发布之后返回 1 就表示当前有 1 个客户端订阅了该频道(上面基于频道订阅的客户端关闭之后会自动取消订阅),消息已经发送到这个客户端。

这时候我们再回到之前的客户端一,就会发现客户端一收到了 myCountry 和 love 两条消息,因为这两个频道都是以 m 开头的,而 myHome 这条消息是属于频道 tv,并不是以 m 开头,客户端一并没有订阅,故而不会收到:

同样的,基于模式的订阅也提供了一个查询命令:

  • pubsub numpat:查询当前服务器被订阅模式的数量。

实现原理分析

客户端与其订阅的模式信息被保存在 redisServer 对象中的 pubsub_patterns 属性中。

1
2
3
4
c复制代码struct redisServer {
list pubsub_patterns;//保存了客户端及其订阅的模式信息
//...省略其他信息
};

pubsub_patterns 属性是一个列表,其列表内结构(源码 serer.h 内)定义如下:

1
2
3
4
c复制代码typedef struct pubsubPattern {
client *client;//订阅模式的客户端
robj *pattern;//被订阅的模式
} pubsubPattern;

  • 订阅
    新建一个 pubsubPattern 数据结构加入到链表 pubsub_patterns 的结尾。
  • 取消订阅
    从链表中将当前取消订阅的客户端 pubsubPattern 从链表 pubsub_patterns 中移除
  • 发送消息
    此时需要遍历整个链表来寻找能匹配的模式。之所以基于模式场景使用链表是因为模式支持通配符,所以没有办法直接用字典实现。

PS:当基于频道和基于模式两种订阅同时都存在时,Redis 会先去寻找频道字典,再去遍历模式链表进行消息发送。

Lua 脚本

Redis 从 2.6 版本开始支持 Lua 脚本,为了支持 Lua 脚本,Redis 在服务器中嵌入了 Lua 环境。

使用 Lua 脚本最大的好处是 Redis 会将整个脚本作为一个整体执行,不会被其他请求打断,可以保持原子性且减少了网络开销。

Lua 脚本的调用

Lua 脚本的执行语法如下:

1
java复制代码eval lua-script numkeys key [key ...] arg [arg ...]
  • eval:执行 Lua 脚本的命令。
  • lua-script:Lua 脚本内容。
  • numkeys:表示的是 Lua 脚本中需要用到多少个 key,如果没用到则写 0。
  • key [key …]:将 key 作为参数按顺序传递到 Lua 脚本,numkeys 是 0 时则可省略。
  • arg:Lua 脚本中用到的参数,如果没有可省略。

接下来我们执行一个不带任何参数的简单 Lua 脚本命令:

1
java复制代码eval "return 'Hello Redis'" 0

Lua 脚本中执行 Redis 命令

在 Lua 脚本中执行 Redis 命令时需要使用以下语法:

1
java复制代码redis.call(command, key [key ...] argv [argv…])
  • command:Redis 中的命令,如 set、get 等。
  • key:操作 Redis 中的 key 值,相当于我们调用方法时的形参。
  • param:代表参数,相当于我们调用方法时的实参。

假如我们想执行一个命令 set name lonely_wolf,那么利用 Lua 脚本则应该这么执行:

1
java复制代码eval "return redis.call('set',KEYS[1],ARGV[1])" 1 name lonely_wolf

需要注意的是:KEYS 和 ARGV 必须要大写,参数的下标从 1 开始。上面命令中 1 表示当前需要传递 1 个 key

Lua 脚本摘要

有时候如果我们执行的一个 Lua 脚本很长的话,那么直接这么调用 Lua 脚本的话非常不方便,所以 Redis 当中提供了一个命令 script load 来手动给每一个 Lua 脚本生成摘要,这里之所以要说手动的原因是即使我们不使用这个命令,每次调用完 Lua 脚本的时候,Redis 也会为每个 Lua 脚本生成一个摘要。

其他相关命令:

  • script exists 摘要:判断一个摘要是否存在。0 表示不存在,1 表示存在。
  • script flush:清除所有 Lua 脚本缓存。

接下来我们来验证一下,依次执行以下命令:

1
2
3
4
5
6
java复制代码script load "return redis.call('set',KEYS[1],ARGV[1])"  //给当前 Lua脚本生成摘要,这时候会返回一个摘要
evalsha "c686f316aaf1eb01d5a4de1b0b63cd233010e63d" 1 address china //相当于执行命令 set address china
get address //获取 adress,确认上面的脚本是否执行成功
script exists "c686f316aaf1eb01d5a4de1b0b63cd233010e63d" //判断当前摘要的 Lua脚本是否存在
script flush //清除所有 Lua脚本缓存
script exists "c686f316aaf1eb01d5a4de1b0b63cd233010e63d" //清除之后这里就不存在了

执行之后得到如下效果:

Lua 脚本文件

当我们的 Lua 脚本很长时,直接在命令窗口中写脚本是不直观的,也很难发现语法问题,所以 Redis 当中也支持我们直接把先把脚本写入文件中,然后直接调用文件。
比如我们新建一个 test.lua 脚本:

1
2
lua复制代码redis.call('set',KEYS[1],ARGV[1])
return redis.call('get',KEYS[1])

将文件上传到指定目录之后,执行如下命令:

1
java复制代码redis-cli --eval test.lua 1 age , 18 //注意 key 和 arg 参数之间要以逗号隔开,且逗号两边的空格不能省略

这时候就可以正常返回 18:

脚本异常

我们知道,Redis 的指令是单线程执行的,而现在使用了 Lua 脚本,我们就可以通过 Lua 脚本来实现一些业务逻辑,那么如果 Lua 脚本执行超时或者陷入了死循环,这个时候其他的指令就会被阻塞,导致 Redis 无法正常使用。这个时候应该如何处理呢?

脚本超时

为了解决 Lua 脚本超时的问题,Redis 提供了一个超时时间的参数 lua-time-limit 来控制 Lua 脚本执行的超时时间,单位是毫秒,默认是 5000 (即 5 秒),到达超时时间之后 Lua 会自动中断脚本。

脚本陷入死循环

假如脚本陷入了死循环,这时候超时时间就不起作用了,我们来模拟一下:
首先打开客户端一,执行一个死循环的 lua 脚本:

1
java复制代码eval 'while(true) do end' 0

然后打开另一个客户端二,任意执行一个命令:

1
java复制代码get name

这时候会返回 busy,表示当前无法执行这个命令:

提示 busy 之后,同时 Redis 也给出了解决方案,我们只能只用 script kill 或者 shutdown nosave 命令,这两个命令又是做什么用的呢?

  • script kill:当脚本陷入死循环之后,执行这个命令可以强制 Lua 脚本中断执行。这个脚本的局限性就是当前陷入死循环的 Lua 脚本必须没有成功执行过命令。
  • shutdown nosave:强制退出 Lua 脚本,可以解决 script kill 命令的局限性。

接下来让我们在客户端二执行命令 script kill,然后再去看看陷入死循环的客户端一的效果:

可以看到,客户端一的 Lua 脚本已经退出了,根据后面的提示可以知道就是因为执行了 script kill 命令而导致了 Lua 脚本的中断。

现在我们重新用客户端一执行下面这个 Lua 脚本,这个脚本和上面的脚本区别就是这里执行成功了一个 Redis 命令之后才开始死循环:

1
java复制代码eval "redis.call('set','age','28') while true do end" 0

这时候再去客户端二执行 script kill 命令,发现无法中止 Lua 脚本了:

这里不允许直接中断 Lua 脚本是因为在死循环前已经有 Redis 命令被成功执行了,如果直接中断,那么就会造成数据不一致问题。

在这种场景下,只能通过执行 shutdown nosave 命令来强行中断 Lua 脚本,这里因为加了 nosave 之后不会触发 Redis 的持久化,所以当重启 Redis 服务之后,可以保证数据的一致性,下图就是执行 shutdown nosave 命令之后客户端一的效果图:

为什么可以执行 script kill 命令

Redis 当中执行命令是单线程的,那么为什么 Lua 脚本陷入死循环之后其他客户端还可以执行 script kill 命令呢?

这是因为 Lua 脚本引擎提供了钩子(hook)函数,它允许在内部虚拟机执行指令时运行钩子代码,所以 Redis 正是利用了这一原理,在执行 Lua 脚本之前设置了一个钩子,也就是说 script kill 命令是通过钩子(hook)函数来执行的。

总结

本文主要介绍了 Redis 中的发布订阅功能和 Lua 脚本的使用,使用 Lua 脚本可以让多个命令原子执行,减少网络开销,但是同时也要注意 Lua 脚本引发的死循环问题。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

分布式高级篇(七) - 商城业务 - 购物车

发表于 2021-02-08

购物车

环境搭建

  • 修改本地etc文件
1
复制代码cart.mall.com

image-20210202091824760

  • 拷贝静态资源到nginx

image-20210202093854707

  • 将两个HTML页面,拷贝到购物车服务的templates文件夹下,并修改两个html文件中的静态资源引用路径

image-20210202094315588

image-20210202094445426

  • 配置网关路由

image-20210202100620321

  • 页面效果

image-20210202102813795

购物车需求

需求描述

  • 用户可以在登录状态下将商品添加到购物车【登录购物车 / 在线购物车】
+ 放入数据库 (购物车读写都是高并发,不适合使用 mysql)
+ MongoDB (并不能带来很高的性能提升)
+ 放入 Redis(采用)


登录以后,会将临时购物车的数据全部合并到登录的购物车信息中,并**清空临时购物车**
  • 用户可以在未登录状态下将商品添加到购物车【游客购物车 / 离线购物车 / 临时购物车】
+ 放入 localstorage (客户端存储,后台不存,虽然后台压力减小,但是无法应用到大数据分析,进行精准分析、推荐)
+ cookie
+ WebSQL
+ 放入 Redis (采用)


即使关闭浏览器,下次进入,**临时购物车**的数据都在
  • 用户可以使用购物车一起结算下单
  • 用户给购物车添加商品
  • 用户可以查询自己的购物车
  • 用户可以在购物车中修改购买商品的数量
  • 用户可以在购物车中删除商品
  • 选中 / 不选中商品
  • 在购物车中展示商品优惠信息

数据结构

  • 购物项

image-20210202105052199

  • 每一个购物项信息,都对应一个对象,基本字段包括
1
2
3
4
5
6
7
8
9
10
arduino复制代码{
skuId:123321, //商品id
check:true, //是否被选中
title:"Apple iphone...",
defaultImage: "xxx",
price:5998.00
count:1,
totalPrice:13678.00
skuSaleVo:{} //销售属性的值
}

另外购物车中不止一条数据,因此最终会是对象的数组,即:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
arduino复制代码[
{
skuId:123321, //商品id
check:true, //是否被选中
title:"Apple iphone...",
defaultImage: "xxx",
price:5998.00
count:1,
totalPrice:13678.00
skuSaleVo:{} //销售属性的值
},
{
skuId:111222, //商品id
check:true, //是否被选中
title:"Apple iphone...",
defaultImage: "xxx",
price:3398.00
count:10,
totalPrice:44678.00
skuSaleVo:{} //销售属性的值
}
]
  • Redis 有五种不同数据结构,这里选择哪一种比较合适呢?Map<String,List>>
+ 首先不同用户应该有独立的购物车,因此购物车应该以用户id作为 key 来存储,Value是用户的所有购物车信息。这样看来基本的 k-v 结构就可以了
+ 但是,我们对购物车中的商品进行增、删、改操作,基本都需要根据商品id进行判断,为了后期方便处理,我们购物车也应该是 k-v 结构,key是商品id,value 才是这个商品的购物车信息
+ 总上所述,我们购物车结构是一个双层 Map:Map<String,Map<String,CartItemInfo>>
    - 第一层 Map,key 是用户 id
    - 第二层 Map,key是购物车中商品 id,值是购物项数据![image-20210202111245926](https://gitee.com/songjianzaina/juejin_p10/raw/master/img/b0244f51d70bb11f4abfe7373a64e7d8b0ca51332fc3f4c7508d2a0285b8a7ab)

流程

编写购物车和购物项的VO

需要计算的属性,必须重写它的get方法,保证每次获取属性都会进行计算

image-20210202113359902

1
2
3
4
5
6
7
sql复制代码浏览器有一个cookie;user-key:标识用户身份,一个月后过期;
如果第一次使用jd的购物车功能,都会给一个临时的用户身份
浏览器保存,以后的每次访问都会带上
登录:session有
未登录:按照cookie里带来的user-key来做
第一次:如果没有临时用户,帮忙创建一个临时用户
--> 创建一个拦截器
拦截器
1
复制代码在执行目标方法之前,判断用户的登录状态。并封装传递给controller目标请求

添加配置让拦截器生效

image-20210202143646638

ThreadLocal:同一个线程共享数据(*)

核心原理:是一个Map,以线程为key,同一个线程可以取到相同的数据,不同线程无法共享

image-20210202140832369

效果演示

  • 未登录 – 临时用户
    • 查看购物车
    • 加入购物车
    • redis 中key为 user-key
  • 已经登录 – 登录用户
    • 会将临时用户购物车合并进用户购物车
    • 购物项增减、勾选、删除

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Focus 聚焦社区 v020,GoFrame 开源社区

发表于 2021-02-07

此版本对核心功能进行了完善,也对大家提的一些建议及bug进行了修复;也诚请各位小伙伴们关注GoFrame项目发展,为生态添砖加瓦,加油助力!

更新功能:

  1. 文章回复功能完善:加入了点赞,踩,删除,回复,采纳;
  2. 编辑资料加入了我的消息模块:此版本主要加入了回复消息统一展示;
  3. 加入了简单的管理员功能,可以对垃圾文章和垃圾回复进行清理;
  4. 加入Docker Compose一键部署,极大方便linux环境部署;
  5. 对网友的一些建议及上一版本bug进行了修复;

零、关于Focus

Focus聚焦社区是GoFrame社区项目,采用了简洁强大的GoFrame作为后端WEB框架, 由于前台系统需要SEO因此使用了GF自带template模板引擎,数据库用MySQL,前端使用jQuery/bootstrap框架。

一、源码地址

  • github:github.com/gogf/focus
  • gitee:gitee.com/johng/focus

二、演示地址

请不要恶意发送垃圾数据哦~ 地址:focus.goframe.org/

三、安装部署

安装

1、下载项目源码: git clone https://github.com/gogf/focus

2、创建focus数据库:

1
sql复制代码CREATE DATABASE `focus` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_bin';

3、运行document下focus.sql初始化数据库SQL。

4、复制config下config.example.toml为config.toml配置文件,其他配置根据需要自行调整。

1
2
3
4
ini复制代码# 数据库配置
[database]
link = "mysql:root:123456@tcp(127.0.0.1:3306)/focus"
debug = true

5、运行:go run main.go

6、访问 http://127.0.0.1:8199 即可看到登录页面,默认账号/密码:goframe / 123456

一键部署

一键部署仅支持linux,并且需要先完成docker-compose安装;

1
2
bash复制代码git clone https://github.com/gogf/focus && \\
cd focus && chmod 755 focus.sh && ./focus.sh build

更多命令,请执行./focus help

传统部署

1、安装gf-cli工具链,详情查看:开发工具

2、打包命令:执行 gf build 即可,具体配置请参考:build 交叉编译

3、打包结果:默认 linux/macos/windows 三个版本。

1
2
3
4
5
6
7
bash复制代码bin
├── darwin_amd64
│ └── focus
├── linux_amd64
│ └── focus
└── windows_amd64
└── focus.exe

4、将config.toml和focus放到同一级目录,启动focus即可。

发布说明:

默认配置文件配置中,会将public,template目录进行二进制打包,随着可执行文件一同发布。因此,发布时仅需要将config.toml和focus拷贝到目标服务器运行即可。此外,您也可以考虑将特定的配置文件用打包发布,那么发布时仅需要发布focus可执行文件即可。

如果您想同时打包配置文件到可执行文件中,那么您可以同时参考一下关于默认配置文件读取的修改方式:配置管理里#默认文件修改

四、项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
bash复制代码├── app
│ ├── dao
│ ├── model
│ ├── shared
│ └── system
│ ├── admin
│ │ └── internal
│ └── index
│ └── internal
│ ├── api
│ ├── define
│ └── service
├── config
├── document
├── library
├── packed
├── public
├── template
├── upload
├── Dockerfile
├── go.mod
└── main.go

五、目录说明

目录/文件名称 说明 描述
app 业务逻辑层 所有的业务逻辑存放目录。
- dao 数据访问 数据库的访问操作,仅包含最基础的数据库CURD方法。
- model 数据模型 存放数据相关的实体结构定义。
- system 系统模块 内部可能包含多个子系统,不同子系统之间资源相互隔离。
- index 前端页面 子系统,前端页面。
- internal 内部模块 系统内部模块,仅供当前内部系统调用,无法在系统间共享。
- api 业务接口 系统内部接收/解析用户输入参数的入口/接口层
- define 结构定义 系统内部的输入、输出数据结构定义。
- service 逻辑封装 系统内部业务逻辑封装,实现特定的业务需求。
config 配置管理 所有的配置文件存放目录。
docker 镜像文件 Docker镜像相关依赖文件,脚本文件等等。
document 项目文档 Documentation项目文档,如: 设计文档、帮助文档等等。
library 公共库包 公共的功能封装包,往往不包含业务需求实现。
packed 打包目录 将资源文件打包的Go文件存放在这里,boot包初始化时会自动调用。
public 静态目录 仅有该目录下的文件才能对外提供静态服务访问。
template 模板文件 MVC模板文件存放的目录。
Dockerfile 镜像描述 云原生时代用于编译生成Docker镜像的描述文件。
go.mod 依赖管理 使用Go Module包管理的依赖描述文件。
main.go 入口文件 程序入口文件。

六、框架设计

由于采用了强大易用的GoFrame开发框架,可以参考框架的一些设计介绍。

  • 代码分层设计
  • 对象封装设计
  • Context: 业务流程共享变量
  • 单应用多系统设计
  • 更多:框架设计

七、项目截图

图1. 社区首页

图2. 登录页面


图3. 注册页面


图4. 发布/编辑页面

图5. 内容详情

图6. 搜索页面

图7. 个人主页

图8. 编辑资料

八、数据库表设计

九、功能模块进度

十、Focus文档

更多资料请移步:Focus聚焦社区文档

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

花一个周末,掌握 OpenFeign 核心原理|牛气冲天新年

发表于 2021-02-07

前言

现在的微服务在互联网圈子里应用已经相关广泛了,SpringCloud 是微服务领域当之无愧的 “头牌”

加上现在的一些轮子项目,新建一个全套的 SpringCloud 项目分分钟的事情,而我们要做的事情,就是不把认知停留在使用层面,所以要深入到源码中去理解 SpringCloud

为什么要选择 OpenFien? 因为它足够的 “小”,符合我们的标题:一个周末搞定

Feign 的源代码中,Java 代码才 3w 多行,放眼现在热门的开源项目,包括不限于 Dubbo、Naocs、Skywalking 中 Java 代码都要 30w 行起步

通过本篇文章,希望读者朋友可以掌握如下知识

  • 什么是 Feign
  • Feign 和 Openfeign 的区别
  • OpenFeign 的启动原理
  • OpenFeign 的工作原理
  • OpenFeign 如何负载均衡

spring-cloud-starter-openfeign version:2.2.6.RELEASE

什么是 Feign

Feign 是声明式 Web 服务客户端,它使编写 Web 服务客户端更加容易

Feign 不做任何请求处理,通过处理注解相关信息生成 Request,并对调用返回的数据进行解码,从而实现 简化 HTTP API 的开发

如果要使用 Feign,需要创建一个接口并对其添加 Feign 相关注解,另外 Feign 还支持可插拔编码器和解码器,致力于打造一个轻量级 HTTP 客户端

Feign 和 Openfeign 的区别

Feign 最早是由 Netflix 公司进行维护的,后来 Netflix 不再对其进行维护,最终 Feign 由社区进行维护,更名为 Openfeign

为了少打俩字,下文简称 Opefeign 为 Feign

并将原项目迁移至新的仓库,所以我们在 Github 上看到 Feign 的坐标如下

1
2
3
xml复制代码<groupId>io.github.openfeign</groupId>
<artifactId>parent</artifactId>
<version>...</version>

Starter Openfeign

当然了,基于 SpringCloud 团队对 Netflix 的情有独钟,你出了这么好用的轻量级 HTTP 客户端,我这老大哥不得支持一下,所以就有了基于 Feign 封装的 Starter

1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

Spring Cloud 添加了对 Spring MVC 注解的支持,并支持使用 Spring Web 中默认使用的相同 HttpMessageConverters

另外,Spring Cloud 老大哥同时集成了 Ribbon 和 Eureka 以及 Spring Cloud LoadBalancer,以在使用 Feign 时提供负载均衡的 HTTP 客户端

针对于注册中心的支持,包含但不限于 Eureka,比如 Consul、Naocs 等注册中心均支持

在我们 SpringCloud 项目开发过程中,使用的大多都是这个 Starter Feign

环境准备

为了方便大家理解,这里写出对应的生产方、消费方 Demo 代码,以及使用的注册中心

注册中心使用的 Nacos,生产、消费方代码都比较简单。另外为了阅读体验感,文章原则是少放源码,更多的是给大家梳理核心逻辑

生产者服务

添加 Nacos 服务注册发现注解以及发布出 HTTP 接口服务

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@EnableDiscoveryClient @SpringBootApplication
public class NacosProduceApplication {
public static void main(String[] args) {
SpringApplication.run(NacosProduceApplication.class, args);
}
@RestController
static class TestController {
@GetMapping("/hello")
public String hello(@RequestParam("name") String name) {
return "hello " + name;
}
}
}

消费者服务

定义 FeignClient 消费服务接口

1
2
3
4
5
java复制代码@FeignClient(value = "nacos-produce")
public interface DemoFeignClient {
@RequestMapping(value = "/hello", method = RequestMethod.GET)
String sayHello(@RequestParam("name") String name);
}

因为生产者使用 Nacos,所以消费者除了开启 Feign 注解,同时也要开启 Naocs 服务注册发现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@RestController @EnableFeignClients
@EnableDiscoveryClient @SpringBootApplication
public class NacosConsumeApplication {
public static void main(String[] args) {
SpringApplication.run(NacosConsumeApplication.class, args);
}

@Autowired private DemoFeignClient demoFeignClient;

@GetMapping("/test")
public String test() {
String result = demoFeignClient.sayHello("公号-源码兴趣圈");
return result;
}
}

Feign 的启动原理

我们在 SpringCloud 的使用过程中,如果想要启动某个组件,一般都是 @Enable… 这种方式注入,Feign 也不例外,我们需要在类上标记此注解 @EnableFeignClients

1
2
3
4
5
6
7
java复制代码@EnableFeignClients
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

继续深入看一下注解内部都做了什么。注解内部的方法就不说明了,不加会有默认的配置,感兴趣可以跟下源码

1
2
3
4
5
java复制代码@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(FeignClientsRegistrar.class)
public @interface EnableFeignClients {...}

前三个注解看着平平无奇,重点在第四个 @Import 上,一般使用此注解都是想要动态注册 Spring Bean 的

注入@Import

通过名字也可以大致猜出来,这是 Feign 注册 Bean 使用的,使用到了 Spring 相关的接口,一起看下起了什么作用

ResourceLoaderAware、EnvironmentAware 为 FeignClientsRegistrar 中两个属性 resourceLoader、environment 赋值,对 Spring 了解的小伙伴理解问题不大

ImportBeanDefinitionRegistrar 负责动态注入 IOC Bean,分别注入 Feign 配置类、FeignClient Bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码// 资源加载器,可以加载 classpath 下的所有文件
private ResourceLoader resourceLoader;
// 上下文,可通过该环境获取当前应用配置属性等
private Environment environment;

@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}

@Override
public void setResourceLoader(ResourceLoader resourceLoader) {
this.resourceLoader = resourceLoader;
}

@Override
public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
// 注册 @EnableFeignClients 提供的自定义配置类中的相关 Bean 实例
registerDefaultConfiguration(metadata,registry);
// 扫描 packge,注册被 @FeignClient 修饰的接口类为 IOC Bean
registerFeignClients(metadata, registry);
}

添加全局配置

registerDefaultConfiguration 方法流程如下

  1. 获取 @EnableFeignClients 注解上的属性以及对应 Value
  2. 生成 FeignClientSpecification(存储 Feign 中的配置类) 对应的构造器 BeanDefinitionBuilder
  3. FeignClientSpecification Bean 名称为 default. + @EnableFeignClients 修饰类全限定名称 + FeignClientSpecification
  4. @EnableFeignClients defaultConfiguration 默认为 {},如果没有相关配置,默认使用 FeignClientsConfiguration 并结合 name 填充到 FeignClientSpecification,最终注册为 IOC Bean

注册 FeignClient 接口

将重点放在 registerFeignClients 上,该方法主要就是将修饰了 @FeignClient 的接口注册为 IOC Bean

  1. 扫描 @EnableFeignClients 注解,如果有 clients,则加载指定接口,为空则根据 scanner 规则扫描出修饰了 @FeignClient 的接口
  2. 获取 @FeignClient 上对应的属性,根据 configuration 属性去创建接口级的 FeignClientSpecification 配置类 IOC Bean
  3. 将 @FeignClient 的属性设置到 FeignClientFactoryBean 对象上,并注册 IOC Bean

@FengnClient 修饰的接口实际上使用了 Spring 的代理工厂生成代理类,所以这里会把修饰了 @FeignClient 接口的 BeanDefinition 设置为 FeignClientFactoryBean 类型,而 FeignClientFactoryBean 继承自 FactoryBean

也就是说,当我们定义 @FeignClient 修饰接口时,注册到 IOC 容器中 Bean 类型变成了 FeignClientFactoryBean

在 Spring 中,FactoryBean 是一个工厂 Bean,用来创建代理 Bean。工厂 Bean 是一种特殊的 Bean,对于需要获取 Bean 的消费者而言,它是不知道 Bean 是普通 Bean 或是工厂 Bean 的。工厂 Bean 返回的实例不是工厂 Bean 本身,而是会返回执行了工厂 Bean 中 FactoryBean#getObject 逻辑的实例

Feign 的工作原理

说 Feign 的工作原理,核心点围绕在被 @FeignClient 修饰的接口,如何发送及接收 HTTP 网络请求

上面说到 @FeignClient 修饰的接口最终填充到 IOC 容器的类型是 FeignClientFactoryBean,先来看下它是什么

FactoryBean 接口特征

这里说一下 FeignClientFactoryBean 都有哪些特征

  1. 它会在类初始化时执行一段逻辑,依据 Spring InitializingBean 接口
  2. 如果它被别的类 @Autowired 进行注入,返回的不是它本身,而是 FactoryBean#getObject 返回的类,依据 Spring FactoryBean 接口
  3. 它能够获取 Spring 上下文对象,依据 Spring ApplicationContextAware 接口

先来看它的初始化逻辑都执行了什么

1
2
3
4
5
java复制代码@Override
public void afterPropertiesSet() {
Assert.hasText(contextId, "Context id must be set");
Assert.hasText(name, "Name must be set");
}

没有特别的操作,只是使用断言工具类判断两个字段不为空。ApplicationContextAware 也没什么说的,获取上下文对象赋值到对象的局部变量里,重点以及关键就是 FactoryBean#getObject 方法

1
2
3
4
java复制代码@Override
public Object getObject() throws Exception {
return getTarget();
}

getTarget 源码方法还是挺长的,这里采用分段的形式展示

1
2
3
4
5
6
7
java复制代码<T> T getTarget() {
// 从 IOC 容器获取 FeignContext
FeignContext context = applicationContext.getBean(FeignContext.class);
// 通过 context 创建 Feign 构造器
Feign.Builder builder = feign(context);
...
}

这里提出一个疑问?FeignContext 什么时候、在哪里被注入到 Spring 容器里的?

看到图片小伙伴就明了了,用了 SpringBoot 怎么会不使用自动装配的功能呢,FeignContext 就是在 FeignAutoConfiguration 中被成功创建

初始化父子容器

feign 方法里日志工厂、编码、解码等类均是通过 get(…) 方法得到

这里涉及到 Spring 父子容器的概念,默认子容器 Map 为空,获取不到服务名对应 Context 则新建

从下图中看到,注册了一个 FeignClientsConfiguration 类型的 Bean,我们上述方法 feign 中的获取的编码、解码器等组件都是从此类中获取默认

默认注册如下,FeignClientsConfiguration 是由创建 FeignContext 调用父类 Super 构造方法传入的

关于父子类容器对应关系,以及提供 @FeignClient 服务对应子容器的关系(每一个服务对应一个子容器实例)

回到 getInstance 方法,子容器此时已加载对应 Bean,直接通过 getBean 获取 FeignLoggerFactory

如法炮制,Feign.Builder、Encoder、Decoder、Contract 都可以通过子容器获取对应 Bean

configureFeign 方法主要进行一些配置赋值,比如超时、重试、404 配置等,就不再细说赋值代码了

到这里有必要总结一下创建 Spring 代理工厂的前半场代码

  1. 注入@FeignClient 服务时,其实注入的是 FactoryBean#getObject 返回代理工厂对象
  2. 通过 IOC 容器获取 FeignContext 上下文
  3. 创建 Feign.Builder 对象时会创建 Feign 服务对应的子容器
  4. 从子容器中获取日志工厂、编码器、解码器等 Bean
  5. 为 Feign.Builder 设置配置,比如超时时间、日志级别等属性,每一个服务都可以个性化设置

动态代理生成

继续嗑,上面都是开胃菜,接下来是最最最重要的地方了,小板凳坐板正了..

因为我们在 @FeignClient 注解是使用 name 而不是 url,所以会执行负载均衡策略的分支

Client: Feign 发送请求以及接收响应等都是由 Client 完成,该类默认 Client.Default,另外支持 HttpClient、OkHttp 等客户端

代码中的 Client、Targeter 在自动装配时注册,配合上文中的父子容器理论,这两个 Bean 在父容器中存在

因为我们并没有对 Hystix 进行设置,所以走入此分支

创建反射类 ReflectiveFeign,然后执行创建实例类

newInstance 方法对 @FeignClient 修饰的接口中 SpringMvc 等配置进行解析转换,对接口类中的方法进行归类,生成动态代理类

可以看出 Feign 创建动态代理类的方式和 Mybatis Mapper 处理方式是一致的,因为两者都没有实现类

根据 newInstance 方法按照行为大致划分,共做了四件事

  1. 处理 @FeignCLient 注解(SpringMvc 注解等)封装为 MethodHandler 包装类
  2. 遍历接口中所有方法,过滤 Object 方法,并将默认方法以及 FeignClient 方法分类
  3. 创建动态代理对应的 InvocationHandler 并创建 Proxy 实例
  4. 接口内 default 方法 绑定动态代理类

MethodHandler 将方法参数、方法返回值、参数集合、请求类型、请求路径进行解析存储

到这里我们也就可以 Feign 的工作方式了。前面那么多封装铺垫,封装个性化配置等等,最终确定收尾的是创建动态代理类

也就是说在我们调用 @FeignClient 接口时,会被 FeignInvocationHandler#invoke 拦截,并在动态代理方法中执行下述逻辑

  1. 接口注解信息封装为 HTTP Request
  2. 通过 Ribbon 获取服务列表,并对服务列表进行负载均衡调用(服务名转换为 ip+port)
  3. 请求调用后,将返回的数据封装为 HTTP Response,继而转换为接口中的返回类型

既然已经明白了调用流程,那就正儿八经的试一哈,试过才知有没有…

RequestTemplate:构建 Request 模版类

Options:存放连接、超时时间等配置类

Retryer:失败重试策略类

重试这一块逻辑看了很多遍,但是怎么看,一个 continue 关键字放到 while 的最后面都有点多余…

执行远端调用逻辑中使用到了 Rxjava (响应式编程),可以看到通过底层获取 server 后将服务名称转变为 ip+port 的方式

这种响应式编程的方式在 SpringCloud 中很常见,Hystix 源码底层也有使用

网络调用默认使用 HttpURLConnection,可以配置使用 HttpClient 或者 OkHttp

调用远端服务后,再将返回值解析正常返回,到这里一个完成的 Feign 调用链就聊明白了

图片参考@疯狂创客圈

Feign 如何负载均衡

一般而言,我们生产者注册多个服务,消费者调用时需要使用负载均衡从中 选取一个健康并且可用的生产者服务

因为 Feign 内部集成 Ribbon,所以也支持此特性,一起看下它是怎么做的

我们在 Nacos 上注册了两个服务,端口号 8080、8081。在获取负载均衡器时就可以获取服务集合

然后通过 chooseServer 方法选择一个健康实例返回,后面会新出一篇文章对 Ribbon 的负载均衡详细说明

通过返回的 Server 替换 URL 中的服务名,最后使用网络调用服务进行远端调用,完美的一匹

结语

文章从最基础的知识介绍什么是 Feign?继而从源码的角度上说明 Feign 的底层原理,总结如下:

  1. 通过 @EnableFeignCleints 注解启动 Feign Starter 组件
  2. Feign Starter 在项目启动过程中注册全局配置,扫描包下所有的 @FeignClient 接口类,并进行注册 IOC 容器
  3. @FeignClient 接口类被注入时,通过 FactoryBean#getObject 返回动态代理类
  4. 接口被调用时被动态代理类逻辑拦截,将 @FeignClient 请求信息通过编码器生成 Request
  5. 交由 Ribbon 进行负载均衡,挑选出一个健康的 Server 实例
  6. 继而通过 Client 携带 Request 调用远端服务返回请求响应
  7. 通过解码器生成 Response 返回客户端,将信息流解析成为接口返回数据

虽然 Feign 体量相对小,但是想要一篇文章完全描述,也不太现实,所以这里都是挑一些核心点讲解,没有写到的地方还请见谅

另外,由于作者水平有限, 欢迎大家能够反馈指正文章中错误不正确的地方, 感谢

微信搜索【源码兴趣圈】,关注公众号后回复 123 领取内容涵盖 GO、Netty、Seata、SpringCloud Alibaba、开发规范、面试宝典、数据结构等学习资料!

参考文章:

  • blog.csdn.net/forezp/arti…
  • www.cnblogs.com/yangxiaohui…
  • www.cnblogs.com/crazymakerc…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…721722723…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%