in common/sagemaker_rl/orchestrator/workflow/manager/join_manager.py [0:0]
def _update_join_table_states(self, join_job_record):
"""Update the joining job states in the joining job table.
This method will keep polling the Athena query status and then
update joining job metadata
Args:
join_job_record (dict): Current joining job record in the
joining table
"""
if join_job_record is None:
return
current_state = join_job_record.get("current_state", None)
join_query_ids = join_job_record.get("join_query_ids", [])
# join job already ended in terminated state
if current_state is not None and current_state.endswith("ED"):
return
if not join_query_ids:
raise JoinQueryIdsNotAvailableException(
f"Query ids for Joining job " f"'{self.join_job_id}' cannot be found."
)
query_states = []
for query_id in join_query_ids:
query_states.append(self.get_query_status(query_id))
# only 'SUCCEEDED' if both queries are 'SUCCEEDED'
if query_states[0] == "SUCCEEDED" and query_states[1] == "SUCCEEDED":
current_state = "SUCCEEDED"
elif "FAILED" in query_states:
current_state = "FAILED"
elif "CANCELLED" in query_states:
current_state = "CANCELLED"
else:
current_state = "RUNNING"
# update table states via ddb client
self.join_db_client.update_join_job_current_state(
self.experiment_id, self.join_job_id, current_state
)