时间:2021-07-01 10:21:17 帮助过:10人阅读
ByteToMessageDecoder 本身是一个 ChannelInboundHandler
ByteToMessageDecoder 中有 2 种数据积累器:一种是拷贝式(MERGE_CUMULATOR),一种是组合式(COMPOSITE_CUMULATOR)。默认使用拷贝式;组合式更省内存,但实现更复杂,速度也会慢一些。
// io.netty.handler.codec.ByteToMessageDecoder#cumulator private Cumulator cumulator = MERGE_CUMULATOR; public static final Cumulator MERGE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { try { final int required = in.readableBytes(); // 需要扩容 if (required > cumulation.maxWritableBytes() || (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) || cumulation.isReadOnly()) { // Expand cumulation (by replacing it) under the following conditions: // - cumulation cannot be resized to accommodate the additional data // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is // assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe. return expandCumulation(alloc, cumulation, in); } // 把新读到的数据写入积累器 return cumulation.writeBytes(in); } finally { // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw // for whatever release (for example because of OutOfMemoryError) in.release(); } } };
入口在
// io.netty.handler.codec.ByteToMessageDecoder#channelRead public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { // 从对象池获取 CodecOutputList CodecOutputList out = CodecOutputList.newInstance(); try { // 从 socket 读到的数据 ByteBuf data = (ByteBuf) msg; // 是否第一次接收数据 first = cumulation == null; if (first) { cumulation = data; } else { // 数据积累器是拷贝式 // 如果上一次由于拆包,没有解析出消息,则数据会存留在积累器中 // 积累器接收新的数据 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } // 对积累器的数据进行解码 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { // We did enough reads already try to discard some bytes so we not risk to see a OOME. // See https://github.com/netty/netty/issues/4275 numReads = 0; discardSomeReadBytes(); } int size = out.size(); firedChannelRead |= out.insertSinceRecycled(); fireChannelRead(ctx, out, size); out.recycle(); } } else { ctx.fireChannelRead(msg); } }
解码
// io.netty.handler.codec.ByteToMessageDecoder#callDecode protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) { // 已解码的消息个数 int outSize = out.size(); if (outSize > 0) { fireChannelRead(ctx, out, outSize); out.clear(); // Check if this handler was removed before continuing with decoding. // If it was removed, it is not safe to continue to operate on the buffer. // // See: // - https://github.com/netty/netty/issues/4635 if (ctx.isRemoved()) { break; } outSize = 0; } int oldInputLength = in.readableBytes(); // 调用子类的 decode 方法 decodeRemovalReentryProtection(ctx, in, out); // Check if this handler was removed before continuing the loop. // If it was removed, it is not safe to continue to operate on the buffer. // // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) { break; } // 前后的消息个数相等,表示没有解码新的消息 if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { // 拆包情况,走这个分支,跳出循环,读取更多的数据 break; } else { continue; } } if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Exception cause) { throw new DecoderException(cause); } }
// io.netty.handler.codec.LengthFieldBasedFrameDecoder#decode protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } } protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (discardingTooLongFrame) { discardingTooLongFrame(in); } if (in.readableBytes() < lengthFieldEndOffset) { return null; } int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset; // 读取报文的长度字段,解析出报文长度 // ByteBuf.getUnsignedByte 并不会移动 readerIndex long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder); if (frameLength < 0) { failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset); } frameLength += lengthAdjustment + lengthFieldEndOffset; if (frameLength < lengthFieldEndOffset) { failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset); } if (frameLength > maxFrameLength) { exceededFrameLength(in, frameLength); return null; } // never overflows because it‘s less than maxFrameLength int frameLengthInt = (int) frameLength; // 拆包情况,读到的数据小于报文的长度,无法解析,需要继续接收数据,进行第二次解析 if (in.readableBytes() < frameLengthInt) { return null; } if (initialBytesToStrip > frameLengthInt) { failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip); } in.skipBytes(initialBytesToStrip); // extract frame int readerIndex = in.readerIndex(); int actualFrameLength = frameLengthInt - initialBytesToStrip; ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength); in.readerIndex(readerIndex + actualFrameLength); return frame; }
处理拆包的流程:
1. 从 channel 读取的数据会先放到数据积累器中
2. 使用解码器对积累器中的数据进行解码;发生拆包时,已接收的数据少于报文的实际长度,因此本次解码得不到完整消息(decode 返回 null),数据继续保留在积累器中
3. 重新从 channel 读取消息,放入积累器中
4. 再次解码消息,若成功则传播到 pipeline 中,失败则继续接收消息
从 LengthFieldBasedFrameDecoder 看 netty 处理拆包
标签:http 解析 无法解析 ann oid error input sage flow