in servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpObjectDecoder.java [231:459]
protected abstract T createMessage(ByteBuf buffer, int firstStart, int firstLength,
int secondStart, int secondLength,
int thirdStart, int thirdLength);
@Override
protected final void decode(final ChannelHandlerContext ctx, final ByteBuf buffer) {
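// Decoding is implemented as a state machine: each case handles one parsing step and either returns
// (waiting for more data to be buffered) or falls through to the next state once enough bytes are available.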
switch (currentState) {
case SKIP_CONTROL_CHARS: {
if (!skipControlCharacters(buffer)) {
return;
}
currentState = State.READ_INITIAL;
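// fall-through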
}
case READ_INITIAL: {
final long longLFIndex = findCRLF(buffer, maxStartLineLength, allowLFWithoutCR);
if (longLFIndex < 0) {
handlePartialInitialLine(ctx, buffer);
return;
}
// Parse the initial line:
// https://tools.ietf.org/html/rfc7230#section-3.1.1
// request-line = method SP request-target SP HTTP-version CRLF
// https://tools.ietf.org/html/rfc7230#section-3.1.2
// status-line = HTTP-version SP status-code SP reason-phrase CRLF
final int lfIndex = crlfIndex(longLFIndex);
final int nonControlIndex = crlfBeforeIndex(longLFIndex);
final int aStart = buffer.readerIndex(); // We already skipped all preface control chars
// Look only for a WS; other checks will be done later by the request/response decoder
final int aEnd = buffer.forEachByte(aStart + 1, nonControlIndex - aStart, FIND_WS);
if (aEnd < 0) {
throw newStartLineError("first");
}
final int bStart = aEnd + 1; // Expect a single WS
final int bEnd;
try {
bEnd = buffer.forEachByte(bStart, nonControlIndex - bStart + 1,
isDecodingRequest() ? FIND_VCHAR_END : FIND_WS);
} catch (IllegalCharacterException cause) {
throw new StacklessDecoderException(
"Invalid start-line: HTTP request-target contains an illegal character", cause);
}
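// A second start-line component (request-target or status-code) that is missing or empty is malformed.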
if (bEnd < 0 || bEnd == bStart) {
throw newStartLineError("second");
}
final int cStart = bEnd + 1; // Expect a single WS
// Other checks will be done later by the request/response decoder
final int cLength = cStart > nonControlIndex ? 0 : nonControlIndex - cStart + 1;
// Consume the initial line bytes from the buffer.
consumeCRLF(buffer, lfIndex);
message = createMessage(buffer, aStart, aEnd - aStart, bStart, bEnd - bStart, cStart, cLength);
currentState = State.READ_HEADER;
closeHandler.protocolPayloadBeginInbound(ctx);
// fall-through
}
case READ_HEADER: {
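// readHeaders returns null when the complete header section has not been received yet; in that
// case return and wait for more data.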
State nextState = readHeaders(buffer);
if (nextState == null) {
return;
}
assert message != null;
if (shouldClose(message)) {
closeHandler.protocolClosingInbound(ctx);
}
currentState = nextState;
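// Dispatch based on how the message body is framed: no body, chunked, fixed content-length, or
// variable length (read until the connection is closed).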
switch (nextState) {
case SKIP_CONTROL_CHARS:
// fast-path
// No content is expected.
ctx.fireChannelRead(message);
closeHandler.protocolPayloadEndInbound(ctx);
resetNow();
return;
case READ_CHUNK_SIZE:
// Chunked encoding - generate HttpMessage first. HttpChunks will follow.
ctx.fireChannelRead(message);
return;
default:
// <a href="https://tools.ietf.org/html/rfc7230#section-3.3.3">RFC 7230, 3.3.3</a> states that
// if a request has neither a transfer-encoding nor a content-length header then the
// message body length is 0. However, for a response the body length is the number of octets
// received prior to the server closing the connection, so we treat that case as
// variable-length content (delivered like chunked data until the connection closes).
long contentLength = contentLength();
if (contentLength == 0 || (contentLength == -1 && isDecodingRequest())) {
ctx.fireChannelRead(message);
closeHandler.protocolPayloadEndInbound(ctx);
resetNow();
return;
}
assert nextState == State.READ_FIXED_LENGTH_CONTENT ||
nextState == State.READ_VARIABLE_LENGTH_CONTENT;
ctx.fireChannelRead(message);
if (nextState == State.READ_FIXED_LENGTH_CONTENT) {
// chunkSize will be decreased as the READ_FIXED_LENGTH_CONTENT state reads data chunk by
// chunk.
chunkSize = contentLength;
}
// We return here, this forces decode to be called again where we will decode the content
return;
}
// fall-through
}
case READ_VARIABLE_LENGTH_CONTENT: {
// Keep reading data as a chunk until the end of connection is reached.
int toRead = buffer.readableBytes();
if (toRead > 0) {
ByteBuf content = buffer.readRetainedSlice(toRead);
cumulationIndex = buffer.readerIndex();
ctx.fireChannelRead(newBufferFrom(content));
}
return;
}
case READ_FIXED_LENGTH_CONTENT: {
int toRead = buffer.readableBytes();
// Check if the buffer is readable first, as we use the readable byte count
// to create the HttpChunk. This is needed because otherwise we may end up
// creating an HttpChunk instance that contains an empty buffer and is then
// handled as if it were the last HttpChunk.
//
// See https://github.com/netty/netty/issues/433
if (toRead == 0) {
return;
}
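// Do not read beyond the remaining content-length; any extra bytes stay in the buffer.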
if (toRead > chunkSize) {
toRead = (int) chunkSize;
}
ByteBuf content = buffer.readRetainedSlice(toRead);
chunkSize -= toRead;
cumulationIndex = buffer.readerIndex();
if (chunkSize == 0) {
// Read all content.
// https://tools.ietf.org/html/rfc7230.html#section-4.1
// This is not chunked encoding so there will not be any trailers.
ctx.fireChannelRead(newBufferFrom(content));
closeHandler.protocolPayloadEndInbound(ctx);
resetNow();
} else {
ctx.fireChannelRead(newBufferFrom(content));
}
return;
}
// Everything else after this point takes care of reading chunked content. Basically: read chunk size,
// read chunk, read and ignore the CRLF, and repeat until the chunk size is 0.
case READ_CHUNK_SIZE: {
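// Wait until the complete chunk-size line (terminated by CRLF) is available before parsing it.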
final long longLFIndex = findCRLF(buffer, MAX_HEX_CHARS_FOR_LONG, false);
if (longLFIndex < 0) {
return;
}
final int lfIndex = crlfIndex(longLFIndex);
long chunkSize = getChunkSize(buffer, lfIndex);
consumeCRLF(buffer, lfIndex);
this.chunkSize = chunkSize;
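// A chunk-size of zero is the last-chunk; the optional trailer section follows (RFC 7230, section 4.1).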
if (chunkSize == 0) {
currentState = State.READ_CHUNK_FOOTER;
return;
}
currentState = State.READ_CHUNKED_CONTENT;
// fall-through
}
case READ_CHUNKED_CONTENT: {
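// Emit as much of the current chunk as is buffered; the remainder is delivered on subsequent reads.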
assert chunkSize <= Integer.MAX_VALUE;
final int toRead = min((int) chunkSize, buffer.readableBytes());
if (toRead == 0) {
return;
}
Buffer chunk = newBufferFrom(buffer.readRetainedSlice(toRead));
chunkSize -= toRead;
cumulationIndex = buffer.readerIndex();
ctx.fireChannelRead(chunk);
if (chunkSize != 0) {
return;
}
currentState = State.READ_CHUNK_DELIMITER;
// fall-through
}
case READ_CHUNK_DELIMITER: {
// Read the chunk delimiter
final long longLFIndex = findCRLF(buffer, CHUNK_DELIMETER_SIZE, false);
if (longLFIndex < 0) {
return;
}
consumeCRLF(buffer, crlfIndex(longLFIndex));
currentState = State.READ_CHUNK_SIZE;
break;
}
case READ_CHUNK_FOOTER: {
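// readTrailingHeaders returns null while the trailer section is still incomplete; wait for more data.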
HttpHeaders trailer = readTrailingHeaders(buffer);
if (trailer == null) {
return;
}
if (!trailer.isEmpty()) {
ctx.fireChannelRead(trailer);
}
closeHandler.protocolPayloadEndInbound(ctx);
resetNow();
return;
}
case UPGRADED: {
int readableBytes = buffer.readableBytes();
if (readableBytes > 0) {
// Keep on consuming, as otherwise we may trigger a DecoderException.
// Another handler will replace this codec with the upgraded protocol codec
// to take over the traffic at some point.
// See https://github.com/netty/netty/issues/2173
ByteBuf opaquePayload = buffer.readBytes(readableBytes);
cumulationIndex = buffer.readerIndex();
// TODO(scott): revisit how upgrades are going to be done. Do we use Netty buffers or not?
ctx.fireChannelRead(opaquePayload);
}
break;
}
default:
throw new Error();
}
}