Netty源码之内存管理(二)(4144) Netty源

Netty源码之内存管理(二)(4.1.44)

前面做了很多铺垫(Netty源码之内存管理(一)),带着大家熟悉了与内存分配相关的类的定义和分配逻辑。但并没有真正落实到 jemalloc 思想在源码是如何体现的。本章就是对 PoolChunk 逐字解析,死扣细节。在分析源码之前我们需要对分配的内存级别有一个清晰的定位,当分配 Huge 级别对象,直接使用 PoolChunk 包装,并没有复杂的分配逻辑。而对于 tiny&small&normal 级别来说,进行精细化的内存管理十分有必要的。
开局一张图:
满二叉树结构示意图.png
PoolChunk 本质就是维护这一棵满二叉树,这棵树默认管理 16MB 内存(这个值是可以手动设置)。memoryMap[] 是可变的数组,Netty 在这个数组上逻辑构建一棵满二叉树(当然也可以用链表之类的数据结构,但是随机索引效率不高,我们可以根据数组索引快速定位到某一层的第一个节点。但链表是不能做到的),depthMap[] 表示每个节点对应的深度,这是不可变的。一个 Long 型被分成高、低两部分,高 32 位记录小内存分配信息,低 32 位记录节点下标值。当分配 normal 级别内存时,只有低 32 位信息有用,它的值表示节点序号(起始值为 1),当分配 tiny&normal 级别内存时,高、低两部分确定某个 page 下的某个 subpage
还有一个比较有意思的是任意节点所管理的内存大小都是 2 的次幂,因此 Netty 会对用户申请的内存大小进行规格化的原因就在这里。任意规格值(当然不能超过 PoolChunkSize)都能找到合适的节点,除非没有节点可满足当前内存申请,那只能新创建一个 PoolChunk。还有另外一个疑问就是如何更新 memoryMap[] 数组呢,请看大屏幕:
内存分配流程图.png
index 表示节点索引,value 表示对应 memoryMap[index] 。
用户第一次申请 4MB 大小内存,由于内存大小确定,因此所在的层的位置也可以通过 maxOrder - (log2(normCapacity) - pageShifts) 确定,4MB 对应层数(也可理解为深度)为 2。
内存分配过程如下,其实对应方法 allocateNode(int) 实现逻辑: 首先判断节点 1 的使用状态: memoryMap1 != unusable(12) 表示节点 1 可用,但由于层数不匹配,所以获取子树节点 2,同时判断使用状态,发现 != 12 且层数不匹配,那就继续获取子节点 4,发现 !=12 且层数匹配,所以节点 4 就用作此次内存分配的节点,并更新 memoryMap[4]=12 表示节点 4 已使用,变量 handle 的低 32 位记录子节点位置信息,同时循环更新父节点的 memoryMap,父节点的值是子节点的 memoryMap 的最小值,所以 memoryMap[2]=2,memoryMap[1] =1。这样,第一次申请 4MB 大小内存就算完成了。
第二次申请 4MB 大小内存,当在第 2 层判断节点 4 的 memoryMap 值等于 12,会判断兄弟节点 5 是否满足分配。大家好好休会。

PoolChunk 内存分配

PoolChunk 是 jemalloc3.x 算法思想的体现,里面以 allocate 开头的 API 就是内存分配算法的实现。入口方法是 allocate(PooledByteBuf, int, int)

allocate(PooledByteBuf, int, int)

这个方法做的事情有:

  • 根据申请内存大小选择合适的分配策略。具体为如果 >=pageSize,使用 allocateRun() 方法分配,否则使用 allocateSubpage() 分配,它们都会返回句柄值 handle。
  • 初始化 ByteBuf

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码// io.netty.buffer.PoolChunk#allocate
/**
*
* @param buf 「ByteBuf」对象,它是物理内存的承托
* @param reqCapacity 用户所需内存大小
* @param normCapacity 规格值
* @return
*/
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {

// 低32位: 节点索引
// 高32位: 位图索引
final long handle;
// 位操作判断大小
if ((normCapacity & subpageOverflowMask) != 0) {
// #1 申请>=8KB
handle = allocateRun(normCapacity);
} else {
// #2 申请<8KB
handle = allocateSubpage(normCapacity);
}

if (handle < 0) {
return false;
}

// #3 如果「PoolChunk」存在缓存的「ByteBuffer」就复用
ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null;

// #4 初始化ByteBuf内存相关信息
initBuf(buf, nioBuffer, handle, reqCapacity);
return true;
}

allocateRun(int)

方法 allocateRun(int) 做的事情也不多,主要有:

  • 根据规格值计算所对应的深度 d
  • 调用 allocateNode(d) 完成内存分配
  • 更新剩余空闲值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
java复制代码// io.netty.buffer.PoolChunk#allocateRun
/**
* 申请大小为「norCapacity」的内存块
*/
private long allocateRun(int normCapacity) {
// #1 计算当前规格值所对应树的深度d
// log2(normCapacity): 获取当前值所对应最高位1的序号,
// pageShifts: 默认值为 13,也就是 pageSize=8192 最高位为1的序号,因为这里分配的是 >=8192,
// 所以需要减去它的偏移量,即从0开始。
// maxOrder: 默认值为 11,maxOrder - 偏移量 = 确切(合适)的树高度
// 可以想象normCapacity 从 8192 不断向上增长,那树的高度也不断变小
int d = maxOrder - (log2(normCapacity) - pageShifts);

// #2 △在深度d的节点中寻找空闲节点并分配内存
int id = allocateNode(d);
if (id < 0) {
return id;
}

// #3 更新剩余空闲值
freeBytes -= runLength(id);

// #4 返回信息
return id;
}

private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1; // 31
/**
* 获取以2为底的对数值
* 思路是数有多少个0
*/
private static int log2(int val) {
// compute the (0-based, with lsb = 0) position of highest set bit i.e, log2
// Integer.numberOfLeadingZeros(int): 返回无符号整型的最高非零位前面的0的个数(包括符号位在内)
// 31-0位数量=非0位数量,比如 0000...1000 Integer.numberOfLeadingZeros(0000...1000) = 28,
// 31-28=3,其实就是获取最高位1的序号(从右至左,起始序号为0)
return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);
}

allocateNode(int)

终于到了内存分配的重头戏,它属于节点粒度的分配逻辑。整体思路并不难,前面也通过图解讲述过,但由于采用了太多位运算所以看起来有点头晕。所以我们先熟悉一下部分位运算公式,规定

  • id^=1: id 为奇数则 -1,id 为偶数则 +1。这里用来获取偶数的兄弟节点。比如 id=2,则其兄弟节点为 id^=1 = 3。
  • id<<=1: 相当于 id=id*2,目的是跳转到节点 id 的左子节点。比如 id = 2,它的左子节点值为 4。
  • 1<<d: 表示 1*2 。对在任意深度为d的节点,节点的索引值在 2 到 2-1 范围内。比如深度为 1,则索引值在 [2, 3],当深度为 2,索引值在 [4, 7] 范围内。
  • initial=-(1 << d): 对 2取反,目的是用来判断与目标深度值 d 是否匹配,可以把 initial 可以看成是掩码。当匹配目标深度,有 id & initial == initial,若当前深度<目标深度,有 id & initial ==0。比如目标深度为 2,那 initial=-4,当id=1时,id&initial=0,说明还没有到达目标深度,获取最左子节点(id=id*2)2,此时 2 & initial=0,说明还没有到达目标深度,继续获取最左子节点 4,此时 4&-4=4,此时就找到了目标深度。然后就可以从左到右找寻空闲节点并进行内存分配。

allocateNode(int depth) 目标是在深度 d 中找到空闲的节点并,如果存在按规则更新 memoryMap 相应节点的值并返回节点序号。思路也是比较清晰,从头结点开始判断,如果可分配但深度不匹配则获取左子节点,如果不可分配就返回 -1。继续判断左子节点是否可分配以及深度是否匹配,如果都不匹配,继续重复上面步骤。如果深度匹配但当前节点不可分配(val>d=true),那就获取兄弟节点继续重复上述步骤。如果深度匹配且当前节点可分配,则该节点就是此次申请的目标节点,并将它设置为 unusable 不可用状态,同时,按公式 memoryMap[父节点] = Min(子节点1,子节点2) 循环更新 memoryMap。
还有一个有意思的点需要注意,就是 memoryMap 存储的值,它是这棵树的核心。初始化的值以及后续更新也做得非常巧妙,我说不上来,大家慢慢休会吧。通过内存分配示意图再来休会一下上面的文字描述:
分配示意图_1.png
分配示意图_2.png
相关源码解析如下
PoolChunk_AllocateNode.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
java复制代码// io.netty.buffer.PoolChunk#allocateNode
/**
* 在深度「d」查找空闲节点
* @param d
* @return 空闲节点索引,如果返回-1表示当前「PoolChunk」无法满足此次内存分配
*/
private int allocateNode(int d) {
int id = 1;
// 用于判断节点是否匹配深度「d」
int initial = - (1 << d);

// #1 判断根节点是否可用
byte val = value(id);
if (val > d) {
// 根节点不可用,返回「-1」
return -1;
}

// #2 从上往下、从左往右遍历节点。直到找到合适的空闲节点并返回索引值。
// id&initial=0: 表明节点id 匹配 深度d,否则不匹配
// val<d: 表明节点id有空闲内存可分配,若val>d表明节点id无空闲内存可分配
// 循环退出条件是 val>d 或 id&initial !=0,当匹配到空闲节点时,通常 val<d 且 id&initial !=0
while (val < d || (id & initial) == 0) {

// id = id * 2;
id <<= 1;
// 获取momeryMap[id]的值
val = value(id);

// 如果val>d,表明节点「id」不满足此次分配
// 这个判断十分巧妙,前面也出现过一次,用来判断根节点是否满足深度为d的内存申请,
// 如果val>d则不满足,因为,父节点的val值是两个子节点的最小值,
// 因此如果父节点的val>d那说明子节点无法满足当前深度d的内存申请
if (val > d) {
// 获取兄弟节点, 继续查找
id ^= 1;
val = value(id);
}
}
byte value = value(id);
assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
value, id & initial, d);
// #3 将选中的节点标记为「unusable」
setValue(id, unusable);

// #4 循环更新父节点「memoryMap」的值,其值取两个子节点最小的那个值
updateParentsAlloc(id);

// #5 Return
return id;
}

private byte value(int id) {
return memoryMap[id];
}

/**
* 从节点id开始一直到根节点,更新对应的memoryMap的值
*/
private void updateParentsAlloc(int id) {
while (id > 1) {
int parentId = id >>> 1;
byte val1 = value(id);
byte val2 = value(id ^ 1);
byte val = val1 < val2 ? val1 : val2;
setValue(parentId, val);
id = parentId;
}
}

allocateNode(int depth) 是分配 Normal 级别的核心方法,本质是维护 memoryMap[] 数组,遍历树查找空闲内存满足本次内存申请。

allocateSubpage(int)

这个方法是申请 Tiny&Small 级别内存。上一章节讲过对该内存分配的思想: 简单一句话,将某个空闲的 page 拆分成若干个 subpage,使用对象 PoolSubpage 对这些若干个 subpage 进行管理。
PoolChunk#allocateSubpage.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
java复制代码// io.netty.buffer.PoolChunk#allocateSubpage
/**
* 申请「tiny&small」级别内存
*/
private long allocateSubpage(int normCapacity) {

// #1 先去「PoolSubpagePools[]」数组中是否能找到合适的「PoolSubpage」
// 如果存在,则使用现成的,否则得新建一个「PoolSubpage」对象后再放入「PoolSubpagePools[]」数组中
// 「PoolSubpage[]」每个节点都会初始化一个head节点,所以这里返回一定不为空
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);

// 子页直接可叶子节点层查找
int d = maxOrder;
// 对头结点上锁,通过数组减少锁的粒度,提高并发性能
synchronized (head) {
// #2 在深度d获取空闲子叶
int id = allocateNode(d);
if (id < 0) {
return id;
}

// 获取成功,对该「page」进行拆分改造
final PoolSubpage<T>[] subpages = this.subpages;
final int pageSize = this.pageSize;

freeBytes -= pageSize;

// #3 获取偏移量(相对于 maxSubpageAlloc)
// 可以理解为 subpageIdx = id - maxSubpageAllocs
int subpageIdx = subpageIdx(id);
// 定位到「PoolChunk」内部的「PoolSubpage[]」
PoolSubpage<T> subpage = subpages[subpageIdx];

if (subpage == null) {
// 如果不存在,则创建
subpage = new PoolSubpage<T>(head, // 这个头结点来自「PoolArena」
this, // 当前「PoolChunk」
id, // 节点「id」
runOffset(id), // 节点「id」字节偏移量
pageSize, // 页大小
normCapacity); // 每个「subpage」等份大小
// 记录新创建的「PoolSubpage」
subpages[subpageIdx] = subpage;
} else {
subpage.init(head, normCapacity);
}

// #4 使用「PoolSubpage」分配内存
return subpage.allocate();
}
}

/**
* 移除最高位,获得偏移量
*/
private int subpageIdx(int memoryMapIdx) {
return memoryMapIdx ^ maxSubpageAllocs; // remove highest set bit, to get offset
}

/**
* 获取节点「id」字节偏移量。
*/
private int runOffset(int id) {
// << 优先级高于 ^
// depth(id): 获取节点「id」对应的深度
// int index = 1 << depth(id) => 2^depth(id),获取节点「id」所在层的最左节点的索引值
// id ^ index => |id-index|,节点id相对所在层的最左节点的偏移量
int shift = id ^ 1 << depth(id);

// runLength(id): 获取节点id的单位值(byte)比如,对于序号为4的节点,它所对应的值为 4194304,即 4MB
// shift * runLength(id): 也就是前面已用的数
// size=runLength(id): 节点id所分配内存大小
// shift*size: 字节偏移量
return shift * runLength(id);
}

/**
* 获取节点「id」所分配的内存大小,单位:字节
* 等价于 chunkSize/(2^maxOrder)
*/
private int runLength(int id) {
// log2ChunkSize=log2chunkSize
return 1 << (log2ChunkSize - depth(id));
}

这里对 allocateSubpage(int) 源码做个小总结: 这个方法主要的目的是创建一个 PoolSubpage 对象,然后委托这个对象完成 tiny&small 级别内存分配。PoolChunk 在内部使用 PoolSubage[] 数组保存 PoolSubpage 对象引用。PoolSubage[] 长度和二叉树的叶子节点个数相同,它们一一对应。PoolSubpage 对象是 page 的化身,它拥有管理 pageSize 大小内存的能力。PoolChunk 只需管理 page,两者分工明确。

PoolSubpage 内存分配

PoolSubpage 内部相关变量之前已经解释过,在这里解释的。它管理内存的逻辑是将 pageSize 大小的内存块等分成若干个了块,子块个数是根据本次申请内存大小所决定,比如申请 1KB 内存,那么会找到一个空闲的 page 并将其拆分成 8 等份(8KB/1KB=8)。并使用位图记录每份子块的使用状态,1 表示已使用,0 表示未使用。最多可分为 512 等份,底层使用 long[] 数组存储位图信息。64 位的句柄值的高 32 位存储位信息,低 32 位存储储节点索引值。因此,核心的问题是如何使用 long[] 数组记录使用情况呢?
源码之下无秘密:
PoolSubpage#allocate.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
java复制代码// io.netty.buffer.PoolSubpage#allocate
/**
* 「PoolSubpage」内存分配入口
* @return 返回此次内存分配在bitmap的索引值
*/
long allocate() {
if (elemSize == 0) {
return toHandle(0);
}

// 无可用分片,返回-1
if (numAvail == 0 || !doNotDestroy) {
return -1;
}

// #1 获取下一个可用的分片索引(绝对值)
// 第一个索引值为0
final int bitmapIdx = getNextAvail();

// #2 除以64,确定bitmap[]哪一个
// 第一个bitmap索引值为0
int q = bitmapIdx >>> 6;

// #3 &63: 确认64位长度long的哪一位
// 除以64取余,获取当前绝对 id 的偏移量
// 63: 0011 1111
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) == 0;

// 更新第r位的值为1
// << 优先级高于 |=
bitmap[q] |= 1L << r;

// 更新可用数量
if (-- numAvail == 0) {
// 如果可用数量为0,表示子页中再无可分配的空间
// 需要从双向链表中移除
removeFromPool();
}

// 将bitmapIdx 转换为long存储,long 高32位存储的是小内存位置索引
return toHandle(bitmapIdx);
}

/**
* 获取下一个可用的「分片内存块」
*/
private int getNextAvail() {
int nextAvail = this.nextAvail;
// nextAvail>=0,表明可以直接使用
if (nextAvail >= 0) {
this.nextAvail = -1;
return nextAvail;
}
// nextAvaild<0,需要寻找下一个可用的「分片内存块」
return findNextAvail();
}

/**
* 获取下一个可用的「分片内存块」
* 本质是搜索 bitmap[] 数组为0的索引值
*/
private int findNextAvail() {
final long[] bitmap = this.bitmap;
final int bitmapLength = this.bitmapLength;
// 循环遍历
for (int i = 0; i < bitmapLength; i ++) {
long bits = bitmap[i];

// #1 先判断整个bits是否有「0」位
// 不可用时bits为「0XFFFFFFFFFFFFFFFF」,~bits=0
// 可用时~bits !=0
if (~bits != 0) {
// #2 找寻可用的位
return findNextAvail0(i, bits);
}
}
return -1;
}

/**
* 搜索下一个可用位
*
*/
private int findNextAvail0(int i, long bits) {
final int maxNumElems = this.maxNumElems;

// i << 6 => i * 2^6=i*64
// 想象把long[]展开,baseVal就是基址
final int baseVal = i << 6;

for (int j = 0; j < 64; j ++) {
// bits & 1: 判断最低位是否为0
if ((bits & 1) == 0) {
// 找到空闲子块,组装数据
// baseVal|j => baseVal + j,基址+位的偏移值
int val = baseVal | j;
// 不能越界
if (val < maxNumElems) {
return val;
} else {
break;
}
}

// 无符号右移1位
bits >>>= 1;
}
return -1;
}

// io.netty.buffer.PoolSubpage#toHandle
/**
* 将bitmap索引信息写入高32位,memoryMapIdx信息写入低32位
*
* 0x4000000000000000L: 最高位为1,其他所有位为0。
* 为什么使用0x4000000000000000L数值?
* 是因为对于第一次小内存分配情况,如果高32位为0,则返回句柄值的高位为 0,
* 低32位为 2048(第11层的第一个节点的索引值),但是这个返回值并不会当成子页来处理,从而影响后续的逻辑判断
* 详见 https://blog.csdn.net/wangwei19871103/article/details/104356566
* @param bitmapIdx bitmap索引值
* @return 句柄值
*/
private long toHandle(int bitmapIdx) {
return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
}

bitmap 填充示意图
bitmap填充图.png
小结上面的源码: PoolSubpage 使用 8 个 long 值存储子块的使用情况,句柄高 32 位存储位图索引值,低 32 位存储 节点索引值。通过大量的位运算提高了性能,通过源码阅读,也提升了位编程应用技巧。
这有一个关于 PoolSubpage 的是如何和 PoolArena 配合使用,因为我们知道,PoolArena 也存有 PoolSubpage[] 数组对象,这些数组对象是怎么被添加的呢?答案在在初始化 PoolSubpage 时就添加到对应的 PoolArena#poolsubpage[] 中了。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
java复制代码// io.netty.buffer.PoolSubpage
/**
* 「PoolSubpage」构造器
* @param head 从「PoolArena」对象中获取「PoolSubpage」结点做头结点
* @param chunk 当前「PoolSubpage」属性的「PoolChunk」对象
* @param memoryMapIdx 所属的「Page」的节点值
* @param runOffset 对存储容器为「byte[]」有用,表示偏移量
* @param pageSize 页大小,默认值为: 8KB
* @param elemSize 元素个数
*/
PoolSubpage(PoolSubpage<T> head,
PoolChunk<T> chunk,
int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
this.chunk = chunk;
this.memoryMapIdx = memoryMapIdx;
this.runOffset = runOffset;
this.pageSize = pageSize;
bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64

// 添加到「head」链表中
init(head, elemSize);
}

// 初始化「PoolSubpage」
void init(PoolSubpage<T> head, int elemSize) {
doNotDestroy = true;
this.elemSize = elemSize;
if (elemSize != 0) {
// 初始化各类参数
maxNumElems = numAvail = pageSize / elemSize;
nextAvail = 0;
// 根据元素个数确定所需要bitmap个数,即确认「bitmapLength」值
bitmapLength = maxNumElems >>> 6;
if ((maxNumElems & 63) != 0) {
bitmapLength ++;
}

for (int i = 0; i < bitmapLength; i ++) {
bitmap[i] = 0;
}
}
// 添加至双向链表中,供后续分配使用
addToPool(head);
}

private void addToPool(PoolSubpage<T> head) {
assert prev == null && next == null;
prev = head;
next = head.next;
next.prev = this;
head.next = this;
}

以上,就是对 PoolSubpage 内存分析的源码解析,理清思路,功能拆解之后并不困难。

PoolChunk 如何回收内存

讲完了物理内存分配,还没有讲 PoolChunk 是如何回收内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
java复制代码// io.netty.buffer.PoolChunk#free
/**
* 「PoolChunk」释放「handle」表示的内存块。
* 本质是修改对应节点的「memoryMap」值。
* 参数「nioByteBuf」如果不为空,则会放入「Deque」队列中缓存,减少GC
*/
void free(long handle, ByteBuffer nioBuffer) {

// #1 获取句柄「handle」低32位数值,该值表示节点id
int memoryMapIdx = memoryMapIdx(handle);

// #2 获取句柄「handle」高32位数值,该值表示bitmap索引值
int bitmapIdx = bitmapIdx(handle);

// #3 如果bitmqpIdx不为0,说明当前属于subpage释放
if (bitmapIdx != 0) { // free a subpage
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage != null && subpage.doNotDestroy;

// #4 别忘记,「PoolArena」对象中也存有「PoolSubpage」的引用哦
PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
synchronized (head) {
// 交给「PoolSubpage」专业人员释放吧
// 0x3FFFFFFF: 0011 1111 1111 1111 1111 1111 1111 1111,
// bitmapIdx & 0x3FFFFFFF: 保留低30位的值
// 为什么要抹去最高2位呢?因为生成handle是通过 |0x4000000000000000L 操作
if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {
return;
}
}
}

// #5 更新空闲内存信息
freeBytes += runLength(memoryMapIdx);

// #6 更新memoryMap信息
setValue(memoryMapIdx, depth(memoryMapIdx));

// #6 循环更新父节点的值
// 注意: 当更新父节点值时,有可能遇到两个兄弟节点的值都为初始值,
// 此时,父节点的值也为初始化而非两者之中最小值
updateParentsFree(memoryMapIdx);

// #7 缓存「ByteBuffer」对象
if (nioBuffer != null && cachedNioBuffers != null &&
cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
cachedNioBuffers.offer(nioBuffer);
}
}

// 无符号右移32位即可
private static int bitmapIdx(long handle) {
return (int) (handle >>> Integer.SIZE);
}

从源码可看出,PoolChunk 回收一块内存十分简单。回收 PoolSubpage 稍微麻烦一点,因为还需要和 PoolArena 中的 PoolSubpage 保持同步。

PoolSubpage 如何回收内存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
java复制代码// io.netty.buffer.PoolSubpage#free
/**
*「PoolSubpage」释放内存块
* 目标是修改相应「bitmap」的值
*
* @param head 来自「PooArena#PoolSubpage[]」数组的head节点
* @param bitmapIdx bitmap索引值
*/
boolean free(PoolSubpage<T> head, int bitmapIdx) {
if (elemSize == 0) {
return true;
}

// 确认bitmap[] 数组索引值
int q = bitmapIdx >>> 6;
// 确认在64位中的哪一位
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) != 0;

// 修改对应位为0
bitmap[q] ^= 1L << r;

// 设置可用位信息,待下次分配时直接使用
setNextAvail(bitmapIdx);

// 因为当numAvail=0时,表示无可用内存块,则会从「PoolArena#PoolSubpage[]」数组中移除
// 这次添加后就可以从新回到「PoolArena[]」队列中
if (numAvail ++ == 0) {
// 添加队列
addToPool(head);
return true;
}

if (numAvail != maxNumElems) {
// 还没有到达饱和,即完成这次分配后还有可用空闲,那直接返回
return true;
} else {
// 达到饱和,无内存块可用
if (prev == next) {
// 如果当前链表只有这么一个PoolSubpage对象,就不移除了
return true;
}

// 移除链表
doNotDestroy = false;
removeFromPool();
return false;
}
}

PoolSubpage 需要照顾到 PoolArena 的 PoolSubpage[] 变量,所以稍微代码量多一点。但逻辑十分清楚。看代码就十分明白,我就不强行总结了。

如何回收整个 PoolChunk

核心代码在 PoolArena,根据有无 Cleaner 释放内存 memory 对象即可。

1
2
3
4
5
6
7
8
9
java复制代码// io.netty.buffer.PoolArena.DirectArena#destroyChunk
@Override
protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
if (PlatformDependent.useDirectBufferNoCleaner()) {
PlatformDependent.freeDirectNoCleaner(chunk.memory);
} else {
PlatformDependent.freeDirectBuffer(chunk.memory);
}
}

小结

Netty 的内存回收是庞大的,两篇文章从 ByteBuf 体系结构讲到源码级的内存分配实现,似乎还是没有讲全讲透。这里只不过把最核心的代码拧出来给大家口味,最终还是希望看到这些文章的各位 DEBUG 调试走一遍。我深知自己的知识能力水平有限,文章部分地方的表达能力欠缺,有些简单的地方描述过于复杂,而恰恰需要讲清楚的地方一笔带过,敬请读者斧正。

我的公众号

搜索 小道一下

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%