in src/graph_notebook/magics/graph_magic.py [0:0]
def reset(self, line, local_ns: dict = None, service: str = None):
    """Delete all data in the configured Neptune cluster (database) or graph (analytics).

    ``line`` is split on whitespace and parsed with argparse. Supported options:

    * ``-g/--generate-token``: only request a reset token (database only).
    * ``-t/--token TOKEN``: perform the reset with a previously generated token (database only).
    * ``-s/--snapshot``: pre-tick the "create a final snapshot" checkbox (analytics only;
      rejected together with ``--yes``).
    * ``-y/--yes``: skip the interactive confirmation widgets.
    * ``-m/--max-status-retries N``: number of post-reset status checks (values <= 0 become 1).

    Without ``--yes`` (and, for a database, without ``-g``/``-t``) a confirmation UI is displayed
    and the reset is driven from the "Delete" button callback, which then polls the service
    status until it reports ``healthy`` (database) / ``AVAILABLE`` (analytics) or the retries
    run out.

    Args:
        line: Raw magic argument string.
        local_ns: Not referenced in this method body.
        service: Service name to target; when falsy, ``self.graph_notebook_config.neptune_service``
            is used. Anything other than ``NEPTUNE_DB_SERVICE_NAME`` is handled as analytics.

    Returns:
        The parsed JSON response for the database token-generation, token-reset and
        ``--yes`` paths; ``None`` for the interactive path and for every analytics path.

    Raises:
        Whatever ``raise_for_status()`` raises on the database reset responses
        (presumably an HTTP error type -- confirm against the client implementation).
    """
    logger.info(f'calling system endpoint {self.client.host}')
    parser = argparse.ArgumentParser()
    parser.add_argument('-g', '--generate-token', action='store_true',
                        help='Generate token for database reset. Database only.')
    parser.add_argument('-t', '--token', default='',
                        help='Perform database reset with given token. Database only.')
    parser.add_argument('-s', '--snapshot', action='store_true', default=False,
                        help='Creates a final graph snapshot before the graph data is deleted. Analytics only.')
    parser.add_argument('-y', '--yes', action='store_true',
                        help='Skip the prompt and perform reset.')
    parser.add_argument('-m', '--max-status-retries', type=int, default=10,
                        help='Specifies how many times we should attempt to check if the database reset has '
                             'completed, in intervals of 5 seconds. Default is 10')
    args = parser.parse_args(line.split())
    generate_token = args.generate_token
    skip_prompt = args.yes
    snapshot = args.snapshot

    if not service:
        service = self.graph_notebook_config.neptune_service
    if service == NEPTUNE_DB_SERVICE_NAME:
        using_db = True
        graph_id = None
        message_instance = "cluster"
    else:
        using_db = False
        graph_id = self.client.get_graph_id()
        message_instance = "graph"

    # Always perform at least one status check after the reset is submitted.
    max_status_retries = args.max_status_retries if args.max_status_retries > 0 else 1

    # Analytics always takes this branch; a database takes it only when neither -g nor -t was given.
    if not using_db or (generate_token is False and args.token == ''):
        if skip_prompt:
            if using_db:
                initiate_res = self.client.initiate_reset()
                initiate_res.raise_for_status()
                token = initiate_res.json()['payload']['token']

                perform_reset_res = self.client.perform_reset(token)
                perform_reset_res.raise_for_status()
                res = perform_reset_res.json()
                # Log the perform-reset response (not the token-bearing initiate response).
                logger.info(f'got the response {res}')
                return res
            else:
                if snapshot:
                    print("Snapshot creation is currently unsupported for prompt skip mode. Please use "
                          "%create_graph_snapshot to take a snapshot prior to attempting graph reset.")
                    return
                try:
                    res = self.client.reset_graph(graph_id=graph_id, snapshot=False)
                    print(
                        f"ResetGraph call submitted successfully for graph ID [{graph_id}]. "
                        f"Please note that the graph may take several minutes to become available again, "
                        f"You can use %status or %get_graph to check the current status of the graph.\n")
                    print(json.dumps(res, indent=2, default=str))
                except Exception as e:
                    print("Received an error when attempting graph reset:")
                    print(e)
                return

        # Interactive path: build the confirmation widgets.
        output = widgets.Output()
        confirm_source = f'Are you sure you want to delete all the data in your {message_instance}?'
        confirm_label = widgets.Label(confirm_source)
        confirm_text_hbox = widgets.HBox([confirm_label])
        confirm_check_box = widgets.Checkbox(
            value=False,
            disabled=False,
            indent=False,
            description=f'I acknowledge that upon deletion the {message_instance} data will no longer be available.',
            layout=widgets.Layout(width='600px', margin='5px 5px 5px 5px')
        )
        button_delete = widgets.Button(description="Delete")
        button_cancel = widgets.Button(description="Cancel")
        button_hbox = widgets.HBox([button_delete, button_cancel])

        if using_db:
            display(confirm_text_hbox, confirm_check_box, button_hbox, output)
        else:
            # Analytics additionally offers an optional final snapshot, pre-ticked by --snapshot.
            snapshot_source = 'OPTIONAL: Create a final graph snapshot before reset?'
            snapshot_label = widgets.Label(snapshot_source)
            snapshot_text_hbox = widgets.HBox([snapshot_label])
            snapshot_check_box = widgets.Checkbox(
                value=snapshot,
                disabled=False,
                indent=False,
                description='Yes',
                layout=widgets.Layout(width='600px', margin='5px 5px 5px 5px')
            )
            display(confirm_text_hbox, confirm_check_box,
                    snapshot_text_hbox, snapshot_check_box,
                    button_hbox, output)

        def on_button_delete_clicked(b):
            # Read the acknowledgement first so that no reset token is requested for a click
            # that is going to be rejected anyway.
            confirmed = confirm_check_box.value
            if using_db and confirmed:
                initiate_res = self.client.initiate_reset()
                initiate_res.raise_for_status()
                result = initiate_res.json()

            confirm_text_hbox.close()
            confirm_check_box.close()
            if not using_db:
                snapshot_text_hbox.close()
                snapshot_check_box.close()
            button_delete.close()
            button_cancel.close()
            button_hbox.close()

            if not confirmed:
                with output:
                    print('Reset confirmation checkbox is not checked.')
                return

            if using_db:
                token = result['payload']['token']
                if token == "":
                    with output:
                        print('Failed to get token.')
                        print(result)
                    return

                perform_reset_res = self.client.perform_reset(token)
                perform_reset_res.raise_for_status()
                result = perform_reset_res.json()

                if 'status' not in result or result['status'] != '200 OK':
                    with output:
                        print('Database reset failed, please see exception below for details.')
                        print(result)
                        logger.error(result)
                    return
            else:
                if snapshot_check_box.value:
                    snapshot_name = generate_snapshot_name(graph_id)
                    try:
                        self.client.create_graph_snapshot(graph_id=graph_id, snapshot_name=snapshot_name)
                    except Exception as e:
                        with output:
                            print("Graph snapshot creation request failed, please see the exception below.")
                            print(f"\n{e}")
                        logger.error(e)
                        return

                    snapshot_static_output = widgets.Output()
                    snapshot_progress_output = widgets.Output()
                    snapshot_hbox = widgets.HBox([snapshot_static_output, snapshot_progress_output])
                    with snapshot_static_output:
                        print("Creating graph snapshot, this may take several minutes")
                    with output:
                        display(snapshot_hbox)

                    # Tick once per second; grow the ellipsis between checks and query the
                    # graph status once poll_index exceeds poll_interval.
                    poll_interval = 5
                    poll_index = 0
                    status_ellipses = ""
                    while True:
                        snapshot_progress_output.clear_output()
                        poll_index += 1
                        if poll_index > poll_interval:
                            snapshot_progress_output.clear_output()
                            status_ellipses = ""
                            interval_check_response = self.client.get_graph(graph_id=graph_id)
                            # .get() so a response without a status is reported as a failure
                            # below instead of raising KeyError inside the widget callback.
                            current_status = interval_check_response.get("status")
                            if current_status == 'AVAILABLE':
                                snapshot_static_output.clear_output()
                                with snapshot_static_output:
                                    print('Snapshot creation complete, starting reset.')
                                break
                            elif current_status != 'SNAPSHOTTING':
                                snapshot_static_output.clear_output()
                                with snapshot_static_output:
                                    print('Something went wrong with the snapshot creation.')
                                return
                            poll_index = 0
                        else:
                            status_ellipses += "."
                            with snapshot_progress_output:
                                print(status_ellipses)
                        time.sleep(1)
                    snapshot_progress_output.close()

                # The snapshot (if requested) was taken above, so reset without one.
                try:
                    result = self.client.reset_graph(graph_id=graph_id, snapshot=False)
                except Exception as e:
                    with output:
                        print("Failed to initiate graph reset, please see the exception below.")
                        print(f"\n{e}")
                    logger.error(e)
                    return

            # Reset submitted: poll the service status until it is done or retries run out.
            retry = max_status_retries
            poll_interval = 5
            interval_output = widgets.Output()
            job_status_output = widgets.Output()
            status_hbox = widgets.HBox([interval_output])
            vbox = widgets.VBox([status_hbox, job_status_output])
            with output:
                display(vbox)

            last_poll_time = time.time()
            interval_check_response = {}
            new_interval = True
            while retry > 0:
                time_elapsed = int(time.time() - last_poll_time)
                time_remaining = poll_interval - time_elapsed
                interval_output.clear_output()
                if time_elapsed > poll_interval:
                    with interval_output:
                        print('Checking status...')
                    job_status_output.clear_output()
                    new_interval = True
                    try:
                        retry -= 1
                        if using_db:
                            status_res = self.client.status()
                            status_res.raise_for_status()
                            interval_check_response = status_res.json()
                        else:
                            interval_check_response = self.client.get_graph(graph_id=graph_id)
                    except Exception as e:
                        if using_db:
                            # Exception is expected when database is resetting, continue waiting
                            last_poll_time = time.time()
                            time.sleep(1)
                            continue
                        # Print inside the output widget so the failure is visible in the
                        # cell, like every other message emitted by this callback.
                        with output:
                            print('Graph status check failed, something went wrong.')
                            print(e)
                        logger.error(e)
                        return
                    job_status_output.clear_output()
                    done_status = 'healthy' if using_db else 'AVAILABLE'
                    # .get() so a response without a status keeps polling instead of raising.
                    if interval_check_response.get("status") == done_status:
                        interval_output.close()
                        with job_status_output:
                            print(f'{message_instance.capitalize()} has been reset.')
                        return
                    last_poll_time = time.time()
                else:
                    if new_interval:
                        # Show the loading wheel once per waiting interval.
                        with job_status_output:
                            display_html(HTML(loading_wheel_html))
                        new_interval = False
                    with interval_output:
                        print(f'Checking status in {time_remaining} seconds')
                    time.sleep(1)

            # Retries exhausted without seeing the done status: report what is known.
            with output:
                job_status_output.clear_output()
                interval_output.close()
                total_status_wait = max_status_retries * poll_interval
                final_status = interval_check_response.get("status")
                if using_db:
                    if final_status != 'healthy':
                        print(f"Could not retrieve the status of the reset operation within the allotted time of "
                              f"{total_status_wait} seconds. If the database is not in healthy status after at "
                              f"least 1 minute, please try the operation again or reboot the cluster.")
                        print(result)
                else:
                    if final_status == 'RESETTING':
                        print(f"Reset is still in progress after the allotted wait time of {total_status_wait} "
                              f"seconds, halting status checks. Please use %status to verify when your graph is "
                              f"available again.")
                        print("\nTIP: For future resets, you can use the --max-status-retries option to extend "
                              "the total wait duration.")
                    elif final_status != 'AVAILABLE':
                        print(f"Graph is not in AVAILABLE or RESETTING status after the allotted time of "
                              f"{total_status_wait} seconds, something went wrong. "
                              f"Current status is {final_status}, see below for details.")
                        print(result)

        def on_button_cancel_clicked(b):
            confirm_text_hbox.close()
            confirm_check_box.close()
            if not using_db:
                snapshot_text_hbox.close()
                snapshot_check_box.close()
            button_delete.close()
            button_cancel.close()
            button_hbox.close()
            with output:
                print(f'{message_instance.capitalize()} reset operation has been canceled.')

        button_delete.on_click(on_button_delete_clicked)
        button_cancel.on_click(on_button_cancel_clicked)
        return
    elif generate_token:
        initiate_res = self.client.initiate_reset()
        initiate_res.raise_for_status()
        res = initiate_res.json()
    else:
        # args.token is a plain string (the option has no nargs), so it is passed through as-is.
        perform_res = self.client.perform_reset(args.token)
        perform_res.raise_for_status()
        res = perform_res.json()

    logger.info(f'got the response {res}')
    return res