extensions/standard-processors/processors/SplitText.cpp (380 lines of code) (raw):

/** * @file SplitText.cpp * SplitText class implementation * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <algorithm> #include "SplitText.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" #include "core/Resource.h" #include "core/FlowFile.h" #include "utils/ConfigurationUtils.h" #include "utils/gsl.h" #include "utils/ProcessorConfigUtils.h" #include "io/StreamPipe.h" namespace org::apache::nifi::minifi::processors { namespace detail { LineReader::LineReader(const std::shared_ptr<io::InputStream>& stream, const size_t buffer_size) : stream_(stream), buffer_size_(buffer_size) { if (!stream_ || stream_->size() == 0) { state_ = StreamReadState::EndOfStream; } } uint8_t LineReader::getEndLineSize(size_t newline_position) { gsl_Expects(buffer_.size() > newline_position); if (buffer_[newline_position] != '\n') { return 0; } if (newline_position == 0 || buffer_[newline_position - 1] != '\r') { return 1; } return 2; } void LineReader::setLastLineInfoAttributes(uint8_t endline_size, const std::optional<std::string>& starts_with) { const uint64_t size_from_beginning_of_stream = (current_buffer_count_ - 1) * buffer_size_ + buffer_offset_; if (last_line_info_) { LineInfo previous_line_info = *last_line_info_; last_line_info_->offset = previous_line_info.offset + previous_line_info.size; last_line_info_->size = size_from_beginning_of_stream - previous_line_info.offset - previous_line_info.size; last_line_info_->endline_size = endline_size; last_line_info_->matches_starts_with = true; } else { last_line_info_ = LineInfo{.offset = 0, .size = read_size_ - last_read_size_ + buffer_offset_, .endline_size = endline_size, .matches_starts_with = true}; } if (starts_with) { const auto last_line_info_begin = buffer_.begin() + gsl::narrow<std::vector<char>::difference_type>(last_line_info_->offset); const auto last_line_info_end = buffer_.begin() + gsl::narrow<std::vector<char>::difference_type>(last_line_info_->offset + starts_with->size()); last_line_info_->matches_starts_with = (last_line_info_->size >= starts_with->size() && std::equal(starts_with->begin(), starts_with->end(), last_line_info_begin, last_line_info_end)); } } bool LineReader::readNextBuffer() { buffer_offset_ = 0; last_read_size_ = (std::min)(gsl::narrow<size_t>(stream_->size() - read_size_), buffer_size_); const auto read_ret = stream_->read(as_writable_bytes(std::span(buffer_).subspan(0, last_read_size_))); if (io::isError(read_ret)) { state_ = StreamReadState::StreamReadError; return false; } read_size_ += read_ret; ++current_buffer_count_; return true; } std::optional<LineReader::LineInfo> LineReader::finalizeLineInfo(uint8_t endline_size, const std::optional<std::string>& starts_with) { setLastLineInfoAttributes(endline_size, starts_with); if (last_line_info_->size == 0) { return std::nullopt; } return last_line_info_; } std::optional<LineReader::LineInfo> LineReader::readNextLine(const std::optional<std::string>& starts_with) { if (state_ != StreamReadState::Ok) { return std::nullopt; } const auto isLastReadProcessed = [this]() { return last_read_size_ <= buffer_offset_; }; while (read_size_ < stream_->size() || !isLastReadProcessed()) { if (isLastReadProcessed() && !readNextBuffer()) { return std::nullopt; } const auto begin = buffer_.begin() + gsl::narrow<std::vector<char>::difference_type>(buffer_offset_); const auto end = buffer_.begin() + gsl::narrow<std::vector<char>::difference_type>(last_read_size_); auto endline_pos = std::find_if(begin, end, [](const auto& buffer_element) { return buffer_element == '\n'; }); if (endline_pos != end) { const auto line_length = std::distance(buffer_.begin(), endline_pos); buffer_offset_ = line_length + 1; return finalizeLineInfo(getEndLineSize(line_length), starts_with); } else { buffer_offset_ = last_read_size_; } } state_ = StreamReadState::EndOfStream; return finalizeLineInfo(0, starts_with); } } // namespace detail namespace { class SplitTextFragmentGenerator { public: struct Fragment { uint64_t text_line_count = 0; uint64_t processed_line_count = 0; uint64_t fragment_size = 0; uint64_t fragment_offset = 0; uint8_t endline_size = 0; }; SplitTextFragmentGenerator(const std::shared_ptr<io::InputStream>& stream, const SplitTextConfiguration& split_text_config, size_t buffer_size); std::optional<Fragment> readNextFragment(); nonstd::expected<Fragment, const char*> readHeaderFragment(); [[nodiscard]] detail::StreamReadState getState() const { return line_reader_.getState(); } private: static void addLineToFragment(Fragment& fragment, const detail::LineReader::LineInfo& line); void finalizeFragmentOffset(Fragment& current_fragment); [[nodiscard]] bool lineSizeWouldExceedMaxFragmentSize(const detail::LineReader::LineInfo& line, uint64_t fragment_size) const; nonstd::expected<Fragment, const char*> createHeaderFragmentUsingLineCount(); nonstd::expected<Fragment, const char*> createHeaderFragmentUsingHeaderMarkerCharacters(); detail::LineReader line_reader_; // In case the read line would exceed the maximum fragment size, we need to buffer it for the next fragment std::optional<detail::LineReader::LineInfo> buffered_line_info_; uint64_t flow_file_offset_ = 0; const SplitTextConfiguration& split_text_config_; uint64_t header_fragment_size_ = 0; }; class ReadCallback { public: ReadCallback(std::shared_ptr<core::FlowFile> flow_file, const SplitTextConfiguration& split_text_config, core::ProcessSession& session, size_t buffer_size, std::shared_ptr<core::logging::Logger> logger); int64_t operator()(const std::shared_ptr<io::InputStream>& stream); std::optional<const char*> error; std::vector<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> results; private: void setAttributesOfDoneSegment(core::FlowFile& current_flow_file, uint64_t line_count); void createHeaderOnlyFragmentFlow(const SplitTextFragmentGenerator::Fragment& header_fragment); void mergeHeaderAndFragmentFlows(const std::shared_ptr<core::FlowFile>& header_flow, const SplitTextFragmentGenerator::Fragment& fragment, size_t fragment_trim_size); void createFragmentFlowWithoutHeader(const SplitTextFragmentGenerator::Fragment& fragment, size_t fragment_trim_size); std::shared_ptr<io::InputStream> stream_; std::shared_ptr<core::FlowFile> flow_file_; const SplitTextConfiguration& split_text_config_; core::ProcessSession& session_; size_t emitted_fragment_index_ = 1; const std::string fragment_identifier_ = utils::IdGenerator::getIdGenerator()->generate().to_string(); size_t buffer_size_{}; std::shared_ptr<core::logging::Logger> logger_; }; SplitTextFragmentGenerator::SplitTextFragmentGenerator(const std::shared_ptr<io::InputStream>& stream, const SplitTextConfiguration& split_text_config, const size_t buffer_size) : line_reader_(stream, buffer_size), split_text_config_(split_text_config) { } void SplitTextFragmentGenerator::finalizeFragmentOffset(Fragment& current_fragment) { current_fragment.fragment_offset = flow_file_offset_; flow_file_offset_ += current_fragment.fragment_size; } void SplitTextFragmentGenerator::addLineToFragment(Fragment& current_fragment, const detail::LineReader::LineInfo& line) { if (line.endline_size == line.size) { // if line consists only of endline characters, we need to append the fragment trim size current_fragment.endline_size += line.endline_size; } else { current_fragment.endline_size = line.endline_size; } current_fragment.text_line_count += line.endline_size == line.size ? 0 : 1; current_fragment.fragment_size += line.size; } bool SplitTextFragmentGenerator::lineSizeWouldExceedMaxFragmentSize(const detail::LineReader::LineInfo& line, uint64_t fragment_size) const { return split_text_config_.maximum_fragment_size && fragment_size + line.size + header_fragment_size_ > split_text_config_.maximum_fragment_size.value(); } nonstd::expected<SplitTextFragmentGenerator::Fragment, const char*> SplitTextFragmentGenerator::createHeaderFragmentUsingLineCount() { Fragment header_fragment; for (uint64_t i = 0; i < split_text_config_.header_line_count; ++i) { auto line = line_reader_.readNextLine(); if (!line) { if (getState() == detail::StreamReadState::EndOfStream) { return nonstd::make_unexpected("The flow file's line count is less than the specified header line count!"); } else { return nonstd::make_unexpected("Error while reading flow file stream!"); } } if (lineSizeWouldExceedMaxFragmentSize(*line, header_fragment.fragment_size)) { return nonstd::make_unexpected("Header line would exceed the maximum fragment size!"); } addLineToFragment(header_fragment, *line); } flow_file_offset_ += header_fragment.fragment_size; header_fragment_size_ = header_fragment.fragment_size; return header_fragment; } nonstd::expected<SplitTextFragmentGenerator::Fragment, const char*> SplitTextFragmentGenerator::createHeaderFragmentUsingHeaderMarkerCharacters() { Fragment header_fragment; while (auto line = line_reader_.readNextLine(split_text_config_.header_line_marker_characters)) { if (line->size < split_text_config_.header_line_marker_characters->size() || !line->matches_starts_with) { buffered_line_info_ = line; break; } if (lineSizeWouldExceedMaxFragmentSize(*line, header_fragment.fragment_size)) { return nonstd::make_unexpected("Header line would exceed the maximum fragment size!"); } addLineToFragment(header_fragment, *line); } flow_file_offset_ += header_fragment.fragment_size; header_fragment_size_ = header_fragment.fragment_size; return header_fragment; } nonstd::expected<SplitTextFragmentGenerator::Fragment, const char*> SplitTextFragmentGenerator::readHeaderFragment() { gsl_Expects(flow_file_offset_ == 0 && (split_text_config_.header_line_count > 0 || split_text_config_.header_line_marker_characters)); if (split_text_config_.header_line_count > 0) { return createHeaderFragmentUsingLineCount(); } return createHeaderFragmentUsingHeaderMarkerCharacters(); } std::optional<SplitTextFragmentGenerator::Fragment> SplitTextFragmentGenerator::readNextFragment() { Fragment current_fragment; while (auto line = buffered_line_info_ ? buffered_line_info_ : line_reader_.readNextLine()) { buffered_line_info_.reset(); if (lineSizeWouldExceedMaxFragmentSize(*line, current_fragment.fragment_size)) { if (current_fragment.processed_line_count == 0) { // first fragment line would be bigger than maximum fragment size (we don't have any other line in the fragment yet) addLineToFragment(current_fragment, *line); } else { buffered_line_info_ = line; } finalizeFragmentOffset(current_fragment); return current_fragment; } ++current_fragment.processed_line_count; addLineToFragment(current_fragment, *line); if (split_text_config_.line_split_count == current_fragment.processed_line_count) { finalizeFragmentOffset(current_fragment); return current_fragment; } } if (current_fragment.fragment_size > 0) { finalizeFragmentOffset(current_fragment); return current_fragment; } return std::nullopt; } ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flow_file, const SplitTextConfiguration& split_text_config, core::ProcessSession& session, const size_t buffer_size, std::shared_ptr<core::logging::Logger> logger) : flow_file_(std::move(flow_file)), split_text_config_(split_text_config), session_(session), buffer_size_(buffer_size), logger_(std::move(logger)) { } void ReadCallback::setAttributesOfDoneSegment(core::FlowFile& current_flow_file, uint64_t line_count) { const std::string original_filename_or_uuid = flow_file_->getAttribute(core::SpecialFlowAttribute::FILENAME).value_or(flow_file_->getUUIDStr()); current_flow_file.setAttribute(core::SpecialFlowAttribute::FILENAME, original_filename_or_uuid + ".fragment." + fragment_identifier_ + "." + std::to_string(emitted_fragment_index_)); current_flow_file.setAttribute(SplitText::TextLineCountOutputAttribute.name, std::to_string(line_count)); current_flow_file.setAttribute(SplitText::FragmentSizeOutputAttribute.name, std::to_string(current_flow_file.getSize())); current_flow_file.setAttribute(SplitText::FragmentIdentifierOutputAttribute.name, fragment_identifier_); current_flow_file.setAttribute(SplitText::FragmentIndexOutputAttribute.name, std::to_string(emitted_fragment_index_)); current_flow_file.setAttribute(SplitText::SegmentOriginalFilenameOutputAttribute.name, flow_file_->getAttribute(core::SpecialFlowAttribute::FILENAME).value_or("")); ++emitted_fragment_index_; } void ReadCallback::createHeaderOnlyFragmentFlow(const SplitTextFragmentGenerator::Fragment& header_fragment) { gsl_Expects(split_text_config_.remove_trailing_new_lines); // This is only possible if the split fragment has no content and the endlines are trimmed auto header_only_flow = session_.clone(*flow_file_, gsl::narrow<int64_t>(header_fragment.fragment_offset), gsl::narrow<int64_t>(header_fragment.fragment_size - header_fragment.endline_size)); if (!header_only_flow) { logger_->log_error("Failed to clone header only fragment flow!"); return; } logger_->log_debug("Creating a header only fragment with fragment index: {} fragment size: {}", emitted_fragment_index_, header_only_flow->getSize()); setAttributesOfDoneSegment(*header_only_flow, 0); results.push_back(header_only_flow); } void ReadCallback::mergeHeaderAndFragmentFlows(const std::shared_ptr<core::FlowFile>& header_flow, const SplitTextFragmentGenerator::Fragment& fragment, size_t fragment_trim_size) { auto fragment_flow = session_.clone(*flow_file_, gsl::narrow<int64_t>(fragment.fragment_offset), gsl::narrow<int64_t>(fragment.fragment_size - fragment_trim_size)); if (!fragment_flow) { logger_->log_error("Failed to clone fragment flow!"); return; } auto merged_flow = session_.clone(*header_flow); // clone header to copy attributes if (!merged_flow) { logger_->log_error("Failed to clone merged fragment flow!"); return; } session_.write(merged_flow, [this, &fragment_flow, &header_flow](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t { auto header_write_result = session_.read(header_flow, [&output_stream](const std::shared_ptr<io::InputStream>& header_input_stream) -> int64_t { return internal::pipe(*header_input_stream, *output_stream); }); if (header_write_result < 0) { logger_->log_error("Failed to write header to fragment!"); return header_write_result; } return session_.read(fragment_flow, [&output_stream](const std::shared_ptr<io::InputStream>& fragment_input_stream) -> int64_t { return internal::pipe(*fragment_input_stream, *output_stream); }); }); logger_->log_debug("Creating fragment with header with fragment index: {} fragment size: {}", emitted_fragment_index_, merged_flow->getSize()); setAttributesOfDoneSegment(*merged_flow, fragment.text_line_count); results.push_back(merged_flow); session_.remove(fragment_flow); } void ReadCallback::createFragmentFlowWithoutHeader(const SplitTextFragmentGenerator::Fragment& fragment, size_t fragment_trim_size) { auto fragment_flow = session_.clone(*flow_file_, gsl::narrow<int64_t>(fragment.fragment_offset), gsl::narrow<int64_t>(fragment.fragment_size - fragment_trim_size)); if (!fragment_flow) { logger_->log_error("Failed to clone fragment flow without header!"); return; } logger_->log_debug("Creating fragment with header with fragment index: {} fragment size: {}", emitted_fragment_index_, fragment_flow->getSize()); setAttributesOfDoneSegment(*fragment_flow, fragment.text_line_count); results.push_back(fragment_flow); } int64_t ReadCallback::operator()(const std::shared_ptr<io::InputStream>& stream) { SplitTextFragmentGenerator fragment_generator(stream, split_text_config_, buffer_size_); nonstd::expected<SplitTextFragmentGenerator::Fragment, const char*> header_fragment; std::shared_ptr<core::FlowFile> header_flow; // cache header flow file to avoid cloning it for each fragment if (split_text_config_.header_line_count > 0 || split_text_config_.header_line_marker_characters) { header_fragment = fragment_generator.readHeaderFragment(); if (!header_fragment) { error = header_fragment.error(); return gsl::narrow<int64_t>(flow_file_->getSize()); } header_flow = session_.clone(*flow_file_, gsl::narrow<int64_t>(header_fragment->fragment_offset), gsl::narrow<int64_t>(header_fragment->fragment_size)); if (!header_flow) { logger_->log_error("Failed to clone header flow!"); return -1; } } while (auto fragment = fragment_generator.readNextFragment()) { size_t fragment_trim_size = split_text_config_.remove_trailing_new_lines ? fragment->endline_size : 0; if (header_flow) { if (fragment->fragment_size - fragment_trim_size == 0) { createHeaderOnlyFragmentFlow(*header_fragment); } else { mergeHeaderAndFragmentFlows(header_flow, *fragment, fragment_trim_size); } } else if (fragment->fragment_size - fragment_trim_size != 0) { createFragmentFlowWithoutHeader(*fragment, fragment_trim_size); } } if (header_flow) { session_.remove(header_flow); } return fragment_generator.getState() == detail::StreamReadState::EndOfStream ? gsl::narrow<int64_t>(flow_file_->getSize()) : -1; } } // namespace void SplitText::initialize() { setSupportedProperties(Properties); setSupportedRelationships(Relationships); } void SplitText::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& /*sessionFactory*/) { buffer_size_ = utils::configuration::getBufferSize(*context.getConfiguration()); split_text_config_.line_split_count = utils::parseU64Property(context, LineSplitCount); logger_->log_debug("SplitText line split count: {}", split_text_config_.line_split_count); if (const auto max_fragment_data_size_value = utils::parseOptionalDataSizeProperty(context, MaximumFragmentSize)) { split_text_config_.maximum_fragment_size = *max_fragment_data_size_value; logger_->log_debug("SplitText maximum fragment size: {}", split_text_config_.maximum_fragment_size.value()); } if (split_text_config_.maximum_fragment_size && split_text_config_.maximum_fragment_size.value() == 0) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Maximum Fragment Size cannot be 0!"); } if (split_text_config_.line_split_count == 0 && !split_text_config_.maximum_fragment_size) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Line Split Count is set to 0, but Maximum Fragment Size is not set!"); } split_text_config_.header_line_count = utils::parseU64Property(context, HeaderLineCount); logger_->log_debug("SplitText header line count: {}", split_text_config_.header_line_count); split_text_config_.header_line_marker_characters = context.getProperty(HeaderLineMarkerCharacters) | utils::toOptional(); if (split_text_config_.header_line_marker_characters && split_text_config_.header_line_marker_characters->size() >= buffer_size_) { gsl_Expects(buffer_size_ >= 1); throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("SplitText header line marker characters length is larger than the maximum allowed: {} > {}", split_text_config_.header_line_marker_characters->size(), buffer_size_ - 1)); } if (split_text_config_.header_line_marker_characters) { logger_->log_debug("SplitText header line marker characters were set: {}", *split_text_config_.header_line_marker_characters); } split_text_config_.remove_trailing_new_lines = utils::parseBoolProperty(context, RemoveTrailingNewlines); logger_->log_debug("SplitText should remove trailing new lines: {}", split_text_config_.remove_trailing_new_lines); } void SplitText::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { std::shared_ptr<core::FlowFile> flow_file = session.get(); if (!flow_file) { context.yield(); return; } ReadCallback callback{flow_file, split_text_config_, session, buffer_size_, logger_}; session.read(flow_file, std::ref(callback)); if (callback.error) { logger_->log_error("Splitting flow file failed with error: {}", *callback.error); session.transfer(flow_file, Failure); } else { logger_->log_info("Splitting flow file '{}' (id: {}) resulted in {} fragments", flow_file->getName(), flow_file->getUUIDStr(), callback.results.size()); for (const auto& res : callback.results) { res->setAttribute(SplitText::FragmentCountOutputAttribute.name, std::to_string(callback.results.size())); session.transfer(res, Splits); } session.transfer(flow_file, Original); } } REGISTER_RESOURCE(SplitText, Processor); } // namespace org::apache::nifi::minifi::processors