def __kubernetes_kwargs()

in liminal/runners/airflow/executors/kubernetes.py [0:0]


    def __kubernetes_kwargs(self, task: ContainerTask) -> dict:
        """Build the keyword arguments describing the Kubernetes pod for ``task``.

        The result combines, in increasing order of precedence:

        1. values derived from ``task``, environment variables and defaults;
        2. whatever is left in a copy of ``self.executor_config`` once the
           keys handled explicitly here have been removed;
        3. Jenkins-specific overrides (``affinity`` and ``namespace``) when
           ``env_util.is_running_on_jenkins()`` is true;
        4. a fixed ``start_date`` when the task has no DAG attached.

        NOTE(review): the keys look like KubernetesPodOperator arguments --
        confirm against the call site.

        :param task: the container task to build pod arguments for.
        :return: a new dict of keyword arguments; ``self.executor_config`` is
            not modified because only a deep copy of it is consumed.
        """
        # Work on a copy so the pops below never mutate the shared executor config.
        config = copy.deepcopy(self.executor_config)

        # Environment values are always strings, so passing the raw value on
        # would make 'False' or '0' truthy. Convert to a real bool instead:
        # unset/empty and the usual "false" spellings mean False, any other
        # value keeps meaning True (as it effectively did before).
        in_cluster_raw = os.environ.get('AIRFLOW__KUBERNETES__IN_CLUSTER', '')
        in_cluster = in_cluster_raw.strip().lower() not in (
            '',
            '0',
            'f',
            'false',
            'n',
            'no',
            'off',
        )

        kubernetes_kwargs = {
            'task_id': task.task_id,
            'image': task.image,
            'arguments': task.arguments,
            'namespace': os.environ.get('AIRFLOW__KUBERNETES__NAMESPACE', 'default'),
            # The pod name is the task id with underscores replaced by dashes.
            'name': task.task_id.replace('_', '-'),
            'in_cluster': in_cluster,
            'image_pull_policy': get_variable('image_pull_policy', default_val='IfNotPresent'),
            'get_logs': config.pop('get_logs', True),
            'is_delete_operator_pod': config.pop('is_delete_operator_pod', True),
            'startup_timeout_seconds': config.pop('startup_timeout_seconds', 1200),
            'env_vars': [k8s.V1EnvVar(name=x, value=v) for x, v in task.env_vars.items()],
            'do_xcom_push': task.task_config.get('do_xcom_push', False),
            'image_pull_secrets': config.pop('image_pull_secrets', 'regcred'),
            'volumes': self.volumes,
            'config_file': os.environ.get('AIRFLOW__KUBERNETES__CONFIG_FILE'),
            'cluster_context': os.environ.get('AIRFLOW__KUBERNETES__CLUSTER_CONTEXT'),
            'cmds': task.cmds,
            # Task mounts first, then one mount per task secret.
            'volume_mounts': [
                V1VolumeMount(
                    name=mount['volume'],
                    mount_path=mount['path'],
                    sub_path=mount.get('sub_path'),
                    read_only=mount.get('read_only', False),
                )
                for mount in task.mounts
            ]
            + [
                V1VolumeMount(
                    name=secret['secret'],
                    mount_path=secret['remote_path'],
                    sub_path=secret.get('sub_path'),
                    read_only=secret.get('read_only', False),
                )
                for secret in task.secrets
            ],
        }

        # These keys are computed above (or are not pod arguments at all), so
        # drop them to keep the update below from overriding or adding them.
        config.pop('in_cluster', None)
        config.pop('volumes', None)
        config.pop('volume_mounts', None)
        config.pop('executor', None)
        config.pop('type', None)

        # Any remaining executor config keys take precedence over the defaults above.
        kubernetes_kwargs.update(config)

        if env_util.is_running_on_jenkins():
            kubernetes_kwargs['affinity'] = self.__jenkins_kubernetes_affinity()
            kubernetes_kwargs['namespace'] = 'jenkins'

        # A task without a DAG gets a fixed epoch start date.
        if not task.dag:
            kubernetes_kwargs['start_date'] = datetime.datetime(1970, 1, 1)

        return kubernetes_kwargs