in hostfactory/host_provider/src/cyclecloud_provider.py [0:0]
def _terminate_expired_requests(self):
    """Terminate nodes belonging to creation requests that outlived their TTL.

    The sweep runs in three phases:

    1. Requests whose ``allNodes`` is still ``None`` (no status call was ever
       made for them) are pushed through ``_create_status`` so their node ids
       get recorded. Any that come back ``complete_with_error`` without a truthy
       ``_recoverable_`` flag get ``allNodes = []`` so they are not re-queried
       forever. Errors in this phase are logged and otherwise ignored.
    2. Every request not marked ``completed`` whose ``requestTime`` is more than
       ``self.creation_request_ttl`` behind ``calendar.timegm(self.clock())`` is
       collected, and its status is refreshed with
       ``update_completed_nodes=False``.
    3. Under the ``creation_json`` lock, every node listed in ``allNodes`` but
       not in ``completedNodes`` is passed to ``terminate_machines``. Each
       expired request is then stamped with ``lastUpdateTime`` and marked
       ``completed``.

    Returns None. Exceptions from the phase-2 status call or from
    ``terminate_machines`` propagate to the caller.
    """
    # Phase 1: if a request is made but no subsequent status call is ever made,
    # we won't have any information stored about the request. This forces a
    # status call of those before moving on.
    # Note we aren't in the creation_json lock here, as status will grab the lock.
    never_queried_requests = [
        request_id
        for request_id, request in self.creation_json.read().items()
        # .get() (as in phase 3): a missing key means "not queried yet" instead
        # of a KeyError that would abort the whole sweep.
        if request.get("allNodes") is None
    ]
    if never_queried_requests:
        try:
            response = self._create_status({"requests": [{"requestId": r} for r in never_queried_requests]},
                                           quiet_output(),
                                           update_completed_nodes=False)
            # A missing _recoverable_ flag defaults to False on purpose, so that
            # errored requests are not retried indefinitely.
            unrecoverable_request_ids = [
                status["requestId"]
                for status in response["requests"]
                if status["status"] == RequestStates.complete_with_error
                and not status.get("_recoverable_", False)
            ]
            # if we got a 404 on the request_id (a failed nodes/create call), set
            # allNodes to an empty list so that we don't retry indefinitely.
            with self.creation_json as creation_requests:
                for request_id in unrecoverable_request_ids:
                    creation_requests[request_id]["allNodes"] = []
        except Exception:
            # Best effort: the expiry sweep below still runs.
            logger.exception("Could not request status of creation requests.")

    # Phase 2: find all requests that are not completed but have expired.
    # Sample the clock once so every request is judged against the same instant.
    now = calendar.timegm(self.clock())
    to_update_status = [
        request_id
        for request_id, request in self.creation_json.read().items()
        if not request.get("completed")
        and now - request["requestTime"] > self.creation_request_ttl
    ]
    if not to_update_status:
        return

    self._create_status({"requests": [{"requestId": r} for r in to_update_status]},
                        quiet_output(),
                        # We need to terminate nodes that were not ready by the time the request expired
                        # We will terminate nodes that converge after timeout
                        update_completed_nodes=False)

    # Phase 3: terminate stragglers and mark the expired requests complete.
    with self.creation_json as requests_store:
        to_shutdown = []
        to_mark_complete = []
        for request_id in to_update_status:
            # The store is re-read under the lock. Skip requests that disappeared
            # since the unlocked read above, or that were completed meanwhile.
            request = requests_store.get(request_id)
            if request is None or request.get("completed"):
                continue
            if request.get("allNodes") is None:
                logger.warning("Yet to find any NodeIds for RequestId %s", request_id)
                continue
            completed_node_ids = [x["nodeid"] for x in request["completedNodes"]]
            failed_to_start = set(request["allNodes"]) - set(completed_node_ids)
            if failed_to_start:
                to_shutdown.extend(failed_to_start)
                logger.warning("Expired creation request found - %s. %d out of %d completed.", request_id,
                               len(completed_node_ids), len(request["allNodes"]))
            # Mark complete even when every node converged, so the request is not
            # picked up again by the next sweep.
            to_mark_complete.append(request)

        if not to_mark_complete:
            return

        if to_shutdown:
            self.terminate_machines({"machines": [{"machineId": x, "name": x} for x in to_shutdown]}, quiet_output())

        completed_at = calendar.timegm(self.clock())
        for request in to_mark_complete:
            request["lastUpdateTime"] = completed_at
            request["completed"] = True