in hostfactory/host_provider/src/cyclecloud_provider.py [0:0]
def _create_status(self, input_json, output_handler=None, update_completed_nodes=True):
"""
input:
{'requests': [{'requestId': 'req-123'}, {'requestId': 'req-234'}]}
output:
{'message': '',
 'requests': [{'machines': [{'launchtime': 1516131665,
                             'machineId': 'id-123',
                             'message': '',
                             'privateDnsAddress': '',
                             'name': 'execute-5',
                             'privateIpAddress': '10.0.1.23',
                             'result': 'succeed',
                             'status': 'running'}],
               'message': '',
               'requestId': 'req-123',
               'status': 'complete'}],
 'status': 'complete'}
"""
output_handler = output_handler or self.stdout_handler
request_ids = [r["requestId"] for r in input_json["requests"]]
nodes_by_request_id = {}
exceptions = []
# go one by one in case one of the request_ids does not exist in CycleCloud
for request_id in request_ids:
try:
nodes_by_request_id.update(self.cluster.nodes(request_ids=[request_id]))
logger.debug("Node list by request id %s",nodes_by_request_id)
except Exception as e:
if "No operation found for request id" in str(e):
nodes_by_request_id[request_id] = {"nodes": []}
elif "Could not find request id" in str(e):
nodes_by_request_id[request_id] = []
else:
exceptions.append(e)
# tell HF the request is still running so that it remembers the request.
logger.exception("Azure CycleCloud experienced an error for request id %s; reporting status as running. %s", request_id, e)
return output_handler.handle({"status": RequestStates.running,
"requests": [{"requestId": request_id, "status": RequestStates.running} for request_id in request_ids],
"message": "Azure CycleCloud is still requesting nodes"})
if not nodes_by_request_id:
error_messages = " | ".join(list(set([str(e) for e in exceptions])))
return output_handler.handle({"status": RequestStates.complete_with_error,
"requests": [{"requestId": request_id, "status": RequestStates.complete_with_error} for request_id in request_ids],
"message": "Azure CycleCloud experienced an error: %s" % error_messages})
message = ""
response = {"requests": []}
for request_id, requested_nodes in nodes_by_request_id.items():
request_status = RequestStates.complete
unknown_state_count = 0
requesting_count = 0
if not requested_nodes:
# nothing to do.
logger.warning("No nodes found for request id %s.", request_id)
completed_nodes = []
# Collect all node ids associated with this requestId so a failed requests_store update can be recovered later.
all_nodes = []
# Collect nodes that have potential to be fulfilled. Excludes failed, terminating and unavailable.
valid_nodes = []
machines = []
request = {"requestId": request_id,
"machines": machines}
response["requests"].append(request)
report_failure_states = ["Unavailable", "Failed"]
terminate_states = []
if self.config.get("symphony.terminate_failed_nodes", False):
report_failure_states = ["Unavailable"]
terminate_states = ["Failed"]
for node in requested_nodes:
# For new nodes, completion means Ready. For "released" nodes, as long as
# the node has begun terminating etc., we can just report success.
# node_status = node.get("State")
node_status = node.state
node_target_state = node.target_state
node_id = self.cluster.get_node_id(node)
all_nodes.append(node_id)
valid_nodes.append(node_id)
machine_status = MachineStates.active
hostname = None
private_ip_address = None
if not node_target_state:
unknown_state_count = unknown_state_count + 1
continue
if node_target_state and node_target_state != "Started":
valid_nodes.remove(node_id)
logger.debug("Node %s target state is not started it is %s", node.get("Name"), node_target_state)
continue
if node_status in report_failure_states:
valid_nodes.remove(node_id)
machine_result = MachineResults.failed
machine_status = MachineStates.error
if request_status != RequestStates.running:
# message = node.get("StatusMessage", "Unknown error.")
request_status = RequestStates.complete_with_error
elif node_status in terminate_states:
# Just terminate the node; by the next iteration it will be gone. This allows retries of the shutdown,
# as we will report that the node is still booting.
unknown_state_count = unknown_state_count + 1
machine_result = MachineResults.executing
machine_status = MachineStates.building
request_status = RequestStates.running
hostname = node.hostname
if not hostname:
try:
hostname = self.hostnamer.hostname(node.private_ip)
except Exception:
logger.warning("_create_status: No hostname set and could not convert ip %s to hostname for \"%s\" VM.", node.private_ip, node_status)
try:
logger.warning("Warning: Cluster status check terminating failed node %s", node)
# import traceback
#logger.warning("Traceback:\n%s", '\n'.join([line for line in traceback.format_stack()]))
self.cluster.shutdown_nodes([{"machineId": self.cluster.get_node_id(node), "name": hostname}])
except Exception:
logger.exception("Could not terminate node with id %s" % self.cluster.get_node_id(node))
elif not node.instance_id:
requesting_count = requesting_count + 1
request_status = RequestStates.running
machine_result = MachineResults.executing
continue
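# Node has reached Ready/Started; report it active if it has an IP address, otherwise keep it building.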
elif node_status in ["Ready", "Started"]:
machine_result = MachineResults.succeed
machine_status = MachineStates.active
private_ip_address = node.private_ip
if not private_ip_address:
logger.warning("No ip address found for ready node %s", node.get("Name"))
machine_result = MachineResults.executing
machine_status = MachineStates.building
request_status = RequestStates.running
else:
hostname = node.hostname
if not hostname:
try:
hostname = self.hostnamer.hostname(node.private_ip)
logger.warning("_create_status: Node does not have hostname using %s ", hostname)
except Exception:
# TODO: need to append to completed node somewhere? What do we do?
logger.warning("_create_status: No hostname set and could not convert ip %s to hostname for \"%s\" VM.", node.get("PrivateIp"), node)
completed_nodes.append({"hostname": hostname, "nodeid": node_id})
else:
machine_result = MachineResults.executing
machine_status = MachineStates.building
request_status = RequestStates.running
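# Build the machine record returned to HostFactory for this node.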
machine = {
"name": hostname or "",
"status": machine_status,
"result": machine_result,
"machineId": self.cluster.get_node_id(node) or "",
# launchTime is mandatory in Symphony
# maybe we can add something so we don't have to expose this
# node["PhaseMap"]["Cloud.AwaitBootup"]["StartTime"]["$date"]
"launchtime": int(time.time()),
# "launchtime": node.get("LaunchTime") or int(time.time()),
"privateIpAddress": private_ip_address or "",
"message": ""
#"message": node.get("StatusMessage") or ""
}
machines.append(machine)
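# Persist per-request bookkeeping so later terminate and cleanup calls can find these nodes.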
with self.creation_json as requests_store:
if request_id not in requests_store:
logger.warning("Unknown request_id %s. Creating a new entry and resetting requestTime", request_id)
requests_store[request_id] = {"requestTime": calendar.timegm(self.clock())}
# set default
requests_store[request_id]["lastUpdateTime"] = calendar.timegm(self.clock())
# Bugfix: periodic cleanup also calls this function; nodes that reach the Ready state after
# Symphony has stopped making status calls should not update completedNodes.
if update_completed_nodes:
requests_store[request_id]["completedNodes"] = completed_nodes
if requests_store[request_id].get("allNodes") is None:
requests_store[request_id]["allNodes"] = all_nodes
requests_store[request_id]["completed"] = len(requested_nodes) == len(completed_nodes)
active = len([x for x in machines if x["status"] == MachineStates.active])
building = len([x for x in machines if x["status"] == MachineStates.building])
failed = len([x for x in machines if x["status"] == MachineStates.error])
logger.info("Machine states for requestId %s: %d active, %d building, %d requesting, %d failed and %d in an unknown state.",
request_id, active, building, requesting_count, failed, unknown_state_count)
request["status"] = request_status
if request_status == RequestStates.complete:
logger.info("Request %s is complete.", request_id)
elif request_status == RequestStates.complete_with_error:
logger.warning("Request %s completed with error: %s.", request_id, message)
request["message"] = message
response["status"] = symphony.RequestStates.complete
return output_handler.handle(response)