本文共 1931 字,大约阅读时间需要 6 分钟。
Netty数据处理流程解析
1. 数据流动机制
Netty采用ChannelPipeline来管理数据的读写流程。数据从ChannelHandlerContext开始,沿着Pipeline传播到各个ChannelHandler,直到达到目标处理结点。
2. 编码器工作流程
2.1 MessageToByteEncoder编码步骤
判断对象:检查当前ChannelHandler是否能处理传入的Java对象。 分配内存:为新的ByteBuf分配内存空间。 调用编码方法:子类实现encode()方法将Java对象编码为字节流。 释放对象:编码完成后释放原Java对象。 传播数据:将编码后的ByteBuf传递给下一个ChannelHandler。 异常处理:确保异常情况下的内存释放。 2.2 编码步骤总结
- MessageToByteEncoder通过acceptOutboundMessage()判断能否处理消息。
- 子类encode()方法实现具体编码协议。
- 最终将编码后的ByteBuf传播到下一个ChannelHandler。
2.3 子类编码实现示例
public class Encoder extends MessageToByteEncoder { protected void encode(ChannelHandlerContext ctx, Response response, ByteBuf out) throws Exception { out.writeByte(response.getVersion()); out.writeInt(4 + response.getData().length); out.writeBytes(response.getData()); }} 3. 写入和刷新过程
3.1 writeAndFlush方法执行流程
writeAndFlush调用入口:ctx.channel().writeAndFlush(msg)。 执行流程: - 从TailContext开始传播,调用ChannelHandlerContext的writeAndFlush()。
- 逐个调用ChannelHandler的write()方法,直到某个Handler不再传播。
- 逐个调用ChannelHandler的flush()方法,直到某个Handler不再传播。
- 最终传播到HeadContext的write()和flush(),执行unsafe.write()和flush()。
3.2 writeAndFlush执行细节
- write()传播:从TailContext到HeadContext,逐个调用ChannelHandler的write()。
- flush()传播:从TailContext到HeadContext,逐个调用ChannelHandler的flush()。
- unsafe.write():将数据写入底层缓冲区。
- unsafe.flush():刷新底层缓冲区,将数据写到Socket。
4. unsafe.write()和flush()实现
4.1 unsafe.write()入口
- 通过HeadContext的write()方法调用,传递到AbstractUnsafe的write()。
4.2 unsafe.write()逻辑
Direct化ByteBuf:检查是否是直接内存,若不是则转换为直接内存。 添加到写缓冲区:封装为Entry对象,添加到ChannelOutboundBuffer。 写缓冲区管理:根据内存使用情况,控制写入和可写状态。 4.3 写缓冲区数据结构
- ChannelOutboundBuffer维护一个单向链表,包含Entry对象。
- Entry包含msg、size、promise等信息。
- 通过flush()刷新链表中的数据到Socket。
4.4 flush()实现
设置flushedEntry指针:指向unflushedEntry。 遍历和刷新:逐个将ByteBuf写到Socket,移除Entry。 内存管理:根据写缓冲区大小,控制可写状态。 5. 对象转字节流写入底层
- 数据从自定义对象传播到MessageToByteEncoder编码。
- 编码完成后,ByteBuf通过ChannelPipeline传播到HeadContext。
- HeadContext的unsafe.write()将ByteBuf添加到写缓冲区。
- flush()将写缓冲区数据刷新到Socket。
通过以上流程,Netty实现了高效的数据处理和传输机制。
转载地址:http://avcfk.baihongyu.com/