bool RtmpChunkStream::OnPlay()

in src/brpc/policy/rtmp_protocol.cpp [2745:2923]


bool RtmpChunkStream::OnPlay(const RtmpMessageHeader& mh,
                             AMFInputStream* istream,
                             Socket* socket) {
    if (!connection_context()->is_server_side()) {
        RTMP_ERROR(socket, mh) << "Client should not receive `play'";
        return false;
    }
    uint32_t transaction_id = 0;
    if (!ReadAMFUint32(&transaction_id, istream)) {
        RTMP_ERROR(socket, mh) << "Fail to read play.TransactionId";
        return false;
    }
    // ffmpeg send non-zero transaction_id for play.
    
    if (!ReadAMFNull(istream)) {
        RTMP_ERROR(socket, mh) << "Fail to read play.CommandObject";
        return false;
    }
    RtmpPlayOptions play_opt; // inner fields are initialized with defaults.
    if (!ReadAMFString(&play_opt.stream_name, istream)) {
        RTMP_ERROR(socket, mh) << "Fail to read play.StreamName";
        return false;
    }
    // start/duration/reset are optional, check emptiness of the stream before
    // calling ReadAMFXXX which prints log for failed branches.
    if (!istream->check_emptiness()) {
        if (!ReadAMFNumber(&play_opt.start, istream)) {
            RTMP_ERROR(socket, mh) << "Fail to read play.Start";
            return false;
        }
    }
    if (!istream->check_emptiness()) {
        if (!ReadAMFNumber(&play_opt.duration, istream)) {
            RTMP_ERROR(socket, mh) << "Fail to read play.Duration";
            return false;
        }
    }
    if (!istream->check_emptiness()) {
        if (!ReadAMFBool(&play_opt.reset, istream)) {
            RTMP_ERROR(socket, mh) << "Fail to read play.Reset";
            return false;
        }
    }
    RPC_VLOG << socket->remote_side() << "[" << mh.stream_id
             << "] play{transaction_id=" << transaction_id
             << " stream_name=" << play_opt.stream_name
             << " start=" << play_opt.start
             << " duration=" << play_opt.duration
             << " reset=" << play_opt.reset << '}';

    butil::IOBuf req_buf;
    TemporaryArrayBuilder<SocketMessagePtr<RtmpUnsentMessage>, 5> msgs;
    
    // TODO(gejun): RTMP spec sends StreamIsRecorded before StreamBegin
    // however SRS does not.
    // StreamBegin
    {
        char cntl_buf[6];
        char* p = cntl_buf;
        WriteBigEndian2Bytes(&p, RTMP_USER_CONTROL_EVENT_STREAM_BEGIN);
        WriteBigEndian4Bytes(&p, mh.stream_id);
        msgs.push().reset(MakeUnsentControlMessage(
                              RTMP_MESSAGE_USER_CONTROL, cntl_buf, sizeof(cntl_buf)));
    }
    // Play.Reset
    if (play_opt.reset) {
        // According to RTMP spec: NetStream.Play.Reset is sent only if the
        // play command sent by the client has set the reset flag. 
        req_buf.clear();
        {
            butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
            AMFOutputStream ostream(&zc_stream);
            WriteAMFString(RTMP_AMF0_COMMAND_ON_STATUS, &ostream);
            WriteAMFUint32(0, &ostream);
            WriteAMFNull(&ostream);
            RtmpInfo info;
            info.set_code(RTMP_STATUS_CODE_PLAY_RESET);
            info.set_level(RTMP_INFO_LEVEL_STATUS);
            info.set_description("Reset " + play_opt.stream_name);
            WriteAMFObject(info, &ostream);
        }
        RtmpUnsentMessage* msg = new RtmpUnsentMessage;
        msg->header.message_length = req_buf.size();
        msg->header.message_type = RTMP_MESSAGE_COMMAND_AMF0;
        msg->header.stream_id = mh.stream_id;
        msg->chunk_stream_id = chunk_stream_id();
        msg->body = req_buf;
        msgs.push().reset(msg);
    }
    
    // Play.Start
    req_buf.clear();
    {
        butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
        AMFOutputStream ostream(&zc_stream);
        WriteAMFString(RTMP_AMF0_COMMAND_ON_STATUS, &ostream);
        WriteAMFUint32(0, &ostream);
        WriteAMFNull(&ostream);
        RtmpInfo info;
        info.set_code(RTMP_STATUS_CODE_PLAY_START);
        info.set_level(RTMP_INFO_LEVEL_STATUS);
        info.set_description("Start playing " + play_opt.stream_name);
        WriteAMFObject(info, &ostream);
    }
    RtmpUnsentMessage* msg2 = new RtmpUnsentMessage;
    msg2->header.message_length = req_buf.size();
    msg2->header.message_type = RTMP_MESSAGE_COMMAND_AMF0;
    msg2->header.stream_id = mh.stream_id;
    msg2->chunk_stream_id = chunk_stream_id();
    msg2->body = req_buf;
    msgs.push().reset(msg2);

    // |RtmpSampleAccess(true, true)
    req_buf.clear();
    {
        butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
        AMFOutputStream ostream(&zc_stream);
        WriteAMFString(RTMP_AMF0_SAMPLE_ACCESS, &ostream);
        WriteAMFBool(true, &ostream);
        WriteAMFBool(true, &ostream);
    }
    RtmpUnsentMessage* msg3 = new RtmpUnsentMessage;
    msg3->header.message_length = req_buf.size();
    msg3->header.message_type = RTMP_MESSAGE_DATA_AMF0;
    msg3->header.stream_id = mh.stream_id;
    msg3->chunk_stream_id = chunk_stream_id();
    msg3->body = req_buf;
    msgs.push().reset(msg3);

    // onStatus(NetStream.Data.Start)
    req_buf.clear();
    {
        butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
        AMFOutputStream ostream(&zc_stream);
        WriteAMFString(RTMP_AMF0_COMMAND_ON_STATUS, &ostream);
        RtmpInfo info;
        info.set_code(RTMP_STATUS_CODE_DATA_START);
        WriteAMFObject(info, &ostream);
    }
    RtmpUnsentMessage* msg4 = new RtmpUnsentMessage;
    msg4->header.message_length = req_buf.size();
    msg4->header.message_type = RTMP_MESSAGE_DATA_AMF0;
    msg4->header.stream_id = mh.stream_id;
    msg4->chunk_stream_id = chunk_stream_id();
    msg4->body = req_buf;
    msgs.push().reset(msg4);

    butil::intrusive_ptr<RtmpStreamBase> stream_guard;
    if (!connection_context()->FindMessageStream(mh.stream_id, &stream_guard)) {
        RTMP_WARNING(socket, mh) << "Fail to find stream_id=" << mh.stream_id;
        return false;
    }
    // Change the chunk_stream_id of the server stream to be same with play,
    // so that laterly user can call SendXXXMessage successfully.
    stream_guard->_chunk_stream_id = chunk_stream_id();
    RtmpServerStream* stream = static_cast<RtmpServerStream*>(stream_guard.get());

    for (size_t i = msgs.size(); i > 1; --i) {
        msgs[i-2]->next.reset(msgs[i-1].release());
    }
    if (WriteWithoutOvercrowded(socket, msgs[0]) != 0) {
        PLOG(WARNING) << socket->remote_side() << '[' << mh.stream_id
                      << "] Fail to respond play";
        return false;
    }
    // cyberplayer sends play instead of unpause (and send closeStream instead
    // of pause). play automatically unpauses
    if (stream->_paused) {
        stream->_paused = false;
        RPC_VLOG << "Trigger unpause";
        stream->OnPause(false, 0);
    }
    // Call user's callback.
    OnPlayContinuation* done = new OnPlayContinuation;
    done->player_stream.reset(stream, false/*don't add ref*/);
    stream_guard.detach();
    done->player_stream->OnPlay(play_opt, &done->status, done);
    return true;
}