perfkitbenchmarker/resources/kubernetes/kubernetes_disk.py (254 lines of code) (raw):
# Copyright 2015 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains code related to Kubernetes disks."""
import json
import logging
import re
from absl import flags
from perfkitbenchmarker import disk
from perfkitbenchmarker import errors
from perfkitbenchmarker import flag_util
from perfkitbenchmarker import kubernetes_helper
from perfkitbenchmarker import provider_info
from perfkitbenchmarker import resource
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.configs import option_decoders
# Shared absl flag-values object; the classes below read runtime flags such
# as rbd_pool, ceph_secret, kubectl and kubeconfig through it.
FLAGS = flags.FLAGS
def CreateDisks(disk_specs, vm_name):
  """Builds and creates one Kubernetes disk per entry in disk_specs.

  Args:
    disk_specs: iterable of disk specs. Each spec's disk_type selects the
      KubernetesDisk subclass that gets instantiated.
    vm_name: name handed to every disk constructor, together with the spec's
      position in disk_specs.

  Returns:
    List of the created disk objects, in the same order as disk_specs.
  """
  created = []
  for index, spec in enumerate(disk_specs):
    # The concrete class depends on the scratch disk type.
    new_disk = GetKubernetesDiskClass(spec.disk_type)(index, spec, vm_name)
    new_disk.Create()
    created.append(new_disk)
  return created
class KubernetesDiskSpec(disk.BaseDiskSpec):
  """Disk spec for Kubernetes volumes.

  On top of the options of the base disk spec it accepts:
    provisioner: optional string (None is allowed, and is the default).
    parameters: dict, defaulting to an empty dict.
  """

  CLOUD = provider_info.KUBERNETES

  @classmethod
  def _GetOptionDecoderConstructions(cls):
    """Extends the parent's decoder table with the Kubernetes-only options.

    Returns:
      The dict returned by the parent class with entries added for
      'provisioner' and 'parameters'. Each added value is a
      (decoder class, constructor kwargs) pair.
    """
    constructions = super()._GetOptionDecoderConstructions()
    constructions['provisioner'] = (
        option_decoders.StringDecoder,
        {'default': None, 'none_ok': True},
    )
    constructions['parameters'] = (
        option_decoders.TypeVerifier,
        {'default': {}, 'valid_types': (dict,)},
    )
    return constructions

  @classmethod
  def _ApplyFlags(cls, config_values, flag_values):
    """Lets explicitly provided flags take precedence over config values.

    Args:
      config_values: dict mapping config option names to provided values.
        Modified in place by this function.
      flag_values: flags.FlagValues. Runtime flags that may override the
        provided config values.
    """
    super()._ApplyFlags(config_values, flag_values)

    if flag_values['k8s_volume_provisioner'].present:
      config_values['provisioner'] = flag_values.k8s_volume_provisioner

    if not flag_values['k8s_volume_parameters'].present:
      return

    # Flag-provided key/value pairs are merged into (not substituted for) any
    # parameters that already came from the config.
    merged = config_values.setdefault('parameters', {})
    merged.update(
        flag_util.ParseKeyValuePairs(flag_values.k8s_volume_parameters)
    )
def GetKubernetesDiskClass(volume_type):
  """Looks up the KubernetesDisk subclass for a volume type.

  Args:
    volume_type: value matched against the K8S_VOLUME_TYPE attribute.

  Returns:
    Whatever resource.GetResourceClass returns for that lookup.
  """
  lookup_attrs = {'K8S_VOLUME_TYPE': volume_type}
  return resource.GetResourceClass(KubernetesDisk, **lookup_attrs)
class KubernetesDisk(disk.BaseDisk):
  """Base class for Kubernetes Disks.

  The lifecycle hooks defined here (_Create, _Delete, Attach, Detach and
  SetDevicePath) intentionally do nothing; subclasses override the ones they
  need.
  """

  RESOURCE_TYPE = 'KubernetesDisk'
  REQUIRED_ATTRS = ['K8S_VOLUME_TYPE']

  def __init__(self, disk_num, disk_spec, name):
    """Initializes the disk and derives its name.

    Args:
      disk_num: index of the disk; used as the suffix of the disk name.
      disk_spec: disk spec forwarded to the parent constructor.
      name: prefix of the disk name.
    """
    super().__init__(disk_spec)
    self.name = '%s-%s' % (name, disk_num)

  def _Create(self):
    """No-op in the base class."""
    pass

  def _Delete(self):
    """No-op in the base class."""
    pass

  def Attach(self, vm):
    """No-op in the base class."""
    pass

  def Detach(self):
    """No-op in the base class."""
    pass

  def SetDevicePath(self, vm):
    """No-op in the base class."""
    pass

  def AttachVolumeMountInfo(self, volume_mounts):
    """Appends this disk's volumeMount entry to volume_mounts.

    Args:
      volume_mounts: list that receives a dict holding the disk's mount point
        ('mountPath') and its name ('name').
    """
    volume_mounts.append(dict(mountPath=self.mount_point, name=self.name))
class EmptyDirDisk(KubernetesDisk):
  """Implementation of Kubernetes 'emptyDir' type of volume."""

  K8S_VOLUME_TYPE = 'emptyDir'

  def GetDevicePath(self):
    """Always raises: an emptyDir volume exposes no device path.

    Raises:
      errors.Error: unconditionally.
    """
    # For a local disk an empty directory from the host is mounted into the
    # docker instance, and formatting the underlying device is deliberately
    # prevented by refusing to hand out a device path.
    raise errors.Error('GetDevicePath not supported for Kubernetes local disk')

  def AttachVolumeInfo(self, volumes):
    """Appends this disk's emptyDir volume entry to volumes.

    Args:
      volumes: list that receives the volume description dict.
    """
    volumes.append({'name': self.name, 'emptyDir': {}})
class CephDisk(KubernetesDisk):
  """Implementation of Kubernetes 'rbd' type of volume."""

  K8S_VOLUME_TYPE = 'rbd'

  def __init__(self, disk_num, disk_spec, name):
    """Initializes the disk and records the Ceph secret flag value.

    Args:
      disk_num: index of the disk; used as the suffix of the disk name.
      disk_spec: disk spec forwarded to the parent constructor.
      name: prefix of the disk name.
    """
    super().__init__(disk_num, disk_spec, name)
    self.ceph_secret = FLAGS.ceph_secret

  def _FindMappedDevice(self):
    """Looks up this image's device in the output of 'rbd showmapped'.

    Returns:
      The first column starting with '/dev/rbd' from the first row that has a
      column exactly equal to self.name, or None when there is no such row.
    """
    stdout, _, _ = vm_util.IssueCommand(
        ['rbd', 'showmapped'], raise_on_failure=False
    )
    for row in stdout.splitlines():
      columns = row.split()
      # Compare whole columns: a substring test would also match other images
      # whose names merely contain this one (e.g. 'x-1' inside 'x-10').
      if self.name not in columns:
        continue
      for column in columns:
        if column.startswith('/dev/rbd'):
          return column
    return None

  def _Create(self):
    """Creates Rados Block Device volumes and installs filesystem on them.

    The image is created, mapped to a local block device, formatted as ext4
    and unmapped again.

    Raises:
      Exception: if any of the rbd/mkfs commands exits with a non-zero code,
        or if the device of the mapped image cannot be determined.
    """
    image_spec = FLAGS.rbd_pool + '/' + self.name

    cmd = [
        'rbd',
        '-p',
        FLAGS.rbd_pool,
        'create',
        self.name,
        '--size',
        # NOTE(review): presumably converts disk_size from GB to the MB unit
        # expected by 'rbd create --size' -- confirm.
        str(1024 * self.disk_size),
    ]
    _, stderr, retcode = vm_util.IssueCommand(cmd, raise_on_failure=False)
    if retcode != 0:
      raise Exception('Creating RBD image failed: %s' % stderr)

    cmd = ['rbd', 'map', image_spec]
    stdout, stderr, retcode = vm_util.IssueCommand(cmd, raise_on_failure=False)
    if retcode != 0:
      raise Exception('Mapping RBD image failed: %s' % stderr)

    rbd_device = stdout.rstrip()
    if '/dev/rbd' not in rbd_device:
      # Sometimes 'rbd map' command doesn't return any output.
      # Trying to find device location another way.
      rbd_device = self._FindMappedDevice()
      if not rbd_device:
        # Fail here rather than running mkfs on an empty device path.
        raise Exception(
            'Mapping RBD image failed: no /dev/rbd device found for %s'
            % self.name
        )

    cmd = ['/sbin/mkfs.ext4', rbd_device]
    _, stderr, retcode = vm_util.IssueCommand(cmd, raise_on_failure=False)
    if retcode != 0:
      raise Exception('Formatting partition failed: %s' % stderr)

    cmd = ['rbd', 'unmap', rbd_device]
    _, stderr, retcode = vm_util.IssueCommand(cmd, raise_on_failure=False)
    if retcode != 0:
      raise Exception('Unmapping block device failed: %s' % stderr)

  def _Delete(self):
    """Deletes RBD image.

    Raises:
      Exception: if 'rbd rm' exits with a non-zero code.
    """
    cmd = ['rbd', 'rm', FLAGS.rbd_pool + '/' + self.name]
    _, _, retcode = vm_util.IssueCommand(cmd, raise_on_failure=False)
    if retcode != 0:
      # NOTE(review): the message implies the caller retries the deletion
      # when this raises -- confirm against the resource base class.
      msg = 'Removing RBD image failed. Reattempting.'
      logging.warning(msg)
      raise Exception(msg)

  def AttachVolumeInfo(self, volumes):
    """Appends this disk's rbd volume entry to volumes.

    Args:
      volumes: list that receives the volume description dict. A 'secretRef'
        entry is included only when the ceph_secret flag is set.
    """
    ceph_volume = {
        'name': self.name,
        'rbd': {
            'monitors': FLAGS.ceph_monitors,
            'pool': FLAGS.rbd_pool,
            'image': self.name,
            'keyring': FLAGS.ceph_keyring,
            'user': FLAGS.rbd_user,
            'fsType': 'ext4',
            'readOnly': False,
        },
    }
    if FLAGS.ceph_secret:
      ceph_volume['rbd']['secretRef'] = {'name': FLAGS.ceph_secret}
    volumes.append(ceph_volume)

  def SetDevicePath(self, vm):
    """Retrieves the path to scratch disk device.

    Args:
      vm: object whose RemoteCommand runs the lookup; the first field of the
        'mount' line matching the mount point is stored in self.device_path.
    """
    cmd = "mount | grep %s | tr -s ' ' | cut -f 1 -d ' '" % self.mount_point
    device, _ = vm.RemoteCommand(cmd)
    self.device_path = device.rstrip()

  def GetDevicePath(self):
    """Returns the device path stored by SetDevicePath."""
    return self.device_path
class PersistentVolumeClaim(resource.BaseResource):
  """Object representing a K8s PVC."""

  def __init__(self, name, storage_class, size):
    """Stores the claim's attributes.

    Args:
      name: name of the PVC.
      storage_class: value emitted as the claim's storageClassName.
      size: requested storage; rendered as '<size>Gi' in the claim.
    """
    super().__init__()
    self.name = name
    self.storage_class = storage_class
    self.size = size

  @vm_util.Retry(poll_interval=10, max_retries=100, log_errors=False)
  def _WaitForPVCBoundCompletion(self):
    """Checks once whether the PVC is Bound; raises otherwise.

    A PVC may need some time to become ready (Bound), hence the Retry
    decorator on this method.

    Raises:
      Exception: if kubectl printed nothing or the reported phase is not
        'Bound'.
    """
    get_pvc_cmd = [
        FLAGS.kubectl,
        '--kubeconfig=%s' % FLAGS.kubeconfig,
        'get',
        'pvc',
        '-o=json',
        self.name,
    ]
    logging.info('Waiting for PVC %s', self.name)
    stdout, _, _ = vm_util.IssueCommand(get_pvc_cmd, raise_on_failure=False)
    if stdout and json.loads(stdout)['status']['phase'] == 'Bound':
      logging.info('PVC is ready.')
      return
    raise Exception(
        'PVC %s is not ready. Retrying to check status.' % self.name
    )

  def _Create(self):
    """Creates the PVC and waits until it is bound."""
    kubernetes_helper.CreateResource(self._BuildBody())
    self._WaitForPVCBoundCompletion()

  def _Delete(self):
    """Deletes the PVC."""
    kubernetes_helper.DeleteResource(self._BuildBody())

  def _BuildBody(self):
    """Builds JSON representing the PVC.

    Returns:
      JSON string describing a v1 PersistentVolumeClaim with a single
      ReadWriteOnce access mode.
    """
    claim_spec = {
        'accessModes': ['ReadWriteOnce'],
        'resources': {'requests': {'storage': '%sGi' % self.size}},
        'storageClassName': self.storage_class,
    }
    manifest = {
        'kind': 'PersistentVolumeClaim',
        'apiVersion': 'v1',
        'metadata': {'name': self.name},
        'spec': claim_spec,
    }
    return json.dumps(manifest)
class StorageClass(resource.BaseResource):
  """Object representing a K8s StorageClass (with dynamic provisioning)."""

  def __init__(self, name, provisioner, parameters):
    """Stores the StorageClass attributes.

    Args:
      name: name of the StorageClass.
      provisioner: value emitted as the 'provisioner' field.
      parameters: value emitted as the 'parameters' field.
    """
    super().__init__()
    self.name = name
    self.provisioner = provisioner
    self.parameters = parameters

  def _CheckStorageClassExists(self):
    """Prevent duplicated StorageClass creation.

    Only the name reported by kubectl is compared; parameters are not.

    Returns:
      True if kubectl reports a StorageClass with this name, False if kubectl
      printed nothing, and None if it printed an object with another name.
    """
    get_sc_cmd = [
        FLAGS.kubectl,
        '--kubeconfig=%s' % FLAGS.kubeconfig,
        'get',
        'sc',
        '-o=json',
        self.name,
    ]
    stdout, _, _ = vm_util.IssueCommand(get_sc_cmd, raise_on_failure=False)
    if not stdout:
      logging.info('About to create new StorageClass: %s', self.name)
      return False
    if json.loads(stdout)['metadata']['name'] == self.name:
      logging.info('StorageClass already exists.')
      return True
    # An object with a different name was printed; treated as "not found".
    return None

  def _Create(self):
    """Creates the StorageClass unless one with this name already exists."""
    manifest = self._BuildBody()
    if self._CheckStorageClassExists():
      return
    kubernetes_helper.CreateResource(manifest)

  def _Delete(self):
    """Deletes the StorageClass."""
    kubernetes_helper.DeleteResource(self._BuildBody())

  def _BuildBody(self):
    """Builds JSON representing the StorageClass.

    Returns:
      JSON string describing a storage.k8s.io/v1 StorageClass.
    """
    manifest = {
        'kind': 'StorageClass',
        'apiVersion': 'storage.k8s.io/v1',
        'metadata': {'name': self.name},
        'provisioner': self.provisioner,
        'parameters': self.parameters,
    }
    return json.dumps(manifest)
class PvcVolume(KubernetesDisk):
  """Volume representing a persistent volume claim."""

  K8S_VOLUME_TYPE = 'persistentVolumeClaim'
  # When set to a truthy value this takes precedence over spec.provisioner.
  PROVISIONER = None

  def __init__(self, disk_num, spec, name):
    """Sets up the StorageClass and PVC objects backing this volume.

    Args:
      disk_num: index of the disk; used as the suffix of the disk name.
      spec: disk spec providing provisioner, parameters and disk_size.
      name: prefix of the disk name; also used as the StorageClass name.
    """
    super().__init__(disk_num, spec, name)
    provisioner = self.PROVISIONER or spec.provisioner
    self.storage_class = StorageClass(name, provisioner, spec.parameters)
    self.pvc = PersistentVolumeClaim(
        self.name, self.storage_class.name, spec.disk_size
    )

  def _Create(self):
    """Creates the StorageClass first, then the claim that refers to it."""
    self.storage_class.Create()
    self.pvc.Create()

  def _Delete(self):
    """Deletes the claim first, then the StorageClass."""
    self.pvc.Delete()
    self.storage_class.Delete()

  def AttachVolumeInfo(self, volumes):
    """Appends this disk's persistentVolumeClaim volume entry to volumes.

    Args:
      volumes: list that receives the volume description dict.
    """
    claim_ref = {'claimName': self.name}
    volumes.append({'name': self.name, 'persistentVolumeClaim': claim_ref})