extensions/aws/s3/MultipartUploadStateStorage.cpp (105 lines of code) (raw):
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "MultipartUploadStateStorage.h"
#include "utils/StringUtils.h"
namespace org::apache::nifi::minifi::aws::s3 {
/**
 * Writes the given multipart upload state into the state manager.
 *
 * All entries belonging to one upload share the prefix "<bucket>/<key>" followed by a
 * property suffix (".upload_id", ".upload_time", ...). The currently stored entries are read
 * first so that entries belonging to other uploads are kept when the map is written back.
 *
 * @param bucket bucket name, first half of the entry prefix
 * @param key    object key, second half of the entry prefix
 * @param state  values to store; the upload time is stored as milliseconds and the etags
 *               are joined with ';'
 */
void MultipartUploadStateStorage::storeState(const std::string& bucket, const std::string& key, const MultipartUploadState& state) {
  const std::string entry_prefix = bucket + "/" + key;

  std::lock_guard<std::mutex> guard(state_manager_mutex_);

  // The result of get() is intentionally not checked: when nothing could be read the map
  // simply stays empty and only the entries of this upload are written.
  std::unordered_map<std::string, std::string> all_entries;
  state_manager_->get(all_entries);

  const auto put = [&all_entries, &entry_prefix](const char* suffix, const std::string& value) {
    all_entries[entry_prefix + suffix] = value;
  };
  put(".upload_id", state.upload_id);
  put(".upload_time", std::to_string(state.upload_time.Millis()));
  put(".uploaded_parts", std::to_string(state.uploaded_parts));
  put(".uploaded_size", std::to_string(state.uploaded_size));
  put(".part_size", std::to_string(state.part_size));
  put(".full_size", std::to_string(state.full_size));
  put(".uploaded_etags", minifi::utils::StringUtils::join(";", state.uploaded_etags));

  state_manager_->set(all_entries);
  state_manager_->commit();
  state_manager_->persist();
}
/**
 * Reads the stored multipart upload state for the given bucket and object key.
 *
 * Entries are looked up under the prefix "<bucket>/<key>" followed by a property suffix.
 *
 * @param bucket bucket name, first half of the entry prefix
 * @param key    object key, second half of the entry prefix
 * @return the stored state, or std::nullopt when the state manager returned no state, when
 *         no ".upload_id" entry exists for the prefix, or when any numeric entry is missing
 *         or cannot be parsed (each such entry is logged as an error)
 */
std::optional<MultipartUploadState> MultipartUploadStateStorage::getState(const std::string& bucket, const std::string& key) const {
  std::unordered_map<std::string, std::string> state_map;
  {
    // The lock is only needed while copying the entries out of the state manager;
    // everything below works on the local copy.
    std::lock_guard<std::mutex> lock(state_manager_mutex_);
    if (!state_manager_->get(state_map)) {
      logger_->log_warn("No previous multipart upload state was associated with this processor.");
      return std::nullopt;
    }
  }

  const std::string state_key = bucket + "/" + key;
  const auto upload_id_entry = state_map.find(state_key + ".upload_id");
  if (upload_id_entry == state_map.end()) {
    logger_->log_warn("Multipart upload state was not found for key '%s'", state_key);
    return std::nullopt;
  }

  // Returns the stored value of a property, or an empty string when the entry is absent.
  const auto stored_value_of = [&state_map, &state_key](const std::string& property_suffix) {
    const auto entry = state_map.find(state_key + property_suffix);
    return entry != state_map.end() ? entry->second : std::string{};
  };

  // Parses one numeric property into target. Failures are logged and remembered instead of
  // being silently ignored, so that a partially parsed state is never handed to the caller.
  bool all_numbers_valid = true;
  const auto parse_number = [&](const std::string& property_suffix, auto& target) {
    const std::string stored_value = stored_value_of(property_suffix);
    if (!core::Property::StringToInt(stored_value, target)) {
      const std::string property_key = state_key + property_suffix;
      logger_->log_error("Multipart upload cache key '%s' has invalid value '%s'", property_key, stored_value);
      all_numbers_valid = false;
    }
  };

  MultipartUploadState state;
  state.upload_id = upload_id_entry->second;
  int64_t stored_upload_time = 0;
  parse_number(".upload_time", stored_upload_time);
  parse_number(".uploaded_parts", state.uploaded_parts);
  parse_number(".uploaded_size", state.uploaded_size);
  parse_number(".part_size", state.part_size);
  parse_number(".full_size", state.full_size);
  if (!all_numbers_valid) {
    return std::nullopt;
  }

  state.upload_time = Aws::Utils::DateTime(stored_upload_time);
  state.uploaded_etags = minifi::utils::StringUtils::splitAndTrimRemovingEmpty(stored_value_of(".uploaded_etags"), ";");
  return state;
}
/**
 * Erases all per-upload property entries of one upload from the given map.
 *
 * @param state_key  entry prefix of the upload; each property suffix is appended to it
 * @param state_map  map to erase from; entries that are not present are left untouched
 */
void MultipartUploadStateStorage::removeKey(const std::string& state_key, std::unordered_map<std::string, std::string>& state_map) {
  static constexpr const char* kPropertySuffixes[] = {
    ".upload_id",
    ".upload_time",
    ".uploaded_parts",
    ".uploaded_size",
    ".part_size",
    ".full_size",
    ".uploaded_etags"
  };
  for (const char* suffix : kPropertySuffixes) {
    state_map.erase(state_key + suffix);
  }
}
/**
 * Deletes the stored multipart upload state of the given bucket and object key.
 *
 * Only logs a warning and leaves the state manager untouched when the state manager returns
 * no state or when there is no ".upload_id" entry under the "<bucket>/<key>" prefix.
 *
 * @param bucket bucket name, first half of the entry prefix
 * @param key    object key, second half of the entry prefix
 */
void MultipartUploadStateStorage::removeState(const std::string& bucket, const std::string& key) {
  std::lock_guard<std::mutex> guard(state_manager_mutex_);

  std::unordered_map<std::string, std::string> stored_entries;
  if (!state_manager_->get(stored_entries)) {
    logger_->log_warn("No previous multipart upload state was associated with this processor.");
    return;
  }

  const std::string entry_prefix = bucket + "/" + key;
  const bool upload_known = stored_entries.find(entry_prefix + ".upload_id") != stored_entries.end();
  if (!upload_known) {
    logger_->log_warn("Multipart upload state was not found for key '%s'", entry_prefix);
    return;
  }

  // Drop the entries of this upload and write the remaining entries back.
  removeKey(entry_prefix, stored_entries);
  state_manager_->set(stored_entries);
  state_manager_->commit();
  state_manager_->persist();
}
/**
 * Removes every stored multipart upload state whose upload time is older than the threshold.
 *
 * Uploads are discovered through their ".upload_time" entries; the stored value is parsed as
 * an integer and passed to Aws::Utils::DateTime. Entries whose value cannot be parsed are
 * logged as errors and skipped. The state manager is only written to when at least one
 * upload was aged off.
 *
 * @param multipart_upload_max_age_threshold maximum age; uploads started before
 *        (now - threshold) are removed
 */
void MultipartUploadStateStorage::removeAgedStates(std::chrono::milliseconds multipart_upload_max_age_threshold) {
  std::unordered_map<std::string, std::string> state_map;
  std::lock_guard<std::mutex> lock(state_manager_mutex_);
  if (!state_manager_->get(state_map)) {
    logger_->log_warn("No previous multipart upload state was associated with this processor.");
    return;
  }

  // Loop-invariant, so it is built once instead of on every iteration.
  const std::string upload_time_suffix = ".upload_time";
  auto age_off_time = Aws::Utils::DateTime::Now() - multipart_upload_max_age_threshold;

  // Collect first and erase afterwards: erasing while iterating would invalidate the iteration.
  std::vector<std::string> keys_to_remove;
  for (const auto& [property_key, value] : state_map) {
    if (!minifi::utils::StringUtils::endsWith(property_key, upload_time_suffix)) {
      continue;
    }
    int64_t stored_upload_time{};
    if (!core::Property::StringToInt(value, stored_upload_time)) {
      logger_->log_error("Multipart upload cache key '%s' has invalid value '%s'", property_key, value);
      continue;
    }
    auto upload_time = Aws::Utils::DateTime(stored_upload_time);
    if (upload_time < age_off_time) {
      // Strip the suffix to get the "<bucket>/<key>" prefix shared by all entries of the upload.
      keys_to_remove.push_back(property_key.substr(0, property_key.size() - upload_time_suffix.size()));
    }
  }

  // Nothing aged off: the stored state is unchanged, so skip rewriting and persisting it.
  if (keys_to_remove.empty()) {
    return;
  }

  for (const auto& key : keys_to_remove) {
    logger_->log_info("Removing local aged off multipart upload state with key '%s'", key);
    removeKey(key, state_map);
  }
  state_manager_->set(state_map);
  state_manager_->commit();
  state_manager_->persist();
}
}  // namespace org::apache::nifi::minifi::aws::s3