in mephisto/data_model/assignment.py [0:0]
def get_status(self) -> str:
"""
Get the status of this assignment, as determined by the status of
the units
"""
units = self.get_units()
statuses = set(unit.get_status() for unit in units)
if len(statuses) == 1:
return statuses.pop()
if len(statuses) == 0:
return AssignmentState.CREATED
if AssignmentState.CREATED in statuses:
return AssignmentState.CREATED
if any([s == AssignmentState.LAUNCHED for s in statuses]):
# If any are only launched, consider the whole thing launched
return AssignmentState.LAUNCHED
if any([s == AssignmentState.ASSIGNED for s in statuses]):
# If any are still assigned, consider the whole thing assigned
return AssignmentState.ASSIGNED
if all(
[
s in [AssignmentState.ACCEPTED, AssignmentState.REJECTED]
for s in statuses
]
):
return AssignmentState.MIXED
if all([s in AssignmentState.final_agent() for s in statuses]):
return AssignmentState.COMPLETED
raise NotImplementedError(f"Unexpected set of unit statuses {statuses}")