protected abstract T createMessage()

in servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpObjectDecoder.java [231:459]


    /**
     * Creates the message object for a start-line that {@code decode} has already split into three components.
     * <p>
     * {@code decode} invokes this after it has located the three components of the start-line and after it has
     * called {@code consumeCRLF} for that line. The start positions are the buffer indexes computed by
     * {@code decode} while scanning the line; the first component starts at the reader index that was current
     * before the line was consumed.
     * <p>
     * {@code decode} only locates the separators (and, for requests, scans the second component for illegal
     * characters); per the comments in {@code decode}, remaining validation of the components is expected to be
     * done by the request/response specific implementation.
     *
     * @param buffer the buffer that holds the start-line bytes.
     * @param firstStart index in {@code buffer} where the first component starts.
     * @param firstLength number of bytes in the first component.
     * @param secondStart index in {@code buffer} where the second component starts.
     * @param secondLength number of bytes in the second component.
     * @param thirdStart index in {@code buffer} where the third component starts.
     * @param thirdLength number of bytes in the third component, {@code 0} when {@code decode} found nothing after
     * the second separator.
     * @return the message that {@code decode} stores and later passes to {@code ctx.fireChannelRead(...)}.
     */
    protected abstract T createMessage(ByteBuf buffer, int firstStart, int firstLength,
                                       int secondStart, int secondLength,
                                       int thirdStart, int thirdLength);

    /**
     * Advances the HTTP/1.x decoding state machine using the bytes currently readable in {@code buffer}.
     * <p>
     * The method dispatches on {@code currentState} and deliberately falls through from one state to the next
     * whenever a stage completes and the following stage can be attempted with the same buffer:
     * <ol>
     *     <li>{@code SKIP_CONTROL_CHARS} / {@code READ_INITIAL}: skip leading control characters, locate the
     *     start-line terminator, split the line into three components and build the message via
     *     {@code createMessage(...)}.</li>
     *     <li>{@code READ_HEADER}: read headers, fire the message downstream and select how the payload body is
     *     framed (none, fixed length, variable length or chunked).</li>
     *     <li>{@code READ_VARIABLE_LENGTH_CONTENT} / {@code READ_FIXED_LENGTH_CONTENT}: fire payload slices
     *     downstream.</li>
     *     <li>{@code READ_CHUNK_SIZE} / {@code READ_CHUNKED_CONTENT} / {@code READ_CHUNK_DELIMITER} /
     *     {@code READ_CHUNK_FOOTER}: decode chunked payload and optional trailers.</li>
     *     <li>{@code UPGRADED}: forward all readable bytes without interpreting them.</li>
     * </ol>
     * The method returns without changing state when more bytes are required to finish the current stage.
     *
     * @param ctx the context used to fire decoded objects downstream and passed to the close handler callbacks.
     * @param buffer the bytes available for decoding; bytes consumed here advance its reader index.
     */
    @Override
    protected final void decode(final ChannelHandlerContext ctx, final ByteBuf buffer) {
        switch (currentState) {
            case SKIP_CONTROL_CHARS: {
                if (!skipControlCharacters(buffer)) {
                    return;
                }
                currentState = State.READ_INITIAL;
                // fall-through
            }
            case READ_INITIAL: {
                final long longLFIndex = findCRLF(buffer, maxStartLineLength, allowLFWithoutCR);
                if (longLFIndex < 0) {
                    handlePartialInitialLine(ctx, buffer);
                    return;
                }

                // Parse the initial line:
                // https://tools.ietf.org/html/rfc7230#section-3.1.1
                // request-line = method SP request-target SP HTTP-version CRLF
                // https://tools.ietf.org/html/rfc7230#section-3.1.2
                // status-line = HTTP-version SP status-code SP reason-phrase CRLF
                final int lfIndex = crlfIndex(longLFIndex);
                final int nonControlIndex = crlfBeforeIndex(longLFIndex);
                final int aStart = buffer.readerIndex();    // We already skipped all preface control chars
                // Look only for a WS, other checks will be done later by request/response decoder
                final int aEnd = buffer.forEachByte(aStart + 1, nonControlIndex - aStart, FIND_WS);
                if (aEnd < 0) {
                    throw newStartLineError("first");
                }

                final int bStart = aEnd + 1;    // Expect a single WS
                final int bEnd;
                try {
                    bEnd = buffer.forEachByte(bStart, nonControlIndex - bStart + 1,
                            isDecodingRequest() ? FIND_VCHAR_END : FIND_WS);
                } catch (IllegalCharacterException cause) {
                    throw new StacklessDecoderException(
                            "Invalid start-line: HTTP request-target contains an illegal character", cause);
                }
                if (bEnd < 0 || bEnd == bStart) {
                    throw newStartLineError("second");
                }

                final int cStart = bEnd + 1;    // Expect a single WS
                // Other checks will be done later by request/response decoder
                final int cLength = cStart > nonControlIndex ? 0 : nonControlIndex - cStart + 1;

                // Consume the initial line bytes from the buffer.
                consumeCRLF(buffer, lfIndex);

                message = createMessage(buffer, aStart, aEnd - aStart, bStart, bEnd - bStart, cStart, cLength);
                currentState = State.READ_HEADER;
                closeHandler.protocolPayloadBeginInbound(ctx);
                // fall-through
            }
            case READ_HEADER: {
                State nextState = readHeaders(buffer);
                if (nextState == null) {
                    return;
                }
                assert message != null;
                if (shouldClose(message)) {
                    closeHandler.protocolClosingInbound(ctx);
                }
                currentState = nextState;
                switch (nextState) {
                    case SKIP_CONTROL_CHARS:
                        // fast-path
                        // No content is expected.
                        ctx.fireChannelRead(message);
                        closeHandler.protocolPayloadEndInbound(ctx);
                        resetNow();
                        return;
                    case READ_CHUNK_SIZE:
                        // Chunked encoding - generate HttpMessage first.  HttpChunks will follow.
                        ctx.fireChannelRead(message);
                        return;
                    default:
                        // <a href="https://tools.ietf.org/html/rfc7230#section-3.3.3">RFC 7230, 3.3.3</a> states that
                        // if a request does not have either a transfer-encoding or a content-length header then the
                        // message body length is 0. However for a response the body length is the number of octets
                        // received prior to the server closing the connection. So we treat this as variable length
                        // chunked encoding.
                        long contentLength = contentLength();
                        if (contentLength == 0 || (contentLength == -1 && isDecodingRequest())) {
                            ctx.fireChannelRead(message);
                            closeHandler.protocolPayloadEndInbound(ctx);
                            resetNow();
                            return;
                        }

                        assert nextState == State.READ_FIXED_LENGTH_CONTENT ||
                                nextState == State.READ_VARIABLE_LENGTH_CONTENT;

                        ctx.fireChannelRead(message);

                        if (nextState == State.READ_FIXED_LENGTH_CONTENT) {
                            // chunkSize will be decreased as the READ_FIXED_LENGTH_CONTENT state reads data chunk by
                            // chunk.
                            chunkSize = contentLength;
                        }

                        // We return here, this forces decode to be called again where we will decode the content
                        return;
                }
                // Not reachable: every branch of the switch above returns, so READ_HEADER never falls through.
            }
            case READ_VARIABLE_LENGTH_CONTENT: {
                // Keep reading data as a chunk until the end of connection is reached.
                int toRead = buffer.readableBytes();
                if (toRead > 0) {
                    ByteBuf content = buffer.readRetainedSlice(toRead);
                    cumulationIndex = buffer.readerIndex();
                    ctx.fireChannelRead(newBufferFrom(content));
                }
                return;
            }
            case READ_FIXED_LENGTH_CONTENT: {
                int toRead = buffer.readableBytes();

                // Check if the buffer is readable first as we use the readable byte count
                // to create the HttpChunk. This is needed as otherwise we may end up with
                // create a HttpChunk instance that contains an empty buffer and so is
                // handled like it is the last HttpChunk.
                //
                // See https://github.com/netty/netty/issues/433
                if (toRead == 0) {
                    return;
                }

                if (toRead > chunkSize) {
                    // Safe narrowing: chunkSize is smaller than an int value here.
                    toRead = (int) chunkSize;
                }
                ByteBuf content = buffer.readRetainedSlice(toRead);
                chunkSize -= toRead;
                cumulationIndex = buffer.readerIndex();

                // The payload slice is always delivered before the end of payload is signalled.
                ctx.fireChannelRead(newBufferFrom(content));
                if (chunkSize == 0) {
                    // Read all content.
                    // https://tools.ietf.org/html/rfc7230.html#section-4.1
                    // This is not chunked encoding so there will not be any trailers.
                    closeHandler.protocolPayloadEndInbound(ctx);
                    resetNow();
                }
                return;
            }
            // everything else after this point takes care of reading chunked content. basically, read chunk size,
            // read chunk, read and ignore the CRLF and repeat until 0
            case READ_CHUNK_SIZE: {
                final long longLFIndex = findCRLF(buffer, MAX_HEX_CHARS_FOR_LONG, false);
                if (longLFIndex < 0) {
                    return;
                }
                final int lfIndex = crlfIndex(longLFIndex);
                long chunkSize = getChunkSize(buffer, lfIndex);
                consumeCRLF(buffer, lfIndex);
                this.chunkSize = chunkSize;
                if (chunkSize == 0) {
                    currentState = State.READ_CHUNK_FOOTER;
                    return;
                }
                currentState = State.READ_CHUNKED_CONTENT;
                // fall-through
            }
            case READ_CHUNKED_CONTENT: {
                // chunkSize is a long and may exceed Integer.MAX_VALUE. Take the minimum in long arithmetic and
                // narrow afterwards: the result never exceeds readableBytes(), so the cast cannot truncate.
                // Narrowing chunkSize before the comparison would corrupt the remaining size for large chunks.
                final int toRead = (int) min(chunkSize, buffer.readableBytes());
                if (toRead == 0) {
                    return;
                }
                Buffer chunk = newBufferFrom(buffer.readRetainedSlice(toRead));
                chunkSize -= toRead;
                cumulationIndex = buffer.readerIndex();

                ctx.fireChannelRead(chunk);

                if (chunkSize != 0) {
                    return;
                }
                currentState = State.READ_CHUNK_DELIMITER;
                // fall-through
            }
            case READ_CHUNK_DELIMITER: {
                // Read the chunk delimiter
                final long longLFIndex = findCRLF(buffer, CHUNK_DELIMETER_SIZE, false);
                if (longLFIndex < 0) {
                    return;
                }
                consumeCRLF(buffer, crlfIndex(longLFIndex));
                currentState = State.READ_CHUNK_SIZE;
                break;
            }
            case READ_CHUNK_FOOTER: {
                HttpHeaders trailer = readTrailingHeaders(buffer);
                if (trailer == null) {
                    return;
                }
                // Empty trailers are not propagated, only the end of payload is signalled.
                if (!trailer.isEmpty()) {
                    ctx.fireChannelRead(trailer);
                }
                closeHandler.protocolPayloadEndInbound(ctx);
                resetNow();
                return;
            }
            case UPGRADED: {
                int readableBytes = buffer.readableBytes();
                if (readableBytes > 0) {
                    // Keep on consuming as otherwise we may trigger an DecoderException,
                    // other handler will replace this codec with the upgraded protocol codec to
                    // take the traffic over at some point then.
                    // See https://github.com/netty/netty/issues/2173
                    ByteBuf opaquePayload = buffer.readBytes(readableBytes);
                    cumulationIndex = buffer.readerIndex();
                    // TODO(scott): revisit how upgrades are going to be done. Do we use Netty buffers or not?
                    ctx.fireChannelRead(opaquePayload);
                }
                break;
            }
            default:
                throw new Error();
        }
    }