in src/graph_notebook/magics/graph_magic.py [0:0]
def gremlin(self, line, cell, local_ns: dict = None):
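"""Cell magic for running Gremlin queries against the configured endpoint.

Supports the query, explain, and profile modes; unless --silent is set, results
are rendered in a tabbed widget (console table, optional graph visualization,
and query metadata) and can be stored or exported via --store-to,
--store-format, and --export-to.
"""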
parser = argparse.ArgumentParser()
parser.add_argument('query_mode', nargs='?', default='query',
help='query mode (default=query) [query|explain|profile]')
parser.add_argument('-cp', '--connection-protocol', type=str.lower, default='',
help=f'Neptune endpoints only. Connection protocol to use for connecting to the Gremlin '
f'database - either WebSockets or HTTP. Valid inputs: {GREMLIN_PROTOCOL_FORMATS}. '
f'If not specified, defaults to the value of the gremlin.connection_protocol field '
f'in %%graph_notebook_config. Please note that this option has no effect on the '
f'Profile and Explain modes, which must use HTTP.')
parser.add_argument('-qp', '--query-parameters', type=str, default='',
help='Parameter definitions to apply to the query. This option accepts either a local variable '
'name or a string representation of a map.')
parser.add_argument('--explain-type', type=str.lower, default='dynamic',
help=f'Explain mode to use when using the explain query mode. '
f'Accepted values: {GREMLIN_EXPLAIN_MODES}')
parser.add_argument('-p', '--path-pattern', default='', help='path pattern')
parser.add_argument('-g', '--group-by', type=str, default='',
help='Property used to group nodes (e.g. code, T.region). Default is T.label.')
parser.add_argument('-gd', '--group-by-depth', action='store_true', default=False,
help="Group nodes based on path hierarchy")
parser.add_argument('-gr', '--group-by-raw', action='store_true', default=False,
help="Group nodes by the raw result")
parser.add_argument('-d', '--display-property', type=str, default='',
help='Property to display the value of on each node. Default is T.label.')
parser.add_argument('-de', '--edge-display-property', type=str, default='',
help='Property to display the value of on each edge. Default is T.label.')
parser.add_argument('-t', '--tooltip-property', type=str, default='',
help='Property to display the value of on each node tooltip. If not specified, tooltip '
'will default to the node label value.')
parser.add_argument('-te', '--edge-tooltip-property', type=str, default='',
help='Property to display the value of on each edge tooltip. If not specified, tooltip '
'will default to the edge label value.')
parser.add_argument('-l', '--label-max-length', type=int, default=10,
help='Specifies max length of vertex labels, in characters. Default is 10')
parser.add_argument('-le', '--edge-label-max-length', type=int, default=10,
help='Specifies max length of edge labels, in characters. Default is 10')
parser.add_argument('--store-to', type=str, default='', help='store query result to this variable')
parser.add_argument('--store-format', type=str.lower, default='json',
help=f'Configures export type when using --store-to with base query mode. '
f'Valid inputs: {QUERY_STORE_TO_FORMATS}. Default is JSON')
parser.add_argument('--export-to', type=str, default='',
help='Export the base query mode CSV result to the provided file path.')
parser.add_argument('--ignore-groups', action='store_true', default=False, help="Ignore all grouping options")
parser.add_argument('--profile-no-results', action='store_false', default=True,
help='Display only the result count. If not used, all query results will be displayed in '
'the profile report by default.')
parser.add_argument('--profile-chop', type=int, default=250,
help='Specifies max length of the profile results string. Default is 250')
parser.add_argument('--profile-serializer', type=str, default='GRAPHSON_V3',
help='Specify how to serialize results. Allowed values are any valid MIME type or '
'TinkerPop driver "Serializers" enum value. Default is GRAPHSON_V3')
parser.add_argument('--profile-indexOps', action='store_true', default=False,
help='Show a detailed report of all index operations.')
parser.add_argument('--profile-debug', action='store_true', default=False,
help='Enable debug mode.')
parser.add_argument('--profile-misc-args', type=str, default='{}',
help='Additional profile options, passed in as a map.')
parser.add_argument('--use-port', action='store_true', default=False,
help='Includes the port in the URI for applicable Neptune HTTP requests where it is '
'excluded by default.')
parser.add_argument('-sp', '--stop-physics', action='store_true', default=False,
help="Disable visualization physics after the initial simulation stabilizes.")
parser.add_argument('-sd', '--simulation-duration', type=int, default=1500,
help='Specifies maximum duration of visualization physics simulation. Default is 1500ms')
parser.add_argument('--silent', action='store_true', default=False, help="Display no query output.")
parser.add_argument('-ct', '--connected-table', action='store_true', default=False,
help=f'Dynamically load jQuery and DataTables resources for itables. For more information, see: '
f'https://mwouts.github.io/itables/quick_start.html#offline-mode-versus-connected-mode')
parser.add_argument('-r', '--results-per-page', type=int, default=10,
help='Specifies how many query results to display per page in the output. Default is 10')
parser.add_argument('--no-scroll', action='store_true', default=False,
help="Display the entire output without a scroll bar.")
parser.add_argument('--hide-index', action='store_true', default=False,
help="Hide the index column numbers when displaying the results.")
parser.add_argument('-mcl', '--max-content-length', type=str, default='',
help="Specifies maximum size (in bytes) of results that can be returned to the "
"GremlinPython client. Abbreviated memory units (ex.'50MB') are accepted. "
"Default is 10MB")
args = parser.parse_args(line.split())
mode = str_to_query_mode(args.query_mode)
logger.debug(f'Arguments {args}')
results_df = None
query_params = None
if args.query_parameters:
if args.query_parameters in local_ns:
query_params_input = local_ns[args.query_parameters]
else:
query_params_input = args.query_parameters
if isinstance(query_params_input, dict):
query_params = json.dumps(query_params_input)
else:
try:
query_params_dict = json.loads(query_params_input.replace("'", '"'))
query_params = json.dumps(query_params_dict)
except Exception as e:
print(f"Invalid query parameter input, ignoring.")
if args.no_scroll:
gremlin_layout = UNRESTRICTED_LAYOUT
gremlin_scrollY = True
gremlin_scrollCollapse = False
gremlin_paging = False
else:
gremlin_layout = DEFAULT_LAYOUT
gremlin_scrollY = "475px"
gremlin_scrollCollapse = True
gremlin_paging = True
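# With --no-scroll, the DataTables output is unrestricted and unpaginated; otherwise
# results render in a paginated, 475px scrollable region.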
if not args.silent:
tab = widgets.Tab()
children = []
titles = []
first_tab_output = widgets.Output(layout=gremlin_layout)
children.append(first_tab_output)
mcl_bytes = mcl_to_bytes(args.max_content_length)
transport_args = {'max_content_length': mcl_bytes}
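# max_content_length caps the size of responses accepted by the GremlinPython
# WebSocket client (see --max-content-length above).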
if mode == QueryMode.EXPLAIN:
try:
explain_args = {}
if args.explain_type:
explain_args['explain.mode'] = args.explain_type
if self.client.is_analytics_domain() and query_params:
explain_args['parameters'] = query_params
res = self.client.gremlin_explain(cell,
args=explain_args,
use_port=args.use_port)
res.raise_for_status()
except Exception as e:
if self.client.is_analytics_domain():
print("%%gremlin is incompatible with Neptune Analytics.")
raise e
# Replace strikethrough bytes (U+0336 encodes to 0xcc 0xb6 in UTF-8), which can't be encoded to ASCII
explain_bytes = res.content.replace(b'\xcc', b'-')
explain_bytes = explain_bytes.replace(b'\xb6', b'')
query_res = explain_bytes.decode('utf-8')
if not args.silent:
gremlin_metadata = build_gremlin_metadata_from_query(query_type='explain', results=query_res, res=res)
titles.append('Explain')
if 'Neptune Gremlin Explain' in query_res:
explain_bytes = query_res.encode('ascii', 'ignore')
base64_str = base64.b64encode(explain_bytes).decode('ascii')
first_tab_html = gremlin_explain_profile_template.render(content=query_res,
link=f"data:text/html;base64,{base64_str}")
else:
first_tab_html = pre_container_template.render(content='No explain found')
elif mode == QueryMode.PROFILE:
logger.debug(f'results: {args.profile_no_results}')
logger.debug(f'chop: {args.profile_chop}')
logger.debug(f'serializer: {args.profile_serializer}')
logger.debug(f'indexOps: {args.profile_indexOps}')
if args.profile_serializer in serializers_map:
serializer = serializers_map[args.profile_serializer]
else:
serializer = args.profile_serializer
profile_args = {"profile.results": args.profile_no_results,
"profile.chop": args.profile_chop,
"profile.serializer": serializer,
"profile.indexOps": args.profile_indexOps,
"profile.debug": args.profile_debug}
if self.client.is_analytics_domain() and query_params:
profile_args['parameters'] = query_params
try:
profile_misc_args_dict = json.loads(args.profile_misc_args)
profile_args.update(profile_misc_args_dict)
except JSONDecodeError:
print('--profile-misc-args received invalid input, please check that you are passing in a valid '
'string representation of a map, ex. {"profile.x": "true"}')
try:
res = self.client.gremlin_profile(query=cell,
args=profile_args,
use_port=args.use_port)
res.raise_for_status()
except Exception as e:
if self.client.is_analytics_domain():
print("%%gremlin is incompatible with Neptune Analytics.")
raise e
profile_bytes = res.content.replace(b'\xcc', b'-')
profile_bytes = profile_bytes.replace(b'\xb6', b'')
query_res = profile_bytes.decode('utf-8')
if not args.silent:
gremlin_metadata = build_gremlin_metadata_from_query(query_type='profile', results=query_res, res=res)
titles.append('Profile')
if 'Neptune Gremlin Profile' in query_res:
profile_ascii_bytes = query_res.encode('ascii', 'ignore')
base64_str = base64.b64encode(profile_ascii_bytes).decode('ascii')
first_tab_html = gremlin_explain_profile_template.render(content=query_res,
link=f"data:text/html;base64,{base64_str}")
else:
first_tab_html = pre_container_template.render(content='No profile found')
else:
using_http = False
query_start = time.time() * 1000 # time.time() returns time in seconds w/high precision; x1000 to get in ms
if self.client.is_neptune_domain():
if args.connection_protocol != '':
connection_protocol, bad_protocol_input = normalize_protocol_name(args.connection_protocol)
if bad_protocol_input:
if self.client.is_analytics_domain():
connection_protocol = DEFAULT_HTTP_PROTOCOL
else:
connection_protocol = DEFAULT_WS_PROTOCOL
print(f"Connection protocol input is invalid for Neptune, "
f"defaulting to {connection_protocol}.")
if connection_protocol == DEFAULT_WS_PROTOCOL and \
self.graph_notebook_config.gremlin.message_serializer not in GREMLIN_SERIALIZERS_WS:
print(f"Serializer is unsupported for GremlinPython client, "
f"compatible serializers are: {GREMLIN_SERIALIZERS_WS}")
print("Defaulting to HTTP protocol.")
connection_protocol = DEFAULT_HTTP_PROTOCOL
else:
connection_protocol = self.graph_notebook_config.gremlin.connection_protocol
try:
if connection_protocol == DEFAULT_HTTP_PROTOCOL:
using_http = True
headers = {}
message_serializer = self.graph_notebook_config.gremlin.message_serializer
message_serializer_mime = get_gremlin_serializer_mime(message_serializer, DEFAULT_HTTP_PROTOCOL)
if message_serializer_mime != GRAPHSONV4_UNTYPED:
headers['Accept'] = message_serializer_mime
passed_params = query_params if self.client.is_analytics_domain() else None
query_res_http = self.client.gremlin_http_query(cell,
headers=headers,
query_params=passed_params,
use_port=args.use_port)
query_res_http.raise_for_status()
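# Fall back to repair_json for responses that fail strict JSON parsing.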
try:
query_res_http_json = query_res_http.json()
except JSONDecodeError:
query_res_fixed = repair_json(query_res_http.text)
query_res_http_json = json.loads(query_res_fixed)
if 'result' in query_res_http_json:
query_res = query_res_http_json['result']['data']
else:
if 'reason' in query_res_http_json:
logger.debug('Query failed with internal error, see response.')
else:
logger.debug('Received unexpected response format, outputting as single entry.')
query_res = [query_res_http_json]
else:
query_res = self.client.gremlin_query(cell, transport_args=transport_args)
except Exception as e:
if self.client.is_analytics_domain():
print("%%gremlin is incompatible with Neptune Analytics.")
raise e
else:
try:
query_res = self.client.gremlin_query(cell, transport_args=transport_args)
except Exception as e:
store_to_ns(args.store_to, {'error': str(e)[5:]}, local_ns) # remove the leading error code.
raise e
query_time = time.time() * 1000 - query_start
if not args.silent:
gremlin_metadata = build_gremlin_metadata_from_query(query_type='query', results=query_res,
query_time=query_time)
titles.append('Console')
gremlin_network = None
try:
logger.debug(f'groupby: {args.group_by}')
logger.debug(f'display_property: {args.display_property}')
logger.debug(f'edge_display_property: {args.edge_display_property}')
logger.debug(f'label_max_length: {args.label_max_length}')
logger.debug(f'ignore_groups: {args.ignore_groups}')
gn = GremlinNetwork(group_by_property=args.group_by,
display_property=args.display_property,
group_by_raw=args.group_by_raw,
group_by_depth=args.group_by_depth,
edge_display_property=args.edge_display_property,
tooltip_property=args.tooltip_property,
edge_tooltip_property=args.edge_tooltip_property,
label_max_length=args.label_max_length,
edge_label_max_length=args.edge_label_max_length,
ignore_groups=args.ignore_groups,
using_http=using_http)
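# Over HTTP, path() results arrive as plain {'labels': [...], 'objects': [...]}
# maps; convert them to Path objects so they render like WebSocket results.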
if using_http and 'path()' in cell and query_res and isinstance(query_res, list):
first_path = query_res[0]
if isinstance(first_path, dict) and first_path.keys() == {'labels', 'objects'}:
query_res_to_path_type = []
for path in query_res:
new_path_list = path['objects']
new_path = Path(labels=[], objects=new_path_list)
query_res_to_path_type.append(new_path)
query_res = query_res_to_path_type
if args.path_pattern == '':
gn.add_results(query_res, is_http=using_http)
else:
pattern = parse_pattern_list_str(args.path_pattern)
gn.add_results_with_pattern(query_res, pattern)
gremlin_network = gn
logger.debug(f'number of nodes is {len(gn.graph.nodes)}')
except ValueError as value_error:
logger.debug(
f'Unable to create graph network from result due to error: {value_error}. '
f'Skipping graph rendering for this result set.')
if gremlin_network and len(gremlin_network.graph.nodes) > 0:
try:
self.graph_notebook_vis_options['physics']['disablePhysicsAfterInitialSimulation'] \
= args.stop_physics
self.graph_notebook_vis_options['physics']['simulationDuration'] = args.simulation_duration
f = Force(network=gremlin_network, options=self.graph_notebook_vis_options)
titles.append('Graph')
children.append(f)
logger.debug('added gremlin network to tabs')
except Exception as force_error:
logger.debug(
f'Unable to render visualization from graph network due to error: {force_error}. Skipping.')
# Check if we can access the CDNs required by itables library.
# If not, then render our own HTML template.
mixed_results = False
if query_res:
# If the result set contains multiple datatypes, and the first result is a map or list, we need to
# insert a temporary string as the first element, or we will get an error when creating the DataFrame.
first_res_type = type(query_res[0])
if first_res_type in [dict, list, set] and len(query_res) > 1:
if not all(isinstance(x, first_res_type) for x in query_res[1:]):
mixed_results = True
query_res_deque = deque(query_res)
query_res_deque.appendleft('x')
query_res = list(query_res_deque)
results_df = pd.DataFrame(query_res).convert_dtypes()
# Checking for created indices instead of the df itself here, as df.empty will still return True when
# only empty maps/lists are present in the data.
if not results_df.index.empty:
query_res_reformat = []
for result in query_res:
fixed_result = encode_html_chars(result)
query_res_reformat.append([fixed_result])
query_res_reformat.append([{'__DUMMY_KEY__': ['DUMMY_VALUE']}])
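# The dummy row appears to force a consistent single-column frame regardless of
# the real row contents; it is dropped again below once the frame is built.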
results_df = pd.DataFrame(query_res_reformat)
if mixed_results:
results_df = results_df[1:]
results_df.drop(results_df.index[-1], inplace=True)
results_df.insert(0, "#", range(1, len(results_df) + 1))
if len(results_df.columns) == 2 and int(results_df.columns[1]) == 0:
results_df.rename({results_df.columns[1]: 'Result'}, axis='columns', inplace=True)
else:
results_df.insert(1, "Result", [])
results_df.set_index('#', inplace=True)
results_df.columns.name = results_df.index.name
results_df.index.name = None
if not args.silent:
metadata_output = widgets.Output(layout=gremlin_layout)
titles.append('Query Metadata')
children.append(metadata_output)
tab.children = children
for i in range(len(titles)):
tab.set_title(i, titles[i])
display(tab)
with metadata_output:
display(HTML(gremlin_metadata.to_html()))
with first_tab_output:
if mode == QueryMode.DEFAULT:
visible_results, final_pagination_options, final_pagination_menu = generate_pagination_vars(
args.results_per_page)
gremlin_columndefs = [
{"type": "string", "targets": "_all"},
{"width": "5%", "targets": 0},
{"visible": True, "targets": 0},
{"searchable": False, "targets": 0},
{"minWidth": "95%", "targets": 1},
{"className": "nowrap dt-left", "targets": "_all"},
{"createdCell": JavascriptFunction(index_col_js), "targets": 0},
{"createdCell": JavascriptFunction(cell_style_js), "targets": "_all"},
]
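# Target 0 is the '#' index column inserted above; --hide-index removes it from view.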
if args.hide_index:
gremlin_columndefs[1]["visible"] = False
init_notebook_mode(connected=args.connected_table)
show(results_df,
connected=args.connected_table,
scrollX=True,
scrollY=gremlin_scrollY,
columnDefs=gremlin_columndefs,
paging=gremlin_paging,
scrollCollapse=gremlin_scrollCollapse,
lengthMenu=[final_pagination_options, final_pagination_menu],
pageLength=visible_results,
buttons=[
"pageLength",
{
"extend": "copyHtml5",
"text": "Copy",
"exportOptions": RESULTS_EXPORT_OPTIONS
},
{
"extend": "csvHtml5",
"title": GREMLIN_RESULTS_FILENAME,
"text": "Download CSV",
"exportOptions": RESULTS_EXPORT_OPTIONS
},
{
"extend": "excelHtml5",
"filename": GREMLIN_RESULTS_FILENAME,
"title": None,
"text": "Download XLSX",
"exportOptions": RESULTS_EXPORT_OPTIONS
}
]
)
else: # Explain/Profile
display(HTML(first_tab_html))
if mode == QueryMode.DEFAULT and results_df is not None:
json_results = query_res
res_store_type = args.store_format
res_export_path = args.export_to
if res_store_type in PANDAS_FORMATS or res_export_path != '':
results_df = process_df_for_store(language='gremlin',
results_df=results_df)
stored_results = get_results_for_store(store_type=res_store_type,
pandas_results=results_df,
json_results=json_results)
export_csv_results(export_path=res_export_path,
results_df=results_df)
else:
stored_results = query_res
store_to_ns(args.store_to, stored_results, local_ns)
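# Illustrative store/export usage (file path and variable name are hypothetical):
#   %%gremlin --store-to res --store-format csv --export-to /tmp/gremlin_results.csv
#   g.V().limit(10).valueMap(true)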