def get_host_info_by_strategy()

in aws_advanced_python_wrapper/fastest_response_strategy_plugin.py [0:0]


    def get_host_info_by_strategy(self, role: HostRole, strategy: str) -> HostInfo:
        if not self.accepts_strategy(role, strategy):
            logger.error("FastestResponseStrategyPlugin.UnsupportedHostSelectorStrategy", strategy)
            raise AwsWrapperError(Messages.get_formatted("FastestResponseStrategyPlugin.UnsupportedHostSelectorStrategy", strategy))

        fastest_response_host: Optional[HostInfo] = self._cached_fastest_response_host_by_role.get(role.name)
        if fastest_response_host is not None:

            # Found a fastest host. Let's find it in the latest topology.
            for host in self._plugin_service.hosts:
                if host == fastest_response_host:
                    # found the fastest host in the topology
                    return host
                # It seems that the fastest cached host isn't in the latest topology.
                # Let's ignore cached results and find the fastest host.

        # Cached result isn't available. Need to find the fastest response time host.
        eligible_hosts: List[FastestResponseStrategyPlugin.ResponseTimeTuple] = []
        for host in self._plugin_service.hosts:
            if role == host.role:
                response_time_tuple = FastestResponseStrategyPlugin.ResponseTimeTuple(host,
                                                                                      self._host_response_time_service.get_response_time(host))
                eligible_hosts.append(response_time_tuple)

        # Sort by response time then retrieve the first host
        sorted_eligible_hosts: List[FastestResponseStrategyPlugin.ResponseTimeTuple] = \
            sorted(eligible_hosts, key=lambda x: x.response_time)

        calculated_fastest_response_host = sorted_eligible_hosts[0].host_info
        if calculated_fastest_response_host is None or \
                self._host_response_time_service.get_response_time(calculated_fastest_response_host) == MAX_VALUE:
            logger.debug("FastestResponseStrategyPlugin.RandomHostSelected")
            return self._random_host_selector.get_host(self._plugin_service.hosts, role, self._properties)

        self._cached_fastest_response_host_by_role.put(role.name,
                                                       calculated_fastest_response_host,
                                                       self._cache_expiration_nanos)

        return calculated_fastest_response_host