背景

最近消息中间件项目进行联调,我负责Server端,使用Java的Netty框架。同事负责Client端,使用Go的net包,消息使用Protobuf序列化。联调时Client发送的消息Server端解析出错,经过分析发现是Server与Client粘包处理方式不一致导致,Server使用的是Protobuf提供的粘包处理方式,Client使用的是消息头定义长度的处理方式,探索一下Protobuf粘包处理方式有何不同。

编码类

public class ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder<ByteBuf> {
    @Override
    protected void encode(
            ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
        int bodyLen = msg.readableBytes();
        int headerLen = computeRawVarint32Size(bodyLen);
        out.ensureWritable(headerLen + bodyLen);
        writeRawVarint32(out, bodyLen);
        out.writeBytes(msg, msg.readerIndex(), bodyLen);
    }
    /**
     * Writes protobuf varint32 to (@link ByteBuf).
     * @param out to be written to
     * @param value to be written
     */
    static void writeRawVarint32(ByteBuf out, int value) {
        while (true) {
            if ((value & ~0x7F) == 0) {
                out.writeByte(value);
                return;
            } else {
                out.writeByte((value & 0x7F) | 0x80);
                value >>>= 7;
            }
        }
    }
    /**
     * Computes size of protobuf varint32 after encoding.
     * @param value which is to be encoded.
     * @return size of value encoded as protobuf varint32.
     */
    static int computeRawVarint32Size(final int value) {
        if ((value & (0xffffffff <<  7)) == 0) {
            return 1;
        }
        if ((value & (0xffffffff << 14)) == 0) {
            return 2;
        }
        if ((value & (0xffffffff << 21)) == 0) {
            return 3;
        }
        if ((value & (0xffffffff << 28)) == 0) {
            return 4;
        }
        return 5;
    }
}

encode()方法

protected void encode(
            ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
        // 获取消息长度
	int bodyLen = msg.readableBytes(); 
	// 计算表示消息体长度所需的字节数量
        int headerLen = computeRawVarint32Size(bodyLen);
	// 拿到所有需要写入的数据长度,对缓冲区进行扩容
        out.ensureWritable(headerLen + bodyLen);
	// 将表示消息体长度的字节写入缓冲区
        writeRawVarint32(out, bodyLen);
        out.writeBytes(msg, msg.readerIndex(), bodyLen);
    }

writeRawVarint32()方法

先看value & ~0x7F(value & 0x7F) | 0x80value >>>= 7这几个看不懂的地方,&|~>>>=这些符号为计算机的位运算符号,分别代表与、或、非、忽略符号位右移(a>>>=n 相当于 a = a>>>n

计算value & ~0x7F

分别假设value值为100200

100转二进制为01100100200转二进制为11001000

计算100 & ~0x7F

十进制 十六进制 运算符
100 0x64 0 1 1 0 0 1 0 0
-128 ~0x7f & 1 0 0 0 0 0 0 0
0 0x00 0 0 0 0 0 0 0 0

计算200 & ~0x7F

十进制 十六进制 运算符
200 0xc8 1 1 0 0 1 0 0 0
-128 ~0x7f & 1 0 0 0 0 0 0 0
128 0x80 1 0 0 0 0 0 0 0

这里运算结果使用十进制表示二进制是不准确的,仅作参考,需要根据数据类型进行转换,比如:10000000转换为byte类型是-128,转换为int是128

通过以上计算可以看出:

可以使用小于7个位表示的数字即可满足条件,7个位可以表示$2^7=128$个数字,取值范围是0~127,也就是说0~127可以满足条件,这一步的目的是保证写入表示消息体长度的最后一位字节是正数,后面会说到。

value=100满足条件,所以向bytebuf中写入字节01100100,然后return方法结束。

value=200不满足条件,那么看(value & 0x7F) | 0x80这一步运算。

计算(value & 0x7F) | 0x80

十进制 十六进制 运算符
200 0xc8 1 1 0 0 1 0 0 0
127 0x7f & 0 1 1 1 1 1 1 1
- - - - - - - - - - -
72 0x48 0 1 0 0 1 0 0 0
128 0x80 | 1 0 0 0 0 0 0 0
- - - - - - - - - - -
200 0xc8 1 1 0 0 1 0 0 0

计算结果还是200,我们分析一下步骤:

value & 0x7f:取出最后七个位,|0x80:将首位转为1

即取出最后7个位,高位补1,正好一个字节的长度,将11001000写入bytebuf,再看value >>>= 7

计算value >>>= 7

十进制 运算符
200 1 1 0 0 1 0 0 0
>>> 1 1 0 0 1 0 0 0
1 0 0 0 0 0 0 0 1

忽略符号右移其实就是将后7位挤出去,在前边补7个0。

计算机中一般首位是符号位,0表示正数,1表示负数。

这里需要注意是>>表示右移,不改变符号,最高位与原来符号保持一致。 >>>是忽略符号位右移,最高位补0。

200 >>>= 7的结果为00000001,继续走if判断,满足条件,将00000001写入bytebuf

最终value=200写入bytebuf的字节是1100100000000001

至此,三个看不懂的位运算都理解了,那么我们连起来看一下:

如果value可以用7个字节表示(或者说是value在0~127范围内),将value转换为字节写入bytebuf,跳出循环,方法结束。

如果value不能用7个字节表示(或者说是value不在0~127范围内),取最后7个位,高位补1,写入bytebuf中,右移7位(将刚才取出的7位删掉),再次判断是否满足if条件,不满足就继续上面的操作,直到满足条件为止。

总结一下writeRawVarint32方法,其实是把一个整数拆分成多个字节,倒序写入bytebuf中,如果将每个字节转换为byte类型,最后一个字节总是正数,前面的字节都是负数。我们可以猜测,接收消息时以第一个正数为分割,将表示消息体长度的字节与消息体字节拆分开,再通过位运算将前者组合起来就得到了消息体的长度。

computeRawVarint32Size()方法

我们在writeRawVarint32方法分析中了解了位运算,再看computeRawVarint32Size方法就很简单了。

计算机表示负数

0xffffffff转换为二进制是11111111 1111111 11111111 11111111转换为有有符号int类型是-1。为什么是-1?

因为计算机使用二进制可以做加法运算,但是没办法做减法运算,加上一个负数就相当于做了减法运算,现在问题是如何表示负数?

曾经有原码表示法、反码表示法,这里不做赘述,现在使用的是补码表示法。

补码表示法是将正数的二进制取反,然后在最后一位+1。

通过例子看一下:

有符号int类型的1用二进制可以表示为00000000 0000000 00000000 0000001取反得到11111111 11111111 11111111 11111110+1得到11111111 11111111 11111111 11111111转换为十六进制是0xffffffff

计算value & (0xffffffff << 7)

<<表示左移,从左边挤出去7个位,在右边补7个0。

这里仍然假设value分为为100,200。

// 计算(100 & (0xffffffff <<  7))
  00000000 0000000 00000000 01100100 // 100
& 11111111 1111111 11111111 10000000 // 0xffffffff <<  7 
  00000000 0000000 00000000 00000000 // 结果:0
// 计算(200 & (0xffffffff <<  7))
  00000000 0000000 00000000 11001000 // 200
& 11111111 1111111 11111111 10000000 // 0xffffffff <<  7
  00000000 0000000 00000000 10000000 // 结果:128
// 计算(200 & (0xffffffff <<  14))
  00000000 0000000 00000000 11001000 // 200
& 11111111 1111111 11000000 00000000 // 0xffffffff <<  14
  00000000 0000000 00000000 00000000 // 结果:0

从以上计算可以看出,如果value可以用小于7个位来表示,则左移7个位可以满足,如果value可以用8~14个位来表示,左移14个位可以满足。

100、200计算结果分别为1、2,与writeRawVarint32方法写入的字节数量一致。

writeRawVarint32是方法7个7个的取出位,这里按7个位来计算所需字节数量,最终返回表示消息体长度的字节数量。

解码类

public class ProtobufVarint32FrameDecoder extends ByteToMessageDecoder {
    // TODO maxFrameLength + safe skip + fail-fast option
    //      (just like LengthFieldBasedFrameDecoder)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        in.markReaderIndex();
        int preIndex = in.readerIndex();
        int length = readRawVarint32(in);
        if (preIndex == in.readerIndex()) {
            return;
        }
        if (length < 0) {
            throw new CorruptedFrameException("negative length: " + length);
        }
        if (in.readableBytes() < length) {
            in.resetReaderIndex();
        } else {
            out.add(in.readRetainedSlice(length));
        }
    }
    /**
     * Reads variable length 32bit int from buffer
     *
     * @return decoded int if buffers readerIndex has been forwarded else nonsense value
     */
    private static int readRawVarint32(ByteBuf buffer) {
        if (!buffer.isReadable()) {
            return 0;
        }
        buffer.markReaderIndex();
        byte tmp = buffer.readByte();
        if (tmp >= 0) {
            return tmp;
        } else {
            int result = tmp & 127;
            if (!buffer.isReadable()) {
                buffer.resetReaderIndex();
                return 0;
            }
            if ((tmp = buffer.readByte()) >= 0) {
                result |= tmp << 7;
            } else {
                result |= (tmp & 127) << 7;
                if (!buffer.isReadable()) {
                    buffer.resetReaderIndex();
                    return 0;
                }
                if ((tmp = buffer.readByte()) >= 0) {
                    result |= tmp << 14;
                } else {
                    result |= (tmp & 127) << 14;
                    if (!buffer.isReadable()) {
                        buffer.resetReaderIndex();
                        return 0;
                    }
                    if ((tmp = buffer.readByte()) >= 0) {
                        result |= tmp << 21;
                    } else {
                        result |= (tmp & 127) << 21;
                        if (!buffer.isReadable()) {
                            buffer.resetReaderIndex();
                            return 0;
                        }
                        result |= (tmp = buffer.readByte()) << 28;
                        if (tmp < 0) {
                            throw new CorruptedFrameException("malformed varint.");
                        }
                    }
                }
            }
            return result;
        }
    }
}

readRawVarint32()方法

方法就不细看了,验证一下我们之前的猜测。还是使用value=200写入的1100100000000001字节举例看一下。

private static int readRawVarint32(ByteBuf buffer) {
        if (!buffer.isReadable()) {
            return 0;
        }
        buffer.markReaderIndex();
	// 读取第一个字节
        byte tmp = buffer.readByte(); // tmp = 11001000 首位是1,是个负数,小于0
	// 判断是否大于等于0,
	// 大于等于0说明是最后一个表示消息体长度的字节,直接return
        if (tmp >= 0) {
            return tmp;
        } else {
	    // 小于0 tmp & 127 取出后七位
            int result = tmp & 127; // result = 11001000 & 01111111 = 01001000
            if (!buffer.isReadable()) {
                buffer.resetReaderIndex();
                return 0;
            }
	    // 再取第二个字节,判断是否大于等于0 
            if ((tmp = buffer.readByte()) >= 0) { //tmp = 00000001  
		// 这一步操作相当于是把上一步取出的7个字节拿出来拼在tmp<<7的后面
                result |= tmp << 7;
		// result = 01001000 | tmp << 7
		// tmp << 7 = 10000000
                // 01001000 | 10000000 = 11001000 转换为int类型是200
            }
	    // 后面的代码与以上步骤大同小异,不再赘述了
            return result;
        }
    }

总结

涉及到的基础知识:计算机表示整数、位运算、进制转换

一般处理粘包的方式有三种:

  1. 定长消息:每次发送消息的长度固定,比如,总是发送100个字节。

  2. 特殊符号分割:以特殊字符作为分隔符,读到特殊字符时,认为上一条消息结束。

  3. 消息头定义长度:在消息体前增加消息体的长度,一般使用四个字节,读取消息时先读取四个字节,得到消息体长度,再根据长度读取消息。

Netty Protobuf提供的处理粘包处理方式是在消息体前加正负数,并且以第一个正数作为分割。可以说是消息头定义长度方式+特殊符号分割方式的结合版。巧妙利用二进制的位运算和计算机表示整数的特点实现动态消息长度,发送较短消息时可以比消息头定义长度的方式节省1-3个字节。

博客小白的第一篇文档,如有错误,还望指正。

发表回复