flex/utils/remote/oss_storage.cc (258 lines of code) (raw):
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef BUILD_WITH_OSS
#include "flex/utils/remote/oss_storage.h"

#include <cstdlib>
#include <exception>
#include <filesystem>
#include <string>
#include <system_error>

#include "flex/third_party/aliyun-oss-cpp-sdk/sdk/include/alibabacloud/oss/client/RetryStrategy.h"
#include "flex/utils/file_utils.h"
namespace gs {
void OSSConf::load_conf_from_env() {
if (accesskey_id_.empty()) {
const char* accesskey_id = std::getenv(kOSSAccessKeyId);
if (accesskey_id) {
accesskey_id_ = accesskey_id;
}
}
if (accesskey_secret_.empty()) {
const char* accesskey_secret = std::getenv(kOSSAccessKeySecret);
if (accesskey_secret) {
accesskey_secret_ = accesskey_secret;
}
}
if (endpoint_.empty()) {
const char* endpoint = std::getenv(kOSSEndpoint);
if (endpoint) {
endpoint_ = endpoint;
}
}
if (bucket_name_.empty()) {
const char* bucket_name = std::getenv(kOSSBucketName);
if (bucket_name) {
bucket_name_ = bucket_name;
}
}
if (std::getenv(kOSSConcurrency)) {
concurrency_ = std::stoi(std::getenv(kOSSConcurrency));
}
LOG(INFO) << "OSS concurrency: " << concurrency_;
}
class UserRetryStrategy : public AlibabaCloud::OSS::RetryStrategy {
public:
UserRetryStrategy(long maxRetries = 3, long scaleFactor = 300)
: m_scaleFactor(scaleFactor), m_maxRetries(maxRetries) {}
bool shouldRetry(const AlibabaCloud::OSS::Error& error,
long attemptedRetries) const {
if (attemptedRetries >= m_maxRetries)
return false;
long responseCode = error.Status();
// http code
if ((responseCode == 403 &&
error.Message().find("RequestTimeTooSkewed") != std::string::npos) ||
(responseCode > 499 && responseCode < 599)) {
return true;
} else {
switch (responseCode) {
// curl error code
case (AlibabaCloud::OSS::ERROR_CURL_BASE + 7): // CURLE_COULDNT_CONNECT
case (AlibabaCloud::OSS::ERROR_CURL_BASE + 18): // CURLE_PARTIAL_FILE
case (AlibabaCloud::OSS::ERROR_CURL_BASE + 23): // CURLE_WRITE_ERROR
case (AlibabaCloud::OSS::ERROR_CURL_BASE +
28): // CURLE_OPERATION_TIMEDOUT
case (AlibabaCloud::OSS::ERROR_CURL_BASE + 52): // CURLE_GOT_NOTHING
case (AlibabaCloud::OSS::ERROR_CURL_BASE + 55): // CURLE_SEND_ERROR
case (AlibabaCloud::OSS::ERROR_CURL_BASE + 56): // CURLE_RECV_ERROR
return true;
default:
break;
};
}
return false;
}
long calcDelayTimeMs(const AlibabaCloud::OSS::Error& error,
long attemptedRetries) const {
return (1 << attemptedRetries) * m_scaleFactor;
}
private:
long m_scaleFactor;
long m_maxRetries;
};
/// Formats a failed OSS outcome as
/// "<additional_info>, Outcome: code: <code>, message: <message>,
/// requestId: <request id>" for logging and error statuses.
template <typename ResultT>
std::string oss_outcome_to_string(
    const std::string& additional_info,
    const AlibabaCloud::OSS::Outcome<AlibabaCloud::OSS::OssError, ResultT>&
        outcome) {
  const auto& err = outcome.error();
  std::string text = additional_info;
  text += ", Outcome: code: ";
  text += err.Code();
  text += ", message: ";
  text += err.Message();
  text += ", requestId: ";
  text += err.RequestId();
  return text;
}
/// Renders the fields of an OSS ObjectSummary (key, ETag, size, last
/// modification time, storage class, type, owner id, restore info) as one
/// log-friendly line.
std::string object_summary_to_string(
    const AlibabaCloud::OSS::ObjectSummary& summary) {
  std::string line = "ObjectSummary: key: ";
  line += summary.Key();
  line += ", ETag: ";
  line += summary.ETag();
  line += ", size: ";
  line += std::to_string(summary.Size());
  line += ", lastModified: ";
  line += summary.LastModified();
  line += ", storageClass: ";
  line += summary.StorageClass();
  line += ", type: ";
  line += summary.Type();
  line += ", owner: ";
  line += summary.Owner().Id();
  line += ", restoreInfo: ";
  line += summary.RestoreInfo();
  return line;
}
/// Creates the OSS client used by Put() and Delete().
///
/// If any connection field (access key id/secret, endpoint, bucket) is still
/// empty, the configuration is first completed from the environment via
/// OSSConf::load_conf_from_env(). That call fills only fields that are
/// still empty, but it also applies the concurrency env var. The client is
/// configured with a UserRetryStrategy allowing up to 5 retries.
gs::Status OSSRemoteStorageUploader::Open() {
  // Previously the env fallback ran only when credentials were missing, so an
  // env-provided endpoint or bucket was ignored whenever the keys had been
  // set explicitly.
  if (conf_.accesskey_id_.empty() || conf_.accesskey_secret_.empty() ||
      conf_.endpoint_.empty() || conf_.bucket_name_.empty()) {
    conf_.load_conf_from_env();
  }
  if (conf_.endpoint_.empty() || conf_.bucket_name_.empty()) {
    LOG(WARNING) << "OSS uploader opened with empty endpoint or bucket name";
  }
  constexpr long kMaxRetries = 5;
  conf_.client_conf_.retryStrategy =
      std::make_shared<UserRetryStrategy>(kMaxRetries);
  client_ = std::make_shared<AlibabaCloud::OSS::OssClient>(
      conf_.endpoint_, conf_.accesskey_id_, conf_.accesskey_secret_,
      conf_.client_conf_);
  return Status::OK();
}
/// Uploads `local_path` to `remote_path` in the configured bucket using a
/// resumable upload. The upload uses parts of `partition_size_` bytes and
/// `concurrency_` threads.
///
/// @param override when false, the request carries
///        `x-oss-forbid-overwrite: true`, so OSS rejects the upload if the
///        object already exists. When true, the header is omitted and an
///        existing object may be replaced.
/// @return INVALID_ARGUMENT if the client is not open, a path is empty, or
///         the local file does not exist. IO_ERROR if the upload fails.
gs::Status OSSRemoteStorageUploader::Put(const std::string& local_path,
                                         const std::string& remote_path,
                                         bool override) {
  LOG(INFO) << "OSS Put local file " << local_path << " to remote "
            << remote_path;
  if (!client_ || local_path.empty() || remote_path.empty()) {
    return gs::Status(gs::StatusCode::INVALID_ARGUMENT,
                      "OSS Put invalid argument");
  }
  // Use the non-throwing overload: a filesystem error (e.g. permission
  // denied) is reported like a missing file instead of escaping as
  // std::filesystem::filesystem_error.
  std::error_code ec;
  if (!std::filesystem::exists(local_path, ec)) {
    LOG(ERROR) << "OSS Put local file " << local_path << " not exist"
               << (ec ? ": " + ec.message() : std::string());
    return gs::Status(gs::StatusCode::INVALID_ARGUMENT,
                      "OSS Put local file not exist");
  }
  AlibabaCloud::OSS::UploadObjectRequest request(conf_.bucket_name_,
                                                 remote_path, local_path);
  // Previously the header was sent unconditionally, silently ignoring
  // `override`.
  if (!override) {
    request.MetaData().addHeader("x-oss-forbid-overwrite", "true");
  }
  request.setPartSize(conf_.partition_size_);
  // More threads upload more parts in parallel.
  request.setThreadNum(conf_.concurrency_);
  auto outcome = client_->ResumableUploadObject(request);
  if (!outcome.isSuccess()) {
    std::string error_string = "OSS ResumableUploadObject from local " +
                               local_path + " to remote " + remote_path +
                               " failed, code: " + outcome.error().Code() +
                               ", message: " + outcome.error().Message() +
                               ", requestId: " + outcome.error().RequestId();
    LOG(ERROR) << error_string;
    return gs::Status(gs::StatusCode::IO_ERROR, error_string);
  }
  return Status::OK();
}
/// Deletes the object `remote_path` from the configured bucket.
///
/// @return INVALID_ARGUMENT if the client is not open (Open() not called, or
///         Close() already called) or `remote_path` is empty. IO_ERROR if
///         the OSS DeleteObject request fails.
gs::Status OSSRemoteStorageUploader::Delete(const std::string& remote_path) {
  // Without this guard, a call before Open() or after Close() (which resets
  // client_) dereferenced a null shared_ptr. Put() already checks this.
  if (!client_ || remote_path.empty()) {
    return gs::Status(gs::StatusCode::INVALID_ARGUMENT,
                      "OSS Delete invalid argument");
  }
  AlibabaCloud::OSS::DeleteObjectRequest request(conf_.bucket_name_,
                                                 remote_path);
  auto outcome = client_->DeleteObject(request);
  if (!outcome.isSuccess()) {
    std::string error_string = "OSS DeleteObject " + remote_path +
                               " failed, code: " + outcome.error().Code() +
                               ", message: " + outcome.error().Message() +
                               ", requestId: " + outcome.error().RequestId();
    LOG(ERROR) << error_string;
    return gs::Status(gs::StatusCode::IO_ERROR, error_string);
  }
  return Status::OK();
}
/// Drops this uploader's reference to the OSS client created by Open().
/// Always succeeds.
gs::Status OSSRemoteStorageUploader::Close() {
  client_ = nullptr;
  return Status::OK();
}
/// OSS remote storage reader (OSSRemoteStorageDownloader)
/// Creates the OSS client used by Get(), List() and get_metadata_etag().
///
/// If any connection field (access key id/secret, endpoint, bucket) is still
/// empty, the configuration is first completed from the environment via
/// OSSConf::load_conf_from_env(). That call fills only fields that are
/// still empty, but it also applies the concurrency env var. The client is
/// configured with a UserRetryStrategy allowing up to 5 retries.
gs::Status OSSRemoteStorageDownloader::Open() {
  // Previously the env fallback ran only when credentials were missing, so an
  // env-provided endpoint or bucket was ignored whenever the keys had been
  // set explicitly.
  if (conf_.accesskey_id_.empty() || conf_.accesskey_secret_.empty() ||
      conf_.endpoint_.empty() || conf_.bucket_name_.empty()) {
    conf_.load_conf_from_env();
  }
  if (conf_.endpoint_.empty() || conf_.bucket_name_.empty()) {
    LOG(WARNING) << "OSS downloader opened with empty endpoint or bucket name";
  }
  constexpr long kMaxRetries = 5;
  conf_.client_conf_.retryStrategy =
      std::make_shared<UserRetryStrategy>(kMaxRetries);
  client_ = std::make_shared<AlibabaCloud::OSS::OssClient>(
      conf_.endpoint_, conf_.accesskey_id_, conf_.accesskey_secret_,
      conf_.client_conf_);
  return Status::OK();
}
/// Downloads `remote_path` into `local_path` using a resumable download. The
/// download uses parts of `partition_size_` bytes and `concurrency_`
/// threads. The transfer is skipped when the local copy is already current.
///
/// Freshness is tracked by a sidecar file `<local_path>.etag`. It records the
/// remote ETag of the last successful download. If the local file exists and
/// the recorded ETag equals the object's current ETag, nothing is downloaded.
///
/// @return INVALID_ARGUMENT if a path is empty or the client is not open.
///         IO_ERROR if the download fails, the local file is unreadable
///         afterwards, or the ETag sidecar cannot be written.
gs::Status OSSRemoteStorageDownloader::Get(const std::string& remote_path,
                                           const std::string& local_path) {
  LOG(INFO) << "OSS Get remote file " << remote_path << " to local "
            << local_path;
  if (local_path.empty() || remote_path.empty()) {
    return gs::Status(
        gs::StatusCode::INVALID_ARGUMENT,
        "OSS Get invalid argument, local path or remote path is empty");
  }
  if (!client_) {
    return gs::Status(gs::StatusCode::INVALID_ARGUMENT,
                      "OSS Get invalid argument, client is not open");
  }
  const std::string etag_file = local_path + ".etag";

  // Fetch the remote ETag *before* downloading. The old code recorded an ETag
  // fetched after the download. If the object changed in between, old
  // content was paired with the new ETag, and the stale file was then
  // treated as up to date forever. Recording the pre-download ETag errs
  // toward an extra re-download instead.
  std::string oss_etag;
  const bool etag_fetched = get_metadata_etag(remote_path, oss_etag);

  std::error_code ec;
  std::string local_etag;
  if (etag_fetched && !oss_etag.empty() &&
      std::filesystem::exists(local_path, ec) &&
      gs::read_string_from_file(etag_file, local_etag) &&
      local_etag == oss_etag) {
    LOG(INFO) << "OSS Get local file " << local_path << " is up to date";
    return Status::OK();
  }

  AlibabaCloud::OSS::DownloadObjectRequest request(conf_.bucket_name_,
                                                   remote_path, local_path);
  request.setPartSize(conf_.partition_size_);
  // More threads download more parts in parallel.
  request.setThreadNum(conf_.concurrency_);
  auto outcome = client_->ResumableDownloadObject(request);
  if (!outcome.isSuccess()) {
    std::string error_string = oss_outcome_to_string(
        "OSS ResumableDownloadObject from remote " + remote_path +
            " to local " + local_path + " failed",
        outcome);
    LOG(ERROR) << error_string;
    return gs::Status(gs::StatusCode::IO_ERROR, error_string);
  }

  // The non-throwing file_size() both confirms the file is present and reads
  // its size. A filesystem error no longer escapes as an exception.
  const auto file_size = std::filesystem::file_size(local_path, ec);
  if (ec) {
    LOG(ERROR) << "OSS Get local file " << local_path
               << " failed: " << ec.message();
    return gs::Status(gs::StatusCode::IO_ERROR, "OSS Get local file failed");
  }
  LOG(INFO) << "OSS Get local file " << local_path
            << " success, size: " << file_size;

  // If the pre-download metadata request failed, try once more now that the
  // download itself has succeeded.
  const bool have_etag =
      etag_fetched || get_metadata_etag(remote_path, oss_etag);
  if (!(have_etag && gs::write_string_to_file(oss_etag, etag_file))) {
    LOG(ERROR) << "OSS Get write etag file " << etag_file << " failed";
    return gs::Status(gs::StatusCode::IO_ERROR,
                      "OSS Get write etag file failed");
  }
  return Status::OK();
}
/// Lists every object key under `remote_prefix` in the configured bucket,
/// following ListObjects pagination until the result is no longer truncated.
///
/// Keys are *appended* to `path_list`; existing entries are kept.
///
/// @return INVALID_ARGUMENT if the client is not open. IO_ERROR if a
///         ListObjects request fails or pagination cannot continue.
gs::Status OSSRemoteStorageDownloader::List(
    const std::string& remote_prefix, std::vector<std::string>& path_list) {
  if (!client_) {
    return gs::Status(gs::StatusCode::INVALID_ARGUMENT,
                      "OSS List invalid argument, client is not open");
  }
  std::string nextMarker;
  bool isTruncated = false;
  do {
    AlibabaCloud::OSS::ListObjectsRequest request(conf_.bucket_name_);
    request.setPrefix(remote_prefix);
    request.setMarker(nextMarker);
    auto outcome = client_->ListObjects(request);
    if (!outcome.isSuccess()) {
      std::string error_string = oss_outcome_to_string(
          "OSS ListObjects from remote " + remote_prefix + " failed", outcome);
      LOG(ERROR) << error_string;
      return gs::Status(gs::StatusCode::IO_ERROR, error_string);
    }
    const auto& result = outcome.result();
    const auto& summaries = result.ObjectSummarys();
    for (const auto& object : summaries) {
      LOG(INFO) << "OSS ListObject: " << object_summary_to_string(object);
      path_list.push_back(object.Key());
    }
    isTruncated = result.IsTruncated();
    nextMarker = result.NextMarker();
    // A truncated page with an empty NextMarker would re-request the first
    // page forever. Fall back to the last key seen, and fail if there is none.
    if (isTruncated && nextMarker.empty()) {
      if (summaries.empty()) {
        std::string error_string =
            "OSS ListObjects from remote " + remote_prefix +
            " returned a truncated page without a continuation marker";
        LOG(ERROR) << error_string;
        return gs::Status(gs::StatusCode::IO_ERROR, error_string);
      }
      nextMarker = summaries.back().Key();
    }
  } while (isTruncated);
  return Status::OK();
}
/// Always succeeds. Unlike the uploader, this does not reset `client_`, so
/// the client created by Open() stays alive until the downloader is
/// destroyed or reopened.
gs::Status OSSRemoteStorageDownloader::Close() {
  const gs::Status ok = Status::OK();
  return ok;
}
bool OSSRemoteStorageDownloader::get_metadata_etag(
const std::string& remote_path, std::string& etag) {
if (remote_path.empty()) {
LOG(ERROR)
<< "OSS get_metadata_etag invalid argument, remote path is empty";
return false;
}
auto outcome = client_->GetObjectMeta(conf_.bucket_name_, remote_path);
if (!outcome.isSuccess()) {
std::string error_string = oss_outcome_to_string(
"OSS GetObjectMeta from remote " + remote_path + " failed", outcome);
LOG(ERROR) << error_string;
return false;
} else {
auto metadata = outcome.result();
LOG(INFO) << "OSS GetObjectMeta " << remote_path
<< " success, etag: " << metadata.ETag();
etag = metadata.ETag();
}
return true;
}
} // namespace gs
#endif // BUILD_WITH_OSS