torchaudio/csrc/ffmpeg/streamer.cpp (238 lines of code) (raw):
#include <torchaudio/csrc/ffmpeg/ffmpeg.h>
#include <torchaudio/csrc/ffmpeg/streamer.h>
#include <chrono>
#include <sstream>
#include <stdexcept>
#include <thread>
namespace torchaudio {
namespace ffmpeg {
using KeyType = StreamProcessor::KeyType;
//////////////////////////////////////////////////////////////////////////////
// Helper methods
//////////////////////////////////////////////////////////////////////////////
// Guard used by the other helpers: throws std::runtime_error when the format
// context handle evaluates to false (i.e. no stream is open).
void Streamer::validate_open_stream() const {
  const bool closed = !pFormatContext;
  if (closed) {
    throw std::runtime_error("Stream is not open.");
  }
}
// Checks that `i` addresses one of the source streams of the open format
// context.
//
// Throws std::runtime_error if no stream is open, and std::out_of_range if
// `i` is negative or not smaller than the number of source streams.
void Streamer::validate_src_stream_index(int i) const {
  validate_open_stream();
  const int num_streams = static_cast<int>(pFormatContext->nb_streams);
  const bool in_range = (0 <= i) && (i < num_streams);
  if (!in_range) {
    throw std::out_of_range("Source stream index out of range");
  }
}
// Checks that `i` addresses one of the configured output streams, i.e. an
// entry of `stream_indices`.
//
// Throws std::out_of_range if `i` is negative or not smaller than the number
// of output streams.
void Streamer::validate_output_stream_index(int i) const {
  const int num_outputs = static_cast<int>(stream_indices.size());
  const bool in_range = (0 <= i) && (i < num_outputs);
  if (!in_range) {
    throw std::out_of_range("Output stream index out of range");
  }
}
void Streamer::validate_src_stream_type(int i, AVMediaType type) {
validate_src_stream_index(i);
if (pFormatContext->streams[i]->codecpar->codec_type != type) {
std::ostringstream oss;
oss << "Stream " << i << " is not " << av_get_media_type_string(type)
<< " stream.";
throw std::runtime_error(oss.str());
}
}
//////////////////////////////////////////////////////////////////////////////
// Initialization / resource allocations
//////////////////////////////////////////////////////////////////////////////
// Opens the source and prepares per-stream bookkeeping.
//
// `src`, `device` and `option` are forwarded unchanged to the format context
// wrapper. After opening:
//  - `processors` gets one empty slot per source stream; a processor is only
//    created later, when an output stream is attached to that source stream.
//  - every source stream that is neither audio nor video is marked
//    AVDISCARD_ALL. Audio and video streams are left untouched.
Streamer::Streamer(
    const std::string& src,
    const std::string& device,
    const std::map<std::string, std::string>& option)
    : pFormatContext(src, device, option) {
  // `nb_streams` is unsigned; keep the counter unsigned as well so the loop
  // condition does not mix signed and unsigned operands.
  const unsigned int num_streams = pFormatContext->nb_streams;
  processors = std::vector<std::unique_ptr<StreamProcessor>>(num_streams);
  for (unsigned int i = 0; i < num_streams; ++i) {
    AVStream* stream = pFormatContext->streams[i];
    switch (stream->codecpar->codec_type) {
      case AVMEDIA_TYPE_AUDIO:
      case AVMEDIA_TYPE_VIDEO:
        break;
      default:
        stream->discard = AVDISCARD_ALL;
        break;
    }
  }
}
////////////////////////////////////////////////////////////////////////////////
// Query methods
////////////////////////////////////////////////////////////////////////////////
// Number of streams reported by the format context (`nb_streams`), converted
// to int.
int Streamer::num_src_streams() const {
  return static_cast<int>(pFormatContext->nb_streams);
}
// Collects metadata about source stream `i` from its codec parameters.
//
// Always filled: media type and bit rate. Codec names are filled when a codec
// descriptor is found for the codec id. Audio streams additionally get the
// sample format name, sample rate and channel count; video streams get the
// pixel format name, width, height and frame rate (from `r_frame_rate`).
// Other media types get no extra fields.
//
// The FFmpeg name lookups may return a null pointer (unknown format, or a
// descriptor without a long name). Name fields are only assigned when a name
// is actually available, so otherwise they keep whatever value a
// default-constructed SrcStreamInfo gives them.
//
// Throws whatever validate_src_stream_index() throws for a bad index.
SrcStreamInfo Streamer::get_src_stream_info(int i) const {
  validate_src_stream_index(i);
  AVStream* stream = pFormatContext->streams[i];
  AVCodecParameters* codecpar = stream->codecpar;

  SrcStreamInfo ret;
  ret.media_type = codecpar->codec_type;
  ret.bit_rate = codecpar->bit_rate;

  const AVCodecDescriptor* desc = avcodec_descriptor_get(codecpar->codec_id);
  if (desc) {
    if (desc->name) {
      ret.codec_name = desc->name;
    }
    if (desc->long_name) {
      ret.codec_long_name = desc->long_name;
    }
  }

  switch (codecpar->codec_type) {
    case AVMEDIA_TYPE_AUDIO: {
      const char* fmt_name =
          av_get_sample_fmt_name(static_cast<AVSampleFormat>(codecpar->format));
      if (fmt_name) {
        ret.fmt_name = fmt_name;
      }
      ret.sample_rate = static_cast<double>(codecpar->sample_rate);
      ret.num_channels = codecpar->channels;
      break;
    }
    case AVMEDIA_TYPE_VIDEO: {
      const char* fmt_name =
          av_get_pix_fmt_name(static_cast<AVPixelFormat>(codecpar->format));
      if (fmt_name) {
        ret.fmt_name = fmt_name;
      }
      ret.width = codecpar->width;
      ret.height = codecpar->height;
      ret.frame_rate = av_q2d(stream->r_frame_rate);
      break;
    }
    default:
      break;
  }
  return ret;
}
// Number of configured output streams, i.e. the size of `stream_indices`,
// converted to int.
int Streamer::num_out_streams() const {
  return static_cast<int>(stream_indices.size());
}
// Describes output stream `i`: the index of the source stream it is attached
// to and the filter description reported by that source stream's processor.
//
// Throws std::out_of_range if `i` is not a valid output stream index.
OutputStreamInfo Streamer::get_out_stream_info(int i) const {
  validate_output_stream_index(i);
  // Each entry pairs a source stream index with the key of the output inside
  // that stream's processor.
  const auto& entry = stream_indices[i];
  const int src_index = entry.first;
  const KeyType key = entry.second;

  OutputStreamInfo info;
  info.source_index = src_index;
  info.filter_description = processors[src_index]->get_filter_description(key);
  return info;
}
// Asks FFmpeg for the "best" audio stream of the source and returns the
// result of av_find_best_stream() unchanged.
// NOTE(review): per the FFmpeg API a negative result presumably means that no
// suitable stream was found - confirm against the FFmpeg documentation.
int Streamer::find_best_audio_stream() const {
  // -1 for both the wanted and the related stream lets FFmpeg choose freely;
  // no decoder is requested and no flags are set.
  const int best = av_find_best_stream(
      pFormatContext, AVMEDIA_TYPE_AUDIO, -1, -1, nullptr, 0);
  return best;
}
// Asks FFmpeg for the "best" video stream of the source and returns the
// result of av_find_best_stream() unchanged.
// NOTE(review): per the FFmpeg API a negative result presumably means that no
// suitable stream was found - confirm against the FFmpeg documentation.
int Streamer::find_best_video_stream() const {
  // -1 for both the wanted and the related stream lets FFmpeg choose freely;
  // no decoder is requested and no flags are set.
  const int best = av_find_best_stream(
      pFormatContext, AVMEDIA_TYPE_VIDEO, -1, -1, nullptr, 0);
  return best;
}
bool Streamer::is_buffer_ready() const {
for (const auto& it : processors) {
if (it && !it->is_buffer_ready()) {
return false;
}
}
return true;
}
////////////////////////////////////////////////////////////////////////////////
// Configure methods
////////////////////////////////////////////////////////////////////////////////
void Streamer::seek(double timestamp) {
if (timestamp < 0) {
throw std::invalid_argument("timestamp must be non-negative.");
}
int64_t ts = static_cast<int64_t>(timestamp * AV_TIME_BASE);
int ret = avformat_seek_file(pFormatContext, -1, INT64_MIN, ts, INT64_MAX, 0);
if (ret < 0) {
throw std::runtime_error("Failed to seek. (" + av_err2string(ret) + ".)");
}
for (const auto& it : processors) {
if (it) {
it->flush();
}
}
}
void Streamer::add_audio_stream(
int i,
int frames_per_chunk,
int num_chunks,
std::string filter_desc) {
add_stream(
i,
AVMEDIA_TYPE_AUDIO,
frames_per_chunk,
num_chunks,
std::move(filter_desc));
}
void Streamer::add_video_stream(
int i,
int frames_per_chunk,
int num_chunks,
std::string filter_desc) {
add_stream(
i,
AVMEDIA_TYPE_VIDEO,
frames_per_chunk,
num_chunks,
std::move(filter_desc));
}
void Streamer::add_stream(
int i,
AVMediaType media_type,
int frames_per_chunk,
int num_chunks,
std::string filter_desc) {
validate_src_stream_type(i, media_type);
AVStream* stream = pFormatContext->streams[i];
stream->discard = AVDISCARD_DEFAULT;
if (!processors[i])
processors[i] = std::make_unique<StreamProcessor>(stream->codecpar);
int key = processors[i]->add_stream(
stream->time_base,
stream->codecpar,
frames_per_chunk,
num_chunks,
std::move(filter_desc));
stream_indices.push_back(std::make_pair<>(i, key));
}
// Removes output stream `i`.
//
// The output is unregistered from the processor of its source stream and its
// entry is erased from `stream_indices` (later output streams shift down by
// one). If no remaining output stream refers to the same source stream, the
// processor of that source stream is destroyed.
//
// Throws std::out_of_range if `i` is not a valid output stream index.
void Streamer::remove_stream(int i) {
  validate_output_stream_index(i);
  const auto target = stream_indices.begin() + i;
  const int src_index = target->first;
  processors[src_index]->remove_stream(target->second);
  stream_indices.erase(target);

  // Keep the processor while any other output stream still refers to it.
  for (const auto& entry : stream_indices) {
    if (entry.first == src_index) {
      return;
    }
  }
  // No longer referred to by any output stream: disable the processor.
  processors[src_index].reset();
}
////////////////////////////////////////////////////////////////////////////////
// Stream methods
////////////////////////////////////////////////////////////////////////////////
// Note
// return value (to be finalized)
// 0: caller should keep calling this function
// 1: It's done, caller should stop calling
// <0: Some error happened
int Streamer::process_packet() {
int ret = av_read_frame(pFormatContext, pPacket);
if (ret == AVERROR_EOF) {
ret = drain();
return (ret < 0) ? ret : 1;
}
if (ret < 0)
return ret;
AutoPacketUnref packet{pPacket};
auto& processor = processors[pPacket->stream_index];
if (!processor)
return 0;
ret = processor->process_packet(packet);
return (ret < 0) ? ret : 0;
}
// Blocking variant of `process_packet()`: while `process_packet()` returns
// AVERROR(EAGAIN), it is retried until the timeout expires.
//
// `timeout` and `backoff` are given in milliseconds. A negative `timeout`
// means "retry forever". `backoff` is the time slept between two attempts.
//
// Returns the first result that is not AVERROR(EAGAIN), or AVERROR(EAGAIN)
// itself once the deadline has passed.
int Streamer::process_packet_block(double timeout, double backoff) {
  using Clock = std::chrono::steady_clock;

  // Milliseconds are converted to whole microseconds (x1000, truncated).
  const auto deadline = (timeout < 0)
      ? std::chrono::time_point<Clock>::max()
      : Clock::now() +
          std::chrono::microseconds{static_cast<int64_t>(1000 * timeout)};
  const std::chrono::microseconds pause{static_cast<int64_t>(1000 * backoff)};

  for (;;) {
    const int status = process_packet();
    // The clock is only consulted when a retry is actually an option.
    if (status != AVERROR(EAGAIN) || deadline < Clock::now()) {
      return status;
    }
    // FYI: ffmpeg sleeps 10 milliseconds if the read happens in a separate
    // thread
    // https://github.com/FFmpeg/FFmpeg/blob/b0f8dbb0cacc45a19f18c043afc706d7d26bef74/fftools/ffmpeg.c#L3952
    // https://github.com/FFmpeg/FFmpeg/blob/b0f8dbb0cacc45a19f18c043afc706d7d26bef74/fftools/ffmpeg.c#L4542
    std::this_thread::sleep_for(pause);
  }
}
// <0: Some error happened.
int Streamer::drain() {
int ret = 0, tmp = 0;
for (auto& p : processors) {
if (p) {
tmp = p->process_packet(NULL);
if (tmp < 0)
ret = tmp;
}
}
return ret;
}
std::vector<c10::optional<torch::Tensor>> Streamer::pop_chunks() {
std::vector<c10::optional<torch::Tensor>> ret;
for (auto& i : stream_indices) {
ret.push_back(processors[i.first]->pop_chunk(i.second));
}
return ret;
}
} // namespace ffmpeg
} // namespace torchaudio