什么是TCP拆包、粘包?

在网络通信中,数据在底层都是以字节流形式在流动,那么发送方和接受方理应有一个约定(协议),只有这样接受方才知道需要接受多少数据,哪些数据需要在一起处理;如果没有这个约定,就会出现本应该一起处理的数据,被TCP划分为多个包发给接收方进行处理,如下图:

看一个TCP拆包、粘包的实例

客户端Handler:

服务端Handler:

运行结果:

上面的程序本意是CLIENT发送3次消息给SERVER,SERVER端理应处理3次,可是结果SERVER却将3条消息一次处理了。

那么如何解决TCP拆包、粘包问题呢?其实思路不外乎有3种:

第一种:发定长数据

接收方拿固定长度的数据,发送方发送固定长度的数据即可。但是这样的缺点也是显而易见的:如果发送方的数据长度不足,需要补位,浪费空间。

第二种:在包尾部增加特殊字符进行分割

发送方发送数据时,增加特殊字符;在接收方以特殊字符为准进行分割

第三种:自定义协议

类似于HTTP协议中的HEAD信息,比如我们也可以在HEAD中,告诉接收方数据的元信息(数据类型、数据长度等)

Netty如何解决TCP拆包、粘包问题?

中,涉及到了JAVA SOCKET这方面的处理,大家可以参考。接下来,我们来看Netty这个框架是如何帮助我们解决这个问题的。本篇博客的代码在基础上进行。

方式一:定长消息

Server启动类:

Client Handler:

运行结果:

利用FixedLengthFrameDecoder,加入到管道流处理中,长度够了接收方才能收到。

方式二:自定义分隔符

Server启动类:

Client Handler:

运行结果:

方式三:自定义协议

下面我们将简单实现一个自定义协议:

HEAD信息中包含:数据长度、数据版本

数据内容

MyHead

public class MyHead {    //数据长度    private int length;    //数据版本    private int version;    public MyHead(int length, int version) {        this.length = length;        this.version = version;    }    public int getLength() {        return length;    }    public void setLength(int length) {        this.length = length;    }    public int getVersion() {        return version;    }    public void setVersion(int version) {        this.version = version;    }}

MyMessage

public class MyMessage {    //消息head    private MyHead head;    //消息body    private String content;    public MyMessage(MyHead head, String content) {        this.head = head;        this.content = content;    }    public MyHead getHead() {        return head;    }    public void setHead(MyHead head) {        this.head = head;    }    public String getContent() {        return content;    }    public void setContent(String content) {        this.content = content;    }    @Override    public String toString() {        return String.format("[length=%d,version=%d,content=%s]",head.getLength(),head.getVersion(),content);    }}

编码器

/** * Created by Administrator on 17-1-9. * 编码器 将自定义消息转化成ByteBuff */public class MyEncoder extends MessageToByteEncoder
 {    @Override    protected void encode(ChannelHandlerContext channelHandlerContext, MyMessage myMessage, ByteBuf byteBuf) throws Exception {        int length = myMessage.getHead().getLength();        int version = myMessage.getHead().getVersion();        String content = myMessage.getContent();        byteBuf.writeInt(length);        byteBuf.writeInt(version);        byteBuf.writeBytes(content.getBytes(Charset.forName("UTF-8")));    }}

×××

/** * Created by Administrator on 17-1-9. * ×××  将ByteBuf数据转化成自定义消息 */public class MyDecoder extends ByteToMessageDecoder {    @Override    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception {        int length = byteBuf.readInt();        int version = byteBuf.readInt();        byte[] body = new byte[length];        byteBuf.readBytes(body);        String content = new String(body, Charset.forName("UTF-8"));        MyMessage myMessage = new MyMessage(new MyHead(length,version),content);        list.add(myMessage);    }}

Server启动类

public class Main {    public static void main(String[] args) {        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)        EventLoopGroup workerGroup = new NioEventLoopGroup(); // (2)        int port = 8867;        try {            ServerBootstrap b = new ServerBootstrap(); // (3)            b.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class) // (4)                    .childHandler(new ChannelInitializer
() { // (5)                @Override                public void initChannel(SocketChannel ch) throws Exception {                    ch.pipeline().addLast(new MyEncoder())                    .addLast(new MyDecoder())                    .addLast(new ServerHandler());                }            })            .option(ChannelOption.SO_BACKLOG, 128)          // (6)            .childOption(ChannelOption.SO_KEEPALIVE, true); // (7)            // Bind and start to accept incoming connections.            ChannelFuture f = b.bind(port).sync(); // (8)            // Wait until the server socket is closed.            // In this example, this does not happen, but you can do that to gracefully            // shut down your server.            System.out.println("start server....");            f.channel().closeFuture().sync();            System.out.println("stop server....");        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            workerGroup.shutdownGracefully();            bossGroup.shutdownGracefully();            System.out.println("exit server....");        }    }}

Server Handler

public class ServerHandler  extends ChannelHandlerAdapter {    //每当从客户端收到新的数据时,这个方法会在收到消息时被调用    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        MyMessage in = (MyMessage) msg;        try {            // Do something with msg            System.out.println("server get :" + in);        } finally {            //ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放            //or ((ByteBuf)msg).release();            ReferenceCountUtil.release(msg);        }    }    //exceptionCaught()事件处理方法是当出现Throwable对象才会被调用    //当Netty由于IO错误或者处理器在处理事件时抛出的异常时    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        // Close the connection when an exception is raised.        cause.printStackTrace();        ctx.close();    }}

Client启动类

public class Client {    public static void main(String[] args) {        EventLoopGroup group = new NioEventLoopGroup();        try {              Bootstrap b = new Bootstrap();              b.group(group)               .channel(NioSocketChannel.class)               .handler(new ChannelInitializer
() {                   @Override                   public void initChannel(SocketChannel ch) throws Exception {                       ChannelPipeline p = ch.pipeline();                       p.addLast(new MyDecoder());                       p.addLast(new MyEncoder());                       p.addLast(new ClientHandler());                   }               });              // Start the client.              ChannelFuture f = b.connect("127.0.0.1", 8867).sync();              // Wait until the connection is closed.              f.channel().closeFuture().sync();          } catch (InterruptedException e) {            e.printStackTrace();        } finally {              // Shut down the event loop to terminate all threads.              group.shutdownGracefully();          }    }}

Client Handler

public class ClientHandler extends ChannelHandlerAdapter {    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        ctx.writeAndFlush(new MyMessage(new MyHead("abcd".getBytes("UTF-8").length,1),"abcd"));    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        ByteBuf in = (ByteBuf) msg;        try {            // Do something with msg            System.out.println("client get :" + in.toString(CharsetUtil.UTF_8));            ctx.close();        } finally {            //ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放            //or ((ByteBuf)msg).release();            ReferenceCountUtil.release(msg);        }    }}

运行结果

到这里,你会发现Netty处理TCP拆包、粘包问题很简单,通过编解码技术支持,让我们编写自定义协议也很方便,在后续的Netty博客中,我将继续为大家介绍Netty在实际中的一些应用(比如实现心跳检测),See You~