Netty 自定义解码器处理半包消息

概述

在李林锋的Netty系列之Netty编解码框架分析中介绍了各种解码器,也推荐组合

LengthFieldBasedFrameDecoder
ByteToMessageDecoder

这两个解码器来处理业务消息。但是有时候为了灵活性,会直接选择继承

ByteToMessageDecoder

来处理业务消息,但是直接继承ByteToMessageDecoder,则需要自己处理半包问题。在李林锋的【netty权威指南】中,并没有描述如何自定义解码器来处理半包消息。下文会介绍这方面的知识。

在阅读本文内容之前,你至少需要了解以下两个知识点

1、 netty的ByteBuf类的基本api用法
2、什么是TCP半包

虽然JAVA NIO中也有个ByteBuffer类,但是在Netty程序中,基本都是直接用Netty的ByteBuf类,它包装了更多好用的接口,降低了使用缓冲区类的难度。

之前本人写过几篇关于处理半包消息的文章,读者也可以参看一下

自定义消息协议

目前自定义的消息协议用的最多的是在消息中头四个字节保存消息的长度,格式大概如下

  1. len : 表示消息的长度,通常用4个字节保存
  2. head : 消息头部
  3. body : 消息内容

无论每次请求的业务数据多大,都是使用上面的消息格式来表示的。

注意

在实际的项目中,消息格式可能会增加一些标志,例如,开始标记,结束标志,消息序列号,消息的协议类型(json或者二进制等),这里为了描述的方便,就不讲附加的这些消息标志了。

自定义解码器处理半包数据

如上描述,直接继承ByteToMessageDecoder类,同时覆盖其decode方法,完整实现代码如下

服务端代码

package nettyinaction.encode.lengthfield.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SocketServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new SocketServerInitializer());

            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        }
        finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}





package nettyinaction.encode.lengthfield.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;


public class SocketServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new SelfDefineEncodeHandler());
        pipeline.addLast(new BusinessServerHandler());
    }
}







package nettyinaction.encode.lengthfield.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class SelfDefineEncodeHandler extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf bufferIn, List<Object> out) throws Exception {
        if (bufferIn.readableBytes() < 4) {
            return;
        }

        int beginIndex = bufferIn.readerIndex();
        int length = bufferIn.readInt();

        if ((bufferIn.readableBytes()+1) < length) {
            bufferIn.readerIndex(beginIndex);
            return;
        }

        bufferIn.readerIndex(beginIndex + 4 + length);

        ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4 + length);

        otherByteBufRef.retain();

        out.add(otherByteBufRef);
    }
}





package nettyinaction.encode.lengthfield.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class BusinessServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf)msg;
        int length = buf.readInt();
        assert length == (8);

        byte[] head = new byte[4];
        buf.readBytes(head);
        String headString = new String(head);
        assert  "head".equals(headString);

        byte[] body = new byte[4];
        buf.readBytes(body);
        String bodyString = new String(body);
        assert  "body".equals(bodyString);
    }
}

客户端代码

package nettyinaction.encode.lengthfield.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SocketClient {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .handler(new SocketClientInitializer());

            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
            channelFuture.channel().closeFuture().sync();
        }
        finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}





package nettyinaction.encode.lengthfield.client;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class SocketClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new SocketClientHandler());
    }
}





package nettyinaction.encode.lengthfield.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class SocketClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
        ByteBuf buffer = allocator.buffer(20);
        buffer.writeInt(8);
        buffer.writeBytes("head".getBytes());
        buffer.writeBytes("body".getBytes());

        ctx.writeAndFlush(buffer);
    }
}

客户端一旦启动,会发送一条长度为8的消息到服务端,服务端首先使用SelfDefineEncodeHandler类对消息进行解码,处理半包问题。如果消息是有效的完整的消息,当SelfDefineEncodeHandler处理完消息后,会把消息转发给BusinessServerHandler处理,BusinessServerHandler只是简单的做个验证,判断消息内容是否符合预期。

运行上面的代码,代码如预期那样,可以正确的读取到消息并解析消息。

这个例子中,最为核心的类就是SelfDefineEncodeHandler了。里面用了很多的技巧,要理解里面的每行代码,需要分两种情况来分析,分别是拆包和粘包

下面分别以拆包和粘包做两个小试验,来验证SelfDefineEncodeHandler是否能正常的处理半包问题。

拆包试验

先调整一下SocketClientHandler类中的channelActive方法中的代码,将body扩大几十倍,逼迫TCP发几次请求到达服务端,看看服务端的SelfDefineEncodeHandler能否正常处理。

package nettyinaction.encode.lengthfield.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SocketClient {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .handler(new SocketClientInitializer());

            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
            channelFuture.channel().closeFuture().sync();
        }
        finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}





package nettyinaction.encode.lengthfield.client;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class SocketClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new SocketClientHandler());
    }
}





package nettyinaction.encode.lengthfield.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class SocketClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
        ByteBuf buffer = allocator.buffer(20);
        buffer.writeInt(8);
        buffer.writeBytes("head".getBytes());
        buffer.writeBytes("body".getBytes());

        ctx.writeAndFlush(buffer);
    }
}

使用一个for循环,将消息body的长度设置为1600,加上长度为4的head,总共消息长度为1604。

然后调整一下服务端类SelfDefineEncodeHandler类的代码,加上三行代码。
第一行代码是加入一个类变量count,统计一下decode方法的调用次数

private static int count = 0;

接着在decode方法中加入三行代码

System.out.println("decode call count="+ ++count);
System.out.println("bufferIn.readableBytes()="+bufferIn.readableBytes());
System.out.println("beginIndex="+beginIndex);

打印出count和bufferIn.readableBytes()的大小以及beginIndex

最后在BusinessServerHandler类加入

private static int count = 0;

成员变量以及在channelRead方法中加入

System.out.println("BusinessServerHandler call count="+ ++count);

运行代码,打印结果如下

decode call count=1
bufferIn.readableBytes()=1024
beginIndex=0

decode call count=2
bufferIn.readableBytes()=1608
beginIndex=0

BusinessServerHandler call count=1

这个结果说明了,虽然客户端只是发送了一条消息,但是其实TCP底层是分两个包发送给服务端,第一次发送了1024个字节,后面的一次请求,才把消息剩下的内容发送给服务端。

虽然decode方法被调用了两次,但是第一次读取到的信息不完整,因此ByteToMessageDecoder会静静的等待另外一个包的到来,第二次读取完整消息后,才把消息转发给BusinessServerHandler类,从打印的结果看,
BusinessServerHandler类的channelRead方法只被调用了一次。

到此我们知道SelfDefineEncodeHandler类的decode方法是可以应付拆包问题的,那到底是如何做到的呢?现在我们回头仔细看看decode方法中的代码。

第一部分代码

if (bufferIn.readableBytes() < 4) {
            return;
}

如果接收到的字节还不到4个字节,也即是连消息长度字段中的内容都不完整的,直接return。

第二部分代码

 int beginIndex = bufferIn.readerIndex();
 int length = bufferIn.readInt();

 if ((bufferIn.readableBytes()+1) < length) {
      bufferIn.readerIndex(beginIndex);
      return;
 }

对于拆包这种场景,由于还未读取到完整的消息,(bufferIn.readableBytes()+1) 会小于length,并重置bufferIn的readerIndex为0,然后退出,ByteToMessageDecoder会乖乖的等待下个包的到来。

由于第一次调用中readerIndex被重置为0,那么decode方法被调用第二次的时候,beginIndex还是为0的。

第三部分代码

bufferIn.readerIndex(beginIndex + 4 + length);

将readerIndex设置为最大。首先代码能执行到这里,针对拆包这种场景而言,已经是读取到一条有效完整的消息了。这个时候需要通知ByteToMessageDecoder类,bufferIn中的数据已经读取完毕了,不要再调用decode方法了。ByteToMessageDecoder类的底层会根据bufferIn.isReadable()方法来判断是否读取完毕。只有将readerIndex设置为最大,bufferIn.isReadable()方法才会返回false。

第四部分代码

ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4 + length);
otherByteBufRef.retain();
out.add(otherByteBufRef);

当decode方法执行完后,会释放bufferIn这个缓冲区,如果将执行完释放操作的bufferIn传递给下个处理器的话,一旦下个处理器调用bufferIn的读或者写的方法时,会立刻报出IllegalReferenceCountException异常的。

因此slice操作后,必须加上一个retain操作,让bufferIn的引用计数器加1,这样ByteToMessageDecoder会刀下留人,先不释放bufferIn。

粘包试验

首先将SocketClientHandler类中的channelActive方法的实现改为

for (int i = 0; i < 20; i++) {
    UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
    ByteBuf buffer = allocator.buffer(20);
    buffer.writeInt(8);
    buffer.writeBytes("head".getBytes());
    buffer.writeBytes("body".getBytes());

    ctx.writeAndFlush(buffer);
}

客户端发起20个请求到服务器端。

接着注释掉SocketServerInitializer类中的

pipeline.addLast(new SelfDefineEncodeHandler());

代码,使请求不走SelfDefineEncodeHandler解码器。

运行代码,执行结果如下

BusinessServerHandler call count=1

说明客户端发送了粘包,服务端只接收到一次请求。现在把代码调整回来,走SelfDefineEncodeHandler解码器,运行代码,执行效果如下

decode call count=1
bufferIn.readableBytes()=240
beginIndex=0
BusinessServerHandler call count=1

decode call count=2
bufferIn.readableBytes()=228
beginIndex=12
BusinessServerHandler call count=2

decode call count=3
bufferIn.readableBytes()=216
beginIndex=24
BusinessServerHandler call count=3

decode call count=4
bufferIn.readableBytes()=204
beginIndex=36
BusinessServerHandler call count=4

decode call count=5
bufferIn.readableBytes()=192
beginIndex=48
BusinessServerHandler call count=5

decode call count=6
bufferIn.readableBytes()=180
beginIndex=60
BusinessServerHandler call count=6

decode call count=7
bufferIn.readableBytes()=168
beginIndex=72
BusinessServerHandler call count=7

decode call count=8
bufferIn.readableBytes()=156
beginIndex=84
BusinessServerHandler call count=8

decode call count=9
bufferIn.readableBytes()=144
beginIndex=96
BusinessServerHandler call count=9

decode call count=10
bufferIn.readableBytes()=132
beginIndex=108
BusinessServerHandler call count=10

decode call count=11
bufferIn.readableBytes()=120
beginIndex=120
BusinessServerHandler call count=11

decode call count=12
bufferIn.readableBytes()=108
beginIndex=132
BusinessServerHandler call count=12

decode call count=13
bufferIn.readableBytes()=96
beginIndex=144
BusinessServerHandler call count=13

decode call count=14
bufferIn.readableBytes()=84
beginIndex=156
BusinessServerHandler call count=14

decode call count=15
bufferIn.readableBytes()=72
beginIndex=168
BusinessServerHandler call count=15

decode call count=16
bufferIn.readableBytes()=60
beginIndex=180
BusinessServerHandler call count=16

decode call count=17
bufferIn.readableBytes()=48
beginIndex=192
BusinessServerHandler call count=17

decode call count=18
bufferIn.readableBytes()=36
beginIndex=204
BusinessServerHandler call count=18

decode call count=19
bufferIn.readableBytes()=24
beginIndex=216
BusinessServerHandler call count=19

decode call count=20
bufferIn.readableBytes()=12
beginIndex=228
BusinessServerHandler call count=20

结果符合预期,客户端发送20次,服务端BusinessServerHandler类的channelRead执行了20次。SelfDefineEncodeHandler类是如何做到这一点的呢?还是得回头仔细看看decode方法。

第一部分代码

if (bufferIn.readableBytes() < 4) {
            return;
}

如果接收到的字节还不到4个字节,也即是连消息长度字段中的内容都不完整的,直接return。

第二部分代码

 int beginIndex = bufferIn.readerIndex();
 int length = bufferIn.readInt();

 if ((bufferIn.readableBytes()+1) < length) {
      bufferIn.readerIndex(beginIndex);
      return;
 }

由于客户端发送了粘包,decode方法将会接收到一条聚合了多条业务消息的大消息,因此(bufferIn.readableBytes()+1)肯定大于length, bufferIn的readerIndex不会被重置。只是decode方法每被执行一次,beginIndex将会递增12,也即是(length+4)。

第三部分代码

bufferIn.readerIndex(beginIndex + 4 + length);

对于粘包这种场景,这行代码就不是表示将readerIndex升到最高,而是将readerIndex后移(length+4)位,让beginIndex递增(length+4)。

第四部分代码

ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4 + length);
otherByteBufRef.retain();
out.add(otherByteBufRef);

slice操作,目的是从大消息中截取出一条有效的业务消息。

参考的文章



相关文章

发表评论

Comment form

(*) 表示必填项

还没有评论。

跳到底部
返回顶部