Netty client stops reading socket data after days or hours
Question
I have the following scenario:
java 1.8
spring-boot-starter-parent 2.3.2.RELEASE
spring-webflux 5.2.8.RELEASE
spring-boot-starter-reactor-netty 2.3.2.RELEASE
The application runs on WebSphere Application Server 9.
My application is a Netty client with reconnection support that reads data sent to a socket. It accumulates the incoming data, searches for the start and end delimiters of a frame and, once a complete frame is found, passes it to the next handler for processing. After several days, or sometimes only hours, the client stops capturing the data sent to the socket. The only error in the log file is:
2020-08-13 15: 31: 34,885 ERROR [nioEventLoopGroup-2-1] i.n.u.ResourceLeakDetector [?:?] LEAK: ByteBuf.release () was not called before garbage collection
This is the main class:
    public void run() {
        LOGGER.info("Levantando la aplicacion CAPTURADOR");
        closed = false;
        workerGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(workerGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                pipeline.addFirst(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                        super.channelInactive(ctx);
                        ctx.channel().eventLoop().schedule(() -> doConnect(), 1, TimeUnit.SECONDS);
                    }
                });
                socketChannel.pipeline().addLast(frameExtractor);
                socketChannel.pipeline().addLast(new LoggingHandler("SERVER_LOG", LogLevel.valueOf(logLevel)));
                socketChannel.pipeline().addLast(clientHandler);
            }
        });
        doConnect();
    }

    /**
     *
     */
    private void doConnect() {
        if (closed) {
            return;
        }
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(remoteHost, remotePort));
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    LOGGER.info("Started Tcp Client: " + getServerInfo());
                } else {
                    LOGGER.error("Started Tcp Client Failed: " + getServerInfo());
                    f.channel().eventLoop().schedule(() -> doConnect(), 1, TimeUnit.SECONDS);
                }
            }
        });
    }
This is the FrameExtractor class:
    /**
     * In the channelActive() method, which Netty calls when the communication channel becomes active,
     * we take the opportunity to create the buffer we mentioned.
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        buf = ctx.alloc().buffer();
    }

    /**
     * In the channelInactive() method, which Netty calls when the communication channel is no longer
     * active, we take the opportunity to release the buffer we created in channelRegistered.
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        if (null != buf) {
            buf.release();
            buf = null;
        }
    }

    /**
     * Assembles the measurement message by looking for the end of frame and passes it to the next handler.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            Level level = ResourceLeakDetector.getLevel();
            // The msg we receive as a parameter is a Netty ByteBuf. We append all of its content to the end of
            // our ByteBuf buf to accumulate the incoming bytes until the end-of-frame marker is found.
            buf.writeBytes((ByteBuf) msg);
            String data = buf.toString(Charset.defaultCharset());
            int indexOf1 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 1);
            int indexOf2 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 2);
            while (-1 != indexOf2) {
                // Create a new ByteBuf to copy the frame up to the end-of-frame marker.
                ByteBuf line = ctx.alloc().buffer();
                line = buf.copy(indexOf1, indexOf2 - indexOf1);
                // Add to the buffer buf all the bytes up to the end-of-frame marker.
                buf.readBytes(indexOf2);
                // Notify the next handler, passing it our line buffer. We do not release the line buffer because
                // that is the responsibility of whoever receives it.
                ctx.fireChannelRead(line);
                buf.discardReadBytes();
                indexOf1 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 1);
                indexOf2 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 2);
            }
        } finally {
            // Release the buffer that came in as a parameter. Since we no longer need it and have not passed it
            // to anyone else, releasing it is our responsibility.
            ReferenceCountUtil.release(msg);
        }
    }
This is the ClientHandler class:
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buffer = ctx.alloc().buffer();
        try {
            buffer.writeBytes((ByteBuf) msg);
            byte[] bytes = new byte[buffer.readableBytes()];
            int readerIndex = buffer.readerIndex();
            buffer.getBytes(readerIndex, bytes);
            bytes = CapturadorUtils.eliminarParidad(bytes);
            String trama = new String(bytes);
            CapturadorGenerico capturadorGenerico = trama.contains(Directlink.KEY_DIRECTLINK)
                    ? capturadorFactory.getCapturador(Directlink.getDirectlink())
                    : capturadorFactory.getCapturador(Microcom.MICROCOM);
            capturadorGenerico.parsearTrama(trama, bytes);
        } catch (Exception e) {
            LOGGER.error("Error producido en el pipe ClientHandler con la trama: " + msg, e);
        } finally {
            // Release the buffer that came in as a parameter. Since we no longer need it and have not passed it
            // to anyone else, releasing it is our responsibility.
            ReferenceCountUtil.release(msg);
            ReferenceCountUtil.release(buffer);
        }
    }
After reviewing the code against the documentation at https://netty.io/wiki/reference-counted-objects.html, I could not find the cause of the error. As far as I can tell, the buffers are released correctly.
I am adding the log file:
2020-08-14 11:23:30,404 ERROR [nioEventLoopGroup-2-1] i.n.u.ResourceLeakDetector [?:?] LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:349)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:173)
io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:107)
cl.mop.dga.satelital.capturador.handler.FrameExtractor.channelRead(FrameExtractor.java:76)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:682)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:617)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:534)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:906)
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:785)
The leak is reported at these lines:
ByteBuf line = ctx.alloc().buffer();
line = buf.copy(indexOf1, indexOf2 - indexOf1);
I changed the method to:
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            // The msg we receive as a parameter is a Netty ByteBuf. We append all of its content to the end of
            // our ByteBuf buf to accumulate the incoming bytes until the end-of-frame marker is found.
            buf.writeBytes((ByteBuf) msg);
            String data = buf.toString(Charset.defaultCharset());
            LOGGER.info("Trama recibida: " + data);
            int indexOf1 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 1);
            int indexOf2 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 2);
            while (-1 != indexOf2) {
                // Create a new ByteBuf to copy the frame up to the end-of-frame marker.
                ByteBuf line = buf.copy(indexOf1, indexOf2 - indexOf1);
                // Add to the buffer buf all the bytes up to the end-of-frame marker.
                buf.readBytes(indexOf2);
                // Notify the next handler, passing it our line buffer. We do not release the line buffer because
                // that is the responsibility of whoever receives it.
                ctx.fireChannelRead(line);
                buf.discardReadBytes();
                indexOf1 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 1);
                indexOf2 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 2);
            }
        } finally {
            // Release the buffer that came in as a parameter. Since we no longer need it and have not passed it
            // to anyone else, releasing it is our responsibility.
            ReferenceCountUtil.release(msg);
        }
    }
I still get the error:
2020-08-27 16:33:36,256 ERROR [nioEventLoopGroup-2-1] i.n.u.ResourceLeakDetector [?:?] LEAK: ByteBuf.release() was not called before it's garbage-collected
but now it is reported at this line:
buf.readBytes(indexOf2);
I am adding the ClientWithNettyHandlers class:
    public class ClientWithNettyHandlers extends SpringBootServletInitializer {

        private static final Logger LOGGER = LoggerFactory.getLogger(ClientWithNettyHandlers.class);

        @Autowired
        @Qualifier("ClientHandler")
        ClientHandler clientHandler;

        @Autowired
        @Qualifier("FrameExtractor")
        FrameExtractor frameExtractor;

        private volatile EventLoopGroup workerGroup;
        private volatile Bootstrap bootstrap;
        private volatile boolean closed = false;
        private String remoteHost;
        private int remotePort;
        private String logLevel;

        @Bean
        public void run() {
            closed = false;
            workerGroup = new NioEventLoopGroup();
            bootstrap = new Bootstrap();
            bootstrap.group(workerGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    pipeline.addFirst(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                            super.channelInactive(ctx);
                            ctx.channel().eventLoop().schedule(() -> doConnect(), 1, TimeUnit.SECONDS);
                        }
                    });
                    socketChannel.pipeline().addLast(frameExtractor);
                    socketChannel.pipeline().addLast(new LoggingHandler("SERVER_LOG", LogLevel.valueOf(logLevel)));
                    socketChannel.pipeline().addLast(clientHandler);
                }
            });
            doConnect();
        }

        /**
         *
         */
        private void doConnect() {
            if (closed) {
                return;
            }
            ChannelFuture future = bootstrap.connect(new InetSocketAddress(remoteHost, remotePort));
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        LOGGER.info("Started Tcp Client: " + getServerInfo());
                    } else {
                        LOGGER.error("Started Tcp Client Failed: " + getServerInfo());
                        f.channel().eventLoop().schedule(() -> doConnect(), 1, TimeUnit.SECONDS);
                    }
                }
            });
        }

        /**
         *
         */
        @PreDestroy
        public void closeNettyClient() {
            close();
            System.out.println("Shutting down Netty Client: " + getServerInfo());
        }

        /**
         *
         */
        public void close() {
            closed = true;
            Future<?> future = workerGroup.shutdownGracefully();
            future.syncUninterruptibly();
            LOGGER.info("Stopped Tcp Client: " + getServerInfo());
        }

        /**
         *
         * @return
         */
        private String getServerInfo() {
            return String.format("RemoteHost=%s RemotePort=%d", remoteHost, remotePort);
        }

        @Override
        protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
            return application.sources(ClientWithNettyHandlers.class);
        }

        /**
         *
         * @param args
         */
        public static void main(String[] args) {
            InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE);
            SpringApplication.run(ClientWithNettyHandlers.class, args);
        }
    }
Answer 1
Score: 1
The leak is caused by this:
    // Create a new ByteBuf to copy the frame up to the end-of-frame marker.
    ByteBuf line = ctx.alloc().buffer();
    line = buf.copy(indexOf1, indexOf2 - indexOf1);
This leaks the buffer that you allocated via ctx.alloc().buffer(), because you reassign line without calling release() on it first. You could fix this, for example, by using:
    ByteBuf line = buf.copy(indexOf1, indexOf2 - indexOf1);
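To make the mechanism concrete without depending on Netty, here is a minimal JDK-only sketch. `CountedBuf`, its `LIVE` counter, and `ReassignLeakDemo` are hypothetical names invented for this illustration; they only imitate the idea of a reference-counted buffer and are not Netty's API.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy stand-in for a reference-counted buffer; NOT Netty's ByteBuf.
class CountedBuf {
    // Buffers allocated but not yet released (hypothetical bookkeeping for the demo).
    static final AtomicInteger LIVE = new AtomicInteger();

    private int refCnt = 1;

    static CountedBuf allocate() {
        LIVE.incrementAndGet();
        return new CountedBuf();
    }

    // Models copy(): a brand-new buffer that the caller now owns.
    CountedBuf copy() {
        return allocate();
    }

    void release() {
        if (refCnt <= 0) {
            throw new IllegalStateException("already released");
        }
        if (--refCnt == 0) {
            LIVE.decrementAndGet();
        }
    }
}

class ReassignLeakDemo {
    // Mirrors the question: allocate, then overwrite the only reference.
    static int leaky(CountedBuf source) {
        CountedBuf line = CountedBuf.allocate(); // never released
        line = source.copy();                    // the first buffer is now unreachable
        line.release();                          // stands in for the downstream handler
        return CountedBuf.LIVE.get();
    }

    // Mirrors the fix: only the copy is ever created.
    static int fixed(CountedBuf source) {
        CountedBuf line = source.copy();
        line.release();
        return CountedBuf.LIVE.get();
    }

    public static void main(String[] args) {
        CountedBuf source = CountedBuf.allocate();
        System.out.println("live after fixed(): " + fixed(source));
        System.out.println("live after leaky(): " + leaky(source));
        source.release();
    }
}
```

In this model every call to `leaky` leaves one more live buffer behind than `fixed` does, which is the pattern the leak detector reports.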
Answer 2
Score: 0
Finally, I solved the error by modifying the last handler in the pipeline: for each frame obtained, I create a thread and hand the frame to it, so that the application's own processing (parsing, insertion into the database, and so on) continues there.
Perhaps Netty has some protective mechanism in its buffer handling, and because processing was delayed, the buffer that accumulated bytes until the start and end of a frame were identified filled up.
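As a rough illustration of moving the slow work off the thread that reads the socket, here is a JDK-only sketch. It uses a small fixed thread pool rather than one thread per frame, which is a swapped-in variation and not what this answer describes. `FrameDispatcher`, the pool size of 4, and the 5-second wait are all assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: hands each frame to a worker pool so the caller returns quickly.
class FrameDispatcher {
    // Pool size is an arbitrary assumption for this sketch.
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    private final List<String> processed = new CopyOnWriteArrayList<>();

    // Intended to be called from the I/O thread with a private copy of the frame bytes.
    void dispatch(byte[] frame) {
        workers.submit(() -> process(frame));
    }

    // Stand-in for the real parsing and database insertion.
    private void process(byte[] frame) {
        processed.add(new String(frame, StandardCharsets.US_ASCII));
    }

    // Stops accepting frames and waits for the queued work to finish.
    List<String> shutdownAndCollect() {
        workers.shutdown();
        try {
            if (!workers.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return processed;
    }
}
```

The handler would copy the frame into a `byte[]` and release the incoming buffer before calling `dispatch`, so that no reference-counted buffer crosses threads. Frames may complete out of order with more than one worker.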
I want to highlight that the error I originally had in the buffer handling could be solved with the advice of @NormanMaurer to use skipBytes(...).
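The reason skipBytes(...) helps is that, in Netty, ByteBuf.readBytes(int) transfers the bytes into a newly created buffer and returns it, so a call like buf.readBytes(indexOf2) whose result is ignored leaves that new buffer unreleased, whereas skipBytes(int) only advances the reader index. The sketch below imitates that difference with a JDK-only toy class. `ToyBuf` and its `live` counter are hypothetical and are not Netty's implementation.

```java
import java.util.Arrays;

// Toy model of a reader-indexed, reference-counted buffer; NOT Netty's ByteBuf.
class ToyBuf {
    // Buffers created and not yet released (hypothetical bookkeeping for the demo).
    static int live = 0;

    private final byte[] data;
    private int readerIndex = 0;
    private boolean released = false;

    ToyBuf(byte[] data) {
        this.data = data;
        live++;
    }

    int readableBytes() {
        return data.length - readerIndex;
    }

    // Imitates readBytes(int): returns a NEW buffer that the caller must release.
    ToyBuf readBytes(int length) {
        checkReadable(length);
        byte[] out = Arrays.copyOfRange(data, readerIndex, readerIndex + length);
        readerIndex += length;
        return new ToyBuf(out);
    }

    // Imitates skipBytes(int): only moves the reader index, creates nothing.
    ToyBuf skipBytes(int length) {
        checkReadable(length);
        readerIndex += length;
        return this;
    }

    void release() {
        if (!released) {
            released = true;
            live--;
        }
    }

    private void checkReadable(int length) {
        if (length < 0 || length > readableBytes()) {
            throw new IndexOutOfBoundsException("length: " + length);
        }
    }
}
```

In the question's loop, replacing the ignored buf.readBytes(indexOf2) call with buf.skipBytes(indexOf2) consumes the same bytes without creating a buffer that nobody releases.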