bool RtmpChunkStream::OnCreateStream()

in src/brpc/policy/rtmp_protocol.cpp [2576:2713]


bool RtmpChunkStream::OnCreateStream(const RtmpMessageHeader& mh,
                                     AMFInputStream* istream,
                                     Socket* socket) {
    RtmpService* service = connection_context()->service();
    if (service == NULL) {
        RTMP_ERROR(socket, mh) << "Client should not receive `createStream'";
        return false;
    }
    double transaction_id = 0;
    if (!ReadAMFNumber(&transaction_id, istream)) {
        RTMP_ERROR(socket, mh) << "Fail to read createStream.TransactionId";
        return false;
    }
    bool is_publish = false;
    RtmpPublishType publish_type = RTMP_PUBLISH_LIVE;
    std::string stream_name;
    AMFObject cmd_obj;
    if (!ReadAMFObject(&cmd_obj, istream)) {
        RTMP_ERROR(socket, mh) << "Fail to read createStream.CommandObject";
        return false;
    }
    const AMFField* cmd_name_field = cmd_obj.Find("CommandName");
    if (cmd_name_field != NULL && cmd_name_field->IsString()) {
        is_publish = (cmd_name_field->AsString() == "publish");
    }
    const AMFField* stream_name_field = cmd_obj.Find("StreamName");
    if (stream_name_field != NULL && stream_name_field->IsString()) {
        stream_name_field->AsString().CopyToString(&stream_name);
    }
    if (is_publish) {
        const AMFField* publish_type_field = cmd_obj.Find("PublishType");
        if (publish_type_field != NULL && publish_type_field->IsString()) {
            Str2RtmpPublishType(publish_type_field->AsString(), &publish_type);
        }
    }
    RPC_VLOG << socket->remote_side() << "[" << mh.stream_id
             << "] createStream{transaction_id=" << transaction_id << '}';
    std::string error_text;
    butil::intrusive_ptr<RtmpServerStream> stream(
        service->NewStream(connection_context()->_connect_req));
    if (connection_context()->_connect_req.stream_multiplexing() &&
        stream != NULL) {
        stream->_client_supports_stream_multiplexing = true;
    }
    if (NULL == stream) {
        error_text = "Fail to create stream";
        LOG(ERROR) << error_text;
    } else {
        socket->ReAddress(&stream->_rtmpsock);
        if (!connection_context()->AddServerStream(stream.get())) {
            error_text = "Fail to add stream";
            LOG(ERROR) << error_text;
        } else {
            const int rc = bthread_id_create(&stream->_onfail_id, stream.get(),
                                             RtmpServerStream::RunOnFailed);
            if (rc) {
                LOG(ERROR) << "Fail to create RtmpServerStream._onfail_id: "
                           << berror(rc);
                stream->OnStopInternal();
                return false;
            }
            // Add a ref for RunOnFailed.
            butil::intrusive_ptr<RtmpServerStream>(stream).detach();
            socket->fail_me_at_server_stop();
            socket->NotifyOnFailed(stream->_onfail_id);
        }
    }
    // Respond createStream
    butil::IOBuf req_buf;
    {
        butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
        AMFOutputStream ostream(&zc_stream);
        WriteAMFString((!error_text.empty() ? RTMP_AMF0_COMMAND_ERROR :
                        RTMP_AMF0_COMMAND_RESULT), &ostream);
        WriteAMFNumber(transaction_id, &ostream);
        if (error_text.empty()) {
            if (!stream_name.empty()) {
                AMFObject cmd_obj;
                cmd_obj.SetBool("PlayOrPublishAccepted", true);
                WriteAMFObject(cmd_obj, &ostream);
            } else {
                WriteAMFNull(&ostream);
            }
            WriteAMFUint32(stream->stream_id(), &ostream);
        } else {
            WriteAMFNull(&ostream);
            RtmpInfo info;
            info.set_level(RTMP_INFO_LEVEL_ERROR);
             // TODO(gejun): Not sure about the code.
            info.set_code("NetConnection.CreateStream.Rejected");
            info.set_description(error_text);
            WriteAMFObject(info, &ostream);
        }
        CHECK(ostream.good());
    }
    SocketMessagePtr<RtmpUnsentMessage> msg(
        MakeUnsentControlMessage(
            RTMP_MESSAGE_COMMAND_AMF0, chunk_stream_id(), req_buf));
    if (WriteWithoutOvercrowded(socket, msg) != 0) {
        PLOG(WARNING) << socket->remote_side() << '[' << mh.stream_id
                      << "] Fail to respond createStream";
        // End the stream at server-side.
        const bthread_id_t id = stream->_onfail_id;
        if (id != INVALID_BTHREAD_ID) {
            bthread_id_error(id, 0);
        }
        return false;
    }
    if (!error_text.empty()) {
        return false;
    }
    if (stream_name.empty()) {
        return true;
    }
    butil::IOBuf cmd_buf;
    {
        butil::IOBufAsZeroCopyOutputStream zc_ostream(&cmd_buf);
        AMFOutputStream ostream(&zc_ostream);
        WriteAMFUint32(0, &ostream);  // TransactionId
        WriteAMFNull(&ostream);       // CommandObject
        WriteAMFString(stream_name, &ostream); // StreamName
        if (is_publish) {
            WriteAMFString(RtmpPublishType2Str(publish_type), &ostream);
        }
    }
    butil::IOBufAsZeroCopyInputStream zc_istream(cmd_buf);
    AMFInputStream cmd_istream(&zc_istream);
    RtmpMessageHeader header;
    header.timestamp = mh.timestamp;
    header.message_length = cmd_buf.size();
    header.message_type = RTMP_MESSAGE_COMMAND_AMF0;
    header.stream_id = stream->stream_id();
    if (is_publish) {
        return OnPublish(header, &cmd_istream, socket);
    } else {
        return OnPlay(header, &cmd_istream, socket);
    }
}