perfkitbenchmarker/providers/ibmcloud/ibmcloud_manager.py (724 lines of code) (raw):

# Copyright 2020 PerfKitBenchmarker Authors. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Module containing base rest api constructs to run on IBM Cloud.""" import io import json import logging import os import time from typing import Any, Dict import urllib.parse from perfkitbenchmarker import vm_util import requests IMAGE_UPLOAD_ATTEMPTS = 5 SPEC_PATHS = [ 'public_gateways', 'security_groups', 'floating_ips', 'network_acls', ] PLURALS = { 'bgpconf': 'bgpconf', 'edgerouter': 'edgerouter', 'policy': 'policies', } API_VERSION_DATE = '2022-03-29' GENERATION = 'generation=' HTTP_TIMEOUT_CONNECT = 10 HTTP_TIMEOUT = 120 def RoundRobin(l1, l2): list1 = [] for i, item in enumerate(l1): if l2 and i < len(l2): list1.append(item) list1.append(l2[i]) else: list1.append(item) break return list1 def Plural(x): return PLURALS.get(x, x + 's') class IbmCloudRequestError(Exception): pass class IbmCloud: """Base object that handles IBM Cloud REST api call requests.""" def __init__( self, endpoint=None, url=None, account=None, apikey=None, vm_creator=False, token=None, verbose=False, version='v1', silent=False, force=False, trace=False, ): # For Token self._endpoint = endpoint or os.environ.get('IBMCLOUD_AUTH_ENDPOINT') self._url = url or os.environ.get('IBMCLOUD_ENDPOINT') self._acct = account or os.environ.get('IBMCLOUD_ACCOUNT_ID') self._apikey = apikey or os.environ.get('IBMCLOUD_APIKEY') self._token = token or os.environ.get('IBMCLOUD_TOKEN') self._token_time = time.time() self.version = version self._verbose = verbose self._silent = silent self._force = force self._trace = trace self._vm_creator = vm_creator self._generation = ( 'version=' + API_VERSION_DATE + '&' + GENERATION + (os.environ.get('IBMCLOUD_GENERATION') or '2') ) if not self._url: raise IbmCloudRequestError( 'url has to specified either in the initialization of the ' 'client or via an environment variable ("IBMCLOUD_ENDPOINT")' ) if not self._acct: raise IbmCloudRequestError( 'acct has to specified either in the initialization of the ' 'client or via an environment variable ("IBMCLOUD_ACCOUNT_ID")' ) if not self._apikey: raise IbmCloudRequestError( 'apikey has to specified either in the initialization of the ' 'client or via an environment variable ("IBMCLOUD_APIKEY")' ) parsed_url = urllib.parse.urlparse(self._url) self._netloc = parsed_url.netloc or parsed_url.path.split('/', 1)[0] self._scheme = parsed_url.scheme or 'http' self._search_accts = [self._acct, None, 'system'] # new token if not set, force set or if running for more than 50 min if ( not self._token or self._force or self._token_time < (time.time() - 3000) ): self.SetToken() @vm_util.Retry(max_retries=3, timeout=vm_util.DEFAULT_TIMEOUT) def GetToken(self): """Get a user token.""" token = None req_data = { 'grant_type': 'urn:ibm:params:oauth:grant-type:apikey', 'response_type': 'cloud_iam', 'apikey': self._apikey, } count = 1 while token is None: if not self._silent: logging.info( 'Sending a POST request to get a token: %s', self._endpoint ) resp = requests.request( 'POST', self._endpoint, data=req_data, headers=None, timeout=(HTTP_TIMEOUT_CONNECT, HTTP_TIMEOUT), ) if resp is not None: if self._verbose: logging.info('response=%s', str(resp)) if resp.text: logging.info('response text=%s', resp.text) if resp.status_code >= 200 and resp.status_code <= 299: resp = json.loads(resp.text) if 'access_token' in resp: token = resp['access_token'] if self._verbose: logging.info('token: %s', token) break else: logging.error( 'POST: %s, response status_code: %s, resp: %s', self._endpoint, resp.status_code, str(resp), ) else: logging.info('Request POST: %s, no response returned', self._endpoint) count += 1 if count > 3: break time.sleep(5) return token def Token(self): """Returns the token.""" return self._token def SetToken(self): """Gets a new token.""" try: self._token = self.GetToken() self._token_time = time.time() except Exception as err: raise IbmCloudRequestError('Authorization failed: %s' % err) def Request( self, method, path, data=None, headers=None, timeout=None, session=None ): """Constructs base rest api calls to run against IBM Cloud regional api server. Args: method: one of GET, POST, PUT, PATCH or DELETE. path: rest api uri. data: body data. headers: header data if any. timeout: timeout for the request session: session object if available. Returns: request response. """ path = f'/{self.version}/{path}' if 'limit' in path: path += '&' + self._generation else: path += '?' + self._generation h = {'Authorization': 'Bearer ' + self._token} if self._trace: h['trace'] = 'on' if data is not None and not isinstance(data, io.IOBase): h['Content-Type'] = 'application/json' data = json.dumps(data) if headers: h.update(headers) if self._verbose: h['trace'] = 'on' curl = '>>> curl -X %s' % method for k, v in h.items(): curl += ' -H "%s: %s"' % (k, v) if data: curl += ' -d ' + '"' + str(data).replace('"', '\\"') + '"' curl += ' %s://%s%s' % (self._scheme, self._netloc, path) logging.info(curl) if data: logging.info('==== DATA ====') logging.info(data) res = None response_code = None response_text = None request_id = None try: data_timeout = timeout if timeout is not None else HTTP_TIMEOUT if session is None: res = requests.request( method, self._url + path, data=data, headers=h, timeout=(HTTP_TIMEOUT_CONNECT, data_timeout), ) # verify=False else: res = session.request( method, self._url + path, data=data, headers=h, timeout=(HTTP_TIMEOUT_CONNECT, data_timeout), ) if res is not None: request_id = ( res.headers['X-Request-Id'] if 'X-Request-Id' in res.headers else None ) if self._verbose: logging.info( 'Request %s:, %s, request id: %s ', method, path, request_id ) logging.info('response=%s', str(res)) if res.text: logging.info('response text=%s', res.text) response_code = res.status_code if res.status_code < 200 or res.status_code > 299: logging.error( 'Request %s:, %s, response status_code: %s, res: %s', method, path, res.status_code, str(res), ) try: if 'errors' in res.text: restext = json.loads(res.text) logging.info( ' *** %s -- %s\n\n', restext['errors'][0]['code'], restext['errors'][0]['message'], ) except Exception: # pylint: disable=broad-except pass response_text = str(res.text) else: logging.info('Request %s: %s, no response returned', method, path) except requests.exceptions.RequestException as e: logging.exception('RequestException: %s', e) if res and res.status_code != 204: try: res = json.loads(res.text) except: raise IbmCloudRequestError('Request %s failed: %s' % (path, str(res))) if self._verbose: logging.info('==== RESPONSE ====') logging.info(json.dumps(str(res), indent=2)) if self._vm_creator: return res, response_code, response_text, request_id else: return res def __getattr__(self, name): """Handles non standard action method names with underscores.""" if '_' not in name: raise AttributeError(name) action, path = name.split('_', 1) if path in SPEC_PATHS: paths = path else: paths = path.split('_') if action in ['create', 'show', 'delete']: if paths in SPEC_PATHS: paths = [paths] # must be a list else: paths = [Plural(p) for p in paths] else: paths = [Plural(p) for p in paths[:-1]] + [paths[-1]] if action == 'create': def Create(*args, **kwargs): path = '/'.join(RoundRobin(paths, args[:-1])) return self.Request('POST', path, args[-1], **kwargs) return Create if action in ['list', 'show']: def Show(*args, **kwargs): path = '/'.join(RoundRobin(paths, args)) return self.Request('GET', path, **kwargs) return Show if action == 'delete': def Delete(*args, **kwargs): path = '/'.join(RoundRobin(paths, args)) return self.Request('DELETE', path, **kwargs) return Delete raise AttributeError(name) class BaseManager: """Base class that handles IBM Cloud REST api call requests. Use the derived classes for each resource type. """ # This value should be overwritten by derived classes. _type = 'base' def __init__(self, genesis): self._g = genesis def GetUri(self, item) -> str: return '%ss' % self._type + '/' + item def GetUris(self, item): return '%s' % self._type + '/' + item def GetProfileUri(self): return '%s' % self._type + '/profiles' def Create(self, **kwargs): create_method = getattr(self._g, 'create_%s' % self._type) return create_method(kwargs) def List(self, **kwargs): list_method = getattr(self._g, 'list_%ss' % self._type) return list_method(**kwargs) or {} def Show(self, item): item_uri = self.GetUri(item) return self._g.Request('GET', item_uri) def Delete(self, item): item_uri = self.GetUri(item) return self._g.Request('DELETE', item_uri) def SetToken(self): self._g.set_token() def Token(self): return self._g.Token() class InstanceManager(BaseManager): """Handles requests on instances.""" _type = 'instance' def Create( self, name, imageid, profile, vpcid, zone=None, key=None, # pytype: disable=signature-mismatch # overriding-parameter-count-checks subnet=None, networks=None, resource_group=None, iops=None, capacity=None, user_data=None, session=None, **kwargs, ) -> Dict[str, Any]: """Construct and send a vm create request. Args: name: name of the vm. imageid: image id to use. profile: machine_type. vpcid: vpc id. zone: zone name. key: regional ssh key id. subnet: subnet id. networks: optional. additional subnet ids. resource_group: optional. iops: iops on the boot volume. capacity: boot volume size. user_data: user data for windows. session: session id if used. **kwargs: additonal args. Returns: request reponse. Raises: IbmCloudRequestError: if instance not found. """ req_data = { 'name': name, 'image': {'id': imageid}, 'vpc': {'id': vpcid}, 'profile': {'name': profile}, } if zone: req_data['zone'] = {'name': zone} if key: req_data['keys'] = [{'id': key}] if resource_group: req_data['resource_group'] = [{'id': resource_group}] if subnet: req_data['primary_network_interface'] = {'subnet': {'id': subnet}} if networks: req_data['network_interfaces'] = [] networkids = [str(item) for item in networks.split(',')] for subnet in networkids: req_data['network_interfaces'].append({'subnet': {'id': subnet}}) bootvol_attr = {} # attributes for the boot volume keycrn = kwargs.get('encryption_key', None) if capacity: bootvol_attr['capacity'] = capacity if iops: bootvol_attr['iops'] = iops if keycrn: bootvol_attr['encryption_key'] = {'crn': keycrn} if bootvol_attr: bootvol_attr['profile'] = {'name': 'general-purpose'} req_data['boot_volume_attachment'] = {'volume': bootvol_attr} data_volume = kwargs.get('external_data_volume', None) if data_volume: req_data['volume_attachments'] = [{'volume': {'id': data_volume}}] if user_data: req_data['user_data'] = user_data instance = self._g.create_instance(req_data, session=session) if not instance: raise IbmCloudRequestError('Failed to create instance') return instance def Start(self, instance) -> Dict[str, Any]: """Send a vm start request.""" inst_uri = self.GetUri(instance) + '/actions' return self._g.Request('POST', inst_uri, {'type': 'start'}) def Show(self, instance) -> Dict[str, Any]: """Send a vm get request.""" inst_uri = self.GetUri(instance) return self._g.Request('GET', inst_uri) def ShowPolling( self, instance, timeout_polling=HTTP_TIMEOUT_CONNECT, session=None ) -> Dict[str, Any]: """Send a vm get request.""" inst_uri = self.GetUri(instance) return self._g.Request( 'GET', inst_uri, timeout=timeout_polling, session=session ) def ShowInitialization(self, instance) -> Dict[str, Any]: """Send a vm get initialization request.""" inst_uri = self.GetUri(instance) + '/initialization' return self._g.Request('GET', inst_uri) def Delete(self, instance): """Send a vm delete request.""" inst_uri = self.GetUri(instance) return self._g.Request('DELETE', inst_uri) def Stop(self, instance, force=False) -> Dict[str, Any]: """Send a vm stop request.""" inst_uri = self.GetUri(instance) + '/actions' req_data = {'type': 'stop', 'force': force} return self._g.Request('POST', '%s' % inst_uri, req_data) def Reboot(self, instance, force=False) -> Dict[str, Any]: """Send a vm reboot request.""" inst_uri = self.GetUri(instance) + '/actions' req_data = {'type': 'reboot', 'force': force} return self._g.Request('POST', inst_uri, req_data) def ShowVnic(self, instance, vnicid) -> Dict[str, Any]: """Send a vm vnic get request.""" inst_uri = self.GetUri(instance) + '/network_interfaces/' + vnicid return self._g.Request('GET', inst_uri) def CreateVnic(self, instance, name, subnet) -> Dict[str, Any]: """Send a vm vnic create request.""" inst_uri = self.GetUri(instance) + '/network_interfaces' return self._g.Request( 'POST', inst_uri, {'name': name, 'subnet': {'id': subnet}} ) def ListVnics(self, instance) -> Dict[str, Any]: """Send a vm vnic list request.""" inst_uri = self.GetUri(instance) + '/network_interfaces' return self._g.Request('GET', inst_uri) def DeleteVnic(self, instance, vnicid): """Send a vm vnic delete request.""" inst_uri = self.GetUri(instance) + '/network_interfaces/' + vnicid return self._g.Request('DELETE', inst_uri) def CreateVolume(self, instance, name, volume, delete) -> Dict[str, Any]: """Send a volume create request on a vm.""" inst_uri = self.GetUri(instance) + '/volume_attachments' return self._g.Request( 'POST', '%s' % inst_uri, { 'delete_volume_on_instance_delete': delete, 'name': name, 'volume': {'id': volume}, }, ) def ListVolumes(self, instance) -> Dict[str, Any]: """Send a volume list request on a vm.""" inst_uri = self.GetUri(instance) + '/volume_attachments' return self._g.Request('GET', inst_uri) def ShowVolume(self, instance, volume_attachment) -> Dict[str, Any]: """Send a volume get request on a vm.""" inst_uri = ( self.GetUri(instance) + '/volume_attachments/' + volume_attachment ) return self._g.Request('GET', inst_uri) def DeleteVolume(self, instance, volume_attachment): """Send a volume delete request on a vm.""" inst_uri = ( self.GetUri(instance) + '/volume_attachments/' + volume_attachment ) return self._g.Request('DELETE', inst_uri) def ListProfiles(self) -> Dict[str, Any]: """Send a vm profiles list request.""" return self._g.Request('GET', '%s' % self.GetProfileUri()) def List(self, next_start=None, session=None) -> Dict[str, Any]: """Send a vm list request.""" inst_uri = '/' + self._g.version + '/instances?limit=100' if next_start: inst_uri += '&start=' + next_start return self._g.Request('GET', inst_uri, session=session) class VolumeManager(BaseManager): """Handles requests on volumes.""" _type = 'volume' def Create(self, zone, **kwargs) -> Dict[str, Any]: # pytype: disable=signature-mismatch # overriding-parameter-count-checks """Construct and send a vm create request. Args: zone: zone name. **kwargs: name - volume name. profile - volume profile. capacity - boot volume size. iops - iops on the volume. resource_group - optional. encryption_key - key to encrypt, optional. Returns: created volume. """ req_data = { 'zone': {'name': zone}, 'profile': {'name': kwargs.get('profile', 'general-purpose')}, } if kwargs.get('capacity', None): req_data['capacity'] = kwargs.get('capacity') if kwargs.get('iops', None): req_data['iops'] = kwargs.get('iops') if kwargs.get('name', None): req_data['name'] = kwargs.get('name') if kwargs.get('resource_group', None): req_data['resource_group'] = {'id': kwargs.get('resource_group')} if kwargs.get('encryption_key', None): req_data['encryption_key'] = {'crn': kwargs.get('encryption_key')} return self._g.create_volume(req_data) def Show(self, vol_id) -> Dict[str, Any]: """Send a volume get request.""" return self._g.Request('GET', 'volumes/%s' % vol_id) def Delete(self, vol_id): """Send a volume delete request.""" return self._g.Request('DELETE', 'volumes/%s' % vol_id) def List(self, next_start=None) -> Dict[str, Any]: """Send a volume list request.""" uri = '/' + self._g.version + '/volumes?limit=100' if next_start: uri += '&start=' + next_start return self._g.Request('GET', uri) def ListProfiles(self) -> Dict[str, Any]: """Send a volume profile list request.""" return self._g.Request('GET', 'volume/profiles') class VPCManager(BaseManager): """Handles requests on VPC.""" _type = 'vpc' def Create(self, name) -> Dict[str, Any]: # pytype: disable=signature-mismatch # overriding-parameter-count-checks """Construct and send a vm create request. Args: name: name of the vpc. Returns: returns created vpc. """ req_data = {'name': name} return self._g.create_vpc(req_data) def Show(self, vpc) -> Dict[str, Any]: """Send a vpc get request.""" return self._g.Request('GET', 'vpcs/%s' % vpc) def Delete(self, vpc): """Send a vpc delete request.""" return self._g.Request('DELETE', 'vpcs/%s' % vpc) def CreatePrefix(self, vpc, zone, cidr, name=None) -> Dict[str, Any]: """Send a vpc address prefix create request.""" req_data = {'zone': {'name': zone}, 'cidr': cidr} if name: req_data['name'] = name vpc_uri = self.GetUri(vpc) + '/address_prefixes' return self._g.Request('POST', '%s' % vpc_uri, req_data) def ListPrefix(self, vpc) -> Dict[str, Any]: """Send a vpc address prefix list request.""" return self._g.Request('GET', 'vpcs/%s/address_prefixes' % vpc) def DeletePrefix(self, vpc, prefix): """Send a vpc address prefix delete request.""" return self._g.Request( 'DELETE', 'vpcs/%s/address_prefixes/%s' % (vpc, prefix) ) def PatchPrefix( self, vpc, prefix, default=False, name=None ) -> Dict[str, Any]: """Send a vpc address prefix patch request.""" req_data = {'is_default': default} if name: req_data['name'] = name return self._g.Request( 'PATCH', 'vpcs/%s/address_prefixes/%s' % (vpc, prefix), req_data ) def CreateRoutingTable( self, vpc, name, routes=None, session=None ) -> Dict[str, Any]: """Send a vpc routing table create request.""" req_data = {'name': name} if routes: req_data['routes'] = routes vpc_uri = self.GetUri(vpc) + '/routing_tables' return self._g.Request('POST', '%s' % vpc_uri, req_data, session=session) def ShowRoutingTable(self, vpc, routing, session=None) -> Dict[str, Any]: """Send a vpc routing table get request.""" return self._g.Request( 'GET', 'vpcs/%s/routing_tables/%s' % (vpc, routing), session=session ) def PatchRoutingTable( self, vpc, routing, ingress, flag, session=None ) -> Dict[str, Any]: """Send a vpc routing table patch request.""" vpc_uri = self.GetUri(vpc) + '/routing_tables/' + routing return self._g.Request('PATCH', vpc_uri, {ingress: flag}, session=session) def DeleteRoutingTable(self, vpc, routing, session=None): """Send a vpc routing table delete request.""" vpc_uri = self.GetUri(vpc) + '/routing_tables/' + routing return self._g.Request('DELETE', '%s' % vpc_uri, session=session) def ListRoutingTable(self, vpc, session=None) -> Dict[str, Any]: """Send a vpc routing table list request.""" return self._g.Request( 'GET', 'vpcs/%s/routing_tables?limit=100' % vpc, session=session ) def CreateRoute( self, vpc, routing, name, zone, action, destination, nexthop=None, session=None, ) -> Dict[str, Any]: """Send a vpc route create request.""" req_data = { 'name': name, 'action': action, 'destination': destination, 'zone': {'name': zone}, } if nexthop: req_data['next_hop'] = {'address': nexthop} vpc_uri = self.GetUri(vpc) + '/routing_tables/' + routing + '/routes' return self._g.Request('POST', '%s' % vpc_uri, req_data, session=session) def DeleteRoute(self, vpc, routing, route, session=None): """Send a vpc route delete request.""" vpc_uri = ( self.GetUri(vpc) + '/routing_tables/' + routing + '/routes/' + route ) return self._g.Request('DELETE', '%s' % vpc_uri, session=session) def ListRoute(self, vpc, routing, session=None) -> Dict[str, Any]: """Send a vpc route list request.""" return self._g.Request( 'GET', 'vpcs/%s/routing_tables/%s/routes?limit=100' % (vpc, routing), session=session, ) class SGManager(BaseManager): """Handles requests on security groups.""" _type = 'security_groups' def Create(self, resource_group, vpcid, **kwargs) -> Dict[str, Any]: # pytype: disable=signature-mismatch # overriding-parameter-count-checks """Construct and send a security group create request. Args: resource_group: optional. vpcid: vpc id. **kwargs: name - name of the vm. Returns: returns created security groups. """ req_data = {'vpc': {'id': vpcid}} if resource_group: req_data['resource_group'] = {'id': resource_group} if kwargs.get('name', None): req_data['name'] = kwargs.get('name', None) return self._g.create_security_groups(req_data) def List(self) -> Dict[str, Any]: """Send a security group list request.""" return self._g.Request('GET', self._type) def Show(self, sg) -> Dict[str, Any]: """Send a security group get request.""" sg_id = self.GetUris(sg) return self._g.Request('GET', sg_id) def Delete(self, sg): """Send a security group delete request.""" sg_id = self.GetUris(sg) return self._g.Request('DELETE', sg_id) def ShowRule(self, sg, ruleid) -> Dict[str, Any]: """Send a security group rule get request.""" sg_uri = self.GetUris(sg) + '/rules/' + ruleid return self._g.Request('GET', sg_uri) def ListRules(self, sg) -> Dict[str, Any]: """Send a security group rule list request.""" sg_uri = self.GetUris(sg) + '/rules' return self._g.Request('GET', sg_uri) def CreateRule( self, sg, direction, ip_version, cidr_block, protocol, port, port_min=None, port_max=None, ) -> Dict[str, Any]: """Send a security group rule create request. Args: sg: security group. direction: in or outbound. ip_version: ipv4 or 6. cidr_block: cidr_block. protocol: tcp or udp. port: port. port_min: port min. port_max: port max. Returns: returns request response. """ sg_uri = self.GetUris(sg) + '/rules' req_data = { 'direction': direction, 'ip_version': ip_version, 'remote': {'cidr_block': cidr_block}, 'protocol': protocol, } if port: req_data['port_min'] = port req_data['port_max'] = port elif port_min and port_max: req_data['port_min'] = port_min req_data['port_max'] = port_max return self._g.Request('POST', sg_uri, req_data) def DeleteRule(self, sg, ruleid): """Send a security group rule delete request.""" sg_uri = self.GetUris(sg) + '/rules/' + ruleid return self._g.Request('DELETE', '%s' % sg_uri) class PGManager(BaseManager): """Handles requests on public gateways.""" _type = 'public_gateways' def Create(self, resource_group, vpcid, zone, **kwargs) -> Dict[str, Any]: # pytype: disable=signature-mismatch # overriding-parameter-count-checks """Construct and send a vm create request. Args: resource_group: optional. vpcid: vpc id. zone: zone name. **kwargs: name - name of the vm. floating_ip - optional, floating ip id. Returns: returns created public gateways. """ req_data = {'vpc': {'id': vpcid}, 'zone': {'name': zone}} if resource_group: req_data['resource_group'] = {'id': resource_group} if kwargs.get('name', None): req_data['name'] = kwargs.get('name', None) if kwargs.get('floating_ip', None): req_data['floating_ip'] = {'id': kwargs.get('floating_ip')} return self._g.create_public_gateways(req_data) def List(self) -> Dict[str, Any]: """Send a public gateway list request.""" return self._g.Request('GET', self._type) def Show(self, pg) -> Dict[str, Any]: """Send a public gateway get request.""" pg_id = self.GetUris(pg) return self._g.Request('GET', pg_id) def Delete(self, pg): """Send a public gateway delete request.""" pg_id = self.GetUris(pg) return self._g.Request('DELETE', pg_id) class RegionManager(BaseManager): """Handles requests on regions.""" _type = 'region' def Show(self, region) -> Dict[str, Any]: """Send a region get request.""" return self._g.Request('GET', 'regions/%s' % region) class KeyManager(BaseManager): """Handles requests on ssh keys.""" _type = 'keys' def Create(self, key, key_type, **kwargs) -> Dict[str, Any]: # pytype: disable=signature-mismatch # overriding-parameter-count-checks """Construct and send a ssh key create request. Args: key: public key string. key_type: rsa. **kwargs: name - name of the key: resource_group - optional. Returns: returns created key. """ req_data = {'public_key': key, 'type': key_type} if kwargs.get('name', None): req_data['name'] = kwargs.get('name', None) if kwargs.get('resource_group', None): req_data['resource_group'] = {'id': kwargs.get('resource_group', None)} return self._g.create_key(req_data) def List(self) -> Dict[str, Any]: """Send a key list request.""" return self._g.Request('GET', self._type) def Show(self, key) -> Dict[str, Any]: """Send a key get request.""" key_id = self.GetUris(key) return self._g.Request('GET', key_id) def Delete(self, key): """Send a key delete request.""" key_id = self.GetUris(key) return self._g.Request('DELETE', key_id) class NetworkAclManager(BaseManager): """Handles requests on network acls.""" _type = 'network_acls' def Create(self, name) -> Dict[str, Any]: # pytype: disable=signature-mismatch # overriding-parameter-count-checks """Construct and send a vm create request. Args: name: name of the vm. Returns: returns created network. """ req_data = {'name': name} return self._g.create_network_acls(req_data) def List(self) -> Dict[str, Any]: """Send a network_acl list request.""" return self._g.Request('GET', 'network_acls') def Show(self, network_acl) -> Dict[str, Any]: """Send a network_acl get request.""" network_acl_id = self.GetUris(network_acl) return self._g.Request('GET', network_acl_id) def Delete(self, network_acl): """Send a network_acl delete request.""" network_acl_id = self.GetUris(network_acl) return self._g.Request('DELETE', network_acl_id) class SubnetManager(BaseManager): """Handles requests on subnets.""" _type = 'subnet' def Create(self, subnet, vpcid, **kwargs) -> Dict[str, Any]: # pytype: disable=signature-mismatch # overriding-parameter-count-checks """Construct and send a vm create request. Args: subnet: subnet vpcid: vpc id. **kwargs: name - name of the subnet. zone - zone name. ip_version - ipv4 or 6. ipv4_cidr_block - cidr for subnet. Returns: created subnet. """ req_data = { 'ipv4_cidr_block': subnet, 'vpc': {'id': vpcid}, 'ip_version': 'ipv4', } if kwargs.get('name'): req_data['name'] = kwargs.get('name') if kwargs.get('zone'): req_data['zone'] = {'name': kwargs.get('zone')} return self._g.create_subnet(req_data) def Show(self, subnet) -> Dict[str, Any]: """Send a subnet get request.""" subnet_id = self.GetUri(subnet) return self._g.Request('GET', subnet_id) def Delete(self, subnet): """Send a subnet delete request.""" subnet_id = self.GetUri(subnet) return self._g.Request('DELETE', subnet_id) def Patch(self, **kwargs) -> Dict[str, Any]: """Send a subnet patch request.""" subnet_id = self.GetUri(kwargs.get('subnet')) return self._g.Request('PATCH', subnet_id, kwargs) def ShowPg(self, subnet) -> Dict[str, Any]: """Send a subnet public gateway get request.""" uri = self.GetUri(subnet) + '/public_gateway' return self._g.Request('GET', uri) def AttachPg(self, subnet, pgid) -> Dict[str, Any]: """Send a subnet public gateway attach request.""" uri = self.GetUri(subnet) + '/public_gateway' req_data = {'id': pgid} return self._g.Request('PUT', uri, req_data) def DeletePg(self, subnet): """Send a subnet public gateway delete request.""" uri = self.GetUri(subnet) + '/public_gateway' return self._g.Request('DELETE', uri) class ImageManager(BaseManager): """Handles requests on images.""" _type = 'image' def Create(self, href, osname, name=None) -> Dict[str, Any]: # pytype: disable=signature-mismatch # overriding-parameter-count-checks """Construct and send a vm create request. Args: href: href to the image. osname: os name. name: name of the image. Returns: created image. """ req_data = {'file': {'href': href}} if name is not None: req_data['name'] = name if osname is not None: req_data['operating_system'] = {'name': osname} return self._g.create_image(req_data) def Show(self, img) -> Dict[str, Any]: """Send a image get request.""" img_id = self.GetUri(img) return self._g.Request('GET', img_id) def Delete(self, img): """Send a image delete request.""" img_id = self.GetUri(img) return self._g.Request('DELETE', img_id) class FipManager(BaseManager): """Handles requests on fips.""" _type = 'floating_ips' def Create(self, resource_group, target, **kwargs) -> Dict[str, Any]: # pytype: disable=signature-mismatch # overriding-parameter-count-checks """Construct and send a vm create request. Args: resource_group: optional. target: id of the vm network interface. **kwargs: name - name of the fip. zone - zone name. Returns: returns floating IP addresses. """ req_data = {'target': {'id': target}} if resource_group: req_data['resource_group'] = {'id': resource_group} if kwargs.get('name', None): req_data['name'] = kwargs.get('name', None) if kwargs.get('zone', None): req_data['zone'] = {'name': kwargs.get('name', None)} return self._g.create_floating_ips(req_data) def List(self) -> Dict[str, Any]: """Send a fip list request.""" return self._g.Request('GET', self._type) def Show(self, fip) -> Dict[str, Any]: """Send a fip get request.""" fip_id = self.GetUris(fip) return self._g.Request('GET', fip_id) def Delete(self, fip): """Send a fip delete request.""" fip_id = self.GetUris(fip) return self._g.Request('DELETE', fip_id)