torchaudio/csrc/ffmpeg/stream_processor.cpp (82 lines of code) (raw):
#include <torchaudio/csrc/ffmpeg/stream_processor.h>
#include <stdexcept>
namespace torchaudio {
namespace ffmpeg {
using KeyType = StreamProcessor::KeyType;
StreamProcessor::StreamProcessor(AVCodecParameters* codecpar)
: decoder(codecpar) {}
////////////////////////////////////////////////////////////////////////////////
// Configurations
////////////////////////////////////////////////////////////////////////////////
KeyType StreamProcessor::add_stream(
AVRational input_time_base,
AVCodecParameters* codecpar,
int frames_per_chunk,
int num_chunks,
std::string filter_description) {
switch (codecpar->codec_type) {
case AVMEDIA_TYPE_AUDIO:
case AVMEDIA_TYPE_VIDEO:
break;
default:
throw std::runtime_error("Only Audio and Video are supported");
}
KeyType key = current_key++;
sinks.emplace(
std::piecewise_construct,
std::forward_as_tuple(key),
std::forward_as_tuple(
input_time_base,
codecpar,
frames_per_chunk,
num_chunks,
std::move(filter_description)));
decoder_time_base = av_q2d(input_time_base);
return key;
}
void StreamProcessor::remove_stream(KeyType key) {
sinks.erase(key);
}
////////////////////////////////////////////////////////////////////////////////
// Query methods
////////////////////////////////////////////////////////////////////////////////
std::string StreamProcessor::get_filter_description(KeyType key) const {
return sinks.at(key).filter.get_description();
}
bool StreamProcessor::is_buffer_ready() const {
for (const auto& it : sinks) {
if (!it.second.is_buffer_ready()) {
return false;
}
}
return true;
}
////////////////////////////////////////////////////////////////////////////////
// The streaming process
////////////////////////////////////////////////////////////////////////////////
// 0: some kind of success
// <0: Some error happened
int StreamProcessor::process_packet(AVPacket* packet) {
int ret = decoder.process_packet(packet);
while (ret >= 0) {
ret = decoder.get_frame(pFrame1);
// AVERROR(EAGAIN) means that new input data is required to return new
// output.
if (ret == AVERROR(EAGAIN))
return 0;
if (ret == AVERROR_EOF)
return send_frame(NULL);
if (ret < 0)
return ret;
send_frame(pFrame1);
av_frame_unref(pFrame1);
}
return ret;
}
void StreamProcessor::flush() {
decoder.flush_buffer();
for (auto& ite : sinks) {
ite.second.flush();
}
}
// 0: some kind of success
// <0: Some error happened
int StreamProcessor::send_frame(AVFrame* pFrame) {
int ret = 0;
for (auto& ite : sinks) {
int ret2 = ite.second.process_frame(pFrame);
if (ret2 < 0)
ret = ret2;
}
return ret;
}
////////////////////////////////////////////////////////////////////////////////
// Retrieval
////////////////////////////////////////////////////////////////////////////////
c10::optional<torch::Tensor> StreamProcessor::pop_chunk(KeyType key) {
return sinks.at(key).buffer->pop_chunk();
}
} // namespace ffmpeg
} // namespace torchaudio