Handling STX-ETX frame with Netty
Question
I have to implement the following TCP/IP protocol with Netty:
Message structure:
The messages are embedded in an STX-ETX frame:
STX MESSAGE ETX
0x02 7b20224d... 0x03
STX and ETX do not need to be escaped inside the message, because the message is JSON and JSON encodes control characters as escape sequences.
The escape sequences look like this:
> JSON.stringify({"a": "\x02\x03\x10"}) → "{\"a\":\"\u0002\u0003\u0010\"}"
Here is more info about STX, ETX control codes.
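To make the escaping claim concrete, here is a minimal sketch of how a JSON string encoder treats control characters. The class and method names (`JsonControlEscape`, `quote`) are made up for illustration and are not from the post. It is deliberately simplified: real encoders usually emit short forms such as `\n` or `\t` for some characters, while this sketch always uses the `\u00XX` form, which is still valid JSON.

```java
/** Hypothetical helper, for illustration only: quotes a string the way JSON requires. */
final class JsonControlEscape {

    /** Escapes the quote, the backslash and every control character below 0x20. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                // STX (0x02) and ETX (0x03) end up here and become six ASCII characters.
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }
}
```

Since the raw bytes 0x02 and 0x03 cannot survive this step, they should only ever show up on the wire as frame markers.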
The message length varies, and the payload is JSON, for example:
>\0x02{"messageID": "Heartbeat"}\0x03
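As a rough illustration of the frame layout, the following JDK-only sketch wraps a JSON payload in STX and ETX and unwraps it again. `StxEtxFraming`, `frame` and `unframe` are hypothetical names, and UTF-8 is an assumption, since the post does not state the charset.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, for illustration only: builds and parses one STX-ETX frame. */
final class StxEtxFraming {
    static final byte STX = 0x02;
    static final byte ETX = 0x03;

    /** Returns STX, then the UTF-8 bytes of the JSON text, then ETX. */
    static byte[] frame(String json) {
        byte[] payload = json.getBytes(StandardCharsets.UTF_8);
        byte[] framed = new byte[payload.length + 2];
        framed[0] = STX;
        System.arraycopy(payload, 0, framed, 1, payload.length);
        framed[framed.length - 1] = ETX;
        return framed;
    }

    /** Strips STX and ETX from one complete frame and decodes the payload. */
    static String unframe(byte[] framed) {
        if (framed.length < 2 || framed[0] != STX || framed[framed.length - 1] != ETX) {
            throw new IllegalArgumentException("not a complete STX-ETX frame");
        }
        return new String(framed, 1, framed.length - 2, StandardCharsets.UTF_8);
    }
}
```

For the heartbeat example above, `frame` produces 28 bytes: 26 bytes of JSON plus the two control bytes.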
My idea was to combine a custom frame delimiter with StringEncoder/StringDecoder.
For the custom frame delimiter, use 0x03 as the delimiter and skip the first byte (0x02).
So I created the following FrameDelimiterDecoder:
@Slf4j
public class FrameDelimiterDecoder extends DelimiterBasedFrameDecoder {

    public FrameDelimiterDecoder(int maxFrameLength, ByteBuf delimiter) {
        super(maxFrameLength, delimiter);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        ByteBuf buffFrame = null;
        Object frame = super.decode(ctx, buffer);
        if (frame instanceof ByteBuf) {
            buffFrame = (ByteBuf) frame;
        } else {
            log.info("frame: {}", frame);
        }
        if (buffFrame != null) {
            buffFrame.writeBytes(buffer.skipBytes(1));
        } else {
            log.warn("buffer is <null>");
        }
        return buffFrame;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error(cause.getMessage(), cause);
    }
}
And used it in the channel initializer:
@Slf4j
@Component
@RequiredArgsConstructor
public class QrReaderChannelInitializer extends ChannelInitializer<SocketChannel> {

    private final StringEncoder stringEncoder = new StringEncoder();
    private final StringDecoder stringDecoder = new StringDecoder();
    private final QrReaderProcessingHandler readerServerHandler;
    private final NettyProperties nettyProperties;

    @Override
    protected void initChannel(SocketChannel socketChannel) {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new FrameDelimiterDecoder(1024 * 1024, Unpooled.wrappedBuffer(FrameConstant.ETX)));
        if (nettyProperties.isEnableTimeout()) {
            pipeline.addLast(new ReadTimeoutHandler(nettyProperties.getClientTimeout()));
        }
        pipeline.addLast(stringDecoder);
        pipeline.addLast(stringEncoder);
        pipeline.addLast(readerServerHandler);
    }
}
However, it always fails with:
c.s.netty.init.FrameDelimiterDecoder : java.lang.IndexOutOfBoundsException: readerIndex(28) + length(1) exceeds writerIndex(28): PooledUnsafeDirectByteBuf(ridx: 28, widx: 28, cap: 1024)
io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(28) + length(1) exceeds writerIndex(28): PooledUnsafeDirectByteBuf(ridx: 28, widx: 28, cap: 1024)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:471)
I could not work out what is missing here.
How can I process STX-ETX frames for request/response with Netty?
Answer 1
Score: 0
After some trial and error, I finally solved this issue.
I rethought the code for FrameDelimiterDecoder and found a way to do it with an array of bytes, converting to ByteBuf at the end. I believe it could also be done on the buffer directly, or by using ByteBuffer from the NIO package and then converting.
The simplest for me was:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import java.nio.ByteBuffer;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class FrameDelimiterDecoder extends DelimiterBasedFrameDecoder {

    public FrameDelimiterDecoder(int maxFrameLength, ByteBuf delimiter) {
        super(maxFrameLength, delimiter);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) {
        boolean inMessage = false;
        // Copy everything that is currently readable into a heap buffer.
        int size = buffer.readableBytes();
        ByteBuffer byteBuffer = ByteBuffer.allocate(size);
        buffer.readBytes(byteBuffer);
        // Payload size: assumes the buffer holds exactly one frame (STX + payload + ETX).
        byte[] byteArray = new byte[size - 2];
        byte[] data = byteBuffer.array();
        int index = 0;
        for (byte b : data) {
            if (b == FrameConstant.START_OF_TEXT) {
                if (!inMessage) {
                    inMessage = true;
                } else {
                    log.warn("Unexpected STX received!");
                }
            } else if (b == FrameConstant.END_OF_TEXT) {
                if (inMessage) {
                    inMessage = false;
                } else {
                    log.warn("Unexpected ETX received!");
                }
            } else {
                // Keep only the bytes between STX and ETX.
                if (inMessage) {
                    byteArray[index] = b;
                    index += 1;
                }
            }
        }
        return Unpooled.wrappedBuffer(byteArray);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (cause instanceof InterruptedException) {
            log.warn("interrupted exception occurred");
            Thread.currentThread().interrupt();
        } else {
            log.error("FrameDelimiterDecoder exception occurred:", cause);
        }
    }
}
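A note on the original exception, offered as my reading of the stack trace rather than a confirmed diagnosis: DelimiterBasedFrameDecoder consumes the delimiter from the input when it returns a frame, so once one complete 28-byte frame has been read there is nothing left in the input to skip, and `buffer.skipBytes(1)` fails. The STX byte sits at the front of the returned frame, not in the input buffer.

The decoder above also assumes that every call sees exactly one whole frame, which TCP does not guarantee. The following JDK-only sketch shows the same STX/ETX state machine in a form that tolerates split and coalesced reads. `StxEtxAccumulator` and `feed` are hypothetical names, UTF-8 is assumed, and dropping an unfinished payload when a second STX arrives is a design choice of this sketch, not something the protocol description prescribes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: collects STX-ETX payloads from arbitrarily split chunks. */
final class StxEtxAccumulator {
    private static final byte STX = 0x02;
    private static final byte ETX = 0x03;

    // Bytes of the frame that is currently being received; survives between calls.
    private final ByteArrayOutputStream current = new ByteArrayOutputStream();
    private boolean inMessage = false;

    /** Feeds one chunk as it arrived; returns every payload completed by this chunk. */
    List<String> feed(byte[] chunk) {
        List<String> completed = new ArrayList<>();
        for (byte b : chunk) {
            if (b == STX) {
                // A new frame starts; any unfinished payload is discarded.
                current.reset();
                inMessage = true;
            } else if (b == ETX) {
                // An ETX outside a frame is ignored.
                if (inMessage) {
                    completed.add(new String(current.toByteArray(), StandardCharsets.UTF_8));
                    current.reset();
                    inMessage = false;
                }
            } else if (inMessage) {
                current.write(b);
            }
            // Bytes that arrive between frames are ignored.
        }
        return completed;
    }
}
```

In Netty the same loop could presumably run over the ByteBuf inside a ByteToMessageDecoder, adding each completed payload to the `out` list, but I have not tested that variant.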
Where FrameConstant looks like:
@UtilityClass
public class FrameConstant {
    public final int START_OF_TEXT = 0x02;
    public final int END_OF_TEXT = 0x03;
    public final int MAX_FRAME_LENGTH = 1024 * 1024;
}
Then initialize it:
@Slf4j
@Component
@RequiredArgsConstructor
public class QrReaderChannelInitializer extends ChannelInitializer<SocketChannel> {

    private final StringEncoder stringEncoder = new StringEncoder();
    private final StringDecoder stringDecoder = new StringDecoder();
    private final QrReaderProcessingHandler readerServerHandler;
    private final NettyProperties nettyProperties;

    @Override
    protected void initChannel(SocketChannel socketChannel) {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // Add the delimiter first
        pipeline.addLast(getDelimiterDecoder());
        if (nettyProperties.isEnableTimeout()) {
            pipeline.addLast(new ReadTimeoutHandler(nettyProperties.getClientTimeout()));
        }
        pipeline.addLast(stringDecoder);
        pipeline.addLast(stringEncoder);
        pipeline.addLast(readerServerHandler);
    }

    private FrameDelimiterDecoder getDelimiterDecoder() {
        ByteBuf delimiter = Unpooled.wrappedBuffer(new byte[]{FrameConstant.END_OF_TEXT});
        return new FrameDelimiterDecoder(FrameConstant.MAX_FRAME_LENGTH, delimiter);
    }
}
And some modifications to the handler:
@Slf4j
@Component
@RequiredArgsConstructor
@ChannelHandler.Sharable
public class QrReaderProcessingHandler extends ChannelInboundHandlerAdapter {

    private final PermissionService permissionService;
    private final EntranceService entranceService;
    private final Gson gson;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        String remoteAddress = ctx.channel().remoteAddress().toString();
        String stringMsg = (String) msg;
        if (log.isDebugEnabled()) {
            log.debug("CLIENT_IP: {}", remoteAddress);
            log.debug("CLIENT_REQUEST: {}", stringMsg);
        }
        if (HEARTBEAT.containsName(stringMsg)) {
            HeartbeatResponse heartbeatResponse = buildHeartbeatResponse();
            sendResponse(ctx, heartbeatResponse);
        }
    }

    private <T> void sendResponse(ChannelHandlerContext ctx, T response) {
        ctx.writeAndFlush(formatResponse(response));
    }

    // Wraps the JSON response in STX ... ETX before it goes to the StringEncoder.
    private <T> String formatResponse(T response) {
        String realResponse = String.format("%s%s%s",
                (char) FrameConstant.START_OF_TEXT,
                gson.toJson(response),
                (char) FrameConstant.END_OF_TEXT);
        log.debug("response: {}", realResponse);
        return realResponse;
    }
}
Finally, it sends a correctly formed response back.