Netty client stops reading socket data after days or hours
I have the following scenario:
java 1.8
spring-boot-starter-parent 2.3.2.RELEASE
spring-webflux 5.2.8.RELEASE
spring-boot-starter-reactor-netty 2.3.2.RELEASE
The application runs on WebSphere Application Server 9
My application is a Netty client with reconnection support that reads the data sent to a socket. It accumulates the incoming data, searches for the start and end delimiters of a frame, and once a complete frame is found, passes it to the next handler, which processes the information. After several days, or sometimes only hours, the client stops capturing the data sent to the socket. The only error in the log file is:
2020-08-13 15:31:34,885 ERROR [nioEventLoopGroup-2-1] i.n.u.ResourceLeakDetector [?:?] LEAK: ByteBuf.release() was not called before it's garbage-collected
This is the main class:
public void run() {
    LOGGER.info("Levantando la aplicacion CAPTURADOR");
    closed = false;
    workerGroup = new NioEventLoopGroup();
    bootstrap = new Bootstrap();
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addFirst(new ChannelInboundHandlerAdapter() {
                public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                    ctx.channel().eventLoop().schedule(() -> doConnect(), 1, TimeUnit.SECONDS);
                }
            });
            socketChannel.pipeline().addLast(new LoggingHandler("SERVER_LOG", LogLevel.valueOf(logLevel)));
        }
    });
}

private void doConnect() {
    if (closed) {
        return;
    }
    ChannelFuture future = bootstrap.connect(new InetSocketAddress(remoteHost, remotePort));
    future.addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture f) throws Exception {
            if (f.isSuccess()) {
                LOGGER.info("Started Tcp Client: " + getServerInfo());
            } else {
                LOGGER.error("Started Tcp Client Failed: " + getServerInfo());
                f.channel().eventLoop().schedule(() -> doConnect(), 1, TimeUnit.SECONDS);
            }
        }
    });
}
This is the FrameExtractor class:
/**
 * In the channelActive() method, which Netty calls when the communication channel becomes active,
 * we take the opportunity to create the buffer we mentioned.
 */
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    buf = ctx.alloc().buffer();
}

/**
 * In the channelInactive() method, which Netty calls when the communication channel stops being
 * active, we take the opportunity to release the buffer we created in channelRegistered.
 */
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    if (null != buf) {
        buf.release();
        buf = null;
    }
}

/**
 * Assembles the measurement message by looking for the end of frame and passes it to the next handler.
 */
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
        Level level = ResourceLeakDetector.getLevel();
        // The msg we receive as a parameter is a Netty ByteBuf. We append all of its content to the end of
        // our ByteBuf buf to accumulate the incoming bytes until the end-of-frame marker is found
        buf.writeBytes((ByteBuf) msg);
        String data = buf.toString(Charset.defaultCharset());
        int indexOf1 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 1);
        int indexOf2 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 2);
        while (-1 != indexOf2) {
            // Create a new ByteBuf to copy the frame up to the end-of-frame marker
            ByteBuf line = ctx.alloc().buffer();
            line = buf.copy(indexOf1, indexOf2 - indexOf1);
            // Add to the buffer buf all the bytes up to the end-of-frame marker
            // Notify the next handler, passing it our line buffer. We do not release the line buffer because
            // that is the responsibility of whoever receives it.
            ctx.fireChannelRead(line);
            indexOf1 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 1);
            indexOf2 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 2);
        }
    } finally {
        // Release the buffer that arrived as a parameter. Since we no longer need it and have not passed it
        // to anyone else, releasing it is our responsibility.
        ReferenceCountUtil.release(msg);
    }
}
This is the ClientHandler class:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf buffer = ctx.alloc().buffer();
    try {
        buffer.writeBytes((ByteBuf) msg);
        byte[] bytes = new byte[buffer.readableBytes()];
        int readerIndex = buffer.readerIndex();
        buffer.getBytes(readerIndex, bytes);
        bytes = CapturadorUtils.eliminarParidad(bytes);
        String trama = new String(bytes);
        CapturadorGenerico capturadorGenerico = trama.contains(Directlink.KEY_DIRECTLINK)
                ? capturadorFactory.getCapturador(Directlink.getDirectlink())
                : capturadorFactory.getCapturador(Microcom.MICROCOM);
        capturadorGenerico.parsearTrama(trama, bytes);
    } catch (Exception e) {
        LOGGER.error("Error producido en el pipe ClientHandler con la trama: " + msg, e);
    } finally {
        // Release the buffer that arrived as a parameter. Since we no longer need it and have not passed it
        // to anyone else, releasing it is our responsibility.
        ReferenceCountUtil.release(msg);
    }
}
After reviewing the code against the documentation at https://netty.io/wiki/reference-counted-objects.html, I could not find the cause of the error. As far as I can tell, the buffers are released correctly.
I am adding the log output:
2020-08-14 11:23:30,404 ERROR [nioEventLoopGroup-2-1] i.n.u.ResourceLeakDetector [?:?] LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
The leak is reported at these lines:
ByteBuf line = ctx.alloc().buffer();
line = buf.copy(indexOf1, indexOf2 - indexOf1);
I changed the method to:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
        // The msg we receive as a parameter is a Netty ByteBuf. We append all of its content to the end of
        // our ByteBuf buf to accumulate the incoming bytes until the end-of-frame marker is found
        buf.writeBytes((ByteBuf) msg);
        String data = buf.toString(Charset.defaultCharset());
        LOGGER.info("Trama recibida: " + data);
        int indexOf1 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 1);
        int indexOf2 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 2);
        while (-1 != indexOf2) {
            // Create a new ByteBuf to copy the frame up to the end-of-frame marker
            ByteBuf line = buf.copy(indexOf1, indexOf2 - indexOf1);
            // Add to the buffer buf all the bytes up to the end-of-frame marker
            // Notify the next handler, passing it our line buffer. We do not release the line buffer because
            // that is the responsibility of whoever receives it.
            ctx.fireChannelRead(line);
            indexOf1 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 1);
            indexOf2 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 2);
        }
    } finally {
        // Release the buffer that arrived as a parameter. Since we no longer need it and have not passed it
        // to anyone else, releasing it is our responsibility.
        ReferenceCountUtil.release(msg);
    }
}
and I still get the error:
2020-08-27 16:33:36,256 ERROR [nioEventLoopGroup-2-1] i.n.u.ResourceLeakDetector [?:?] LEAK: ByteBuf.release() was not called before it's garbage-collected
but now in line
I am adding the class ClientWithNettyHandlers:
public class ClientWithNettyHandlers extends SpringBootServletInitializer {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClientWithNettyHandlers.class);
    ClientHandler clientHandler;
    FrameExtractor frameExtractor;
    private volatile EventLoopGroup workerGroup;
    private volatile Bootstrap bootstrap;
    private volatile boolean closed = false;
    private String remoteHost;
    private int remotePort;
    private String logLevel;

    public void run() {
        closed = false;
        workerGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                pipeline.addFirst(new ChannelInboundHandlerAdapter() {
                    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                        ctx.channel().eventLoop().schedule(() -> doConnect(), 1, TimeUnit.SECONDS);
                    }
                });
                socketChannel.pipeline().addLast(new LoggingHandler("SERVER_LOG", LogLevel.valueOf(logLevel)));
            }
        });
    }

    private void doConnect() {
        if (closed) {
            return;
        }
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(remoteHost, remotePort));
        future.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    LOGGER.info("Started Tcp Client: " + getServerInfo());
                } else {
                    LOGGER.error("Started Tcp Client Failed: " + getServerInfo());
                    f.channel().eventLoop().schedule(() -> doConnect(), 1, TimeUnit.SECONDS);
                }
            }
        });
    }

    public void closeNettyClient() {
        System.out.println("Shutting down Netty Client: " + getServerInfo());
    }

    public void close() {
        closed = true;
        Future<?> future = workerGroup.shutdownGracefully();
        LOGGER.info("Stopped Tcp Client: " + getServerInfo());
    }

    /**
     * @return the remote host and port as a printable string
     */
    private String getServerInfo() {
        return String.format("RemoteHost=%s RemotePort=%d", remoteHost, remotePort);
    }

    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(ClientWithNettyHandlers.class);
    }

    /**
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        SpringApplication.run(ClientWithNettyHandlers.class, args);
    }
}
Score: 1
The leak is caused by this:
// Create a new ByteBuf to copy the frame up to the end-of-frame marker
ByteBuf line = ctx.alloc().buffer();
line = buf.copy(indexOf1, indexOf2 - indexOf1);
This leaks the buffer that you allocated via ctx.alloc().buffer(), because you overwrite the variable line without calling release() on that buffer first. You could, for example, fix this by using:
ByteBuf line = buf.copy(indexOf1, indexOf2 - indexOf1);
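To make the mechanism concrete, here is a minimal plain-Java model of reference counting. It is only a sketch: CountedBuffer, TrackingAllocator and LeakDemo are hypothetical stand-ins and not Netty classes. In the question, the two allocations correspond to ctx.alloc().buffer() and buf.copy(...).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a reference-counted buffer; not a Netty class. */
final class CountedBuffer {
    private int refCnt = 1; // a new buffer starts with one reference

    int refCnt() {
        return refCnt;
    }

    void release() {
        if (refCnt == 0) {
            throw new IllegalStateException("already released");
        }
        refCnt--;
    }
}

/** Hypothetical allocator that remembers every buffer it handed out. */
final class TrackingAllocator {
    private final List<CountedBuffer> allocated = new ArrayList<>();

    CountedBuffer buffer() {
        CountedBuffer b = new CountedBuffer();
        allocated.add(b);
        return b;
    }

    /** Number of buffers that were handed out but never released. */
    long leaked() {
        return allocated.stream().filter(b -> b.refCnt() > 0).count();
    }
}

final class LeakDemo {
    /** Mirrors the question: allocate, then overwrite the only reference. */
    static long leaky() {
        TrackingAllocator alloc = new TrackingAllocator();
        CountedBuffer line = alloc.buffer(); // first allocation
        line = alloc.buffer();               // stands in for buf.copy(...); the first buffer is now unreachable
        line.release();                      // the receiving handler releases only the copy
        return alloc.leaked();
    }

    /** Mirrors the fix: a single allocation, released once. */
    static long fixed() {
        TrackingAllocator alloc = new TrackingAllocator();
        CountedBuffer line = alloc.buffer(); // stands in for buf.copy(...)
        line.release();
        return alloc.leaked();
    }

    public static void main(String[] args) {
        System.out.println("leaky: " + leaky() + ", fixed: " + fixed());
    }
}
```

In the leaky variant nobody holds a reference to the first buffer any more, so nothing can ever call release() on it, which is the situation the leak detector reports once the object is garbage-collected.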
Score: 0
Finally, I solved the error by modifying the last handler in the pipeline: for each frame obtained, I create a thread and hand the frame over to it, so that the application's own processing (parsing, insertion into the database, etc.) continues in that thread.
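A variation on this idea, shown only as a sketch and not as the code actually used here, is to hand frames to a shared bounded pool instead of starting one thread per frame. FrameDispatcher is a hypothetical helper, and the Consumer<byte[]> stands in for the parsing and database step.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper; in a Netty handler, dispatch() would be called from channelRead(). */
final class FrameDispatcher implements AutoCloseable {
    private final ExecutorService pool;
    private final Consumer<byte[]> processor;

    FrameDispatcher(int threads, int queueCapacity, Consumer<byte[]> processor) {
        this.processor = processor;
        // Bounded queue: when it is full, CallerRunsPolicy makes the submitting thread
        // run the task itself, which slows the producer instead of queueing without limit.
        this.pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Copies the frame so the caller may release or reuse its own buffer right away. */
    void dispatch(byte[] frame) {
        byte[] copy = frame.clone();
        pool.execute(() -> processor.accept(copy));
    }

    /** Stops accepting frames and waits briefly for queued work to finish. */
    @Override
    public void close() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }
}
```

In the last handler, the bytes would be copied out of the ByteBuf into a byte[] (as ClientHandler already does with getBytes), the ByteBuf released, and the array passed to dispatch(). The pool size and queue capacity are assumptions to be tuned for the actual load.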
Perhaps Netty has some protective mechanism in its buffer handling: because the processing was slow, the buffer that accumulates data until the start and end of a frame are identified may have filled up.
I want to highlight that the error I originally had in the buffer handling could be solved by following @NormanMaurer's advice to use skipBytes(...).
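The role of that consume step can be illustrated without Netty. The sketch below is a plain-Java model, not the handler from the question: DelimitedFrameExtractor and its methods are hypothetical, and the single-byte delimiter is an assumption. Each frame runs from one delimiter up to the next, and after a frame is copied out, the consumed bytes are dropped from the accumulator, which is roughly what skipBytes(...) followed by discardReadBytes() does on a ByteBuf.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical plain-Java model of a delimiter-based frame accumulator; not Netty code. */
final class DelimitedFrameExtractor {
    private final byte delimiter;          // assumed single-byte frame marker
    private byte[] pending = new byte[0];  // bytes received but not yet part of a complete frame

    DelimitedFrameExtractor(byte delimiter) {
        this.delimiter = delimiter;
    }

    /** Appends a chunk and returns every complete frame now available. */
    List<byte[]> feed(byte[] chunk) {
        byte[] joined = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, joined, pending.length, chunk.length);

        List<byte[]> frames = new ArrayList<>();
        int start = indexOf(joined, 0);
        int end = start < 0 ? -1 : indexOf(joined, start + 1);
        while (end >= 0) {
            frames.add(Arrays.copyOfRange(joined, start, end)); // frame = [start, end)
            start = end;                                         // the closing marker opens the next frame
            end = indexOf(joined, start + 1);
        }
        // Drop the consumed bytes and keep only the unfinished tail.
        pending = start < 0 ? new byte[0] : Arrays.copyOfRange(joined, start, joined.length);
        return frames;
    }

    int pendingBytes() {
        return pending.length;
    }

    private int indexOf(byte[] data, int from) {
        for (int i = from; i < data.length; i++) {
            if (data[i] == delimiter) {
                return i;
            }
        }
        return -1;
    }
}
```

If the consumed bytes were never dropped, this accumulator would keep every byte ever received and the same frames would be found again on each pass.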