Netty的ByteBuf

xiaoxiao2021-03-01  11

转自:https://blog.csdn.net/yjw123456/article/details/77843931

正如之前所说,网络传输的基本单位是字节。Java NIO 提供了ByteBuffer作为它的容器,但是这个类使用起来比较复杂和麻烦。Netty提供了一个更好的实现:ByteBuf。

ByteBuf的API

Netty为数据处理提供的API通过抽象类ByteBuf和接口ByteBufHolder暴露出来。  下面列出ByteBuf API的优点:

可扩展到用户定义的buffer类型中通过内置的复合buffer类型实现透明的零拷贝(zero-copy)容量可以根据需要扩展切换读写模式不需要调用ByteBUffer.flip()方法读写采用不同的索引支持方法链接调用支持引用计数支持池技术(比如:线程池、数据库连接池)

ByteBuf类—Netty的数据容器

因为所有的网络通信都涉及到字节序列的移动,一个有效而易用的数据结构是非常必要的。Netty的ByteBuf实现达到并超过这些需求。下面了解一下如何通过索引来简化对获取它持有数据的操作。

工作原理

ByteBuf维护两个不同的索引:读索引和写索引。当你从ByteBuf中读,它的readerIndex增加了读取的字节数;同理,当你向ByteBuf中写,writerIndex增加。下图显示了一个空的ByteBuf的布局和状态:   为了理解这些索引的关系,考虑一下如果你读数据时readerIndex已经和writerIndex一样会发生什么。在这个点,你已经读完了可读的数据。尝试继续往下读会引发一个IndexOutOfBoundsException,就像引发数组越界那样。

ByteBuf中名称以read或write开头的方法会推进相应的索引,而以set或get开头的不会。  可以指定ByteBuf最大的容量,默认是Integer.MAX_VALUE。

ByteBuf的使用模式

为了了解它的使用模式,我们得首先记住上图所展示的内容—一个数组以及两个索引来控制读和写。

堆缓冲区(HEAP BUFFER)

最常使用的ByteBuf模式将数据保存到JVM的堆中。被称为支持数组(backing array),这个模式提供了在没有使用池技术的情况下快速分配和释放(在堆缓冲区中)。这种方法是非常适合于来处理内置数据(legacy data,直译遗留数据,个人觉得这里翻译为内置数据更好理解一些,如果不合适,以后再改)的,如下所示:

ByteBuf heapBuf = ...; if (heapBuf.hasArray()){//检查是否有支持数组 byte[] array = heapBuf.array(); //得到支持数组 int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();//计算第一个字节的偏移量 int length = heapBuf.readableBytes();//计算可读字节数 handleArray(array, offset, length); //调用你的方法来处理这个array }

当hasArray()返回false时尝试访问支持数组会抛出UnsupportedOperationException。这个用法与JDK的ByteBuffer类似

直接缓冲区(DIRECT BUFFER)

我们认为创建对象时内存总是从堆中分配?但并非总是如此。在JDK1.4中引入的NIO的ByteBuffer类允许JVM 通过本地(native)方法调用分配内存,其目的是通过免去中间交换的内存拷贝, 提升IO处理速度。

直接缓冲区的内容可以驻留在垃圾回收扫描的堆区以外。这就解释了为什么直接缓冲区数据对网络数据传输来说是一种非常理想的方式。如果你的数据是存放在堆中分配的缓冲区,那么实际上,在通过 socket 发送数据之前,JVM需要将先数据复制到直接缓冲区。

这种方式的主要缺点是对于分配和释放内存空间来说比堆缓冲区消耗更大。如果你要处理内置数据的代码时可能会遇到另一个缺点:因为数据没有被分配到堆上,你可能需要做一个拷贝,如下所示:

ByteBuf directBuf = ... if (!directBuf.hasArray()) {//false表示为这是直接缓冲 int length = directBuf.readableBytes();//得到可读字节数 byte[] array = new byte[length]; //分配一个具有length大小的数组 directBuf.getBytes(directBuf.readerIndex(), array); //将缓冲区中的数据拷贝到这个数组中 handleArray(array, 0, length); //下一步处理 }

明显这要比使用支持数组的方式需要更多的工作,所以你如果提前知道数据会以一个数组的方式存取,推荐你使用堆内存。

复合缓冲区(COMPOSITE BUFFER)

第三种也是最后一种模式使用一个复合缓冲区,为多个ByteBuf提供一个组合的视图。你可以添加或者删除ByteBuf实例,一种JDK ByteBuffer中完全没有的实现。

Netty通过ByteBuf的子类-CompositeByteBuf来实现这种模式,提供了将多个buffer虚拟成一个合并的Buffer的技术。

注意:CompositeByteBuf中的ByteBuf实例可能同时包含堆缓冲区的和直接缓冲区的。如果CompositeByteBuf只含有一个实例,调用hasArray()方法会返回这个实例的hasArray()方法的值;否则总是返回false。

让我们考虑一条由两部分组成的消息,header和body,通过HTTP传输。这两部分由不同的应用程序模块产生,这个应用有为多条消息重用同一个body的选项。在这种情况下,会为每条消息创建一个新的header。

因为你不想为每条消息的两个buffer都重新分配空间,CompositeByteBuf就完美适合这种情况。  下图显示了这个消息的布局: 

下面先介绍如何通过JDK的ByteBuffer来实现这个需求:

//通过一个数组来存储这条消息 ByteBuffer [] message = new ByteBuffer[]{header,body}; //使用副本来合并这两个部分 ByteBuffer message2 =ByteBuffer.allocate(header.remaining() + body.remaining()); message2.put(header); message2.put(body); message2.flip()

这种分配和拷贝的方式显然是低效且不合适的。下面介绍如何通过CompositeByteBuf来实现:

CompositeByteBuf messageBuf = Unpooled.compositeBuffer(); ByteBuf headerBuf = ...; //直接缓冲或堆缓冲都可 ByteBuf bodyBuf = ...; // 直接缓冲或堆缓冲都可 messageBuf.addComponents(headerBuf, bodyBuf);//将ByteBuf实例添加到CompositeByteBuf中 ..... messageBuf.removeComponent(0); //删除header for (ByteBuf buf : messageBuf) {//遍历messageBuf中的ByteBuf System.out.println(buf.toString()); }

CompositeByteBuf可能不允许访问支持数组,所以访问CompositeByteBuf中的数据的方式类似于直接缓冲区模式,如下所示:

CompositeByteBuf compBuf = Unpooled.compositeBuffer(); int length = compBuf.readableBytes();//得到可读的字节数 byte[] array = new byte[length];//分配一个字节数组 compBuf.getBytes(compBuf.readerIndex(), array);//将数据读到这个字节数组中 handleArray(array, 0, array.length);

Netty通过CompositeByteBuf来优化socket的IO操作,尽可能的消除JDK buffer实现中的性能和内存使用中的不足。尽管这些优化被封装到Netty的核心代码中,但你应该意识到这些优化的影响。

字节级别的操作

除了基本的读写操作,ByteBuf提供了大量的修改它数据的方法。下面我们会讨论最重要的一些。

随机访问索引

ByteBuf 使用从0开始的索引,第一个字节的索引是 0,最后一个字节的索引是 ByteBuf的capacity()- 1。下面的代码显示了迭代ByteBuf的内容有多简单:

ByteBuf buffer = ...; for (int i = 0; i < buffer.capacity(); i++) { byte b = buffer.getByte(i); System.out.println((char) b); }

顺序访问索引

尽管ByteBuf有读写索引,而JDK的ByteBuffer只有一个索引,这就是为什么你需要使用flip()方法来切换读写模式。下图显示了ByteBuf被两个索引分成了三个区域: 

第一个是已经读取过的字节,因此可以被丢弃;第二个要可读的字节,也就是ByteBuf的内容;第三个是可添加新的字节的区域。

+-------------------+------------------+------------------+ | discardable bytes | readable bytes | writable bytes | | | (CONTENT) | | +-------------------+------------------+------------------+ | | | | 0 <= readerIndex <= writerIndex <= capacity

这和源码中示例图类似。

可丢弃字节(Discardable bytes)

上图中标记为”discardable bytes”的部分中的字节已经被读取过,通过调用discardReadBytes()方法它们能被丢弃同时它们的空间能被回收。这个部分中的初始大小为0,保存在readerIndex中,随着读操作(read* 方法)的执行而增加。

下图显示了调用discardReadBytes()后的结果。你能发现可丢弃字节部分的空间已经变得可用,并分配到可写空间中去了。注意,在调用discardReadBytes()后无法保证可写部分的内容是什么样的。

你可能会经常调用discardReadBytes()方法来增大可写部分的空间,请注意这很有可能会导致内存复制,因为可读的字节不得不移动到buffer的开端处。建议只有在真正需要的时候才这么做,比如当内存快要溢出(premium)的时候。

可读字节(Readable bytes)

可读字节部分存储了实际的内容。一个新分配、wrap、复制的buffer的readerIndex是0。任何名称以read或skip开头的操作会检索或跳过当前的readerIndex,然后增加读取了的字节数量。

看一下下面这个方法:

/** * 将这个buffer(this,调用这个方法的buffer对象)的数据传输到这个特定的dst。以this对象的当前 * readerIndex开始直到dst变得不可写,然后增加传输的字节数量到this对象的readerIndex上。 * * @throws IndexOutOfBoundsException 如果 dst.writableBytes > this.readableBytes */ public abstract ByteBuf readBytes(ByteBuf dst);

如果尝试从一个已经没有可读数据的buffer中读取数据会引发一个IndexOutOfBoundsException。  下面的代码显示了如何读取所有可读的数据:

ByteBuf buffer = ...; while (buffer.isReadable()) { System.out.println(buffer.readByte()); }

可写字节(Writable bytes)

可读字节部分是一个有未定义内容的可写内存区域。新分配的buffer的writerIndex是0。任何名称以write开头的方法会从当前的writerIndex开始写数据,增加刚才写的字节数量。

如果尝试写的字节大小超过了这个buffer的容量,会引发IndexOutOfBoundException。

下面显示了如何正确地向buffer中写数据:

ByteBuf buffer = ...; while (buffer.writableBytes() >= 4) { buffer.writeInt(random.nextInt()); }

索引管理(Index management)

JDK的InputStream定义了mark(int readlimit)和reset()两个方法,这些方法用来标注这个stream的当前索引到一个特定的位置,然后可以相应的将stream重置(reset())到刚才标注的位置。

类似地,你能通过调用markReaderIndex(), markWriterIndex(), resetReaderIndex()和 resetWriterIndex()来设置和复位ByteBuf的readerIndex和writerIndex。除了没有readlimit参数来指定什么时候标记失效,这些很像InputStream的调用。

你也能通过调用readerIndex(int)或writerIndex(int)方法来将索引移到一个特定的位置。尝试设置索引到一个无效的位置也会导致IndexOutOfBoundsException。

可以通过调用clear()方法将readerIndex和writerIndex都设为0。注意这并不会导致任何内存释放。

假设在调用clear()方法之前一个ByteBuf实例有3个部分,如下: 

调用之后如下: 

这一部分的大小和这个ByteBuf的容量一样大,因此所有的空间都是可写的。

调用clear()的开销没有discardReadBytes()那么大,因为它不需要任何内存复制。

搜索操作(Search operations)

有几种方法可以检测特定值的索引。最简单的是indexOf()方法,更复杂的方式是调用以ByteBufProcessor接口作为参数的方法。这个接口只定义了一个方法:

boolean process(byte value)  

ByteBufProcessor中还定义了很多目标常量,假如你的应用与Flash的socket有交互的话,它有以null为结尾的内容,调用forEachByte(ByteBufProcessor.FIND_NUL)来处理Flash中的数据还是很简单高效的,因为在处理过程中只做了很少的一些边界检查。

通过ByteBufProcessor来查找\r int index = buffer.forEachByte(ByteBufProcessor.FIND_CR);

衍生(Derived)Buffer

衍生Buffer为ByteBuf提供一个以特定方式呈现内容的视图。这些视图通过如下方法创建:

duplicate()slice()slice(int,int)Unpooled.unmodifiableBuffer(…)order(ByteOrder)readSlice(int)

每个方法返回一个新的具有自己的读、写和标记索引的ByteBuf实例。新实例和源实例(调用者写方法的实例)之间共享内部存储。这让一个衍生Buffer可以低消耗的创建,但也意味着你修改了衍生Buffer的内容会改变源Buffer的内容(因为是共享的),反之亦然。

复制ByteBuf 如果你想真正的复制一个buffer,可通过copy()或copy(int,int)方法。

下面的代码展示了slice(int,int)方法的用法:

Charset utf8 = Charset.forName("UTF-8"); ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);//创建一个ByteBuf ByteBuf sliced = buf.slice(0, 14);//创建这个ByteBuf的一个切片 System.out.println(sliced.toString(utf8));//输出 Netty in Actio buf.setByte(0, (byte)'J'); assert buf.getByte(0) == sliced.getByte(0);//成功 System.out.println(sliced.toString(utf8));//输出 Jetty in Actio

下面看copy()方法与slice()方法的不同:

Charset utf8 = Charset.forName("UTF-8"); ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);//创建一个ByteBuf ByteBuf copy = buf.copy(0, 14);//创建这个ByteBuf的一个切片 System.out.println(copy.toString(utf8));//输出 Netty in Actio buf.setByte(0, (byte)'J'); assert buf.getByte(0) != copy.getByte(0);//成功

这两个例子展示了修改一个切片和一个拷贝对原来ByteBuf的影响。只要可能,推荐使用slice()方法来避免内存复制。

读写操作

Netty中有两种读写操作:

get()和set()操作以一个指定的索引开始但不会修改这个索引read()和write()操作以给定的索引开始并根据访问的数据大小而修改索引

下表列出了最常用的get*方法:

名称描述setBoolean(int, boolean)设置Boolean值到给定索引处setByte(int index, int value)设置byte值到给定索引处setMedium(int index, int value)设置24位中整型(24-bit medium)值到给定索引处setInt(int index, int value)设置int值到给定索引处setLong(int index, int value)设置long值到给定索引处setShort(int index, int value)设置short值到给定索引处

下面的代码描述了get()和set()方法的使用,从中可以看到它们不会修改读写索引:

Charset utf8 = Charset.forName("UTF-8"); ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8); System.out.println((char)buf.getByte(0));//'N' int readerIndex = buf.readerIndex();//存储当前的读索引 int writerIndex = buf.writerIndex();//存储当前的写索引 buf.setByte(0, (byte)'B');//更新值 System.out.println((char)buf.getByte(0));//'B' assert readerIndex == buf.readerIndex();//成功 assert writerIndex == buf.writerIndex();//成功

下面来学习read*操作,这些方法作用于当前的读写索引。它们通过将ByteBuf看成stream的方式来从中读数据,下表显示了常用的方法:

名称描述readBoolean()返回当前readerIndex处的Boolean值,然后将readerIndex加1readByte()返回当前readerIndex处的byte值,然后将readerIndex加1readUnsignedByte()返回当前readerIndex处的无符号byte值并作为一个short类型,然后将readerIndex加1readMedium()返回当前readerIndex处的24位中整型值,然后将readerIndex加3readUnsignedMedium()返回当前readerIndex处的无符号24位中整型值,然后将readerIndex加3readInt()返回当前readerIndex处的int值,然后将readerIndex加4readUnsignedInt()返回当前readerIndex处的无符号int值,然后将readerIndex加4readLong()返回当前readerIndex处的long值,然后将readerIndex加8readShort()返回当前readerIndex处的short值,然后将readerIndex加2readUnsignedShort()返回当前readerIndex处的无符号short值,然后将readerIndex加2readBytes(ByteBuf or byte[]destination,int dstIndex [,int length])将当前ByteBuf(从当前readerIndex开始)的数据传输到目标ByteBuf(从dstIndex开始复制)。并增加传输数据的大小(字节数量)到当前ByteBuf的readerIndex中

几乎每个read*方法都有响应的write*方法,注意下表列出的这些方法的参数要要写入的值,而不是索引值。

名称描述writeBoolean(boolean)将Boolean值写入到当前writerIndex,然后将writerIndex加1writeByte(int)将byte值写入到当前writerIndex,然后将writerIndex加1writeMedium(int)将中整型值写入到当前writerIndex,然后将writerIndex加3writeInt(int)将int值写入到当前writerIndex,然后将writerIndex加4writeLong(long)将long值写入到当前writerIndex,然后将writerIndex加8writeShort(int)将short值写入到当前writerIndex,然后将writerIndex加2writeBytes(source ByteBuf or byte[] [,int srcIndex,int length])将数据从特定的ByteBuf或byte数组传输到当前ByteBuf(从它的writerIndex开始写)中。如果提供了srcIndex和length,那么就会从srcIndex开始拷贝,并拷贝length大小的字节到当前ByteBuf中。并增加传输数据的大小(字节数量)到当前ByteBuf的writerIndex中

下面展示这些方法怎么使用:

Charset utf8 = Charset.forName("UTF-8"); ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8); System.out.println((char)buf.readByte());//N int readerIndex = buf.readerIndex(); int writerIndex = buf.writerIndex(); buf.writeByte((byte)'?'); System.out.println(buf.toString(utf8));//etty in Action rocks!? assert readerIndex == buf.readerIndex();//成功 assert writerIndex != buf.writerIndex();//成功

更多的操作

下表列出了其他有用的操作:

名称描述isReadable()如果至少还有一个字节可被读的话返回trueisWritable()如果至少还有一个位可写入的话返回truereadableBytes()返回可读的字节数量writableBytes()返回可写的字节数量capacity()返回当前ByteBuf能存储的字节数量maxCapacity()返回ByteBuf最多能存储多少字节数量hasArray()返回Byte是否有支持数组array()如果有支持数组,返回;否则抛UnsupportedOperationException

ByteBufHolder接口

我们经常会遇到除了需要存储实际的数据内容还需要存储各种各样的属性值的情况,比如HTTP响应,除了以字节表示的内容还会有状态码、cookie等等。

Netty提供了ByteBufHolder接口来满足这种用例。ByteBufHolder也支持Netty的高级特性,如buffer池,可以从buffer池中取一个可用的ByteBuf,如果需要还可以自动释放。

ByteBufHolder只有几个方法来访问底层的ByteBuf和引用计数,下表中列出了这些方法:

名称描述content()返回这个ByteBufHolder持有的ByteBufcopy()返回这个ByteBufHolder的一个深复制,包括一个它含有的ByteBuf数据的一个拷贝(非共享的)duplicate()返回这个ByteBufHolder的一个浅复制,包括一个它含有的ByteBuf数据的一个拷贝(共享的)

如果你想实现一个“消息对象”有效负载存储在ByteBuf,使用ByteBufHolder是一个好主意。

ByteBuf内存分配(ByteBuf allocation)

这一节我们会介绍管理ByteBuf实例的方法

请求式:ByteBufAllocator接口

为了减少分配和释放内存的总开销,Netty通过ByteBufAllocator实现ByteBuf池,ByteBufAllocator负责分配ByteBuf实例。

下表列出了ByteBufAllocator提供的方法:

名称描述buffer()返回一个基于直接缓冲区或堆缓冲区的ByteBufheapBuffer()返回一个基于堆缓冲区的ByteBufdirectBuffer()返回一个基于直接缓冲区的ByteBufioBuffer()返回一个适用于socket上的IO操作的ByteBuf(一般是直接缓冲区的)

你可以从一个Channel或与ChannelHandler绑定的ChannelHandlerContext中获得一个ByteBufAllocator的引用。

下面展示了这种方式的用法:

Channel channel = ...; ByteBufAllocator allocator = channel.alloc(); .... ChannelHandlerContext ctx = ...; ByteBufAllocator allocator2 = ctx.alloc();

Netty为ByteBufAllocator提供了两种实现: PooledByteBufAllocator和UnpooledByteBufAllocator。前者通过ByteBuf的实例放入池中来提高性能同时减少内存碎片。后者每次返回一个新建的实例而没有通过池来缓存。

尽管Netty默认使用PooledByteBufAllocator,但是可以在启动你应用的时候通过 ChannelConfig 的 API来修改。

Unpooled buffers

可能你无法获得ByteBufAllocator的引用,在这种情况下,Netty提供了一个叫做Unpooled的工具类,它提供了静态的方法来创建不从池中获取的(unpooled)新建的ByteBuf实例。

下表列出了最重要的一些方法:

名称描述buffer()返回一个基于堆缓冲区的ByteBufdirectBuffer()返回一个基于直接缓冲区的ByteBufwrappedBuffer()返回一个包装了(wrap)给定数据的ByteBufcopiedBuffer()返回一个持有给定数据的一个拷贝的ByteBuf

Unpooled类使在非网络编程项目中也可以用到ByteBuf。

ByteBufUtil类

ByteBufUtil提供了用于操纵ByteBuf的静态帮助方法。因为这API是通用的,与池无关,因此这些方法已经在分配内存的类之外实现了。

这些静态方法中最重要的可能是hexdump()了,它能以十六进制的形式打印一个ByteBuf实例的内容。这在日志记录中十分有用;另一个有用的方法是equals(ByteBuf,ByteBuf)方法,它返回这两个ByteBuf是否相等(以一定的规则判断,比如可读字节数大小、a[aStartIndex : aStartIndex + length] == b[bStartIndex : bStartIndex + length] 其中 a[i:k]表示a[i]、a[i+1]、a[i+2]…a[k-1])。

引用计数

引用计数是一种优化内存使用和性能的技术,当对象不再被引用时,释放对象所持有的资源。Netty从4.0版本开始为ByteBuf和ByteBufHolder引入了引用计数,它们都实现了ReferenceCounted接口。

引用计数背后的原理不是很复杂,主要是跟踪有多少个活跃引用指向了某个对象。只要某个对象的引用计数大于0,可以保证这个对象不会被释放。当某个对象的引用计数变成0时,这个对象将会被释放。

引用计数对于池技术的实现至关重要,比如PooledByteBufAllocator。  在接下来的两段代码中,将会看到相关例子:

Channel channel = ...; ByteBufAllocator allocator = channel.alloc(); .... ByteBuf buffer = allocator.directBuffer(); assert buffer.refCnt() == 1;//检测buffer的引用计数是否为1
ByteBuf buffer = ...; //减少这个对象的引用计数,如果减少到0,这个对象将会被释放 //,并且这个方法返回true。 boolean released = buffer.release();

试图访问被释放的引用计数对象将会导致一个IllegalReferenceCountException。  注意,一个特定的类可以自己定义它减少引用计数的方式,比如可以一次性将引用计数设为0。

转载请注明原文地址: https://www.6miu.com/read-3200224.html

最新回复(0)