def __init__()

in dag_utils/tools.py [0:0]


    def __init__(self,
                 # Directories to map into the DOCS gcs bucket
                 doc_dirs=None,
                 **kwargs):
        """Set up the operator, optionally mounting the docs bucket.

        When ``doc_dirs`` is non-empty, a GCS Fuse CSI volume backed by
        ``GCS_DOCS_BUCKET`` is added to the pod and mounted once per entry
        in ``doc_dirs``.  Each mount uses the templated sub-path
        ``<dag_id>/<task_id>/<execution ts><doc_dir>`` inside the bucket.

        Args:
            doc_dirs: Container paths to mount from the docs bucket.  May be
                ``None``/empty (no mounts), a single string, or an iterable
                of strings.  Each entry is appended verbatim to the bucket
                sub-path, so entries are expected to start with ``/``.
            **kwargs: Forwarded to the parent constructor.  Any
                ``annotations``, ``volumes`` and ``volume_mounts`` supplied
                by the caller are copied before being extended, so the
                caller's own objects are never modified.

        Raises:
            TypeError: If ``kwargs`` repeats one of the arguments this
                constructor always passes itself (``image_pull_policy``,
                ``config_file``, ``kubernetes_conn_id``, ``namespace``,
                ``get_logs``, ``log_events_on_failure``,
                ``is_delete_operator_pod``).
        """

        # A bare string would otherwise be iterated character by character,
        # producing one bogus mount per character.
        if isinstance(doc_dirs, str):
            doc_dirs = [doc_dirs]

        # NOTE: There is a limitation to the GCS Fuse that it
        # waits 30 seconds after a pod terminates.
        #
        # This delay is removed in the gcs-fuse-csi-driver but may not yet
        # be available in Composer and GKE Autopilot:
        # https://github.com/GoogleCloudPlatform/gcs-fuse-csi-driver/issues/91#issuecomment-1886185228
        if doc_dirs:

            # Work on copies: callers frequently share one dict/list between
            # several operators, and mutating it in place would leak the
            # docs-bucket entries into every other task (and duplicate them).
            # ``or`` also tolerates an explicit ``None`` from the caller.
            annotations = dict(kwargs.get('annotations') or {})
            volumes = list(kwargs.get('volumes') or [])
            volume_mounts = list(kwargs.get('volume_mounts') or [])

            # Add in the required annotations
            annotations["gke-gcsfuse/volumes"] = "true"

            # Add in the docs bucket volume
            volumes.append(V1Volume(
                name="docs-bucket",
                csi=V1CSIVolumeSource(
                    driver="gcsfuse.csi.storage.gke.io",
                    read_only=False,
                    volume_attributes={
                        'bucketName': GCS_DOCS_BUCKET,
                        'mountOptions': ','.join([
                            'implicit-dirs',
                            'file-mode=0666',
                            'dir-mode=0777',
                        ]),
                    },
                )
            ))

            # Path in the docs bucket for the files
            sub_path = ('{{ dag_run.dag_id }}/{{ task.task_id }}' +
                        '/{{ execution_date | ts }}')

            # Mount the docs bucket volume once per requested directory
            volume_mounts.extend(
                V1VolumeMount(
                    name="docs-bucket",
                    mount_path=doc_dir,
                    sub_path=sub_path + doc_dir,
                    read_only=False,
                )
                for doc_dir in doc_dirs
            )

            kwargs['annotations'] = annotations
            kwargs['volumes'] = volumes
            kwargs['volume_mounts'] = volume_mounts

        super().__init__(

            # Always pull -- if image is updated, we need to use the latest
            image_pull_policy='Always',

            # See the following URL for why the config file needs to be set:
            # https://cloud.google.com/composer/docs/how-to/using/using-kubernetes-pod-operator#version-5-0-0
            config_file="/home/airflow/composer_kube_config",
            kubernetes_conn_id="kubernetes_default",

            # As per
            # https://cloud.google.com/composer/docs/composer-2/use-kubernetes-pod-operator,
            # use the composer-user-workloads namespace unless workload
            # identity is setup.
            namespace='composer-user-workloads',

            # Capture all of the logs
            get_logs=True,
            log_events_on_failure=True,
            is_delete_operator_pod=True,

            **kwargs)