# in distribution_strategy/keras_model_to_estimator_client.py [0:0]
def resolve_cluster(port=5000, parse_task_name_fn=_parse_task_name_fn):
    """Queries Kubernetes cluster and gets cluster_spec.

    Lists the services of all namespaces (using the local kube config), keeps
    those that expose a load-balancer ingress and whose name
    `parse_task_name_fn` maps to a task, and collects their addresses per
    task type.

    Args:
      port: Port number appended to every task address.
      parse_task_name_fn: Callable that takes a service name and returns a
        `(task_type, task_id)` pair. Services for which it returns a falsy
        `task_type` are ignored. `task_id` is used as a list index.

    Returns:
      A dict mapping each task type to a list of `'<address>:<port>'`
      strings, where the position in the list is the task id.

    Raises:
      ValueError: If no matching service is found, or if some task id slot
        of a task type has no address.
    """
    kubernetes.config.load_kube_config()
    core_api = kubernetes.client.CoreV1Api()
    services = core_api.list_service_for_all_namespaces()

    cluster_spec = {}
    for service in services.items:
        load_balancer = service.status.load_balancer
        if not (load_balancer and load_balancer.ingress):
            continue
        task_type, task_id = parse_task_name_fn(service.metadata.name)
        if not task_type:
            continue

        targets = cluster_spec.setdefault(task_type, [])
        # Pad with None up to task_id so that gaps in the task ids are
        # caught by the completeness check below.
        while len(targets) <= task_id:
            targets.append(None)

        ingress = load_balancer.ingress[0]
        # A load-balancer ingress may be published as a DNS hostname instead
        # of an IP; fall back to it rather than emitting 'None:<port>'.
        address = ingress.ip or getattr(ingress, 'hostname', None)
        if address:
            targets[task_id] = '%s:%d' % (address, port)
        # Otherwise the slot stays None and is reported as missing below.

    if not cluster_spec:
        raise ValueError(
            "Cannot get cluster_spec. It's possible the cluster is not ready.")
    for task_type, targets in cluster_spec.items():
        if any(target is None for target in targets):
            raise ValueError(
                'Not all %s tasks are found in the cluster' % task_type)

    tf.logging.info('Using cluster_spec %r', cluster_spec)
    return cluster_spec