in mephisto/abstractions/providers/mturk/mturk_unit.py [0:0]
def get_status(self) -> str:
    """
    Return the status of this unit, refreshing it from MTurk while a HIT
    is still being tracked and relying on the locally stored value otherwise.

    Raises an Exception if MTurk reports a HITStatus this method doesn't
    know how to interpret.
    """
    stored_status = self.db_status

    if stored_status == AssignmentState.CREATED:
        # Nothing provider-specific yet, defer to the parent implementation
        return super().get_status()

    if stored_status in (
        AssignmentState.ACCEPTED,
        AssignmentState.EXPIRED,
        AssignmentState.SOFT_REJECTED,
    ):
        # A get_status call never moves a unit out of these statuses
        return stored_status

    if stored_status in (AssignmentState.COMPLETED, AssignmentState.REJECTED):
        # From here the unit only moves when the assigned agent's status does
        agent = self.get_assigned_agent()
        resolved_status = self.db_status
        if agent is None:
            logger.warning(f"Agent for unit {self} is None")
        else:
            agent_status = agent.get_status()
            # (agent status, unit status it maps to), checked in order
            for agent_outcome, unit_outcome in (
                (AgentState.STATUS_APPROVED, AssignmentState.ACCEPTED),
                (AgentState.STATUS_REJECTED, AssignmentState.REJECTED),
                (AgentState.STATUS_SOFT_REJECTED, AssignmentState.SOFT_REJECTED),
            ):
                if agent_status == agent_outcome:
                    resolved_status = unit_outcome
                    break
        if resolved_status != self.db_status:
            self.set_db_status(resolved_status)
        return self.db_status

    # Every other status is tracking a live HIT
    mturk_hit_id = self.get_mturk_hit_id()
    if mturk_hit_id is None:
        # No HIT id but an agent is still attached: that agent timed out,
        # so make sure it is marked as expired
        agent = self.get_assigned_agent()
        if agent is not None and agent.get_status() != AgentState.STATUS_EXPIRED:
            agent.update_status(AgentState.STATUS_EXPIRED)
        # Without a HIT there is nothing further to look up
        return self.db_status

    client = self._get_client(self.get_requester()._requester_name)
    hit = get_hit(client, mturk_hit_id)
    if hit is None:
        # Reported as expired to the caller; the stored status is not updated
        return AssignmentState.EXPIRED

    hit_data = hit["HIT"]
    hit_status = hit_data["HITStatus"]
    local_status = self.db_status

    # Translate the HIT status reported by MTurk into an assignment state
    if hit_status == "Assignable":
        external_status = AssignmentState.LAUNCHED
    elif hit_status == "Unassignable":
        external_status = AssignmentState.ASSIGNED
    elif hit_status in ("Reviewable", "Reviewing"):
        # Assignments still being available is treated as expired,
        # otherwise the unit counts as completed
        external_status = (
            AssignmentState.EXPIRED
            if hit_data["NumberOfAssignmentsAvailable"] != 0
            else AssignmentState.COMPLETED
        )
    elif hit_status == "Disposed":
        # The HIT is gone, so the local record is all that's left
        external_status = local_status
    else:
        raise Exception(f"Unexpected HIT status {hit_data['HITStatus']}")

    if external_status == local_status:
        # MTurk agrees with the local record, nothing to write
        return self.db_status

    is_return_event = local_status == AssignmentState.ASSIGNED and external_status in (
        AssignmentState.LAUNCHED,
        AssignmentState.EXPIRED,
    )
    if not is_return_event:
        self.set_db_status(external_status)
        return self.db_status

    # An assigned unit that went back to launched/expired is handled as a
    # return: somebody else may still be able to pick the HIT up
    agent = self.get_assigned_agent()
    if agent is not None and agent.get_status() in (
        AgentState.STATUS_IN_TASK,
        AgentState.STATUS_ONBOARDING,
        AgentState.STATUS_WAITING,
        AgentState.STATUS_PARTNER_DISCONNECT,
    ):
        # Flag the still-active agent as returned so running tasks are
        # released and the Blueprint can decide how to clean up
        agent.update_status(AgentState.STATUS_RETURNED)
    if external_status == AssignmentState.EXPIRED:
        # Expired means nobody else can do it, so persist that; a move
        # back to launched is intentionally not written
        self.set_db_status(external_status)
    return self.db_status