void ProcessSession::import()

in libminifi/src/core/ProcessSession.cpp [559:657]


void ProcessSession::import(const std::string& source, std::vector<std::shared_ptr<FlowFile>> &flows, uint64_t offset, char inputDelimiter) {
  std::shared_ptr<ResourceClaim> claim;
  std::shared_ptr<io::BaseStream> stream;
  std::shared_ptr<core::FlowFile> flowFile;

  std::vector<uint8_t> buffer(getpagesize());
  try {
    std::ifstream input{source, std::ios::in | std::ios::binary};
    logger_->log_debug("Opening %s", source);
    if (!input.is_open() || !input.good()) {
      throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
    }
    if (offset != 0U) {
      input.seekg(offset, std::ifstream::beg);
      if (!input.good()) {
        logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
      }
    }
    while (input.good()) {
      input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
      std::streamsize read = input.gcount();
      if (read < 0) {
        throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
      }
      if (read == 0) {
        logger_->log_trace("Finished reading input %s", source);
        break;
      } else {
        logging::LOG_TRACE(logger_) << "Read input of " << read;
      }
      uint8_t* begin = buffer.data();
      uint8_t* end = begin + read;
      while (true) {
        auto start_time = std::chrono::steady_clock::now();
        uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
        const auto len = gsl::narrow<size_t>(delimiterPos - begin);

        logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
        /*
         * We do not want to process the rest of the buffer after the last delimiter if
         *  - we have reached EOF in the file (we would discard it anyway)
         *  - there is nothing to process (the last character in the buffer is a delimiter)
         */
        if (delimiterPos == end && (input.eof() || len == 0)) {
          break;
        }

        /* Create claim and stream if needed and append data */
        if (claim == nullptr) {
          start_time = std::chrono::steady_clock::now();
          claim = content_session_->create();
        }
        if (stream == nullptr) {
          stream = content_session_->write(claim);
        }
        if (stream == nullptr) {
          logger_->log_error("Stream is null");
          throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for import");
        }
        if (stream->write(begin, len) != len) {
          logger_->log_error("Error while writing");
          stream->close();
          throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
        }

        /* Create a FlowFile if we reached a delimiter */
        if (delimiterPos == end) {
          break;
        }
        flowFile = create();
        flowFile->setSize(stream->size());
        flowFile->setOffset(0);
        flowFile->setResourceClaim(claim);
        logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
                                    << ", FlowFile UUID " << flowFile->getUUIDStr();
        stream->close();
        std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
        provenance_report_->modifyContent(flowFile, details, duration);
        flows.push_back(flowFile);

        /* Reset these to start processing the next FlowFile with a clean slate */
        flowFile.reset();
        stream.reset();
        claim.reset();

        /* Skip delimiter */
        begin = delimiterPos + 1;
      }
    }
  } catch (const std::exception& exception) {
    logger_->log_debug("Caught Exception during ProcessSession::import, type: %s, what: %s", typeid(exception).name(), exception.what());
    throw;
  } catch (...) {
    logger_->log_debug("Caught Exception during ProcessSession::import, type: %s", getCurrentExceptionTypeName());
    throw;
  }
}