in dags/processing/processing.py [0:0]
def determine_workload(**kwargs):
    """
    Decide which downstream branch to run, based on metadata found in xcom.

    The xcom entries 'filename_s3_key' and 'filename_s3_bucket' published by the
    upstream 'bag_file_sensor' task are pulled. If either of them is missing or
    empty there is nothing to process and the name of the 'no_work' branch is
    returned; otherwise the name of the 'tag_bag_file_in_process' branch is
    returned.

    :param kwargs: Task context passed as keyword arguments. Must contain the
        key 'ti', an object exposing ``xcom_pull(task_ids=..., key=...)``
        (presumably the Airflow task instance -- confirm against the operator
        that calls this function).
    :return: Name of the next task to execute: 'no_work' or
        'tag_bag_file_in_process'.
    :raises KeyError: If 'ti' is not present in ``kwargs``.
    """
    print("Check xcom for metadata.")

    # Look the task instance up once; both pulls target the same sensor task.
    task_instance = kwargs['ti']
    key = task_instance.xcom_pull(task_ids="bag_file_sensor", key="filename_s3_key")
    bucket = task_instance.xcom_pull(task_ids="bag_file_sensor", key="filename_s3_bucket")

    # Both values are required to locate a file; a missing (None) or empty
    # value for either one means there is nothing to process.
    if not (key and bucket):
        print("No unprocessed bag file could be found. Stopping DAG execution")
        return 'no_work'

    print("Found unprocessed bag files. Continuing DAG execution.")
    return 'tag_bag_file_in_process'