in libminifi/src/core/ProcessSession.cpp [559:657]
void ProcessSession::import(const std::string& source, std::vector<std::shared_ptr<FlowFile>> &flows, uint64_t offset, char inputDelimiter) {
std::shared_ptr<ResourceClaim> claim;
std::shared_ptr<io::BaseStream> stream;
std::shared_ptr<core::FlowFile> flowFile;
std::vector<uint8_t> buffer(getpagesize());
try {
std::ifstream input{source, std::ios::in | std::ios::binary};
logger_->log_debug("Opening %s", source);
if (!input.is_open() || !input.good()) {
throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
}
if (offset != 0U) {
input.seekg(offset, std::ifstream::beg);
if (!input.good()) {
logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
}
}
while (input.good()) {
input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
std::streamsize read = input.gcount();
if (read < 0) {
throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
}
if (read == 0) {
logger_->log_trace("Finished reading input %s", source);
break;
} else {
logging::LOG_TRACE(logger_) << "Read input of " << read;
}
uint8_t* begin = buffer.data();
uint8_t* end = begin + read;
while (true) {
auto start_time = std::chrono::steady_clock::now();
uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
const auto len = gsl::narrow<size_t>(delimiterPos - begin);
logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
/*
* We do not want to process the rest of the buffer after the last delimiter if
* - we have reached EOF in the file (we would discard it anyway)
* - there is nothing to process (the last character in the buffer is a delimiter)
*/
if (delimiterPos == end && (input.eof() || len == 0)) {
break;
}
/* Create claim and stream if needed and append data */
if (claim == nullptr) {
start_time = std::chrono::steady_clock::now();
claim = content_session_->create();
}
if (stream == nullptr) {
stream = content_session_->write(claim);
}
if (stream == nullptr) {
logger_->log_error("Stream is null");
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for import");
}
if (stream->write(begin, len) != len) {
logger_->log_error("Error while writing");
stream->close();
throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
}
/* Create a FlowFile if we reached a delimiter */
if (delimiterPos == end) {
break;
}
flowFile = create();
flowFile->setSize(stream->size());
flowFile->setOffset(0);
flowFile->setResourceClaim(claim);
logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
<< ", FlowFile UUID " << flowFile->getUUIDStr();
stream->close();
std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
provenance_report_->modifyContent(flowFile, details, duration);
flows.push_back(flowFile);
/* Reset these to start processing the next FlowFile with a clean slate */
flowFile.reset();
stream.reset();
claim.reset();
/* Skip delimiter */
begin = delimiterPos + 1;
}
}
} catch (const std::exception& exception) {
logger_->log_debug("Caught Exception during ProcessSession::import, type: %s, what: %s", typeid(exception).name(), exception.what());
throw;
} catch (...) {
logger_->log_debug("Caught Exception during ProcessSession::import, type: %s", getCurrentExceptionTypeName());
throw;
}
}