LEAK: ByteBuf.release() was not called - how do we solve this?

Question

<details>
<summary>English:</summary>

We have a Netty-based, network-traffic-intensive Java app/server.

*Side note: I mainly support this app; I didn't build it, so I don't know it in full.*

We sometimes get the error shown below. Previously we got it only after the server had been up for 3-4 days. Now we are getting it as soon as 10-15 minutes after restarting the server/app.

I don't understand how this is possible. Is this error something to worry about, and how can we fix it? I did extensive research on this same error in the past, and back then I even tried upgrading and patching Netty, but nothing fully resolved the issue.

OS: Linux     
Java version: 1.8      
Netty version: netty-all-4.1.30.Final.jar      

This is the only line of app-specific code in the stack trace; everything else happens in Netty.

`com.company.japp.protocol.http.decoders.ConditionalHttpChunkAggregator.channelRead`

Is this a bug of some sort in Netty itself? Would a Netty upgrade or any other configuration tuning help here?

	[2020-09-04 08:33:53,072]
	
	ERROR
	
	io.netty.util.ResourceLeakDetector
	
	LEAK: ByteBuf.release() was not called before it's garbage-collected.
	
	See https://netty.io/wiki/reference-counted-objects.html for more information.
	
	Recent access records: 
	Created at:
		io.netty.buffer.AbstractByteBufAllocator.compositeDirectBuffer(AbstractByteBufAllocator.java:221)
		io.netty.buffer.AbstractByteBufAllocator.compositeBuffer(AbstractByteBufAllocator.java:199)
		io.netty.handler.codec.MessageAggregator.decode(MessageAggregator.java:255)
		io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
		com.company.japp.protocol.http.decoders.ConditionalHttpChunkAggregator.channelRead(ConditionalHttpChunkAggregator.java:112)
		io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
		io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
		io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
		io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
		io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
		io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
		io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
		io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
		io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
		io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
		io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
		io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
		io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
		io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
		io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
		io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
		io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
		io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
		io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
		io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
		io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
		io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
		io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
		java.lang.Thread.run(Thread.java:748)
	
	
	
	
Here is the code for `ConditionalHttpChunkAggregator`:


	package com.company.japp.protocol.http.decoders;
	
	import com.company.japp.IHttpProxyServer;
	import io.netty.channel.ChannelDuplexHandler;
	import io.netty.channel.ChannelHandler;
	import io.netty.channel.ChannelHandlerContext;
	import io.netty.handler.codec.http.HttpHeaders;
	import io.netty.handler.codec.http.HttpMessage;
	import io.netty.handler.codec.http.HttpObjectAggregator;
	import io.netty.handler.codec.http.HttpResponse;
	import io.netty.util.internal.logging.InternalLogger;
	import io.netty.util.internal.logging.InternalLoggerFactory;
	
	import java.util.HashSet;
	
	@ChannelHandler.Sharable
	public class ConditionalHttpChunkAggregator extends HttpObjectAggregator {
	    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ConditionalHttpChunkAggregator.class);
	
	    private volatile boolean        sendaschunked;
	    private volatile int        maxContentLength;
	
	    private static IHttpProxyServer         iHttpProxyServer;
	
	    public static void initialize(IHttpProxyServer iHttpProxyServer) {
	        ConditionalHttpChunkAggregator.iHttpProxyServer = iHttpProxyServer;
	    }
	
	    public ConditionalHttpChunkAggregator(int maxContentLength) {
	        super(maxContentLength);
	        this.maxContentLength = maxContentLength;
	        sendaschunked = false;
	    }
	
	    @Override
	    public void channelRead(ChannelHandlerContext ctx, Object msg){
	        if ((msg instanceof HttpResponse)) {
	            HttpResponse response = (HttpResponse)msg;
	            if ((msg instanceof HttpMessage)) {
	                HttpMessage httpmessage= (HttpMessage)msg;
	
	                try  {
	                    // If the content length exceeds the threshhold, then send it as chunked
	                    // It's too large to process substitutions
	                    Long contentlength = 
	                    		httpmessage.headers().get(HttpHeaders.Names.CONTENT_LENGTH) != null ? 
	                    		Long.valueOf(httpmessage.headers().get(HttpHeaders.Names.CONTENT_LENGTH)) : -1;
	                    if (contentlength >= maxContentLength) {
	                        sendaschunked = true;
	                    } else {
	                    // Check content types
	                        HashSet<String> chunkabletypes = iHttpProxyServer.getConfig().getProperty("chunkabletypes");
	                        if (!chunkabletypes.isEmpty() && response.headers().contains(HttpHeaders.Names.CONTENT_TYPE)) {
	                            String contentType = response.headers().get(HttpHeaders.Names.CONTENT_TYPE).toLowerCase().trim();
	                            if (contentType.length()>0) {
	                                sendaschunked = chunkabletypes.contains(contentType);
	                                if (!sendaschunked) {
	                                    for (String chunkabletype: chunkabletypes) {
	                                        // Begins with
	                                        if (contentType.indexOf(chunkabletype)==0) {
	                                            sendaschunked = true;
	                                            break;
	                                        }
	                                    }
	                                }
	                            }
	                        }
	                    }
	                    if (sendaschunked) {
	                        ctx.fireChannelRead(msg);
	                        return;
	                    }
	                }
	                catch(Exception ex) {
	                    logger.error("error determining chunkable viability", ex);
	                }
	            }
	        }
	        if (sendaschunked) {
	            ctx.fireChannelRead(msg);
	            return;
	        }
	
	        try {
	            super.channelRead(ctx, msg);
	            
	        } catch (Exception e) {
	            logger.error("error determining chunkable viability", e);
	            e.printStackTrace();
	        }
	    }
	}


And this is the value for the chunkable types property:  

`video/x-ms-wvx,video/x-flv,application/x-shockwave-flash,video/quicktime,video/,audio/`


Edit
---

I think this is a bug in Netty 4.1.30 around line 255 of `io.netty.handler.codec.MessageAggregator`. It seems this `CompositeByteBuf` is allocated but not released. Am I right? I am hoping for an authoritative answer that either confirms or rejects this idea.

            // A streamed message - initialize the cumulative buffer, and wait for incoming chunks.
            CompositeByteBuf content = ctx.alloc().compositeBuffer(maxCumulationBufferComponents); // LINE 255 
            if (m instanceof ByteBufHolder) {
                appendPartialContent(content, ((ByteBufHolder) m).content());
            }
            currentMessage = beginAggregation(m, content);




</details>


# Answer 1
**Score**: 4

In our case the problem was that the response body was never read, so the byte buffer was never released.

Original code:
```java
WebClient client = WebClient.create("some_url");
return client.get().exchange().flatMap(response -> Mono.just(builder.up().build()));
```

Fixed code:
```java
WebClient client = WebClient.create("some_url");
return client.get().retrieve().toBodilessEntity().flatMap(response -> Mono.just(builder.up().build()));
```

Even the Javadoc for `exchange()` says so:
> Deprecated since 5.3 due to the possibility to leak memory and/or connections; please, use exchangeToMono(Function), exchangeToFlux(Function); consider also using retrieve() which provides access to the response status and headers via ResponseEntity along with error status handling.

Kudos: https://stackoverflow.com/a/51321602/3242721

To reproduce the issue on localhost I used the VM argument `-Dio.netty.leakDetectionLevel=paranoid`.
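
If passing a VM argument is inconvenient (for example in a test harness), the same property can probably be set from code. The following is a minimal sketch, assuming it runs before any Netty class is loaded, since Netty reads the property when its leak detector class is initialized. The class name `LeakDetectionConfig` and the method `enableParanoid` are hypothetical; only the property name and the `paranoid` value come from the answer above.

```java
// Hypothetical bootstrap helper; not part of Netty or of the original application.
final class LeakDetectionConfig {
    // Property name taken from the VM argument used in the answer.
    static final String PROPERTY = "io.netty.leakDetectionLevel";

    private LeakDetectionConfig() {
    }

    // Assumption: this is called at the very start of main(), before Netty is touched.
    // Setting it later has no effect on an already initialized leak detector.
    static void enableParanoid() {
        System.setProperty(PROPERTY, "paranoid");
    }
}
```

Calling `LeakDetectionConfig.enableParanoid()` as the first statement of `main` should make leak reports appear much sooner than with the default level, at a noticeable performance cost, so it is best kept out of production.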

# Answer 2
**Score**: 1

You need to release the `ByteBuf` that you allocated. It's not a Netty bug.

The `ByteBuf` is at `com.company.japp.protocol.http.decoders.ConditionalHttpChunkAggregator.channelRead(ConditionalHttpChunkAggregator.java:112)`
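
The release rule behind this answer can be illustrated without Netty on the classpath. The following is a minimal sketch using a hypothetical stand-in, `CountedBuffer`, in place of Netty's `ByteBuf`, and a hypothetical `ConsumingHandler` in place of a real channel handler. It only shows the pattern: a handler that consumes a message and does not pass it on releases it in a `finally` block, so the release also happens when processing fails. In real Netty code the corresponding calls would be `ByteBuf.release()` or `ReferenceCountUtil.release(msg)`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a reference-counted buffer; NOT a Netty class.
final class CountedBuffer {
    // A freshly allocated buffer starts with a reference count of 1.
    private final AtomicInteger refCnt = new AtomicInteger(1);

    int refCnt() {
        return refCnt.get();
    }

    // Decrements the count; returns true when it reaches 0 (memory would be freed).
    boolean release() {
        int remaining = refCnt.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("released more often than retained");
        }
        return remaining == 0;
    }
}

// Hypothetical terminal handler: it consumes the buffer and does not forward it,
// so it is the one responsible for releasing it.
final class ConsumingHandler {
    int handled;

    void channelRead(CountedBuffer msg, boolean failProcessing) {
        try {
            if (failProcessing) {
                throw new IllegalArgumentException("simulated processing error");
            }
            handled++;
        } catch (IllegalArgumentException ex) {
            // Log and swallow, similar to what the handler in the question does.
        } finally {
            msg.release(); // runs on success and on failure
        }
    }
}
```

A handler that forwards the message to the next handler would not release it here, because responsibility moves on with the message.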

Posted by huangapple on 2020-09-04 20:50:10. Source link: https://go.coder-hub.com/63741540.html