extensions/standard-processors/processors/SplitText.cpp (380 lines of code) (raw):
/**
* @file SplitText.cpp
* SplitText class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include "SplitText.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "core/FlowFile.h"
#include "utils/ConfigurationUtils.h"
#include "utils/gsl.h"
#include "utils/ProcessorConfigUtils.h"
#include "io/StreamPipe.h"
namespace org::apache::nifi::minifi::processors {
namespace detail {
/**
 * Creates a line reader that consumes the given stream in chunks of at most buffer_size bytes.
 * A missing or empty stream puts the reader directly into the EndOfStream state, so no line is ever returned.
 */
LineReader::LineReader(const std::shared_ptr<io::InputStream>& stream, const size_t buffer_size)
    : stream_(stream),
      buffer_size_(buffer_size) {
  const bool has_content = stream_ != nullptr && stream_->size() > 0;
  if (!has_content) {
    state_ = StreamReadState::EndOfStream;
  }
}
/**
 * Determines the size of the endline that terminates at newline_position of the current buffer.
 * @return 0 if the byte is not '\n', 2 if it is preceded by '\r' inside the buffer, 1 otherwise
 */
uint8_t LineReader::getEndLineSize(size_t newline_position) {
  gsl_Expects(buffer_.size() > newline_position);
  if (buffer_[newline_position] == '\n') {
    const bool preceded_by_carriage_return = newline_position > 0 && buffer_[newline_position - 1] == '\r';
    return preceded_by_carriage_return ? uint8_t{2} : uint8_t{1};
  }
  return 0;
}
/**
 * Updates last_line_info_ to describe the line ending at buffer_offset_ in the current buffer.
 * The line starts right after the previously reported line, or at the beginning of the stream for the first line.
 * @param endline_size number of endline bytes terminating the line (0 if the line has no endline)
 * @param starts_with if set, matches_starts_with is set to whether the line begins with these characters
 */
void LineReader::setLastLineInfoAttributes(uint8_t endline_size, const std::optional<std::string>& starts_with) {
  // Stream position of the first byte held in buffer_ (this formula assumes every earlier read filled the whole buffer).
  const uint64_t buffer_start_in_stream = (current_buffer_count_ - 1) * buffer_size_;
  const uint64_t size_from_beginning_of_stream = buffer_start_in_stream + buffer_offset_;
  if (last_line_info_) {
    LineInfo previous_line_info = *last_line_info_;
    last_line_info_->offset = previous_line_info.offset + previous_line_info.size;
    last_line_info_->size = size_from_beginning_of_stream - previous_line_info.offset - previous_line_info.size;
    last_line_info_->endline_size = endline_size;
    last_line_info_->matches_starts_with = true;
  } else {
    last_line_info_ = LineInfo{.offset = 0, .size = read_size_ - last_read_size_ + buffer_offset_, .endline_size = endline_size, .matches_starts_with = true};
  }
  if (!starts_with) {
    return;
  }

  // LineInfo::offset is relative to the start of the stream, while buffer_ only contains the chunk read last,
  // so the offset has to be translated before indexing into buffer_.
  const uint64_t line_offset = last_line_info_->offset;
  const bool line_starts_in_current_buffer = line_offset >= buffer_start_in_stream;
  const bool prefix_in_buffer = line_starts_in_current_buffer &&
      line_offset - buffer_start_in_stream + starts_with->size() <= buffer_offset_;
  if (last_line_info_->size < starts_with->size() || !prefix_in_buffer) {
    // Either the line is too short, or its beginning was in an earlier chunk that has since been overwritten:
    // report no match instead of comparing against unrelated (or out-of-range) bytes.
    last_line_info_->matches_starts_with = false;
    return;
  }
  const auto line_begin = buffer_.begin() + gsl::narrow<std::vector<char>::difference_type>(line_offset - buffer_start_in_stream);
  last_line_info_->matches_starts_with = std::equal(starts_with->begin(), starts_with->end(), line_begin);
}
/**
 * Reads the next chunk of the stream (at most buffer_size_ bytes) into the beginning of buffer_.
 * @return true on success; false (switching to the StreamReadError state) if the read failed or returned no data
 *         although the stream still reports unread bytes
 */
bool LineReader::readNextBuffer() {
  buffer_offset_ = 0;
  last_read_size_ = (std::min)(gsl::narrow<size_t>(stream_->size() - read_size_), buffer_size_);
  const auto read_ret = stream_->read(as_writable_bytes(std::span(buffer_).subspan(0, last_read_size_)));
  // A zero-byte read would leave read_size_ unchanged, so the caller would keep re-scanning stale buffer
  // contents in an endless loop; treat it as a read error instead.
  if (io::isError(read_ret) || (read_ret == 0 && last_read_size_ > 0)) {
    state_ = StreamReadState::StreamReadError;
    return false;
  }
  // NOTE(review): a short read (0 < read_ret < last_read_size_) is still scanned up to last_read_size_ by the caller,
  // and stream offsets are computed assuming full buffers — confirm the stream implementation always fills the request.
  read_size_ += read_ret;
  ++current_buffer_count_;
  return true;
}
/**
 * Records the line that ends at the current buffer position in last_line_info_.
 * @return the recorded line, or std::nullopt if the line is empty (size 0)
 */
std::optional<LineReader::LineInfo> LineReader::finalizeLineInfo(uint8_t endline_size, const std::optional<std::string>& starts_with) {
  setLastLineInfoAttributes(endline_size, starts_with);
  if (last_line_info_->size > 0) {
    return last_line_info_;
  }
  return std::nullopt;
}
std::optional<LineReader::LineInfo> LineReader::readNextLine(const std::optional<std::string>& starts_with) {
if (state_ != StreamReadState::Ok) {
return std::nullopt;
}
const auto isLastReadProcessed = [this]() { return last_read_size_ <= buffer_offset_; };
while (read_size_ < stream_->size() || !isLastReadProcessed()) {
if (isLastReadProcessed() && !readNextBuffer()) {
return std::nullopt;
}
const auto begin = buffer_.begin() + gsl::narrow<std::vector<char>::difference_type>(buffer_offset_);
const auto end = buffer_.begin() + gsl::narrow<std::vector<char>::difference_type>(last_read_size_);
auto endline_pos = std::find_if(begin, end, [](const auto& buffer_element) { return buffer_element == '\n'; });
if (endline_pos != end) {
const auto line_length = std::distance(buffer_.begin(), endline_pos);
buffer_offset_ = line_length + 1;
return finalizeLineInfo(getEndLineSize(line_length), starts_with);
} else {
buffer_offset_ = last_read_size_;
}
}
state_ = StreamReadState::EndOfStream;
return finalizeLineInfo(0, starts_with);
}
} // namespace detail
namespace {
/**
 * Groups the lines of a flow file's content into fragments according to the SplitText configuration
 * (line split count, maximum fragment size, header line count or header line marker characters).
 * Fragments are described by their offset and size within the flow file.
 */
class SplitTextFragmentGenerator {
 public:
  struct Fragment {
    uint64_t text_line_count = 0;       // lines containing more than just endline characters
    uint64_t processed_line_count = 0;  // lines counted against the configured line split count
    uint64_t fragment_size = 0;         // size in bytes, endline characters included
    uint64_t fragment_offset = 0;       // position of the fragment's first byte in the flow file
    // Trailing endline bytes, accumulated over consecutive empty lines, that are trimmed when trailing newlines are
    // removed. 64 bits wide so that it cannot wrap around when a fragment ends in more than 255 bytes of endlines.
    uint64_t endline_size = 0;
  };

  SplitTextFragmentGenerator(const std::shared_ptr<io::InputStream>& stream, const SplitTextConfiguration& split_text_config, size_t buffer_size);
  // Returns the next fragment, or std::nullopt when no content is left or reading failed (distinguish via getState()).
  std::optional<Fragment> readNextFragment();
  // Reads the header lines; must be called before any readNextFragment() call (checked by a precondition).
  nonstd::expected<Fragment, const char*> readHeaderFragment();
  [[nodiscard]] detail::StreamReadState getState() const { return line_reader_.getState(); }

 private:
  static void addLineToFragment(Fragment& fragment, const detail::LineReader::LineInfo& line);
  void finalizeFragmentOffset(Fragment& current_fragment);
  [[nodiscard]] bool lineSizeWouldExceedMaxFragmentSize(const detail::LineReader::LineInfo& line, uint64_t fragment_size) const;
  nonstd::expected<Fragment, const char*> createHeaderFragmentUsingLineCount();
  nonstd::expected<Fragment, const char*> createHeaderFragmentUsingHeaderMarkerCharacters();

  detail::LineReader line_reader_;
  // A line that did not fit into the previous fragment (or the first non-header line found while reading header lines
  // by marker characters) is kept here and becomes the first line of the next fragment.
  std::optional<detail::LineReader::LineInfo> buffered_line_info_;
  // Flow file position where the next fragment starts.
  uint64_t flow_file_offset_ = 0;
  // Referenced, not copied: the configuration must outlive the generator.
  const SplitTextConfiguration& split_text_config_;
  // Size of the header, counted against the maximum fragment size of every fragment.
  uint64_t header_fragment_size_ = 0;
};
/**
 * Callback for reading the original flow file: splits its content into fragments and creates one cloned flow file
 * per fragment (prefixed with the header, if one is configured). The created flow files are collected in results.
 */
class ReadCallback {
 public:
  ReadCallback(std::shared_ptr<core::FlowFile> flow_file, const SplitTextConfiguration& split_text_config,
      core::ProcessSession& session, size_t buffer_size, std::shared_ptr<core::logging::Logger> logger);
  int64_t operator()(const std::shared_ptr<io::InputStream>& stream);

  // Set when the header could not be read; no fragments are created in that case.
  std::optional<const char*> error;
  // Fragment flow files created from the original flow file, in fragment order.
  std::vector<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> results;

 private:
  void setAttributesOfDoneSegment(core::FlowFile& current_flow_file, uint64_t line_count);
  void createHeaderOnlyFragmentFlow(const SplitTextFragmentGenerator::Fragment& header_fragment);
  void mergeHeaderAndFragmentFlows(const std::shared_ptr<core::FlowFile>& header_flow, const SplitTextFragmentGenerator::Fragment& fragment, size_t fragment_trim_size);
  void createFragmentFlowWithoutHeader(const SplitTextFragmentGenerator::Fragment& fragment, size_t fragment_trim_size);

  std::shared_ptr<core::FlowFile> flow_file_;
  const SplitTextConfiguration& split_text_config_;
  core::ProcessSession& session_;
  // 1-based index of the next fragment to be emitted.
  size_t emitted_fragment_index_ = 1;
  // Shared by all fragments of this flow file; used in their filenames and fragment identifier attribute.
  const std::string fragment_identifier_ = utils::IdGenerator::getIdGenerator()->generate().to_string();
  size_t buffer_size_{};
  std::shared_ptr<core::logging::Logger> logger_;
};
SplitTextFragmentGenerator::SplitTextFragmentGenerator(const std::shared_ptr<io::InputStream>& stream, const SplitTextConfiguration& split_text_config, const size_t buffer_size)
: line_reader_(stream, buffer_size),
split_text_config_(split_text_config) {
}
/// Places the fragment at the current flow file position and moves that position past the fragment.
void SplitTextFragmentGenerator::finalizeFragmentOffset(Fragment& current_fragment) {
  const uint64_t fragment_start = flow_file_offset_;
  flow_file_offset_ = fragment_start + current_fragment.fragment_size;
  current_fragment.fragment_offset = fragment_start;
}
/**
 * Appends a line to the fragment.
 * A line made up only of endline characters is not counted as a text line, and its endline bytes are added to the
 * fragment's trailing endline size, so consecutive empty lines at the end of a fragment are trimmed together.
 */
void SplitTextFragmentGenerator::addLineToFragment(Fragment& current_fragment, const detail::LineReader::LineInfo& line) {
  const bool only_endline = line.size == line.endline_size;
  if (only_endline) {
    current_fragment.endline_size += line.endline_size;
  } else {
    current_fragment.endline_size = line.endline_size;
    ++current_fragment.text_line_count;
  }
  current_fragment.fragment_size += line.size;
}
/**
 * Tells whether appending the line to a fragment currently holding fragment_size bytes would exceed the maximum
 * fragment size; the header size is included in the total. Always false if no maximum fragment size is configured.
 */
bool SplitTextFragmentGenerator::lineSizeWouldExceedMaxFragmentSize(const detail::LineReader::LineInfo& line, uint64_t fragment_size) const {
  if (!split_text_config_.maximum_fragment_size) {
    return false;
  }
  const uint64_t size_with_line = header_fragment_size_ + fragment_size + line.size;
  return size_with_line > *split_text_config_.maximum_fragment_size;
}
/**
 * Builds the header fragment from the first header_line_count lines of the flow file.
 * @return the header fragment, or an error message if the flow file has fewer lines than required, reading the
 *         stream failed, or a header line would exceed the maximum fragment size
 */
nonstd::expected<SplitTextFragmentGenerator::Fragment, const char*> SplitTextFragmentGenerator::createHeaderFragmentUsingLineCount() {
  Fragment header_fragment;
  for (uint64_t remaining_lines = split_text_config_.header_line_count; remaining_lines > 0; --remaining_lines) {
    const auto line = line_reader_.readNextLine();
    if (!line) {
      const bool reached_end = getState() == detail::StreamReadState::EndOfStream;
      return nonstd::make_unexpected(reached_end
          ? "The flow file's line count is less than the specified header line count!"
          : "Error while reading flow file stream!");
    }
    if (lineSizeWouldExceedMaxFragmentSize(*line, header_fragment.fragment_size)) {
      return nonstd::make_unexpected("Header line would exceed the maximum fragment size!");
    }
    addLineToFragment(header_fragment, *line);
  }
  header_fragment_size_ = header_fragment.fragment_size;
  flow_file_offset_ += header_fragment_size_;
  return header_fragment;
}
/**
 * Builds the header fragment from the leading lines that start with the header line marker characters.
 * The first line that does not start with the marker is kept for the first regular fragment.
 * @return the header fragment (possibly empty), or an error message if a header line would exceed the maximum fragment size
 */
nonstd::expected<SplitTextFragmentGenerator::Fragment, const char*> SplitTextFragmentGenerator::createHeaderFragmentUsingHeaderMarkerCharacters() {
  Fragment header_fragment;
  const auto& marker = split_text_config_.header_line_marker_characters;
  for (auto line = line_reader_.readNextLine(marker); line; line = line_reader_.readNextLine(marker)) {
    const bool is_header_line = line->size >= marker->size() && line->matches_starts_with;
    if (!is_header_line) {
      buffered_line_info_ = line;
      break;
    }
    if (lineSizeWouldExceedMaxFragmentSize(*line, header_fragment.fragment_size)) {
      return nonstd::make_unexpected("Header line would exceed the maximum fragment size!");
    }
    addLineToFragment(header_fragment, *line);
  }
  header_fragment_size_ = header_fragment.fragment_size;
  flow_file_offset_ += header_fragment_size_;
  return header_fragment;
}
/**
 * Reads the header of the flow file, preferring the header line count over the header line marker characters.
 * Precondition: nothing has been read yet and at least one of the two header settings is configured.
 */
nonstd::expected<SplitTextFragmentGenerator::Fragment, const char*> SplitTextFragmentGenerator::readHeaderFragment() {
  gsl_Expects(flow_file_offset_ == 0 && (split_text_config_.header_line_count > 0 || split_text_config_.header_line_marker_characters));
  return split_text_config_.header_line_count > 0
      ? createHeaderFragmentUsingLineCount()
      : createHeaderFragmentUsingHeaderMarkerCharacters();
}
std::optional<SplitTextFragmentGenerator::Fragment> SplitTextFragmentGenerator::readNextFragment() {
Fragment current_fragment;
while (auto line = buffered_line_info_ ? buffered_line_info_ : line_reader_.readNextLine()) {
buffered_line_info_.reset();
if (lineSizeWouldExceedMaxFragmentSize(*line, current_fragment.fragment_size)) {
if (current_fragment.processed_line_count == 0) { // first fragment line would be bigger than maximum fragment size (we don't have any other line in the fragment yet)
addLineToFragment(current_fragment, *line);
} else {
buffered_line_info_ = line;
}
finalizeFragmentOffset(current_fragment);
return current_fragment;
}
++current_fragment.processed_line_count;
addLineToFragment(current_fragment, *line);
if (split_text_config_.line_split_count == current_fragment.processed_line_count) {
finalizeFragmentOffset(current_fragment);
return current_fragment;
}
}
if (current_fragment.fragment_size > 0) {
finalizeFragmentOffset(current_fragment);
return current_fragment;
}
return std::nullopt;
}
ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flow_file, const SplitTextConfiguration& split_text_config,
core::ProcessSession& session, const size_t buffer_size, std::shared_ptr<core::logging::Logger> logger)
: flow_file_(std::move(flow_file)),
split_text_config_(split_text_config),
session_(session),
buffer_size_(buffer_size),
logger_(std::move(logger)) {
}
/**
 * Sets the fragment attributes (filename, line count, size, identifier, index, original filename) on a finished
 * fragment flow file and advances the fragment index.
 */
void ReadCallback::setAttributesOfDoneSegment(core::FlowFile& current_flow_file, uint64_t line_count) {
  const auto original_filename = flow_file_->getAttribute(core::SpecialFlowAttribute::FILENAME);
  const std::string fragment_index = std::to_string(emitted_fragment_index_);
  const std::string filename_base = original_filename.value_or(flow_file_->getUUIDStr());

  current_flow_file.setAttribute(core::SpecialFlowAttribute::FILENAME, filename_base + ".fragment." + fragment_identifier_ + "." + fragment_index);
  current_flow_file.setAttribute(SplitText::TextLineCountOutputAttribute.name, std::to_string(line_count));
  current_flow_file.setAttribute(SplitText::FragmentSizeOutputAttribute.name, std::to_string(current_flow_file.getSize()));
  current_flow_file.setAttribute(SplitText::FragmentIdentifierOutputAttribute.name, fragment_identifier_);
  current_flow_file.setAttribute(SplitText::FragmentIndexOutputAttribute.name, fragment_index);
  current_flow_file.setAttribute(SplitText::SegmentOriginalFilenameOutputAttribute.name, original_filename.value_or(""));
  ++emitted_fragment_index_;
}
/**
 * Emits a fragment that consists only of the header without its trailing endline characters. This is used when a
 * fragment has no content left after trimming its trailing newlines.
 */
void ReadCallback::createHeaderOnlyFragmentFlow(const SplitTextFragmentGenerator::Fragment& header_fragment) {
  gsl_Expects(split_text_config_.remove_trailing_new_lines);  // an empty fragment only occurs when endlines are trimmed
  const auto header_offset = gsl::narrow<int64_t>(header_fragment.fragment_offset);
  const auto trimmed_header_size = gsl::narrow<int64_t>(header_fragment.fragment_size - header_fragment.endline_size);
  auto header_only_flow = session_.clone(*flow_file_, header_offset, trimmed_header_size);
  if (header_only_flow) {
    logger_->log_debug("Creating a header only fragment with fragment index: {} fragment size: {}", emitted_fragment_index_, header_only_flow->getSize());
    setAttributesOfDoneSegment(*header_only_flow, 0);
    results.push_back(header_only_flow);
  } else {
    logger_->log_error("Failed to clone header only fragment flow!");
  }
}
/**
 * Emits a fragment whose content is the header followed by the fragment's content (minus fragment_trim_size
 * trailing bytes). The fragment content is first cloned into a temporary flow file, which is removed afterwards.
 */
void ReadCallback::mergeHeaderAndFragmentFlows(const std::shared_ptr<core::FlowFile>& header_flow, const SplitTextFragmentGenerator::Fragment& fragment, size_t fragment_trim_size) {
  auto fragment_flow = session_.clone(*flow_file_, gsl::narrow<int64_t>(fragment.fragment_offset), gsl::narrow<int64_t>(fragment.fragment_size - fragment_trim_size));
  if (!fragment_flow) {
    logger_->log_error("Failed to clone fragment flow!");
    return;
  }
  auto merged_flow = session_.clone(*header_flow);  // clone header to copy attributes
  if (!merged_flow) {
    logger_->log_error("Failed to clone merged fragment flow!");
    // The temporary fragment clone is neither transferred nor needed any more: drop it instead of leaking it in the session.
    session_.remove(fragment_flow);
    return;
  }
  session_.write(merged_flow, [this, &fragment_flow, &header_flow](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
    auto header_write_result = session_.read(header_flow, [&output_stream](const std::shared_ptr<io::InputStream>& header_input_stream) -> int64_t {
      return internal::pipe(*header_input_stream, *output_stream);
    });
    if (header_write_result < 0) {
      logger_->log_error("Failed to write header to fragment!");
      return header_write_result;
    }
    return session_.read(fragment_flow, [&output_stream](const std::shared_ptr<io::InputStream>& fragment_input_stream) -> int64_t {
      return internal::pipe(*fragment_input_stream, *output_stream);
    });
  });
  logger_->log_debug("Creating fragment with header with fragment index: {} fragment size: {}", emitted_fragment_index_, merged_flow->getSize());
  setAttributesOfDoneSegment(*merged_flow, fragment.text_line_count);
  results.push_back(merged_flow);
  session_.remove(fragment_flow);
}
/**
 * Emits a fragment containing only the fragment's content (minus fragment_trim_size trailing bytes), used when no
 * header is configured.
 */
void ReadCallback::createFragmentFlowWithoutHeader(const SplitTextFragmentGenerator::Fragment& fragment, size_t fragment_trim_size) {
  auto fragment_flow = session_.clone(*flow_file_, gsl::narrow<int64_t>(fragment.fragment_offset), gsl::narrow<int64_t>(fragment.fragment_size - fragment_trim_size));
  if (!fragment_flow) {
    logger_->log_error("Failed to clone fragment flow without header!");
    return;
  }
  logger_->log_debug("Creating fragment without header with fragment index: {} fragment size: {}", emitted_fragment_index_, fragment_flow->getSize());
  setAttributesOfDoneSegment(*fragment_flow, fragment.text_line_count);
  results.push_back(fragment_flow);
}
/**
 * Splits the content read from stream into fragment flow files, collected in results.
 * @return the original flow file's size on success or when the header could not be read (error is set then),
 *         -1 if cloning the header failed or the stream was not read completely
 */
int64_t ReadCallback::operator()(const std::shared_ptr<io::InputStream>& stream) {
  SplitTextFragmentGenerator fragment_generator(stream, split_text_config_, buffer_size_);
  nonstd::expected<SplitTextFragmentGenerator::Fragment, const char*> header_fragment;
  std::shared_ptr<core::FlowFile> header_flow;  // cloned once and reused for every fragment
  const bool has_header = split_text_config_.header_line_count > 0 || split_text_config_.header_line_marker_characters;
  if (has_header) {
    header_fragment = fragment_generator.readHeaderFragment();
    if (!header_fragment) {
      error = header_fragment.error();
      return gsl::narrow<int64_t>(flow_file_->getSize());
    }
    header_flow = session_.clone(*flow_file_, gsl::narrow<int64_t>(header_fragment->fragment_offset), gsl::narrow<int64_t>(header_fragment->fragment_size));
    if (!header_flow) {
      logger_->log_error("Failed to clone header flow!");
      return -1;
    }
  }

  while (const auto fragment = fragment_generator.readNextFragment()) {
    const size_t fragment_trim_size = split_text_config_.remove_trailing_new_lines ? fragment->endline_size : 0;
    const bool empty_after_trim = fragment->fragment_size == fragment_trim_size;
    if (!header_flow) {
      if (!empty_after_trim) {
        createFragmentFlowWithoutHeader(*fragment, fragment_trim_size);
      }
    } else if (empty_after_trim) {
      createHeaderOnlyFragmentFlow(*header_fragment);
    } else {
      mergeHeaderAndFragmentFlows(header_flow, *fragment, fragment_trim_size);
    }
  }

  if (header_flow) {
    session_.remove(header_flow);
  }
  const bool fully_read = fragment_generator.getState() == detail::StreamReadState::EndOfStream;
  return fully_read ? gsl::narrow<int64_t>(flow_file_->getSize()) : -1;
}
} // namespace
/// Declares the processor's supported properties and relationships.
void SplitText::initialize() {
  setSupportedProperties(Properties);
  setSupportedRelationships(Relationships);
}
/**
 * Reads and validates the processor configuration into split_text_config_.
 * @throws Exception (PROCESS_SCHEDULE_EXCEPTION) if the buffer size or Maximum Fragment Size is 0, if neither
 *         Line Split Count nor Maximum Fragment Size limits the fragments, or if the header line marker characters
 *         do not fit into the read buffer
 */
void SplitText::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& /*sessionFactory*/) {
  buffer_size_ = utils::configuration::getBufferSize(*context.getConfiguration());
  if (buffer_size_ == 0) {
    // the content is read in chunks of buffer_size_ bytes, so a zero buffer size could never make progress
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "SplitText buffer size cannot be 0!");
  }

  split_text_config_.line_split_count = utils::parseU64Property(context, LineSplitCount);
  logger_->log_debug("SplitText line split count: {}", split_text_config_.line_split_count);

  // Reset first, so that a value from an earlier scheduling does not survive if the property has since been removed.
  split_text_config_.maximum_fragment_size.reset();
  if (const auto max_fragment_data_size_value = utils::parseOptionalDataSizeProperty(context, MaximumFragmentSize)) {
    split_text_config_.maximum_fragment_size = *max_fragment_data_size_value;
    logger_->log_debug("SplitText maximum fragment size: {}", split_text_config_.maximum_fragment_size.value());
  }
  if (split_text_config_.maximum_fragment_size && split_text_config_.maximum_fragment_size.value() == 0) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Maximum Fragment Size cannot be 0!");
  }
  if (split_text_config_.line_split_count == 0 && !split_text_config_.maximum_fragment_size) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Line Split Count is set to 0, but Maximum Fragment Size is not set!");
  }

  split_text_config_.header_line_count = utils::parseU64Property(context, HeaderLineCount);
  logger_->log_debug("SplitText header line count: {}", split_text_config_.header_line_count);

  split_text_config_.header_line_marker_characters = context.getProperty(HeaderLineMarkerCharacters) | utils::toOptional();
  // buffer_size_ >= 1 is guaranteed above, so buffer_size_ - 1 cannot underflow
  if (split_text_config_.header_line_marker_characters && split_text_config_.header_line_marker_characters->size() >= buffer_size_) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("SplitText header line marker characters length is larger than the maximum allowed: {} > {}",
        split_text_config_.header_line_marker_characters->size(), buffer_size_ - 1));
  }
  if (split_text_config_.header_line_marker_characters) {
    logger_->log_debug("SplitText header line marker characters were set: {}", *split_text_config_.header_line_marker_characters);
  }

  split_text_config_.remove_trailing_new_lines = utils::parseBoolProperty(context, RemoveTrailingNewlines);
  logger_->log_debug("SplitText should remove trailing new lines: {}", split_text_config_.remove_trailing_new_lines);
}
/**
 * Splits one incoming flow file. On a splitting error, the original goes to Failure.
 * Otherwise every fragment gets the fragment count attribute and goes to Splits, and the original goes to Original.
 * Yields when there is no incoming flow file.
 */
void SplitText::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
  std::shared_ptr<core::FlowFile> flow_file = session.get();
  if (!flow_file) {
    context.yield();
    return;
  }

  ReadCallback callback{flow_file, split_text_config_, session, buffer_size_, logger_};
  session.read(flow_file, std::ref(callback));
  if (callback.error) {
    logger_->log_error("Splitting flow file failed with error: {}", *callback.error);
    session.transfer(flow_file, Failure);
    return;
  }

  const auto fragment_count = callback.results.size();
  logger_->log_info("Splitting flow file '{}' (id: {}) resulted in {} fragments", flow_file->getName(), flow_file->getUUIDStr(), fragment_count);
  const std::string fragment_count_value = std::to_string(fragment_count);
  for (const auto& fragment_flow : callback.results) {
    fragment_flow->setAttribute(SplitText::FragmentCountOutputAttribute.name, fragment_count_value);
    session.transfer(fragment_flow, Splits);
  }
  session.transfer(flow_file, Original);
}
// Registers SplitText as a Processor resource.
// NOTE(review): the exact registration mechanics are defined by the REGISTER_RESOURCE macro (core/Resource.h); confirm there.
REGISTER_RESOURCE(SplitText, Processor);
} // namespace org::apache::nifi::minifi::processors