in cloud/src/meta-service/meta_service_resource.cpp [903:1199]
void MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* controller,
const AlterObjStoreInfoRequest* request,
AlterObjStoreInfoResponse* response,
::google::protobuf::Closure* done) {
std::string ak, sk, bucket, prefix, endpoint, external_endpoint, region, role_arn, external_id;
bool use_path_style;
EncryptionInfoPB encryption_info;
AkSkPair cipher_ak_sk_pair;
RPC_PREPROCESS(alter_storage_vault);
switch (request->op()) {
case AlterObjStoreInfoRequest::ADD_S3_VAULT:
case AlterObjStoreInfoRequest::DROP_S3_VAULT: {
auto tmp_desc = ObjectStorageDesc {ak, sk,
bucket, prefix,
endpoint, external_endpoint,
region, use_path_style,
role_arn, external_id};
if (0 != extract_object_storage_info(request, code, msg, tmp_desc, encryption_info,
cipher_ak_sk_pair)) {
return;
}
} break;
case AlterObjStoreInfoRequest::ADD_BUILT_IN_VAULT: {
// It should at least has one hdfs info or obj info inside storage vault
if ((!request->has_vault())) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "storage vault is set " + proto_to_json(*request);
return;
}
break;
}
case AlterObjStoreInfoRequest::ADD_HDFS_INFO:
case AlterObjStoreInfoRequest::DROP_HDFS_INFO: {
if (!request->has_vault() || !request->vault().has_name()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "hdfs info is not found " + proto_to_json(*request);
return;
}
} break;
case AlterObjStoreInfoRequest::SET_DEFAULT_VAULT: {
if (!request->has_vault() || !request->vault().has_name()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "hdfs info is not found " + proto_to_json(*request);
return;
}
break;
}
case AlterObjStoreInfoRequest::ALTER_S3_VAULT:
break;
case AlterObjStoreInfoRequest::ALTER_HDFS_VAULT:
break;
case AlterObjStoreInfoRequest::UNSET_DEFAULT_VAULT:
break;
case AlterObjStoreInfoRequest::UNKNOWN: {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "Unknown alter info " + proto_to_json(*request);
return;
} break;
default:
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "Unknown alter obj store info, request info " + proto_to_json(*request);
LOG_WARNING("Unknown alter obj store info, request info {}", request->DebugString());
return;
}
// TODO(dx): check s3 info right
std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
if (cloud_unique_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "cloud unique id not set";
return;
}
instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
return;
}
RPC_RATE_LIMIT(alter_obj_store_info)
InstanceKeyInfo key_info {instance_id};
std::string key;
std::string val;
instance_key(key_info, &key);
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
msg = "failed to create txn";
LOG(WARNING) << msg << " err=" << err;
return;
}
err = txn->get(key, &val);
LOG(INFO) << "get instance_key=" << hex(key);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get instance, instance_id=" << instance_id << " err=" << err;
msg = ss.str();
return;
}
InstanceInfoPB instance;
if (!instance.ParseFromString(val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "failed to parse InstanceInfoPB";
return;
}
if (instance.status() == InstanceInfoPB::DELETED) {
code = MetaServiceCode::CLUSTER_NOT_FOUND;
msg = "instance status has been set delete, plz check it";
return;
}
switch (request->op()) {
case AlterObjStoreInfoRequest::ADD_S3_VAULT: {
if (!instance.enable_storage_vault()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "Storage vault doesn't support storage vault";
return;
}
auto& obj = request->has_obj() ? request->obj() : request->vault().obj_info();
if (!obj.has_provider()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "s3 conf lease provider info";
return;
}
if (instance.obj_info().size() >= 10) {
code = MetaServiceCode::UNDEFINED_ERR;
msg = "this instance history has greater than 10 objs, please new another instance";
return;
}
// ATTN: prefix may be empty
if (((ak.empty() || sk.empty()) && role_arn.empty()) || bucket.empty() ||
endpoint.empty() || region.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "s3 conf info err, please check it";
return;
}
auto& objs = instance.obj_info();
for (auto& it : objs) {
if (bucket == it.bucket() && prefix == it.prefix() && endpoint == it.endpoint() &&
region == it.region() && ak == it.ak() && sk == it.sk() &&
obj.provider() == it.provider() && external_endpoint == it.external_endpoint()) {
// err, anything not changed
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "original obj infos has a same conf, please check it";
return;
}
}
// calc id
auto tmp_tuple = ObjectStorageDesc {ak, sk,
bucket, prefix,
endpoint, external_endpoint,
region, use_path_style,
role_arn, external_id};
ObjectStoreInfoPB last_item = object_info_pb_factory(tmp_tuple, obj, instance,
encryption_info, cipher_ak_sk_pair);
if (instance.storage_vault_names().end() !=
std::find_if(instance.storage_vault_names().begin(),
instance.storage_vault_names().end(),
[&](const std::string& candidate_name) {
return candidate_name == request->vault().name();
})) {
code = MetaServiceCode::ALREADY_EXISTED;
msg = fmt::format("vault_name={} already created", request->vault().name());
return;
}
StorageVaultPB vault;
vault.set_id(last_item.id());
vault.set_name(request->vault().name());
*instance.mutable_resource_ids()->Add() = vault.id();
*instance.mutable_storage_vault_names()->Add() = vault.name();
vault.mutable_obj_info()->MergeFrom(last_item);
auto vault_key = storage_vault_key({instance.instance_id(), last_item.id()});
txn->put(vault_key, vault.SerializeAsString());
if (request->has_set_as_default_storage_vault() &&
request->set_as_default_storage_vault()) {
response->set_default_storage_vault_replaced(instance.has_default_storage_vault_id());
set_default_vault_log_helper(instance, vault.name(), vault.id());
instance.set_default_storage_vault_id(vault.id());
instance.set_default_storage_vault_name(vault.name());
}
response->set_storage_vault_id(vault.id());
LOG_INFO("try to put storage vault_id={}, vault_name={}, vault_key={}", vault.id(),
vault.name(), hex(vault_key));
} break;
case AlterObjStoreInfoRequest::ADD_HDFS_INFO: {
if (auto ret = add_vault_into_instance(
instance, txn.get(), const_cast<StorageVaultPB&>(request->vault()), code, msg);
ret != 0) {
return;
}
if (request->has_set_as_default_storage_vault() &&
request->set_as_default_storage_vault()) {
response->set_default_storage_vault_replaced(instance.has_default_storage_vault_id());
set_default_vault_log_helper(instance, *instance.storage_vault_names().rbegin(),
*instance.resource_ids().rbegin());
instance.set_default_storage_vault_id(*instance.resource_ids().rbegin());
instance.set_default_storage_vault_name(*instance.storage_vault_names().rbegin());
}
response->set_storage_vault_id(request->vault().id());
break;
}
case AlterObjStoreInfoRequest::ADD_BUILT_IN_VAULT: {
// If the resource ids is empty then it would be the first vault
if (!instance.resource_ids().empty()) {
std::stringstream ss;
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "Default vault can not be modified";
msg = ss.str();
return;
}
if (auto ret = add_vault_into_instance(
instance, txn.get(), const_cast<StorageVaultPB&>(request->vault()), code, msg);
ret != 0) {
return;
}
return;
}
case AlterObjStoreInfoRequest::DROP_HDFS_INFO: {
if (auto ret = remove_hdfs_storage_vault(instance, txn.get(), request->vault(), code, msg);
ret != 0) {
return;
}
break;
}
case AlterObjStoreInfoRequest::SET_DEFAULT_VAULT: {
const auto& name = request->vault().name();
auto name_itr = std::find_if(instance.storage_vault_names().begin(),
instance.storage_vault_names().end(),
[&](const auto& vault_name) { return name == vault_name; });
if (name_itr == instance.storage_vault_names().end()) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "invalid storage vault name, name =" << name;
msg = ss.str();
return;
}
auto pos = name_itr - instance.storage_vault_names().begin();
std::string vault_id = instance.resource_ids().begin()[pos];
response->set_default_storage_vault_replaced(instance.has_default_storage_vault_id());
set_default_vault_log_helper(instance, name, vault_id);
instance.set_default_storage_vault_id(vault_id);
instance.set_default_storage_vault_name(name);
response->set_storage_vault_id(vault_id);
break;
}
case AlterObjStoreInfoRequest::UNSET_DEFAULT_VAULT: {
LOG_INFO("unset instance's default vault, instance id {}, previous default vault {}, id {}",
instance.instance_id(), instance.default_storage_vault_name(),
instance.default_storage_vault_id());
instance.clear_default_storage_vault_id();
instance.clear_default_storage_vault_name();
break;
}
case AlterObjStoreInfoRequest::ALTER_S3_VAULT: {
alter_s3_storage_vault(instance, txn, request->vault(), code, msg, response);
break;
}
case AlterObjStoreInfoRequest::ALTER_HDFS_VAULT: {
alter_hdfs_storage_vault(instance, txn, request->vault(), code, msg, response);
break;
}
case AlterObjStoreInfoRequest::DROP_S3_VAULT:
[[fallthrough]];
default: {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "invalid request op, op=" << request->op();
msg = ss.str();
return;
}
}
LOG(INFO) << "instance " << instance_id << " has " << instance.obj_info().size()
<< " s3 history info, and instance = " << proto_to_json(instance);
val = instance.SerializeAsString();
if (val.empty()) {
msg = "failed to serialize";
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
return;
}
txn->put(key, val);
LOG(INFO) << "put instance_id=" << instance_id << " instance_key=" << hex(key);
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit kv txn, err={}", err);
LOG(WARNING) << msg;
}
}