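[Pitfall Diary] Handling an OOM caused by Netty
Background
The application is a business data push client. It continuously reads data from a database, formats it, and pushes it to the data warehouse front-end service. That front-end is a Java project running on a small ten-year-old machine (Pentium 2030, from 2013), and it also has to validate every record with regular expressions. The write volume is roughly 300 records per second, and the TCP channel is built with a netty-client and a netty-server.
A single record averages about 65 characters and occupies 66 bytes.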
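As a quick sanity check on these figures, the raw payload rate can be worked out directly. The sketch below uses only the two numbers from the text (300 records per second, 66 bytes each); the class and method names are made up for illustration. It counts payload bytes only, so the heap actually held per queued message (buffer objects, queue entries, promises) will be larger.

```java
final class PayloadRate {
    // Figures taken from the text: about 300 records per second, 66 bytes each.
    static final int MESSAGES_PER_SECOND = 300;
    static final int BYTES_PER_MESSAGE = 66;

    /** Raw payload bytes produced per second. */
    static long bytesPerSecond() {
        return (long) MESSAGES_PER_SECOND * BYTES_PER_MESSAGE;
    }

    /** Raw payload bytes produced per day (86,400 seconds). */
    static long bytesPerDay() {
        return bytesPerSecond() * 86_400L;
    }

    public static void main(String[] args) {
        System.out.println(bytesPerSecond() + " bytes/s");
        System.out.println(bytesPerDay() + " bytes/day");
    }
}
```

The payload rate is under 20 KB per second, so a backlog measured in gigabytes points to data accumulating over a long period rather than to a sudden burst.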
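Discovering the problem
Host monitoring raised an alert: the application's memory usage stayed at the danger line. The application log showed frequent OOM errors (unable to allocate memory) when writing data to the TCP channel. Because the application had not been configured to produce a heap dump on out-of-memory, we changed the startup parameters and restarted it.
A reminder: always configure your application to write a heap dump, so that problems can be found and located quickly.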
- The startup parameters now set HeapDumpOnOutOfMemoryError and HeapDumpPath (the path where the dump file is stored):
java -jar -Xmx6G -Xms6G -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/serverPath/applicationName.hprof -XX:+UseG1GC app.jar > app.log &
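To confirm from inside the running process that the flags actually took effect, the values can be read back through the JDK's HotSpotDiagnosticMXBean. This is a minimal sketch that assumes a HotSpot-based JVM; the class name HeapDumpFlagCheck is made up for illustration.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.lang.management.ManagementFactory;

final class HeapDumpFlagCheck {
    /** Returns the current value of a HotSpot VM option as a string. */
    static String vmOption(String name) {
        HotSpotDiagnosticMXBean bean =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        return bean.getVMOption(name).getValue();
    }

    public static void main(String[] args) {
        // Prints "true" only when -XX:+HeapDumpOnOutOfMemoryError is in effect.
        System.out.println("HeapDumpOnOutOfMemoryError = "
                + vmOption("HeapDumpOnOutOfMemoryError"));
        // Prints the configured dump path, or an empty string when none is set.
        System.out.println("HeapDumpPath = " + vmOption("HeapDumpPath"));
    }
}
```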
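Checking memory with jstat -gc [pid] showed OU (old generation used) growing until it was almost equal to OC (old generation capacity). The full GC count (FGC) and full GC time (FGCT) kept rising, but the memory occupied in OU never dropped.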
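The same used-versus-capacity comparison can also be made from inside the JVM with the standard java.lang.management API. The sketch below lists each heap memory pool with its used and committed bytes; pool names depend on the garbage collector in use, and the class name HeapPoolReport is made up for illustration.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;

final class HeapPoolReport {
    /** Builds one line per heap memory pool: name, used bytes, committed bytes. */
    static String report() {
        StringBuilder sb = new StringBuilder();
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if (pool.getType() != MemoryType.HEAP) {
                continue; // skip metaspace, code cache and other non-heap pools
            }
            MemoryUsage usage = pool.getUsage();
            if (usage == null) {
                continue; // the pool is no longer valid
            }
            sb.append(String.format("%-28s used=%d committed=%d%n",
                    pool.getName(), usage.getUsed(), usage.getCommitted()));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(report());
    }
}
```

An old generation pool whose used value stays close to its committed value across several full GCs matches the symptom described above.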
Handling process
Analyzing the memory dump
Open the dump file in JProfiler and look at heap usage.
ChannelOutboundBuffer can be seen occupying 4828 MB of memory.
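Source of the problem
Let's look at how Netty writes a TCP message.
After the nettyClient connects to the receiver's nettyServer, a channel object is created, and messages are written by calling the channel's writeAndFlush method. When we bound the EventLoopGroup to the bootstrap, the channel type we configured was NioSocketChannel, so this case study follows that channel's writeAndFlush. NioSocketChannel does not implement the method itself. The implementation comes from AbstractChannel, which is the parent of AbstractNioChannel, which is the parent of AbstractNioByteChannel, which in turn is the parent of NioSocketChannel: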
@Override
public ChannelFuture writeAndFlush(Object msg) {
    return pipeline.writeAndFlush(msg);
}

@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    return pipeline.writeAndFlush(msg, promise);
}
The method that actually performs the write is:
private void write(Object msg, boolean flush, ChannelPromise promise) {
    ObjectUtil.checkNotNull(msg, "msg");
    try {
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }

    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
            // and put it back in the Recycler for re-use later.
            //
            // See https://github.com/netty/netty/issues/8343.
            task.cancel();
        }
    }
}
We called writeAndFlush, so the method actually executed is:
void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

private void invokeFlush0() {
    try {
        ((ChannelOutboundHandler) handler()).flush(this);
    } catch (Throwable t) {
        invokeExceptionCaught(t);
    }
}
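These methods pass the message through the ChannelOutboundHandler chain: write puts the message into the outboundBuffer, and flush then marks it as ready to be sent. Concretely, the calls end up in AbstractUnsafe's write and flush, which in turn invoke ChannelOutboundBuffer's addMessage and addFlush: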
/**
 * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
 * the message was written.
 */
public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
    }
    tailEntry = entry;
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    // Touch the message to make it easier to debug buffer leaks.
    // this save both checking against the ReferenceCounted interface
    // and makes better use of virtual calls vs interface ones
    if (msg instanceof AbstractReferenceCountedByteBuf) {
        ((AbstractReferenceCountedByteBuf) msg).touch();
    } else {
        ReferenceCountUtil.touch(msg);
    }

    // Increment the pending byte count after adding the message to the unflushed list
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

/**
 * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
 * and so you will be able to handle them.
 */
public void addFlush() {
    // There is no need to process all entries if there was already a flush before and no new messages
    // where added in the meantime.
    //
    // See https://github.com/netty/netty/issues/2577
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                // Was cancelled so make sure we free up memory and notify about the freed bytes
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);

        // All flushed so reset unflushedEntry
        unflushedEntry = null;
    }
}
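The pointer handling in addMessage and addFlush is easier to follow on a stripped-down model. The sketch below is not Netty code: MiniOutboundBuffer is a made-up class that keeps only the linked list, the three pointers, and a pending byte counter. It leaves out promises, cancellation, and reference counting, uses the string length as the message size, and has a remove() that stands in for "the head entry was written to the socket".

```java
final class MiniOutboundBuffer {
    private static final class Entry {
        final int size;
        Entry next;
        Entry(int size) { this.size = size; }
    }

    private Entry flushedEntry;   // first entry that may be written to the socket
    private Entry unflushedEntry; // first entry added but not yet flushed
    private Entry tailEntry;      // last entry in the list
    private int flushed;          // flushed entries that are not yet removed
    private long pendingBytes;    // bytes queued but not yet written

    /** Appends a message; it stays invisible to the writer until addFlush(). */
    void addMessage(String msg) {
        Entry entry = new Entry(msg.length()); // assumes one byte per character
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            tailEntry.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
        pendingBytes += entry.size;
    }

    /** Marks every unflushed entry as flushed, i.e. eligible for writing. */
    void addFlush() {
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                flushedEntry = entry;
            }
            do {
                flushed++;
                entry = entry.next;
            } while (entry != null);
            unflushedEntry = null;
        }
    }

    /** Drops the head flushed entry as if it had been written; false if none. */
    boolean remove() {
        Entry entry = flushedEntry;
        if (entry == null) {
            return false;
        }
        pendingBytes -= entry.size;
        if (--flushed == 0) {
            flushedEntry = null;
            if (entry == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            flushedEntry = entry.next;
        }
        return true;
    }

    int flushedCount() { return flushed; }
    long pendingBytes() { return pendingBytes; }
}
```

The point to take away is that pendingBytes only goes down in remove(). If entries are added faster than they are removed, the list and everything it references keep growing.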
Once the entries have been marked as flushed, flush0() is triggered:
protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }

    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        try {
            // Check if we need to generate the exception at all.
            if (!outboundBuffer.isEmpty()) {
                if (isOpen()) {
                    outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                } else {
                    // Do not trigger channelWritabilityChanged because the channel is closed already.
                    outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
                }
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        handleWriteError(t);
    } finally {
        inFlush0 = false;
    }
}
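This in turn calls doWrite(). The number of write attempts it makes is bounded by the write spin count declared in ChannelConfig: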
/**
 * Returns the maximum loop count for a write operation until
 * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
 * It is similar to what a spin lock is used for in concurrency programming.
 * It improves memory utilization and write throughput depending on
 * the platform that JVM runs on. The default value is {@code 16}.
 */
int getWriteSpinCount();

@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SocketChannel ch = javaChannel();
    int writeSpinCount = config().getWriteSpinCount();
    do {
        if (in.isEmpty()) {
            // All written so clear OP_WRITE
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }

        // Ensure the pending writes are made of ByteBufs only.
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        int nioBufferCnt = in.nioBufferCount();

        // Always use nioBuffers() to workaround data-corruption.
        // See https://github.com/netty/netty/issues/2761
        switch (nioBufferCnt) {
            case 0:
                // We have something else beside ByteBuffers to write so fallback to normal writes.
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                // Only one ByteBuf so use non-gathering write
                // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                // to check if the total size of all the buffers is non-zero.
                ByteBuffer buffer = nioBuffers[0];
                int attemptedBytes = buffer.remaining();
                final int localWrittenBytes = ch.write(buffer);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                // Recompute the size allowed for the next write
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
            default: {
                // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                // to check if the total size of all the buffers is non-zero.
                // We limit the max amount to int above so cast is safe
                long attemptedBytes = in.nioBufferSize();
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                        maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
        }
    } while (writeSpinCount > 0);

    incompleteWrite(writeSpinCount < 0);
}
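The loop attempts the write at most 16 times (the default configuration). At this point the picture is clear: writeAndFlush keeps adding data to the buffer, and when the buffer drains to the network more slowly than data is written into it, entries pile up and eventually cause the OOM.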
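The pile-up can be put into numbers with a very small model. In the sketch below the producer rate of 19,800 bytes per second follows from the figures in the background section (300 records of 66 bytes), while the drain rate of 15,000 bytes per second is purely an assumed value for illustration; the text does not say how fast the receiver actually consumed data. The class name is made up as well.

```java
final class BacklogGrowth {
    /**
     * Bytes still queued after the given number of seconds when the producer
     * is never throttled. A drain rate at or above the produce rate leaves
     * no backlog.
     */
    static long backlogBytes(long producedPerSecond, long drainedPerSecond, long seconds) {
        long growthPerSecond = Math.max(0L, producedPerSecond - drainedPerSecond);
        return growthPerSecond * seconds;
    }

    public static void main(String[] args) {
        long producer = 300L * 66L; // from the text: 300 records/s, 66 bytes each
        long drain = 15_000L;       // assumed receiver throughput, illustration only
        System.out.println("after 1 hour: " + backlogBytes(producer, drain, 3_600) + " bytes");
        System.out.println("after 1 day:  " + backlogBytes(producer, drain, 86_400) + " bytes");
    }
}
```

Only payload bytes are counted here. In the real buffer every queued message also carries object overhead, so heap usage grows faster than this estimate.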
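Solution
While reading the Netty source we noticed that ChannelOutboundBuffer keeps a running count of pending bytes with two methods: incrementPendingOutboundBytes, called from addMessage when data is queued, and decrementPendingOutboundBytes, called from addFlush for cancelled entries and again when an entry is removed after it has been written: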
/**
 * Increment the pending bytes which will be written at some point.
 * This method is thread-safe!
 */
void incrementPendingOutboundBytes(long size) {
    incrementPendingOutboundBytes(size, true);
}

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }

    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}

/**
 * Decrement the pending bytes which will be written at some point.
 * This method is thread-safe!
 */
void decrementPendingOutboundBytes(long size) {
    decrementPendingOutboundBytes(size, true, true);
}

private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
    if (size == 0) {
        return;
    }

    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
        setWritable(invokeLater);
    }
}

private void setWritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue & ~1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue != 0 && newValue == 0) {
                fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}

private void setUnwritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue | 1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue == 0) {
                fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}
Both the increment and the decrement compare the number of pending bytes in the buffer against the water marks:
public final class WriteBufferWaterMark {
    private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
    private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;
    public static final WriteBufferWaterMark DEFAULT =
            new WriteBufferWaterMark(DEFAULT_LOW_WATER_MARK, DEFAULT_HIGH_WATER_MARK, false);
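The two water marks form a hysteresis band: the channel turns unwritable once pending bytes exceed the high mark and only turns writable again after they fall below the low mark. The sketch below models just that rule with the default values of 32 KiB and 64 KiB. WatermarkSketch is a made-up class, not Netty's implementation; it is single-threaded and ignores the per-entry overhead that Netty adds to each message's pending size.

```java
final class WatermarkSketch {
    static final int LOW_WATER_MARK = 32 * 1024;  // default low mark from the excerpt above
    static final int HIGH_WATER_MARK = 64 * 1024; // default high mark from the excerpt above

    private long pending;
    private boolean writable = true;

    /** Queues bytes; flips to unwritable once pending exceeds the high mark. */
    void increment(long size) {
        pending += size;
        if (pending > HIGH_WATER_MARK) {
            writable = false;
        }
    }

    /** Releases bytes; flips back to writable once pending drops below the low mark. */
    void decrement(long size) {
        pending -= size;
        if (pending < LOW_WATER_MARK) {
            writable = true;
        }
    }

    boolean isWritable() { return writable; }

    /** Number of equally sized messages queued before the state turns unwritable. */
    static int messagesUntilUnwritable(int messageSize) {
        if (messageSize <= 0) {
            throw new IllegalArgumentException("messageSize must be positive");
        }
        WatermarkSketch w = new WatermarkSketch();
        int count = 0;
        while (w.isWritable()) {
            w.increment(messageSize);
            count++;
        }
        return count;
    }

    /** Number of messages that must be written out before the state recovers. */
    static int removalsUntilWritable(int messageSize) {
        WatermarkSketch w = new WatermarkSketch();
        int queued = messagesUntilUnwritable(messageSize);
        for (int i = 0; i < queued; i++) {
            w.increment(messageSize);
        }
        int removed = 0;
        while (!w.isWritable()) {
            w.decrement(messageSize);
            removed++;
        }
        return removed;
    }

    public static void main(String[] args) {
        System.out.println(messagesUntilUnwritable(66)); // prints 993
        System.out.println(removalsUntilWritable(66));   // prints 497
    }
}
```

With the 66-byte records from the background section, the model turns unwritable after 993 queued records and recovers after 497 of them have been written out.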
The fix is to configure suitable water mark values when creating the channel, and to check the return value of channel.isWritable() before writing:
/**
 * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
 * not exceed the write watermark of the {@link Channel} and
 * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
 * {@code false}.
 */
public boolean isWritable() {
    return unwritable == 0;
}
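The effect of checking writability before each write can be seen in a small simulation. The sketch below is not Netty code: GatedProducer is a made-up class that advances in one-second ticks, produces 19,800 bytes per tick (the rate from the background section), and drains an assumed 15,000 bytes per tick. It compares the peak backlog with and without the writability check. What the sender does while the channel is unwritable (pause reading from the database, buffer to disk, drop) is left open here, as it is in the text.

```java
final class GatedProducer {
    static final int LOW_WATER_MARK = 32 * 1024;
    static final int HIGH_WATER_MARK = 64 * 1024;

    /**
     * Simulates a number of ticks and returns the largest backlog seen.
     * With checkWritable set, the producer skips a tick while unwritable.
     */
    static long peakPending(int ticks, int produceBytes, int drainBytes, boolean checkWritable) {
        long pending = 0;
        long peak = 0;
        boolean writable = true;
        for (int tick = 0; tick < ticks; tick++) {
            if (!checkWritable || writable) {
                pending += produceBytes;
                if (pending > HIGH_WATER_MARK) {
                    writable = false;
                }
            }
            peak = Math.max(peak, pending);
            pending -= Math.min(pending, drainBytes); // the receiver consumes what it can
            if (pending < LOW_WATER_MARK) {
                writable = true;
            }
        }
        return peak;
    }

    public static void main(String[] args) {
        int produce = 300 * 66; // from the text
        int drain = 15_000;     // assumed receiver throughput, illustration only
        System.out.println("ungated peak: " + peakPending(1000, produce, drain, false));
        System.out.println("gated peak:   " + peakPending(1000, produce, drain, true));
    }
}
```

Without the check the backlog grows with every tick. With it, the backlog never exceeds the high water mark by more than one batch, no matter how long the simulation runs. In a real Netty client the water marks are set through ChannelOption.WRITE_BUFFER_WATER_MARK, and a handler can react to channelWritabilityChanged instead of polling.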
Reposted from: https://juejin.cn/post/7381413495092363303