odps/mars_extension/legacy/deploy/app.py [304:407]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    def _get_ready_pod_count(self, label_selector):
        """Evict pods placed on blacklisted nodes, then count ready pods.

        When ``self._node_blacklist`` is non-empty, every pod in
        ``self._namespace`` matching ``label_selector`` is inspected. A pod
        whose node is blacklisted is deleted on a best-effort basis and its
        name is recorded in ``self._blacklisted_pods`` so that it is skipped
        on later calls. The count itself is delegated to the parent class.

        :param label_selector: label selector passed to the pod listing call
        :return: the value returned by the parent ``_get_ready_pod_count``
        """
        if self._node_blacklist:
            query = self._core_api.list_namespaced_pod(
                namespace=self._namespace, label_selector=label_selector
            ).to_dict()
            for el in query["items"]:
                # ``spec`` may be present with a None value, in which case
                # ``el.get("spec", {})`` would hand back None, not the default.
                node_name = (el.get("spec") or {}).get("node_name")
                pod_name = el["metadata"]["name"]

                if pod_name in self._blacklisted_pods:
                    continue
                if node_name and node_name in self._node_blacklist:
                    logger.warning(
                        "Found node %s in blacklist, will terminate pod %s",
                        node_name,
                        pod_name,
                    )
                    try:
                        self._core_api.delete_namespaced_pod(
                            name=pod_name, namespace=self._namespace
                        )
                    except Exception:  # best effort: never fail the count
                        # The pod is not recorded as handled, so it stays
                        # eligible for deletion on the next call.
                        logger.warning(
                            "Failed to terminate pod %s on blacklisted node %s",
                            pod_name,
                            node_name,
                            exc_info=True,
                        )
                    else:
                        self._blacklisted_pods.add(pod_name)
        return super()._get_ready_pod_count(label_selector)

    def _get_pods_name_and_ip(self, label_selector):
        """Return names and IPs of pods whose status reason is ``CupidStarted``.

        Pods are listed in ``self._namespace`` using ``label_selector``; only
        those with ``status.reason == "CupidStarted"`` are reported.

        :param label_selector: label selector passed to the pod listing call
        :return: tuple ``(names, ips)`` of two lists in listing order
        """
        response = self._core_api.list_namespaced_pod(
            namespace=self._namespace, label_selector=label_selector
        )
        started_pods = [
            pod
            for pod in response.to_dict()["items"]
            if pod["status"]["reason"] == "CupidStarted"
        ]
        names = [pod["metadata"]["name"] for pod in started_pods]
        ips = [pod["status"]["pod_ip"] for pod in started_pods]
        return names, ips


class MarsCupidServer(object):
    def __init__(self):
        self.args = None
        self._instance_id = None
        self._kube_url = None
        self._kube_client = None

    def __call__(self, argv=None):
        if argv is None:
            argv = sys.argv[1:]
        return self._main(argv)

    def config_logging(self):
        import logging.config
        import mars

        log_conf = self.args.log_conf or "logging.conf"

        conf_file_paths = [
            "",
            os.path.abspath("."),
            os.path.dirname(os.path.dirname(mars.__file__)),
        ]
        log_configured = False
        for path in conf_file_paths:
            conf_path = os.path.join(path, log_conf) if path else log_conf
            if os.path.exists(conf_path):
                logging.config.fileConfig(conf_path, disable_existing_loggers=False)
                log_configured = True
                break

        if not log_configured:
            log_level = self.args.log_level
            level = getattr(logging, log_level.upper()) if log_level else logging.INFO
            logging.getLogger("mars").setLevel(level)
            logging.basicConfig(format=self.args.log_format)

    def _main(self, argv=None):
        parser = argparse.ArgumentParser(description="Mars Cupid Application")
        parser.add_argument("--log-level", help="log level")
        parser.add_argument("--log-format", help="log format")
        parser.add_argument(
            "--log-conf", help="log config file, logging.conf by default"
        )
        parser.add_argument("encoded_args")
        self.args = parser.parse_args(argv)
        self.config_logging()

        self._kube_url = os.environ["KUBE_API_ADDRESS"].strip('"')
        self._instance_id = os.environ["KUBE_NAMESPACE"].strip('"')

        args_dict = json.loads(base64.b64decode(self.args.encoded_args).decode())
        args_dict["namespace"] = self._instance_id

        if args_dict.get("worker_disk_num") is not None:
            disk_num = args_dict.pop("worker_disk_num")
            disk_size = args_dict.pop("worker_disk_size")
            args_dict["worker_spill_paths"] = [
                DiskDriverVolumeConfig(
                    name="diskdriver-volume%d" % i,
                    mount_path="/diskdriver%d" % i,
                    device_size=readable_size(disk_size, trunc=True).lower(),
                )
                for i in range(disk_num)
            ]
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



odps/mars_extension/oscar/deploy/app.py [298:401]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    def _get_ready_pod_count(self, label_selector):
        """Evict pods placed on blacklisted nodes, then count ready pods.

        When ``self._node_blacklist`` is non-empty, every pod in
        ``self._namespace`` matching ``label_selector`` is inspected. A pod
        whose node is blacklisted is deleted on a best-effort basis and its
        name is recorded in ``self._blacklisted_pods`` so that it is skipped
        on later calls. The count itself is delegated to the parent class.

        :param label_selector: label selector passed to the pod listing call
        :return: the value returned by the parent ``_get_ready_pod_count``
        """
        if self._node_blacklist:
            query = self._core_api.list_namespaced_pod(
                namespace=self._namespace, label_selector=label_selector
            ).to_dict()
            for el in query["items"]:
                # ``spec`` may be present with a None value, in which case
                # ``el.get("spec", {})`` would hand back None, not the default.
                node_name = (el.get("spec") or {}).get("node_name")
                pod_name = el["metadata"]["name"]

                if pod_name in self._blacklisted_pods:
                    continue
                if node_name and node_name in self._node_blacklist:
                    logger.warning(
                        "Found node %s in blacklist, will terminate pod %s",
                        node_name,
                        pod_name,
                    )
                    try:
                        self._core_api.delete_namespaced_pod(
                            name=pod_name, namespace=self._namespace
                        )
                    except Exception:  # best effort: never fail the count
                        # The pod is not recorded as handled, so it stays
                        # eligible for deletion on the next call.
                        logger.warning(
                            "Failed to terminate pod %s on blacklisted node %s",
                            pod_name,
                            node_name,
                            exc_info=True,
                        )
                    else:
                        self._blacklisted_pods.add(pod_name)
        return super()._get_ready_pod_count(label_selector)

    def _get_pods_name_and_ip(self, label_selector):
        """Return names and IPs of pods whose status reason is ``CupidStarted``.

        Pods are listed in ``self._namespace`` using ``label_selector``; only
        those with ``status.reason == "CupidStarted"`` are reported.

        :param label_selector: label selector passed to the pod listing call
        :return: tuple ``(names, ips)`` of two lists in listing order
        """
        response = self._core_api.list_namespaced_pod(
            namespace=self._namespace, label_selector=label_selector
        )
        started_pods = [
            pod
            for pod in response.to_dict()["items"]
            if pod["status"]["reason"] == "CupidStarted"
        ]
        names = [pod["metadata"]["name"] for pod in started_pods]
        ips = [pod["status"]["pod_ip"] for pod in started_pods]
        return names, ips


class MarsCupidServer(object):
    def __init__(self):
        self.args = None
        self._instance_id = None
        self._kube_url = None
        self._kube_client = None

    def __call__(self, argv=None):
        if argv is None:
            argv = sys.argv[1:]
        return self._main(argv)

    def config_logging(self):
        import logging.config
        import mars

        log_conf = self.args.log_conf or "logging.conf"

        conf_file_paths = [
            "",
            os.path.abspath("."),
            os.path.dirname(os.path.dirname(mars.__file__)),
        ]
        log_configured = False
        for path in conf_file_paths:
            conf_path = os.path.join(path, log_conf) if path else log_conf
            if os.path.exists(conf_path):
                logging.config.fileConfig(conf_path, disable_existing_loggers=False)
                log_configured = True
                break

        if not log_configured:
            log_level = self.args.log_level
            level = getattr(logging, log_level.upper()) if log_level else logging.INFO
            logging.getLogger("mars").setLevel(level)
            logging.basicConfig(format=self.args.log_format)

    def _main(self, argv=None):
        parser = argparse.ArgumentParser(description="Mars Cupid Application")
        parser.add_argument("--log-level", help="log level")
        parser.add_argument("--log-format", help="log format")
        parser.add_argument(
            "--log-conf", help="log config file, logging.conf by default"
        )
        parser.add_argument("encoded_args")
        self.args = parser.parse_args(argv)
        self.config_logging()

        self._kube_url = os.environ["KUBE_API_ADDRESS"].strip('"')
        self._instance_id = os.environ["KUBE_NAMESPACE"].strip('"')

        args_dict = json.loads(base64.b64decode(self.args.encoded_args).decode())
        args_dict["namespace"] = self._instance_id

        if args_dict.get("worker_disk_num") is not None:
            disk_num = args_dict.pop("worker_disk_num")
            disk_size = args_dict.pop("worker_disk_size")
            args_dict["worker_spill_paths"] = [
                DiskDriverVolumeConfig(
                    name="diskdriver-volume%d" % i,
                    mount_path="/diskdriver%d" % i,
                    device_size=readable_size(disk_size, trunc=True).lower(),
                )
                for i in range(disk_num)
            ]
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



