in src/brpc/policy/rtmp_protocol.cpp [3302:3411]
bool RtmpChunkStream::OnPause(const RtmpMessageHeader& mh,
AMFInputStream* istream,
Socket* socket) {
if (!connection_context()->is_server_side()) {
RTMP_ERROR(socket, mh) << "Client should not receive `pause'";
return false;
}
uint32_t transaction_id = 0;
if (!ReadAMFUint32(&transaction_id, istream)) {
RTMP_ERROR(socket, mh) << "Fail to read pause.TransactionId";
return false;
}
if (!ReadAMFNull(istream)) { // command object
RTMP_ERROR(socket, mh) << "Fail to read pause.CommandObject";
return false;
}
bool pause_or_unpause = true;
if (!ReadAMFBool(&pause_or_unpause, istream)) {
RTMP_ERROR(socket, mh) << "Fail to read pause/unpause flag";
return false;
}
double milliseconds = 0;
if (!ReadAMFNumber(&milliseconds, istream)) {
RTMP_ERROR(socket, mh) << "Fail to read pause.milliSeconds";
return false;
}
butil::intrusive_ptr<RtmpStreamBase> stream;
if (!connection_context()->FindMessageStream(mh.stream_id, &stream)) {
RTMP_WARNING(socket, mh) << "Fail to find stream_id=" << mh.stream_id;
return false;
}
if (stream->_paused == pause_or_unpause) {
if (pause_or_unpause) {
RTMP_ERROR(socket, mh) << "Pause an already paused stream";
} else {
RTMP_ERROR(socket, mh) << "Unpause an already unpaused stream";
}
return false;
}
int rc = static_cast<RtmpServerStream*>(stream.get())->OnPause(
pause_or_unpause, milliseconds);
// Send back status.
butil::IOBuf req_buf;
{
butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
AMFOutputStream ostream(&zc_stream);
if (rc == 0) {
WriteAMFString(RTMP_AMF0_COMMAND_ON_STATUS, &ostream);
WriteAMFUint32(0, &ostream);
WriteAMFNull(&ostream);
RtmpInfo info;
if (pause_or_unpause) {
info.set_code(RTMP_STATUS_CODE_STREAM_PAUSE);
} else {
info.set_code(RTMP_STATUS_CODE_STREAM_UNPAUSE);
}
info.set_level(RTMP_INFO_LEVEL_STATUS);
info.set_description("Paused stream.");
WriteAMFObject(info, &ostream);
CHECK(ostream.good());
} else {
WriteAMFString(RTMP_AMF0_COMMAND_ERROR, &ostream);
WriteAMFNumber(0, &ostream);
WriteAMFNull(&ostream);
RtmpInfo info;
if (pause_or_unpause) {
info.set_code(RTMP_STATUS_CODE_STREAM_PAUSE);
} else {
info.set_code(RTMP_STATUS_CODE_STREAM_UNPAUSE);
}
info.set_level(RTMP_INFO_LEVEL_ERROR);
info.set_description(pause_or_unpause ? "Fail to pause" :
"Fail to unpause");
WriteAMFObject(info, &ostream);
CHECK(ostream.good());
}
}
SocketMessagePtr<RtmpUnsentMessage> msg1(new RtmpUnsentMessage);
msg1->header.message_length = req_buf.size();
msg1->header.message_type = RTMP_MESSAGE_COMMAND_AMF0;
msg1->header.stream_id = mh.stream_id;
msg1->chunk_stream_id = chunk_stream_id();
msg1->body = req_buf;
// StreamEOF(pause) or StreamBegin(unpause)
char cntl_buf[6];
char* p = cntl_buf;
if (pause_or_unpause) {
WriteBigEndian2Bytes(&p, RTMP_USER_CONTROL_EVENT_STREAM_EOF);
} else {
WriteBigEndian2Bytes(&p, RTMP_USER_CONTROL_EVENT_STREAM_BEGIN);
}
WriteBigEndian4Bytes(&p, mh.stream_id);
RtmpUnsentMessage* msg2 = MakeUnsentControlMessage(
RTMP_MESSAGE_USER_CONTROL, cntl_buf, sizeof(cntl_buf));
msg1->next.reset(msg2);
if (WriteWithoutOvercrowded(socket, msg1) != 0) {
PLOG(WARNING) << socket->remote_side() << '[' << mh.stream_id
<< "] Fail to respond " << (pause_or_unpause ? "pause" : "unpause");
return false;
}
if (rc == 0) {
stream->_paused = pause_or_unpause;
return true;
}
return false;
}