bool RtmpChunkStream::OnPause()

in src/brpc/policy/rtmp_protocol.cpp [3302:3411]


bool RtmpChunkStream::OnPause(const RtmpMessageHeader& mh,
                              AMFInputStream* istream,
                              Socket* socket) {
    if (!connection_context()->is_server_side()) {
        RTMP_ERROR(socket, mh) << "Client should not receive `pause'";
        return false;
    }
    uint32_t transaction_id = 0;
    if (!ReadAMFUint32(&transaction_id, istream)) {
        RTMP_ERROR(socket, mh) << "Fail to read pause.TransactionId";
        return false;
    }
    if (!ReadAMFNull(istream)) { // command object
        RTMP_ERROR(socket, mh) << "Fail to read pause.CommandObject";
        return false;
    }
    bool pause_or_unpause = true;
    if (!ReadAMFBool(&pause_or_unpause, istream)) {
        RTMP_ERROR(socket, mh) << "Fail to read pause/unpause flag";
        return false;
    }
    double milliseconds = 0;
    if (!ReadAMFNumber(&milliseconds, istream)) {
        RTMP_ERROR(socket, mh) << "Fail to read pause.milliSeconds";
        return false;
    }

    butil::intrusive_ptr<RtmpStreamBase> stream;
    if (!connection_context()->FindMessageStream(mh.stream_id, &stream)) {
        RTMP_WARNING(socket, mh) << "Fail to find stream_id=" << mh.stream_id;
        return false;
    }
    if (stream->_paused == pause_or_unpause) {
        if (pause_or_unpause) {
            RTMP_ERROR(socket, mh) << "Pause an already paused stream";
        } else {
            RTMP_ERROR(socket, mh) << "Unpause an already unpaused stream";
        }
        return false;
    }
    int rc = static_cast<RtmpServerStream*>(stream.get())->OnPause(
        pause_or_unpause, milliseconds);

    // Send back status.
    butil::IOBuf req_buf;
    {
        butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
        AMFOutputStream ostream(&zc_stream);
        if (rc == 0) {
            WriteAMFString(RTMP_AMF0_COMMAND_ON_STATUS, &ostream);
            WriteAMFUint32(0, &ostream);
            WriteAMFNull(&ostream);
            RtmpInfo info;
            if (pause_or_unpause) {
                info.set_code(RTMP_STATUS_CODE_STREAM_PAUSE);
            } else {
                info.set_code(RTMP_STATUS_CODE_STREAM_UNPAUSE);
            }
            info.set_level(RTMP_INFO_LEVEL_STATUS);
            info.set_description("Paused stream.");
            WriteAMFObject(info, &ostream);
            CHECK(ostream.good());
        } else {
            WriteAMFString(RTMP_AMF0_COMMAND_ERROR, &ostream);
            WriteAMFNumber(0, &ostream);
            WriteAMFNull(&ostream);
            RtmpInfo info;
            if (pause_or_unpause) {
                info.set_code(RTMP_STATUS_CODE_STREAM_PAUSE);
            } else {
                info.set_code(RTMP_STATUS_CODE_STREAM_UNPAUSE);
            }
            info.set_level(RTMP_INFO_LEVEL_ERROR);
            info.set_description(pause_or_unpause ? "Fail to pause" :
                                 "Fail to unpause");
            WriteAMFObject(info, &ostream);
            CHECK(ostream.good());
        }
    }
    SocketMessagePtr<RtmpUnsentMessage> msg1(new RtmpUnsentMessage);
    msg1->header.message_length = req_buf.size();
    msg1->header.message_type = RTMP_MESSAGE_COMMAND_AMF0;
    msg1->header.stream_id = mh.stream_id;
    msg1->chunk_stream_id = chunk_stream_id();
    msg1->body = req_buf;

    // StreamEOF(pause) or StreamBegin(unpause)
    char cntl_buf[6];
    char* p = cntl_buf;
    if (pause_or_unpause) {
        WriteBigEndian2Bytes(&p, RTMP_USER_CONTROL_EVENT_STREAM_EOF);
    } else {
        WriteBigEndian2Bytes(&p, RTMP_USER_CONTROL_EVENT_STREAM_BEGIN);
    }
    WriteBigEndian4Bytes(&p, mh.stream_id);
    RtmpUnsentMessage* msg2 = MakeUnsentControlMessage(
        RTMP_MESSAGE_USER_CONTROL, cntl_buf, sizeof(cntl_buf));
    msg1->next.reset(msg2);
    
    if (WriteWithoutOvercrowded(socket, msg1) != 0) {
        PLOG(WARNING) << socket->remote_side() << '[' << mh.stream_id
                      << "] Fail to respond " << (pause_or_unpause ? "pause" : "unpause");
        return false;
    }
    if (rc == 0) {
        stream->_paused = pause_or_unpause;
        return true;
    }
    return false;
}