in src/brpc/policy/rtmp_protocol.cpp [1426:1647]
ParseResult RtmpChunkStream::Feed(const RtmpBasicHeader& bh,
butil::IOBuf* source, Socket* socket) {
// Parse message header. Notice that basic header is still in source.
uint32_t header_len = bh.header_length;
bool has_extended_ts = false;
uint32_t timestamp_delta = 0;
RtmpMessageHeader mh;
uint32_t cur_chunk_size = 0;
RtmpContext* ctx = connection_context();
const uint32_t chunk_size_in = ctx->_chunk_size_in;
switch (bh.fmt) {
case RTMP_CHUNK_TYPE0: {
// Type 0 chunk headers are 11 bytes long. This type MUST be
// used at the start of a chunk stream, and whenever the stream
// timestamp goes backward (e.g., because of a backward seek).
const uint32_t MSG_HEADER_LEN = 11u;
header_len += MSG_HEADER_LEN;
if (source->size() < header_len) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
}
char buf[header_len + 4/*extended ts*/];
const char* p = (const char*)source->fetch(buf, header_len)
+ bh.header_length;
mh.timestamp = ReadBigEndian3Bytes(p);
if (mh.timestamp == 0xFFFFFFu) {
header_len += 4u;
if (source->size() < header_len) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
}
p = (const char*)source->fetch(buf, header_len)
+ bh.header_length; // fetch again.
mh.timestamp = ReadBigEndian4Bytes(p + MSG_HEADER_LEN);
has_extended_ts = true;
}
timestamp_delta = mh.timestamp;
mh.message_length = ReadBigEndian3Bytes(p + 3);
_r.left_message_length = mh.message_length;
cur_chunk_size = std::min(chunk_size_in, _r.left_message_length);
if (source->size() < header_len + cur_chunk_size) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
}
_r.left_message_length -= cur_chunk_size;
mh.message_type = p[6];
// NOTE: stream_id is little endian.
char* pmsid = (char*)&mh.stream_id;
pmsid[0] = p[7];
pmsid[1] = p[8];
pmsid[2] = p[9];
pmsid[3] = p[10];
} break;
case RTMP_CHUNK_TYPE1: {
// Type 1 chunk headers are 7 bytes long. The message stream ID is
// not included; this chunk takes the same stream ID as the
// preceding chunk. Streams with variable-sized messages
// (for example, many video formats) SHOULD use this format for
// the first chunk of each new message after the first.
const uint32_t MSG_HEADER_LEN = 7u;
header_len += MSG_HEADER_LEN;
if (source->size() < header_len) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
}
char buf[header_len + 4/*extended ts*/];
const char* p = (const char*)source->fetch(buf, header_len)
+ bh.header_length;
timestamp_delta = ReadBigEndian3Bytes(p);
if (timestamp_delta == 0xFFFFFFu) {
header_len += 4u;
if (source->size() < header_len) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
}
p = (const char*)source->fetch(buf, header_len)
+ bh.header_length; // fetch again.
timestamp_delta = ReadBigEndian4Bytes(p + MSG_HEADER_LEN);
has_extended_ts = true;
}
if (!_r.last_msg_header.is_valid()) {
LOG(ERROR) << "No last message in chunk_stream=" << _cs_id
<< " for ChunkType1";
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
mh.timestamp = _r.last_msg_header.timestamp + timestamp_delta;
mh.message_length = ReadBigEndian3Bytes(p + 3);
_r.left_message_length = mh.message_length;
cur_chunk_size = std::min(chunk_size_in, _r.left_message_length);
if (source->size() < header_len + cur_chunk_size) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
}
_r.left_message_length -= cur_chunk_size;
mh.message_type = p[6];
mh.stream_id = _r.last_msg_header.stream_id;
} break;
case RTMP_CHUNK_TYPE2: {
// Type 2 chunk headers are 3 bytes long. Neither the stream ID
// nor the message length is included; this chunk has the same
// stream ID and message length as the preceding chunk. Streams
// with constant-sized messages (for example, some audio and data
// formats) SHOULD use this format for the first chunk of each
// message after the first.
const uint32_t MSG_HEADER_LEN = 3u;
header_len += MSG_HEADER_LEN;
if (source->size() < header_len) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
}
char buf[header_len + 4/*extended ts*/];
const char* p = (const char*)source->fetch(buf, header_len)
+ bh.header_length;
timestamp_delta = ReadBigEndian3Bytes(p);
if (timestamp_delta == 0xFFFFFFu) {
header_len += 4u;
if (source->size() < header_len) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
}
p = (const char*)source->fetch(buf, header_len)
+ bh.header_length; // fetch again.
timestamp_delta = ReadBigEndian4Bytes(p + MSG_HEADER_LEN);
has_extended_ts = true;
}
if (!_r.last_msg_header.is_valid()) {
LOG(ERROR) << "No last message in chunk_stream=" << _cs_id
<< " for ChunkType2";
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
mh.timestamp = _r.last_msg_header.timestamp + timestamp_delta;
mh.message_length = _r.last_msg_header.message_length;
cur_chunk_size = std::min(chunk_size_in, _r.left_message_length);
if (source->size() < header_len + cur_chunk_size) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
}
_r.left_message_length -= cur_chunk_size;
mh.message_type = _r.last_msg_header.message_type;
mh.stream_id = _r.last_msg_header.stream_id;
} break;
case RTMP_CHUNK_TYPE3: {
// Type 3 chunks have no message header, everything inherits from
// previous chunks.
if (!_r.last_has_extended_ts) {
timestamp_delta = _r.last_timestamp_delta;
} else {
header_len += 4u;
if (source->size() < header_len) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
}
char buf[header_len];
const char* p = (const char*)source->fetch(buf, header_len)
+ bh.header_length;
timestamp_delta = ReadBigEndian4Bytes(p);
has_extended_ts = true;
// librtmp may not set extended timestamp for non-first type-3
// chunks of a message. Assume that the extended timestamp exists,
// if the timestamp does not match the ones in previous chunks
// (of the message), rewind the parsing cursor.
if (!_r.first_chunk_of_message &&
timestamp_delta > 0 &&
timestamp_delta != _r.last_timestamp_delta) {
header_len -= 4;
timestamp_delta = _r.last_timestamp_delta;
}
}
if (!_r.last_msg_header.is_valid()) {
LOG(ERROR) << "No last message in chunk_stream=" << _cs_id
<< " for ChunkType3";
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
mh.timestamp = _r.last_msg_header.timestamp;
if (_r.first_chunk_of_message) {
// Only the first type-3 chunk adds the delta.
mh.timestamp += timestamp_delta;
}
mh.message_length = _r.last_msg_header.message_length;
cur_chunk_size = std::min(chunk_size_in, _r.left_message_length);
if (source->size() < header_len + cur_chunk_size) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
}
_r.left_message_length -= cur_chunk_size;
mh.message_type = _r.last_msg_header.message_type;
mh.stream_id = _r.last_msg_header.stream_id;
} break;
} // switch(fmt)
source->pop_front(header_len);
source->cutn(&_r.msg_body, cur_chunk_size);
ctx->AddReceivedBytes(socket, header_len + cur_chunk_size);
const int vlvl = RPC_VLOG_LEVEL + 2;
VLOG(vlvl) << socket->remote_side()
<< ": Chunk{chunk_stream_id=" << bh.chunk_stream_id
<< " fmt=" << bh.fmt
<< " body_size=" << cur_chunk_size << '}';
_r.last_has_extended_ts = has_extended_ts;
_r.last_timestamp_delta = timestamp_delta;
_r.last_msg_header = mh;
AddChunk();
if (_r.left_message_length == 0) {
MethodStatus* st = NULL;
if (ctx->service() != NULL) {
pthread_once(&g_server_msg_status_once, InitServerMessageStatus);
st = g_server_msg_status;
} else {
pthread_once(&g_client_msg_status_once, InitClientMessageStatus);
st = g_client_msg_status;
}
if (st) {
butil::Timer tm;
tm.start();
CHECK(st->OnRequested());
const bool ret = OnMessage(bh, mh, &_r.msg_body, socket);
tm.stop();
st->OnResponded(ret, tm.u_elapsed());
} else {
(void)OnMessage(bh, mh, &_r.msg_body, socket);
}
_r.msg_body.clear();
_r.left_message_length = mh.message_length;
_r.first_chunk_of_message = true;
} else {
_r.first_chunk_of_message = false;
}
return MakeMessage(NULL);
}