extensions/aws/s3/MultipartUploadStateStorage.cpp (105 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "MultipartUploadStateStorage.h"
#include "utils/StringUtils.h"
namespace org::apache::nifi::minifi::aws::s3 {
/**
 * Records the progress of a multipart upload for the object identified by bucket and key.
 *
 * The map currently held by the state manager is read first, the seven
 * "<bucket>/<key>.<property>" entries of this upload are added or overwritten in it, and the
 * whole map is handed back to the state manager with set(), commit() and persist().
 * The state manager is accessed while holding state_manager_mutex_.
 *
 * @param bucket bucket name, first part of the state key
 * @param key    object key, second part of the state key
 * @param state  upload progress to record
 */
void MultipartUploadStateStorage::storeState(const std::string& bucket, const std::string& key, const MultipartUploadState& state) {
  const std::lock_guard<std::mutex> guard(state_manager_mutex_);

  std::unordered_map<std::string, std::string> entries;
  // The result of get() is intentionally not inspected here; presumably a missing previous
  // state just means the new entries are added to an empty map.
  state_manager_->get(entries);

  const std::string entry_prefix = bucket + "/" + key;
  entries.insert_or_assign(entry_prefix + ".upload_id", state.upload_id);
  // The upload time is stored as the value returned by Millis().
  entries.insert_or_assign(entry_prefix + ".upload_time", std::to_string(state.upload_time.Millis()));
  entries.insert_or_assign(entry_prefix + ".uploaded_parts", std::to_string(state.uploaded_parts));
  entries.insert_or_assign(entry_prefix + ".uploaded_size", std::to_string(state.uploaded_size));
  entries.insert_or_assign(entry_prefix + ".part_size", std::to_string(state.part_size));
  entries.insert_or_assign(entry_prefix + ".full_size", std::to_string(state.full_size));
  // ETags are flattened into a single ';' separated string.
  entries.insert_or_assign(entry_prefix + ".uploaded_etags", minifi::utils::StringUtils::join(";", state.uploaded_etags));

  state_manager_->set(entries);
  state_manager_->commit();
  state_manager_->persist();
}
/**
 * Looks up the stored multipart upload progress of the object identified by bucket and key.
 *
 * The state manager is only accessed while holding state_manager_mutex_; the returned map is a
 * local copy, so the remaining work is done without the lock.
 *
 * @param bucket bucket name, first part of the state key
 * @param key    object key, second part of the state key
 * @return the stored state, or std::nullopt when
 *         - the state manager has no state to return,
 *         - no "<bucket>/<key>.upload_id" entry exists, or
 *         - one of the numeric entries is missing or cannot be parsed (an error is logged,
 *           in the same way removeAgedStates() reports an unparsable upload time).
 */
std::optional<MultipartUploadState> MultipartUploadStateStorage::getState(const std::string& bucket, const std::string& key) const {
  std::unordered_map<std::string, std::string> state_map;
  {
    std::lock_guard<std::mutex> lock(state_manager_mutex_);
    if (!state_manager_->get(state_map)) {
      logger_->log_warn("No previous multipart upload state was associated with this processor.");
      return std::nullopt;
    }
  }

  const std::string state_key = bucket + "/" + key;
  const auto upload_id_it = state_map.find(state_key + ".upload_id");
  if (upload_id_it == state_map.end()) {
    logger_->log_warn("Multipart upload state was not found for key '%s'", state_key);
    return std::nullopt;
  }

  // Read-only lookup: unlike operator[], this does not insert an empty entry for a missing key.
  // A missing entry is treated as an empty string.
  const auto value_of = [&state_map](const std::string& property_key) -> std::string {
    const auto it = state_map.find(property_key);
    return it != state_map.end() ? it->second : std::string{};
  };

  // Parses one numeric property into output. A value that cannot be parsed is reported and
  // makes the whole stored state unusable instead of silently leaving the field at zero.
  const auto parse_number = [&](const char* property_name, auto& output) {
    const std::string property_key = state_key + property_name;
    const std::string value = value_of(property_key);
    if (core::Property::StringToInt(value, output)) {
      return true;
    }
    logger_->log_error("Multipart upload cache key '%s' has invalid value '%s'", property_key, value);
    return false;
  };

  MultipartUploadState state;
  state.upload_id = upload_id_it->second;

  int64_t stored_upload_time = 0;
  if (!parse_number(".upload_time", stored_upload_time)
      || !parse_number(".uploaded_parts", state.uploaded_parts)
      || !parse_number(".uploaded_size", state.uploaded_size)
      || !parse_number(".part_size", state.part_size)
      || !parse_number(".full_size", state.full_size)) {
    return std::nullopt;
  }
  // storeState() writes the upload time as Millis(), so it is rebuilt from that integer here.
  state.upload_time = Aws::Utils::DateTime(stored_upload_time);

  // ETags were stored as one ';' separated string by storeState().
  state.uploaded_etags = minifi::utils::StringUtils::splitAndTrimRemovingEmpty(value_of(state_key + ".uploaded_etags"), ";");
  return state;
}
/**
 * Erases every entry belonging to one multipart upload from the given map.
 *
 * Only the passed-in map is modified; nothing is written to the state manager here.
 *
 * @param state_key key of the upload whose "<state_key>.<property>" entries are erased
 * @param state_map map to erase the entries from; entries that are not present are skipped
 */
void MultipartUploadStateStorage::removeKey(const std::string& state_key, std::unordered_map<std::string, std::string>& state_map) {
  // Same suffixes, in the same order, as the entries written by storeState().
  for (const char* property_suffix : {".upload_id", ".upload_time", ".uploaded_parts", ".uploaded_size",
                                      ".part_size", ".full_size", ".uploaded_etags"}) {
    state_map.erase(state_key + property_suffix);
  }
}
/**
 * Deletes the stored multipart upload progress of the object identified by bucket and key.
 *
 * The whole operation runs while holding state_manager_mutex_. When the state manager has no
 * state, or no "<bucket>/<key>.upload_id" entry exists, a warning is logged and nothing is
 * written. Otherwise the entries of this upload are erased and the remaining map is handed
 * back to the state manager with set(), commit() and persist().
 *
 * @param bucket bucket name, first part of the state key
 * @param key    object key, second part of the state key
 */
void MultipartUploadStateStorage::removeState(const std::string& bucket, const std::string& key) {
  const std::lock_guard<std::mutex> guard(state_manager_mutex_);

  std::unordered_map<std::string, std::string> stored_entries;
  const bool has_stored_state = state_manager_->get(stored_entries);
  if (!has_stored_state) {
    logger_->log_warn("No previous multipart upload state was associated with this processor.");
    return;
  }

  const std::string upload_key = bucket + "/" + key;
  // The upload id entry serves as the marker that a state exists for this upload.
  if (stored_entries.find(upload_key + ".upload_id") == stored_entries.end()) {
    logger_->log_warn("Multipart upload state was not found for key '%s'", upload_key);
    return;
  }

  removeKey(upload_key, stored_entries);

  state_manager_->set(stored_entries);
  state_manager_->commit();
  state_manager_->persist();
}
/**
 * Drops the stored state of every multipart upload that is older than the given threshold.
 *
 * The whole operation runs while holding state_manager_mutex_. An upload is aged off when its
 * "<state key>.upload_time" entry is earlier than (now - multipart_upload_max_age_threshold).
 * Entries whose upload time cannot be parsed are reported and left in place. The resulting map
 * is always handed back to the state manager with set(), commit() and persist(), even when
 * nothing was removed.
 *
 * @param multipart_upload_max_age_threshold maximum allowed age of a stored upload state
 */
void MultipartUploadStateStorage::removeAgedStates(std::chrono::milliseconds multipart_upload_max_age_threshold) {
  const std::lock_guard<std::mutex> guard(state_manager_mutex_);

  std::unordered_map<std::string, std::string> stored_entries;
  if (!state_manager_->get(stored_entries)) {
    logger_->log_warn("No previous multipart upload state was associated with this processor.");
    return;
  }

  const std::string upload_time_suffix = ".upload_time";
  const auto oldest_allowed_time = Aws::Utils::DateTime::Now() - multipart_upload_max_age_threshold;

  // Expired uploads are collected first and erased afterwards, so the map is not modified
  // while it is being iterated.
  std::vector<std::string> expired_state_keys;
  for (const auto& [entry_name, entry_value] : stored_entries) {
    // Only the upload time entries are of interest for deciding the age of an upload.
    if (!minifi::utils::StringUtils::endsWith(entry_name, upload_time_suffix)) {
      continue;
    }

    int64_t upload_time_millis{};
    if (core::Property::StringToInt(entry_value, upload_time_millis)) {
      if (Aws::Utils::DateTime(upload_time_millis) < oldest_allowed_time) {
        // Strip the suffix to get back the "<bucket>/<key>" state key of the upload.
        expired_state_keys.push_back(entry_name.substr(0, entry_name.size() - upload_time_suffix.size()));
      }
    } else {
      logger_->log_error("Multipart upload cache key '%s' has invalid value '%s'", entry_name, entry_value);
    }
  }

  for (const auto& expired_key : expired_state_keys) {
    logger_->log_info("Removing local aged off multipart upload state with key '%s'", expired_key);
    removeKey(expired_key, stored_entries);
  }

  state_manager_->set(stored_entries);
  state_manager_->commit();
  state_manager_->persist();
}
} // namespace org::apache::nifi::minifi::aws::s3