modules/python/clients/kubernetes_client.py
import time
import os
import yaml
from kubernetes import client, config
from kubernetes.stream import stream
from utils.logger_config import get_logger, setup_logging
# https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/#taint-based-evictions
# https://kubernetes.io/docs/reference/labels-annotations-taints/
builtin_taints_keys = [
"node.kubernetes.io/not-ready",
"node.kubernetes.io/unreachable",
"node.kubernetes.io/pid-pressure",
"node.kubernetes.io/out-of-disk",
"node.kubernetes.io/memory-pressure",
"node.kubernetes.io/disk-pressure",
"node.kubernetes.io/network-unavailable",
"node.kubernetes.io/unschedulable",
"node.cloudprovider.kubernetes.io/uninitialized",
"node.cloudprovider.kubernetes.io/shutdown",
]
# Configure logging
setup_logging()
logger = get_logger(__name__)
class KubernetesClient:
def __init__(self, kubeconfig=None):
config.load_kube_config(kubeconfig)
self.api = client.CoreV1Api()
self.app = client.AppsV1Api()
self.storage = client.StorageV1Api()
def get_app_client(self):
return self.app
def describe_node(self, node_name):
return self.api.read_node(node_name)
def get_nodes(self, label_selector=None, field_selector=None):
return self.api.list_node(label_selector=label_selector, field_selector=field_selector).items
def get_ready_nodes(self, label_selector=None, field_selector=None):
"""
        Get a list of nodes that are ready to be scheduled. A node is considered ready when all of the following hold:
        - The 'Ready' condition status is True
        - The 'NetworkUnavailable' condition is absent or its status is not True
        - spec.unschedulable is not True
        - spec.taints contains none of the built-in taint keys with effect 'NoSchedule' or 'NoExecute'
"""
nodes = self.get_nodes(label_selector=label_selector, field_selector=field_selector)
return [
node for node in nodes
if self._is_node_schedulable(node) and self._is_node_untainted(node)
]
def _is_node_schedulable(self, node):
status_conditions = {cond.type: cond.status for cond in node.status.conditions}
is_schedulable = (
status_conditions.get("Ready") == "True"
and status_conditions.get("NetworkUnavailable") != "True"
and node.spec.unschedulable is not True
)
if not is_schedulable:
logger.info(f"Node NOT Ready: '{node.metadata.name}' is not schedulable. status_conditions: {status_conditions}. unschedulable: {node.spec.unschedulable}")
return is_schedulable
def _is_node_untainted(self, node):
if not node.spec.taints:
return True
for taint in node.spec.taints:
if taint.key in builtin_taints_keys and taint.effect in ("NoSchedule", "NoExecute"):
logger.info(f"Node NOT Ready: '{node.metadata.name}' has taint '{taint.key}' with effect '{taint.effect}'")
return False
return True
def _is_ready_pod(self, pod):
if pod.status.phase != "Running":
return False
for condition in pod.status.conditions:
if condition.type == "Ready" and condition.status == "True":
return True
return False
def get_pods_by_namespace(self, namespace, label_selector=None, field_selector=None):
return self.api.list_namespaced_pod(namespace=namespace, label_selector=label_selector, field_selector=field_selector).items
def get_ready_pods_by_namespace(self, namespace=None, label_selector=None, field_selector=None):
pods = self.get_pods_by_namespace(namespace=namespace, label_selector=label_selector, field_selector=field_selector)
        return [pod for pod in pods if self._is_ready_pod(pod)]
def get_persistent_volume_claims_by_namespace(self, namespace):
return self.api.list_namespaced_persistent_volume_claim(namespace=namespace).items
def get_bound_persistent_volume_claims_by_namespace(self, namespace):
claims = self.get_persistent_volume_claims_by_namespace(namespace=namespace)
return [claim for claim in claims if claim.status.phase == "Bound"]
def delete_persistent_volume_claim_by_namespace(self, namespace):
pvcs = self.get_persistent_volume_claims_by_namespace(namespace=namespace)
for pvc in pvcs:
try:
self.api.delete_namespaced_persistent_volume_claim(pvc.metadata.name, namespace, body=client.V1DeleteOptions())
except client.rest.ApiException as e:
logger.error(f"Error deleting PVC '{pvc.metadata.name}': {e}")
def get_volume_attachments(self):
return self.storage.list_volume_attachment().items
def get_attached_volume_attachments(self):
volume_attachments = self.get_volume_attachments()
return [attachment for attachment in volume_attachments if attachment.status.attached]
def create_namespace(self, namespace):
"""
Returns the namespace object if it exists, otherwise creates it.
"""
try:
            ns = self.api.read_namespace(namespace)
            logger.info(f"Namespace '{ns.metadata.name}' already exists.")
            return ns
except client.rest.ApiException as e:
if e.status == 404:
body = client.V1Namespace(metadata=client.V1ObjectMeta(name=namespace))
return self.api.create_namespace(body)
raise e
def delete_namespace(self, namespace):
return self.api.delete_namespace(namespace)
# TODO: Explore https://kustomize.io for templating
def create_template(self, template_path: str, replacements: dict) -> str:
"""
Generate a Kubernetes resource template by replacing placeholders with actual values.
:param template_path: Path to the YAML template file.
:param replacements: Dictionary of placeholders and their corresponding values.
:return: Processed YAML content as a string.
"""
if not os.path.isfile(template_path):
raise FileNotFoundError(f"Template file not found: {template_path}")
try:
with open(template_path, "r", encoding="utf-8") as file:
template = file.read()
for key, value in replacements.items():
template = template.replace(f"{{{{{key}}}}}", str(value))
logger.info(f"Final template: \n{template}")
return template
except Exception as e:
raise Exception(f"Error processing template file {template_path}: {str(e)}") from e
def create_deployment(self, template, namespace="default"):
"""
Create a Deployment in the specified namespace using the provided YAML template.
:param template: YAML template for the Deployment.
:param namespace: Namespace where the Deployment will be created.
:return: Name of the created Deployment.
"""
try:
deployment_obj = yaml.safe_load(template)
response = self.app.create_namespaced_deployment(
body=deployment_obj,
namespace=namespace
)
return response.metadata.name
except yaml.YAMLError as e:
raise Exception(f"Error parsing deployment template: {str(e)}") from e
except Exception as e:
raise Exception(f"Error creating deployment {template}: {str(e)}") from e
def wait_for_nodes_ready(self, node_count, operation_timeout_in_minutes, label_selector=None):
"""
Waits for a specific number of nodes with a given label to be ready within a specified timeout.
Raises an exception if the expected number of nodes are not ready within the timeout.
        :param label_selector: The label selector used to filter nodes.
:param node_count: The expected number of nodes to be ready.
:param operation_timeout_in_minutes: The timeout in minutes to wait for the nodes to be ready.
        :return: List of ready node objects.
"""
ready_nodes = []
ready_node_count = 0
timeout = time.time() + (operation_timeout_in_minutes * 60)
logger.info(f"Validating {node_count} nodes with label {label_selector} are ready.")
while time.time() < timeout:
ready_nodes = self.get_ready_nodes(label_selector=label_selector)
ready_node_count = len(ready_nodes)
logger.info(f"Currently {ready_node_count} nodes are ready.")
if ready_node_count == node_count:
return ready_nodes
logger.info(f"Waiting for {node_count} nodes to be ready.")
time.sleep(10)
if ready_node_count != node_count:
raise Exception(f"Only {ready_node_count} nodes are ready, expected {node_count} nodes!")
return ready_nodes
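    # Example (illustrative): wait_for_nodes_ready(node_count=3, operation_timeout_in_minutes=10,
    # label_selector="kubernetes.io/os=linux") blocks until three Linux nodes are schedulable or the timeout expires.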
def wait_for_pods_ready(self, pod_count, operation_timeout_in_minutes, namespace="default", label_selector=None):
"""
Waits for a specific number of pods with a given label to be ready within a specified timeout.
Raises an exception if the expected number of pods are not ready within the timeout.
:param label_selector: The label to filter pods.
:param pod_count: The expected number of pods to be ready.
:param operation_timeout_in_minutes: The timeout in minutes to wait for the pods to be ready.
:param namespace: The namespace to filter pods.
        :return: List of ready pod objects.
"""
pods = []
timeout = time.time() + (operation_timeout_in_minutes * 60)
logger.info(f"Validating {pod_count} pods with label {label_selector} are ready.")
while time.time() < timeout:
pods = self.get_ready_pods_by_namespace(namespace=namespace, label_selector=label_selector)
if len(pods) == pod_count:
return pods
logger.info(f"Waiting for {pod_count} pods to be ready.")
time.sleep(10)
if len(pods) != pod_count:
raise Exception(f"Only {len(pods)} pods are ready, expected {pod_count} pods!")
return pods
def get_pod_logs(self, pod_name, namespace="default", container=None, tail_lines=None):
"""
Get logs from a specific pod in the given namespace.
:param pod_name: Name of the pod
:param namespace: Namespace where the pod is located (default: "default")
:param container: Container name if pod has multiple containers (optional)
:param tail_lines: Number of lines to return from the end of the logs (optional)
:return: String containing the pod logs
"""
try:
return self.api.read_namespaced_pod_log(
name=pod_name,
namespace=namespace,
container=container,
tail_lines=tail_lines
)
except client.rest.ApiException as e:
raise Exception(f"Error getting logs for pod '{pod_name}' in namespace '{namespace}': {str(e)}") from e
def run_pod_exec_command(self, pod_name: str, container_name: str, command: str, dest_path: str = None, namespace: str = "default") -> str:
"""
Executes a command in a specified container within a Kubernetes pod and optionally saves the output to a file.
Args:
pod_name (str): The name of the pod where the command will be executed.
container_name (str): The name of the container within the pod where the command will be executed.
command (str): The command to be executed in the container.
dest_path (str, optional): The file path where the command output will be saved. Defaults to None.
namespace (str, optional): The Kubernetes namespace where the pod is located. Defaults to "default".
Returns:
str: The combined standard output of the executed command.
Raises:
Exception: If an error occurs while executing the command in the pod.
"""
commands = ['/bin/sh', '-c', command]
resp = stream(self.api.connect_get_namespaced_pod_exec,
name=pod_name,
namespace=namespace,
command=commands,
container=container_name,
stderr=True, stdin=False,
stdout=True, tty=False,
_preload_content=False)
res = []
file = open(dest_path, 'wb') if dest_path else None # pylint: disable=consider-using-with
try:
while resp.is_open():
resp.update(timeout=1)
if resp.peek_stdout():
stdout = resp.read_stdout()
res.append(stdout)
logger.info(f"STDOUT: {stdout}")
if file:
file.write(stdout.encode('utf-8'))
logger.info(f"Saved response to file: {dest_path}")
if resp.peek_stderr():
error_msg = resp.read_stderr()
raise Exception(f"Error occurred while executing command in pod: {error_msg}")
finally:
resp.close()
if file is not None:
file.close()
return ''.join(res)
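    # Example (illustrative, hypothetical pod/container names):
    #   client.run_pod_exec_command("demo-pod", "main", "cat /etc/os-release", dest_path="/tmp/os-release.txt")
    # returns the command's stdout and also mirrors it to dest_path.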
def get_daemonsets_pods_allocated_resources(self, namespace, node_name):
pods = self.get_pods_by_namespace(namespace=namespace, field_selector=f"spec.nodeName={node_name}")
cpu_request = 0
memory_request = 0
for pod in pods:
for container in pod.spec.containers:
logger.info(f"Pod {pod.metadata.name} has container {container.name} with resources {container.resources.requests}")
cpu_request += int(container.resources.requests.get("cpu", "0m").replace("m", ""))
memory_request += int(container.resources.requests.get("memory", "0Mi").replace("Mi", ""))
return cpu_request, memory_request * 1024 # Convert to KiB
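# Illustrative usage sketch, not part of the client API: the kubeconfig path, template path,
# placeholder names, and label selector below are assumptions for demonstration only.
if __name__ == "__main__":
    k8s = KubernetesClient(kubeconfig=os.path.expanduser("~/.kube/config"))
    k8s.create_namespace("demo")
    rendered = k8s.create_template("templates/deployment.yaml", {"REPLICAS": 2, "IMAGE": "nginx:latest"})
    deployment_name = k8s.create_deployment(rendered, namespace="demo")
    logger.info(f"Created deployment: {deployment_name}")
    # Assumes the template labels its pods with app=<deployment name>.
    k8s.wait_for_pods_ready(pod_count=2, operation_timeout_in_minutes=5, namespace="demo", label_selector=f"app={deployment_name}")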