void ManipulateArchive::onTrigger()

in extensions/libarchive/ManipulateArchive.cpp [95:190]


void ManipulateArchive::onTrigger(core::ProcessContext&, core::ProcessSession& session) {
    std::shared_ptr<core::FlowFile> flowFile = session.get();

    if (!flowFile) {
        return;
    }

    ArchiveMetadata archiveMetadata;
    utils::file::FileManager file_man;

    session.read(flowFile, FocusArchiveEntry::ReadCallback{this, &file_man, &archiveMetadata});

    auto entries_end = archiveMetadata.entryMetadata.end();

    auto target_position = archiveMetadata.find(targetEntry_);

    if (target_position == entries_end && operation_ != OPERATION_TOUCH) {
        logger_->log_warn("ManipulateArchive could not find entry {} to {}!",
                          targetEntry_, operation_);
        session.transfer(flowFile, Failure);
        return;
    } else {
        logger_->log_info("ManipulateArchive found {} for {}.",
                          targetEntry_, operation_);
    }

    if (!destination_.empty()) {
        auto dest_position = archiveMetadata.find(destination_);
        if (dest_position != entries_end) {
            logger_->log_warn("ManipulateArchive cannot perform {} to existing destination_ {}!",
                              operation_, destination_);
            session.transfer(flowFile, Failure);
            return;
        }
    }

    auto position = entries_end;

    // Small speedup for when neither before nor after are provided or needed
    if ((!before_.empty() || !after_.empty()) && operation_ != OPERATION_REMOVE) {
        std::string positionEntry = after_.empty() ? before_ : after_;
        position = archiveMetadata.find(positionEntry);

        if (position == entries_end)
            logger_->log_warn("ManipulateArchive could not find entry {} to "
                              "perform {} {}; appending to end of archive...",
                              positionEntry, operation_,
                              after_.empty() ? "before" : "after");

        else
            logger_->log_info("ManipulateArchive found entry {} to {} {}.",
                              positionEntry, operation_,
                              after_.empty() ? "before" : "after");

        if (!after_.empty() && position != entries_end)
            position++;
    }

    if (operation_ == OPERATION_REMOVE) {
        std::filesystem::remove((*target_position).tmpFileName);
        target_position = archiveMetadata.eraseEntry(target_position);
    } else if (operation_ == OPERATION_COPY) {
        ArchiveEntryMetadata copy = *target_position;

        // Copy tmp file
        const auto origTmpFileName = copy.tmpFileName;
        const auto newTmpFileName = file_man.unique_file(false);
        copy.tmpFileName = newTmpFileName;
        std::ifstream src(origTmpFileName, std::ios::binary);
        std::ofstream dst(newTmpFileName, std::ios::binary);
        dst << src.rdbuf();
        copy.entryName = destination_;

        archiveMetadata.entryMetadata.insert(position, copy);
    } else if (operation_ == OPERATION_MOVE) {
        ArchiveEntryMetadata moveEntry = *target_position;
        target_position = archiveMetadata.eraseEntry(target_position);
        moveEntry.entryName = destination_;
        archiveMetadata.entryMetadata.insert(position, moveEntry);
    } else if (operation_ == OPERATION_TOUCH) {
        ArchiveEntryMetadata touchEntry;
        touchEntry.entryName = destination_;
        touchEntry.entryType = AE_IFREG;
        touchEntry.entrySize = 0;
        touchEntry.entryMTime = time(nullptr);
        touchEntry.entryMTimeNsec = 0;
        touchEntry.entryUID = 0;
        touchEntry.entryGID = 0;
        touchEntry.entryPerm = 0777;

        archiveMetadata.entryMetadata.insert(position, touchEntry);
    }

    session.write(flowFile, UnfocusArchiveEntry::WriteCallback{&archiveMetadata});
    session.transfer(flowFile, Success);
}