in dag_utils/tools.py [0:0]
def __init__(self,
             # Directories to map into the DOCS gcs bucket
             doc_dirs=None,
             **kwargs):
    """Build the pod operator with fixed settings and optional docs mounts.

    Args:
        doc_dirs: Optional iterable of directory paths inside the pod.
            When non-empty, a single ``docs-bucket`` GCS Fuse CSI volume
            backed by ``GCS_DOCS_BUCKET`` is added, and each directory is
            mounted from the bucket sub-path
            ``<dag_id>/<task_id>/<execution ts><doc_dir>``. Each entry is
            concatenated directly onto that templated prefix, so it
            should start with ``/``.
        **kwargs: Forwarded to the parent constructor. Must not contain
            any of the keywords fixed below (``image_pull_policy``,
            ``config_file``, ``kubernetes_conn_id``, ``namespace``,
            ``get_logs``, ``log_events_on_failure``,
            ``is_delete_operator_pod``); Python raises ``TypeError`` on a
            duplicate keyword argument.
    """
    # NOTE: There is a limitation to the GCS Fuse that it
    # waits 30 seconds after a pod terminates.
    #
    # This delay is removed in the gcs-fuse-csi-driver but may not yet
    # be available in Composer and GKE Autopilot:
    # https://github.com/GoogleCloudPlatform/gcs-fuse-csi-driver/issues/91#issuecomment-1886185228
    if doc_dirs:
        # Work on copies of the caller-supplied containers. The same
        # dict/list object may be shared between several operators (for
        # example through a module-level constant); extending it in place
        # would leak the docs-bucket entries into every other user and
        # produce duplicate "docs-bucket" volumes. ``or`` also tolerates
        # an explicit None for any of these arguments.
        annotations = dict(kwargs.get('annotations') or {})
        volumes = list(kwargs.get('volumes') or [])
        volume_mounts = list(kwargs.get('volume_mounts') or [])

        # Add in the required annotations
        annotations.update({
            "gke-gcsfuse/volumes": "true",
        })

        # Add in the docs bucket volume
        volumes.append(V1Volume(
            name="docs-bucket",
            csi=V1CSIVolumeSource(
                driver="gcsfuse.csi.storage.gke.io",
                read_only=False,
                volume_attributes={
                    'bucketName': GCS_DOCS_BUCKET,
                    'mountOptions': ','.join([
                        'implicit-dirs',
                        'file-mode=0666',
                        'dir-mode=0777',
                    ]),
                },
            )
        ))

        # Path in the docs bucket for the files
        sub_path = ('{{ dag_run.dag_id }}/{{ task.task_id }}' +
                    '/{{ execution_date | ts }}')

        # Add one mount of the docs bucket volume per requested directory
        for doc_dir in doc_dirs:
            volume_mounts.append(V1VolumeMount(
                name="docs-bucket",
                mount_path=doc_dir,
                sub_path=sub_path + doc_dir,
                read_only=False,
            ))

        kwargs['annotations'] = annotations
        kwargs['volumes'] = volumes
        kwargs['volume_mounts'] = volume_mounts

    super().__init__(
        # Always pull -- if image is updated, we need to use the latest
        image_pull_policy='Always',
        # See the following URL for why the config file needs to be set:
        # https://cloud.google.com/composer/docs/how-to/using/using-kubernetes-pod-operator#version-5-0-0
        config_file="/home/airflow/composer_kube_config",
        kubernetes_conn_id="kubernetes_default",
        # As per
        # https://cloud.google.com/composer/docs/composer-2/use-kubernetes-pod-operator,
        # use the composer-user-workloads namespace unless workload
        # identity is setup.
        namespace='composer-user-workloads',
        # Capture all of the logs
        get_logs=True,
        log_events_on_failure=True,
        is_delete_operator_pod=True,
        **kwargs)