开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

杂谈 聊一聊NIO

发表于 2021-07-25

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

一 . 前言

NIO 是老生常谈了 , 由于最近准备开 Netty 的新坑了 , 不再局限于使用 , 初期先把前置的知识点回顾一下

二 . NIO 的概念

2.1 NIO 是什么 ?

NIO 可以从2个维度说 ,它既可以是一种设计模型 ,也可以说是 Java 中的一个包 (NIO 包是在 java1.4中引入的).

IO 是计算机与其他部分之间的接口 , IO 可以分为多种 : BIO ,NIO , AIO

NIO 模型

2.2 NIO , AIO , BIO 的区别

BIO : 基于流( Stream ) 的阻塞 IO , 也是我们最常见的 IO

NIO : 通过 Selector 实现的基于缓冲区 (Buffer) 的非阻塞式 IO

AIO : 在 NIO 基础上引入了异步的概念

IO 流向和特点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码
BIO : 基于 Stream , 单向流转
NIO : 基于 Channel , 双向流通

// PS : Stream 流的特点
- 每次处理一个字节的数据。输入流产生一个字节的数据,输出流消耗一个字节的数据
- 为流式数据创建过滤器非常容易 , 可以方便的创建过滤器链
- 流程更加简洁 , 容易处理复杂的逻辑 , 相对教慢

// PS : NIO的特点
- 事件驱动模型、单线程处理多任务、非阻塞 I/O 、IO 多路复用
- 每个操作在一个步骤中生成或消耗一个数据块
- 基于 block(Buffer) 的传输比基于流的传输更高效
- IO 函数 zero-copy
- 流程相对更复杂

PS : 这里的单线程处理是指只会在一个线程里面通过 selector 来处理逻辑 , 然后由 select 指定具体的 Handler

线程的模式

1
2
3
java复制代码BIO :一个连接一个线程,客户端的连接请求时服务器端就需要启动一个线程进行处理
NIO :一个请求一个线程,客户端的连接请求都会注册到多路复用器上 , 多路复用轮询到该请求时创建线程
AIO : 一个请求一个线程, 在创建线程时 , 会创建异步线程

2.3 NIO 的组成部分

Channel s and Buffer s , 是 NIO 的中心对象,几乎用于每一个 I/O 操作 .

Channels : 频道

  • Channel 是一个对象,可以从中读取数据并向其写入数据
  • 类似于 Stream , 去任何地方(或来自任何地方)的数据都必须通过 Channel 对象
  • 数据可以从 channel 写入buffers ,也可以从 buffers 读取到 channels , channels 的 读写均为单向
  • 通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入
  • 通道可以异步地读写

Buffers : 缓冲区

  • Buffer 是一个 Java 对象
  • 缓冲区本质上是一个数组 , 通常是字节数组 , 也可以是其他数组
  • 缓冲区提供对数据的结构化访问,并且跟踪系统的读/写进程。
  • 发送到通道的所有数据必须首先放置在缓冲区中; 同样,从通道读取的任何数据都被读入缓冲区

Selectors :

  • Selector允许单线程处理多个 Channel , 当打开了多个连接(通道),但每个连接的流量都很低,使用Selector就会很方便 .
  • 要使用Selector,得向Selector注册Channel,然后调用它的select()方法。
  • 这个方法会一直阻塞到某个注册的通道有事件就绪。
  • 一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新连接进来,数据接收

2.4 Channel 的主要实现

Channel的类型 :

  • FileChannel — 从文件中读写数据
  • DatagramChannel — 能通过UDP读写网络中的数据
  • SocketChannel — 能通过TCP读写网络中的数据
  • ServerSocketChannel — 可以监听新进来的TCP连接,像Web服务器那样

NIO-FileChannel.png

2.5 Buffer 的主要实现

Buffer 的主要实现 : ByteBuffer / CharBuffer / DoubleBuffer / FloatBuffer / IntBuffer / LongBuffer / ShortBuffer

NIO-Buffer.png

每个 Buffer 类都是 Buffer 接口的一个实例。除了 ByteBuffer 之外(基础对象),每一个都具有完全相同的操作,只是所处理的数据类型不同。

Buffer 方法简述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码- array() : 返回返回该缓冲区的数组
- arrayOffset() : 返回该缓冲区第一个元素在该缓冲区的支持数组中的偏移量
- capacity() : 返回该缓冲区的容量

- clear() : 清除这个缓冲区
- clear : position将被设回0,limit被设置成 capacity的值

- flip() : 翻转这个缓冲区
- 将Buffer从写模式切换到读模式。调用flip()方法会将position设回0,并将limit设置成之前position的值

- hasArray() : 指示该缓冲区是否由可访问数组支持
- hasRemaining() : 告诉当前位置和极限之间是否有任何元素
- isReadOnly() : 只读
- limit() : 返回该缓冲区的限制
- limit(int newLimit)
- mark() : 将该缓冲区的标记设置在其位置 , 与 reset() 配合使用
- position() : 返回缓冲区的位置
- position(int newPosition)
- remaining() : 返回当前位置和限制之间的元素数
- reset() : 将该缓冲区的位置重置为先前标记的位置

- rewind() : 倒带这个缓冲区
- Buffer.rewind()将position设回0,所以你可以重读Buffer中的所有数据。
- limit保持不变,仍然表示能从Buffer中读取多少个元素


// 补充 : mark()与reset()方法
- 通过调用Buffer.mark()方法,可以标记Buffer中的一个特定position。
- 通过调用Buffer.reset()方法恢复到这个position

要点 :

当向buffer写入数据时,buffer会记录下写了多少数据。

一旦要读取数据,需要通过flip()方法将Buffer从写模式切换到读模式。

在读模式下,可以读取之前写入到buffer的所有数据


读取完成后 需要清空缓存区 , 任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面 , 此时可以使用如下方法

clear() : 清空整个缓存区

compact() : 清除已经读过的数据

capacity,position 和 limit 三属性

- capacity (总容量)

作为一个内存块,Buffer有一个固定的大小值,也叫“capacity”.你只能往里写capacity个byte、long,char等类型

- position(指针当前位置)

当你写数据到Buffer中时,position表示当前的位置。初始的position值为0

当将Buffer从写模式切换到读模式,position会被重置为0
当从Buffer的position处读取数据时,position向前移动到下一个可读的位置

- limit (读/写边界位置)

在写模式下,Buffer的limit表示你最多能往Buffer里写多少数据。 写模式下,limit等于Buffer的capacity
当切换Buffer到读模式时, limit表示你最多能读到多少数据

- mark

用于记录当前 position 的前一个位置或者默认是 0在实际操作数据时它们有如下关系图

参考自 @ 一文让你彻底理解 Java NIO 核心组件 - SegmentFault 思否
image.png

在对Buffer进行读/写的过程中,position会往后移动,而 limit 就是 position 移动的边界。

  • 在对Buffer进行写入操作时,limit应当设置为capacity的大小
  • 对Buffer进行读取操作时,limit应当设置为数据的实际结束位置

2.6 Select 的主要实现

三 . Java NIO 使用

3.1 一个 NIO 的简单案例

File 读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码
//**使用 buffer 步骤**
1. 写入数据到Buffer
2. 调用flip()方法
3. 从Buffer中读取数据
4. 调用clear()方法或者compact()方法


public void templateRead() throws Exception {

logger.info("------> Step 1 : 开启基本案例 , 从 Stream 转换为 <-------");
FileInputStream fin = new FileInputStream("src/main/resources/data/data.txt");
FileChannel fc = fin.getChannel();


logger.info("------> Step 2 : 构建一个缓冲区 <-------");
ByteBuffer buffer = ByteBuffer.allocate(1024);

logger.info("------> Step 3 : 从缓冲区读取数据 <-------");
int bytesRead = fc.read(buffer);
System.out.println("buffer = " + buffer);

logger.info("------> Step 4: 单个字符读取 <-------");
while (bytesRead != -1) {
buffer.flip(); //缓冲区准备读取
while (buffer.hasRemaining()) {
System.out.print((char) buffer.get());// 每次读取一个字节
}
buffer.clear(); //准备缓冲区写入

// 可以通过屏蔽该语句看效果
bytesRead = fc.read(buffer);
}

fin.close();

}

File 写入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码 public void templateWrite() throws Exception {

logger.info("------> Step 1 : 开启基本案例 , 从 Stream 转换为 <-------");
RandomAccessFile randomAccessFile = new RandomAccessFile("src/main/resources/data/data2.txt", "rw");

FileChannel fc = randomAccessFile.getChannel();


logger.info("------> Step 2 : 构建一个缓冲区 <-------");
ByteBuffer buffer = ByteBuffer.allocate(1024);
byte[] message = new String("Hello World !").getBytes();
for (int i = 0; i < message.length; ++i) {
buffer.put(message[i]);
}
buffer.flip();

logger.info("------> Step 3 : 往缓冲区写数据 <-------");
fc.write(buffer);

}

3.2 scatter/gather 概念

scatter/gather用于描述从Channel 中读取或者写入到Channel的操作

  • 分散(scatter)从Channel中读取是指在读操作时将读取的数据写入多个buffer中
    • Channel将从Channel中读取的数据“分散(scatter)”到多个Buffer中
  • 聚集(gather)写入Channel是指在写操作时将多个buffer的数据写入同一个Channelsc
    • Channel 将多个Buffer中的数据“聚集(gather)”后发送到Channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码// ------------------- Scattering Reads
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = { header, body };
channel.read(bufferArray);

// Scattering Reads在移动下一个buffer前,必须填满当前的buffer
// 如果存在消息头和消息体,消息头必须完成填充

// ------------------- Scattering Reads
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = { header, body };
channel.write(bufferArray);

3.2 数据传输

在Java NIO中,如果两个通道中有一个是FileChannel,那你可以直接将数据从一个channel传输到另外一个channel

3.2.1 transferFrom 将数据从源通道传输到FileChannel中

1
2
3
4
5
6
java复制代码RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
FileChannel fromChannel = fromFile.getChannel();

toChannel.transferFrom(position, count, fromChannel);

// 输入参数position表示从position处开始向目标文件写入数据,count表示最多传输的字节

3.2.2 transferTo() 将数据从FileChannel传输到其他的channel中

1
2
3
4
java复制代码RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
FileChannel toChannel = toFile.getChannel();

fromChannel.transferTo(position, count, toChannel);

3.3 Selector

Selector(选择器) 是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。

这样,一个单独的线程可以管理多个channel,从而管理多个网络连接

仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道

3.3.1 Selector 的 创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码// 创建 selector
Selector selector = Selector.open();

// 向Selector注册通道
channel.configureBlocking(false);
SelectionKey key = channel.register(selector,Selectionkey.OP_READ);
// 此处是通过位运算达到配置类型的定义 ,在多线程中也能频繁看到
// -- Connect -- SelectionKey.OP_CONNECT --- 0100 -- 连接成功
// -- Accept -- SelectionKey.OP_ACCEPT --- 1000 -- 有可以接受的连接
// -- Read -- SelectionKey.OP_READ -- 0001 -- 有数据可读
// -- Write -- SelectionKey.OP_WRITE --- 0010 -- 可以写入数据了

// ---- SelectionKey 是 注册后返回的 key
// 其中包含属性 :interest集合 、ready集合、Channel、Selector、附加的对象(可选)

3.3.2 ServerSocketChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
java复制代码/**
* 构建一个简单的 SockChannel
* @throws Exception
*/
private void createSocketServer() throws Exception {

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().bind(new InetSocketAddress(9999));

while (true) {
SocketChannel socketChannel = serverSocketChannel.accept();
//do something with socketChannel...
}

}

/**
* 常用方法
* @throws Exception
*/
private void other() throws Exception {
// --> 打开 ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();


// --> 关闭 ServerSocketChannel
serverSocketChannel.close();

// --> 监听新进来的连接
SocketChannel socketChannel = serverSocketChannel.accept();
}

/**
* 非阻塞模式
* @throws Exception
*/
private void async() throws Exception {
// ServerSocketChannel可以
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
// 设置成非阻塞模式
serverSocketChannel.configureBlocking(false);
while (true) {
// 此时accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
//do something with socketChannel...
}
}
}

3.3.3 SocketChannel

Java NIO中的SocketChannel是一个连接到TCP网络套接字的通道

创建 SocketChannel 2 种方式 :

  • 打开一个SocketChannel并连接到互联网上的某台服务器
  • 一个新连接到达ServerSocketChannel时,会创建一个SocketChannel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码// --> 打开 SocketChannel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("jenkov.com", 80));

// --> 关闭 SocketChannel
socketChannel.close();

// --> 从 SocketChannel 读取数据
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = socketChannel.read(buf);

// --> 写入 SocketChannel
String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
channel.write(buf);
}

// --> 非阻塞模式 , 非阻塞情况下 , write 会提前返回 ,需要循环调用
socketChannel.configureBlocking(false);

// --> write()
// --> read()

3.3.4 FileChannel

通过使用一个InputStream、OutputStream或RandomAccessFile来获取一个FileChannel实例

从FileChannel读取数据 :

  1. 首先,准备一个Buffer
  2. 然后,调用FileChannel.read()方法 ,该方法将数据从FileChannel读取到Buffer中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码RandomAccessFile aFile = new RandomAccessFile("data/local-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();


// 返回-1,表示到了文件末尾
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);

// 向FileChannel写数据
String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
channel.write(buf);
}

// 关闭FileChannel
channel.close();

// position 获取filechannel 的当前位置 ,末尾为 -1
long pos = channel.position();
channel.position(pos +123);

// FileChannel的size方法 , 返回该实例所关联文件的大小
long fileSize = channel.size();

// 可以使用FileChannel.truncate()方法截取一个文件
channel.truncate(1024);

// FileChannel.force()方法将通道里尚未写入磁盘的数据强制写到磁盘上
// force()方法有一个boolean类型的参数,指明是否同时将文件元数据(权限信息等)写到磁盘上
channel.force(true);

3.3.5 一个 Selector 的完整使用

该段代码参考至 Java NIO原理 图文分析及代码实现 , 写的很清楚 , 代码复制下来可以直接用 , 建议试试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
java复制代码public void startLogin() {

logger.info("------> 进入服务端逻辑 <-------");

try {
initServer(8000);
listen();
} catch (Exception e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
}

}


//通道管理器
private Selector selector;

/**
* 获得一个ServerSocket通道,并对该通道做一些初始化的工作
*
* @param port 绑定的端口号
* @throws IOException
*/
public void initServer(int port) throws IOException {

// Step 1 : 获得一个ServerSocket通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 设置通道为非阻塞
serverChannel.configureBlocking(false);

// Step 2 : 为 ServerSocket 绑定 port端口
serverChannel.socket().bind(new InetSocketAddress(port));

// Step 3 : 此处开始使用 , 获得一个通道管理器
this.selector = Selector.open();

//将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,
//当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}

/**
* 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
*
* @throws IOException
*/
@SuppressWarnings("unchecked")
public void listen() throws IOException {
System.out.println("服务端启动成功!");
// 轮询访问selector
while (true) {
//当注册的事件到达时,方法返回;否则,该方法会一直阻塞
selector.select();
// 获得selector中选中的项的迭代器,选中的项为注册的事件
Iterator ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
// 删除已选的key,以防重复处理
ite.remove();
// 客户端请求连接事件
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key
.channel();
// 获得和客户端连接的通道
SocketChannel channel = server.accept();
// 设置成非阻塞
channel.configureBlocking(false);

//在这里可以给客户端发送信息哦
channel.write(ByteBuffer.wrap(new String("向客户端发送了一条信息").getBytes()));
//在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。
channel.register(this.selector, SelectionKey.OP_READ);

// 获得了可读的事件
} else if (key.isReadable()) {
read(key);
}

}

}
}

/**
* 处理读取客户端发来的信息 的事件
*
* @param key
* @throws IOException
*/
public void read(SelectionKey key) throws IOException {
// 服务器可读取消息:得到事件发生的Socket通道
SocketChannel channel = (SocketChannel) key.channel();
// 创建读取的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(10);
channel.read(buffer);
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("服务端收到信息:" + msg);
ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
channel.write(outBuffer);// 将消息回送给客户端
}

附上客户端逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
java复制代码public void startLogin() {

logger.info("------> 进入客户端逻辑 <-------");

try {
initClient("localhost", 8000);
listen();
} catch (Exception e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
}

}

//通道管理器
private Selector selector;

/**
* 获得一个Socket通道,并对该通道做一些初始化的工作
*
* @param ip 连接的服务器的ip
* @param port 连接的服务器的端口号
* @throws IOException
*/
public void initClient(String ip, int port) throws IOException {
// 获得一个Socket通道
SocketChannel channel = SocketChannel.open();
// 设置通道为非阻塞
channel.configureBlocking(false);
// 获得一个通道管理器
this.selector = Selector.open();

// 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调
//用channel.finishConnect();才能完成连接
channel.connect(new InetSocketAddress(ip, port));
//将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件。
channel.register(selector, SelectionKey.OP_CONNECT);
}

/**
* 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
*
* @throws IOException
*/
@SuppressWarnings("unchecked")
public void listen() throws IOException {
// 轮询访问selector
while (true) {
selector.select();
// 获得selector中选中的项的迭代器
Iterator ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
// 删除已选的key,以防重复处理
ite.remove();
// 连接事件发生
if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) key
.channel();
// 如果正在连接,则完成连接
if (channel.isConnectionPending()) {
channel.finishConnect();

}
// 设置成非阻塞
channel.configureBlocking(false);

//在这里可以给服务端发送信息哦
channel.write(ByteBuffer.wrap(new String("向服务端发送了一条信息").getBytes()));
//在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。
channel.register(this.selector, SelectionKey.OP_READ);

// 获得了可读的事件
} else if (key.isReadable()) {
read(key);
}

}

}
}

/**
* 处理读取服务端发来的信息 的事件
*
* @param key
* @throws IOException
*/
public void read(SelectionKey key) throws IOException {
//和服务端的read方法一样
}

3.3.6 AIO Demo

参考 JAVA中BIO、NIO、AIO的分析理解-阿里云开发者社区 (aliyun.com) , 这个文档里面把三种类型都通过 Demo 的形式展现 , 非常清晰

截取几段 AIO 核心代码 :

1
2
3
4
5
6
7
8
java复制代码public void init() {
// 创建处理线程池
group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 5);
// 创建服务channel
serverSocketChannel = AsynchronousServerSocketChannel.open(group);
// 丙丁端口
serverSocketChannel.bind(new InetSocketAddress(port));
}

五 . NIO 性能分析

TODO : 性能分析当然是要做的 , 后续补上 , 见谅

六 . NIO 的其他用法

6.1 Java NIO Pipe 用法

Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码// > 开启管道
Pipe pipe = Pipe.open();

// > 向管道写数据
Pipe.SinkChannel sinkChannel = pipe.sink();

// > 写入数据
String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());

buf.flip();

while(buf.hasRemaining()) {
sinkChannel.write(buf);
}

// > 读取数据
// 1 访问 source通道
Pipe.SourceChannel sourceChannel = pipe.source();
// 2 调用source通道的read()方法
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = sourceChannel.read(buf);

七 .深入学习

@ Getting started with new I/O (NIO) – IBM Developer

@ Java NIO - Quick Guide - Tutorialspoint

总结 :

之前一直对这些概念没有系统的了解 ,这里算是补上了

整个学习的过程中陆陆续续发现了很多优秀的博客 ,从中也学习到了很多东西 , 这里全部放在参考文档中了

另外 , 学习 Java NIO 只是为了了解概念 ,Netty 的 NIO 中大量代码是自己原生定制的 , 解决了很多原本Java NIO 的问题

参考文档 :

Getting started with new I/O (NIO) – IBM Developer

Java NIO 教程(七) FileChannel - 简书 (jianshu.com)

Java NIO原理 图文分析及代码实现 - 逸情公子 - ITeye博客

JAVA中BIO、NIO、AIO的分析理解-阿里云开发者社区 (aliyun.com)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Springboot日志TraceId实现

发表于 2021-07-25

描述

1
2
bash复制代码通常我们出现生产业务问题时,需要找到关键日志信息,再把该日志对应的请求的所有日志捞出,找到问题原因;
请求的关联id就是TraceId,如下都是基于MDC对TraceId的实现。

实现

  1. 定义拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码package com.satan.common.utils;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.MDC;

import java.util.UUID;

@Slf4j
public class TraceUtils {

private static final String TRACE_ID = "traceId";

public static void createTraceId() {
String traceId = MDC.get(TRACE_ID);
if (StringUtils.isBlank(traceId)) {
traceId = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase();
log.debug("create traceId :{}", traceId);
MDC.put(TRACE_ID, traceId);
}
}

public static void destroyTraceId() {
MDC.remove(TRACE_ID);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码package com.satan.common.interceptor;

import com.satan.common.utils.TraceUtils;
import org.springframework.web.servlet.HandlerInterceptor;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class TraceIdInterceptor implements HandlerInterceptor {

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
TraceUtils.createTraceId();
return true;
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
TraceUtils.destroyTraceId();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码package com.satan.product.config;

import com.satan.common.interceptor.TraceIdInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class MvcConfig implements WebMvcConfigurer {

@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new TraceIdInterceptor()).addPathPatterns("/**");
}
}
  1. logback配置traceId

1
2
3
4
5
6
7
8
9
10
11
12
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d [%thread][%X{traceId}] %-5p [%c] [%F:%L] - %msg%n</pattern>
</encoder>
</appender>
<logger name="chapters.configuration" level="INFO"/>
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
  1. 效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码2021-07-25 22:56:10,000 [com.alibaba.nacos.client.naming.updater][] INFO  [com.alibaba.nacos.client.naming] [HostReactor.java:267] - current ips:(1) service: DEFAULT_GROUP@@life-customer@@DEFAULT -> [{"instanceId":"******#8071#DEFAULT#DEFAULT_GROUP@@life-customer","ip":"******","port":8071,"weight":1.0,"healthy":true,"enabled":true,"ephemeral":true,"clusterName":"DEFAULT","serviceName":"DEFAULT_GROUP@@life-customer","metadata":{"preserved.register.source":"SPRING_CLOUD"},"ipDeleteTimeout":30000,"instanceHeartBeatInterval":5000,"instanceHeartBeatTimeOut":15000}]
2021-07-25 22:56:33,226 [http-nio-8071-exec-1][] INFO [org.apache.catalina.core.ContainerBase.[Tomcat].[localhost].[/]] [DirectJDKLog.java:173] - Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-07-25 22:56:33,226 [http-nio-8071-exec-1][] INFO [org.springframework.web.servlet.DispatcherServlet] [FrameworkServlet.java:525] - Initializing Servlet 'dispatcherServlet'
2021-07-25 22:56:33,230 [http-nio-8071-exec-1][] INFO [org.springframework.web.servlet.DispatcherServlet] [FrameworkServlet.java:547] - Completed initialization in 4 ms
2021-07-25 22:56:33,244 [http-nio-8071-exec-1][] WARN [org.springframework.web.servlet.mvc.support.DefaultHandlerExceptionResolver] [AbstractHandlerExceptionResolver.java:207] - Resolved [org.springframework.web.HttpRequestMethodNotSupportedException: Request method 'POST' not supported]
2021-07-25 22:56:50,386 [http-nio-8071-exec-3][af8bf1254fa746598fc87074ae5c1707] INFO [com.satan.customer.controller.TestController] [TestController.java:16] - business step 0 complete
2021-07-25 22:56:50,386 [http-nio-8071-exec-3][af8bf1254fa746598fc87074ae5c1707] INFO [com.satan.customer.controller.TestController] [TestController.java:16] - business step 1 complete
2021-07-25 22:56:50,387 [http-nio-8071-exec-3][af8bf1254fa746598fc87074ae5c1707] INFO [com.satan.customer.controller.TestController] [TestController.java:16] - business step 2 complete
2021-07-25 22:56:50,387 [http-nio-8071-exec-3][af8bf1254fa746598fc87074ae5c1707] INFO [com.satan.customer.controller.TestController] [TestController.java:16] - business step 3 complete
2021-07-25 22:56:50,387 [http-nio-8071-exec-3][af8bf1254fa746598fc87074ae5c1707] INFO [com.satan.customer.controller.TestController] [TestController.java:16] - business step 4 complete
2021-07-25 22:56:50,387 [http-nio-8071-exec-3][af8bf1254fa746598fc87074ae5c1707] INFO [com.satan.customer.controller.TestController] [TestController.java:16] - business step 5 complete
2021-07-25 22:56:50,388 [http-nio-8071-exec-3][af8bf1254fa746598fc87074ae5c1707] INFO [com.satan.customer.controller.TestController] [TestController.java:16] - business step 6 complete
2021-07-25 22:56:50,388 [http-nio-8071-exec-3][af8bf1254fa746598fc87074ae5c1707] INFO [com.satan.customer.controller.TestController] [TestController.java:16] - business step 7 complete
2021-07-25 22:56:50,388 [http-nio-8071-exec-3][af8bf1254fa746598fc87074ae5c1707] INFO [com.satan.customer.controller.TestController] [TestController.java:16] - business step 8 complete
2021-07-25 22:56:50,388 [http-nio-8071-exec-3][af8bf1254fa746598fc87074ae5c1707] INFO [com.satan.customer.controller.TestController] [TestController.java:16] - business step 9 complete

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

java 根据 KeyPairGenerator对象生成RS

发表于 2021-07-25

java 根据 KeyPairGenerator对象生成RSA密钥对,并进行测试

说明:rsa 算法根据密钥长度, 每轮 加/解密 填充 允许的最大长度,也不相同,这个地方暂时还不知道是怎么计算出来的
限制:由于上面的原因,这里暂时只支持 密钥长度为 1024 / 2048 位长度
实现:根据 KeyPairGenerator 对象传入密钥长度,随机种子,生成rsa 密钥

编码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
java复制代码	package com.example.demo.util.encrypt;

import org.apache.commons.codec.binary.Base64;
import sun.security.rsa.RSACore;
import sun.security.rsa.RSAKeyFactory;

import javax.crypto.Cipher;
import java.io.ByteArrayOutputStream;
import java.security.*;
import java.security.interfaces.RSAKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.security.spec.X509EncodedKeySpec;

/**
* 生成 rsa 密钥,通过 KeyPairGenerator 对象
*
* 尝试根据 密钥长度获取 每轮 加/解密 填充 允许的最大长度, 但是失败,先不尝试了
*/
public class RsaGenerate {


/**
* 解密 每轮的基准, 128 ,以 密钥 1024 位来算(与 私钥/公钥 解密无关)
*/
private static int decodePaddingBase = 128;

/**
* 加密 每轮的基准, 117 ,以 密钥1024 来算(与 私钥/公钥 加密无关)
*/
private static int encodePaddingBase = 117;


/**
* 密钥位数以 1024 位为基准
*/
private static int baseSize = 1024;





/**
* KeyFactory 不行
* KeyGenerator 不支持rsa
* @param args
* @throws NoSuchAlgorithmException
*/
public static void main(String[] args) throws Exception {
// 传入随机种子,生成 对称算法密钥对
int keySize = 1024;
String seedStr = "测试seed ajibahuihu";
String testStr = "这是一条测试数据,请注意 https://blog.csdn.net/kzcming";

process(keySize, seedStr, testStr);
}


/**
* 根据密钥的长度, 每轮 加/解密 填充 允许的最大长度,也不相同
* @param key
* @throws InvalidKeyException
*/
private static void makeSize(Key key) throws InvalidKeyException {
RSAKey var6 = RSAKeyFactory.toRSAKey(key);
int byteLength = RSACore.getByteLength(var6.getModulus());
if(key instanceof PrivateKey) encodePaddingBase = byteLength;
if(key instanceof PublicKey) decodePaddingBase = byteLength;

// if(key instanceof PrivateKey)
// init("1", key, JceSecurity.RANDOM,null);
}






private static void process(int keySize, String seedStr, String testStr) throws Exception {
KeyPair keyPair = getKeyPair(seedStr,keySize);
// 获得私钥,公钥
PrivateKey aPrivate = keyPair.getPrivate();
PublicKey aPublic = keyPair.getPublic();
byte[] privateEncoded = aPrivate.getEncoded();
byte[] publicEncoded = aPublic.getEncoded();
// Base64 编码
String privateKeyStr = Base64.encodeBase64String(privateEncoded);
String publicKeyStr = Base64.encodeBase64String(publicEncoded);
System.out.println("private:" + privateKeyStr);
System.out.println();
System.out.println("public:" + publicKeyStr);


// 执行测试
// int divide = keySize/baseSize;
// if( divide != 1 ) {
if(keySize == 2048) {
// makeSize(aPrivate);
// makeSize(aPublic);
encodePaddingBase = encodePaddingBase * 2;
decodePaddingBase = decodePaddingBase * 2;
}

// 公钥加密,私钥解密, 当然也可以反着,但是大家一般都这么弄
byte[] bytes = encryptByPublicKey(testStr.getBytes(), publicKeyStr);
byte[] bytes1 = decryptByPrivateKey(bytes, privateKeyStr);
System.out.println(new String(bytes1));
}

/**
* 获取对称加密密钥
*
* KeyPairGenerator 是生成对称加密密钥的 封装对象
* @param seedStr 随机种子字符串
* @param keySize 密钥长度, 必须是128 的倍数
* @return
* @throws NoSuchAlgorithmException
*/
private static KeyPair getKeyPair(String seedStr,int keySize) throws NoSuchAlgorithmException {
KeyPairGenerator gen = KeyPairGenerator.getInstance("RSA");
gen.initialize(keySize,new SecureRandom(seedStr.getBytes()));
return gen.generateKeyPair();
}


/**
* 私钥解密
* @param data
* @param privateKey
* @return
* @throws Exception
*/
public static byte[] decryptByPrivateKey(byte[] data, String privateKey) throws Exception {
byte[] keyBytes = Base64.decodeBase64(privateKey);
PKCS8EncodedKeySpec pkcs8KeySpec = new PKCS8EncodedKeySpec(keyBytes);
KeyFactory keyFactory = KeyFactory.getInstance("RSA");
Key privateK = keyFactory.generatePrivate(pkcs8KeySpec);
Cipher cipher = Cipher.getInstance(keyFactory.getAlgorithm());
cipher.init(2, privateK);
int inputLen = data.length;
ByteArrayOutputStream out = new ByteArrayOutputStream();
int offSet = 0;

int i = 0;
while (inputLen - offSet > 0) {
byte[] cache;

if (inputLen - offSet > decodePaddingBase) {
cache = cipher.doFinal(data, offSet, decodePaddingBase);
} else {
cache = cipher.doFinal(data, offSet, inputLen - offSet);
}
out.write(cache, 0, cache.length);
i++;
offSet = i * decodePaddingBase;
}
byte[] decryptedData = out.toByteArray();
out.close();
return decryptedData;
}


/**
* 公钥加密
* @param data
* @param publicKey
* @return
* @throws Exception
*/
public static byte[] encryptByPublicKey(byte[] data, String publicKey) throws Exception {
byte[] keyBytes = Base64.decodeBase64(publicKey);
X509EncodedKeySpec x509KeySpec = new X509EncodedKeySpec(keyBytes);
KeyFactory keyFactory = KeyFactory.getInstance("RSA");
Key publicK = keyFactory.generatePublic(x509KeySpec);

Cipher cipher = Cipher.getInstance(keyFactory.getAlgorithm());
cipher.init(1, publicK);
int inputLen = data.length;
ByteArrayOutputStream out = new ByteArrayOutputStream();
int offSet = 0;

int i = 0;
while (inputLen - offSet > 0) {
byte[] cache;
if (inputLen - offSet > encodePaddingBase) {
cache = cipher.doFinal(data, offSet, encodePaddingBase);
} else {
cache = cipher.doFinal(data, offSet, inputLen - offSet);
}
out.write(cache, 0, cache.length);
i++;
offSet = i * encodePaddingBase;
}
byte[] encryptedData = out.toByteArray();
out.close();
return encryptedData;
}


}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

锁升级过程(偏向锁/轻量级锁/重量级锁)

发表于 2021-07-25

锁的前置知识

如果想要透彻的理解java锁的来龙去脉,需要先了解锁的基础知识:锁的类型、java线程阻塞的代价、Markword。

锁的类型

锁从宏观上分类,分为悲观锁与乐观锁。

乐观锁

乐观锁是一种乐观思想,即认为读多写少,遇到并发写的可能性低。每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,采取在写时先读出当前版本号,然后加锁操作(比较跟上一次的版本号,如果一样则更新),如果失败则要重复读-比较-写的操作。

java中的乐观锁基本都是通过CAS操作实现的,CAS是一种更新的原子操作,比较当前值跟传入值是否一样,一样则更新,否则失败。

悲观锁

悲观锁是就是悲观思想,即认为写多,遇到并发写的可能性高。每次去拿数据的时候都认为别人会修改,所以每次在读写数据的时候都会上锁,这样别人想读写这个数据就会阻塞(block)直到拿到锁。

java中的悲观锁就是Synchronize、AQS框架下的锁则是先尝试CAS乐观锁去获取锁,获取不到才会转换为悲观锁,如:ReentrantLock。

线程阻塞的代价

java的线程是映射到操作系统原生线程之上的,如果要阻塞或唤醒一个线程就需要操作系统介入,需要在用户态与核心态之间切换,这种切换会消耗大量的系统资源。因为用户态与内核态都有各自专用的内存空间,专用的寄存器等,用户态切换至内核态需要传递给许多变量、参数给内核,内核也需要保护好用户态在切换时的一些寄存器值、变量等,以便内核态调用结束后切换回用户态继续工作。

  1. 如果线程状态切换是一个高频操作时,这将会消耗很多CPU处理时间;
  2. 如果对于那些需要同步的简单的代码块,获取锁挂起操作消耗的时间比用户代码执行的时间还要长,这种同步策略显然非常糟糕的。

synchronized会导致争用不到锁的线程进入阻塞状态,所以说它是java语言中一个重量级的同步操纵,被称为重量级锁。为了缓解上述性能问题【synchronized】,JVM从1.5开始,引入了轻量锁与偏向锁,默认启用了自旋锁,他们都属于乐观锁。

明确java线程切换的代价,是理解java中各种锁的优缺点的基础之一。

Mark Word

在介绍java锁之前,先说下什么是markword,markword是java对象数据结构中的一部分,要详细了解java对象的结构可以点击这里,这里只做markword的详细介绍,因为对象的markword和java各种类型的锁密切相关;

markword数据的长度在32位和64位的虚拟机(未开启压缩指针)中分别为32bit和64bit,它的最后2bit是锁状态标志位,用来标记当前对象的状态,对象的所处的状态,决定了markword存储的内容,如下表所示:

状态 标志位 存储内容
未锁定 01 对象哈希码、对象分代年龄
轻量级锁定 00 指向锁记录的指针
膨胀(重量级锁定) 10 执行重量级锁定的指针
GC标记 11 空(不需要记录信息)
可偏向 01 偏向线程ID、偏向时间戳、对象分代年龄

32位虚拟机在不同状态下markword结构如下图所示:

这里写图片描述

了解了markword结构,有助于后面了解java锁的加锁解锁过程;

​ 前面提到了java的4种锁,他们分别是重量级锁、自旋锁、轻量级锁和偏向锁。不同的锁有不同特点,每种锁只有在其特定的场景下,才会有出色的表现,java中没有哪种锁能够在所有情况下都能有出色的效率,引入这么多锁的原因就是为了应对不同的情况;

Java中的锁

偏向锁

Java偏向锁(Biased Locking)是Java6引入的一项多线程优化,它通过消除资源无竞争情况下的同步原语,进一步提高了程序的运行性能。

偏向锁,顾名思义它会偏向于第一个访问锁的线程,如果在运行过程中,同步锁只有一个线程访问,不存在多线程争用的情况,则线程是不需要触发同步的,这种情况下,就会给线程加一个偏向锁。
如果在运行过程中,遇到了其他线程抢占锁,则持有偏向锁的线程会被挂起,JVM会消除它身上的偏向锁,将锁恢复到标准的轻量级锁。

偏向锁获取过程

  1. 访问Mark Word中偏向锁的标识是否设置成1,锁标志位是否为01,确认为可偏向状态。
  2. 如果为可偏向状态,则测试线程ID是否指向当前线程,如果是,进入步骤5,否则进入步骤3。
  3. 如果线程ID并未指向当前线程,则通过CAS操作竞争锁。如果竞争成功,则将Mark Word中线程ID设置为当前线程ID,然后执行5;如果竞争失败,执行4。
  4. 如果CAS获取偏向锁失败,则表示有竞争。当到达全局安全点(safepoint)时获得偏向锁的线程被挂起,偏向锁升级为轻量级锁,然后被阻塞在安全点的线程继续往下执行同步代码。(撤销偏向锁的时候会导致stop the world)
  5. 执行同步代码。

注意:第四步中到达安全点safepoint会导致stop the world,时间很短。

偏向锁获取过程

​ 偏向锁的撤销在上述第四步骤中有提到。偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动去释放偏向锁。偏向锁的撤销,需要等待全局安全点(在这个时间点上没有字节码正在执行),它会首先暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态,撤销偏向锁后恢复到未锁定(标志位为“01”)或轻量级锁(标志位为“00”)的状态。

偏向锁的适用场景

​ 始终只有一个线程在执行同步块,在它没有执行完释放锁之前,没有其它线程去执行同步块,在锁无竞争的情况下使用,一旦有了竞争就升级为轻量级锁,升级为轻量级锁的时候需要撤销偏向锁,撤销偏向锁的时候会导致stop the world操作;
在有锁的竞争时,偏向锁会多做很多额外操作,尤其是撤销偏向锁的时候会导致进入安全点,安全点会导致stw,导致性能下降,这种情况下应当禁用。

查看停顿–安全点停顿日志

要查看安全点停顿,可以打开安全点日志,通过设置JVM参数 -XX:+PrintGCApplicationStoppedTime 会打出系统停止的时间,添加 -XX:+PrintSafepointStatistics -XX:PrintSafepointStatisticsCount=1 这两个参数会打印出详细信息,可以查看到使用偏向锁导致的停顿,时间非常短暂,但是争用严重的情况下,停顿次数也会非常多;

注意:安全点日志不能一直打开,只在问题排查时打开:

  1. 安全点日志默认输出到stdout,一是stdout日志的整洁性,二是stdout所重定向的文件如果不在/dev/shm,可能被锁。
  2. 对于一些很短的停顿,比如取消偏向锁,打印的消耗比停顿本身还大。
  3. 安全点日志是在安全点内打印的,本身加大了安全点的停顿时间。

如果在生产系统上要打开,再再增加下面四个参数:-XX:+UnlockDiagnosticVMOptions -XX: -DisplayVMOutput -XX:+LogVMOutput -XX:LogFile=/dev/shm/vm.log 打开Diagnostic(只是开放了更多的flag可选,不会主动激活某个flag),关掉输出VM日志到stdout,输出到独立文件,/dev/shm目录(内存文件系统)。

这里写图片描述
第一部分是时间戳,VM Operation的类型;

第二部分是线程概况,被中括号括起来【total: 安全点里的总线程数、initially_running: 安全点开始时正在运行状态的线程数、wait_to_block: 在VM Operation开始前需要等待其暂停的线程数】;

第三部分是到达安全点时的各个阶段以及执行操作所花的时间【spin:等待线程响应safepoint号召的时间、block:暂停所有线程所用的时间、sync:等于spin+block,这是从开始到进入安全点所耗的时间,可用于判断进入安全点耗时、cleanup:清理所用时间、vmop:真正执行VMOperation的时间】其中最重要的是vmop。

可见,那些很多但又很短的安全点,全都是RevokeBias, 高并发的应用会禁用掉偏向锁。

Jvm开启/关闭偏向锁

  • 开启偏向锁:-XX:+UseBiasedLocking -XX:BiasedLockingStartupDelay=0
  • 关闭偏向锁:-XX:-UseBiasedLocking

轻量级锁

轻量级锁是由偏向锁升级来的,偏向锁运行在一个线程进入同步块的情况下,当第二个线程加入锁争用的时候,偏向锁就会升级为轻量级锁;

轻量级锁加锁过程

  1. 在代码进入同步块的时候,如果同步对象锁状态为无锁状态(锁标志位为“01”状态,是否为偏向锁为“0”),虚拟机首先将在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储锁对象目前的Mark Word的拷贝,官方称之为 Displaced Mark Word。这时候线程堆栈与对象头的状态如图所示:

这里写图片描述
2. 拷贝对象头中的Mark Word复制到锁记录中;
3. 拷贝成功后,虚拟机将使用CAS操作尝试将对象的Mark Word更新为指向Lock Record的指针,并将Lock record里的owner指针指向object mark word。如果更新成功,则执行步骤4,否则执行步骤5。
4. 如果这个更新动作成功了,那么这个线程就拥有了该对象的锁,并且对象Mark Word的锁标志位设置为“00”,即表示此对象处于轻量级锁定状态,这时候线程堆栈与对象头的状态如图所示:

这里写图片描述
5. 如果这个更新操作失败了,虚拟机首先会检查对象的Mark Word是否指向当前线程的栈帧,如果是就说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块继续执行。否则说明多个线程竞争锁,轻量级锁就要膨胀为重量级锁,锁标志的状态值变为“10”,Mark Word中存储的就是指向重量级锁(互斥量)的指针,后面等待锁的线程也要进入阻塞状态。 而当前线程便尝试使用自旋来获取锁,自旋就是为了不让线程阻塞,而采用循环去获取锁的过程。

轻量级锁释放过程

释放锁线程视角:由轻量锁切换到重量锁,是发生在轻量锁释放锁的期间,之前在获取锁的时候它拷贝了锁对象头的markword,在释放锁的时候如果它发现在它持有锁的期间有其他线程来尝试获取锁了,并且该线程对markword做了修改,两者比对发现不一致,则切换到重量锁。

因为重量级锁被修改了,所有display mark word和原来的markword不一样了。怎么补救?就是进入mutex前,compare一下obj的markword状态,确认该markword是否被其他线程持有。此时如果线程已经释放了markword,那么通过CAS后就可以直接进入线程,无需进入mutex,就这个作用。

尝试获取锁线程视角:如果线程尝试获取锁的时候,轻量锁正被其他线程占有,那么它就会修改markword,修改重量级锁,表示该进入重量锁了。

还有一个注意点:等待轻量锁的线程不会阻塞,它会一直自旋等待锁,并如上所说修改markword。

这就是自旋锁,尝试获取锁的线程,在没有获得锁的时候,不被挂起,而转而去执行一个空循环,即自旋。在若干个自旋后,如果还没有获得锁,则才被挂起,获得锁,则执行代码。

自旋锁

自旋锁原理非常简单,如果持有锁的线程能在很短时间内释放锁资源,那么那些等待竞争锁的线程就不需要做内核态和用户态之间的切换进入阻塞挂起状态,它们只需要等一等(自旋),等持有锁的线程释放锁后即可立即获取锁,这样就避免用户线程和内核的切换的消耗。

但是线程自旋是需要消耗CPU的,说白了就是让CPU在做无用功,如果一直获取不到锁,那线程也不能一直占用CPU自旋做无用功,所以需要设定一个自旋等待的最大时间。如果持有锁的线程执行的时间超过自旋等待的最大时间扔没有释放锁,就会导致其它争用锁的线程在最大等待时间内还是获取不到锁,这时争用线程会停止自旋进入阻塞状态。

自旋锁的优缺点

​ 优:自旋锁尽可能的减少线程的阻塞,这对于锁的竞争不激烈,且占用锁时间非常短的代码块来说性能能大幅度的提升,因为自旋的消耗会小于线程阻塞挂起再唤醒的操作的消耗,这些操作会导致线程发生两次上下文切换!

​ 缺:但是如果锁的竞争激烈,或者持有锁的线程需要长时间占用锁执行同步块,这时候就不适合使用自旋锁了,因为自旋锁在获取锁前一直都是占用cpu做无用功,占着茅坑又不拉屎,同时有大量线程在竞争一个锁,会导致获取锁的时间很长,线程自旋的消耗大于线程阻塞挂起操作的消耗,其它需要CPU的线程又不能获取到cpu,造成cpu的浪费。所以这种情况下我们要关闭自旋锁;

自旋锁时间阈值

​ 自旋锁的目的是为了占着CPU的资源不释放,等到获取到锁立即进行处理。但是如何去选择自旋的执行时间呢?如果自旋执行时间太长,会有大量的线程处于自旋状态占用CPU资源,进而会影响整体系统的性能。因此自旋的周期选的额外重要!

​ JVM对于自旋周期的选择,jdk1.5这个限度是一定的写死的,在1.6引入了适应性自旋锁,适应性自旋锁意味着自旋的时间不在是固定的了,而是由前一次在同一个锁上的自旋时间以及锁的拥有者的状态来决定,基本认为一个线程上下文切换的时间是最佳的一个时间,同时JVM还针对当前CPU的负荷情况做了较多的优化:

  1. 如果平均负载小于CPUs则一直自旋;
  2. 如果有超过(CPUs/2)个线程正在自旋,则后来线程直接阻塞;
  3. 如果正在自旋的线程发现Owner发生了变化则延迟自旋时间(自旋计数)或进入阻塞;
  4. 如果CPU处于节电模式则停止自旋;
  5. 自旋时间的最坏情况是CPU的存储延迟(CPU A存储了一个数据,到CPU B得知这个数据直接的时间差);
  6. 自旋时会适当放弃线程优先级之间的差异;

自旋锁的开启

JDK1.6中 -XX:+UseSpinning 开启;-XX:PreBlockSpin=10 为自旋次数;
JDK1.7后,去掉此参数,由jvm控制;

重量级锁Synchronized

在JDK1.5之前都是使用synchronized关键字保证同步的,它可以保证方法或者代码块在运行时,同一时刻只有一个方法可以进入到临界区,同时它还可以保证共享变量的内存可见性。

  • 普通同步方法,锁是当前实例对象 ;
  • 静态同步方法,锁是当前类的class对象 ;
  • 同步方法块,锁是括号里面的对象;

Synchronized的实现

这里写图片描述

它有多个队列,当多个线程一起访问某个对象监视器的时候,对象监视器会将这些线程存储在不同的容器中。

  1. Contention List:竞争队列,所有请求锁的线程首先被放在这个竞争队列中;
  2. Entry List:候选者队列 ,Contention List中那些有资格成为候选资源的线程被移动到Entry List中;
  3. Wait Set:阻塞队列,哪些调用wait方法被阻塞的线程被放置在这里;
  4. OnDeck:任意时刻,最多只有一个线程正在竞争锁资源,该线程被成为OnDeck;
  5. Owner:当前已经获取到所资源的线程被称为Owner;
  6. !Owner:当前释放锁的线程。

JVM每次从队列的尾部取出一个数据用于锁竞争候选者(OnDeck),但是并发情况下,ContentionList会被大量的并发线程进行CAS访问,为了降低对尾部元素的竞争,JVM会将一部分线程移动到EntryList中作为候选竞争线程。Owner线程会在unlock时,将ContentionList中的部分线程迁移到EntryList中,并指定EntryList中的某个线程为OnDeck线程(一般是最先进去的那个线程)。Owner线程并不直接把锁传递给OnDeck线程,而是把锁竞争的权利交给OnDeck,OnDeck需要重新竞争锁。这样虽然牺牲了一些公平性,但是能极大的提升系统的吞吐量,在JVM中,也把这种选择行为称之为“竞争切换”。

OnDeck线程获取到锁资源后会变为Owner线程,而没有得到锁资源的仍然停留在EntryList中。如果Owner线程被wait方法阻塞,则转移到WaitSet队列中,直到某个时刻通过notify或者notifyAll唤醒,会重新进去EntryList中。

处于ContentionList、EntryList、WaitSet中的线程都处于阻塞状态,该阻塞是由操作系统来完成的(Linux内核下采用pthread_mutex_lock内核函数实现的)。

Synchronized是非公平锁,在线程进入ContentionList时,等待的线程会先尝试自旋获取锁,如果获取不到就进入ContentionList,这明显对于已经进入队列的线程是不公平的,还有一个不公平的事情就是自旋获取锁的线程还可能直接抢占OnDeck线程的锁资源。

Synchronized锁的演变过程

  1. 检测Mark Word里面是不是当前线程的ID,如果是则表示当前线程处于偏向锁;
  2. 如果不是,则使用CAS将当前线程的ID替换Mard Word,如果成功则表示当前线程获得偏向锁,置偏向标志位1;
  3. 如果失败,则说明发生竞争,撤销偏向锁,进而升级为轻量级锁;
  4. 当前线程使用CAS将对象头的Mark Word替换为锁记录指针,如果成功,当前线程获得锁;
  5. 如果失败,表示其他线程竞争锁,当前线程便尝试使用自旋来获取锁;
  6. 如果自旋成功则依然处于轻量级状态;
  7. 如果自旋失败,则升级为重量级锁;

在所有的锁都启用的情况下线程进入临界区时会先去获取偏向锁,如果已经存在偏向锁了,则会尝试获取轻量级锁,启用自旋锁,如果自旋也没有获取到锁,则使用重量级锁,没有获取到锁的线程阻塞挂起,直到持有锁的线程执行完同步块唤醒他们;

偏向锁是在无锁争用的情况下使用的,也就是同步开在当前线程没有执行完之前,没有其它线程会执行该同步块,一旦有了第二个线程的争用,偏向锁就会升级为轻量级锁,如果轻量级锁自旋到达阈值后,没有获取到锁,就会升级为重量级锁;

注意:如果线程争用激烈,那么应该禁用偏向锁。

关注公众号数据工匠记 ,专注于java大数据领域离线、实时技术干货定期分享!个人网站 www.lllpan.top

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Cron表达式 1 Cron表达式简介 2 Cron表达式的

发表于 2021-07-25

因博主最近使用定时任务时,关于Cron表达式的使用,出现问题,故记录一下问题,避免大家踩坑

1 Cron表达式简介

Cron,又称计划任务: 是任务在约定的时间执行已经计划好的工作. 在Linux中,我们经常用到 cron 服务器来完成这项工作。cron服务器可以根据配置文件约定的时间来执行特定的任务。

2 Cron表达式的格式

Cron的语法格式有如下两种:

  • Seconds Minutes Hours DayofMonth Month DayofWeek Year
  • Seconds Minutes Hours DayofMonth Month DayofWeek

1 表达式每个域支持的值

名称 是否必须 允许值 特殊字符
秒 是 0-59 , - * /
分 是 0-59 , - * /
时 是 0-23 , - * /
日 是 1-31 , - * ? / L W C
月 是 1-12 或 JAN-DEC , - * /
周 是 ==1-7== 或 SUN-SAT , - * ? / L C #
年 否 空 或 1970-2099 , - * /

ps:

1 月份和星期的名称是不区分大小写的。FRI 和 fri 是一样的。 域之间有空格分隔.

2 表中周的1-7用颜色突出,后续解释使用中出现的问题.

2 特殊字符说明

*** 星号**

1
2
3
4
5
text复制代码使用星号(*) 指示着你想在这个域上包含所有合法的值。例如,在月份域上使用星号意味着每个月都会触发这个 trigger。

表达式样例:
0 * 17 * * ?
意义:每天从下午5点到下午5:59中的每分钟激发一次trigger.

? 问号

1
2
3
4
5
text复制代码? 号只能用在日和周域上,但是不能在这两个域上同时使用。你可以认为 ? 字符是 "我并不关心在该域上是什么值。" 这不同于星号,星号是指示着该域上的每一个值。? 是说不为该域指定值。 

表达式样例:
0 10,44 14 ? 3 WED
意义:在三月中的每个星期三的下午 2:10 和 下午 2:44 被触发。

, 逗号

1
2
3
4
5
test复制代码逗号 (,) 是用来在给某个域上指定一个值列表的。例如,使用值 0,15,30,45 在秒域上意味着每15秒触发一个 trigger。 

表达式样例:
0 0,15,30,45 * * * ?
意义:每刻钟触发一次 trigger。

/ 斜杠

1
2
3
4
5
text复制代码斜杠 (/) 是用于时间表的递增的。我们刚刚用了逗号来表示每15分钟的递增,但是我们也能写成这样 0/15。

表达式样例:
0/15 0/30 * * * ?
意义:在整点和半点时每15秒触发 trigger。

-中划线

1
2
3
4
5
text复制代码中划线 (-) 用于指定一个范围。例如,在小时域上的 3-8 意味着 "3,4,5,6,7 和 8 点。"  域的值不允许回转,所以像 50-10 这样的值是不允许的。 

表达式样例:
0 45 3-8 ? * *
意义:在上午的3点至上午的8点的45分时触发 trigger。

L 字母

1
2
3
4
5
text复制代码L 说明了某域上允许的最后一个值。它仅被日和周域支持。当用在日域上,表示的是在月域上指定的月份的最后一天。

表达式样例:
0 0 8 L * ?
意义: 是在每个月最后一天的上午 8:00 触发 trigger。在月域上的 * 说明是 "每个月"。

W 字母 WorkDay

1
2
3
text复制代码W 字符代表着平日 (Mon-Fri),并且仅能用于日域中。它用来指定离指定日的最近的一个平日。W 只能用在指定的日域为单天,不能是范围或列表值。

例如,日域中的 15W 意味着 "离该月15号的最近一个平日。" 假如15号是星期六,那么 trigger 会在14号(星期五)触发.

#井号

1
2
text复制代码字符仅能用于周域中。它用于指定月份中的第几周的哪一天。
例如,如果你指定周域的值为 6#3,它意思是某月的第三个周五 (6=星期五,#3意味着月份中的第三周)。另一个例子 2#1 意思是某月的第一个星期一 (2=星期一,#1意味着月份中的第一周)。注意,假如你指定 #5,然而月份中没有第 5 周,那么该月不会触发。

3 使用遇到的问题

1 关于问题

​ 博主的要求时, 每周日0点执行定时任务. 验证定时任务功能时, 博主都是具体到每天的几点钟几分钟去验证,验证结果没有问题,定时任务可以运行,且逻辑正常.

​ 结合自己的理解和网上资料,我给出的Cron表达式是 0 0 0 ? * 1 因为这个表达式,自己没有去验证,导致定时任务出现问题. 定时任务周日0点没有执行,到了周一0点才执行的,我一脸懵. 很奇怪怎么会这样.经过查看资料发现,cron表达式准备很多.

常用的quartz框架中cron表达式,:

  • 1-7 分别是,周日,周一…周六.

Spring中@scheduled注解中cron表达式:

  • 1-7 分别是周一,周二,…周日,且用0也表示周日,这个cron的标准,是linux/unix 和 github.com/robfig/cron 里遵循的.

2 解决方法

1 Cron表达式中周域使用英文表示

​ 因为Cron表达式周域中支持英文单词缩写, 即用SUN, MON, TUE, WED, THU, FRI, SAT,分别表示星期天,星期一,…星期六.

​ 采用英文缩写的方式,能直接避免上述的问题,因为不管什么Cron标准,都是支持英文缩写的,所以使用通用的表达方式,可以更好的兼容不同的场景使用该表达式.

4 总结

​ 定义的每一个Cron表达式,一定要测试验证一下. 因为整个定时任务,所有做的一切都是为了任务能够在==准确的时间==去执行==指定的任务==. 二者缺一不可.

所以我们一定要以测试结果为依据.即我们要以唯物辩证的角度客观地去认证事务的结果.千万不要觉得不重要,事情很小,没问题,而疏忽掉这个细节.

毕竟,==千里之堤,溃于蚁穴;百步之室,以突隙之烟焚==的故事不算少数.

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【2021最新版】RabbitMQ面试题总结(32道题含答案

发表于 2021-07-25

前言

最近面试的小伙伴很多,对此我整理了一份Java面试题手册:基础知识、JavaOOP、Java集合/泛型面试题、Java异常面试题、Java中的IO与NIO面试题、Java反射、Java序列化、Java注解、多线程&并发、JVM、Mysql、Redis、Memcached、MongoDB、Spring、SpringBoot、SpringCloud、RabbitMQ、Dubbo、MyBatis、ZooKeeper、数据结构、算法、Elasticsearch、Kafka、微服务、Linux等等。可以分享给大家学习。【持续更新中】

1、什么是rabbitmq?

答:

采用AMQP高级消息队列协议的一种消息队列技术,最大的特点就是消费并不需要确保提供方存在,实现了服务之间的高度解耦。

2、为什么要使用rabbitmq?

答:
1、在分布式系统下具备异步,削峰,负载均衡等一系列高级功能;

2、拥有持久化的机制,进程消息,队列中的信息也可以保存下来。

3、实现消费者和生产者之间的解耦。

4、对于高并发场景下,利用消息队列可以使得同步访问变为串行访问达到一定量的限流,利于数据库的操作。

5、可以使用消息队列达到异步下单的效果,排队中,后台进行逻辑下单。

3、使用rabbitmq的场景。

答:

1、服务间异步通信

2、顺序消费

3、定时任务

4、请求削峰

4、如何确保消息正确地发送至RabbitMQ?如何确保消息接收方消费了消息?

答:

发送方确认模式

1.将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。

2.一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一 ID)。

3.如果 RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(notacknowledged,未确认)消息。发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。

接收方确认机制

接收方消息确认机制

消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息。保证数据的最终一致性;

下面罗列几种特殊情况

如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分发给下一个订阅的消费者。

(可能存在消息重复消费的隐患,需要去重)如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息。

5.如何避免消息重复投递或重复消费?

答:

在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重的依据(消息投递失败并重传),避免重复的消息进入队列;

在消息消费时,要求消息体中必须要有一个 bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID 等)作为去重的依据,避免同一条消息被重复消费。

6、消息基于什么传输?

答:

由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ使用信道的方式来传输数据。信道是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制

7、消息如何分发?

答:

若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。

通过路由可实现多消费的功能

8、消息怎么路由?

答:

消息提供方->路由->一至多个队列

消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。

通过队列路由键,可以把队列绑定到交换器上。

消息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则);

常用的交换器主要分为一下三种

1.fanout:如果交换器收到消息,将会广播到所有绑定的队列上

2.direct:如果路由键完全匹配,消息就被投递到相应的队列

3.topic:可以使来自不同源头的消息能够到达同一个队列。 使用topic交换器时,可以使用通配符

9、如何确保消息不丢失?

答:

消息持久化,当然前提是队列必须持久化

RabbitMQ确保持久性消息能从服务器重启中恢复的方式是,将它们写入磁盘上的一个持久化日志文件,当发布一条持久性消息到持久交换器上时,Rabbit会在消息提交到日志文件后才发送响应。

一旦消费者从持久队列中消费了一条持久化消息,RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集。如果持久化消息在被消费之前RabbitMQ重启,那么Rabbit会自动重建交换器和队列(以及绑定),并重新发布持久化日志文件中的消息到合适的队列。

10、使用RabbitMQ有什么好处?

答:

1、服务间高度解耦

2、异步通信性能高

3、流量削峰

11、RabbitMQ的集群。

答:

镜像集群模式

你创建的queue,无论元数据还是queue里的消息都会存在于多个实例上,然后每次你写消息到queue的时候,都会自动把消息到多个实例的queue里进行消息同步。

好处在于,你任何一个机器宕机了,没事儿,别的机器都可以用。坏处在于,第一,这个性能开销也太大了吧,消息同步所有机器,导致网络带宽压力和消耗很重!第二,这么玩儿,就没有扩展性可言了,如果某个queue负载很重,你加机器,新增的机器也包含了这个queue的所有数据,并没有办法线性扩展你的queue。

12、mq的缺点

答:

系统可用性降低

系统引入的外部依赖越多,越容易挂掉,本来你就是A系统调用BCD三个系统的接口就好了,人 ABCD四个系统好好的,没啥问题,你偏加个MQ进来,万一MQ挂了咋整?MQ挂了,整套系统崩溃了,你不就完了么。

系统复杂性提高硬生生加个MQ进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?头大头大,问题一大堆,痛苦不已。

一致性问题

A系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是BCD三个系统那里,BD两个系统写库成功了,结果C系统写库失败了,咋整?你这数据就不一致了。

所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,最好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许是复杂了10倍。但是关键时刻,用,还是得用的。

13、什么是MQ ?

答:

MQ就是消息队列。是软件和软件进行通信的中间件产品。

14、MQ的优点。

答:

简答

异步处理 - 相比于传统的串行、并行方式,提高了系统吞吐量。

应用解耦 - 系统间通过消息通信,不用关心其他系统的处理。

流量削锋 - 可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求。

日志处理 - 解决大量日志传输。

消息通讯 - 消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。

15、解耦、异步、削峰是什么?

答:

解耦:A系统发送数据到BCD三个系统,通过接口调用发送。如果E系统也要这个数据呢?那如果C系统现在不需要了呢?A系统负责人几乎崩溃A 系统跟其它各种乱七八糟的系统严重耦合,A系统产生一条比较关键的数据,很多系统都需要A系统将这个数据发送过来。如果使用MQ,A系统产生一条数据,发送到MQ里面去,哪个系统需要数据自己去MQ里面消费。如果新系统需要数据,直接从MQ里消费即可;如果某个系统不需要这条数据了,就取消对MQ消息的消费即可。这样下来,A系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。

就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其 实这个调用是不需要直接同步调用接口的,如果用MQ给它异步化解耦。

异步:A系统接收一个请求,需要在自己本地写库,还需要在BCD三个系统写库,自己本地写库要3ms,BCD三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是3+300+450+200=953ms,接近1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求。

如果使用MQ,那么A系统连续发送3条消息到MQ队列中,假如耗时5ms,A系统从接受一个请求到返回响应给用户,总时长是3+5=8ms。

削峰:减少高峰时期对服务器压力。

16、消息队列有什么缺点?

答:

缺点有以下几个:

  1. 系统可用性降低

本来系统运行好好的,现在你非要加入个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性会降低;

  1. 系统复杂度提高

加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,复杂性增大。

  1. 一致性问题

A系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是BCD三个系统那里,BD两个系统写库成功了,结果C系统写库失败了,咋整?你这数据就不一致了。

所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方 案和架构来规避掉,做好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许是复杂了 10 倍。但是关 键时刻,用,还是得用的。

17、你们公司生产环境用的是什么消息中间件?

答:

这个首先你可以说下你们公司选用的是什么消息中间件,比如用的是RabbitMQ,然后可以初步给一些你对不同MQ中间件技术的选型分析。

举个例子:比如说ActiveMQ是老牌的消息中间件,国内很多公司过去运用的还是非常广泛的,功能很强大。

但是问题在于没法确认ActiveMQ可以支撑互联网公司的高并发、高负载以及高吞吐的复杂场景,在国内互联网公司落地较少。而且使用较多的是一些传统企业,用ActiveMQ做异步调用和系统解耦。

然后你可以说说RabbitMQ,他的好处在于可以支撑高并发、高吞吐、性能很高,同时有非常完善便捷的后台管理界面可以使用。

另外,他还支持集群化、高可用部署架构、消息高可靠支持,功能较为完善。

而且经过调研,国内各大互联网公司落地大规模RabbitMQ集群支撑自身业务的case较多,国内各种中小型互联网公司使用RabbitMQ的实践也比较多。

除此之外,RabbitMQ的开源社区很活跃,较高频率的迭代版本,来修复发现的bug以及进行各种优化,因此综合考虑过后,公司采取了RabbitMQ。

但是RabbitMQ也有一点缺陷,就是他自身是基于erlang语言开发的,所以导致较为难以分析里面的源码,也较难进行深层次的源码定制和改造,毕竟需要较为扎实的erlang语言功底才可以。

然后可以聊聊RocketMQ,是阿里开源的,经过阿里的生产环境的超高并发、高吞吐的考验,性能卓越,同时还支持分布式事务等特殊场景。

而且RocketMQ是基于Java语言开发的,适合深入阅读源码,有需要可以站在源码层面解决线上生产问题,包括源码的二次开发和改造。

另外就是Kafka。Kafka提供的消息中间件的功能明显较少一些,相对上述几款MQ中间件要少很多。

但是Kafka的优势在于专为超高吞吐量的实时日志采集、实时数据同步、实时数据计算等场景来设计。

因此Kafka在大数据领域中配合实时计算技术(比如Spark Streaming、Storm、Flink)使用的较多。但是在传统的MQ中间件使用场景中较少采用。

18、Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?

答:


综上,各种对比之后,有如下建议:

一般的业务系统要引入MQ,最早大家都用ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;

后来大家开始用RabbitMQ,但是确实erlang语言阻止了大量的Java工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;

不过现在确实越来越多的公司会去用RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ已捐给Apache,但GitHub上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用RocketMQ,否则回去老老实实用RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。

所以中小型公司,技术实力较为一般,技术挑战不是特别高,用RabbitMQ是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ是很好的选择。

如果是大数据领域的实时计算、日志采集等场景,用Kafka是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

19、MQ有哪些常见问题?如何解决这些问题?

答:

MQ的常见问题有:

消息的顺序问题

消息的重复问题

消息的顺序问题

消息有序指的是可以按照消息的发送顺序来消费。

假如生产者产生了2条消息:M1、M2,假定 M1 发送到S1,M2 发送到S2,如果要保证 M1先于M2被消费,怎么做?

解决方案:

  1. 保证生产者-MQServer-消费者是一对一对一的关系:

    缺陷:

并行度就会成为消息系统的瓶颈(吞吐量不够)

更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。 (2)通过合理的设计或者将问题分解来规避。

不关注乱序的应用实际大量存在

队列无序并不意味着消息无序 所以从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是一种更合理的方式。

消息的重复问题

造成消息重复的根本原因是:网络不可达。

所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?

消费端处理消息的业务逻辑保持幂等性。只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现。利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息 ID已经在日志表中,那么就不再处理这条消息。

20、RabbitMQ基本概念。

答:

Broker:简单来说就是消息队列服务器实体

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列

Queue:消息队列载体,每个消息都会被投入到一个或多个队列

Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来

Routing Key: 路由关键字,exchange根据这个关键字进行消息投递

VHost:vhost可以理解为虚拟broker ,即mini-RabbitMQ server。其内部均含有独立的queue、exchange和binding等,但最最重要的是,其拥有独立的权限系统,可以做到vhost范围的用户控制。当然,从RabbitMQ的全局角度,vhost可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。

Producer: 消息生产者,就是投递消息的程序

Consumer:消息消费者,就是接受消息的程序

Channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务由Exchange、Queue、RoutingKey三个才能决定一个从Exchange到Queue的唯一的线路。

21、RabbitMQ的工作模式。

答:

一.simple模式(即最简单的收发模式)

在这里插入图片描述

  1. 消息产生消息,将消息放入队列
  2. 消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。

二.work工作模式(资源的竞争)

3.消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。

4.C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)。

三.publish/subscribe发布订阅(共享资源)


5.每个消费者监听自己的队列;

6.生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。

四.routing路由模式
在这里插入图片描述
8. 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;

9.根据业务功能定义路由字符串;

10.从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

11.业务场景:error通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误; 五.topic 主题模式(路由模式的一种)


12. 星号井号代表通配符

13.星号代表多个单词,井号代表一个单词

14.路由功能添加模糊匹配

15.消息产生者产生消息,把消息交给交换机

16.交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费(在我的理解看来就是routing查询的一种模糊匹配,就类似sql的模糊查询方式)

22、如何保证RabbitMQ消息的顺序性?

答:

拆分多个queue(消息队列),每个queue(消息队列) 一个consumer(消费者),就是多一些queue(消息队列)而已,确实是麻烦点;

或者就一个queue (消息队列)但是对应一个consumer(消费者),然后这个consumer(消费者)内部用内存队列做排队,然后分发给底层不同的worker来处理。

23、消息如何分发?

答:

若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。通过路由可实现多消费的功能。

24、消息怎么路由?

答:

消息提供方->路由->一至多个队列消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。通过队列路由键,可以把队列绑定到交换器上。消息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则);

常用的交换器主要分为一下三种:

  1. fanout:如果交换器收到消息,将会广播到所有绑定的队列上
  2. direct:如果路由键完全匹配,消息就被投递到相应的队列
  3. topic:可以使来自不同源头的消息能够到达同一个队列。 使用 topic 交换器时,可以使用通配符

25、消息基于什么传输?

答:

由于 TCP 连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ使用信道的方式来传输数据。信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接上的信道数量没有限制。

26、如何保证消息不被重复消费?或者说,如何保证消息消费时的幂等性?

答:

先说为什么会重复消费:正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除;

但是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者。

针对以上问题,一个解决思路是:保证消息的唯一性,就算是多次传输,不要让消息的多次消费带来影响;保证消息等幂性;

比如:在写入消息队列的数据做唯一标示,消费消息时,根据唯一标识判断是否消费过;

假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。

27、如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?

答:

发送方确认模式

将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。

一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一 ID)如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(notacknowledged,未确认)消息。

发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。

接收方确认机制消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确
认了消息,RabbitMQ才能安全地把消息从队列中删除。

这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息。保证数据的最终一致性;

下面罗列几种特殊情况

如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息。

28、如何保证RabbitMQ消息的可靠传输?

答:

消息不可靠的情况可能是消息丢失,劫持等原因;

丢失又分为:生产者丢失消息、消息列表丢失消息、消费者丢失消息;

  1. 生产者丢失消息:从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息;

transaction机制就是说:发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降;
confirm模式用的居多:一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;

rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;

如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。

  1. 消息队列丢数据:消息持久化。

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。

这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。

这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。

那么如何持久化呢?

这里顺便说一下吧,其实也很容易,就下面两步

  1. 将queue的持久化标识durable设置为true,则代表是一个持久的队列

2.发送消息的时候将deliveryMode=2这样设置以后,即使rabbitMQ挂了,重启后也能恢复数据

3.消费者丢失消息:消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!

消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;

如果这时处理消息失败,就会丢失该消息;

解决方案:处理消息成功后,手动回复确认消息。

29、为什么不应该对所有的message都使用持久化机制?

答:

首先,必然导致性能的下降,因为写磁盘比写RAM慢的多,message的吞吐量可能有10倍的差距。

其次,message的持久化机制用在RabbitMQ的内置cluster方案时会出现“坑爹”问题。矛盾点在于,若message设置了persistent属性,但queue未设置durable属性,那么当该queue的owner node出现异常后,在未重建该queue前,发往该queue 的message将被 blackholed;若 message 设置了 persistent属性,同时queue也设置了durable属性,那么当queue的owner node异常且无法重启的情况下,则该queue无法在其他node上重建,只能等待其owner node重启后,才能恢复该 queue的使用,而在这段时间内发送给该queue的message将被 blackholed 。

所以,是否要对message进行持久化,需要综合考虑性能需要,以及可能遇到的问题。若想达到100,000 条/秒以上的消息吞吐量(单RabbitMQ服务器),则要么使用其他的方式来确保message的可靠delivery ,要么使用非常快速的存储系统以支持全持久化(例如使用SSD)。

另外一种处理原则是:仅对关键消息作持久化处理(根据业务重要程度),且应该保证关键消息的量不会导致性能瓶颈。

30、如何保证高可用的?RabbitMQ的集群?

答:

RabbitMQ是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以RabbitMQ为例子讲解第一种MQ的高可用性怎么实现。RabbitMQ有三种模式:单机模式、普通集群模式、镜像集群模式。

  1. 单机模式,就是Demo级别的,一般就是你本地启动了玩玩儿的?,没人生产用单机模式
  2. 普通集群模式:

意思就是在多台机器上启动多个RabbitMQ实例,每个机器启动一个。

你创建的queue,只会放在一个RabbitMQ实例上,但是每个实例都同步queue的元数据(元数据可以认为是queue的一些配置信息,通过元数据,可以找到queue所在实例)。

你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从queue所在实例上拉取数据过来。这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个queue的读写操作。

  1. 镜像集群模式:

这种模式,才是所谓的RabbitMQ的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个RabbitMQ节点都有这个queue的一个完整镜像,包含queue的全部数据的意思。然后每次你写消息到queue的时候,都会自动把消息同步到多个实例的queue上。

RabbitMQ有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。

这样的好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个queue的完整数据,别的consumer都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!RabbitMQ一个queue的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个queue的完整数据。

31、如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,怎么办?

答:

消息积压处理办法:临时紧急扩容:

先修复consumer的问题,确保其恢复消费速度,然后将现有cnosumer都停掉。

新建一个topic,partition是原来的10倍,临时建立好原先10倍的queue数量。

然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue。

接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据。这种做法相当于是临时将 queue 资源和consumer资源扩大10倍,以正常的10倍速度来消费数据。

等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的consumer机器来消费消息。

MQ中消息失效:假设你用的是RabbitMQ,RabbtiMQ是可以设置过期时间的,也就是 TTL。如果消息在queue中积压超过一定的时间就会被RabbitMQ给清理掉,这个数据就没了。那这就是第二个坑了。

这就不是说数据会大量积压在mq里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,
然后重新灌入mq里面去,把白天丢的数据给他补回来。也只能是这样了。假设1万个订单积压在mq里面,没有处理,其中 1000个订单都丢了,你只能手动写程序把那1000个订单给查出来,手动发到mq里去再补一次。

mq消息队列块满了:如果消息积压在mq里,你很长时间都没有处理掉,此时导致mq都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。

32、设计MQ思路。

答:

比如说这个消息队列系统,我们从以下几个角度来考虑一下:

首先这个mq得支持可伸缩性吧,就是需要的时候快速扩容,就可以增加吞吐量和容量,那怎么搞?设计个分布式的系统呗,参照一下kafka的设计理念,broker->topic->partition,每个partition放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给topic增加partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了?

其次你得考虑一下这个mq的数据要不要落地磁盘吧?那肯定要了,落磁盘才能保证别进程挂了数据就丢了。那落磁盘的时候怎么落啊?顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是 kafka 的思路。

其次你考虑一下你的mq的可用性啊?这个事儿,具体参考之前可用性那个环节讲解的kafka的高可用保障机制。多副本 -> leader&follower->broker挂了重新选举 leader 即可对外服务。能不能支持数据0丢失啊?可以呀,有点复杂的。

总结

该面试题答案解析完整文档获取方式:RabbitMQ面试题总结
我这边还整理了一份Java的系统化资料:(包括Java核心知识点、面试专题和21年最新的互联网真题、电子书等)有需要的朋友可以关注公众号【程序媛小琬】即可获取。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Springboot ConnectTimeout与Read

发表于 2021-07-25

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

王者并发课-钻石1:明心见性-如何由表及里精通线程池设计与原

发表于 2021-07-25

欢迎来到《王者并发课》,本文是该系列文章中的第24篇,砖石中的第1篇。

在钻石系列中,我们将学习线程池相关的框架和工具类。作为铂金系列的第一篇,我们将在这篇文章中深入讲解线程池的应用及原理。

关于线程池,无论是在实际的项目开发还是面试,它都是并发编程中当之无愧的重中之重。因此,掌握线程池是每个Java开发者的必备技能。

本文将从线程池的应用场景和设计原理出发,先带大家手撸一个线程池,在理解线程池的内部构造后,再深入剖析Java中的线程池。全文大约2.5万字,篇幅较长,在阅读时建议先看目录再看内容。

一、为什么要使用线程池

在前面系列文章的学习中,你已然知道多线程可以加速任务的处理、提高系统的吞吐量。那么,是否我们因此就可以频繁地创建新的线程呢?答案是否定的。频繁地繁创建和启用新的线程不仅代价昂贵,而且无限增加的线程势必也会造成管理成本的急剧上升。因此,为了平衡多线程的收益和成本,线程池诞生了。

1. 线程池的使用场景

生产者与消费者问题是线程池的典型应用场景。当你有源源不断的任务需要处理时,为了提高任务的处理速度,你需要创建多个线程。那么,问题来了,如何管理这些任务和多线程呢?答案是:线程池。

线程池的池化(Pooling)原理的应用并不局限于Java中,在MySQL和诸多的分布式中间件系统中都有着广泛的应用。当我们链接数据库的时候,对链接的管理用的是线程池;当我们使用Tomcat时,对请求链接的管理用的也是线程池。所以,当你有批量的任务需要多线程处理时,那么基本上你就需要使用线程池。

2. 线程池的使用好处

线程池的好处主要体现在三个方面:系统资源、任务处理速度和相关的复杂度管理,主要表现在:

  • 降低系统的资源开销:通过复用线程池中的工作线程,避免频繁创建新的线程,可以有效降低系统资源的开销;
  • 提高任务的执行速度:新任务达到时,无需创建新的线程,直接将任务交由已经存在的线程进行处理,可以有效提高任务的执行速度;
  • 有效管理任务和工作线程:线程池内提供了任务管理和工作线程管理的机制。

为什么说创建线程是昂贵的

现在你已经知道,频繁地创建新线程需要付出额外的代价,所以我们使用了线程池。那么,创建一个新的线程的代价究竟是怎样的呢?可以参考以下几点:

  • 创建线程时,JVM必须为线程堆栈分配和初始化一大块内存。每个线程方法的调用栈帧都会存储到这里,包括局部变量、返回值和常量池等;
  • 在创建和注册本机线程时,需要和宿主机发生系统调用;
  • 需要创建、初始化描述符,并将其添加到 JVM 内部数据结构中。

另外,从某种意义上说,只要线程还活着,它就会占用资源,这不仅昂贵,而且浪费。 例如 ,线程堆栈、访问堆栈的可达对象、JVM 线程描述符、操作系统本机线程描述符等等,在线程活着的时候,这些资源都会持续占据。

虽然不同的Java平台在创建线程时的代价可能有所差异,但总体来说,都不便宜。

3. 线程池的核心组成

一个完整的线程池,应该包含以下几个核心部分:

  • 任务提交:提供接口接收任务的提交;
  • 任务管理:选择合适的队列对提交的任务进行管理,包括对拒绝策略的设置;
  • 任务执行:由工作线程来执行提交的任务;
  • 线程池管理:包括基本参数设置、任务监控、工作线程管理等。

二、如何手工制作线程池

通过第一部分的阅读,现在你已经了解了线程池的作用及它的核心组成。为了更深刻地理解线程池的组成,在这一部分我们通过简单的四步来手工制作一个简单的线程池。当然,麻雀虽小,五脏俱全。如果你能手工自制线程池之后,那么在理解后续的Java中的线程池时,将会易如反掌。

1. 线程池设计和制作

第一步:定义一个王者线程池:TheKingThreadPool,它是这次手工制作中名副其实的主角儿。在这个线程池中,包含了任务队列管理、工作线程管理,并提供了可以指定队列类型的构造参数,以及任务提交入口和线程池关闭接口。你看,虽然它看起来似乎很迷你,但是线程池的核心组件都已经具备了,甚至在它的基础上,你完全可以把它扩展成更为成熟的线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
java复制代码/**
* 王者线程池
*/
public class TheKingThreadPool {
private final BlockingQueue<Task> taskQueue;
private final List<Worker> workers = new ArrayList<>();
private ThreadPoolStatus status;

/**
* 初始化构建线程池
*
* @param worksNumber 线程池中的工作线程数量
* @param taskQueue 任务队列
*/
public TheKingThreadPool(int worksNumber, BlockingQueue<Task> taskQueue) {
this.taskQueue = taskQueue;
status = ThreadPoolStatus.RUNNING;
for (int i = 0; i < worksNumber; i++) {
workers.add(new Worker("Worker" + i, taskQueue));
}
for (Worker worker : workers) {
Thread workThread = new Thread(worker);
workThread.setName(worker.getName());
workThread.start();
}
}

/**
* 提交任务
*
* @param task 待执行的任务
*/
public synchronized void execute(Task task) {
if (!this.status.isRunning()) {
throw new IllegalStateException("线程池非运行状态,停止接单啦~");
}
this.taskQueue.offer(task);
}

/**
* 等待所有任务执行结束
*/
public synchronized void waitUntilAllTasksFinished() {
while (this.taskQueue.size() > 0) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

/**
* 关闭线程池
*/
public synchronized void shutdown() {
this.status = ThreadPoolStatus.SHUTDOWN;
}

/**
* 停止线程池
*/
public synchronized void stop() {
this.status = ThreadPoolStatus.SHUTDOWN;
for (Worker worker : workers) {
worker.doStop();
}
}
}

第二步:设计并制作工作线程。工作线程是干活的线程,将负责处理提交到线程池中的任务,我们把它叫做Worker。其实,这里的Worker的定义和Java线程池中的Worker已经很像了,它继承了Runnable接口并封装了Thread. 在构造Worker时,可以设定它的名字,并传入任务队列。当Worker启动后,它将会从任务队列中获取任务并执行。此外,它还提供了Stop方法,用以响应线程池的状态变化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码
/**
* 线程池中用于执行任务的线程
*/
public class Worker implements Runnable {
private final String name;
private Thread thread = null;
private final BlockingQueue<Task> taskQueue;
private boolean isStopped = false;
private AtomicInteger counter = new AtomicInteger();

public Worker(String name, BlockingQueue<Task> queue) {
this.name = name;
taskQueue = queue;
}

public void run() {
this.thread = Thread.currentThread();
while (!isStopped()) {
try {
Task task = taskQueue.poll(5L, TimeUnit.SECONDS);
if (task != null) {
note(this.thread.getName(), ":获取到新的任务->", task.getTaskDesc());
task.run();
counter.getAndIncrement();
}
} catch (Exception ignored) {
}
}
note(this.thread.getName(), ":已结束工作,执行任务数量:" + counter.get());
}

public synchronized void doStop() {
isStopped = true;
if (thread != null) {
this.thread.interrupt();
}
}

public synchronized boolean isStopped() {
return isStopped;
}

public String getName() {
return name;
}
}

第三步:设计并制作任务。任务是可以可执行的对象,因此我们直接继承Runnable接口就行。其实,直接使用Runnable接口也是可以的,只不过为了让示例更加清楚,我们给Task加了任务描述的方法。

1
2
3
4
5
6
java复制代码/**
* 任务
*/
public interface Task extends Runnable {
String getTaskDesc();
}

第四步:设计线程池的状态。线程池作为一个运行框架,它必然会有一系列的状态,比如运行中、停止、关闭等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public enum ThreadPoolStatus {
RUNNING(),
SHUTDOWN(),
STOP(),
TIDYING(),
TERMINATED();

ThreadPoolStatus() {
}

public boolean isRunning() {
return ThreadPoolStatus.RUNNING.equals(this);
}
}

以上四个步骤完成后,一个简易的线程池就已经制作完毕。你看,如果你从以上几点入手来理解线程池的源码的话,是不是要简单多了?Java中的线程池的核心组成也是如此,只不过在细节处理等方面更多全面且丰富。

2. 运行线程池

现在,我们的王者线程池已经制作好。接下来,我们通过一个场景来运行它,看看它的效果如何。

试验场景:峡谷森林中,铠、兰陵王和典韦等负责打野,而安其拉、貂蝉和大乔等美女负责对狩猎到的野怪进行烧烤,一场欢快的峡谷烧烤节正在进行中。

在这个场景中,铠和兰陵王他们负责提交任务,而貂蝉和大乔她们则负责处理任务。

在下面的实现代码中,我们通过上述设计的TheKingThreadPool来定义个线程池,wildMonsters中的野怪表示待提交的任务,并安排3个工作线程来执行任务。在示例代码的末尾,当所有任务执行结束后,关闭线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码 public static void main(String[] args) {
TheKingThreadPool theKingThreadPool = new TheKingThreadPool(3, new ArrayBlockingQueue<>(10));

String[] wildMonsters = {"棕熊", "野鸡", "灰狼", "野兔", "狐狸", "小鹿", "小花豹", "野猪"};
for (String wildMonsterName : wildMonsters) {
theKingThreadPool.execute(new Task() {
public String getTaskDesc() {
return wildMonsterName;
}

public void run() {
System.out.println(Thread.currentThread().getName() + ":" + wildMonsterName + "已经烤好");
}
});
}

theKingThreadPool.waitUntilAllTasksFinished();
theKingThreadPool.stop();
}

王者线程池运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码Worker0:获取到新的任务->灰狼
Worker1:获取到新的任务->野鸡
Worker1:野鸡已经烤好
Worker2:获取到新的任务->棕熊
Worker2:棕熊已经烤好
Worker1:获取到新的任务->野兔
Worker1:野兔已经烤好
Worker0:灰狼已经烤好
Worker1:获取到新的任务->小鹿
Worker1:小鹿已经烤好
Worker2:获取到新的任务->狐狸
Worker2:狐狸已经烤好
Worker1:获取到新的任务->野猪
Worker1:野猪已经烤好
Worker0:获取到新的任务->小花豹
Worker0:小花豹已经烤好
Worker0:已结束工作,执行任务数量:2
Worker2:已结束工作,执行任务数量:2
Worker1:已结束工作,执行任务数量:4

Process finished with exit code 0

从结果中可以看到,效果完全符合预期。所有的任务都已经提交完毕,并且都被正确执行。此外,通过线程池的任务统计,可以看到任务并不是均匀分配,Worker1执行了4个任务,而Worker0和Worker2均只执行了2个任务,这也是线程池中的正常现象。

三、透彻理解Java中的线程池

在手工制作线程线程池之后,再来理解Java中的线程池就相对要容易很多。当然,相比于王者线程池,Java中的线程池(ThreadPoolExecutor)的实现要复杂很多。所以,理解时应当遵循一定的结构和脉络,把握住线程池的核心要点,眉毛胡子一把抓、理不清层次会导致你无法有效理解它的设计内涵,进而导致你无法正确掌握它。

总体来说,Java中的线程池的设计核心都是围绕“任务”进行,可以通过一个框架、两大核心、三大过程概括。理解了这三个重要概念,基本上你已经能从相对抽象的层面理解了线程池。

  • 一个框架:即线程池的整体设计存在一个框架,而不是杂乱无章的组成。所以,在学习线程池时,首先要能从立体上感知到这个框架的存在,而不要陷于凌乱的细节中;
  • 两大核心:在线程池的整个框架中,围绕任务执行这件事,存在两大核心:任务的管理和任务的执行,对应的也就是任务队列和用于执行任务的工作线程。任务队列和工作线程是框架得以有效运转的关键部件;
  • 三大过程:前面说过,线程池的整体设计都是围绕任务展开,所以框架内可以分为任务提交、任务管理和任务执行三大过程。

从类比的角度讲,你可以把框架看作是一个生产车间。在这个车间里,有一条流水线,任务队列和工作线程是这条流水线的两大关键组成。而在流水线运作的过程中,就会涉及任务提交、任务管理和任务执行等不同的过程。

下面这幅图,将帮助你立体地感知线程池的整体设计,建议你收藏。在这幅图中,清楚地展示了线程池整个框架的工作流程和核心部件,接下来的文章也将围绕这幅图展开。

1. 线程池框架设计概览

从源码层面看,理解Java中的线程池,要从下面这四兄弟的概念和关系入手,这四个概念务必了然于心。

  • Executor:作为线程池的最顶层接口,Executor的接口在设计上,实现了任务提交与任务执行之间的解耦,这是它存在的意义。在Executor中,只定义了一个方法void execute(Runnable command),用于执行提交的可运行的任务。注意,你看它这个方法的参数干脆就叫command,也就是“命令”,意在表明所提交的不是一个静止的对象,而是可运行的命令。并且,这个命令将在未来的某一时刻执行,具体由哪个线程来执行也是不确定的;
  • ExecutorService:继承了Executor的接口,并在此基础上提供可以管理服务和执行结果(Futrue) 的能力。ExecutorService所提供的submit方法可以返回任务的执行结果,而shutdown方法则可以用于关闭服务。相比起来,Executor只具备单一的执行能力,而ExecutorService则不仅具有执行能力,还提供了简单的服务管理能力;
  • AbstractExecutorService:作为ExecutorService的简单实现,该类通过RunnableFuture和newTaskFor实现了submit、invokeAny和invokeAll等方法;
  • ThreadPoolExecutor:该类是线程池的最终实现类,实现了Executor和ExecutorService中定义的能力,并丰富了AbstractExecutorService中的实现。在ThreadPoolExecutor中,定义了任务管理策略和线程池管理能力,相关能力的实现细节将是我们下文所要讲解的核心所在。

如果你觉得还是不太能直观地感受四兄弟的差异,那么你可以放大查看下面这幅高清图示。看的时候,要格外注意它们各自方法的不同,方法的不同意味着它们的能力不同。

而对于线程池总体的执行过程,下面这幅图也建议你收藏。这幅图虽然简明,但完整展示了从任务提交到任务执行的整个过程。这个执行过程往往也是面试中的高频面试题,务必掌握。

(1)线程池的核心属性

线程池中的一些核心属性选取如下,对于其中个别属性会做特别说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码// 线程池控制相关的主要变量
// 这个变量很神奇,下文后专门陈述,请特别留意
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 待处理的任务队列
private final BlockingQueue < Runnable > workQueue;
// 工作线程集合
private final HashSet < Worker > workers = new HashSet < Worker > ();
// 创建线程所用到的线程工厂
private volatile ThreadFactory threadFactory;
// 拒绝策略
private volatile RejectedExecutionHandler handler;
// 核心线程数
private volatile int corePoolSize;
// 最大线程数
private volatile int maximumPoolSize;
// 空闲线程的保活时长
private volatile long keepAliveTime;
// 线程池变更的主要控制锁,在工作线程数、变更线程池状态等场景下都会用到
private final ReentrantLock mainLock = new ReentrantLock();

关于ctl字段的特别说明

在ThreadPoolExecutor的多个核心字段中,其他字段可能都比较好理解,但是ctl要单独拎出来做些解释。

顾名思义,ctl这个字段用于对线程池的控制。它的设计比较有趣,用一个字段却表示了两层含义,也就是这个字段实际是两个字段的合体:

  • runState:线程池的运行状态(高3位);
  • workerCount:工作线程数量(低29位)。

这两个字段的值相互独立,互不影响。那为何要用这种设计呢?这是因为,在线程池中这两个字段几乎总是如影相随,如果不用一个字段来表示的话,那么就需要通过锁的机制来控制两个字段的一致性。不得不说,这个字段设计上还是比较巧妙的。

在线程池中,也提供了一些方法可以方便地获取线程池的状态和工作线程数量,它们都是通过对ctl进行位运算得来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码/**
计算当前线程池的状态
*/
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
/**
计算当前工作线程数
*/
private static int workerCountOf(int c) {
return c & CAPACITY;
}
/**
初始化ctl变量
*/
private static int ctlOf(int rs, int wc) {
return rs | wc;
}

关于位运算,这里补充一点说明,如果你对位运算有点迷糊的话可以看看,如果你对它比较熟悉则可以直接跳过。

假设A=15,二进制是1111;B=6,二进制是110.

运算符 名称 描述 示例
& 按位与 如果相对应位都是1,则结果为1,否则为0 (A&B),得到6,即110
~ 按位非 按位取反运算符翻转操作数的每一位,即0变成1,1变成0。 (〜A)得到-16,即11111111111111111111111111110000
按位或 如果相对应位都是 0,则结果为 0,否则为 1

(2)线程池的核心构造器

ThreadPoolExecutor有四个构造器,其中一个是核心构造器。你可以根据需要,按需使用这些构造器。

  • 核心构造器之一:相对较为常用的一个构造器,你可以指定核心线程数、最大线程数、线程保活时间和任务队列类型。
1
2
3
4
5
6
7
8
java复制代码public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue < Runnable > workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
  • 核心构造器之二:相比于第一个构造器,你可以在这个构造器中指定ThreadFactory. 通过ThreadFactory,你可以指定线程名称、分组等个性化信息。
1
2
3
4
5
6
7
8
9
java复制代码  public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue < Runnable > workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
  • 核心构造器之三:这个构造器的要点在于,你可以指定拒绝策略。关于任务队列的拒绝策略,下文有详细介绍。
1
2
3
4
5
6
7
8
9
java复制代码public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue < Runnable > workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
  • 核心构造器之四:这个构造器是ThreadPoolExecutor的核心构造器,提供了较为全面的参数设置,上述的三个构造器都是基于它实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue < Runnable > workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

(3)线程池中的核心方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码/**
* 提交Runnable类型的任务并执行,但不返回结果
*/
public void execute(Runnable command){...}
/**
* 提交Runnable类型的任务,并返回结果
*/
public Future<?> submit(Runnable task){...}
/**
* 提交Runnable类型的任务,并返回结果,支持指定默认结果
*/
public <T> Future<T> submit(Runnable task, T result){...}
/**
* 提交Callable类型的任务并执行
*/
public <T> Future<T> submit(Callable<T> task) {...}
/**
* 关闭线程池,继续执行队列中未完成的任务,但不会接收新的任务
*/
public void shutdown() {...}
/**
* 立即关闭线程池,同时放弃未执行的任务,并不再接收新的任务
*/
public List<Runnable> shutdownNow(){...}

(4)线程池的状态与生命周期管理

前文说过,线程池恰似一个生产车间,而从生产车间的角度看,生产车间有运行、停产等不同状态,所以线程池也是有一定的状态和使用周期的。

  • Running:运行中,该状态下可以继续向线程池中增加任务,并正常处理队列中的任务;
  • Shutdown:关闭中,该状态下线程池不会立即停止,但不能继续向线程池中增加任务,直到任务执行结束;
  • Stop:停止,该状态下将不再接收新的任务,同时不再处理队列中的任务,并中断工作中的线程;
  • Tidying:相对短暂的中间状态,所有任务都已经结束,并且所有的工作线程都不再存在(workerCount==0),并运行terminated()钩子方法;
  • Terminated:terminated()运行结束。

2. 如何向线程池中提交任务

向线程池提交任务有两种比较常见的方式,一种是需要返回执行结果的,一种则是不需要返回结果的。

(1)不关注任务执行结果:execute

通过execute()提交任务到线程池后,任务将在未来某个时刻执行,执行的任务的线程可能是当前线程池中的线程,也可能是新创建的线程。当然,如果此时线程池应关闭,或者任务队列已满,那么该任务将交由RejectedExecutionHandler处理。

(2)关注任务执行结果:submit

通过submit()提交任务到线程池后,运行机制和execute类似,其核心不同在于,由submit()提交任务时将等待任务执行结束并返回结果。

3. 如何管理提交的任务

(1)任务队列选型策略

  • SynchronousQueue:无缝传递(Direct handoffs)。当新的任务到达时,将直接交由线程处理,而不是放入缓存队列。因此,如果任务达到时却没有可用线程,那么将会创建新的线程。所以,为了避免任务丢失,在使用SynchronousQueue时,将会需要创建无数的线程,在使用时需要谨慎评估。
  • LinkedBlockingQueue:无界队列,新提交的任务都会缓存到该队列中。使用无界队列时,只有corePoolSize中的线程来处理队列中的任务,这时候和maximumPoolSize是没有关系的,它不会创建新的线程。当然,你需要注意的是,如果任务的处理速度远低于任务的产生速度,那么LinkedBlockingQueue的无限增长可能会导致内存容量等问题。
  • ArrayBlockingQueue:有界队列,可能会触发创建新的工作线程,maximumPoolSize参数设置在有界队列中将发挥作用。在使用有界队列时,要特别注意任务队列大小和工作线程数量之间的权衡。如果任务队列大但是线程数量少,那么结果会是系统资源(主要是CPU)占用率较低,但同时系统的吞吐量也会降低。反之,如果缩小任务队列并扩大工作线程数量,那么结果则是系统吞吐量增大,但同时系统资源占用也会增加。所以,使用有界队列时,要考虑到平衡的艺术,并配置相应的拒绝策略。

(2)如何选择合适的拒绝策略

在使用线程池时,拒绝策略是必须要确认的地方,因为它可能会造成任务丢失。

当线程池已经关闭或任务队列已满且无法再创建新的工作线程时,那么新提交的任务将会被拒绝,拒绝时将调用RejectedExecutionHandler中的rejectedExecution(Runnable r, ThreadPoolExecutor executor)来执行具体的拒绝动作。

1
2
3
java复制代码final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}

以execute方法为例,当线程池状态异常或无法新增工作线程时,将会执行任务拒绝策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

ThreadPoolExecutor的默认拒绝策略是AbortPolicy,这一点在属性定义中已经确定。在大部分场景中,直接拒绝任务都是不合适的。

1
java复制代码private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
  • AbortPolicy:默认策略,直接抛出RejectedExecutionException异常;
  • CallerRunsPolicy:交由当前线程自己来执行。这种策略这提供了一个简单的反馈控制机制,可以减慢提交新任务的速度;
  • DiscardPolicy:直接丢弃任务,不会抛出异常;
  • DiscardOldestPolicy:如果此时线程池没有关闭,将从队列的头部取出第一个任务并丢弃,并再次尝试执行。如果执行失败,那么将重复这个过程。

如果上述四种策略均不满足,你也可以通过RejectedExecutionHandler接口定制个性化的拒绝策略。事实上,为了兼顾任务不丢失和系统负载,建议你自己实现拒绝策略。

(3)队列维护

对于任务队列的维护,线程池也提供了一些方法。

  • 获取当前任务队列
1
2
3
java复制代码public BlockingQueue<Runnable> getQueue() {
return workQueue;
}
  • 从队列中移除任务
1
2
3
4
5
java复制代码public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}

4. 如何管理执行任务的工作线程

(1)核心工作线程

核心线程(corePoolSize)是指最小数量的工作线程,此类线程不允许超时回收。当然,如果你设置了allowCoreThreadTimeOut,那么核心线程也是会超时的,这可能会导致核心线程数为零。核心线程的数量可以通过线程池的构造参数指定。

(2)最大工作线程

最大工作线程指的是线程池为了处理现有任务,所能创建的最大工作线程数量。

最大工作线程可以通过构造函数的maximumPoolSize变量设定。当然,如果你所使用的任务队列是无界队列,那么这个参数将形同虚设。

(3)如何创建新的工作线程

在线程池中,新线程的创建是通过ThreadFactory完成。你可以通过线程池的构造函数指定特定的ThreadFactory,如未指定将使用默认的Executors.defaultThreadFactory(),该工厂所创建的线程具有相同的ThreadGroup和优先级(NORM_PRIORITY),并且都不是守护( Non-Daemon)线程。

通过设定ThreadFactory,你可以自定义线程的名字、线程组以及守护状态等。

在Java的线程池ThreadPoolExecutor中,addWorker方法负责新线程的具体创建工作。

1
java复制代码  private boolean addWorker(Runnable firstTask, boolean core) {...}

(4)保活时间

保活时间指的是非核心线程在空闲时所能存活的时间。

如果线程池中的线程数量超过了corePoolSize中的设定,那么空闲线程的空闲时间在超过keepAliveTime中设定的时间后,线程将被回收终止。在线程被回收后,如果需要新的线程时,将继续创建新的线程。

需要注意的是,keepAliveTime仅对非核心线程有效,如果需要设置核心线程的保活时间,需要使用allowCoreThreadTimeOut参数。

(5)钩子方法

  • 设定任务执行前动作:beforeExecute

如果你希望提交的任务在执行前执行特定的动作,比如写入日志或设定ThreadLocal等。那么,你可以通过重写beforeExecute来实现这一目的。

1
java复制代码protected void beforeExecute(Thread t, Runnable r) { }
  • 设定任务执行后动作:afterExecute
    如果你希望提交的任务在执行后执行特定的动作,比如写入日志或捕获异常等。那么,你可以通过重写afterExecute来实现这一目的。
1
java复制代码protected void afterExecute(Runnable r, Throwable t) { }
  • 设定线程池终止动作:terminated
1
java复制代码protected void terminated() { }

(6)线程池的预热

默认情况下,在设置核心线程数之后,也不会立即创建相关线程,而是任务到达后再创建。

如果你需要预先就启动核心线程,那么你可以通过调用prestartCoreThread或prestartAllCoreThreads来提前启动,以达到线程池预热目的,并且可以通过ensurePrestart方法来验证效果。

(7)线程回收机制

当线程池中的工作线程数量大于corePoolSize设置的数量时,并且存在空闲线程,并且这个空闲线程的空闲时长超过了keepAliveTime所设置的时长,那么这样的空闲线程将会被回收,以降低不必要的资源浪费。

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
...
} finally {
processWorkerExit(w, completedAbruptly); // 主动回收自己
}
}

(8)线程数调整策略

线程池的工作线程的设置是否合理,关系到系统负载和任务处理速度之间的平衡。这里要明确的是,如何设置核心线程并没有放之四海而皆准的公式。每个业务场景都有着它独特的地方,CPU密集型和IO密集型任务存在较大差异。因此,在使用线程池的时候,要具体问题具体分析,但是你可以运行结果持续调整来优化线程池。

5. 线程池使用示例

我们仍以手工制作线程池部分的场景为例,通过ThreadPoolExecutor实现来展示线程池的使用示例。从代码中看,ThreadPoolExecutor的使用和王者线程池TheKingThreadPool的用法基本一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 20, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue < > (10));

String[] wildMonsters = {"棕熊", "野鸡", "灰狼", "野兔", "狐狸", "小鹿", "小花豹", "野猪"};
for (String wildMonsterName: wildMonsters) {
threadPoolExecutor.execute(new RunnableTask() {
public String getTaskDesc() {
return wildMonsterName;
}

public void run() {
System.out.println(Thread.currentThread().getName() + ":" + wildMonsterName + "已经烤好");
}
});
}

threadPoolExecutor.shutdown();
}

6. Executors类

Executors是JUC中一个针对ThreadPoolExecutor和ThreadFactory等设计的一个工具类。通过Executors,可以方便地创建不同类型的线程池。当然,其内部主要是通过给ThreadPoolExecutor的构造传递特定的参数实现,并无玄机可言。常用的几个工具如下所示:

  • 创建固定线程数的线程池
1
2
3
4
5
java复制代码public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
  • 创建只有1个线程的线程池
1
2
3
4
5
6
7
java复制代码public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
  • 创建缓存线程池:这种线程池不设定核心线程数,根据任务的数据动态创建线程。当任务执行结束后,线程会被逐步回收,也就是所有的线程都是临时的。
1
2
3
4
5
java复制代码public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

7. 线程池监控

作为一个运行框架,ThreadPoolExecutor既简单也复杂。因此,对其内部的监控和管理是十分必要的。ThreadPoolExecutor也提供了一些方法,通过这些方法,我们可以获取到线程池的一些重要状态和数据。

  • 获取线程池大小
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码 public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? 0 :
workers.size();
} finally {
mainLock.unlock();
}
}
  • 获取活跃工作线程数量
1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码 public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w: workers)
if (w.isLocked())
++n;
return n;
} finally {
mainLock.unlock();
}
}
  • 获取最大线程池
1
2
3
4
5
6
7
8
9
java复制代码 public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
  • 获取线程池中的任务总数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码 public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w: workers) {
n += w.completedTasks;
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}
  • 获取线程池中已完成的任务总数
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w: workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}

四、如何养成正确使用线程池的良好习惯

1. 线程池的使用风险提示

虽然线程池的使用有诸多的好处,然而天下没有免费的午餐,线程池在给我们带来便利的同时,也有一些避免踩坑的注意事项:

  • 线程池设置过大或过小都不合适。如果线程池的线程数量过多,虽然局部处理速度增加,但将会影响应用系统的整体性能。而如果线程池的线程数量过少,线程池可能无法带来预期的性能的提升;
  • 和其他多线程类似,线程池中也可能会发生死锁。比如,某个任务等待另外一个任务结束,但却没有线程来执行等待的那个任务,这也是为什么要避免任务间存在依赖;
  • 添加任务到队列时耗时过长。如果任务队列已满,外部线程向队列添加任务将会受阻。所以,为了避免外部线程阻塞时间过长,你可以设定最大等待时间;

为了降低这些风险的发生,你在设置线程池的类型和参数时,应当格外小心。在正式上线前,最好能做一次压力测试。

2. 创建线程池的推荐姿势

虽然通过Executors创建线程比较方便,但是Executors的封装屏蔽了一些重要的参数细节,而这些参数对于线程池至关重要,所以为了避免因对Executors不了解而错误地使用线程池,建议还是通过ThreadPoolExecutor的构造参数直接创建。

3. 尽量避免使用无界队列

如果再认真点说的话,你应该在任何时候都避免使用无界队列来管理任务。注意,Executors的newFixedThreadPool所使用的是LinkedBlockingQueue,上文有它的源码。

小结

以上就是关于Java线程池的全部内容。在这篇文章中,我们讲解了线程池的应用场景、核心组成及原理,并手工制作了一个线程池,而且在此基础上深入讲解了Java中的线程池ThreadPoolExecutor的实现。虽然文章整体篇幅较大,但是由于线程池涉及的内容十分广泛,难以在一篇文章中全部提及,仍有部分重要内容未能覆盖,比如如何处理线程池中的异常、如何优雅关闭线程池等。

熟练掌握线程池并不是一件容易的事,建议按照本文开篇的建议,先理解其要解决的问题,再理解其核心组成原理,最后再深入到Java中的源码中。如此一来,带着已知的概念去看源码,会更容易理解源码的设计之道。

正文到此结束,恭喜你又上了一颗星✨

夫子的试炼

  • 思考:如何确保线程池不丢失任务。

延伸阅读与参考资料

  • 《王者并发课》大纲与更新进度总览
  • stackoverflow.com/questions/5…
  • tutorials.jenkov.com/java-concur…

关于作者

从业近十年,先后从事敏捷与DevOps咨询、Tech Leader和管理等工作,对分布式高并发架构有丰富的实战经验。热衷于技术分享和特定领域书籍翻译,掘金小册《高并发秒杀的设计精要与实现》作者。


关注公众号【MetaThoughts】,及时获取文章更新和文稿。

如果本文对你有帮助,欢迎点赞、关注、监督,我们一起从青铜到王者。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

这份Java面试八股文让329人成功进入大厂,堪称2021最

发表于 2021-07-25

前言

2021秋招即将来临,很多同学会问Java面试八股文有必要背吗?

我的回答是:很有必要。你可以讨厌这种模式,但你一定要去背,因为不背你就进不了大厂。

国内的互联网面试,恐怕是现存的、最接近科举考试的制度。

而且,我国的八股文确实是独树一帜。以美国为例,北美工程师面试比较重视算法(Coding),近几年也会加入Design轮(系统设计和面向对象设计OOD)和BQ轮(Behavioral question,行为面试问题)。

那么为什么国内面试不采取这样的考察方式呢?简单来说,互联网IT行业的求职者太多了,如果考察的是清一溜的算法题和设计题,那么会要求面试官有极高的技术水平,还要花大量的时间成本和精力。

也许现行的八股文面试不是最优的解法,但的确是最符合当前国内IT环境的做法。

所以,我采访了超过20位资深大厂面试官后,一直在尽量精炼准确的整理一套切实可行的八股文,现在已经有329位粉丝通过这套题走入了理想的岗位,所以分享出来给大伙看看,有什么不足之处欢迎评论补充。

篇幅所限本文就只贴一下题目了,同学们可以自己先看看有哪些题是会的,答案的话我整理的一本《Java面试八股文》PDF里都有,如果需要可以直接点击获取。

一、Java基础 44 道

  1. 解释下什么是面向对象?面向对象和面向过程的区别?
  1. 面向对象的三大特性?分别解释下?
  1. JDK、JRE、JVM 三者之间的关系?
  1. 重载和重写的区别?
  1. Java 中是否可以重写一个 private 或者 static 方法?
  1. 构造方法有哪些特性?
  1. 在 Java 中定义一个不做事且没有参数的构造方法有什么作用?
  1. Java 中创建对象的几种方式?
  1. 抽象类和接口有什么区别?
  1. 静态变量和实例变量的区别?
  1. 12、short s1 = 1;s1 = s1 + 1;有什么错?那么 short s1 = 1; s1 += 1;呢?有没有错误?
  1. Integer 和 int 的区别?
  1. 装箱和拆箱的区别
  1. switch 语句能否作用在 byte 上,能否作用在 long 上,能否作用在 String 上?
  1. 16、final、finally、finalize 的区别
  1. == 和 equals 的区别?
  1. 两个对象的 hashCode() 相同,则 equals() 也一定为 true 吗?
  1. 为什么重写 equals() 就一定要重写 hashCode() 方法?
  1. & 和 && 的区别?
  1. Java 中的参数传递时传值呢?还是传引用?
  1. Java 中的 Math.round(-1.5) 等于多少?
  1. 如何实现对象的克隆?
  1. 深克隆和浅克隆的区别?
  1. 什么是 Java 的序列化,如何实现 Java 的序列化?
  1. 什么情况下需要序列化?
  1. Java 的泛型是如何工作的 ? 什么是类型擦除 ?
  1. 什么是泛型中的限定通配符和非限定通配符 ?
  1. List 和 List 之间有什么区别 ?
  1. Java 中的反射是什么意思?有哪些应用场景?
  1. 反射的优缺点?
  1. Java 中的动态代理是什么?有哪些应用?
  1. 怎么实现动态代理?
  1. static 关键字的作用?
  1. super 关键字的作用?
  1. 字节和字符的区别?
  1. String 为什么要设计为不可变类?
  1. String、StringBuilder、StringBuffer 的区别?
  1. String 字符串修改实现的原理?
  1. String str = “i” 与 String str = new String(“i”) 一样吗?
  1. String 类的常用方法都有那些?
  1. final 修饰 StringBuffer 后还可以 append 吗?
  1. Java 中的 IO 流的分类?说出几个你熟悉的实现类?
  1. 字节流和字符流有什么区别?
  1. BIO、NIO、AIO 有什么区别?

二、Java异常 9 道

  1. finally 块中的代码什么时候被执行?
  1. finally 是不是一定会被执行到?
  1. try-catch-finally 中,如果 catch 中 return 了,finally 还会执行吗?
  1. try-catch-finally 中那个部分可以省略?
  1. Error 和 Exception 的区别?
  1. 运行时异常与受检异常有何异同?
  1. throw 和 throws 的区别?
  1. 常见的异常类有哪些?
  1. 主线程可以捕获到子线程的异常吗?

三、Java集合 24 道

  1. Java 中常用的容器有哪些?
  1. ArrayList 和 LinkedList 的区别?
  1. ArrayList 实现 RandomAccess 接口有何作用?为何 LinkedList 却没实现这个接口?
  1. ArrayList 的扩容机制?
  1. Array 和 ArrayList 有何区别?什么时候更适合用 Array?
  1. HashMap 的实现原理/底层数据结构?JDK1.7 和 JDK1.8
  1. HashMap 的 put 方法的执行过程?
  1. HashMap 的 get 方法的执行过程?
  1. HashMap 的 resize 方法的执行过程?
  1. HashMap 的 size 为什么必须是 2 的整数次方?
  1. HashMap 多线程死循环问题?
  1. HashMap 的 get 方法能否判断某个元素是否在 map 中?
  1. HashMap 与 HashTable 的区别是什么?
  1. HashMap 与 ConcurrentHashMap 的区别是什么?
  1. HashTable 和 ConcurrentHashMap 的区别?
  1. ConcurrentHashMap 的实现原理是什么?
  1. HashSet 的实现原理?
  1. HashSet 怎么保证元素不重复的?
  1. LinkedHashMap 的实现原理?
  1. Iterator 怎么使用?有什么特点?
  1. Iterator 和 ListIterator 有什么区别?
  1. Iterator 和 Enumeration 接口的区别?
  1. fail-fast 与 fail-safe 有什么区别?
  1. Collection 和 Collections 有什么区别?

四、Java并发 42 道

  1. 并行和并发有什么区别?
  1. 线程和进程的区别?
  1. 守护线程是什么?
  1. 创建线程的几种方式?
  1. Runnable 和 Callable 有什么区别?
  1. 线程状态及转换?
  1. sleep() 和 wait() 的区别?
  1. 线程的 run() 和 start() 有什么区别?
  1. 在 Java 程序中怎么保证多线程的运行安全?
  1. Java 线程同步的几种方法?
  1. Thread.interrupt() 方法的工作原理是什么?
  1. 谈谈对 ThreadLocal 的理解?
  1. 在哪些场景下会使用到 ThreadLocal?
  1. 说一说自己对于 synchronized 关键字的了解?
  1. 如何在项目中使用 synchronized 的?
  1. 说说 JDK1.6 之后的 synchronized 关键字底层做了哪些优化,可以详细介绍一下这些优化吗?
  1. 谈谈 synchronized 和 ReenTrantLock 的区别?
  1. synchronized 和 volatile 的区别是什么?
  1. 谈一下你对 volatile 关键字的理解?
  1. 说下对 ReentrantReadWriteLock 的理解?
  1. 说下对悲观锁和乐观锁的理解?
  1. 乐观锁常见的两种实现方式是什么?
  1. 乐观锁的缺点有哪些?
  1. CAS 和 synchronized 的使用场景?
  1. 简单说下对 Java 中的原子类的理解?
  1. atomic 的原理是什么?
  1. 说下对同步器 AQS 的理解?
  1. AQS 的原理是什么?
  1. AQS 对资源的共享模式有哪些?
  1. AQS 底层使用了模板方法模式,你能说出几个需要重写的方法吗?
  1. 说下对信号量 Semaphore 的理解?
  1. CountDownLatch 和 CyclicBarrier 有什么区别?
  1. 说下对线程池的理解?为什么要使用线程池?
  1. 创建线程池的参数有哪些?
  1. 如何创建线程池?
  1. 线程池中的的线程数一般怎么设置?需要考虑哪些问题?
  1. 执行 execute() 方法和 submit() 方法的区别是什么呢?
  1. 说下对 Fork和Join 并行计算框架的理解?
  1. JDK 中提供了哪些并发容器?
  1. 谈谈对 CopyOnWriteArrayList 的理解?
  1. 谈谈对 BlockingQueue 的理解?分别有哪些实现类?
  1. 谈谈对 ConcurrentSkipListMap 的理解?

五、Java JVM 42 道

  1. 说一下 Jvm 的主要组成部分?及其作用?
  1. 谈谈对运行时数据区的理解?
  1. 堆和栈的区别是什么?
  1. 堆中存什么?栈中存什么?
  1. 为什么要把堆和栈区分出来呢?栈中不是也可以存储数据吗?
  1. Java 中的参数传递时传值呢?还是传引用?
  1. Java 对象的大小是怎么计算的?
  1. 对象的访问定位的两种方式?
  1. 判断垃圾可以回收的方法有哪些?
  1. 垃圾回收是从哪里开始的呢?
  1. 被标记为垃圾的对象一定会被回收吗?
  1. 谈谈对 Java 中引用的了解?
  1. 谈谈对内存泄漏的理解?
  1. 内存泄露的根本原因是什么?
  1. 举几个可能发生内存泄漏的情况?
  1. 尽量避免内存泄漏的方法?
  1. 常用的垃圾收集算法有哪些?
  1. 为什么要采用分代收集算法?
  1. 分代收集下的年轻代和老年代应该采用什么样的垃圾回收算法?
  1. 什么是浮动垃圾?
  1. 什么是内存碎片?如何解决?
  1. 常用的垃圾收集器有哪些?
  1. 谈谈你对 CMS 垃圾收集器的理解?
  1. 谈谈你对 G1 收集器的理解?
  1. 说下你对垃圾回收策略的理解/垃圾回收时机?
  1. 谈谈你对内存分配的理解?大对象怎么分配?空间分配担保?
  1. 说下你用过的 JVM 监控工具?
  1. 如何利用监控工具调优?
  1. JVM 的一些参数?
  1. 谈谈你对类文件结构的理解?有哪些部分组成?
  1. 谈谈你对类加载机制的了解?
  1. 类加载各阶段的作用分别是什么?
  1. 有哪些类加载器?分别有什么作用?
  1. 类与类加载器的关系?
  1. 谈谈你对双亲委派模型的理解?工作过程?为什么要使用
  1. 怎么实现一个自定义的类加载器?需要注意什么?
  1. 怎么打破双亲委派模型?
  1. 有哪些实际场景是需要打破双亲委派模型的?
  1. 谈谈你对编译期优化和运行期优化的理解?
  1. 为何 HotSpot 虚拟机要使用解释器与编译器并存的架构?
  1. 说下你对 Java 内存模型的理解?
  1. 内存间的交互操作有哪些?需要满足什么规则?

六、SSM框架 37 道

  1. 使用 Spring 框架的好处是什么?
  1. 解释下什么是 AOP?
  1. AOP 的代理有哪几种方式?
  1. 怎么实现 JDK 动态代理?
  1. AOP 的基本概念:切面、连接点、切入点等?
  1. 通知类型(Advice)型(Advice)有哪些?
  1. 谈谈你对 IOC 的理解?
  1. Bean 的生命周期?
  1. Bean 的作用域?
  1. Spring 中的单例 Bean 的线程安全问题了解吗?
  1. 谈谈你对 Spring 中的事物的理解?
  1. Spring 中的事务隔离级别?
  1. Spring 中的事物传播行为?
  1. Spring 常用的注入方式有哪些?
  1. Spring 框架中用到了哪些设计模式?
  1. ApplicationContext 通常的实现有哪些?
  1. 谈谈你对 MVC 模式的理解?
  1. SpringMVC 的工作原理/执行流程?
  1. SpringMVC 的核心组件有哪些?
  1. SpringMVC 常用的注解有哪些?
  1. @RequestMapping 的作用是什么?
  1. 如何解决 POST 请求中文乱码问题,GET 的又如何处理呢?
  1. SpringMVC 的控制器是不是单例模式,如果是会有什么问题,怎么解决?
  1. SpringMVC 怎么样设定重定向和转发的?
  1. SpringMVC 里面拦截器是怎么写的?
  1. SpringMVC 和 Struts2 的区别有哪些?
  1. 谈谈你对 MyBatis 的理解?
  1. MyBaits 的优缺点有哪些?
  1. MyBatis 与 Hibernate 有哪些不同?
  1. MyBatis 中 #{} 和 ${}的区别是什么?
  1. MyBatis 是如何进行分页的?分页插件的原理是什么?
  1. MyBatis 有几种分页方式?
  1. MyBatis 逻辑分页和物理分页的区别是什么?
  1. MyBatis 是否支持延迟加载?如果支持,它的实现原理是什么?
  1. 说一下 MyBatis 的一级缓存和二级缓存?
  1. Mybatis 有哪些执行器(Executor)?
  1. MyBatis 动态 SQL 是做什么的?都有哪些动态 SQL?能简述一下动态 SQL的执行原理不?

答案在我整理的一本《Java面试八股文》PDF里都有,如果需要可以直接点击获取。

七、MySQL 31 道

  1. 请说下你对 MySQL 架构的了解?
  1. 一条 SQL 语句在数据库框架中的执行流程?
  1. 数据库的三范式是什么?
  1. char 和 varchar 的区别?
  1. varchar(10) 和 varchar(20) 的区别?
  1. 谈谈你对索引的理解?
  1. 索引的底层使用的是什么数据结构?
  1. 谈谈你对 B+ 树的理解?
  1. 为什么 InnoDB 存储引擎选用 B+ 树而不是 B 树呢?
  1. 谈谈你对聚簇索引的理解?
  1. 谈谈你对哈希索引的理解?
  1. 谈谈你对覆盖索引的认识?
  1. 索引的分类?
  1. 谈谈你对最左前缀原则的理解?
  1. 怎么知道创建的索引有没有被使用到?或者说怎么才可以知道这条语句运行很慢的原因?
  1. 什么情况下索引会失效?即查询不走索引?
  1. 查询性能的优化方法?
  1. InnoDB 和 MyISAM 的比较?
  1. 谈谈你对水平切分和垂直切分的理解?
  1. 主从复制中涉及到哪三个线程?
  1. 主从同步的延迟原因及解决办法?
  1. 谈谈你对数据库读写分离的理解?
  1. 请你描述下事务的特性?
  1. 谈谈你对事务隔离级别的理解?
  1. 解释下什么叫脏读、不可重复读和幻读?
  1. MySQL 默认的隔离级别是什么?
  1. 谈谈你对MVCC 的了解?
  1. 说一下 MySQL 的行锁和表锁?
  1. InnoDB 存储引擎的锁的算法有哪些?
  1. MySQL 问题排查都有哪些手段?
  1. MySQL 数据库 CPU 飙升到 500% 的话他怎么处理?

八、Redis 12 道

  1. 谈下你对 Redis 的了解?
  1. Redis 一般都有哪些使用场景?
  1. Redis 有哪些常见的功能?
  1. Redis 支持的数据类型有哪些?
  1. Redis 为什么这么快?
  1. 什么是缓存穿透?怎么解决?
  1. 什么是缓存雪崩?该如何解决?
  1. 怎么保证缓存和数据库数据的一致性?
  1. Redis 持久化有几种方式?
  1. Redis 怎么实现分布式锁?
  1. Redis 淘汰策略有哪些?
  1. Redis 常见性能问题和解决方案?

九、计算机网络 45 道

  1. 为什么需要三次握手?两次不行?
  1. 为什么需要四次挥手?三次不行?
  1. TCP与UDP有哪些区别?各自应用场景?
  1. HTTP1.0,1.1,2.0 的版本区别
  1. POST和GET有哪些区别?各自应用场景?
  1. HTTP 哪些常用的状态码及使用场景?
  1. HTTP状态码301和302的区别,都有哪些用途?
  1. 在交互过程中如果数据传送完了,还不想断开连接怎么办,怎么维持?
  1. HTTP 如何实现长连接?在什么时候会超时?
  1. TCP 如何保证有效传输及拥塞控制原理
  1. IP地址有哪些分类?
  1. GET请求中URL编码的意义
  1. 什么是SQL 注入?举个例子?
  1. 谈一谈 XSS 攻击,举个例子?
  1. 讲一下网络五层模型,每一层的职责?
  1. 简单说下 HTTPS 和 HTTP 的区别
  1. 对称加密与非对称加密的区别
  1. 简单说下每一层对应的网络协议有哪些?
  1. ARP 协议的工作原理?
  1. TCP 的主要特点是什么?
  1. UDP 的主要特点是什么?
  1. TCP 和 UDP 分别对应的常见应用层协议有哪些?
  1. 为什么 TIME-WAIT 状态必须等待 2MSL 的时间呢?
  1. 保活计时器的作用?
  1. TCP 协议是如何保证可靠传输的?
  1. 谈谈你对停止等待协议的理解?
  1. 谈谈你对 ARQ 协议的理解?
  1. 谈谈你对滑动窗口的了解?
  1. 谈下你对流量控制的理解?
  1. 谈下你对 TCP 拥塞控制的理解?使用了哪些算法?
  1. 什么是粘包?
  1. TCP 黏包是怎么产生的?
  1. 怎么解决拆包和粘包?
  1. forward 和 redirect 的区别?
  1. HTTP 方法有哪些?
  1. 在浏览器中输入 URL 地址到显示主页的过程?
  1. DNS 的解析过程?
  1. 谈谈你对域名缓存的了解?
  1. 谈下你对 HTTP 长连接和短连接的理解?分别应用于哪些场景?
  1. HTTPS 的工作过程?
  1. HTTP 和 HTTPS 的区别?
  1. HTTPS 的优缺点?
  1. 什么是数字签名?
  1. 什么是数字证书?
  1. Cookie 和 Session 有什么区别?

十、操作系统 32 道

  1. 简单说下你对并发和并行的理解?
  1. 同步、异步、阻塞、非阻塞的概念
  1. 进程和线程的基本概念
  1. 进程与线程的区别?
  1. 为什么有了进程,还要有线程呢?
  1. 进程的状态转换
  1. 进程间的通信方式有哪些?
  1. 进程的调度算法有哪些?
  1. 什么是死锁?
  1. 产生死锁的原因?
  1. 死锁产生的必要条件?
  1. 解决死锁的基本方法?
  1. 怎么预防死锁?
  1. 怎么避免死锁?
  1. 怎么解除死锁?
  1. 什么是缓冲区溢出?有什么危害?
  1. 分页与分段的区别?
  1. 物理地址、逻辑地址、虚拟内存的概念
  1. 页面置换算法有哪些?
  1. 谈谈你对动态链接库和静态链接库的理解?
  1. 外中断和异常有什么区别?
  1. 一个程序从开始运行到结束的完整过程,你能说出来多少?
  1. 什么是用户态和内核态
  1. 用户态和内核态是如何切换的?
  1. 进程终止的方式
  1. 守护进程、僵尸进程和孤儿进程
  1. 如何避免僵尸进程?
  1. 介绍一下几种典型的锁?
  1. 常见内存分配内存错误
  1. 内存交换中,被换出的进程保存在哪里?
  1. 原子操作的是如何实现的
  1. 抖动你知道是什么吗?它也叫颠簸现象

十一、消息队列与分布式 26 道

  1. 消息队列的基本作用?
  1. 消息队列的优缺点有哪些?
  1. 如何保证消息队列的高可用?
  1. 如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?
  1. 如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题?
  1. 如何保证消息的顺序性?
  1. 大量消息在 MQ 里长时间积压,该如何解决?
  1. MQ 中的消息过期失效了怎么办?
  1. RabbitMQ 有哪些重要的角色?
  1. RabbitMQ 有哪些重要的组件?
  1. RabbitMQ 有几种广播类型?
  1. Kafka 可以脱离 zookeeper 单独使用吗?为什么?
  1. Kafka 有几种数据保留的策略?
  1. Kafka 的分区策略有哪些?
  1. 谈下你对 Zookeeper 的认识?
  1. Zookeeper 都有哪些功能?
  1. 谈下你对 ZAB 协议的了解?
  1. Zookeeper 怎么保证主从节点的状态同步?
  1. Zookeeper 有几种部署模式?
  1. 说一下 Zookeeper 的通知机制?
  1. 集群中为什么要有主节点?
  1. 集群中有 3 台服务器,其中一个节点宕机,这个时候 Zookeeper 还可以使用吗?
  1. 说一下两阶段提交和三阶段提交的过程?分别有什么问题?
  1. Zookeeper 宕机如何处理?
  1. 说下四种类型的数据节点 Znode?
  1. Zookeeper 和 Dubbo 的关系?

好了,本文就写到这了,上述所有题目的答案我都整理成PDF了

答案的话我整理的一本《Java面试八股文》PDF里都有,如果需要可以直接点击获取。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Python语言的应用领域

发表于 2021-07-25

1.Windows 系统编程

 Python 是跨平台的程序设计语言,在Windows 系统下,通过使 用pywin32 模块提供的Windows API函数接口,就可以编写与 Windows 系统底层功能相关的Python 程序,包括访问注册表、 调用ActiveX控件以及各种COM 组件等工作。

 还有许多其他的日常系统维护和管理工作也可以交给Python 来 实现。所以想学的同学,有必要听一下这位老师的课、领取python福利奥,想学的同学可以到梦雅老师的围鑫(同音):前排的是:762,中间一排是:459,后排的一组是:510 ,把以上三组字母按照顺序组合起来即可,她会安排学习的。

 利用py2exe 模块可以将Python 程序转换为.exe 可执行程序,使得Python 程序可以脱离Python 系统环境来运行。

2.科学计算与数据可视化

 科学计算也称数值计算,是研究工程问题的近似 求解方法,并在计算机上进行程序实现的一门科 学,既有数学理论上的抽象性和严谨性,又有程 序设计技术上的实用性和实验性的特征。

 随着科学计算与数据可视化Python 模块的不断产 生,使得Python 语言可以在科学计算与数据可视 化领域发挥独特的作用。

 Python 中用于科学计算与数据可视化的模块有很 多,例如NumPy、SciPy、SymPy、Matplotlib、 Traits、TraitsUI、Chaco、TVTK、Mayavi、 VPython、OpenCV 等,涉及的应用领域包括数值 计算、符号计算、二维图表、三维数据可视化、 三维动画演示、图像处理以及界面设计等。

 NumPy 模块提供了一个在Python 中做科学计算的基础库,主要 用于矩阵处理与运算;SciPy 模块是在NumPy 模块的基础上开 发的,提供了一个在Python 中做科学计算的工具集。  例如,统计工具(statistics)、最优化工具(optimization)、 数值积分工具(numerical integration)、线性代数工具(linear algebra)、傅里叶变换工具(Fourier transforms)、信号处理 工具(signal processing)、图像处理工具(image processing)、 常微分方程求解工具(ODE solvers)等;Matplotlib 是比较常用的绘图模块,可以快速地将计算结果以不同类型的图形展示出来。所以想学的同学,有必要听一下这位老师的课、领取python福利奥,想学的同学可以到梦雅老师的围鑫(同音):前排的是:762,中间一排是:459,后排的一组是:510 ,把以上三组字母按照顺序组合起来即可,她会安排学习的。

3.数据库应用

 在数据库应用方面,Python 语言提供了对所有主流关 系数据库管理系统的接口,包括SQLite、Access、 MySQL、SQL Server、Oracle 等。

 Python 数据库模块有很多,例如,可以通过内置的 sqlite3模块访问SQLite 数据库,使用pywin32 模块访 问Access 数据库,使用pymysql 模块访问MySQL 数据 库,使用pywin32 和pymssql 模块来访问SQL Sever 数据库。

4.多媒体应用

 Python 多媒体应用开发可以为图形、图像、声音、视频等多媒体数 据处理提供强有力的支持。

 PyMedia 模块是一个用于多媒体操作的Python 模块,可以对WAV、 MP3、AVI 等多媒体格式文件进行编码、解码和播放;PyOpenGL 模块封装了OpenGL 应用程序编程接口,通过该模块可在Python程序中集 成二维或三维图形;PIL(Python Imaging Library,Python 图形库)为 Python 提供了强大的图像处理功能,并提供广泛的图像文件格式支 持。

 该模块能进行图像格式的转换、打印和显示,还能进行一些图像效果 的处理,如图形的放大、缩小和旋转等,是Python 进行图像处理的重要工具.所以想学的同学,有必要听一下这位老师的课、领取python福利奥,想学的同学可以到梦雅老师的围鑫(同音):前排的是:762,中间一排是:459,后排的一组是:510 ,把以上三组字母按照顺序组合起来即可,她会安排学习的。

5.网络应用

 Python 语言为众多的网络应用提供了解决方案,利用有关模块可方 便地定制出所需要的网络服务。

 Python 语言提供了socket 模块,对Socket 接口进行了二次封装,支持 Socket 接口的访问,简化了程序的开发步骤,提高了开发效率;

 Python 语言还提供了urllib、cookielib、httplib、scrapy 等大量模块, 用于对网页内容进行读取和处理,并结合多线程编程以及其他有关模 块可以快速开发网页爬虫之类的应用程序;  可以使用Python 语言编写CGI 程序,也可以把Python 程序嵌入到网页 中运行;

 Python 语言还支持Web 网站开发,比较流行的开发框架有web2py、 django 等。所以想学的同学,有必要听一下这位老师的课、领取python福利奥,想学的同学可以到梦雅老师的围鑫(同音):前排的是:762,中间一排是:459,后排的一组是:510 ,把以上三组字母按照顺序组合起来即可,她会安排学习的。

6.电子游戏应用

 Python 在很早的时候就是一种电子游戏编程工具。

 目前,在电子游戏开发领域也得到越来越广泛的应用。

 Pygame 就是用来开发电子游戏软件的Python 模块, 在SDL 库的基础上开发,可以支持多个操作系统。

 使用Pygame 模块,可以在Python 程序中创建功能丰富的游戏和多媒体程序。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…593594595…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%