in libminifi/src/core/ProcessSession.cpp [592:693]
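// Imports the contents of `source` starting at `offset`, splitting it on `inputDelimiter`:
// each delimited segment is written into its own content claim and appended to `flows` as a
// separate FlowFile. Any data after the last delimiter is discarded when EOF is reached.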
void ProcessSessionImpl::import(const std::string& source, std::vector<std::shared_ptr<FlowFile>> &flows, uint64_t offset, char inputDelimiter) {
  std::shared_ptr<ResourceClaim> claim;
  std::shared_ptr<io::BaseStream> stream;
  std::shared_ptr<core::FlowFile> flowFile;
  std::vector<uint8_t> buffer(getpagesize());
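  // The read buffer is one memory page: the file is consumed in page-sized chunks rather than
  // being loaded into memory as a whole.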
  try {
    std::ifstream input{source, std::ios::in | std::ios::binary};
    logger_->log_debug("Opening {}", source);
    if (!input.is_open() || !input.good()) {
      throw Exception(FILE_OPERATION_EXCEPTION, utils::string::join_pack("File Import Error: failed to open file \'", source, "\'"));
    }
    if (offset != 0U) {
      input.seekg(gsl::narrow<std::streamoff>(offset), std::ifstream::beg);
      if (!input.good()) {
        logger_->log_error("Seeking to {} failed for file {} (does file/filesystem support seeking?)", offset, source);
        throw Exception(FILE_OPERATION_EXCEPTION, utils::string::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
      }
    }
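    // Outer loop: read the next chunk from the file. Inner loop: split the chunk on the
    // delimiter, carrying any unfinished segment over into the next chunk.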
    while (input.good()) {
      input.read(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(buffer.size()));
      std::streamsize read = input.gcount();
      if (read < 0) {
        throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
      }
      if (read == 0) {
        logger_->log_trace("Finished reading input {}", source);
        break;
      } else {
        logger_->log_trace("Read input of {}", read);
      }
      uint8_t* begin = buffer.data();
      uint8_t* end = begin + read;
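      // Scan the current chunk; [begin, end) shrinks as segments are emitted.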
      while (true) {
        auto start_time = std::chrono::steady_clock::now();
        uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
        const auto len = gsl::narrow<size_t>(delimiterPos - begin);
        logger_->log_trace("Read input of {} length is {} is at end? {}", read, len, delimiterPos == end);
        /*
         * We do not want to process the rest of the buffer after the last delimiter if
         * - we have reached EOF in the file (we would discard it anyway)
         * - there is nothing to process (the last character in the buffer is a delimiter)
         */
        if (delimiterPos == end && (input.eof() || len == 0)) {
          break;
        }
        /* Create claim and stream if needed and append data */
        if (claim == nullptr) {
          start_time = std::chrono::steady_clock::now();
          claim = content_session_->create();
        }
        if (stream == nullptr) {
          stream = content_session_->write(claim);
        }
        if (stream == nullptr) {
          logger_->log_error("Stream is null");
          throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for import");
        }
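        // Append everything up to the delimiter (or to the end of the chunk) to the current
        // segment; a single segment may span several chunks before its delimiter appears.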
        if (stream->write(begin, len) != len) {
          logger_->log_error("Error while writing");
          stream->close();
          throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error creating FlowFile");
        }
        /* Create a FlowFile if we reached a delimiter */
        if (delimiterPos == end) {
          break;
        }
        flowFile = create();
        flowFile->setSize(stream->size());
        flowFile->setOffset(0);
        flowFile->setResourceClaim(claim);
        logger_->log_debug("Import offset {} length {} into content {}, FlowFile UUID {}",
            flowFile->getOffset(), flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath(), flowFile->getUUIDStr());
        stream->close();
        if (metrics_) {
          metrics_->bytesWritten() += stream->size();
        }
        std::string details = process_context_->getProcessor().getName() + " modify flow record content " + flowFile->getUUIDStr();
        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
        provenance_report_->modifyContent(*flowFile, details, duration);
        flows.push_back(flowFile);
        /* Reset these to start processing the next FlowFile with a clean slate */
        flowFile.reset();
        stream.reset();
        claim.reset();
        /* Skip delimiter */
        begin = delimiterPos + 1;
      }
    }
  } catch (const std::exception& exception) {
    logger_->log_debug("Caught Exception during ProcessSession::import, type: {}, what: {}", typeid(exception).name(), exception.what());
    throw;
  } catch (...) {
    logger_->log_debug("Caught Exception during ProcessSession::import, type: {}", getCurrentExceptionTypeName());
    throw;
  }
}
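/*
 * Usage sketch (illustrative only): a processor splitting a newline-delimited file into one
 * FlowFile per line might call this overload roughly as follows. The file path and the
 * `Success` relationship below are assumptions for the example, not taken from this file:
 *
 *   std::vector<std::shared_ptr<core::FlowFile>> flows;
 *   session->import("/tmp/lines.txt", flows, 0, '\n');
 *   for (const auto& flow : flows) {
 *     session->transfer(flow, Success);
 *   }
 */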