在平常使用tcp连接时,可能会出现tcp粘包的现象。
在客户端与服务端建立连接后,客户端提交数据后,如果发送的数据较大,该数据包会被拆分为几个小的包进行发送,如果数据比较小,可能会等待一会,把后续的多个小包封装成一个大点的包一起发送。服务端获取客户端的数据可能出现如下情况:
服务端发送A、B两个包
- A、B分开发送,服务端正常分两次收到两个包
- A、B被一起发送、服务端一次收到两个包AB
- A被拆分为A1、A2、可能出现服务端收到A1、A2B(或者B被拆开,B拆开的包和A粘在一起,或者AB都被拆开组合)
除了第一种是正常的,后面两种都会出现问题。
Netty使用
maven项目:
1 2 3 4 5 6
| <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.0.23.Final</version> </dependency>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
| public class TimeServer { public static void main(String[] args) throws InterruptedException { new TimeServer().bind(9999); }
public void bind(int port) throws InterruptedException { NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeServerHandler()); } }); ChannelFuture channelFuture = bootstrap.bind(port).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } public class TimeServerHandler extends ChannelInboundHandlerAdapter { private int count; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String request = new String(req, "utf-8"); System.out.printf("The time server receive other: %s ,count is %d\n", request, ++count); String currentTime = "QUERY TIME".equalsIgnoreCase(request) ? format.format(new Date()) : "ERROR"; ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.write(resp); }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } public class TimeClient { public static void main(String[] args) throws InterruptedException { new TimeClient().connect("127.0.0.1", 9999); }
public void connect(String host, int port) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } } public class TimeClientHandler extends ChannelInboundHandlerAdapter{ private final ByteBuf firstMessage;
public TimeClientHandler() { byte[] req = "QUERY TIME".getBytes(); firstMessage = Unpooled.buffer(req.length); firstMessage.writeBytes(req); }
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(firstMessage); }
int count; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "utf-8"); System.out.printf("Now is %s count is %d\n", body, ++count); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
|
正常启动Server后启动Client,不会出现问题。
修改Client出现粘包
修改TimeClientHandler.channelActive
1 2 3 4 5 6 7
| ByteBuf message; for (int i = 0; i < 100; i++) { byte[] req = "QUERY TIME".getBytes(); message = Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); }
|
重新启动客户端,会发现服务端打印的数据有问题,客户端打印的返回值也有问题。这样一位部分数据被粘在一起了,服务端返回给客户端同理。
一位对于小包而言,在发送数据的时候,可能会等待后续包一起发送,有个等待时间,如果客户端发送数据后设置等待时间会出现什么情况,如下:
1 2 3 4 5 6 7 8 9
| ByteBuf message; for (int i = 0; i < 100; i++) { byte[] req = "QUERY TIME".getBytes(); message = Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); TimeUnit.MILLISECONDS.sleep(100); }
|
重新启动客户端,发现服务端获取的数据正常了,但是出现了客户端收到的服务端返回的数据时异常的,因为我们在Server代码中设置了ChannelOption.SO_BACKLOG为1024,可以打印客户端第一次获取的返回数据长度,刚刚好是1024。
解决出现粘包的情况
如果不使用netty的粘包解决办法需要自己手动设置ltv(length, type, value),在发送消息时,在包的头部加入包长度,包类型,后面才是具体的数据。这样在获取tcp包时通过包头部去获取后续包的内容。或者在不同的消息间加入特殊分隔符(入http使用的是/r/n/r/n)
通过LineBasedFrameDecoder解决
修改TimeServer.bind中bootstrap.childHandler中匿名内部类代码:
1 2 3 4 5 6 7 8
| @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(1024)) .addLast(new StringDecoder()) .addLast(new StringEncoder()) .addLast(new TimeServerHandler()); }
|
修改TimeServerHandler.channelRead:
1 2 3 4 5
| SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.printf("The time server receive other: %s ,count is %d\n", msg, ++count); String currentTime = "QUERY TIME".equalsIgnoreCase((String)msg) ? format.format(new Date()) : "ERROR";
ctx.write(currentTime + System.getProperty("line.separator"));
|
修改TimeClient.connect中bootstrap.childHandler中匿名内部类代码:
1 2 3 4
| ch.pipeline().addLast(new LineBasedFrameDecoder(1024)) .addLast(new StringDecoder()) .addLast(new StringEncoder()) .addLast(new TimeClientHandler());
|
修改TimeClientHandler.channelActive:
1 2 3 4
| for (int i = 0; i < 100; i++) { ctx.writeAndFlush("QUERY TIME" + System.getProperty("line.separator")); }
|
修改TimeClientHandler.channelRead:
1 2
| 直接输出 System.out.printf("Now is %s count is %d\n", msg, ++count);
|
启动服务端、客户端,访问正常。
- 这里需要注意的我们设置的LineBasedFrameDecoder接受的长度是1024,如果超过这个长度会报错。
这里通过LineBasedFrameDecoder、StringDecoder、StringEncoder组合就是按行来做粘包拆包处理,在发送的字符串末尾添加换行符。
除了使用LineBasedFrameDecoder解决之前的问题,在Netty中还提供了两种方案:
1、DelimiterBasedFrameDecoder:自动完成以分隔符作为码流结束标识的消息解码。
2、FixedLengthFrameDecoder:固定长度解码器,按照指定长度对消息进行自动解码。
对于DelimiterBasedFrameDecoder而言如果在设定的长度下还没有获取到分隔符一样会抛出异常,避免因为异常码流导致缺失分隔符。
对于FixedLengthFrameDecoder而言,如果是半包消息,FixedLengthFrameDecoder会缓存半包消息并等待下一个包到达后进行拼包,直到读取到一个完整的包。