in src/brpc/policy/rtmp_protocol.cpp [2745:2923]
bool RtmpChunkStream::OnPlay(const RtmpMessageHeader& mh,
AMFInputStream* istream,
Socket* socket) {
if (!connection_context()->is_server_side()) {
RTMP_ERROR(socket, mh) << "Client should not receive `play'";
return false;
}
uint32_t transaction_id = 0;
if (!ReadAMFUint32(&transaction_id, istream)) {
RTMP_ERROR(socket, mh) << "Fail to read play.TransactionId";
return false;
}
// ffmpeg send non-zero transaction_id for play.
if (!ReadAMFNull(istream)) {
RTMP_ERROR(socket, mh) << "Fail to read play.CommandObject";
return false;
}
RtmpPlayOptions play_opt; // inner fields are initialized with defaults.
if (!ReadAMFString(&play_opt.stream_name, istream)) {
RTMP_ERROR(socket, mh) << "Fail to read play.StreamName";
return false;
}
// start/duration/reset are optional, check emptiness of the stream before
// calling ReadAMFXXX which prints log for failed branches.
if (!istream->check_emptiness()) {
if (!ReadAMFNumber(&play_opt.start, istream)) {
RTMP_ERROR(socket, mh) << "Fail to read play.Start";
return false;
}
}
if (!istream->check_emptiness()) {
if (!ReadAMFNumber(&play_opt.duration, istream)) {
RTMP_ERROR(socket, mh) << "Fail to read play.Duration";
return false;
}
}
if (!istream->check_emptiness()) {
if (!ReadAMFBool(&play_opt.reset, istream)) {
RTMP_ERROR(socket, mh) << "Fail to read play.Reset";
return false;
}
}
RPC_VLOG << socket->remote_side() << "[" << mh.stream_id
<< "] play{transaction_id=" << transaction_id
<< " stream_name=" << play_opt.stream_name
<< " start=" << play_opt.start
<< " duration=" << play_opt.duration
<< " reset=" << play_opt.reset << '}';
butil::IOBuf req_buf;
TemporaryArrayBuilder<SocketMessagePtr<RtmpUnsentMessage>, 5> msgs;
// TODO(gejun): RTMP spec sends StreamIsRecorded before StreamBegin
// however SRS does not.
// StreamBegin
{
char cntl_buf[6];
char* p = cntl_buf;
WriteBigEndian2Bytes(&p, RTMP_USER_CONTROL_EVENT_STREAM_BEGIN);
WriteBigEndian4Bytes(&p, mh.stream_id);
msgs.push().reset(MakeUnsentControlMessage(
RTMP_MESSAGE_USER_CONTROL, cntl_buf, sizeof(cntl_buf)));
}
// Play.Reset
if (play_opt.reset) {
// According to RTMP spec: NetStream.Play.Reset is sent only if the
// play command sent by the client has set the reset flag.
req_buf.clear();
{
butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
AMFOutputStream ostream(&zc_stream);
WriteAMFString(RTMP_AMF0_COMMAND_ON_STATUS, &ostream);
WriteAMFUint32(0, &ostream);
WriteAMFNull(&ostream);
RtmpInfo info;
info.set_code(RTMP_STATUS_CODE_PLAY_RESET);
info.set_level(RTMP_INFO_LEVEL_STATUS);
info.set_description("Reset " + play_opt.stream_name);
WriteAMFObject(info, &ostream);
}
RtmpUnsentMessage* msg = new RtmpUnsentMessage;
msg->header.message_length = req_buf.size();
msg->header.message_type = RTMP_MESSAGE_COMMAND_AMF0;
msg->header.stream_id = mh.stream_id;
msg->chunk_stream_id = chunk_stream_id();
msg->body = req_buf;
msgs.push().reset(msg);
}
// Play.Start
req_buf.clear();
{
butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
AMFOutputStream ostream(&zc_stream);
WriteAMFString(RTMP_AMF0_COMMAND_ON_STATUS, &ostream);
WriteAMFUint32(0, &ostream);
WriteAMFNull(&ostream);
RtmpInfo info;
info.set_code(RTMP_STATUS_CODE_PLAY_START);
info.set_level(RTMP_INFO_LEVEL_STATUS);
info.set_description("Start playing " + play_opt.stream_name);
WriteAMFObject(info, &ostream);
}
RtmpUnsentMessage* msg2 = new RtmpUnsentMessage;
msg2->header.message_length = req_buf.size();
msg2->header.message_type = RTMP_MESSAGE_COMMAND_AMF0;
msg2->header.stream_id = mh.stream_id;
msg2->chunk_stream_id = chunk_stream_id();
msg2->body = req_buf;
msgs.push().reset(msg2);
// |RtmpSampleAccess(true, true)
req_buf.clear();
{
butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
AMFOutputStream ostream(&zc_stream);
WriteAMFString(RTMP_AMF0_SAMPLE_ACCESS, &ostream);
WriteAMFBool(true, &ostream);
WriteAMFBool(true, &ostream);
}
RtmpUnsentMessage* msg3 = new RtmpUnsentMessage;
msg3->header.message_length = req_buf.size();
msg3->header.message_type = RTMP_MESSAGE_DATA_AMF0;
msg3->header.stream_id = mh.stream_id;
msg3->chunk_stream_id = chunk_stream_id();
msg3->body = req_buf;
msgs.push().reset(msg3);
// onStatus(NetStream.Data.Start)
req_buf.clear();
{
butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
AMFOutputStream ostream(&zc_stream);
WriteAMFString(RTMP_AMF0_COMMAND_ON_STATUS, &ostream);
RtmpInfo info;
info.set_code(RTMP_STATUS_CODE_DATA_START);
WriteAMFObject(info, &ostream);
}
RtmpUnsentMessage* msg4 = new RtmpUnsentMessage;
msg4->header.message_length = req_buf.size();
msg4->header.message_type = RTMP_MESSAGE_DATA_AMF0;
msg4->header.stream_id = mh.stream_id;
msg4->chunk_stream_id = chunk_stream_id();
msg4->body = req_buf;
msgs.push().reset(msg4);
butil::intrusive_ptr<RtmpStreamBase> stream_guard;
if (!connection_context()->FindMessageStream(mh.stream_id, &stream_guard)) {
RTMP_WARNING(socket, mh) << "Fail to find stream_id=" << mh.stream_id;
return false;
}
// Change the chunk_stream_id of the server stream to be same with play,
// so that laterly user can call SendXXXMessage successfully.
stream_guard->_chunk_stream_id = chunk_stream_id();
RtmpServerStream* stream = static_cast<RtmpServerStream*>(stream_guard.get());
for (size_t i = msgs.size(); i > 1; --i) {
msgs[i-2]->next.reset(msgs[i-1].release());
}
if (WriteWithoutOvercrowded(socket, msgs[0]) != 0) {
PLOG(WARNING) << socket->remote_side() << '[' << mh.stream_id
<< "] Fail to respond play";
return false;
}
// cyberplayer sends play instead of unpause (and send closeStream instead
// of pause). play automatically unpauses
if (stream->_paused) {
stream->_paused = false;
RPC_VLOG << "Trigger unpause";
stream->OnPause(false, 0);
}
// Call user's callback.
OnPlayContinuation* done = new OnPlayContinuation;
done->player_stream.reset(stream, false/*don't add ref*/);
stream_guard.detach();
done->player_stream->OnPlay(play_opt, &done->status, done);
return true;
}