extensions/aws/s3/MultipartUploadStateStorage.cpp (105 lines of code) (raw):

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "MultipartUploadStateStorage.h"

#include <array>
#include <string>
#include <string_view>

#include "utils/StringUtils.h"

namespace org::apache::nifi::minifi::aws::s3 {

namespace {

// Suffixes appended to the "<bucket>/<key>" prefix to form the individual
// state-manager keys that together describe one multipart upload. Keeping them
// in one place guarantees that storing, reading and removing agree on the names.
constexpr std::string_view UPLOAD_ID_SUFFIX = ".upload_id";
constexpr std::string_view UPLOAD_TIME_SUFFIX = ".upload_time";
constexpr std::string_view UPLOADED_PARTS_SUFFIX = ".uploaded_parts";
constexpr std::string_view UPLOADED_SIZE_SUFFIX = ".uploaded_size";
constexpr std::string_view PART_SIZE_SUFFIX = ".part_size";
constexpr std::string_view FULL_SIZE_SUFFIX = ".full_size";
constexpr std::string_view UPLOADED_ETAGS_SUFFIX = ".uploaded_etags";

// Every suffix written by storeState; removeKey erases exactly these.
constexpr std::array<std::string_view, 7> ALL_PROPERTY_SUFFIXES{
    UPLOAD_ID_SUFFIX,
    UPLOAD_TIME_SUFFIX,
    UPLOADED_PARTS_SUFFIX,
    UPLOADED_SIZE_SUFFIX,
    PART_SIZE_SUFFIX,
    FULL_SIZE_SUFFIX,
    UPLOADED_ETAGS_SUFFIX};

/// Builds the "<bucket>/<key>" prefix shared by all state entries of one upload.
std::string makeStateKey(const std::string& bucket, const std::string& key) {
  return bucket + "/" + key;
}

/// Concatenates a state key prefix and a property suffix into one state-manager key.
std::string makePropertyKey(const std::string& state_key, std::string_view suffix) {
  std::string property_key;
  property_key.reserve(state_key.size() + suffix.size());
  property_key.append(state_key).append(suffix);
  return property_key;
}

}  // namespace

/**
 * Stores (or overwrites) the multipart upload state of bucket/key.
 *
 * The existing state map is read, updated with this upload's entries and written
 * back, all while holding state_manager_mutex_ so concurrent read-modify-write
 * cycles on the shared map do not lose each other's updates.
 *
 * @param bucket S3 bucket of the upload
 * @param key    S3 object key of the upload
 * @param state  upload progress to store
 */
void MultipartUploadStateStorage::storeState(const std::string& bucket, const std::string& key, const MultipartUploadState& state) {
  std::unordered_map<std::string, std::string> stored_state;
  std::lock_guard<std::mutex> lock(state_manager_mutex_);
  // The result is deliberately ignored: with no previous state we simply start from an empty map.
  state_manager_->get(stored_state);

  const std::string state_key = makeStateKey(bucket, key);
  stored_state[makePropertyKey(state_key, UPLOAD_ID_SUFFIX)] = state.upload_id;
  stored_state[makePropertyKey(state_key, UPLOAD_TIME_SUFFIX)] = std::to_string(state.upload_time.Millis());
  stored_state[makePropertyKey(state_key, UPLOADED_PARTS_SUFFIX)] = std::to_string(state.uploaded_parts);
  stored_state[makePropertyKey(state_key, UPLOADED_SIZE_SUFFIX)] = std::to_string(state.uploaded_size);
  stored_state[makePropertyKey(state_key, PART_SIZE_SUFFIX)] = std::to_string(state.part_size);
  stored_state[makePropertyKey(state_key, FULL_SIZE_SUFFIX)] = std::to_string(state.full_size);
  stored_state[makePropertyKey(state_key, UPLOADED_ETAGS_SUFFIX)] = minifi::utils::StringUtils::join(";", state.uploaded_etags);

  state_manager_->set(stored_state);
  state_manager_->commit();
  state_manager_->persist();
}

/**
 * Reads back the multipart upload state of bucket/key.
 *
 * @param bucket S3 bucket of the upload
 * @param key    S3 object key of the upload
 * @return the stored state, or std::nullopt if the state manager holds no state
 *         at all or no upload id is recorded for this bucket/key. Numeric fields
 *         that cannot be parsed keep their default value and are logged.
 */
std::optional<MultipartUploadState> MultipartUploadStateStorage::getState(const std::string& bucket, const std::string& key) const {
  std::unordered_map<std::string, std::string> state_map;
  {
    // Only the read from the state manager needs the lock; parsing works on the local copy.
    std::lock_guard<std::mutex> lock(state_manager_mutex_);
    if (!state_manager_->get(state_map)) {
      logger_->log_warn("No previous multipart upload state was associated with this processor.");
      return std::nullopt;
    }
  }

  const std::string state_key = makeStateKey(bucket, key);
  const std::string upload_id_key = makePropertyKey(state_key, UPLOAD_ID_SUFFIX);
  if (!state_map.contains(upload_id_key)) {
    logger_->log_warn("Multipart upload state was not found for key '%s'", state_key);
    return std::nullopt;
  }

  // Parses one numeric property into `output`. A failure used to be silently ignored;
  // it is now logged so a corrupt/partial state is visible, while `output` keeps whatever
  // value the parser left (the previous behavior).
  const auto parse_integer_field = [&](std::string_view suffix, auto& output) {
    const std::string property_key = makePropertyKey(state_key, suffix);
    const std::string& value = state_map[property_key];
    if (!core::Property::StringToInt(value, output)) {
      logger_->log_warn("Multipart upload state key '%s' has invalid integer value '%s'", property_key, value);
    }
  };

  MultipartUploadState state;
  state.upload_id = state_map[upload_id_key];

  int64_t stored_upload_time = 0;
  parse_integer_field(UPLOAD_TIME_SUFFIX, stored_upload_time);
  state.upload_time = Aws::Utils::DateTime(stored_upload_time);

  parse_integer_field(UPLOADED_PARTS_SUFFIX, state.uploaded_parts);
  parse_integer_field(UPLOADED_SIZE_SUFFIX, state.uploaded_size);
  parse_integer_field(PART_SIZE_SUFFIX, state.part_size);
  parse_integer_field(FULL_SIZE_SUFFIX, state.full_size);

  state.uploaded_etags = minifi::utils::StringUtils::splitAndTrimRemovingEmpty(
      state_map[makePropertyKey(state_key, UPLOADED_ETAGS_SUFFIX)], ";");
  return state;
}

/**
 * Erases every property stored for one upload from `state_map`.
 *
 * @param state_key the "<bucket>/<key>" prefix of the upload
 * @param state_map state map to modify in place
 */
void MultipartUploadStateStorage::removeKey(const std::string& state_key, std::unordered_map<std::string, std::string>& state_map) {
  for (const auto suffix : ALL_PROPERTY_SUFFIXES) {
    state_map.erase(makePropertyKey(state_key, suffix));
  }
}

/**
 * Removes the stored multipart upload state of bucket/key, if there is any.
 * Logs a warning and changes nothing when no state exists for it.
 *
 * @param bucket S3 bucket of the upload
 * @param key    S3 object key of the upload
 */
void MultipartUploadStateStorage::removeState(const std::string& bucket, const std::string& key) {
  std::unordered_map<std::string, std::string> state_map;
  std::lock_guard<std::mutex> lock(state_manager_mutex_);
  if (!state_manager_->get(state_map)) {
    logger_->log_warn("No previous multipart upload state was associated with this processor.");
    return;
  }

  const std::string state_key = makeStateKey(bucket, key);
  if (!state_map.contains(makePropertyKey(state_key, UPLOAD_ID_SUFFIX))) {
    logger_->log_warn("Multipart upload state was not found for key '%s'", state_key);
    return;
  }

  removeKey(state_key, state_map);
  state_manager_->set(state_map);
  state_manager_->commit();
  state_manager_->persist();
}

/**
 * Removes every stored upload whose upload time is older than
 * (now - multipart_upload_max_age_threshold).
 *
 * Entries with an unparsable upload time are logged and kept. The state manager is
 * only written and persisted when at least one upload was actually removed.
 *
 * @param multipart_upload_max_age_threshold maximum age of a kept upload state
 */
void MultipartUploadStateStorage::removeAgedStates(std::chrono::milliseconds multipart_upload_max_age_threshold) {
  std::unordered_map<std::string, std::string> state_map;
  std::lock_guard<std::mutex> lock(state_manager_mutex_);
  if (!state_manager_->get(state_map)) {
    logger_->log_warn("No previous multipart upload state was associated with this processor.");
    return;
  }

  auto age_off_time = Aws::Utils::DateTime::Now() - multipart_upload_max_age_threshold;

  // Collect first, erase afterwards: erasing while iterating state_map would invalidate the loop.
  std::vector<std::string> keys_to_remove;
  for (const auto& [property_key, value] : state_map) {
    if (!property_key.ends_with(UPLOAD_TIME_SUFFIX)) {
      continue;
    }
    int64_t stored_upload_time{};
    if (!core::Property::StringToInt(value, stored_upload_time)) {
      logger_->log_error("Multipart upload cache key '%s' has invalid value '%s'", property_key, value);
      continue;
    }
    if (Aws::Utils::DateTime(stored_upload_time) < age_off_time) {
      // Strip the ".upload_time" suffix to recover the "<bucket>/<key>" prefix.
      keys_to_remove.push_back(property_key.substr(0, property_key.size() - UPLOAD_TIME_SUFFIX.size()));
    }
  }

  if (keys_to_remove.empty()) {
    // Nothing aged off: avoid rewriting and persisting an unchanged state.
    return;
  }

  for (const auto& key : keys_to_remove) {
    logger_->log_info("Removing local aged off multipart upload state with key '%s'", key);
    removeKey(key, state_map);
  }
  state_manager_->set(state_map);
  state_manager_->commit();
  state_manager_->persist();
}

}  // namespace org::apache::nifi::minifi::aws::s3