in liminal/runners/airflow/executors/kubernetes.py [0:0]
def __kubernetes_kwargs(self, task: ContainerTask):
    """Assemble the keyword arguments describing the pod that runs ``task``.

    Defaults are derived from the task, from ``AIRFLOW__KUBERNETES__*``
    environment variables and from a deep copy of ``self.executor_config``
    (so the executor's own config is never mutated). Any keys remaining in
    the executor config after the reserved ones are removed are merged on
    top of the defaults.

    NOTE(review): the key names match the parameters of Airflow's
    KubernetesPodOperator; presumably the result is passed to it - confirm
    against the caller.

    :param task: container task supplying ``task_id``, ``image``,
        ``arguments``, ``cmds``, ``env_vars``, ``task_config``, ``mounts``,
        ``secrets`` and ``dag``.
    :return: dict of keyword arguments.
    """
    config = copy.deepcopy(self.executor_config)

    # Environment values are always strings, so passing the raw value through
    # would make 'False' or '0' truthy. Only unset/empty/explicitly-false
    # spellings disable in-cluster mode; any other value keeps enabling it,
    # which matches how the raw string was previously interpreted.
    in_cluster_raw = os.environ.get('AIRFLOW__KUBERNETES__IN_CLUSTER', '')
    in_cluster = in_cluster_raw.strip().lower() not in (
        '',
        'f',
        'false',
        '0',
        'n',
        'no',
        'off',
    )

    kubernetes_kwargs = {
        'task_id': task.task_id,
        'image': task.image,
        'arguments': task.arguments,
        'namespace': os.environ.get('AIRFLOW__KUBERNETES__NAMESPACE', 'default'),
        # Underscores are replaced with dashes to form the pod name.
        'name': task.task_id.replace('_', '-'),
        'in_cluster': in_cluster,
        'image_pull_policy': get_variable('image_pull_policy', default_val='IfNotPresent'),
        'get_logs': config.pop('get_logs', True),
        'is_delete_operator_pod': config.pop('is_delete_operator_pod', True),
        'startup_timeout_seconds': config.pop('startup_timeout_seconds', 1200),
        'env_vars': [
            k8s.V1EnvVar(name=var_name, value=var_value)
            for var_name, var_value in task.env_vars.items()
        ],
        'do_xcom_push': task.task_config.get('do_xcom_push', False),
        'image_pull_secrets': config.pop('image_pull_secrets', 'regcred'),
        'volumes': self.volumes,
        'config_file': os.environ.get('AIRFLOW__KUBERNETES__CONFIG_FILE'),
        'cluster_context': os.environ.get('AIRFLOW__KUBERNETES__CLUSTER_CONTEXT'),
        'cmds': task.cmds,
        # Regular mounts first, followed by one mount per task secret.
        'volume_mounts': [
            V1VolumeMount(
                name=mount['volume'],
                mount_path=mount['path'],
                sub_path=mount.get('sub_path'),
                read_only=mount.get('read_only', False),
            )
            for mount in task.mounts
        ]
        + [
            V1VolumeMount(
                name=secret['secret'],
                mount_path=secret['remote_path'],
                sub_path=secret.get('sub_path'),
                read_only=secret.get('read_only', False),
            )
            for secret in task.secrets
        ],
    }

    # These keys are dropped so the merge below cannot override the values
    # computed above ('in_cluster', 'volumes', 'volume_mounts') or leak
    # 'executor' / 'type' into the result (presumably liminal-level settings
    # rather than pod arguments - confirm).
    for reserved_key in ('in_cluster', 'volumes', 'volume_mounts', 'executor', 'type'):
        config.pop(reserved_key, None)

    # Whatever is left in the executor config overrides the defaults.
    kubernetes_kwargs.update(config)

    # Applied after the merge, so the executor config cannot override these.
    if env_util.is_running_on_jenkins():
        kubernetes_kwargs['affinity'] = self.__jenkins_kubernetes_affinity()
        kubernetes_kwargs['namespace'] = 'jenkins'

    # A task without a DAG gets a fixed epoch start date.
    if not task.dag:
        kubernetes_kwargs['start_date'] = datetime.datetime(1970, 1, 1)

    return kubernetes_kwargs