in extensions/libarchive/ManipulateArchive.cpp [95:190]
/**
 * Applies the configured archive operation to the incoming flow file.
 *
 * Steps, in order:
 *  1. Take a flow file from the session; return immediately if there is none.
 *  2. Read the flow file through FocusArchiveEntry::ReadCallback, which fills
 *     `archiveMetadata` (and uses `file_man` for temporary files).
 *  3. Look up `targetEntry_`. A missing target routes the flow file to Failure
 *     for every operation except OPERATION_TOUCH.
 *  4. If `destination_` is set and already present in the archive, route to Failure.
 *  5. Resolve the insertion position from `before_` / `after_` (the end of the
 *     entry list when neither is set, the anchor is missing, or the operation
 *     is OPERATION_REMOVE).
 *  6. Perform REMOVE / COPY / MOVE / TOUCH on the entry metadata.
 *  7. Write the flow file through UnfocusArchiveEntry::WriteCallback and
 *     route it to Success.
 *
 * @param session  process session used to get, read, write and transfer the flow file
 */
void ManipulateArchive::onTrigger(core::ProcessContext&, core::ProcessSession& session) {
  std::shared_ptr<core::FlowFile> flowFile = session.get();
  if (!flowFile) {
    return;
  }

  ArchiveMetadata archiveMetadata;
  utils::file::FileManager file_man;
  session.read(flowFile, FocusArchiveEntry::ReadCallback{this, &file_man, &archiveMetadata});

  const auto entries_end = archiveMetadata.entryMetadata.end();
  auto target_position = archiveMetadata.find(targetEntry_);

  if (target_position == entries_end) {
    // TOUCH creates a new entry, so it is the only operation that may proceed without a target.
    if (operation_ != OPERATION_TOUCH) {
      logger_->log_warn("ManipulateArchive could not find entry {} to {}!",
                        targetEntry_, operation_);
      session.transfer(flowFile, Failure);
      return;
    }
  } else {
    // Only report the target as found when the lookup actually succeeded.
    logger_->log_info("ManipulateArchive found {} for {}.",
                      targetEntry_, operation_);
  }

  if (!destination_.empty()) {
    const auto dest_position = archiveMetadata.find(destination_);
    if (dest_position != entries_end) {
      logger_->log_warn("ManipulateArchive cannot perform {} to existing destination_ {}!",
                        operation_, destination_);
      session.transfer(flowFile, Failure);
      return;
    }
  }

  // Default insertion point: end of the entry list.
  auto position = entries_end;

  // Small speedup for when neither before nor after are provided or needed
  if ((!before_.empty() || !after_.empty()) && operation_ != OPERATION_REMOVE) {
    // `after_` takes precedence over `before_` when both are set.
    const bool use_after = !after_.empty();
    const std::string& positionEntry = use_after ? after_ : before_;
    position = archiveMetadata.find(positionEntry);

    if (position == entries_end) {
      logger_->log_warn("ManipulateArchive could not find entry {} to "
                        "perform {} {}; appending to end of archive...",
                        positionEntry, operation_,
                        use_after ? "after" : "before");
    } else {
      logger_->log_info("ManipulateArchive found entry {} to {} {}.",
                        positionEntry, operation_,
                        use_after ? "after" : "before");
      // insert() places the new element *before* the iterator, so step past the anchor for "after".
      if (use_after) {
        ++position;
      }
    }
  }

  if (operation_ == OPERATION_REMOVE) {
    // Drop the backing temporary file, then the metadata entry itself.
    std::filesystem::remove(target_position->tmpFileName);
    archiveMetadata.eraseEntry(target_position);
  } else if (operation_ == OPERATION_COPY) {
    ArchiveEntryMetadata copy = *target_position;

    // Copy tmp file
    const auto origTmpFileName = copy.tmpFileName;
    const auto newTmpFileName = file_man.unique_file(false);
    copy.tmpFileName = newTmpFileName;
    std::ifstream src(origTmpFileName, std::ios::binary);
    std::ofstream dst(newTmpFileName, std::ios::binary);
    dst << src.rdbuf();
    copy.entryName = destination_;

    archiveMetadata.entryMetadata.insert(position, copy);
  } else if (operation_ == OPERATION_MOVE) {
    ArchiveEntryMetadata moveEntry = *target_position;

    // If the insertion point is the entry being moved (e.g. "before" names the
    // target itself), erasing the target would leave `position` dangling.
    // Re-anchor it on the iterator returned by the erase instead.
    // NOTE(review): assumes eraseEntry() returns the iterator following the
    // erased element, like the standard containers' erase() - confirm.
    const bool insert_at_target = (position == target_position);
    const auto after_erased = archiveMetadata.eraseEntry(target_position);
    if (insert_at_target) {
      position = after_erased;
    }

    moveEntry.entryName = destination_;
    archiveMetadata.entryMetadata.insert(position, moveEntry);
  } else if (operation_ == OPERATION_TOUCH) {
    // Create a new, empty regular-file entry named `destination_`.
    ArchiveEntryMetadata touchEntry;
    touchEntry.entryName = destination_;
    touchEntry.entryType = AE_IFREG;
    touchEntry.entrySize = 0;
    touchEntry.entryMTime = time(nullptr);
    touchEntry.entryMTimeNsec = 0;
    touchEntry.entryUID = 0;
    touchEntry.entryGID = 0;
    touchEntry.entryPerm = 0777;

    archiveMetadata.entryMetadata.insert(position, touchEntry);
  }

  session.write(flowFile, UnfocusArchiveEntry::WriteCallback{&archiveMetadata});
  session.transfer(flowFile, Success);
}