in providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py [0:0]
def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod:
    """
    Return V1Pod object based on pod template file, full pod spec, and other operator parameters.

    The V1Pod attributes are derived (in order of precedence) from operator params, full pod spec, pod
    template file.

    The runtime-info environment variables are appended to a per-call copy of the operator's
    environment list, so calling this method more than once on the same operator does not
    accumulate duplicate entries in ``self.env_vars``.

    :param context: task context forwarded to ``_get_ti_pod_labels`` to compute the labels that
        identify the pod; defaults to ``None``.
    :return: the reconciled pod after secrets, the optional xcom sidecar, the identifying labels
        and ``pod_mutation_hook`` have been applied.
    """
    self.log.debug("Creating pod for KubernetesPodOperator task %s", self.task_id)

    # Normalise the user-supplied env vars once and keep the normalised form on the operator.
    self.env_vars = convert_env_vars_or_raise_error(self.env_vars) if self.env_vars else []
    # Work on a copy: extending self.env_vars in place would append the runtime-info variables
    # again on every call and would mutate the list object stored on the operator.
    container_env_vars = list(self.env_vars)
    if self.pod_runtime_info_envs:
        container_env_vars.extend(self.pod_runtime_info_envs)

    # Pick the base pod: template file, else template dict, else full pod spec, else an empty pod.
    # When a template and a full pod spec are both given, the full pod spec is reconciled on top
    # of the template.
    if self.pod_template_file:
        self.log.debug("Pod template file found, will parse for base pod")
        pod_template = pod_generator.PodGenerator.deserialize_model_file(self.pod_template_file)
        if self.full_pod_spec:
            pod_template = PodGenerator.reconcile_pods(pod_template, self.full_pod_spec)
    elif self.pod_template_dict:
        self.log.debug("Pod template dict found, will parse for base pod")
        pod_template = pod_generator.PodGenerator.deserialize_model_dict(self.pod_template_dict)
        if self.full_pod_spec:
            pod_template = PodGenerator.reconcile_pods(pod_template, self.full_pod_spec)
    elif self.full_pod_spec:
        pod_template = self.full_pod_spec
    else:
        pod_template = k8s.V1Pod(metadata=k8s.V1ObjectMeta())

    # Pod described purely by the operator's own parameters.
    pod = k8s.V1Pod(
        api_version="v1",
        kind="Pod",
        metadata=k8s.V1ObjectMeta(
            namespace=self.namespace,
            labels=self.labels,
            name=self.name,
            annotations=self.annotations,
        ),
        spec=k8s.V1PodSpec(
            node_selector=self.node_selector,
            affinity=self.affinity,
            tolerations=self.tolerations,
            init_containers=self.init_containers,
            host_aliases=self.host_aliases,
            containers=[
                k8s.V1Container(
                    image=self.image,
                    name=self.base_container_name,
                    command=self.cmds,
                    ports=self.ports,
                    image_pull_policy=self.image_pull_policy,
                    resources=self.container_resources,
                    volume_mounts=self.volume_mounts,
                    args=self.arguments,
                    env=container_env_vars,
                    env_from=self.env_from,
                    security_context=self.container_security_context,
                    termination_message_policy=self.termination_message_policy,
                )
            ],
            image_pull_secrets=self.image_pull_secrets,
            service_account_name=self.service_account_name,
            host_network=self.hostnetwork,
            hostname=self.hostname,
            subdomain=self.subdomain,
            security_context=self.security_context,
            dns_policy=self.dnspolicy,
            dns_config=self.dns_config,
            scheduler_name=self.schedulername,
            restart_policy="Never",
            priority_class_name=self.priority_class_name,
            volumes=self.volumes,
            active_deadline_seconds=self.active_deadline_seconds,
        ),
    )

    # Combine the base pod with the operator-parameter pod.
    pod = PodGenerator.reconcile_pods(pod_template, pod)

    if not pod.metadata.name:
        pod.metadata.name = create_unique_id(
            task_id=self.task_id, unique=self.random_name_suffix, max_length=POD_NAME_MAX_LENGTH
        )
    elif self.random_name_suffix:
        # user has supplied pod name, we're just adding suffix
        pod.metadata.name = add_unique_suffix(name=pod.metadata.name)

    if not pod.metadata.namespace:
        # Fall back through the operator, the hook, the in-cluster namespace, then "default".
        hook_namespace = self.hook.get_namespace()
        pod_namespace = self.namespace or hook_namespace or self._incluster_namespace or "default"
        pod.metadata.namespace = pod_namespace

    for secret in self.secrets:
        self.log.debug("Adding secret to task %s", self.task_id)
        pod = secret.attach_to_pod(pod)

    if self.do_xcom_push:
        self.log.debug("Adding xcom sidecar to task %s", self.task_id)
        pod = xcom_sidecar.add_xcom_sidecar(
            pod,
            sidecar_container_image=self.hook.get_xcom_sidecar_container_image(),
            sidecar_container_resources=self.hook.get_xcom_sidecar_container_resources(),
        )

    labels = self._get_ti_pod_labels(context)
    self.log.info("Building pod %s with labels: %s", pod.metadata.name, labels)

    # The reconciled metadata may carry no labels at all; make sure there is a dict to merge into
    # instead of failing with AttributeError on None.
    if pod.metadata.labels is None:
        pod.metadata.labels = {}

    # Merge Pod Identifying labels with labels passed to operator
    pod.metadata.labels.update(labels)
    # Add Airflow Version to the label
    # And a label to identify that pod is launched by KubernetesPodOperator
    pod.metadata.labels.update(
        {
            "airflow_version": airflow_version.replace("+", "-"),
            "airflow_kpo_in_cluster": str(self.hook.is_in_cluster),
        }
    )

    pod_mutation_hook(pod)
    return pod