perfkitbenchmarker/providers/ibmcloud/ibm_api.py (492 lines of code) (raw):
# Copyright 2020 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for working with IBM Cloud resources."""
import json
import logging
import os
import threading
import time
from absl import flags
from perfkitbenchmarker import errors
from perfkitbenchmarker.providers.ibmcloud import ibmcloud_manager
from perfkitbenchmarker.providers.ibmcloud import util
FLAGS = flags.FLAGS
ONE_HOUR = 3600
class Enum(tuple):
__getattr__ = tuple.index
class States(Enum):
AVAILABLE = 'available'
RUNNING = 'running'
STOPPED = 'stopped'
ATTACHED = 'attached'
class IbmAPICommand:
"""A IBM Cloud rest api command.
Attributes:
args: list of strings. Positional args to pass, typically specifying an
operation to perform (e.g. ['image', 'list'] to list available images).
flags: a dict mapping flag name string to flag value. Flags to pass to ibm
cloud rest api. If a provided value is True, the flag is passed without a
value. If a provided value is a list, the flag is passed multiple times,
once with each value in the list.
ibmcloud_auth_token: Auth token for cmd.
ibmcloud_auth_token_time: Time when auth token was acquired.
user_data: user data info.
vpc_id: vpc_id - passed in flags.
sg_id: sg_id - passed in flags.
zone: zone - passed in flags.
cidr: cidr - passed in flags.
name: name - passed in flags.
prefix: prefix - passed in flags.
items: items - passed in flags
capacity: capacity - passed in flags.
iops: iops - passed in flags.
profile: profile - passed in flags.
encryption_key: encryption_key - passed in flags.
resource_group: resource_group - passed in flags.
volume: volume - passed in flags.
pubkey: pubkey - passed in flags.
imageid: imageid - passed in flags.
target: target - passed in flags.
vnic_id: vnic_id - passed in flags.
fip_id: fip_id - passed in flags.
instance_id: instance_id - passed in flags.
subnet: subnet - passed in flags.
image_id: image_id - passed in flags.
image_name: image_name - passed in flags.
resource_id: resource_id - passed in flags.
data_encryption_key: data encryption key for account.
boot_encryption_key: boot encryption key for account.
"""
gen: ibmcloud_manager.IbmCloud = None
gen_instmgr: ibmcloud_manager.InstanceManager
gen_imgmgr: ibmcloud_manager.ImageManager
gen_keymgr: ibmcloud_manager.KeyManager
gen_subnetmgr: ibmcloud_manager.SubnetManager
gen_fipmgr: ibmcloud_manager.FipManager
gen_volumemgr: ibmcloud_manager.VolumeManager
gen_vpcmgr: ibmcloud_manager.VPCManager
gen_sgmgr: ibmcloud_manager.SGManager
ibmcloud_account_id = None
ibmcloud_apikey = None
ibmcloud_auth_token = None
ibmcloud_auth_token_time = 0
_lock = threading.Lock()
def __init__(self, *args, **kwargs):
"""Initializes a IbmAPICommand with the provided args and flags.
Args:
*args: sequence of strings. Positional args to pass to rest api calls,
typically specifying an operation to perform (e.g. ['image', 'list'] to
list available images).
**kwargs: resource - A IBM Cloud resource of type BaseResource.
"""
self._CheckEnvironment()
if (
self.gen is None
or time.time() - self.ibmcloud_auth_token_time >= ONE_HOUR
):
IbmAPICommand.gen = ibmcloud_manager.IbmCloud(
account=self.ibmcloud_account_id,
apikey=self.ibmcloud_apikey,
verbose=False,
version='v1',
silent=True,
force=False,
)
IbmAPICommand.gen_instmgr = ibmcloud_manager.InstanceManager(
IbmAPICommand.gen
)
IbmAPICommand.gen_imgmgr = ibmcloud_manager.ImageManager(
IbmAPICommand.gen
)
IbmAPICommand.gen_fipmgr = ibmcloud_manager.FipManager(IbmAPICommand.gen)
IbmAPICommand.gen_keymgr = ibmcloud_manager.KeyManager(IbmAPICommand.gen)
IbmAPICommand.gen_subnetmgr = ibmcloud_manager.SubnetManager(
IbmAPICommand.gen
)
IbmAPICommand.gen_volumemgr = ibmcloud_manager.VolumeManager(
IbmAPICommand.gen
)
IbmAPICommand.gen_vpcmgr = ibmcloud_manager.VPCManager(IbmAPICommand.gen)
IbmAPICommand.gen_sgmgr = ibmcloud_manager.SGManager(IbmAPICommand.gen)
self.ibmcloud_auth_token = self.GetToken()
self.ibmcloud_auth_token_time = time.time()
self.args = args
self.flags = kwargs
self.user_data = None
# self.sgid = None
@property
def vpc_id(self):
return self.flags['vpcid']
@property
def sg_id(self):
return self.flags['sgid']
@property
def zone(self):
return self.flags['zone']
@property
def cidr(self):
return self.flags['cidr']
@property
def name(self):
return self.flags['name']
@property
def prefix(self):
return self.flags['prefix']
@property
def items(self):
return self.flags['items']
@property
def capacity(self):
return self.flags['capacity']
@property
def iops(self):
return self.flags['iops']
@property
def profile(self):
return self.flags['profile']
@property
def encryption_key(self):
return self.flags['encryption_key']
@property
def resource_group(self):
return self.flags['resource_group']
@property
def volume(self):
return self.flags['volume']
@property
def pubkey(self):
return self.flags['pubkey']
@property
def image_id(self):
return self.flags['imageid']
@property
def target(self):
return self.flags['target']
@property
def vnic_id(self):
return self.flags['vnicid']
@property
def fip_id(self):
return self.flags['fipid']
@property
def instance_id(self):
return self.flags['instanceid']
@property
def subnet(self):
return self.flags['subnet']
@property
def image_name(self):
return self.flags['image_name']
@property
def resource_id(self):
return self.flags['id']
def _CheckEnvironment(self):
"""Get the information needed to authenticate on IBM Cloud.
Check for the config if env variables for apikey is not set.
"""
if not os.environ.get('IBMCLOUD_ENDPOINT'):
raise errors.Config.InvalidValue(
'PerfKit Benchmarker on IBM Cloud requires that the '
'environment variable IBMCLOUD_ENDPOINT is set.'
)
account = util.Account(
os.environ.get('IBMCLOUD_ACCOUNT_ID'),
os.environ.get('IBMCLOUD_APIKEY'),
None,
)
if not account.name or not account.apikey:
accounts = os.environ.get('IBMCLOUD_ACCOUNTS') # read from config file
if accounts and accounts:
config = util.ReadConfig(accounts)
benchmark = config[FLAGS.benchmarks[0]][0]
# first account
account = util.Account(benchmark[0], benchmark[1], benchmark[2])
logging.info('account_id: %s', account.name)
if FLAGS.ibmcloud_datavol_encryption_key: # if this flag is set
self.data_encryption_key = account.enckey
logging.info('KP key to use, data: %s', self.data_encryption_key)
if FLAGS.ibmcloud_bootvol_encryption_key: # use same key as data
self.boot_encryption_key = account.enckey
logging.info('KP key to use, boot: %s', self.boot_encryption_key)
if not account.name or not account.apikey:
raise errors.Config.InvalidValue(
'PerfKit Benchmarker on IBM Cloud requires that the '
'environment variables IBMCLOUD_ACCOUNT_ID and IBMCLOUD_APIKEY '
'are correctly set.'
)
def GetToken(self):
"""Returns user token."""
return self.gen.GetToken()
def CreateSgRules(self):
"""Creates default security group rules needed for vms to communicate."""
self.gen_sgmgr.CreateRule(
self.sg_id, 'inbound', 'ipv4', '0.0.0.0/0', 'icmp', None
)
self.gen_sgmgr.CreateRule(
self.sg_id, 'inbound', 'ipv4', '0.0.0.0/0', 'tcp', 22
)
self.gen_sgmgr.CreateRule(
self.sg_id,
'inbound',
'ipv4',
'0.0.0.0/0',
'tcp',
None,
port_min=1024,
port_max=50000,
)
self.gen_sgmgr.CreateRule(
self.sg_id,
'inbound',
'ipv4',
'0.0.0.0/0',
'udp',
None,
port_min=1024,
port_max=50000,
)
self.gen_sgmgr.CreateRule(
self.sg_id, 'inbound', 'ipv4', '0.0.0.0/0', 'tcp', 443
)
def GetSecurityGroupId(self):
"""Returns default security group id."""
logging.info('Looking up existing default sg')
prefix = self.prefix
resp = self.gen_sgmgr.List()
if resp and 'security_groups' in resp:
for item in resp['security_groups']:
# find the one with matching vpc name
if item['vpc']['name'].startswith(prefix):
return item['id']
else:
logging.error('Failed to retrieve security group id: %s', resp)
return None
def CreatePrefix(self):
"""Creates address prefix on vpc needed to attach subnets.
Flags:
vpcid: ibm cloud vpc id.
zone: name of the zone within ibm network.
cidr: ip range that this prefix will cover.
name: name of this prefix.
Returns:
The json representation of the created address prefix.
"""
return self.gen_vpcmgr.CreatePrefix(
self.vpc_id, self.zone, self.cidr, name=self.name
)
def CreateSubnet(self):
"""Creates a subnet on the vpc.
Flags:
vpcid: ibm cloud vpc id.
zone: name of the zone within ibm network.
cidr: ip range for the subnet.
name: name of this subnet.
Returns:
The json representation of the created subnet.
"""
kwargs = {}
kwargs['name'] = self.name
kwargs['zone'] = self.zone
return self.gen_subnetmgr.Create(self.cidr, self.vpc_id, **kwargs)
def DeleteResource(self):
"""Deletes a resource based on items set in the flags.
Flags:
items: type of resource to delete.
id: matching id to delete.
"""
data_mgr = self.gen_subnetmgr
if self.items == 'vpcs':
data_mgr = self.gen_vpcmgr
elif self.items == 'keys':
data_mgr = self.gen_keymgr
resp = data_mgr.List()
if resp and self.items in resp:
for item in resp[self.items]:
item_id = item.get('id')
if item_id and item_id == self.resource_id:
try:
data_mgr.Delete(item_id)
logging.info('Deleted %s, id: %s', self.items, item_id)
except Exception: # pylint: disable=broad-except
pass
else:
logging.info('No items found to delete: %s', self.items)
def GetResource(self):
"""Returns id of the found resource matching the resource type and prefix.
Flags:
items: type of resource.
zone: name of the zone within ibm network, for subnets.
preifx: a label matching the resource name.
Returns:
The id of the first matching resource.
"""
logging.info(
'Looking up existing %s matching prefix: %s, zone: %s',
self.items,
self.prefix,
self.zone,
)
data_mgr = self.gen_subnetmgr
if self.items == 'vpcs':
data_mgr = self.gen_vpcmgr
elif self.items == 'keys':
data_mgr = self.gen_keymgr
# instead of returning a list, we just return first matching item
resourceid = None
resp = data_mgr.List()
if resp and self.items in resp:
for item in resp[self.items]:
itemid = item.get('id')
name = item.get('name')
if itemid and name and name.startswith(self.prefix):
if self.items == 'subnets':
if self.zone == item.get('zone')['name']:
resourceid = itemid # find the id that matches the zone name
break
else: # for vpc and key
resourceid = itemid # # for vpcs, just get the first matching
break
logging.info('Resource found, id: %s', resourceid)
else:
logging.error('Failed to retrieve %s, no data returned', self.items)
return resourceid
def ListSubnetsExtra(self):
"""Returns a list of extra subnets, this is not used for regular runs.
Flags:
zone: name of the zone within ibm network, for subnets.
preifx: a label matching the resource name.
Returns:
List of ids of the additional subnets matching the prefix and zone.
"""
items = 'subnets'
logging.info('Looking up extra subnets for zone: %s', self.zone)
data_mgr = self.gen_subnetmgr
subnets = {}
resp = data_mgr.List()
if resp and items in resp:
for item in resp[items]:
itemid = item.get('id')
name = item.get('name')
ipv4_cidr_block = item.get('ipv4_cidr_block')
# extra subnets start with 'sxs'
if itemid and name and name.startswith(self.prefix):
if self.zone == item.get('zone')['name']:
logging.info('Found extra subnet, name: %s, id: %s', name, itemid)
names = name.split(util.DELIMITER)
if names[0] not in subnets:
subnets[names[0]] = {}
subnets[names[0]]['id'] = itemid
# needed to create route later
subnets[names[0]]['ipv4_cidr_block'] = ipv4_cidr_block
else:
logging.error('Failed to retrieve extra subnets, no data returned')
return subnets
def CreateVolume(self):
"""Creates an external disk volume.
Flags:
name: name of the new volume.
capacity: size of the volume in GB.
iops: desired iops on the volume.
profile: profile name of the volume, usually custom for large volumes
encryption_key: optional, encrytion key
resource_group: optional, resource group id to assign the volume to.
Returns:
json representation of the created volume.
"""
kwargs = {}
kwargs['name'] = self.name
kwargs['capacity'] = self.capacity
kwargs['iops'] = self.iops
kwargs['profile'] = self.profile
if 'encryption_key' in self.flags:
kwargs['encryption_key'] = self.encryption_key
if 'resource_group' in self.flags:
kwargs['resource_group'] = self.resource_group
return json.dumps(self.gen_volumemgr.Create(self.zone, **kwargs))
def DeleteVolume(self):
"""Deletes volume."""
return self.gen_volumemgr.Delete(self.volume)
def ShowVolume(self):
"""Shows volume in json."""
return json.dumps(self.gen_volumemgr.Show(self.volume), indent=2)
def CreateVpc(self):
"""Creates a vpc.
Flags:
name: name of the vpc to create.
Returns:
The id of the created vpc.
"""
vpcid = None
resp = self.gen_vpcmgr.Create(self.name)
if resp:
vpcid = resp.get('id')
sgid = resp.get('default_security_group')['id']
logging.info('Created vpc: %s, sgid: %s', vpcid, sgid)
self.flags['sgid'] = sgid
self.CreateSgRules()
logging.info('Created sg rules for icmp, tcp 22/443, tcp/udp 1024-50000')
return vpcid
def CreateKey(self):
"""Creates a ssh key in ibm cloud.
Flags:
name: name of the key to create.
pubkey: public key to use.
Returns:
The id of the created ssh key.
"""
kwargs = {}
kwargs['name'] = self.flags['name']
resp = self.gen_keymgr.Create(self.pubkey, 'rsa', **kwargs)
return resp.get('id') if resp else None
def CreateInstance(self):
"""Creates a vm.
Flags:
name: name of the vm.
imageid: id of the image to use.
profile: machine type for the vm.
vpcid: id of the vpc.
user_data: this is set if windows
optional: optional args added to kwargs.
Returns:
The json representation of the created vm.
"""
kwargs = {}
for arg in [
'key',
'zone',
'subnet',
'networks',
'encryption_key',
'iops',
'capacity',
'resource_group',
]:
if arg in self.flags:
kwargs[arg] = self.flags[arg]
return json.dumps(
self.gen_instmgr.Create(
self.name,
self.image_id,
self.profile,
self.vpc_id,
user_data=self.user_data,
**kwargs
)
)
def InstanceGetPrimaryVnic(self):
"""Finds the primary network vnic on vm.
Flags:
instanceid: id of the vm.
Returns:
The found vnic id.
"""
resp = self.gen_instmgr.Show(self.instance_id)
if (
'primary_network_interface' in resp
and 'href' in resp['primary_network_interface']
):
items = resp['primary_network_interface']['href'].split('/')
logging.info('vnic found: %s', items[len(items) - 1])
return items[len(items) - 1]
logging.error('no href in instance: %s', self.instance_id)
return None
def InstanceFipCreate(self):
"""Creates a FIP on vm.
Flags:
name: name of the fip.
Returns:
json represenation of the created fip.
"""
kwargs = {}
kwargs['name'] = self.name
return json.dumps(self.gen_fipmgr.Create(None, self.target, **kwargs))
def InstanceFipDelete(self):
"""Deletes FIP on vm matching the id."""
self.gen_fipmgr.Delete(self.fip_id)
def _GetStatus(self, resp):
"""Returns instance status in the given resp."""
status = None
if not resp:
logging.error('http response is None')
return status
if 'status' in resp:
status = resp['status']
else:
logging.error('No status found in resp: %s', resp)
return status
def InstanceStatus(self):
"""Returns instance status matching the instance id."""
resp = self.gen_instmgr.Show(self.instance_id)
status = self._GetStatus(resp)
logging.info('Instance status: %s, %s', self.instance_id, status)
if status != States.RUNNING:
endtime = time.time() + FLAGS.ibmcloud_timeout
status = self._GetStatus(self.gen_instmgr.Show(self.instance_id))
while status != States.RUNNING and time.time() < endtime:
time.sleep(FLAGS.ibmcloud_polling_delay)
status = self._GetStatus(self.gen_instmgr.Show(self.instance_id))
logging.info(
'Checking instance status: %s, %s', self.instance_id, status
)
return status
def InstanceStart(self):
"""Starts the instance matching instance id and returns final status."""
resp = self.gen_instmgr.Start(self.flags['instanceid'])
status = self._GetStatus(resp)
logging.info(
'Instance start issued: %s, %s', self.flags['instanceid'], status
)
if status != States.RUNNING:
endtime = time.time() + FLAGS.ibmcloud_timeout
status = self._GetStatus(self.gen_instmgr.Show(self.flags['instanceid']))
while status != States.RUNNING and time.time() < endtime:
time.sleep(FLAGS.ibmcloud_polling_delay)
status = self._GetStatus(
self.gen_instmgr.Show(self.flags['instanceid'])
)
logging.info(
'Checking instance start status: %s, %s',
self.flags['instanceid'],
status,
)
return status
def InstanceStop(self):
"""Stops instance matching the instance id and returns final status."""
resp = self.gen_instmgr.Stop(self.instance_id, force=False)
logging.info('Instance stop issued: %s', self.instance_id)
# force stop again
resp = self.gen_instmgr.Stop(self.instance_id, force=True)
status = self._GetStatus(resp)
logging.info('Instance stop force issued: %s, %s', self.instance_id, status)
if status != States.STOPPED:
endtime = time.time() + FLAGS.ibmcloud_timeout
status = self._GetStatus(self.gen_instmgr.Show(self.instance_id))
while status != States.STOPPED and time.time() < endtime:
time.sleep(FLAGS.ibmcloud_polling_delay * 2)
status = self._GetStatus(self.gen_instmgr.Show(self.instance_id))
logging.info(
'Checking instance stop status: %s, %s', self.instance_id, status
)
return status
def InstanceDelete(self):
"""Deletes instance matching the instance id."""
self.gen_instmgr.Delete(self.instance_id)
def InstanceList(self):
"""Lists instances and returns json objects of all found instances."""
return self.gen_instmgr.List()
def InstanceShow(self):
"""Returns the json respresentation of the matching instance id."""
return self.gen_instmgr.Show(self.instance_id)
def InstanceInitializationShow(self):
"""Returns the json format of the config used to initialize instance id."""
return self.gen_instmgr.ShowInitialization(self.instance_id)
def InstanceVnicCreate(self):
"""Creates a vnic on the instance.
Flags:
instanceid: id of the vm.
name: name of the vnic.
subnet: subnet id.
Returns:
id of the created vnic and assigned private ip address.
"""
ip_addr = None
resp = self.gen_instmgr.CreateVnic(self.instance_id, self.name, self.subnet)
status = self._GetStatus(resp)
vnicid = resp['id']
port_speed = resp['port_speed']
logging.info(
'Vnic created, vnicid: %s, port speed: %s, vnic status: %s',
vnicid,
port_speed,
status,
)
if status != States.AVAILABLE:
endtime = time.time() + FLAGS.ibmcloud_timeout
resp = self.gen_instmgr.ShowVnic(self.instance_id, vnicid)
status = self._GetStatus(resp)
while status != States.AVAILABLE and time.time() < endtime:
time.sleep(FLAGS.ibmcloud_polling_delay)
resp = self.gen_instmgr.ShowVnic(self.instance_id, vnicid)
status = self._GetStatus(resp)
logging.info('Checking instance vnic status: %s', status)
if status == States.AVAILABLE:
ip_addr = resp.get('primary_ipv4_address')
logging.info('primary_ipv4_address: %s', ip_addr)
assert status == States.AVAILABLE
return vnicid, ip_addr
def InstanceVnicShow(self):
"""GETs instance vnic.
Flags:
instanceid: id of the vm.
vnicid: id of the vnic.
Returns:
json representation of the found vnic.
"""
return json.dumps(
self.gen_instmgr.ShowVnic(self.instance_id, self.vnic_id), indent=2
)
def InstanceVnicList(self):
"""Returns a list of all vnics on instance.
Flags:
instanceid: id of the vm.
Returns:
json objects of all vnics on the instance.
"""
return json.dumps(self.gen_instmgr.ListVnics(self.instance_id), indent=2)
def InstanceCreateVolume(self):
"""Attaches a volume to instance.
Flags:
instanceid: id of the vm.
name: name of the volume on the vm.
volume: id of the volume to attach to the vm.
True: always true, delete the volume when instance is deleted.
Returns:
json representation of the attached volume on the instance.
"""
return json.dumps(
self.gen_instmgr.CreateVolume(
self.instance_id, self.name, self.volume, True
),
indent=2,
)
def InstanceDeleteVolume(self):
"""Deletes the volume on the vm matching instanceid and volume."""
return json.dumps(
self.gen_instmgr.DeleteVolume(self.instance_id, self.volume), indent=2
)
def InstanceShowVolume(self):
"""Returns json representation of the volume matching instanceid."""
for item in self.gen_instmgr.ListVolumes(self.instance_id)[
'volume_attachments'
]:
if item['volume'].get('id') == self.volume:
return json.dumps(
self.gen_instmgr.ShowVolume(self.instance_id, item.get('id')),
indent=2,
)
def ImageList(self):
"""Returns json objects of all available images."""
return self.gen_imgmgr.List(account=self.ibmcloud_account_id)
def GetImageId(self):
"""Returns image id that matches the image name."""
resp = self.gen_imgmgr.List()['images']
if resp is not None:
for image in resp:
if (
image['name'].startswith(self.image_name)
and image['status'] == States.AVAILABLE
):
return image['id']
return None
def ImageShow(self):
"""Returns json object of the image matching the id."""
return self.gen_imgmgr.Show(self.image_id)