in mephisto/operations/supervisor.py [0:0]
def _register_agent(self, packet: Packet, channel_info: ChannelInfo) -> None:
    """Process an agent registration packet to register an agent.

    Every early-exit path below queues a PACKET_TYPE_PROVIDER_DETAILS reply
    on ``self.message_queue`` addressed to the requesting channel; the final
    path hands the request off to ``self._assign_unit_to_agent``.

    The steps, in order:

    1. Reconnection: if the registration id is already present in
       ``self.agents_by_registration_id``, refresh the stored channel id
       and reply with the existing agent id.
    2. No valid units for the worker: reply with ``agent_id`` of ``None``.
    3. Onboarding (blueprint is ``OnboardingRequired`` with
       ``use_onboarding``): a disqualified worker gets a ``None`` reply; a
       worker that is not yet qualified gets a new ``OnboardingAgent``, a
       reply carrying the onboarding data, and a started onboarding thread.
    4. Screening (``ScreenTaskRequired`` with ``use_screening_task``): the
       candidate units are replaced by a freshly launched screening unit,
       or a ``None`` reply is sent when no screening data is available.
    5. Gold units (``UseGoldUnit`` with ``use_golds``): same shape as the
       screening step, using the gold unit data for the worker.
    6. Otherwise the selected units go to ``self._assign_unit_to_agent``.

    Args:
        packet: The registration packet. ``packet.data`` is read for
            ``"provider_data"`` (which must contain
            ``"agent_registration_id"`` and ``"worker_id"``) and for
            ``"request_id"``.
        channel_info: Source channel of the request; its ``channel_id``,
            ``job.task_runner`` and ``job.task_launcher`` are used.

    Raises:
        AssertionError: If a screening or gold unit has to be launched but
            the job has no task launcher.
    """

    def _reply(agent_id, **extra):
        # Queue the provider-details response for this request. ``extra``
        # entries are appended after the two standard keys.
        self.message_queue.append(
            Packet(
                packet_type=PACKET_TYPE_PROVIDER_DETAILS,
                sender_id=SYSTEM_CHANNEL_ID,
                receiver_id=channel_info.channel_id,
                data={
                    "request_id": packet.data["request_id"],
                    "agent_id": agent_id,
                    **extra,
                },
            )
        )

    # First see if this is a reconnection
    crowd_data = packet.data["provider_data"]
    agent_registration_id = crowd_data["agent_registration_id"]
    logger.debug(f"Incoming request to register agent {agent_registration_id}.")
    if agent_registration_id in self.agents_by_registration_id:
        agent = self.agents_by_registration_id[agent_registration_id].agent
        agent_id = agent.get_agent_id()
        # Update the source channel, in case it has changed
        self.agents[agent_id].used_channel_id = channel_info.channel_id
        _reply(agent_id)
        logger.debug(
            f"Found existing agent_registration_id {agent_registration_id}, "
            f"reconnecting to {agent}."
        )
        return

    # Process a new agent
    task_runner = channel_info.job.task_runner
    task_run = task_runner.task_run
    worker_id = crowd_data["worker_id"]
    # Looked up once here and reused by every check below.
    worker = Worker.get(self.db, worker_id)

    # get the list of tentatively valid units
    units = task_run.get_valid_units_for_worker(worker)
    if not units:
        _reply(None)
        logger.debug(
            f"agent_registration_id {agent_registration_id}, had no valid units."
        )
        return

    # If there's onboarding, see if this worker has already been disqualified
    blueprint = task_run.get_blueprint(args=task_runner.args)
    if isinstance(blueprint, OnboardingRequired) and blueprint.use_onboarding:
        if worker.is_disqualified(blueprint.onboarding_qualification_name):
            _reply(None)
            logger.debug(
                f"Worker {worker_id} is already disqualified by onboarding "
                f"qual {blueprint.onboarding_qualification_name}."
            )
            return
        if not worker.is_qualified(blueprint.onboarding_qualification_name):
            # Send a packet with onboarding information
            onboard_data = blueprint.get_onboarding_data(worker.db_id)
            onboard_agent = OnboardingAgent.new(self.db, worker, task_run)
            onboard_agent.state.set_init_state(onboard_data)
            agent_info = AgentInfo(
                agent=onboard_agent, used_channel_id=channel_info.channel_id
            )
            onboard_id = onboard_agent.get_agent_id()
            # register onboarding agent; the original packet is stored
            # under the onboarding agent's id in self.onboarding_packets
            self.agents[onboard_id] = agent_info
            self.onboarding_packets[onboard_id] = packet
            _reply(onboard_id, onboard_data=onboard_data)
            logger.debug(
                f"{worker} is starting onboarding thread with "
                f"onboarding {onboard_agent}."
            )
            # Create an onboarding thread
            onboard_thread = threading.Thread(
                target=self._launch_and_run_onboarding,
                args=(agent_info, channel_info.job.task_runner),
                name=f"Onboard-thread-{onboard_id}",
            )
            onboard_agent.update_status(AgentState.STATUS_ONBOARDING)
            agent_info.assignment_thread = onboard_thread
            onboard_thread.start()
            return

    if isinstance(blueprint, ScreenTaskRequired) and blueprint.use_screening_task:
        if (
            blueprint.worker_needs_screening(worker)
            and blueprint.should_generate_unit()
        ):
            screening_data = blueprint.get_screening_unit_data()
            if screening_data is None:
                _reply(None)
                logger.debug(
                    f"No screening units left for {agent_registration_id}."
                )
                return
            launcher = channel_info.job.task_launcher
            assert (
                launcher is not None
            ), "Job must have launcher to use screening tasks"
            # The screening unit replaces the tentatively valid units.
            units = [launcher.launch_screening_unit(screening_data)]

    if isinstance(blueprint, UseGoldUnit) and blueprint.use_golds:
        if blueprint.should_produce_gold_for_worker(worker):
            gold_data = blueprint.get_gold_unit_data_for_worker(worker)
            if gold_data is None:
                _reply(None)
                logger.debug(f"No gold units left for {agent_registration_id}...")
                return
            launcher = channel_info.job.task_launcher
            # Same guard as the screening branch, so a missing launcher
            # fails with a clear message rather than an AttributeError.
            assert (
                launcher is not None
            ), "Job must have launcher to use gold units"
            # The gold unit replaces whatever units were selected so far.
            units = [launcher.launch_gold_unit(gold_data)]

    # Not onboarding, so just register directly
    self._assign_unit_to_agent(packet, channel_info, units)