def _update_join_table_states()

in common/sagemaker_rl/orchestrator/workflow/manager/join_manager.py [0:0]


    def _update_join_table_states(self, join_job_record):
        """Refresh the joining job's state in the joining job table.

        Looks up the status of every query listed in the record through
        ``self.get_query_status``, folds those statuses into a single
        job-level state and writes it via ``self.join_db_client``. Each call
        checks every query once; callers that need to wait for completion
        are expected to invoke this method repeatedly.

        The job-level state is chosen in this order:
            1. "SUCCEEDED" if every query is "SUCCEEDED";
            2. "FAILED" if any query is "FAILED";
            3. "CANCELLED" if any query is "CANCELLED";
            4. "RUNNING" otherwise.

        Nothing is read or written when ``join_job_record`` is None or when
        its recorded state already ends with "ED".

        Args:
            join_job_record (dict): Current joining job record in the
                joining table. The keys read are "current_state" and
                "join_query_ids".

        Raises:
            JoinQueryIdsNotAvailableException: If the job is not yet in a
                terminated state and the record carries no query ids.
        """
        if join_job_record is None:
            return

        current_state = join_job_record.get("current_state", None)
        join_query_ids = join_job_record.get("join_query_ids", [])

        # Every terminal state written below ("SUCCEEDED", "FAILED",
        # "CANCELLED") ends with "ED", so such a job needs no further update.
        if current_state is not None and current_state.endswith("ED"):
            return

        if not join_query_ids:
            raise JoinQueryIdsNotAvailableException(
                f"Query ids for Joining job '{self.join_job_id}' cannot be found."
            )

        query_states = [
            self.get_query_status(query_id) for query_id in join_query_ids
        ]

        # The job succeeds only when *every* query succeeded. Checking all
        # states (instead of indexing the first two) keeps this correct for
        # any number of queries, including a single one.
        if all(state == "SUCCEEDED" for state in query_states):
            current_state = "SUCCEEDED"
        elif "FAILED" in query_states:
            current_state = "FAILED"
        elif "CANCELLED" in query_states:
            current_state = "CANCELLED"
        else:
            current_state = "RUNNING"

        # Persist the aggregated state via the DynamoDB client wrapper.
        self.join_db_client.update_join_job_current_state(
            self.experiment_id, self.join_job_id, current_state
        )