in src/brpc/policy/rtmp_protocol.cpp [2576:2713]
bool RtmpChunkStream::OnCreateStream(const RtmpMessageHeader& mh,
AMFInputStream* istream,
Socket* socket) {
RtmpService* service = connection_context()->service();
if (service == NULL) {
RTMP_ERROR(socket, mh) << "Client should not receive `createStream'";
return false;
}
double transaction_id = 0;
if (!ReadAMFNumber(&transaction_id, istream)) {
RTMP_ERROR(socket, mh) << "Fail to read createStream.TransactionId";
return false;
}
bool is_publish = false;
RtmpPublishType publish_type = RTMP_PUBLISH_LIVE;
std::string stream_name;
AMFObject cmd_obj;
if (!ReadAMFObject(&cmd_obj, istream)) {
RTMP_ERROR(socket, mh) << "Fail to read createStream.CommandObject";
return false;
}
const AMFField* cmd_name_field = cmd_obj.Find("CommandName");
if (cmd_name_field != NULL && cmd_name_field->IsString()) {
is_publish = (cmd_name_field->AsString() == "publish");
}
const AMFField* stream_name_field = cmd_obj.Find("StreamName");
if (stream_name_field != NULL && stream_name_field->IsString()) {
stream_name_field->AsString().CopyToString(&stream_name);
}
if (is_publish) {
const AMFField* publish_type_field = cmd_obj.Find("PublishType");
if (publish_type_field != NULL && publish_type_field->IsString()) {
Str2RtmpPublishType(publish_type_field->AsString(), &publish_type);
}
}
RPC_VLOG << socket->remote_side() << "[" << mh.stream_id
<< "] createStream{transaction_id=" << transaction_id << '}';
std::string error_text;
butil::intrusive_ptr<RtmpServerStream> stream(
service->NewStream(connection_context()->_connect_req));
if (connection_context()->_connect_req.stream_multiplexing() &&
stream != NULL) {
stream->_client_supports_stream_multiplexing = true;
}
if (NULL == stream) {
error_text = "Fail to create stream";
LOG(ERROR) << error_text;
} else {
socket->ReAddress(&stream->_rtmpsock);
if (!connection_context()->AddServerStream(stream.get())) {
error_text = "Fail to add stream";
LOG(ERROR) << error_text;
} else {
const int rc = bthread_id_create(&stream->_onfail_id, stream.get(),
RtmpServerStream::RunOnFailed);
if (rc) {
LOG(ERROR) << "Fail to create RtmpServerStream._onfail_id: "
<< berror(rc);
stream->OnStopInternal();
return false;
}
// Add a ref for RunOnFailed.
butil::intrusive_ptr<RtmpServerStream>(stream).detach();
socket->fail_me_at_server_stop();
socket->NotifyOnFailed(stream->_onfail_id);
}
}
// Respond createStream
butil::IOBuf req_buf;
{
butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
AMFOutputStream ostream(&zc_stream);
WriteAMFString((!error_text.empty() ? RTMP_AMF0_COMMAND_ERROR :
RTMP_AMF0_COMMAND_RESULT), &ostream);
WriteAMFNumber(transaction_id, &ostream);
if (error_text.empty()) {
if (!stream_name.empty()) {
AMFObject cmd_obj;
cmd_obj.SetBool("PlayOrPublishAccepted", true);
WriteAMFObject(cmd_obj, &ostream);
} else {
WriteAMFNull(&ostream);
}
WriteAMFUint32(stream->stream_id(), &ostream);
} else {
WriteAMFNull(&ostream);
RtmpInfo info;
info.set_level(RTMP_INFO_LEVEL_ERROR);
// TODO(gejun): Not sure about the code.
info.set_code("NetConnection.CreateStream.Rejected");
info.set_description(error_text);
WriteAMFObject(info, &ostream);
}
CHECK(ostream.good());
}
SocketMessagePtr<RtmpUnsentMessage> msg(
MakeUnsentControlMessage(
RTMP_MESSAGE_COMMAND_AMF0, chunk_stream_id(), req_buf));
if (WriteWithoutOvercrowded(socket, msg) != 0) {
PLOG(WARNING) << socket->remote_side() << '[' << mh.stream_id
<< "] Fail to respond createStream";
// End the stream at server-side.
const bthread_id_t id = stream->_onfail_id;
if (id != INVALID_BTHREAD_ID) {
bthread_id_error(id, 0);
}
return false;
}
if (!error_text.empty()) {
return false;
}
if (stream_name.empty()) {
return true;
}
butil::IOBuf cmd_buf;
{
butil::IOBufAsZeroCopyOutputStream zc_ostream(&cmd_buf);
AMFOutputStream ostream(&zc_ostream);
WriteAMFUint32(0, &ostream); // TransactionId
WriteAMFNull(&ostream); // CommandObject
WriteAMFString(stream_name, &ostream); // StreamName
if (is_publish) {
WriteAMFString(RtmpPublishType2Str(publish_type), &ostream);
}
}
butil::IOBufAsZeroCopyInputStream zc_istream(cmd_buf);
AMFInputStream cmd_istream(&zc_istream);
RtmpMessageHeader header;
header.timestamp = mh.timestamp;
header.message_length = cmd_buf.size();
header.message_type = RTMP_MESSAGE_COMMAND_AMF0;
header.stream_id = stream->stream_id();
if (is_publish) {
return OnPublish(header, &cmd_istream, socket);
} else {
return OnPlay(header, &cmd_istream, socket);
}
}