基于非阻塞Java的数据库中间件--实践,问题和解决 II

背景介绍

饿了么数据访问层(DAL)是基于 IO.Netty 实现的高并发、高吞吐量的 Java 服务。为了追求高性能在客户端连接和 DB 连接都使用了异步 IO 处理 SQL 请求和结果集。

关于DAL的线程模型以及他们介绍这里就不再赘述了,有兴趣的同学可以阅读下上一篇文章基于非阻塞Java的数据库中间件–实践,问题和解决

本文主要介绍DAL在开发过程中关于Netty OOM的坑。

不期而遇的堆外OOM

介于堆外内存管理复杂性,我们的策略是避开直接使用堆外内存,由Netty框架自己处理堆外内存的申请与释放。

Netty里有四种主力的ByteBuf:UnpooledHeapByteBuf,UnpooledDirectByteBuf,PooledHeapByteBuf,PooledDirectByteBuf。顾名思义,Unpooled的对象就不需要太关心了, 能够依赖JVM GC自然回收。而PooledHeapByteBuf 和 PooledDirectByteBuf,则必须要依赖引用技术器和回收过程。使用池化对象的时候,必须自己管理计数技术,是不是有一种熟悉的C的调调。

我们在Netty eventloop线程拿到消息后直接保存到堆内内存中,在我们的自己的worker线程处理业务逻辑,最后通过Netty Unpooled工具封装消息交给nettty eventloop线程继续工作。

1
复制代码sqlSessionContext.clientWriteAndFlush(Unpooled.wrappedBuffer(authen.toPacket())

考虑到Netty缓存池的频频爆出的内存泄漏问题,我们也小心翼翼的避免使用池化缓存池,看到源代码里默认是关闭池化内存的,也是让我们松了一口气。

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码//注 4.1以后官方对池化内存信心大增,默认为打开
String allocType = SystemPropertyUtil.get("io.netty.allocator.type", "unpooled").toLowerCase(Locale.US).trim();
ByteBufAllocator alloc;
if ("unpooled".equals(allocType)) {
alloc = UnpooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else if ("pooled".equals(allocType)) {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else {
alloc = UnpooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: unpooled (unknown: {})", allocType);
}

另外对于异常我们做了足够的考虑,无论读写出现任何异常,直接关闭对应的netty channel。

然而堆外内存OOM还是不期而至。一台服务器跑了21天后,吃光了所有内存与swap,生生的把自己变成了流量黑洞。

躲不开的堆外内存池

Netty的写入过程可以划分为两个基本步骤write与flush.

write的过程是将“数据请求”添加到ChannelOutboundBuffer,这个buffer是和每个socket具体绑定的。“数据请求”采用链的方式一一相接,在添加时候并无容量控制。
flush是将ChannelOutboundBuffer中的一批数据请求拿出来消费,即拷入socket的sendbuffer。能写入多少数据,则移除多少“数据请求”, 如果没有写完,并不会移除ChannelOutboundBuffer中的数据。

在我们调用netty write方法的时候有一个步骤是校验写数据是否为堆外内存形式。


虽然是使用池化内存的选项是“unpooled”,Netty还是非常”贴心”地帮我们默认使用了专门用于写数据的简易的堆外内存池。这个池子的使用规则是如果拿到的对象大小不够存放需要写的内容,则扩充这个bytebuf对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码static final class ThreadLocalUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf {

private static final Recycler<ThreadLocalUnsafeDirectByteBuf> RECYCLER =
new Recycler<ThreadLocalUnsafeDirectByteBuf>() {
@Override
protected ThreadLocalUnsafeDirectByteBuf newObject(Handle<ThreadLocalUnsafeDirectByteBuf> handle) {
return new ThreadLocalUnsafeDirectByteBuf(handle);
}
};

static ThreadLocalUnsafeDirectByteBuf newInstance() {
ThreadLocalUnsafeDirectByteBuf buf = RECYCLER.get();
buf.setRefCnt(1);
return buf;
}

对象总数大小

1
2
3
4
5
复制代码int maxCapacity = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity.default", 0);
if (maxCapacity <= 0) {
// TODO: Some arbitrary large number - should adjust as we get more production experience.
maxCapacity = 262144;
}

单个对象大小

1
2
复制代码THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024);
logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE);

单个netty线程最大默认为262144 * 64 * 1024 = 16G 。

最终我们还是没有绕开netty的堆外内存缓存的坑。由于DAL是多租户的系统,线上复杂的sql使用情况,使得最终内存池在数量稳定后,逐渐趋向与最大对象对齐。


Netty 堆外内存的正确使用姿势


知道问题的原因,处理起来也就顺理成章了。我们需要选择一个合适缓存bytebuf大小,通过统计线上mysql消息的大小,我们选择缓存对象最大值为2K。超过2k,在RECYCLER回收机制下,bytebuf使用结束了直接会被释放。

1
复制代码-Dio.netty.threadLocalDirectBufferSize=2048

在性能测试中我们也尝试过完全关闭池子(设置-Dio.netty.threadLocalDirectBufferSize=0),然而Java堆外内存申请确实消耗非常大,立马成为了我们的瓶颈,线程容易卡在堆外内存的申请与释放上。


其次,我们需要限制堆外内存对象数量的总数。主要面对的问题是mysql大结果集问题,我们在这里通过设置AutoRead做流控。由于AutoRead在epoll的边缘触发模式下失效,在建立连接的需要确认使用水平触发模式。

1
2
3
4
5
复制代码Bootstrap b = new Bootstrap();
b.group(AthenaEventLoopGroupCenter.getSeverWorkerGroup());
b.channel(AthenaEventLoopGroupCenter.getChannelClass());
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);

每次发送64K内容后我们先关闭autoread开关,等到客户端接受所有消息后再打开autoread开关。

1
2
3
4
5
复制代码/**
* Sets if {@link ChannelHandlerContext#read()} will be invoked automatically so that a user application doesn't
* need to call it at all. The default value is {@code true}.
*/
ChannelConfig setAutoRead(boolean autoRead);

另外需要谈到的是,Netty Channel的write 以及flush方法其实是反直觉的,它们并不会帮你把多次write的内容合并一个堆外内存对象,而是每次write就会生成一个堆外内存对象。针对mysql协议可能一次反回大量行数而整体数据量很小的情况(如查询某个表所有id列),就会催生大量的对象。

因此我们自己做了一次聚合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码public class WriteBuf {
private final List<ByteBuf> bufs = new LinkedList<ByteBuf>();
private int bufSize;

public int write(ByteBuf buf) {
bufs.add(buf);
bufSize += buf.readableBytes();
return bufSize;
}

public ByteBuf readall() {
try {
return new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, bufs.size(), bufs);
} finally {
bufSize=0;
bufs.clear();
}
}
}

后记

我们一路走来,确实遇到了不少的坑,但不得不说Netty还是非常给力的。目前我们线上使用Zing JVM来解决GC停顿问题,配合Netty 的NIO模型,单机tps一般跑在7w,平均延迟在0.3ms内。

参考文档

Netty系列之Netty高性能之道

netty/netty

Netty: Home

作者介绍

林静 ,2011毕业于浙江大学,2015年加入饿了么,现任饿了么框架工具部架构师。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%