def _load_config()

in src/translation/dags/controller_dag.py [0:0]


def _load_config(ti, **kwargs) -> None:
    """Publish the config carried by the triggering event to XCom.

    The event payload is read from ``kwargs["dag_run"].conf`` and handled
    according to ``message.attributes.eventType``:

    * ``OBJECT_FINALIZE`` (a new config file was dropped): the object named
      by the ``bucketId`` / ``objectId`` attributes is downloaded, parsed as
      JSON and passed through ``normalize_and_validate_config``. If the
      result has no ``CUSTOM_RUN_ID_KEY`` entry, a timestamp string is
      filled in. The bucket id is stored in the ``hive_config_bucket_id``
      Variable, and ``config``, ``bucket_id`` and ``object_id`` are pushed
      to XCom.
    * ``TRANSFER_RUN_FINISHED``: ``message.data`` is base64-decoded, parsed
      as JSON and pushed to XCom as ``config``.

    ``event_type`` is pushed to XCom in every case; for any other event type
    it is the only value pushed.

    Args:
        ti: Task instance; only its ``xcom_push`` method is used.
        **kwargs: Task context; only ``dag_run`` is read.

    Raises:
        KeyError: If ``dag_run`` or an expected key of the event payload is
            missing.
    """
    message = kwargs["dag_run"].conf["message"]
    attributes = message["attributes"]
    event_type = attributes["eventType"]

    if event_type == "OBJECT_FINALIZE":  # New config file dropped
        print(f"message : {message}")
        bucket_id = attributes["bucketId"]
        Variable.set("hive_config_bucket_id", bucket_id)
        object_id = attributes["objectId"]
        print(f"bucket_id : {bucket_id}, object_id: {object_id}")

        # Only this branch talks to GCS, so the client is built here rather
        # than up front: other event types neither pay for nor can fail on
        # client construction.
        client = storage.Client(
            client_info=ClientInfo(user_agent=custom_user_agent.USER_AGENT)
        )
        bucket = client.get_bucket(bucket_id)
        raw_config = storage.Blob(object_id, bucket).download_as_bytes()

        config = normalize_and_validate_config(PROJECT_ID, json.loads(raw_config))
        if CUSTOM_RUN_ID_KEY not in config:
            # NOTE(review): "%x" is locale-dependent and now() is naive local
            # time; kept as-is because downstream may rely on this format.
            config[CUSTOM_RUN_ID_KEY] = datetime.datetime.now().strftime("%x %H:%M:%S")

        ti.xcom_push(key="config", value=config)
        ti.xcom_push(key="bucket_id", value=bucket_id)
        ti.xcom_push(key="object_id", value=object_id)

    elif event_type == "TRANSFER_RUN_FINISHED":
        config = json.loads(base64.b64decode(message["data"]))
        ti.xcom_push(key="config", value=config)

    ti.xcom_push(key="event_type", value=event_type)