Status GetNextInternal()

in src/pipemode_op/Dataset/src/pipemode_dataset_op_kernel.cpp [187:217]


            Status GetNextInternal(IteratorContext* ctx,
                                 std::vector<Tensor>* out_tensors,
                                 bool* end_of_sequence) override {
                *end_of_sequence = false;
                Tensor result_tensor(DT_STRING, TensorShape({}));
                std::string* storage = &result_tensor.scalar<std::string>()();
                try {
                    mutex_lock l(mu_);
                    auto start = std::chrono::high_resolution_clock::now();
                    if (record_reader_->ReadRecord(storage)) {
                        out_tensors->emplace_back(std::move(result_tensor));
                    } else {
                        *end_of_sequence = true;
                    }
                    auto end = std::chrono::high_resolution_clock::now();
                    auto delta_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start);
                    read_time_ += delta_ns;
                    read_bytes_ += storage->size();
                    records_read_++;
                    if (benchmark_records_interval_ != 0 && (records_read_ % benchmark_records_interval_ == 0)) {
                        std::cout << "PipeModeDatasetOp::Dataset::Iterator records: " << records_read_  << std::endl;
                        std::cout << "PipeModeDatasetOp::Dataset::Iterator records read_time_ns: " << delta_ns.count()
                            << std::endl;
                        std::cout << "PipeModeDatasetOp::Dataset::Iterator records read_bytes: " << storage->size()
                            << std::endl;
                    }
                } catch(std::runtime_error& err) {
                    return Status(tensorflow::error::INTERNAL, err.what());
                }
                return Status::OK();
            }