liminal/kubernetes/volume_util.py (116 lines of code) (raw):

# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import logging import os import sys from time import sleep from kubernetes import client, config from kubernetes.client import V1PersistentVolume, V1PersistentVolumeClaim # noinspection PyBroadException try: config.load_kube_config() except Exception: msg = "Kubernetes is not running\n" sys.stdout.write(f"INFO: {msg}") _LOG = logging.getLogger('volume_util') _LOCAL_VOLUMES = set() _kubernetes = client.CoreV1Api() def get_volume_configs(liminal_config, base_dir): volumes_config = liminal_config.get('volumes', []) for volume_config in volumes_config: if 'local' in volume_config: path = volume_config['local']['path'] if path.startswith(".."): path = os.path.join(base_dir, path) if path.startswith("."): path = os.path.join(base_dir, path[1:]) volume_config['local']['path'] = path return volumes_config def create_local_volumes(liminal_config, base_dir): volumes_config = get_volume_configs(liminal_config, base_dir) for volume_config in volumes_config: logging.info(f'Creating local kubernetes volume if needed: {volume_config}') create_local_volume(volume_config) def create_local_volume(conf, namespace='default') -> None: name = conf['volume'] _LOG.info(f'Requested volume {name}') if name not in _LOCAL_VOLUMES: matching_volumes = _kubernetes.list_persistent_volume(field_selector=f'metadata.name={name}').to_dict()[ 'items' ] while len(matching_volumes) == 0: _create_local_volume(conf, name) sleep(5) matching_volumes = _kubernetes.list_persistent_volume(field_selector=f'metadata.name={name}').to_dict()[ 'items' ] pvc_name = conf.get('claim_name', f'{name}-pvc') matching_claims = _kubernetes.list_persistent_volume_claim_for_all_namespaces( field_selector=f'metadata.name={pvc_name}' ).to_dict()['items'] while len(matching_claims) == 0: _create_persistent_volume_claim(pvc_name, name, namespace) sleep(5) matching_claims = _kubernetes.list_persistent_volume_claim_for_all_namespaces( field_selector=f'metadata.name={pvc_name}' ).to_dict()['items'] _LOCAL_VOLUMES.add(name) def delete_local_volumes(liminal_config, base_dir): volumes_config = get_volume_configs(liminal_config, base_dir) for volume_config in volumes_config: logging.info(f'Delete local kubernetes volume if needed: {volume_config}') delete_local_volume(volume_config['volume']) def delete_local_volume(name, namespace='default'): pvc_name = f'{name}-pvc' matching_claims = _list_persistent_volume_claims(pvc_name) if len(matching_claims) > 0: _LOG.info(f'Deleting persistent volume claim {pvc_name}') _kubernetes.delete_namespaced_persistent_volume_claim(pvc_name, namespace) while len(matching_claims) > 0: matching_claims = _list_persistent_volume_claims(pvc_name) matching_volumes = _list_persistent_volumes(name) if len(matching_volumes) > 0: _LOG.info(f'Deleting persistent volume {name}') _kubernetes.delete_persistent_volume(name) while len(matching_volumes) > 0: matching_volumes = _list_persistent_volumes(name) if name in _LOCAL_VOLUMES: _LOCAL_VOLUMES.remove(name) def _list_persistent_volume_claims(name): return _kubernetes.list_persistent_volume_claim_for_all_namespaces( field_selector=f'metadata.name={name}' ).to_dict()['items'] def _list_persistent_volumes(name): return _kubernetes.list_persistent_volume(field_selector=f'metadata.name={name}').to_dict()['items'] def _create_persistent_volume_claim(pvc_name, volume_name, namespace): _LOG.info(f'Creating persistent volume claim {pvc_name} with volume {volume_name}') spec = { 'volumeName': volume_name, 'volumeMode': 'Filesystem', 'storageClassName': 'local-storage', 'accessModes': ['ReadWriteOnce'], 'resources': {'requests': {'storage': '100Gi'}}, } _kubernetes.create_namespaced_persistent_volume_claim( namespace, V1PersistentVolumeClaim( api_version='v1', kind='PersistentVolumeClaim', metadata={'name': pvc_name}, spec=spec ), ) def _create_local_volume(conf, name): _LOG.info(f'Creating persistent volume {name} with spec {conf}') spec = { 'capacity': {'storage': '100Gi'}, 'volumeMode': 'Filesystem', 'accessModes': ['ReadWriteOnce'], 'persistentVolumeReclaimPolicy': 'Retain', 'storageClassName': 'local-storage', 'nodeAffinity': { 'required': { 'nodeSelectorTerms': [ {'matchExpressions': [{'key': 'kubernetes.io/hostname', 'operator': 'NotIn', 'values': ['']}]} ] } }, } spec.update(conf) _kubernetes.create_persistent_volume( V1PersistentVolume(api_version='v1', kind='PersistentVolume', metadata={'name': name}, spec=spec) )