def _register_agent()

in mephisto/operations/supervisor.py [0:0]


    def _register_agent(self, packet: Packet, channel_info: ChannelInfo):
        """Process an agent registration packet to register an agent.

        Every outcome except a direct unit assignment is answered by queueing
        a ``PACKET_TYPE_PROVIDER_DETAILS`` packet on ``self.message_queue``,
        addressed to ``channel_info.channel_id``. The steps, in order:

        1. Reconnection: if the registration id is already present in
           ``self.agents_by_registration_id``, the stored agent's channel is
           updated and its agent id is sent back.
        2. No valid units for the worker: ``agent_id`` of ``None`` is sent.
        3. Onboarding (blueprint is ``OnboardingRequired`` with
           ``use_onboarding``): a disqualified worker gets ``agent_id`` of
           ``None``; a worker that is not yet qualified gets a new
           ``OnboardingAgent`` and an onboarding thread is started.
        4. Screening (blueprint is ``ScreenTaskRequired`` with
           ``use_screening_task``): when the worker needs screening and a
           unit should be generated, a screening unit replaces the candidate
           units, or ``agent_id`` of ``None`` is sent if no data is left.
        5. Gold units (blueprint is ``UseGoldUnit`` with ``use_golds``):
           same pattern as screening, using gold unit data.
        6. Otherwise the candidate units are handed to
           ``self._assign_unit_to_agent``.

        Args:
            packet: The registration packet. ``packet.data`` is read for
                ``"provider_data"`` (which must hold
                ``"agent_registration_id"`` and, for new agents,
                ``"worker_id"``) and for ``"request_id"`` whenever a reply
                is queued.
            channel_info: Channel the packet arrived on; provides the
                channel id and the job's task runner and task launcher.

        Raises:
            AssertionError: If a screening or gold unit has to be launched
                but the job has no task launcher.
        """

        def send_agent_details(agent_id, **extra_data):
            """Queue the provider-details reply for this registration request."""
            self.message_queue.append(
                Packet(
                    packet_type=PACKET_TYPE_PROVIDER_DETAILS,
                    sender_id=SYSTEM_CHANNEL_ID,
                    receiver_id=channel_info.channel_id,
                    data={
                        # Read lazily so a packet that never needs a reply
                        # here is not required to carry a request id.
                        "request_id": packet.data["request_id"],
                        "agent_id": agent_id,
                        **extra_data,
                    },
                )
            )

        # First see if this is a reconnection
        crowd_data = packet.data["provider_data"]
        agent_registration_id = crowd_data["agent_registration_id"]
        logger.debug(f"Incoming request to register agent {agent_registration_id}.")
        if agent_registration_id in self.agents_by_registration_id:
            agent = self.agents_by_registration_id[agent_registration_id].agent
            existing_agent_id = agent.get_agent_id()
            # Update the source channel, in case it has changed
            self.agents[existing_agent_id].used_channel_id = channel_info.channel_id
            send_agent_details(existing_agent_id)
            logger.debug(
                f"Found existing agent_registration_id {agent_registration_id}, "
                f"reconnecting to {agent}."
            )
            return

        # Process a new agent
        task_runner = channel_info.job.task_runner
        task_run = task_runner.task_run
        worker_id = crowd_data["worker_id"]
        worker = Worker.get(self.db, worker_id)

        # Get the list of tentatively valid units
        units = task_run.get_valid_units_for_worker(worker)
        if not units:
            send_agent_details(None)
            logger.debug(
                f"agent_registration_id {agent_registration_id}, had no valid units."
            )
            return

        blueprint = task_run.get_blueprint(args=task_runner.args)

        # If there's onboarding, see if this worker has already been disqualified
        if isinstance(blueprint, OnboardingRequired) and blueprint.use_onboarding:
            qualification_name = blueprint.onboarding_qualification_name
            if worker.is_disqualified(qualification_name):
                send_agent_details(None)
                logger.debug(
                    f"Worker {worker_id} is already disqualified by onboarding "
                    f"qual {qualification_name}."
                )
                return
            if not worker.is_qualified(qualification_name):
                # Neither qualified nor disqualified yet: run onboarding first
                onboard_data = blueprint.get_onboarding_data(worker.db_id)
                onboard_agent = OnboardingAgent.new(self.db, worker, task_run)
                onboard_agent.state.set_init_state(onboard_data)
                agent_info = AgentInfo(
                    agent=onboard_agent, used_channel_id=channel_info.channel_id
                )
                onboard_id = onboard_agent.get_agent_id()
                # Register the onboarding agent and keep the original packet,
                # keyed by the onboarding agent's id
                self.agents[onboard_id] = agent_info
                self.onboarding_packets[onboard_id] = packet
                # Send a packet with onboarding information
                send_agent_details(onboard_id, onboard_data=onboard_data)

                logger.debug(
                    f"{worker} is starting onboarding thread with "
                    f"onboarding {onboard_agent}."
                )

                # Create an onboarding thread
                onboard_thread = threading.Thread(
                    target=self._launch_and_run_onboarding,
                    args=(agent_info, task_runner),
                    name=f"Onboard-thread-{onboard_id}",
                )

                onboard_agent.update_status(AgentState.STATUS_ONBOARDING)
                agent_info.assignment_thread = onboard_thread
                onboard_thread.start()
                return

        if isinstance(blueprint, ScreenTaskRequired) and blueprint.use_screening_task:
            if (
                blueprint.worker_needs_screening(worker)
                and blueprint.should_generate_unit()
            ):
                screening_data = blueprint.get_screening_unit_data()
                if screening_data is None:
                    send_agent_details(None)
                    logger.debug(
                        f"No screening units left for {agent_registration_id}."
                    )
                    return
                launcher = channel_info.job.task_launcher
                assert (
                    launcher is not None
                ), "Job must have launcher to use screening tasks"
                # The screening unit replaces the regular candidate units
                units = [launcher.launch_screening_unit(screening_data)]

        if isinstance(blueprint, UseGoldUnit) and blueprint.use_golds:
            if blueprint.should_produce_gold_for_worker(worker):
                gold_data = blueprint.get_gold_unit_data_for_worker(worker)
                if gold_data is None:
                    send_agent_details(None)
                    logger.debug(f"No gold units left for {agent_registration_id}...")
                    return
                launcher = channel_info.job.task_launcher
                # Same guard as the screening branch, so a missing launcher
                # fails with a clear message rather than an AttributeError
                assert (
                    launcher is not None
                ), "Job must have launcher to use gold units"
                # The gold unit replaces whatever units were selected so far
                units = [launcher.launch_gold_unit(gold_data)]

        # Not onboarding, so just register directly
        self._assign_unit_to_agent(packet, channel_info, units)