void MetaServiceImpl::alter_storage_vault()

in cloud/src/meta-service/meta_service_resource.cpp [903:1199]


void MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* controller,
                                          const AlterObjStoreInfoRequest* request,
                                          AlterObjStoreInfoResponse* response,
                                          ::google::protobuf::Closure* done) {
    std::string ak, sk, bucket, prefix, endpoint, external_endpoint, region, role_arn, external_id;
    bool use_path_style;
    EncryptionInfoPB encryption_info;
    AkSkPair cipher_ak_sk_pair;
    RPC_PREPROCESS(alter_storage_vault);
    switch (request->op()) {
    case AlterObjStoreInfoRequest::ADD_S3_VAULT:
    case AlterObjStoreInfoRequest::DROP_S3_VAULT: {
        auto tmp_desc = ObjectStorageDesc {ak,       sk,
                                           bucket,   prefix,
                                           endpoint, external_endpoint,
                                           region,   use_path_style,
                                           role_arn, external_id};
        if (0 != extract_object_storage_info(request, code, msg, tmp_desc, encryption_info,
                                             cipher_ak_sk_pair)) {
            return;
        }
    } break;
    case AlterObjStoreInfoRequest::ADD_BUILT_IN_VAULT: {
        // It should at least has one hdfs info or obj info inside storage vault
        if ((!request->has_vault())) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "storage vault is set " + proto_to_json(*request);
            return;
        }
        break;
    }
    case AlterObjStoreInfoRequest::ADD_HDFS_INFO:
    case AlterObjStoreInfoRequest::DROP_HDFS_INFO: {
        if (!request->has_vault() || !request->vault().has_name()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "hdfs info is not found " + proto_to_json(*request);
            return;
        }
    } break;
    case AlterObjStoreInfoRequest::SET_DEFAULT_VAULT: {
        if (!request->has_vault() || !request->vault().has_name()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "hdfs info is not found " + proto_to_json(*request);
            return;
        }
        break;
    }
    case AlterObjStoreInfoRequest::ALTER_S3_VAULT:
        break;
    case AlterObjStoreInfoRequest::ALTER_HDFS_VAULT:
        break;
    case AlterObjStoreInfoRequest::UNSET_DEFAULT_VAULT:
        break;
    case AlterObjStoreInfoRequest::UNKNOWN: {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "Unknown alter info " + proto_to_json(*request);
        return;
    } break;
    default:
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "Unknown alter obj store info, request info " + proto_to_json(*request);
        LOG_WARNING("Unknown alter obj store info, request info {}", request->DebugString());
        return;
    }

    // TODO(dx): check s3 info right

    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud unique id not set";
        return;
    }

    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
        return;
    }
    RPC_RATE_LIMIT(alter_obj_store_info)
    InstanceKeyInfo key_info {instance_id};
    std::string key;
    std::string val;
    instance_key(key_info, &key);

    std::unique_ptr<Transaction> txn;
    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }
    err = txn->get(key, &val);
    LOG(INFO) << "get instance_key=" << hex(key);

    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get instance, instance_id=" << instance_id << " err=" << err;
        msg = ss.str();
        return;
    }

    InstanceInfoPB instance;
    if (!instance.ParseFromString(val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "failed to parse InstanceInfoPB";
        return;
    }

    if (instance.status() == InstanceInfoPB::DELETED) {
        code = MetaServiceCode::CLUSTER_NOT_FOUND;
        msg = "instance status has been set delete, plz check it";
        return;
    }

    switch (request->op()) {
    case AlterObjStoreInfoRequest::ADD_S3_VAULT: {
        if (!instance.enable_storage_vault()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "Storage vault doesn't support storage vault";
            return;
        }
        auto& obj = request->has_obj() ? request->obj() : request->vault().obj_info();
        if (!obj.has_provider()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "s3 conf lease provider info";
            return;
        }
        if (instance.obj_info().size() >= 10) {
            code = MetaServiceCode::UNDEFINED_ERR;
            msg = "this instance history has greater than 10 objs, please new another instance";
            return;
        }
        // ATTN: prefix may be empty
        if (((ak.empty() || sk.empty()) && role_arn.empty()) || bucket.empty() ||
            endpoint.empty() || region.empty()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "s3 conf info err, please check it";
            return;
        }

        auto& objs = instance.obj_info();
        for (auto& it : objs) {
            if (bucket == it.bucket() && prefix == it.prefix() && endpoint == it.endpoint() &&
                region == it.region() && ak == it.ak() && sk == it.sk() &&
                obj.provider() == it.provider() && external_endpoint == it.external_endpoint()) {
                // err, anything not changed
                code = MetaServiceCode::INVALID_ARGUMENT;
                msg = "original obj infos has a same conf, please check it";
                return;
            }
        }
        // calc id
        auto tmp_tuple = ObjectStorageDesc {ak,       sk,
                                            bucket,   prefix,
                                            endpoint, external_endpoint,
                                            region,   use_path_style,
                                            role_arn, external_id};
        ObjectStoreInfoPB last_item = object_info_pb_factory(tmp_tuple, obj, instance,
                                                             encryption_info, cipher_ak_sk_pair);
        if (instance.storage_vault_names().end() !=
            std::find_if(instance.storage_vault_names().begin(),
                         instance.storage_vault_names().end(),
                         [&](const std::string& candidate_name) {
                             return candidate_name == request->vault().name();
                         })) {
            code = MetaServiceCode::ALREADY_EXISTED;
            msg = fmt::format("vault_name={} already created", request->vault().name());
            return;
        }
        StorageVaultPB vault;
        vault.set_id(last_item.id());
        vault.set_name(request->vault().name());
        *instance.mutable_resource_ids()->Add() = vault.id();
        *instance.mutable_storage_vault_names()->Add() = vault.name();
        vault.mutable_obj_info()->MergeFrom(last_item);
        auto vault_key = storage_vault_key({instance.instance_id(), last_item.id()});
        txn->put(vault_key, vault.SerializeAsString());
        if (request->has_set_as_default_storage_vault() &&
            request->set_as_default_storage_vault()) {
            response->set_default_storage_vault_replaced(instance.has_default_storage_vault_id());
            set_default_vault_log_helper(instance, vault.name(), vault.id());
            instance.set_default_storage_vault_id(vault.id());
            instance.set_default_storage_vault_name(vault.name());
        }
        response->set_storage_vault_id(vault.id());
        LOG_INFO("try to put storage vault_id={}, vault_name={}, vault_key={}", vault.id(),
                 vault.name(), hex(vault_key));
    } break;
    case AlterObjStoreInfoRequest::ADD_HDFS_INFO: {
        if (auto ret = add_vault_into_instance(
                    instance, txn.get(), const_cast<StorageVaultPB&>(request->vault()), code, msg);
            ret != 0) {
            return;
        }
        if (request->has_set_as_default_storage_vault() &&
            request->set_as_default_storage_vault()) {
            response->set_default_storage_vault_replaced(instance.has_default_storage_vault_id());
            set_default_vault_log_helper(instance, *instance.storage_vault_names().rbegin(),
                                         *instance.resource_ids().rbegin());
            instance.set_default_storage_vault_id(*instance.resource_ids().rbegin());
            instance.set_default_storage_vault_name(*instance.storage_vault_names().rbegin());
        }
        response->set_storage_vault_id(request->vault().id());
        break;
    }
    case AlterObjStoreInfoRequest::ADD_BUILT_IN_VAULT: {
        // If the resource ids is empty then it would be the first vault
        if (!instance.resource_ids().empty()) {
            std::stringstream ss;
            code = MetaServiceCode::INVALID_ARGUMENT;
            ss << "Default vault can not be modified";
            msg = ss.str();
            return;
        }
        if (auto ret = add_vault_into_instance(
                    instance, txn.get(), const_cast<StorageVaultPB&>(request->vault()), code, msg);
            ret != 0) {
            return;
        }
        return;
    }
    case AlterObjStoreInfoRequest::DROP_HDFS_INFO: {
        if (auto ret = remove_hdfs_storage_vault(instance, txn.get(), request->vault(), code, msg);
            ret != 0) {
            return;
        }
        break;
    }
    case AlterObjStoreInfoRequest::SET_DEFAULT_VAULT: {
        const auto& name = request->vault().name();
        auto name_itr = std::find_if(instance.storage_vault_names().begin(),
                                     instance.storage_vault_names().end(),
                                     [&](const auto& vault_name) { return name == vault_name; });
        if (name_itr == instance.storage_vault_names().end()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            ss << "invalid storage vault name, name =" << name;
            msg = ss.str();
            return;
        }
        auto pos = name_itr - instance.storage_vault_names().begin();
        std::string vault_id = instance.resource_ids().begin()[pos];
        response->set_default_storage_vault_replaced(instance.has_default_storage_vault_id());
        set_default_vault_log_helper(instance, name, vault_id);
        instance.set_default_storage_vault_id(vault_id);
        instance.set_default_storage_vault_name(name);
        response->set_storage_vault_id(vault_id);
        break;
    }
    case AlterObjStoreInfoRequest::UNSET_DEFAULT_VAULT: {
        LOG_INFO("unset instance's default vault, instance id {}, previous default vault {}, id {}",
                 instance.instance_id(), instance.default_storage_vault_name(),
                 instance.default_storage_vault_id());
        instance.clear_default_storage_vault_id();
        instance.clear_default_storage_vault_name();
        break;
    }
    case AlterObjStoreInfoRequest::ALTER_S3_VAULT: {
        alter_s3_storage_vault(instance, txn, request->vault(), code, msg, response);
        break;
    }
    case AlterObjStoreInfoRequest::ALTER_HDFS_VAULT: {
        alter_hdfs_storage_vault(instance, txn, request->vault(), code, msg, response);
        break;
    }
    case AlterObjStoreInfoRequest::DROP_S3_VAULT:
        [[fallthrough]];
    default: {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "invalid request op, op=" << request->op();
        msg = ss.str();
        return;
    }
    }

    LOG(INFO) << "instance " << instance_id << " has " << instance.obj_info().size()
              << " s3 history info, and instance = " << proto_to_json(instance);

    val = instance.SerializeAsString();
    if (val.empty()) {
        msg = "failed to serialize";
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        return;
    }

    txn->put(key, val);
    LOG(INFO) << "put instance_id=" << instance_id << " instance_key=" << hex(key);
    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        msg = fmt::format("failed to commit kv txn, err={}", err);
        LOG(WARNING) << msg;
    }
}