def resolve_cluster()

in distribution_strategy/keras_model_to_estimator_client.py [0:0]


def resolve_cluster(port=5000, parse_task_name_fn=_parse_task_name_fn):
  """Queries Kubernetes cluster and gets cluster_spec.

  Lists the services of all namespaces and, for every service that exposes a
  load-balancer ingress and whose name `parse_task_name_fn` maps to a task,
  records '<address>:<port>' at index `task_id` of that task type's list.

  Args:
    port: Port number appended to every task address.
    parse_task_name_fn: Callable taking a service name and returning a
      `(task_type, task_id)` pair. Services for which `task_type` is falsy
      are skipped; `task_id` is used as a list index.

  Returns:
    A dict mapping each task type to a list of 'address:port' strings,
    positioned by task id.

  Raises:
    ValueError: If no matching service is found, or if some index of a task
      type's list has no address.
  """
  kubernetes.config.load_kube_config()
  v1 = kubernetes.client.CoreV1Api()
  ret = v1.list_service_for_all_namespaces()
  cluster_spec = {}
  for item in ret.items:
    load_balancer = item.status.load_balancer
    if not (load_balancer and load_balancer.ingress):
      # No external endpoint has been published for this service.
      continue
    task_type, task_id = parse_task_name_fn(item.metadata.name)
    if not task_type:
      continue

    targets = cluster_spec.setdefault(task_type, [])
    # Pad with None so that gaps in the task ids are caught by the validation
    # below.
    if len(targets) <= task_id:
      targets.extend([None] * (task_id + 1 - len(targets)))

    # Some load balancers publish only a DNS hostname and leave `ip` unset;
    # formatting the ip blindly would yield a bogus 'None:<port>' address.
    ingress = load_balancer.ingress[0]
    address = ingress.ip or getattr(ingress, 'hostname', None)
    if address:
      targets[task_id] = '%s:%d' % (address, port)
    # Otherwise the slot stays None and is reported as a missing task below.

  if not cluster_spec:
    raise ValueError(
        "Cannot get cluster_spec. It's possible the cluster is not ready.")
  for task_type, targets in cluster_spec.items():
    if any(target is None for target in targets):
      raise ValueError(
          'Not all %s tasks are found in the cluster' % task_type)
  # Pass the spec as a logging argument so formatting is deferred to the
  # logger.
  tf.logging.info('Using cluster_spec %r', cluster_spec)
  return cluster_spec