interactive_engine/groot-http/example/python_example.py (373 lines of code) (raw):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# A simple example to show how to use the Interactive Python SDK to interact with the Groot service.
import argparse
import os
import time
import json
import ast
from gs_interactive.client.driver import Driver
from gs_interactive.client.session import Session
from gs_interactive.models import *
# Definition of the sample graph used by this example: one "person" vertex
# type keyed by "id", and one "knows" edge type between persons that carries
# a double-valued "weight" property.
test_graph_def = {
    "name": "test_graph",
    "description": "This is a test graph",
    "schema": {
        "vertex_types": [
            {
                "type_name": "person",
                "properties": [
                    {
                        "property_name": "id",
                        "property_type": {"primitive_type": "DT_SIGNED_INT64"},
                    },
                    {
                        "property_name": "name",
                        "property_type": {"string": {"long_text": ""}},
                    },
                    {
                        "property_name": "age",
                        "property_type": {"primitive_type": "DT_SIGNED_INT32"},
                    },
                ],
                "primary_keys": ["id"],
            }
        ],
        "edge_types": [
            {
                "type_name": "knows",
                "vertex_type_pair_relations": [
                    {
                        "source_vertex": "person",
                        "destination_vertex": "person",
                        "relation": "MANY_TO_MANY",
                    }
                ],
                "properties": [
                    {
                        "property_name": "weight",
                        "property_type": {"primitive_type": "DT_DOUBLE"},
                    }
                ],
                "primary_keys": [],
            }
        ],
    },
}
def createGraph(sess: Session):
    """Create the sample graph described by ``test_graph_def``.

    Args:
        sess: Session used to send the create-graph request.

    Returns:
        The ``graph_id`` attribute of the value carried by the response.

    Raises:
        Exception: If the response is not OK; the message embeds the
            response's status message.
    """
    request = CreateGraphRequest.from_dict(test_graph_def)
    resp = sess.create_graph(request)
    if not resp.is_ok():
        raise Exception("create_graph failed with error: %s" % resp.get_status_message())
    print("The response of create_graph:\n", resp)
    return resp.get_value().graph_id
def getGraphSchema(sess: Session, graph_id: str):
    """Fetch and print the schema of ``graph_id``.

    Args:
        sess: Session used to send the request.
        graph_id: Identifier of the graph whose schema is requested.

    Returns:
        The value carried by the response.

    Raises:
        Exception: If the response is not OK.
    """
    resp = sess.get_graph_schema(graph_id)
    if not resp.is_ok():
        raise Exception("get_graph_schema failed with error: %s" % resp.get_status_message())
    print("The response of get_graph_schema:\n", resp)
    return resp.get_value()
def deleteGraph(sess: Session, graph_id: str):
    """Delete the graph ``graph_id`` and print the response.

    Args:
        sess: Session used to send the request.
        graph_id: Identifier of the graph to delete.

    Raises:
        Exception: If the response is not OK.
    """
    resp = sess.delete_graph(graph_id)
    if not resp.is_ok():
        raise Exception("delete_graph failed with error: %s" % resp.get_status_message())
    print("The response of delete_graph:\n", resp)
def addVertexType(sess: Session, graph_id: str):
    """Add a ``new_person`` vertex type (properties ``id`` and ``name``).

    Args:
        sess: Session used to send the request.
        graph_id: Identifier of the graph to modify.

    Raises:
        Exception: If the response is not OK.
    """
    id_meta = CreatePropertyMeta(
        property_name="id",
        property_type=GSDataType.from_dict({"primitive_type": "DT_SIGNED_INT64"}),
    )
    name_meta = CreatePropertyMeta(
        property_name="name",
        property_type=GSDataType.from_dict({"string": {"long_text": ""}}),
    )
    new_type = CreateVertexType(
        type_name="new_person",
        properties=[id_meta, name_meta],
        primary_keys=["id"],
    )

    api_response = sess.create_vertex_type(graph_id, new_type)
    if not api_response.is_ok():
        raise Exception(
            "create_vertex_type failed with error: %s" % api_response.get_status_message()
        )
    print("The response of create_vertex_type:\n", api_response)
def updateVertexType(sess: Session, graph_id: str):
    """Send an update for ``new_person`` that carries a new ``age`` property.

    Args:
        sess: Session used to send the request.
        graph_id: Identifier of the graph to modify.

    Raises:
        Exception: If the response is not OK.
    """
    # The request lists only the property being added.
    age_meta = CreatePropertyMeta(
        property_name="age",
        property_type=GSDataType.from_dict({"primitive_type": "DT_SIGNED_INT32"}),
    )
    changed_type = CreateVertexType(
        type_name="new_person",
        properties=[age_meta],
        primary_keys=["id"],
    )

    api_response = sess.update_vertex_type(graph_id, changed_type)
    if not api_response.is_ok():
        raise Exception(
            "update_vertex_type failed with error: %s" % api_response.get_status_message()
        )
    print("The response of update_vertex_type:\n", api_response)
def deleteVertexType(sess: Session, graph_id: str):
    """Delete the ``new_person`` vertex type from ``graph_id``.

    Args:
        sess: Session used to send the request.
        graph_id: Identifier of the graph to modify.

    Raises:
        Exception: If the response is not OK.
    """
    api_response = sess.delete_vertex_type(graph_id, "new_person")
    if not api_response.is_ok():
        raise Exception(
            "delete_vertex_type failed with error: %s" % api_response.get_status_message()
        )
    print("The response of delete_vertex_type:\n", api_response)
def addEdgeType(sess: Session, graph_id: str):
    """Add a ``new_knows`` edge type between ``new_person`` vertices.

    The edge type carries a single ``weight`` property of type ``DT_DOUBLE``.

    Args:
        sess: Session used to send the request.
        graph_id: Identifier of the graph to modify.

    Raises:
        Exception: If the response is not OK.
    """
    endpoints = BaseEdgeTypeVertexTypePairRelationsInner(
        source_vertex="new_person",
        destination_vertex="new_person",
        relation="MANY_TO_MANY",
    )
    weight_meta = CreatePropertyMeta(
        property_name="weight",
        property_type=GSDataType.from_dict({"primitive_type": "DT_DOUBLE"}),
    )
    new_type = CreateEdgeType(
        type_name="new_knows",
        vertex_type_pair_relations=[endpoints],
        properties=[weight_meta],
    )

    api_response = sess.create_edge_type(graph_id, new_type)
    if not api_response.is_ok():
        raise Exception(
            "create_edge_type failed with error: %s" % api_response.get_status_message()
        )
    print("The response of create_edge_type:\n", api_response)
def updateEdgeType(sess: Session, graph_id: str):
    """Send an update for ``new_knows`` that carries a ``new_weight`` property.

    Args:
        sess: Session used to send the request.
        graph_id: Identifier of the graph to modify.

    Raises:
        Exception: If the response is not OK.
    """
    # The request lists only the property being added.
    new_weight_meta = CreatePropertyMeta(
        property_name="new_weight",
        property_type=GSDataType.from_dict({"primitive_type": "DT_DOUBLE"}),
    )
    endpoints = BaseEdgeTypeVertexTypePairRelationsInner(
        source_vertex="new_person",
        destination_vertex="new_person",
        relation="MANY_TO_MANY",
    )
    changed_type = CreateEdgeType(
        type_name="new_knows",
        properties=[new_weight_meta],
        vertex_type_pair_relations=[endpoints],
    )

    api_response = sess.update_edge_type(graph_id, changed_type)
    if not api_response.is_ok():
        raise Exception(
            "update_edge_type failed with error: %s" % api_response.get_status_message()
        )
    print("The response of update_edge_type:\n", api_response)
def deleteEdgeType(sess: Session, graph_id: str):
    """Delete the ``new_knows`` edge type (``new_person`` -> ``new_person``).

    Args:
        sess: Session used to send the request.
        graph_id: Identifier of the graph to modify.

    Raises:
        Exception: If the response is not OK.
    """
    api_response = sess.delete_edge_type(graph_id, "new_knows", "new_person", "new_person")
    if not api_response.is_ok():
        raise Exception(
            "delete_edge_type failed with error: %s" % api_response.get_status_message()
        )
    print("The response of delete_edge_type:\n", api_response)
def getSnapShotStatus(sess: Session, graph_id: str, snapshot_id: int):
    """Query the status of ``snapshot_id`` on ``graph_id``.

    Args:
        sess: Session used to send the request.
        graph_id: Identifier of the graph the snapshot belongs to.
        snapshot_id: Identifier of the snapshot to look up.

    Returns:
        The response object itself (not its unwrapped value).

    Raises:
        Exception: If the response is not OK.
    """
    api_response = sess.get_snapshot_status(graph_id, snapshot_id)
    if not api_response.is_ok():
        raise Exception(
            "get_snapshot_status failed with error: %s" % api_response.get_status_message()
        )
    print("The response of get_snapshot_status", api_response)
    return api_response
def extractValueFromResponse(api_response, value_name):
    """Return the field ``value_name`` from the value carried by a response.

    The value returned by ``api_response.get_value()`` is handled by type:

    * ``str``: parsed as JSON first, so payloads containing ``true``,
      ``false`` or ``null`` are accepted; if that fails, parsed as a Python
      literal (e.g. a single-quoted ``repr`` of a dict). If the parsed result
      is a dict, ``value_name`` is looked up with ``dict.get``; otherwise
      ``None`` is returned.
    * ``dict``: ``value_name`` is looked up with ``dict.get``.
    * anything else: ``value_name`` is read as an attribute.

    Args:
        api_response: Object exposing a ``get_value()`` method.
        value_name: Key or attribute name to extract.

    Returns:
        The extracted value, or ``None`` when a dict (or parsed string) does
        not contain ``value_name``.

    Raises:
        ValueError: If a string value is neither valid JSON nor a valid
            Python literal.
        AttributeError: If a non-str, non-dict value has no attribute
            ``value_name``.
    """
    value = api_response.get_value()

    if isinstance(value, str):
        try:
            parsed = json.loads(value)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass. Fall back to
            # Python-literal syntax for dict reprs such as "{'a': 1}".
            try:
                parsed = ast.literal_eval(value)
            except (ValueError, SyntaxError, TypeError) as err:
                raise ValueError("String value is not a valid dictionary format") from err
        if isinstance(parsed, dict):
            return parsed.get(value_name)
        return None

    if isinstance(value, dict):
        return value.get(value_name)

    try:
        return getattr(value, value_name)
    except AttributeError:
        raise AttributeError(f"{type(value).__name__} object has no attribute '{value_name}'")
def addVertex(sess: Session, graph_id: str):
    """Insert two ``person`` vertices and one ``knows`` edge in one request.

    The vertices have ids 1 ("Alice", age 20) and 8 ("mike", age 1); the edge
    goes from id 8 to id 1 with weight 7.

    Args:
        sess: Session used to send the request.
        graph_id: Identifier of the graph to write to.

    Returns:
        The ``snapshot_id`` extracted from the response value.

    Raises:
        Exception: If the response is not OK.
    """
    alice = VertexRequest(
        label="person",
        primary_key_values=[ModelProperty(name="id", value=1)],
        properties=[
            ModelProperty(name="name", value="Alice"),
            ModelProperty(name="age", value=20),
        ],
    )
    mike = VertexRequest(
        label="person",
        primary_key_values=[ModelProperty(name="id", value=8)],
        properties=[
            ModelProperty(name="name", value="mike"),
            ModelProperty(name="age", value=1),
        ],
    )
    mike_knows_alice = EdgeRequest(
        src_label="person",
        dst_label="person",
        edge_label="knows",
        src_primary_key_values=[ModelProperty(name="id", value=8)],
        dst_primary_key_values=[ModelProperty(name="id", value=1)],
        properties=[ModelProperty(name="weight", value=7)],
    )
    params = VertexEdgeRequest(
        vertex_request=[alice, mike],
        edge_request=[mike_knows_alice],
    )

    api_response = sess.add_vertex(graph_id, vertex_edge_request=params)
    if not api_response.is_ok():
        raise Exception(
            "add_vertex failed with error: %s" % api_response.get_status_message()
        )
    print("The response of add_vertex:\n", api_response)
    return extractValueFromResponse(api_response, "snapshot_id")
def updateVertex(sess: Session, graph_id: str):
    """Update ``person`` id 8, setting ``name`` to "Cindy" and ``age`` to 24.

    Args:
        sess: Session used to send the request.
        graph_id: Identifier of the graph to write to.

    Returns:
        The ``snapshot_id`` extracted from the response value.

    Raises:
        Exception: If the response is not OK.
    """
    new_values = [
        ModelProperty(name="name", value="Cindy"),
        ModelProperty(name="age", value=24),
    ]
    target = VertexRequest(
        label="person",
        primary_key_values=[ModelProperty(name="id", value=8)],
        properties=new_values,
    )

    api_response = sess.update_vertex(graph_id, [target])
    if not api_response.is_ok():
        raise Exception(
            "update_vertex failed with error: %s" % api_response.get_status_message()
        )
    print("The response of update_vertex:\n", api_response)
    return extractValueFromResponse(api_response, "snapshot_id")
def deleteVertex(sess: Session, graph_id: str):
    """Delete the ``person`` vertex whose primary key ``id`` is 1.

    Args:
        sess: Session used to send the request.
        graph_id: Identifier of the graph to write to.

    Returns:
        The ``snapshot_id`` extracted from the response value.

    Raises:
        Exception: If the response is not OK.
    """
    # The vertex is addressed by its label plus its primary-key value.
    target = DeleteVertexRequest(
        label="person",
        primary_key_values=[ModelProperty(name="id", value=1)],
    )

    api_response = sess.delete_vertex(graph_id, [target])
    if not api_response.is_ok():
        raise Exception(
            "delete_vertex failed with error: %s" % api_response.get_status_message()
        )
    print("The response of delete_vertex:\n", api_response)
    return extractValueFromResponse(api_response, "snapshot_id")
def updateEdge(sess: Session, graph_id: str):
    """Set ``weight`` to 3 on the ``knows`` edge from person 1 to person 8.

    Args:
        sess: Session used to send the request.
        graph_id: Identifier of the graph to write to.

    Returns:
        The ``snapshot_id`` extracted from the response value.

    Raises:
        Exception: If the response is not OK.
    """
    new_weight = [ModelProperty(name="weight", value=3)]
    target = EdgeRequest(
        src_label="person",
        dst_label="person",
        edge_label="knows",
        src_primary_key_values=[ModelProperty(name="id", value=1)],
        dst_primary_key_values=[ModelProperty(name="id", value=8)],
        properties=new_weight,
    )

    resp = sess.update_edge(graph_id, [target])
    if not resp.is_ok():
        raise Exception("update_edge failed with error: %s" % resp.get_status_message())
    print("The response of update_edge:\n", resp)
    return extractValueFromResponse(resp, "snapshot_id")
def deleteEdge(sess: Session, graph_id: str):
    """Delete the ``knows`` edge from person 1 to person 8.

    Args:
        sess: Session used to send the request.
        graph_id: Identifier of the graph to write to.

    Returns:
        The ``snapshot_id`` extracted from the response value.

    Raises:
        Exception: If the response is not OK.
    """
    target = DeleteEdgeRequest(
        src_label="person",
        dst_label="person",
        edge_label="knows",
        src_primary_key_values=[ModelProperty(name="id", value=1)],
        dst_primary_key_values=[ModelProperty(name="id", value=8)],
    )

    api_response = sess.delete_edge(graph_id, [target])
    if not api_response.is_ok():
        raise Exception(
            "delete_edge failed with error: %s" % api_response.get_status_message()
        )
    print("The response of delete_edge:\n", api_response)
    return extractValueFromResponse(api_response, "snapshot_id")
def addEdge(sess: Session, graph_id: str):
    """Insert two ``knows`` edges: 1 -> 8 (weight 9.123) and 2 -> 8 (weight 3.233).

    Args:
        sess: Session used to send the request.
        graph_id: Identifier of the graph to write to.

    Returns:
        The ``snapshot_id`` extracted from the response value.

    Raises:
        Exception: If the response is not OK.
    """
    one_to_eight = EdgeRequest(
        src_label="person",
        dst_label="person",
        edge_label="knows",
        src_primary_key_values=[ModelProperty(name="id", value=1)],
        dst_primary_key_values=[ModelProperty(name="id", value=8)],
        properties=[ModelProperty(name="weight", value=9.123)],
    )
    two_to_eight = EdgeRequest(
        src_label="person",
        dst_label="person",
        edge_label="knows",
        src_primary_key_values=[ModelProperty(name="id", value=2)],
        dst_primary_key_values=[ModelProperty(name="id", value=8)],
        properties=[ModelProperty(name="weight", value=3.233)],
    )

    api_response = sess.add_edge(graph_id, [one_to_eight, two_to_eight])
    if not api_response.is_ok():
        raise Exception(
            "add_edge failed with error: %s" % api_response.get_status_message()
        )
    print("The response of add_edge:\n", api_response)
    return extractValueFromResponse(api_response, "snapshot_id")
if __name__ == "__main__":
    # No options are registered on the parser, so parse_args() only provides
    # -h/--help and rejects unexpected command-line arguments.
    parser = argparse.ArgumentParser(description="Example Python3 script")
    args = parser.parse_args()

    driver = Driver()
    with driver.session() as sess:
        graph_id = "test_graph"

        # Start from a clean graph: delete, print the schema, then create.
        deleteGraph(sess, graph_id)
        current_schema = getGraphSchema(sess, graph_id)
        createGraph(sess)

        # DML operations; the snapshot id of the last write is kept so its
        # status can be polled below.
        for dml_step in (addVertex, updateVertex, deleteVertex, addEdge, updateEdge):
            dml_step(sess, graph_id)
        si = deleteEdge(sess, graph_id)

        # Poll once per second until the snapshot is reported as AVAILABLE.
        while True:
            resp = getSnapShotStatus(sess, graph_id, si)
            if extractValueFromResponse(resp, "status") == "AVAILABLE":
                break
            time.sleep(1)

        # DDL operations: add and update the new types, then remove them
        # (edge type first, then the vertex type it refers to).
        for ddl_step in (
            addVertexType,
            updateVertexType,
            addEdgeType,
            updateEdgeType,
            deleteEdgeType,
            deleteVertexType,
        ):
            ddl_step(sess, graph_id)