in src/pipemode_op/Dataset/src/pipemode_dataset_op_kernel.cpp [187:217]
// Reads the next record from record_reader_ and emits it as a scalar DT_STRING tensor.
//
// ctx             - iterator context; not referenced by this method.
// out_tensors     - receives one scalar string tensor when a record is read;
//                   left untouched when the reader reports no record.
// end_of_sequence - set to true when ReadRecord returns false, false otherwise.
//
// Returns Status::OK() both when a record is produced and at end of sequence.
// A std::runtime_error thrown while reading is converted into an INTERNAL
// status carrying the exception message.
Status GetNextInternal(IteratorContext* ctx,
                       std::vector<Tensor>* out_tensors,
                       bool* end_of_sequence) override {
    *end_of_sequence = false;
    Tensor result_tensor(DT_STRING, TensorShape({}));
    std::string* storage = &result_tensor.scalar<std::string>()();
    try {
        // The lock is scoped to the try block, so it covers the read and the
        // statistics updates below.
        mutex_lock l(mu_);
        const auto start = std::chrono::high_resolution_clock::now();
        const bool got_record = record_reader_->ReadRecord(storage);
        std::size_t record_bytes = 0;
        if (got_record) {
            // Capture the size before the tensor is moved so that `storage`
            // is never dereferenced after its owning tensor was moved from.
            record_bytes = storage->size();
            out_tensors->emplace_back(std::move(result_tensor));
        } else {
            *end_of_sequence = true;
        }
        const auto end = std::chrono::high_resolution_clock::now();
        const auto delta_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start);
        // Time spent in the reader is accumulated for every call, including
        // the final one that detects end of sequence.
        read_time_ += delta_ns;
        if (got_record) {
            // Only count records and bytes (and report benchmarks) for calls
            // that actually produced a record; the end-of-sequence call must
            // not inflate records_read_ or trigger a spurious report.
            read_bytes_ += record_bytes;
            records_read_++;
            if (benchmark_records_interval_ != 0 && (records_read_ % benchmark_records_interval_ == 0)) {
                // The time and byte figures printed here are those of the
                // current record, not the running totals.
                // Single flush at the end instead of one per line.
                std::cout << "PipeModeDatasetOp::Dataset::Iterator records: " << records_read_ << '\n'
                          << "PipeModeDatasetOp::Dataset::Iterator records read_time_ns: " << delta_ns.count()
                          << '\n'
                          << "PipeModeDatasetOp::Dataset::Iterator records read_bytes: " << record_bytes
                          << std::endl;
            }
        }
    } catch (const std::runtime_error& err) {
        return Status(tensorflow::error::INTERNAL, err.what());
    }
    return Status::OK();
}