utils/include/http/HTTPCallback.h (125 lines of code) (raw):

/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <deque> #include <mutex> #include <vector> #include <string> #include <memory> #include <utility> #include <condition_variable> #include "core/logging/LoggerFactory.h" #include "utils/ByteArrayCallback.h" #include "http/BaseHTTPClient.h" namespace org::apache::nifi::minifi::http { /** * The original class here was deadlock-prone, undocumented and was a smorgasbord of multithreading primitives used inconsistently. * This is a rewrite based on the contract inferred from this class's usage in http::HTTPClient * through HTTPStream and the non-buggy part of the behaviour of the original class. * Based on these: * - this class provides a mechanism through which chunks of data can be inserted on a producer thread, while a * consumer thread simultaneously reads this stream of data in CURLOPT_READFUNCTION to supply a POST or PUT request * body with data utilizing HTTP chunked transfer encoding * - once a chunk of data is completely processed, we can discard it (i.e. the consumer will not seek backwards) * - if we expect that more data will be available, but there is none available at the current time, we should block * the consumer thread until either new data becomes available, or we are closed, signaling that there will be no * new data * - we signal that we have provided all data by returning a nullptr from getBuffer. After this no further calls asking * for data should be made on us * - we keep a current buffer and change this buffer once the consumer requests an offset which can no longer be served * by the current buffer * - because of this, all functions that request data at a specific offset are implicit seeks and potentially modify * the current buffer */ class HttpStreamingCallback final : public HTTPUploadByteArrayInputCallback { public: void close() override { logger_->log_trace("close() called"); std::unique_lock<std::mutex> lock(mutex_); is_alive_ = false; cv.notify_all(); } void seek(size_t pos) override { logger_->log_trace("seek(pos: {}) called", pos); std::unique_lock<std::mutex> lock(mutex_); seekInner(lock, pos); } int64_t operator()(const std::shared_ptr<io::InputStream>& stream) override { std::vector<std::byte> vec; if (stream->size() > 0) { vec.resize(stream->size()); stream->read(vec); } return processInner(std::move(vec)); } int64_t process(const uint8_t* data, size_t size) { std::vector<std::byte> vec; vec.resize(size); memcpy(vec.data(), data, size); return processInner(std::move(vec)); } void write(std::string content) override { (void) processInner(utils::span_to<std::vector>(as_bytes(std::span(content)))); } std::byte* getBuffer(size_t pos) override { logger_->log_trace("getBuffer(pos: {}) called", pos); std::unique_lock<std::mutex> lock(mutex_); seekInner(lock, pos); if (ptr_ == nullptr) { return nullptr; } size_t relative_pos = pos - current_buffer_start_; current_pos_ = pos; return ptr_ + relative_pos; } size_t getRemaining(size_t pos) override { logger_->log_trace("getRemaining(pos: {}) called", pos); std::unique_lock<std::mutex> lock(mutex_); seekInner(lock, pos); return total_bytes_loaded_ - pos; } size_t getBufferSize() override { logger_->log_trace("getBufferSize() called"); std::unique_lock<std::mutex> lock(mutex_); // This is needed to make sure that the first buffer is loaded seekInner(lock, current_pos_); return total_bytes_loaded_; } private: /** * Loads the next available buffer * @param lock unique_lock which *must* own the lock */ inline void loadNextBuffer(std::unique_lock<std::mutex>& lock) { cv.wait(lock, [&] { return !byte_arrays_.empty() || !is_alive_; }); if (byte_arrays_.empty()) { logger_->log_trace("loadNextBuffer() ran out of buffers"); ptr_ = nullptr; } else { current_vec_ = std::move(byte_arrays_.front()); byte_arrays_.pop_front(); ptr_ = current_vec_.data(); current_buffer_start_ = total_bytes_loaded_; current_pos_ = current_buffer_start_; total_bytes_loaded_ += current_vec_.size(); logger_->log_trace("loadNextBuffer() loaded new buffer, ptr_: {}, size: {}, current_buffer_start_: {}, current_pos_: {}, total_bytes_loaded_: {}", static_cast<void*>(ptr_), current_vec_.size(), current_buffer_start_, current_pos_, total_bytes_loaded_); } } /** * Common implementation for placing a buffer into the queue * @param vec the buffer to be inserted * @return the number of bytes processed (the size of vec) */ int64_t processInner(std::vector<std::byte>&& vec) { size_t size = vec.size(); logger_->log_trace("processInner() called, vec.data(): {}, vec.size(): {}", static_cast<void*>(vec.data()), size); if (size == 0U) { return 0U; } std::unique_lock<std::mutex> lock(mutex_); byte_arrays_.emplace_back(std::move(vec)); cv.notify_all(); return size; } /** * Seeks to the specified position * @param lock unique_lock which *must* own the lock * @param pos position to seek to */ void seekInner(std::unique_lock<std::mutex>& lock, size_t pos) { logger_->log_trace("seekInner() called, current_pos_: {}, pos: {}", current_pos_, pos); if (pos < current_pos_) { const std::string errstr = "Seeking backwards is not supported, tried to seek from " + std::to_string(current_pos_) + " to " + std::to_string(pos); logger_->log_error("{}", errstr); throw std::logic_error(errstr); } while ((pos - current_buffer_start_) >= current_vec_.size()) { loadNextBuffer(lock); if (ptr_ == nullptr) { break; } } } std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<HttpStreamingCallback>::getLogger(); std::mutex mutex_; std::condition_variable cv; bool is_alive_{true}; size_t total_bytes_loaded_{0U}; size_t current_buffer_start_{0U}; size_t current_pos_{0U}; std::deque<std::vector<std::byte>> byte_arrays_; std::vector<std::byte> current_vec_; std::byte* ptr_{nullptr}; }; } // namespace org::apache::nifi::minifi::http