in composer/workflows/airflow_db_cleanup.py [0:0]
def _print_and_delete(session, query, airflow_db_model, age_check_column):
    """Print and/or delete the rows matched by ``query``, then commit.

    The query is printed via ``print_query`` when ``PRINT_DELETES`` is truthy
    and deleted when ``ENABLE_DELETE`` is truthy. The session is committed
    afterwards in either case.
    """
    if PRINT_DELETES:
        print_query(query, airflow_db_model, age_check_column)
    if ENABLE_DELETE:
        logging.info("Performing Delete...")
        # synchronize_session=False: issue the DELETE without trying to
        # reconcile objects already loaded in the session.
        query.delete(synchronize_session=False)
    session.commit()


def cleanup_function(**context):
    """Delete (or just report) old rows of one Airflow metadata model.

    Reads the cut-off date from the XCom value ``max_date`` pushed by the
    ``print_configuration`` task, reads the target model and related options
    from ``context["params"]``, builds the matching query with
    ``build_query`` and hands it to ``_print_and_delete``.

    When the ``do_not_delete_by_dag_id`` param is truthy a single query covers
    the whole table; otherwise one query is built per distinct ``dag_id``
    found in the table, plus one final query with ``dag_id=None``.

    Args:
        **context: Airflow task context. Must provide ``ti`` (used for
            ``xcom_pull``) and ``params`` (a mapping with the keys read below).

    A ``ProgrammingError`` raised while querying/deleting is logged and
    swallowed (the model is reported as not present in the metadata); any
    other exception propagates. The session is always closed.
    """
    session = settings.Session()
    # Outer try/finally guarantees the session is closed even if reading the
    # XCom value or the params fails before any cleanup query is built.
    try:
        logging.info("Retrieving max_execution_date from XCom")
        max_date = context["ti"].xcom_pull(
            task_ids=print_configuration.task_id, key="max_date"
        )
        max_date = dateutil.parser.parse(max_date)  # stored as iso8601 str in xcom

        params = context["params"]
        airflow_db_model = params.get("airflow_db_model")
        state = params.get("state")
        age_check_column = params.get("age_check_column")
        keep_last = params.get("keep_last")
        keep_last_filters = params.get("keep_last_filters")
        keep_last_group_by = params.get("keep_last_group_by")

        logging.info("Configurations:")
        logging.info("max_date: %s", max_date)
        logging.info("enable_delete: %s", ENABLE_DELETE)
        logging.info("session: %s", session)
        logging.info("airflow_db_model: %s", airflow_db_model)
        logging.info("state: %s", state)
        logging.info("age_check_column: %s", age_check_column)
        logging.info("keep_last: %s", keep_last)
        logging.info("keep_last_filters: %s", keep_last_filters)
        logging.info("keep_last_group_by: %s", keep_last_group_by)
        logging.info("")

        logging.info("Running Cleanup Process...")

        try:
            if params.get("do_not_delete_by_dag_id"):
                query = build_query(
                    session=session,
                    airflow_db_model=airflow_db_model,
                    age_check_column=age_check_column,
                    max_date=max_date,
                )
                _print_and_delete(
                    session, query, airflow_db_model, age_check_column
                )
            else:
                dags = session.query(airflow_db_model.dag_id).distinct()
                session.commit()

                # Every distinct dag_id as a string, followed by None.
                list_dags = [str(row[0]) for row in dags] + [None]
                for dag_id in list_dags:
                    query = build_query(
                        session=session,
                        airflow_db_model=airflow_db_model,
                        age_check_column=age_check_column,
                        max_date=max_date,
                        dag_id=dag_id,
                    )
                    _print_and_delete(
                        session, query, airflow_db_model, age_check_column
                    )

            if not ENABLE_DELETE:
                logging.warning(
                    "You've opted to skip deleting the db entries. "
                    "Set ENABLE_DELETE to True to delete entries!!!"
                )

            logging.info("Finished Running Cleanup Process")

        except ProgrammingError as e:
            # Best-effort: the failure is reported as a model/table missing
            # from this metadata DB, then skipped instead of failing the task.
            logging.error(e)
            logging.error(
                "%s is not present in the metadata. Skipping...",
                airflow_db_model,
            )
    finally:
        session.close()