in torchvision/csrc/io/decoder/decoder.cpp [500:614]
int Decoder::getFrame(size_t workingTimeInMs) {
if (inRange_.none()) {
return ENODATA;
}
// decode frames until cache is full and leave thread
// once decode() method gets called and grab some bytes
// run this method again
// init package
AVPacket avPacket;
av_init_packet(&avPacket);
avPacket.data = nullptr;
avPacket.size = 0;
auto end = std::chrono::steady_clock::now() +
std::chrono::milliseconds(workingTimeInMs);
// return true if elapsed time less than timeout
auto watcher = [end]() -> bool {
return std::chrono::steady_clock::now() <= end;
};
int result = 0;
size_t decodingErrors = 0;
bool decodedFrame = false;
while (!interrupted_ && inRange_.any() && !decodedFrame && watcher()) {
result = av_read_frame(inputCtx_, &avPacket);
if (result == AVERROR(EAGAIN)) {
VLOG(4) << "Decoder is busy...";
std::this_thread::yield();
result = 0; // reset error, EAGAIN is not an error at all
continue;
} else if (result == AVERROR_EOF) {
flushStreams();
VLOG(1) << "End of stream";
result = ENODATA;
break;
} else if (result < 0) {
flushStreams();
LOG(ERROR) << "Error detected: " << Util::generateErrorDesc(result);
break;
}
// get stream
auto stream = findByIndex(avPacket.stream_index);
if (stream == nullptr || !inRange_.test(stream->getIndex())) {
av_packet_unref(&avPacket);
continue;
}
size_t numConsecutiveNoBytes = 0;
// it can be only partial decoding of the package bytes
do {
// decode package
bool gotFrame = false;
bool hasMsg = false;
// packet either got consumed completely or not at all
if ((result = processPacket(
stream, &avPacket, &gotFrame, &hasMsg, params_.fastSeek)) < 0) {
LOG(ERROR) << "processPacket failed with code: " << result;
break;
}
if (!gotFrame && params_.maxProcessNoBytes != 0 &&
++numConsecutiveNoBytes > params_.maxProcessNoBytes) {
LOG(ERROR) << "uuid=" << params_.loggingUuid
<< " exceeding max amount of consecutive no bytes";
break;
}
if (result > 0) {
numConsecutiveNoBytes = 0;
}
decodedFrame |= hasMsg;
} while (result == 0);
// post loop check
if (result < 0) {
if (params_.maxPackageErrors != 0 && // check errors
++decodingErrors >= params_.maxPackageErrors) { // reached the limit
LOG(ERROR) << "uuid=" << params_.loggingUuid
<< " exceeding max amount of consecutive package errors";
break;
}
} else {
decodingErrors = 0; // reset on success
}
result = 0;
av_packet_unref(&avPacket);
}
av_packet_unref(&avPacket);
VLOG(2) << "Interrupted loop"
<< ", interrupted_ " << interrupted_ << ", inRange_.any() "
<< inRange_.any() << ", decodedFrame " << decodedFrame << ", result "
<< result;
// loop can be terminated, either by:
// 1. explcitly iterrupted
// 2. terminated by workable timeout
// 3. unrecoverable error or ENODATA (end of stream)
// 4. decoded frames pts are out of the specified range
// 5. success decoded frame
if (interrupted_) {
return EINTR;
}
if (result != 0) {
return result;
}
if (inRange_.none()) {
return ENODATA;
}
return 0;
}