python-batch/gke_batch/batch.py (209 lines of code) (raw):
#!/usr/bin/env python3
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Tools to create, list, and delete batch Jobs on GKE via the Kubernetes batch/v1 API
"""
__author__ = "J Ross Thomson drj@"
__version__ = "0.1.0"
import os
import uuid
import yaml
import sys
import kubernetes.client
from absl import app
from absl import flags
from dynaconf import settings
from google.api_core.operation import Operation
from kubernetes.client import Configuration
from kubernetes.client.rest import ApiException
from pprint import pprint
from tabulate import tabulate
from termcolor import colored
from typing import Iterable
from yaml.loader import SafeLoader
# Command-line flags. absl.app.run parses these before calling main().
FLAGS = flags.FLAGS

flags.DEFINE_string(
    name="project_id",
    default=None,
    help="Google Cloud Project ID, not name",
)
flags.DEFINE_boolean(
    name="create_job",
    default=False,
    help="Creates job, otherwise just prints config.",
)
flags.DEFINE_boolean(
    name="list_jobs",
    default=False,
    help="If true, list jobs for config.",
)
flags.DEFINE_string(
    name="delete_job",
    default="",
    help="Job name to delete.",
)
flags.DEFINE_boolean(
    name="debug",
    default=False,
    help="If true, print debug info.",
)
flags.DEFINE_string(
    name="args_file",
    default=None,
    help="TOML/YAML file with list of container commands",
)
def get_setting(setting, settings):
    """Return a required configuration value, exiting if it is absent.

    Args:
        setting: Key to look up.
        settings: Mapping-like object to search, e.g. the module-level
            dynaconf ``settings`` or a nested section of it.

    Returns:
        The value stored under ``setting``.

    Raises:
        SystemExit: With status 1 (after printing an error to stderr) when
            ``setting`` is not present, so callers/shells see a failure.
    """
    if FLAGS.debug:
        print(f"Getting setting {setting}", file=sys.stderr)
    if setting not in settings:
        print(f"Setting {setting} not found in settings file. Exiting.", file=sys.stderr)
        # sys.exit instead of the interactive-only site `exit()` helper, and a
        # non-zero status because a missing setting is an error.
        sys.exit(1)
    return settings[setting]
class KubernetesBatchJobs:
    """Build, submit, list, and delete Kubernetes batch/v1 Jobs.

    All configuration is read through ``get_setting`` from the module-level
    dynaconf ``settings`` object. Cluster credentials come from
    ``kubernetes.config.load_kube_config()``.
    """

    # Name shared by the CSI volume and its mount so the two stay paired.
    _VOLUME_NAME = "gcs-fuse-csi-ephemeral"

    def __init__(self) -> None:
        """Load kube config, create a BatchV1Api client, resolve the project ID.

        Project ID precedence: the ``--project_id`` flag, then the
        ``GOOGLE_CLOUD_PROJECT`` environment variable, then the
        ``project_id`` setting. The settings file is only consulted (and only
        required) when neither override is present.
        """
        self.job_prefix = get_setting("job_prefix", settings)
        kubernetes.config.load_kube_config()
        try:
            self.kube_config = Configuration().get_default_copy()
        except AttributeError:
            # Presumably for client versions lacking get_default_copy().
            self.kube_config = Configuration()
        self.kube_config.assert_hostname = False
        Configuration.set_default(self.kube_config)
        self.batch_v1 = kubernetes.client.BatchV1Api(
            kubernetes.client.ApiClient(self.kube_config)
        )
        self.project_id = (
            FLAGS.project_id
            or os.environ.get("GOOGLE_CLOUD_PROJECT")
            or get_setting(setting="project_id", settings=settings)
        )

    def create_job_request(self):
        """Assemble (but do not submit) a V1Job from the settings file.

        Returns:
            kubernetes.client.V1Job whose final name is generated by the API
            server from ``job_prefix`` (``generate_name``).
        """
        container = self._create_container()
        volumes = self._create_volumes()
        container.volume_mounts = self._create_volume_mounts()
        pod_template = self._create_pod_template(container, volumes)

        job_spec = kubernetes.client.V1JobSpec(
            template=pod_template,
            parallelism=get_setting("parallelism", settings),
            suspend=get_setting("suspend", settings),
        )
        # completion_mode/completions are only set for Indexed jobs; otherwise
        # they are left unset so the API server default applies.
        completion_mode = get_setting("completion_mode", settings)
        if completion_mode == "Indexed":
            job_spec.completion_mode = completion_mode
            job_spec.completions = get_setting("completions", settings)

        return kubernetes.client.V1Job(
            api_version="batch/v1",
            kind="Job",
            spec=job_spec,
            metadata=kubernetes.client.V1ObjectMeta(
                generate_name=self.job_prefix,
            ),
        )

    def _create_volume_mounts(self):
        """Return the container's volume mounts (the GCS FUSE CSI volume)."""
        mount_settings = get_setting("volume_mount", settings)
        return [
            kubernetes.client.V1VolumeMount(
                name=self._VOLUME_NAME,
                mount_path=get_setting("mount_path", mount_settings),
            )
        ]

    def _create_volumes(self):
        """Return the pod's volumes: one inline CSI volume for a GCS bucket."""
        volume_settings = get_setting("volume", settings)
        volume_attributes = {"bucketName": get_setting("bucketName", volume_settings)}
        csi_source = kubernetes.client.V1CSIVolumeSource(
            driver=get_setting("driver", volume_settings),
            volume_attributes=volume_attributes,
        )
        return [kubernetes.client.V1Volume(name=self._VOLUME_NAME, csi=csi_source)]

    def _create_container(self):
        """Return the job's single container built from the settings file."""
        container_settings = get_setting("container", settings)
        image = get_setting("image_uri", container_settings)
        command = get_setting("command", settings)
        # Downward-API env var. NOTE(review): in a container env, fieldRef
        # "metadata.name" resolves to the *Pod* name, not the Job name —
        # confirm this is what consumers of JOB_NAME expect.
        job_name_env = kubernetes.client.V1EnvVar(
            name="JOB_NAME",
            value_from=kubernetes.client.V1EnvVarSource(
                field_ref=kubernetes.client.V1ObjectFieldSelector(
                    field_path="metadata.name"
                )
            ),
        )
        limits = get_setting("limits", settings)
        return kubernetes.client.V1Container(
            name=self.job_prefix,
            image=image,
            command=command,
            resources=kubernetes.client.V1ResourceRequirements(
                limits=limits,
            ),
            env=[job_name_env],
        )

    def _create_object_metadata(self):
        """Unused placeholder; returns None."""
        pass

    def _create_pod_template(self, container1, volumes):
        """Wrap ``container1`` and ``volumes`` in a never-restarting pod template."""
        service_account_name = get_setting("service_account_name", settings)
        node_selector = get_setting("node_selector", settings)
        pod_annotations = get_setting("pod_annotations", settings)
        pod_spec = kubernetes.client.V1PodSpec(
            containers=[container1],
            restart_policy="Never",
            node_selector=node_selector,
            volumes=volumes,
            service_account_name=service_account_name,
        )
        return kubernetes.client.V1PodTemplateSpec(
            metadata=kubernetes.client.V1ObjectMeta(
                name=self.job_prefix,
                annotations=pod_annotations,
            ),
            spec=pod_spec,
        )

    def create_job(self, create_request):
        """Submit ``create_request`` to the configured namespace.

        Prints the server-generated job name and creation timestamp to
        stdout. Returns None.
        """
        namespace = get_setting("namespace", settings)
        api_response = self.batch_v1.create_namespaced_job(
            namespace=namespace,
            body=create_request,
        )
        # With generate_name the real name is only known from the response.
        print(f"Job {api_response.metadata.name} created. Timestamp='{api_response.metadata.creation_timestamp}'")

    def delete_job(self, job_name):
        """Delete Job ``job_name`` (and its pods) from the configured namespace.

        The API response is printed to stderr; an ApiException is reported
        to stderr rather than raised.
        """
        namespace = get_setting("namespace", settings)
        try:
            api_response = self.batch_v1.delete_namespaced_job(
                job_name,
                namespace,
                # For batch/v1 Jobs the API server's default policy orphans
                # the Job's pods; cascade the delete to them instead.
                propagation_policy="Background",
            )
            print(api_response, file=sys.stderr)
        except ApiException as e:
            print("Exception when calling BatchV1Api->delete_namespaced_job: %s\n" % e, file=sys.stderr)

    def list_jobs(self):
        """Print a grid table of Jobs in the configured namespace.

        Columns: job name, succeeded count, failed count, completed indexes.
        An ApiException is reported to stderr and nothing else is printed.
        """
        namespace = get_setting("namespace", settings)
        try:
            api_response = self.batch_v1.list_namespaced_job(namespace)
        except ApiException as e:
            print("Exception when calling BatchV1Api->list_namespaced_job: %s\n" % e, file=sys.stderr)
            return
        headers = ["Job Name", "Succeeded", "Failed", "Completed Index"]
        rows = []
        for item in api_response.items:
            name = item.metadata.name
            succeeded = item.status.succeeded
            failed = item.status.failed
            completed = item.status.completed_indexes
            if FLAGS.debug:
                print(f"Name: {name}\tSucceeded: {succeeded}\tFailed: {failed}\tCompleted Index: {completed}", file=sys.stderr)
            rows.append(
                [
                    colored(name, 'blue'),
                    colored(succeeded, 'green'),
                    colored(failed, 'red'),
                    colored(completed, 'yellow'),
                ]
            )
        print(tabulate(rows, headers, tablefmt="grid"))
def main(argv):
    """Command-line entry point, invoked by ``absl.app.run``.

    Depending on the flags, this:
      * ``--delete_job NAME``: deletes that job and exits.
      * ``--list_jobs``: prints a table of jobs and exits.
      * ``--args_file FILE``: loads FILE into settings and submits one job per
        entry in its ``ARGS`` list, using the entry as the container command.
      * ``--create_job``: submits a single job built from the settings file.
      * otherwise: prints the job request that would be submitted (stderr).

    Args:
        argv: Positional arguments left over after flag parsing; unused.
    """
    del argv  # Unused.

    if get_setting("platform", settings) != "gke-autopilot":
        # Only GKE Autopilot is implemented. Without a jobs object every
        # branch below would fail, so stop with an error status.
        print("platform must be set in the settings file. Exiting.", file=sys.stderr)
        sys.exit(1)
    jobs = KubernetesBatchJobs()

    # Delete a job; the job name must be passed.
    if FLAGS.delete_job:
        print("Deleting job", file=sys.stderr)
        jobs.delete_job(FLAGS.delete_job)
        sys.exit()

    # Print the jobs in the namespace: queued, running, or completed.
    if FLAGS.list_jobs:
        print("Listing jobs", file=sys.stderr)
        jobs.list_jobs()
        sys.exit()

    if FLAGS.args_file:
        settings.load_file(path=FLAGS.args_file)  # list or `;/,` separated allowed
        # Rebuild once so values loaded from the args file (e.g. job_prefix)
        # are picked up. Only `command` changes per entry, and it is read when
        # the request is built, so one instance serves the whole loop.
        jobs = KubernetesBatchJobs()
        for arg in settings.ARGS:
            print(f"This is the arg: {arg}")
            settings.command = arg
            jobs.create_job(jobs.create_job_request())
        sys.exit()

    if FLAGS.create_job:
        jobs.create_job(jobs.create_job_request())
    else:
        print(jobs.create_job_request(), file=sys.stderr)
if __name__ == "__main__":
    # Only when executed as a script: absl parses the flags, then calls main().
    app.run(main)