
Netty client stops reading socket data after days or hours

Question

I have the following scenario:

java 1.8

spring-boot-starter-parent 2.3.2.RELEASE

spring-webflux 5.2.8.RELEASE

spring-boot-starter-reactor-netty 2.3.2.RELEASE

The application runs on WebSphere Application Server 9

My application is a Netty client with reconnection support that reads the data sent to a socket. It reads the data and searches for the frame's start and end delimiters; once a complete frame is found, it is passed to the next handler, which processes the information. After several days, or sometimes only hours, the client stops capturing the data sent to the socket. Looking at the log file, the only error is:

```
2020-08-13 15:31:34,885 ERROR [nioEventLoopGroup-2-1] i.n.u.ResourceLeakDetector [?:?] LEAK: ByteBuf.release() was not called before garbage collection
```
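
As background, the framing described above (accumulate bytes and split on frame delimiters) is essentially what Netty's built-in DelimiterBasedFrameDecoder does. The sketch below shows, for comparison only, how it could be wired into a pipeline; the maximum frame size and the delimiter value are assumptions, not values taken from this application:

```
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;

// Illustrative only: the decoder splits the inbound stream on the delimiter and
// emits one ByteBuf per frame to the next handler, managing the intermediate
// accumulation buffer (and its release) internally.
public class DelimiterFramingInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline().addLast(
                new DelimiterBasedFrameDecoder(8192, Unpooled.wrappedBuffer(new byte[]{'\n'})));
        // ... application handlers would follow here ...
    }
}
```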

This is the main class:

```
public void run() {
    LOGGER.info("Levantando la aplicacion CAPTURADOR");
    closed = false;
    workerGroup = new NioEventLoopGroup();
    bootstrap = new Bootstrap();
    bootstrap.group(workerGroup);
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addFirst(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                    super.channelInactive(ctx);
                    ctx.channel().eventLoop().schedule(() -> doConnect(), 1, TimeUnit.SECONDS);
                }
            });
            socketChannel.pipeline().addLast(frameExtractor);
            socketChannel.pipeline().addLast(new LoggingHandler("SERVER_LOG", LogLevel.valueOf(logLevel)));
            socketChannel.pipeline().addLast(clientHandler);
        }
    });
    doConnect();
}

private void doConnect() {
    if (closed) {
        return;
    }
    ChannelFuture future = bootstrap.connect(new InetSocketAddress(remoteHost, remotePort));
    future.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture f) throws Exception {
            if (f.isSuccess()) {
                LOGGER.info("Started Tcp Client: " + getServerInfo());
            } else {
                LOGGER.error("Started Tcp Client Failed: " + getServerInfo());
                f.channel().eventLoop().schedule(() -> doConnect(), 1, TimeUnit.SECONDS);
            }
        }
    });
}
```

This is the FrameExtractor class:


```
/**
 * channelRegistered() is called by Netty when the channel is registered;
 * we use it to create the accumulation buffer mentioned above.
 */
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    buf = ctx.alloc().buffer();
}

/**
 * channelUnregistered() is called by Netty when the channel is no longer
 * active; we use it to release the buffer created in channelRegistered().
 */
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    if (null != buf) {
        buf.release();
        buf = null;
    }
}

/**
 * Assembles the received data, looking for the end-of-frame marker, and passes each frame to the next handler.
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
        Level level = ResourceLeakDetector.getLevel();
        // The msg parameter is a Netty ByteBuf. We append its whole content to the end of
        // our ByteBuf buf, accumulating bytes until the end-of-frame marker is found.
        buf.writeBytes((ByteBuf) msg);
        String data = buf.toString(Charset.defaultCharset());
        int indexOf1 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 1);
        int indexOf2 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 2);
        while (-1 != indexOf2) {
            // Create a new ByteBuf to copy the frame up to the end-of-frame marker.
            ByteBuf line = ctx.alloc().buffer();
            line = buf.copy(indexOf1, indexOf2 - indexOf1);
            // Consume from buf all the bytes up to the end-of-frame marker.
            buf.readBytes(indexOf2);
            // Notify the next handler, passing it our line buffer. We do not release line
            // because that is the responsibility of the handler that receives it.
            ctx.fireChannelRead(line);
            buf.discardReadBytes();
            indexOf1 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 1);
            indexOf2 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 2);
        }
    } finally {
        // Release the buffer that arrived as a parameter. Since we no longer need it and
        // have not passed it on to anyone else, it is our responsibility to release it.
        ReferenceCountUtil.release(msg);
    }
}
```

This is the ClientHandler class:

```
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf buffer = ctx.alloc().buffer();
    try {
        buffer.writeBytes((ByteBuf) msg);
        byte[] bytes = new byte[buffer.readableBytes()];
        int readerIndex = buffer.readerIndex();
        buffer.getBytes(readerIndex, bytes);
        bytes = CapturadorUtils.eliminarParidad(bytes);
        String trama = new String(bytes);
        CapturadorGenerico capturadorGenerico = trama.contains(Directlink.KEY_DIRECTLINK)
                ? capturadorFactory.getCapturador(Directlink.getDirectlink())
                : capturadorFactory.getCapturador(Microcom.MICROCOM);
        capturadorGenerico.parsearTrama(trama, bytes);
    } catch (Exception e) {
        LOGGER.error("Error producido en el pipe ClientHandler con la trama: " + msg, e);
    } finally {
        // Release the buffer that arrived as a parameter. Since we no longer need it and
        // have not passed it on to anyone else, it is our responsibility to release it.
        ReferenceCountUtil.release(msg);
        ReferenceCountUtil.release(buffer);
    }
}
```

Reviewing the code and analyzing the documentation at https://netty.io/wiki/reference-counted-objects.html, I could not find what the cause of the error might be. The buffers appear to be released correctly.
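
The rule on that page boils down to: whichever handler accesses a reference-counted message last is responsible for releasing it. A minimal sketch of the two usual patterns (the handler names are illustrative):

```
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

// Terminal handler: it consumes the message here, so it must release it.
class ConsumingHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            ByteBuf in = (ByteBuf) msg;
            System.out.println("received " + in.readableBytes() + " bytes");
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
}

// Forwarding handler: ownership moves to the next handler, so it must NOT release here.
class ForwardingHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.fireChannelRead(msg);
    }
}
```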

Adding the log file:

```
2020-08-14 11:23:30,404 ERROR [nioEventLoopGroup-2-1] i.n.u.ResourceLeakDetector [?:?] LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:349)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:173)
io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:107)
cl.mop.dga.satelital.capturador.handler.FrameExtractor.channelRead(FrameExtractor.java:76)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:682)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:617)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:534)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:906)
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:785)
```

The leak is reported in these lines:

```
ByteBuf line = ctx.alloc().buffer();
line = buf.copy(indexOf1, indexOf2 - indexOf1);
```

I changed the method to:

```
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
        // The msg parameter is a Netty ByteBuf. We append its whole content to the end of
        // our ByteBuf buf, accumulating bytes until the end-of-frame marker is found.
        buf.writeBytes((ByteBuf) msg);
        String data = buf.toString(Charset.defaultCharset());
        LOGGER.info("Trama recibida: " + data);
        int indexOf1 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 1);
        int indexOf2 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 2);
        while (-1 != indexOf2) {
            // Create a new ByteBuf that copies the frame up to the end-of-frame marker.
            ByteBuf line = buf.copy(indexOf1, indexOf2 - indexOf1);
            // Consume from buf all the bytes up to the end-of-frame marker.
            buf.readBytes(indexOf2);
            // Notify the next handler, passing it our line buffer. We do not release line
            // because that is the responsibility of the handler that receives it.
            ctx.fireChannelRead(line);
            buf.discardReadBytes();
            indexOf1 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 1);
            indexOf2 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 2);
        }
    } finally {
        // Release the buffer that arrived as a parameter. Since we no longer need it and
        // have not passed it on to anyone else, it is our responsibility to release it.
        ReferenceCountUtil.release(msg);
    }
}
```

And I still get the error:

```
2020-08-27 16:33:36,256 ERROR [nioEventLoopGroup-2-1] i.n.u.ResourceLeakDetector [?:?] LEAK: ByteBuf.release() was not called before it's garbage-collected
```

but now it points at the line:

```
buf.readBytes(indexOf2);
```

Adding the ClientWithNettyHandlers class:

```
public class ClientWithNettyHandlers extends SpringBootServletInitializer {

    private static final Logger LOGGER = LoggerFactory.getLogger(ClientWithNettyHandlers.class);

    @Autowired
    @Qualifier("ClientHandler")
    ClientHandler clientHandler;

    @Autowired
    @Qualifier("FrameExtractor")
    FrameExtractor frameExtractor;

    private volatile EventLoopGroup workerGroup;
    private volatile Bootstrap bootstrap;
    private volatile boolean closed = false;
    private String remoteHost;
    private int remotePort;
    private String logLevel;

    @Bean
    public void run() {
        closed = false;
        workerGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(workerGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                pipeline.addFirst(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                        super.channelInactive(ctx);
                        ctx.channel().eventLoop().schedule(() -> doConnect(), 1, TimeUnit.SECONDS);
                    }
                });
                socketChannel.pipeline().addLast(frameExtractor);
                socketChannel.pipeline().addLast(new LoggingHandler("SERVER_LOG", LogLevel.valueOf(logLevel)));
                socketChannel.pipeline().addLast(clientHandler);
            }
        });
        doConnect();
    }

    private void doConnect() {
        if (closed) {
            return;
        }
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(remoteHost, remotePort));
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    LOGGER.info("Started Tcp Client: " + getServerInfo());
                } else {
                    LOGGER.error("Started Tcp Client Failed: " + getServerInfo());
                    f.channel().eventLoop().schedule(() -> doConnect(), 1, TimeUnit.SECONDS);
                }
            }
        });
    }

    @PreDestroy
    public void closeNettyClient() {
        close();
        System.out.println("Shutting down Netty Client: " + getServerInfo());
    }

    public void close() {
        closed = true;
        Future<?> future = workerGroup.shutdownGracefully();
        future.syncUninterruptibly();
        LOGGER.info("Stopped Tcp Client: " + getServerInfo());
    }

    private String getServerInfo() {
        return String.format("RemoteHost=%s RemotePort=%d", remoteHost, remotePort);
    }

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(ClientWithNettyHandlers.class);
    }

    public static void main(String[] args) {
        InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE);
        SpringApplication.run(ClientWithNettyHandlers.class, args);
    }
}
```

Answer 1

Score: 1

The leak is caused by this:

```
// Create a new ByteBuf to copy the frame up to the end-of-frame marker
ByteBuf line = ctx.alloc().buffer();
line = buf.copy(indexOf1, indexOf2 - indexOf1);
```

This leaks the buffer you allocated via ctx.alloc().buffer(), because you overwrite line without ever calling release() on it. You could fix it, for example, by using:

```
ByteBuf line = buf.copy(indexOf1, indexOf2 - indexOf1);
```
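
To make the failure mode concrete, here is a small runnable sketch (class name and values are illustrative, not taken from the question) of why re-pointing the variable abandons the first buffer:

```
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class OverwrittenBufferDemo {
    public static void main(String[] args) {
        // Stand-in for `ByteBuf line = ctx.alloc().buffer()` in the question's loop.
        ByteBuf line = Unpooled.buffer(16);        // refCnt == 1, this code owns it

        // Stand-in for `line = buf.copy(...)`: copy() returns a brand-new buffer,
        // so re-assigning the variable abandons the first one without releasing it.
        ByteBuf abandoned = line;
        line = Unpooled.copiedBuffer(new byte[]{1, 2, 3});

        System.out.println(abandoned.refCnt());    // still 1 -> exactly the reported leak
        abandoned.release();                       // the release the original loop never performs
        line.release();                            // normally done by the downstream handler
    }
}
```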

Answer 2

Score: 0

Finally, I solved the error by modifying the last handler in the pipeline: for each frame obtained I create a thread and hand the frame off so the application's own processing (parsing, insertion into the database, etc.) continues there.
Perhaps Netty has some protection mechanism in its buffer handling, and by delaying the processing, the buffer that was accumulating the frames until the start and end markers were identified filled up.
I want to highlight that the error I originally had in the buffer handling could be solved with @NormanMaurer's advice of using skipBytes(...).
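
Putting the two pieces of advice together, this is a sketch of how the accumulating channelRead() from the question might look. It is a reconstruction that reuses the question's fields and helpers (buf, indexOf, Directlink.DELIMETER_DIRECTLINK) inside the FrameExtractor class, not the author's final code:

```
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
        // Accumulate the incoming bytes until at least one complete frame is buffered.
        buf.writeBytes((ByteBuf) msg);
        int start = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 1);
        int end = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 2);
        while (-1 != end) {
            // copy() already returns a new, caller-owned ByteBuf, so the extra
            // ctx.alloc().buffer() that was leaking is not needed.
            ByteBuf line = buf.copy(start, end - start);
            ctx.fireChannelRead(line);   // ownership of 'line' moves to the next handler
            // skipBytes() only advances readerIndex; unlike readBytes(int) it does not
            // return a new ByteBuf that would also have to be released.
            buf.skipBytes(end);
            buf.discardReadBytes();
            start = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 1);
            end = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 2);
        }
    } finally {
        // The inbound msg is fully consumed here, so this handler releases it.
        ReferenceCountUtil.release(msg);
    }
}
```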
