in code/KubernetesWrapper/KubernetesWrapper/Program.cs [10:175]
private static async Task Main(string[] args)
{
var (jobName, containerName, imageName, namespaceName, ttlSecondsAfterFinished, command, arguments) = Util.ProcessArgs(args);
Console.WriteLine("Information about the job:");
Console.WriteLine($"Job Name: {jobName}");
Console.WriteLine($"Container Name: {containerName}");
Console.WriteLine($"Image Name: {imageName}");
Console.WriteLine($"Namespace Name: {namespaceName}");
Console.WriteLine("---------");
Console.WriteLine("Command: ");
Console.WriteLine($"length: {command.Count}");
foreach (var item in command)
{
Console.WriteLine(item);
}
Console.WriteLine("---------");
Console.WriteLine("Arguments: ");
Console.WriteLine($"length: {arguments.Count}");
foreach (var item in arguments)
{
Console.WriteLine(item);
}
Console.WriteLine("---------");
string? homeDirectory = Environment.GetEnvironmentVariable("HOME");
string kubeConfigPath = $"{homeDirectory}/.kube/config";
Console.WriteLine($"Home directory: {homeDirectory}");
Console.WriteLine($"Kube config path: {kubeConfigPath}");
var config = KubernetesClientConfiguration.BuildConfigFromConfigFile(kubeConfigPath);
IKubernetes client = new Kubernetes(config);
var nodes = Environment.GetEnvironmentVariable("CCP_NODES");
var nodeList = Util.GetNodeList(nodes);
if (nodeList.Count == 0)
{
Console.WriteLine("Node list is empty. Exiting...");
return;
}
Console.WriteLine("node list: ");
foreach (var item in nodeList)
{
Console.WriteLine(item);
}
Console.WriteLine("---------");
CancellationTokenSource source = new();
CancellationToken token = source.Token;
Console.CancelKeyPress += async (sender, e) =>
{
e.Cancel = true; // Prevent the process from terminating immediately
Console.WriteLine("interrupt!!");
try
{
var job = await client.BatchV1.ReadNamespacedJobAsync(jobName, namespaceName).ConfigureAwait(false);
Console.WriteLine($"Job '{jobName}' found.");
// Job exists, so delete it
await client.BatchV1.DeleteNamespacedJobAsync(name: jobName, namespaceParameter: namespaceName).ConfigureAwait(false);
Console.WriteLine($"Job '{jobName}' deleted successfully.");
}
catch (k8s.Autorest.HttpOperationException ex) when (ex.Response.StatusCode == System.Net.HttpStatusCode.NotFound)
{
Console.WriteLine($"Job '{jobName}' does not exist.");
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
}
var podList = await client.CoreV1.ListNamespacedPodAsync(namespaceName, labelSelector: $"app={containerName}").ConfigureAwait(false);
Console.WriteLine($"Pod list count: {podList.Items.Count}");
foreach (var pod in podList.Items)
{
try
{
Console.WriteLine($"Pod: {pod.Metadata.Name}");
await client.CoreV1.DeleteNamespacedPodAsync(pod.Metadata.Name, namespaceName, new V1DeleteOptions()).ConfigureAwait(false);
Console.WriteLine($"Deleted pod: {pod.Metadata.Name}");
}
catch (k8s.Autorest.HttpOperationException ex) when (ex.Response.StatusCode == System.Net.HttpStatusCode.NotFound)
{
Console.WriteLine($"Pod '{pod.Metadata.Name}' does not exist.");
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
}
}
source.Cancel();
};
try
{
var existingJob = await client.BatchV1.ReadNamespacedJobAsync(jobName, namespaceName).ConfigureAwait(false);
if (existingJob != null)
{
Console.WriteLine($"Job '{jobName}' already exists in namespace '{namespaceName}'.");
return;
}
}
catch (HttpOperationException ex) when (ex.Response.StatusCode == HttpStatusCode.NotFound)
{
Console.WriteLine($"Job '{jobName}' does not exist in namespace '{namespaceName}'. Proceeding to create job.");
}
try
{
var job = await CreateJob(client, jobName, containerName, imageName, namespaceName,
ttlSecondsAfterFinished, command, arguments, nodeList, token);
var jobWatcher = client.BatchV1.ListNamespacedJobWithHttpMessagesAsync(
namespaceName,
labelSelector: $"app={containerName}",
watch: true,
cancellationToken: token);
await foreach (var (type, item) in jobWatcher.WatchAsync<V1Job, V1JobList>(
onError: e =>
{
Console.WriteLine($"Watcher error: {e.Message}");
},
cancellationToken: token))
{
if (item.Status.Succeeded == nodeList.Count)
{
var pods = await GetPodsForJobAsync(client, jobName, namespaceName).ConfigureAwait(false);
if (pods.Count == 0)
{
Console.WriteLine($"No pods found for job '{jobName}' in namespace '{namespaceName}'.");
return;
}
// Retrieve logs from each Pod
foreach (var pod in pods)
{
Console.WriteLine($"Found Pod: {pod.Metadata.Name}. Retrieving logs...");
string logs = await GetPodLogsAsync(client, pod.Metadata.Name, namespaceName).ConfigureAwait(false);
Console.WriteLine($"=== Logs from Pod: {pod.Metadata.Name} ===");
Console.WriteLine(logs);
}
Console.WriteLine($"All pods reach Success state. About to exit in {ttlSecondsAfterFinished} seconds.");
if (type == WatchEventType.Deleted)
{
Console.WriteLine("Job reaches Deleted state. Exit monitoring now.");
break;
}
}
}
}
catch (TaskCanceledException ex)
{
Console.WriteLine($"Stop watching. Task was canceled: {ex.Message}");
}
}