在平常使用tcp连接时,可能会出现tcp粘包的现象。
在客户端与服务端建立连接后,客户端提交数据后,如果发送的数据较大,该数据包会被拆分为几个小的包进行发送,如果数据比较小,可能会等待一会,把后续的多个小包封装成一个大点的包一起发送。服务端获取客户端的数据可能出现如下情况:
服务端发送A、B两个包
- A、B分开发送,服务端正常分两次收到两个包
- A、B被一起发送、服务端一次收到两个包AB
- A被拆分为A1、A2、可能出现服务端收到A1、A2B(或者B被拆开,B拆开的包和A粘在一起,或者AB都被拆开组合)
除了第一种是正常的,后面两种都会出现问题。
Netty使用
maven项目:
| 12
 3
 4
 5
 6
 
 | <dependency>
 <groupId>io.netty</groupId>
 <artifactId>netty-all</artifactId>
 <version>4.0.23.Final</version>
 </dependency>
 
 | 
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 
 | public class TimeServer {public static void main(String[] args) throws InterruptedException {
 new TimeServer().bind(9999);
 }
 
 public void bind(int port) throws InterruptedException {
 
 NioEventLoopGroup bossGroup = new NioEventLoopGroup();
 
 NioEventLoopGroup workerGroup = new NioEventLoopGroup();
 try {
 ServerBootstrap bootstrap = new ServerBootstrap();
 bootstrap.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 1024)
 .childHandler(new ChannelInitializer<SocketChannel>() {
 @Override
 protected void initChannel(SocketChannel ch) throws Exception {
 ch.pipeline().addLast(new TimeServerHandler());
 }
 });
 ChannelFuture channelFuture = bootstrap.bind(port).sync();
 channelFuture.channel().closeFuture().sync();
 } finally {
 bossGroup.shutdownGracefully();
 workerGroup.shutdownGracefully();
 }
 }
 }
 public class TimeServerHandler extends ChannelInboundHandlerAdapter {
 private int count;
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 ByteBuf buf = (ByteBuf) msg;
 byte[] req = new byte[buf.readableBytes()];
 buf.readBytes(req);
 String request = new String(req, "utf-8");
 System.out.printf("The time server receive other: %s ,count is %d\n", request, ++count);
 String currentTime = "QUERY TIME".equalsIgnoreCase(request) ? format.format(new Date()) : "ERROR";
 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
 
 ctx.write(resp);
 }
 
 
 @Override
 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
 
 
 ctx.flush();
 }
 
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 ctx.close();
 }
 }
 public class TimeClient {
 public static void main(String[] args) throws InterruptedException {
 new TimeClient().connect("127.0.0.1", 9999);
 }
 
 public void connect(String host, int port) throws InterruptedException {
 EventLoopGroup group = new NioEventLoopGroup();
 Bootstrap bootstrap = new Bootstrap();
 try {
 bootstrap.group(group).channel(NioSocketChannel.class)
 .option(ChannelOption.TCP_NODELAY, true)
 .handler(new ChannelInitializer<SocketChannel>() {
 @Override
 protected void initChannel(SocketChannel ch) throws Exception {
 ch.pipeline().addLast(new TimeClientHandler());
 }
 });
 ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
 channelFuture.channel().closeFuture().sync();
 } finally {
 group.shutdownGracefully();
 }
 }
 }
 public class TimeClientHandler extends ChannelInboundHandlerAdapter{
 private final ByteBuf firstMessage;
 
 public TimeClientHandler() {
 byte[] req = "QUERY TIME".getBytes();
 firstMessage = Unpooled.buffer(req.length);
 firstMessage.writeBytes(req);
 }
 
 
 @Override
 public void channelActive(ChannelHandlerContext ctx) throws Exception {
 ctx.writeAndFlush(firstMessage);
 }
 
 int count;
 
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 ByteBuf buf = (ByteBuf) msg;
 byte[] req = new byte[buf.readableBytes()];
 buf.readBytes(req);
 String body = new String(req, "utf-8");
 System.out.printf("Now is %s count is %d\n", body, ++count);
 }
 
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 ctx.close();
 }
 }
 
 | 
正常启动Server后启动Client,不会出现问题。
修改Client出现粘包
修改TimeClientHandler.channelActive
| 12
 3
 4
 5
 6
 7
 
 | ByteBuf message;for (int i = 0; i < 100; i++) {
 byte[] req = "QUERY TIME".getBytes();
 message = Unpooled.buffer(req.length);
 message.writeBytes(req);
 ctx.writeAndFlush(message);
 }
 
 | 
重新启动客户端,会发现服务端打印的数据有问题,客户端打印的返回值也有问题。这样一位部分数据被粘在一起了,服务端返回给客户端同理。
一位对于小包而言,在发送数据的时候,可能会等待后续包一起发送,有个等待时间,如果客户端发送数据后设置等待时间会出现什么情况,如下:
| 12
 3
 4
 5
 6
 7
 8
 9
 
 | ByteBuf message;
 for (int i = 0; i < 100; i++) {
 byte[] req = "QUERY TIME".getBytes();
 message = Unpooled.buffer(req.length);
 message.writeBytes(req);
 ctx.writeAndFlush(message);
 TimeUnit.MILLISECONDS.sleep(100);
 }
 
 | 
重新启动客户端,发现服务端获取的数据正常了,但是出现了客户端收到的服务端返回的数据时异常的,因为我们在Server代码中设置了ChannelOption.SO_BACKLOG为1024,可以打印客户端第一次获取的返回数据长度,刚刚好是1024。
解决出现粘包的情况
如果不使用netty的粘包解决办法需要自己手动设置ltv(length, type, value),在发送消息时,在包的头部加入包长度,包类型,后面才是具体的数据。这样在获取tcp包时通过包头部去获取后续包的内容。或者在不同的消息间加入特殊分隔符(入http使用的是/r/n/r/n)
通过LineBasedFrameDecoder解决
修改TimeServer.bind中bootstrap.childHandler中匿名内部类代码:
| 12
 3
 4
 5
 6
 7
 8
 
 | @Overrideprotected void initChannel(SocketChannel ch) throws Exception {
 
 ch.pipeline().addLast(new LineBasedFrameDecoder(1024))
 .addLast(new StringDecoder())
 .addLast(new StringEncoder())
 .addLast(new TimeServerHandler());
 }
 
 | 
修改TimeServerHandler.channelRead:
| 12
 3
 4
 5
 
 | SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.printf("The time server receive other: %s ,count is %d\n", msg, ++count);
 String currentTime = "QUERY TIME".equalsIgnoreCase((String)msg) ? format.format(new Date()) : "ERROR";
 
 ctx.write(currentTime + System.getProperty("line.separator"));
 
 | 
修改TimeClient.connect中bootstrap.childHandler中匿名内部类代码:
| 12
 3
 4
 
 | ch.pipeline().addLast(new LineBasedFrameDecoder(1024)).addLast(new StringDecoder())
 .addLast(new StringEncoder())
 .addLast(new TimeClientHandler());
 
 | 
修改TimeClientHandler.channelActive:
| 12
 3
 4
 
 | for (int i = 0; i < 100; i++) {
 ctx.writeAndFlush("QUERY TIME" + System.getProperty("line.separator"));
 }
 
 | 
修改TimeClientHandler.channelRead:
| 12
 
 | 直接输出System.out.printf("Now is %s count is %d\n", msg, ++count);
 
 | 
启动服务端、客户端,访问正常。
- 这里需要注意的我们设置的LineBasedFrameDecoder接受的长度是1024,如果超过这个长度会报错。
这里通过LineBasedFrameDecoder、StringDecoder、StringEncoder组合就是按行来做粘包拆包处理,在发送的字符串末尾添加换行符。
除了使用LineBasedFrameDecoder解决之前的问题,在Netty中还提供了两种方案:
1、DelimiterBasedFrameDecoder:自动完成以分隔符作为码流结束标识的消息解码。
2、FixedLengthFrameDecoder:固定长度解码器,按照指定长度对消息进行自动解码。
对于DelimiterBasedFrameDecoder而言如果在设定的长度下还没有获取到分隔符一样会抛出异常,避免因为异常码流导致缺失分隔符。
对于FixedLengthFrameDecoder而言,如果是半包消息,FixedLengthFrameDecoder会缓存半包消息并等待下一个包到达后进行拼包,直到读取到一个完整的包。