in src/translation/dags/controller_dag.py [0:0]
def _load_config(ti, **kwargs) -> None:
    """Resolve the config for the triggering event and publish it through XCom.

    The event payload is read from ``kwargs["dag_run"].conf`` and dispatched on
    ``message.attributes.eventType``:

    * ``"OBJECT_FINALIZE"`` -- the config is downloaded from the bucket/object
      named in the message attributes, parsed as JSON, passed through
      ``normalize_and_validate_config`` and given a timestamp under
      ``CUSTOM_RUN_ID_KEY`` when that key is absent. ``config``, ``bucket_id``
      and ``object_id`` are pushed to XCom, and the bucket id is also stored in
      the ``hive_config_bucket_id`` Variable.
    * ``"TRANSFER_RUN_FINISHED"`` -- the config is the base64-encoded JSON held
      in ``message.data``; only ``config`` is pushed.

    ``event_type`` is pushed to XCom for every event, including event types
    that match neither branch.

    Args:
        ti: Object exposing ``xcom_push(key=..., value=...)`` (presumably the
            Airflow task instance -- confirm against the operator wiring).
        **kwargs: Must contain ``dag_run``, whose ``conf`` holds the event.

    Raises:
        KeyError: If an expected key is missing from the event payload.
    """
    # The storage client is created before the payload is inspected, regardless
    # of which event type arrives.
    gcs_client = storage.Client(
        client_info=ClientInfo(user_agent=custom_user_agent.USER_AGENT)
    )
    trigger_conf = kwargs["dag_run"].conf
    event_type = trigger_conf["message"]["attributes"]["eventType"]

    if event_type == "TRANSFER_RUN_FINISHED":
        # The config travels inline with the event as base64-encoded JSON.
        encoded_config = trigger_conf["message"]["data"]
        inline_config = json.loads(base64.b64decode(encoded_config))
        ti.xcom_push(key="config", value=inline_config)
    elif event_type == "OBJECT_FINALIZE":  # New config file dropped
        message = trigger_conf["message"]
        print(f"message : {message}")

        bucket_id = message["attributes"]["bucketId"]
        # Stored as a Variable before the object id is looked up.
        Variable.set("hive_config_bucket_id", bucket_id)
        object_id = message["attributes"]["objectId"]
        print(f"bucket_id : {bucket_id}, object_id: {object_id}")

        config_blob = storage.Blob(object_id, gcs_client.get_bucket(bucket_id))
        parsed_config = json.loads(config_blob.download_as_bytes())
        config = normalize_and_validate_config(PROJECT_ID, parsed_config)

        if CUSTOM_RUN_ID_KEY not in config:
            # No run id supplied in the file: fall back to the current local time.
            generated_run_id = datetime.datetime.now().strftime("%x %H:%M:%S")
            config[CUSTOM_RUN_ID_KEY] = generated_run_id

        for xcom_key, xcom_value in (
            ("config", config),
            ("bucket_id", bucket_id),
            ("object_id", object_id),
        ):
            ti.xcom_push(key=xcom_key, value=xcom_value)

    ti.xcom_push(key="event_type", value=event_type)