ParseResult RtmpChunkStream::Feed()

in src/brpc/policy/rtmp_protocol.cpp [1426:1647]


// Consume one RTMP chunk that belongs to this chunk stream.
//
// The chunk's message header (whose layout depends on bh.fmt) is decoded,
// the chunk payload is moved from `source' into the message being
// assembled (_r.msg_body), and once the whole message has been received it
// is handed to OnMessage().
//
// Params:
//   bh      Basic header of the chunk, already parsed by the caller. Its
//           bh.header_length bytes are still at the front of `source'.
//   source  Input buffer. Nothing is removed from it unless the complete
//           chunk (headers + payload) is available.
//   socket  Passed to AddReceivedBytes() and OnMessage(); also used for
//           logging the remote side.
//
// Returns:
//   MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA)  `source' does not hold the
//       whole chunk yet; `source' is left untouched so that the same chunk
//       can be parsed again later.
//   MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG) a type-1/2/3 chunk arrived
//       but there is no valid previous message header to inherit from.
//   MakeMessage(NULL)                            the chunk was consumed.
ParseResult RtmpChunkStream::Feed(const RtmpBasicHeader& bh,
                                  butil::IOBuf* source, Socket* socket) {
    // Parse message header. Notice that basic header is still in source.
    uint32_t header_len = bh.header_length;
    bool has_extended_ts = false;
    uint32_t timestamp_delta = 0;
    RtmpMessageHeader mh;
    uint32_t cur_chunk_size = 0;
    // Set by chunk types that carry a new message_length (type 0 and 1).
    // Such a chunk restarts message assembling, see the check after the
    // switch.
    bool starts_new_message = false;
    RtmpContext* ctx = connection_context();
    const uint32_t chunk_size_in = ctx->_chunk_size_in;

    switch (bh.fmt) {
    case RTMP_CHUNK_TYPE0: {
        // Type 0 chunk headers are 11 bytes long. This type MUST be
        // used at the start of a chunk stream, and whenever the stream
        // timestamp goes backward (e.g., because of a backward seek).
        const uint32_t MSG_HEADER_LEN = 11u;
        header_len += MSG_HEADER_LEN;
        if (source->size() < header_len) {
            return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
        }
        char buf[header_len + 4/*extended ts*/];
        // p points at the message header, just after the basic header.
        const char* p = (const char*)source->fetch(buf, header_len)
            + bh.header_length;
        mh.timestamp = ReadBigEndian3Bytes(p);
        if (mh.timestamp == 0xFFFFFFu) {
            // 0xFFFFFF marks that the real timestamp is stored in 4 extra
            // bytes following the message header.
            header_len += 4u;
            if (source->size() < header_len) {
                return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
            }
            p = (const char*)source->fetch(buf, header_len)
                + bh.header_length; // fetch again.
            mh.timestamp = ReadBigEndian4Bytes(p + MSG_HEADER_LEN);
            has_extended_ts = true;
        }
        // Remembered as the delta for following type-3 chunks.
        timestamp_delta = mh.timestamp;
        mh.message_length = ReadBigEndian3Bytes(p + 3);
        // NOTE: assigned before the size check below. This is harmless
        // because re-parsing the same header computes the same value.
        _r.left_message_length = mh.message_length;
        cur_chunk_size = std::min(chunk_size_in, _r.left_message_length);
        if (source->size() < header_len + cur_chunk_size) {
            return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
        }
        _r.left_message_length -= cur_chunk_size;
        mh.message_type = p[6];

        // NOTE: stream_id is little endian.
        char* pmsid = (char*)&mh.stream_id;
        pmsid[0] = p[7];
        pmsid[1] = p[8];
        pmsid[2] = p[9];
        pmsid[3] = p[10];
        starts_new_message = true;
    } break;
    case RTMP_CHUNK_TYPE1: {
        // Type 1 chunk headers are 7 bytes long. The message stream ID is
        // not included; this chunk takes the same stream ID as the
        // preceding chunk. Streams with variable-sized messages
        // (for example, many video formats) SHOULD use this format for
        // the first chunk of each new message after the first.
        const uint32_t MSG_HEADER_LEN = 7u;
        header_len += MSG_HEADER_LEN;
        if (source->size() < header_len) {
            return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
        }
        char buf[header_len + 4/*extended ts*/];
        const char* p = (const char*)source->fetch(buf, header_len)
            + bh.header_length;
        timestamp_delta = ReadBigEndian3Bytes(p);
        if (timestamp_delta == 0xFFFFFFu) {
            // The real delta is in the 4-byte extended timestamp.
            header_len += 4u;
            if (source->size() < header_len) {
                return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
            }
            p = (const char*)source->fetch(buf, header_len)
                + bh.header_length; // fetch again.
            timestamp_delta = ReadBigEndian4Bytes(p + MSG_HEADER_LEN);
            has_extended_ts = true;
        }
        if (!_r.last_msg_header.is_valid()) {
            LOG(ERROR) << "No last message in chunk_stream=" << _cs_id
                       << " for ChunkType1";
            return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
        }
        mh.timestamp = _r.last_msg_header.timestamp + timestamp_delta;
        mh.message_length = ReadBigEndian3Bytes(p + 3);
        // Same remark as in type 0: safe to assign before the size check.
        _r.left_message_length = mh.message_length;
        cur_chunk_size = std::min(chunk_size_in, _r.left_message_length);
        if (source->size() < header_len + cur_chunk_size) {
            return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
        }
        _r.left_message_length -= cur_chunk_size;
        mh.message_type = p[6];
        mh.stream_id = _r.last_msg_header.stream_id;
        starts_new_message = true;
    } break;
    case RTMP_CHUNK_TYPE2: {
        // Type 2 chunk headers are 3 bytes long. Neither the stream ID
        // nor the message length is included; this chunk has the same
        // stream ID and message length as the preceding chunk. Streams
        // with constant-sized messages (for example, some audio and data
        // formats) SHOULD use this format for the first chunk of each
        // message after the first.
        const uint32_t MSG_HEADER_LEN = 3u;
        header_len += MSG_HEADER_LEN;
        if (source->size() < header_len) {
            return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
        }
        char buf[header_len + 4/*extended ts*/];
        const char* p = (const char*)source->fetch(buf, header_len)
            + bh.header_length;
        timestamp_delta = ReadBigEndian3Bytes(p);
        if (timestamp_delta == 0xFFFFFFu) {
            // The real delta is in the 4-byte extended timestamp.
            header_len += 4u;
            if (source->size() < header_len) {
                return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
            }
            p = (const char*)source->fetch(buf, header_len)
                + bh.header_length; // fetch again.
            timestamp_delta = ReadBigEndian4Bytes(p + MSG_HEADER_LEN);
            has_extended_ts = true;
        }
        if (!_r.last_msg_header.is_valid()) {
            LOG(ERROR) << "No last message in chunk_stream=" << _cs_id
                       << " for ChunkType2";
            return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
        }
        mh.timestamp = _r.last_msg_header.timestamp + timestamp_delta;
        mh.message_length = _r.last_msg_header.message_length;
        // _r.left_message_length is not reset here: it was already set to
        // the (unchanged) message length when the previous message ended.
        cur_chunk_size = std::min(chunk_size_in, _r.left_message_length);
        if (source->size() < header_len + cur_chunk_size) {
            return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
        }
        _r.left_message_length -= cur_chunk_size;
        mh.message_type = _r.last_msg_header.message_type;
        mh.stream_id = _r.last_msg_header.stream_id;
    } break;
    case RTMP_CHUNK_TYPE3: {
        // Type 3 chunks have no message header, everything inherits from
        // previous chunks.
        if (!_r.last_has_extended_ts) {
            timestamp_delta = _r.last_timestamp_delta;
        } else {
            // The previous chunk carried an extended timestamp, so this
            // chunk is expected to repeat it right after the basic header.
            header_len += 4u;
            if (source->size() < header_len) {
                return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
            }
            char buf[header_len];
            const char* p = (const char*)source->fetch(buf, header_len)
                + bh.header_length;
            timestamp_delta = ReadBigEndian4Bytes(p);
            has_extended_ts = true;
            // librtmp may not set extended timestamp for non-first type-3
            // chunks of a message. Assume that the extended timestamp exists,
            // if the timestamp does not match the ones in previous chunks
            // (of the message), rewind the parsing cursor.
            if (!_r.first_chunk_of_message &&
                timestamp_delta > 0 &&
                timestamp_delta != _r.last_timestamp_delta) {
                // The 4 bytes are payload rather than a timestamp.
                header_len -= 4;
                timestamp_delta = _r.last_timestamp_delta;
            }
        }
        if (!_r.last_msg_header.is_valid()) {
            LOG(ERROR) << "No last message in chunk_stream=" << _cs_id
                       << " for ChunkType3";
            return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
        }
        mh.timestamp = _r.last_msg_header.timestamp;
        if (_r.first_chunk_of_message) {
            // Only the first type-3 chunk adds the delta.
            mh.timestamp += timestamp_delta;
        }
        mh.message_length = _r.last_msg_header.message_length;
        cur_chunk_size = std::min(chunk_size_in, _r.left_message_length);
        if (source->size() < header_len + cur_chunk_size) {
            return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
        }
        _r.left_message_length -= cur_chunk_size;
        mh.message_type = _r.last_msg_header.message_type;
        mh.stream_id = _r.last_msg_header.stream_id;
    } break;
    } // switch(fmt)

    // From here on the whole chunk is known to be available in `source'.

    // A type-0/1 chunk begins a new message: _r.left_message_length has been
    // reset to the new message_length above. If the previous message of this
    // chunk stream was never completed, its partial payload is still in
    // _r.msg_body and would be prepended to the new message, making the body
    // passed to OnMessage() larger than mh.message_length (and letting a peer
    // that keeps restarting messages grow the buffer without bound). Drop the
    // stale bytes before appending the new payload.
    if (starts_new_message && !_r.msg_body.empty()) {
        VLOG(RPC_VLOG_LEVEL) << socket->remote_side()
                             << ": Discard " << _r.msg_body.size()
                             << " bytes of unfinished message in chunk_stream="
                             << _cs_id;
        _r.msg_body.clear();
    }

    // Drop the basic header, message header and (if any) extended timestamp,
    // then move the payload of this chunk into the pending message body.
    source->pop_front(header_len);
    source->cutn(&_r.msg_body, cur_chunk_size);
    ctx->AddReceivedBytes(socket, header_len + cur_chunk_size);
    const int vlvl = RPC_VLOG_LEVEL + 2;
    VLOG(vlvl) << socket->remote_side()
              << ": Chunk{chunk_stream_id=" << bh.chunk_stream_id
              << " fmt=" << bh.fmt
              << " body_size=" << cur_chunk_size << '}';
    // Remember what following type-1/2/3 chunks inherit from.
    _r.last_has_extended_ts = has_extended_ts;
    _r.last_timestamp_delta = timestamp_delta;
    _r.last_msg_header = mh;

    AddChunk();

    if (_r.left_message_length == 0) {
        // The message is complete. Choose the status recorder according to
        // whether the context has a service attached.
        MethodStatus* st = NULL;
        if (ctx->service() != NULL) {
            pthread_once(&g_server_msg_status_once, InitServerMessageStatus);
            st = g_server_msg_status;
        } else {
            pthread_once(&g_client_msg_status_once, InitClientMessageStatus);
            st = g_client_msg_status;
        }
        if (st) {
            // Time the handler and report its result to the status recorder.
            butil::Timer tm;
            tm.start();
            CHECK(st->OnRequested());
            const bool ret = OnMessage(bh, mh, &_r.msg_body, socket);
            tm.stop();
            st->OnResponded(ret, tm.u_elapsed());
        } else {
            (void)OnMessage(bh, mh, &_r.msg_body, socket);
        }
        // Prepare for the next message. A following type-2/3 chunk reuses
        // the message length, so the remaining length is pre-armed with it.
        _r.msg_body.clear();
        _r.left_message_length = mh.message_length;
        _r.first_chunk_of_message = true;
    } else {
        // More chunks of the same message are expected.
        _r.first_chunk_of_message = false;
    }
    return MakeMessage(NULL);
}