void ProcessSessionImpl::import()

in libminifi/src/core/ProcessSession.cpp [592:693]


void ProcessSessionImpl::import(const std::string& source, std::vector<std::shared_ptr<FlowFile>> &flows, uint64_t offset, char inputDelimiter) {
  std::shared_ptr<ResourceClaim> claim;
  std::shared_ptr<io::BaseStream> stream;
  std::shared_ptr<core::FlowFile> flowFile;

  std::vector<uint8_t> buffer(getpagesize());
  try {
    std::ifstream input{source, std::ios::in | std::ios::binary};
    logger_->log_debug("Opening {}", source);
    if (!input.is_open() || !input.good()) {
      throw Exception(FILE_OPERATION_EXCEPTION, utils::string::join_pack("File Import Error: failed to open file \'", source, "\'"));
    }
    if (offset != 0U) {
      input.seekg(gsl::narrow<std::streamoff>(offset), std::ifstream::beg);
      if (!input.good()) {
        logger_->log_error("Seeking to {} failed for file {} (does file/filesystem support seeking?)", offset, source);
        throw Exception(FILE_OPERATION_EXCEPTION, utils::string::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
      }
    }
    while (input.good()) {
      input.read(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(buffer.size()));
      std::streamsize read = input.gcount();
      if (read < 0) {
        throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
      }
      if (read == 0) {
        logger_->log_trace("Finished reading input {}", source);
        break;
      } else {
        logger_->log_trace("Read input of {}", read);
      }
      uint8_t* begin = buffer.data();
      uint8_t* end = begin + read;
      while (true) {
        auto start_time = std::chrono::steady_clock::now();
        uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
        const auto len = gsl::narrow<size_t>(delimiterPos - begin);

        logger_->log_trace("Read input of {} length is {} is at end? {}", read, len, delimiterPos == end);
        /*
         * We do not want to process the rest of the buffer after the last delimiter if
         *  - we have reached EOF in the file (we would discard it anyway)
         *  - there is nothing to process (the last character in the buffer is a delimiter)
         */
        if (delimiterPos == end && (input.eof() || len == 0)) {
          break;
        }

        /* Create claim and stream if needed and append data */
        if (claim == nullptr) {
          start_time = std::chrono::steady_clock::now();
          claim = content_session_->create();
        }
        if (stream == nullptr) {
          stream = content_session_->write(claim);
        }
        if (stream == nullptr) {
          logger_->log_error("Stream is null");
          throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for import");
        }
        if (stream->write(begin, len) != len) {
          logger_->log_error("Error while writing");
          stream->close();
          throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
        }

        /* Create a FlowFile if we reached a delimiter */
        if (delimiterPos == end) {
          break;
        }
        flowFile = create();
        flowFile->setSize(stream->size());
        flowFile->setOffset(0);
        flowFile->setResourceClaim(claim);
        logger_->log_debug("Import offset {} length {} into content {}, FlowFile UUID {}",
            flowFile->getOffset(), flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath(), flowFile->getUUIDStr());
        stream->close();
        if (metrics_) {
          metrics_->bytesWritten() += stream->size();
        }
        std::string details = process_context_->getProcessor().getName() + " modify flow record content " + flowFile->getUUIDStr();
        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
        provenance_report_->modifyContent(*flowFile, details, duration);
        flows.push_back(flowFile);

        /* Reset these to start processing the next FlowFile with a clean slate */
        flowFile.reset();
        stream.reset();
        claim.reset();

        /* Skip delimiter */
        begin = delimiterPos + 1;
      }
    }
  } catch (const std::exception& exception) {
    logger_->log_debug("Caught Exception during ProcessSession::import, type: {}, what: {}", typeid(exception).name(), exception.what());
    throw;
  } catch (...) {
    logger_->log_debug("Caught Exception during ProcessSession::import, type: {}", getCurrentExceptionTypeName());
    throw;
  }
}