extensions/libarchive/FocusArchiveEntry.cpp (165 lines of code) (raw):
/**
* @file FocusArchiveEntry.cpp
* FocusArchiveEntry class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "FocusArchiveEntry.h"
#include <archive.h>
#include <archive_entry.h>
#include <cstring>
#include <array>
#include <string>
#include <memory>
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "Exception.h"
#include "SmartArchivePtrs.h"
#include "utils/ConfigurationUtils.h"
#include "utils/gsl.h"
namespace {
// Size of the buffer through which flow file content is handed to libarchive
// (it sizes FocusArchiveEntryReadData::buf below). The value is MiNiFi's
// utils::configuration::DEFAULT_BUFFER_SIZE.
inline constexpr auto BUFFER_SIZE = org::apache::nifi::minifi::utils::configuration::DEFAULT_BUFFER_SIZE;
} // namespace
namespace org::apache::nifi::minifi::processors {
// Definition of the static ID generator; onTrigger uses it to create the unique
// stash key under which each extracted archive entry is stored.
std::shared_ptr<utils::IdGenerator> FocusArchiveEntry::id_generator_ = utils::IdGenerator::getIdGenerator();
// Declares the Properties and Relationships of this processor as the supported ones.
void FocusArchiveEntry::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
/**
 * Focuses the incoming flow file on a single entry of the archive it contains.
 *
 * Steps:
 *  1. Read the flow file content as an archive (see ReadCallback), collecting the
 *     metadata of every entry and extracting regular files to temporary files.
 *  2. Import every extracted regular file and stash it under a freshly generated key.
 *  3. Restore the stash of the entry named by the Path property as the flow file content
 *     (a warning is logged if no entry with that name was found).
 *  4. Push the archive metadata onto the "lens.archive.stack" attribute.
 *  5. Set "filename", "path" and "absolute.path" from the focused entry name and
 *     transfer the flow file to Success.
 *
 * If an existing "lens.archive.stack" attribute cannot be parsed, the error is logged,
 * the context is yielded and the function returns without transferring the flow file.
 *
 * @param context used to read the Path property and to yield on a corrupt lens stack
 * @param session used to get, read, import, stash, restore and transfer the flow file
 */
void FocusArchiveEntry::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
  auto flowFile = session.get();
  if (!flowFile) {
    return;
  }

  // Supplies the temporary files the archive entries are extracted into.
  utils::file::FileManager file_man;

  // Extract archive contents
  ArchiveMetadata archiveMetadata;
  archiveMetadata.focusedEntry = context.getProperty(Path).value_or("");
  flowFile->getAttribute("filename", archiveMetadata.archiveName);
  session.read(flowFile, ReadCallback{this, &file_man, &archiveMetadata});

  // For each extracted entry, import & stash to key
  std::string targetEntryStashKey;
  for (auto &entryMetadata : archiveMetadata.entryMetadata) {
    if (entryMetadata.entryType != AE_IFREG) {
      continue;  // only regular files are extracted to a temporary file by ReadCallback
    }

    logger_->log_info("FocusArchiveEntry importing {} from {}", entryMetadata.entryName, entryMetadata.tmpFileName);
    session.import(entryMetadata.tmpFileName.string(), flowFile, false, 0);

    utils::Identifier stashKeyUuid = id_generator_->generate();
    logger_->log_debug("FocusArchiveEntry generated stash key {} for entry {}", stashKeyUuid.to_string(), entryMetadata.entryName);
    entryMetadata.stashKey = stashKeyUuid.to_string();

    if (entryMetadata.entryName == archiveMetadata.focusedEntry) {
      targetEntryStashKey = entryMetadata.stashKey;
    }

    // Stash the content
    session.stash(entryMetadata.stashKey, flowFile);
  }

  // Restore target archive entry
  if (!targetEntryStashKey.empty()) {
    session.restore(targetEntryStashKey, flowFile);
  } else {
    logger_->log_warn("FocusArchiveEntry failed to locate target entry: {}", archiveMetadata.focusedEntry);
  }

  // Set new/updated lens stack to attribute
  {
    ArchiveStack archiveStack;
    std::string existingLensStack;

    if (flowFile->getAttribute("lens.archive.stack", existingLensStack)) {
      logger_->log_info("FocusArchiveEntry loading existing lens context");
      try {
        archiveStack.loadJsonString(existingLensStack);
      } catch (const Exception &exception) {
        // This failure aborts processing of the flow file, so report it at error level
        // instead of hiding it in the debug log.
        logger_->log_error("{}", exception.what());
        context.yield();
        return;
      }
    }

    archiveStack.push(archiveMetadata);
    flowFile->setAttribute("lens.archive.stack", archiveStack.toJsonString());
  }

  // Update filename attribute to that of focused entry.
  // When the entry name has no directory separator, find_last_of() returns npos;
  // substr(0, npos) would then copy the whole entry name into "path", so that case
  // is handled explicitly: the path is empty and the name is the whole entry.
  const auto& focusedEntry = archiveMetadata.focusedEntry;
  const std::size_t separatorPos = focusedEntry.find_last_of("/\\");
  const bool hasDirectory = separatorPos != std::string::npos;
  const std::string path = hasDirectory ? focusedEntry.substr(0, separatorPos) : std::string{};
  const std::string name = hasDirectory ? focusedEntry.substr(separatorPos + 1) : focusedEntry;
  flowFile->setAttribute("filename", name);
  flowFile->setAttribute("path", path);
  flowFile->setAttribute("absolute.path", focusedEntry);

  // Transfer to the relationship
  session.transfer(flowFile, Success);
}
// Client data handed to libarchive in archive_read_open() and received back by
// ReadCallback::read_cb() on every invocation.
struct FocusArchiveEntryReadData {
  // Stream the archive bytes are read from.
  std::shared_ptr<io::InputStream> stream;
  // Processor whose isRunning() state is checked while reading.
  core::Processor *processor{nullptr};
  // Storage for the chunk of data exposed to libarchive by the read callback.
  std::array<std::byte, BUFFER_SIZE> buf{};
};
/**
 * libarchive read callback: supplies libarchive with the next chunk of the flow file stream.
 *
 * Exactly one stream read is performed per invocation. libarchive accepts blocks of any
 * size, so there is no need to fill the buffer completely. Repeating the read into the
 * same buffer (as an earlier version of this function did) overwrites the bytes of a
 * preceding short read and can report more bytes than the buffer holds.
 *
 * @param a   archive handle; only used to attach an error message on failure
 * @param d   client data registered with archive_read_open(); must point to a
 *            FocusArchiveEntryReadData
 * @param buf receives the address of the chunk, which is stored in the client data's buffer
 * @return the number of bytes the stream read placed into the buffer (libarchive treats 0
 *         as end of input), or -1 if the processor stopped running or the stream
 *         reported an error
 */
la_ssize_t FocusArchiveEntry::ReadCallback::read_cb(struct archive * a, void *d, const void **buf) {
  auto* const data = static_cast<FocusArchiveEntryReadData *>(d);
  *buf = data->buf.data();

  const size_t bytes_read = data->stream->read(data->buf);

  if (!data->processor->isRunning()) {
    archive_set_error(a, EINTR, "Processor shut down during read");
    return -1;
  }

  // A stream error is a sentinel value, not a byte count: report it to libarchive as a
  // failure instead of passing it on (or narrowing it) as a length.
  if (io::isError(bytes_read)) {
    archive_set_error(a, EIO, "Failed to read from the flow file stream");
    return -1;
  }

  return gsl::narrow<la_ssize_t>(bytes_read);
}
/**
 * Reads the given stream as an archive and records every entry in the archive metadata.
 *
 * For each entry the name, type, permissions, size, owner ids and modification time are
 * stored. Regular files (AE_IFREG) are additionally extracted into a temporary file
 * obtained from the file manager; the file name is stored in the entry metadata.
 * The archive format and format name are stored in the archive metadata.
 *
 * Reading stops at the end of the archive, when the processor stops running, or at the
 * first header that libarchive does not report as ARCHIVE_OK. It also stops when an
 * entry has no usable path name or its temporary file cannot be opened.
 *
 * @param stream flow file content to read the archive from
 * @return the sum of the values returned by archive_read_data_into_fd() for the
 *         extracted entries.
 *         NOTE(review): these are libarchive status codes (ARCHIVE_OK is 0, failures are
 *         negative), not byte counts; the accumulation is kept unchanged because callers
 *         may rely on a negative result to detect a failed extraction - confirm.
 */
int64_t FocusArchiveEntry::ReadCallback::operator()(const std::shared_ptr<io::InputStream>& stream) const {
  auto input_archive = processors::archive_read_unique_ptr{archive_read_new()};
  struct archive_entry *entry = nullptr;
  int64_t nlen = 0;

  FocusArchiveEntryReadData data;
  data.stream = stream;
  data.processor = proc_;

  archive_read_support_format_all(input_archive.get());
  archive_read_support_filter_all(input_archive.get());

  // Read each item in the archive
  if (archive_read_open(input_archive.get(), &data, ok_cb, read_cb, ok_cb)) {
    logger_->log_error("FocusArchiveEntry can't open due to archive error: {}", archive_error_string(input_archive.get()));
    return nlen;
  }

  while (proc_->isRunning()) {
    const auto res = archive_read_next_header(input_archive.get(), &entry);

    if (res == ARCHIVE_EOF) {
      break;
    }

    // Codes below ARCHIVE_WARN (ARCHIVE_FAILED, ARCHIVE_FATAL) are real errors. This check
    // has to come before the "< ARCHIVE_OK" check, otherwise the warning branch below can
    // never be reached, because every negative code is also below ARCHIVE_OK.
    if (res < ARCHIVE_WARN) {
      logger_->log_error("FocusArchiveEntry can't read header due to archive error: {}", archive_error_string(input_archive.get()));
      return nlen;
    }

    // Remaining negative codes (ARCHIVE_WARN, ARCHIVE_RETRY): still stop reading, as
    // before, but report them with warning severity.
    if (res < ARCHIVE_OK) {
      logger_->log_warn("FocusArchiveEntry got archive warning while reading header: {}", archive_error_string(input_archive.get()));
      return nlen;
    }

    const char* const entryName = archive_entry_pathname(entry);
    if (entryName == nullptr) {
      // Assigning a null pointer to std::string is undefined behavior, so stop here.
      logger_->log_error("FocusArchiveEntry can't read the path name of an archive entry");
      return nlen;
    }

    _archiveMetadata->archiveFormatName.assign(archive_format_name(input_archive.get()));
    _archiveMetadata->archiveFormat = archive_format(input_archive.get());

    // Record entry metadata
    const auto entryType = archive_entry_filetype(entry);

    ArchiveEntryMetadata metadata;
    metadata.entryName = entryName;
    metadata.entryType = entryType;
    metadata.entryPerm = archive_entry_perm(entry);
    metadata.entrySize = archive_entry_size(entry);
    metadata.entryUID = archive_entry_uid(entry);
    metadata.entryGID = archive_entry_gid(entry);
    metadata.entryMTime = archive_entry_mtime(entry);
    metadata.entryMTimeNsec = archive_entry_mtime_nsec(entry);

    logger_->log_info("FocusArchiveEntry entry type of {} is: {}", entryName, metadata.entryType);
    logger_->log_info("FocusArchiveEntry entry perm of {} is: {}", entryName, metadata.entryPerm);

    // Write content to tmp file
    if (entryType == AE_IFREG) {
      auto tmpFileName = file_man_->unique_file(true);
      metadata.tmpFileName = tmpFileName;
      logger_->log_info("FocusArchiveEntry extracting {} to: {}", entryName, tmpFileName);

      // Binary mode: the entry content is written through the raw descriptor, and a
      // text-mode stream on Windows would translate line feeds and corrupt binary data.
      auto* const fd = fopen(tmpFileName.string().c_str(), "wb");
      if (fd == nullptr) {
        // fileno()/fclose() must not be called with a null stream.
        logger_->log_error("FocusArchiveEntry can't open temporary file {} for writing", tmpFileName);
        return nlen;
      }

      if (archive_entry_size(entry) > 0) {
#ifdef WIN32
        const int descriptor = _fileno(fd);
#else
        const int descriptor = fileno(fd);
#endif
        const auto status = archive_read_data_into_fd(input_archive.get(), descriptor);
        if (status < ARCHIVE_OK) {
          logger_->log_error("FocusArchiveEntry can't extract {} due to archive error: {}", entryName, archive_error_string(input_archive.get()));
        }
        nlen += status;
      }

      (void)fclose(fd);
    }

    _archiveMetadata->entryMetadata.push_back(metadata);
  }

  return nlen;
}
FocusArchiveEntry::ReadCallback::ReadCallback(core::Processor *processor, utils::file::FileManager *file_man, ArchiveMetadata *archiveMetadata)
: file_man_(file_man),
proc_(processor),
_archiveMetadata(archiveMetadata) {
}
// Registers FocusArchiveEntry as a Processor resource
// (the macro is presumably provided by core/Resource.h - confirm).
REGISTER_RESOURCE(FocusArchiveEntry, Processor);
} // namespace org::apache::nifi::minifi::processors