in src/cc/actorpool.cc [107:145]
// Appends `item` to the queue.
//
// When check_inputs_ is set, every tensor in item.tensors must have more
// than batch_dim_ dimensions and there must be at least one tensor;
// otherwise py::value_error is thrown before the mutex is taken.
//
// If maximum_queue_size_ holds a value, the call blocks on can_enqueue_
// while the deque is at (or above) that size and the queue is still open.
// Throws ClosedBatchingQueue if the queue is closed by the time the wait
// is over.
void enqueue(QueueItem item) {
  if (check_inputs_) {
    bool saw_tensor = false;
    item.tensors.for_each([this, &saw_tensor](const torch::Tensor& tensor) {
      saw_tensor = true;
      const auto rank = tensor.dim();
      if (rank > batch_dim_) {
        return;
      }
      throw py::value_error(
          "Enqueued tensors must have more than batch_dim == " +
          std::to_string(batch_dim_) + " dimensions, but got " +
          std::to_string(rank));
    });
    if (!saw_tensor) {
      throw py::value_error("Cannot enqueue empty vector of tensors");
    }
  }

  // Everything that touches deque_ runs inside this lambda so that the lock
  // is released before the condition variable below is signalled.
  const bool batch_ready = [this, &item] {
    std::unique_lock<std::mutex> guard(mu_);

    // Wait until there is room, the queue is unbounded, or it was closed.
    // The size limit is only dereferenced once it is known to hold a value.
    can_enqueue_.wait(guard, [this] {
      return maximum_queue_size_ == std::nullopt || is_closed_ ||
             deque_.size() < *maximum_queue_size_;
    });

    if (is_closed_) {
      throw ClosedBatchingQueue("Enqueue to closed queue");
    }

    deque_.push_back(std::move(item));
    return deque_.size() >= minimum_batch_size_;
  }();

  // Wake a single waiter on enough_inputs_ (presumably a dequeuing thread --
  // confirm against the rest of the class) once a minimum batch is available.
  if (batch_ready) {
    enough_inputs_.notify_one();
  }
}