python/graphscope/framework/graph_schema.py (561 lines of code) (raw):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import hashlib
import itertools
import json
from collections import namedtuple
from typing import List
from graphscope.framework.utils import data_type_to_unified_type
from graphscope.framework.utils import unified_type_to_data_type
from graphscope.framework.utils import unify_type
from graphscope.proto import ddl_service_pb2
from graphscope.proto import graph_def_pb2
class Property:
    """One property (column) of a vertex or edge label.

    ``data_type`` holds a ``graph_def_pb2.DataTypePb`` value (it is rendered
    with ``DataTypePb.Name`` and written into ``PropertyDefPb.data_type``).
    ``inner_id`` and ``default_value`` are only filled in by
    :meth:`from_property_def`; locally built properties keep the defaults.
    """

    def __init__(
        self, name, data_type, is_primary_key=False, property_id=0, comment=""
    ):
        self.name: str = name
        self.data_type: int = data_type
        self.is_primary_key: bool = is_primary_key
        self.id: int = property_id
        self.comment = comment
        # Only populated when decoding from a proto message.
        self.inner_id: int = 0
        self.default_value = None

    def as_property_def(self):
        """Return a ``PropertyDefPb`` carrying name, data type, pk flag and comment.

        ``id``, ``inner_id`` and ``default_value`` are not copied into the message.
        """
        message = graph_def_pb2.PropertyDefPb()
        message.name = self.name
        message.data_type = self.data_type
        message.pk = self.is_primary_key
        message.comment = self.comment
        return message

    @property
    def type(self):
        """Alias of :attr:`data_type`."""
        return self.data_type

    @classmethod
    def from_property_def(cls, pb):
        """Build a property from a ``PropertyDefPb``, including inner id and default."""
        decoded = cls(
            pb.name,
            pb.data_type,
            is_primary_key=pb.pk,
            property_id=pb.id,
            comment=pb.comment,
        )
        decoded.inner_id, decoded.default_value = pb.inner_id, pb.default_value
        return decoded

    def __repr__(self) -> str:
        type_name = graph_def_pb2.DataTypePb.Name(self.data_type)
        return (
            f"Property({self.id}, {self.name}, {type_name}, "
            f"{self.is_primary_key}, {self.comment})"
        )

    def __str__(self) -> str:
        return repr(self)

    def to_dict(self) -> dict:
        """Return a plain dict using unified type names for ``property_type``."""
        return dict(
            property_name=self.name,
            property_id=self.id,
            property_type=data_type_to_unified_type(self.data_type),
            is_primary_key=self.is_primary_key,
            description=self.comment,
        )
# A (source vertex label, destination vertex label) pair; each one becomes an
# edge kind of its EdgeLabel when the schema is submitted.
Relation = namedtuple("Relation", ("source", "destination"))
class Label:
    """Common base for vertex and edge label definitions.

    A label owns an ordered list of properties. Entries are never removed from
    ``_props``; instead ``_valid_props`` holds a parallel flag per property
    (truthy = valid) and only valid entries are exposed via :attr:`properties`.
    ``_prop_index`` maps a property name to its position in ``_props``.
    Subclasses must implement :attr:`type_enum`.
    """

    __slots__ = [
        "_name",
        "_props",
        "_version_id",
        "_label_id",
        "_valid_props",
        "_prop_index",
        "_comment",
    ]

    def __init__(self, name, label_id=0):
        self._name: str = name
        self._props: List[Property] = []
        self._version_id: int = 0
        self._label_id: int = label_id
        # Parallel to _props: 1 marks a valid property, 0 an invalid one.
        self._valid_props: List[int] = []
        # property name -> position in _props
        self._prop_index: dict[str, int] = {}
        self._comment: str = ""

    def add_property(self, name, data_type, is_primary_key=False, comment=""):
        """Append a property and return ``self`` for chaining.

        A ``str`` ``data_type`` is converted with ``unify_type``. The property id
        is its position in the list. Re-adding an existing name appends a new
        entry and repoints the name to it; the earlier entry stays valid.
        """
        self._prop_index[name] = len(self._props)
        if isinstance(data_type, str):
            data_type = unify_type(data_type)
        self._props.append(
            Property(name, data_type, is_primary_key, len(self._props), comment)
        )
        self._valid_props.append(1)
        return self

    def set_comment(self, comment):
        """Set the label description and return ``self`` for chaining."""
        self._comment = comment
        return self

    @property
    def id(self) -> int:
        return self._label_id

    @property
    def label(self) -> str:
        return self._name

    @property
    def properties(self) -> List:
        """Valid properties, in insertion order."""
        return list(itertools.compress(self._props, self._valid_props))

    @property
    def comment(self):
        return self._comment

    def get_property_id(self, name):
        """Return the position of property ``name`` in this label.

        Raises:
            KeyError: If ``name`` was never registered.
            ValueError: If the property is marked invalid.
        """
        idx = self._prop_index[name]
        if not self._valid_props[idx]:
            raise ValueError(f"{name} not exist in properties")
        return idx

    def property_exists(self, name):
        """Return True iff ``name`` is registered and marked valid."""
        idx = self._prop_index.get(name)
        # bool() so callers get True/False rather than the raw 1/0 flag.
        return idx is not None and bool(self._valid_props[idx])

    def __repr__(self) -> str:
        s = f"Label: {self.label}\nProperties: {', '.join([str(p) for p in self.properties])}\nComment: {self.comment}"
        return s

    def __str__(self) -> str:
        return self.__repr__()

    def to_dict(self) -> dict:
        """Return a plain dict of the label; primary keys are listed by name."""
        properties = []
        primary_keys = []
        for p in self.properties:
            properties.append(p.to_dict())
            if p.is_primary_key:
                primary_keys.append(p.name)
        return {
            "type_name": self.label,
            "properties": properties,
            "primary_keys": primary_keys,
            "description": self.comment,
        }

    @property
    def type_enum(self):
        """``TypeEnumPb`` value of this label; must be provided by subclasses."""
        raise NotImplementedError()

    def as_type_def(self):
        """Encode the label and its valid properties as a ``TypeDefPb``."""
        pb = graph_def_pb2.TypeDefPb()
        pb.label = self._name
        pb.type_enum = self.type_enum
        pb.comment = self._comment
        for prop in self.properties:
            pb.props.append(prop.as_property_def())
        return pb

    @classmethod
    def from_type_def(cls, pb):
        """Decode a label (id, version, comment, properties) from a ``TypeDefPb``."""
        label = cls(pb.label)
        label._label_id = pb.label_id.id
        label._version_id = pb.version_id
        label._comment = pb.comment
        for prop_pb in pb.props:
            # Register the name so get_property_id/property_exists also work for
            # decoded labels, not only for ones built with add_property.
            label._prop_index[prop_pb.name] = len(label._props)
            label._props.append(Property.from_property_def(prop_pb))
            label._valid_props.append(1)
        return label
class VertexLabel(Label):
    """Label definition of a vertex type."""

    __slots__ = []

    @property
    def type_enum(self):
        # Encoded into TypeDefPb.type_enum by Label.as_type_def().
        return graph_def_pb2.TypeEnumPb.VERTEX

    def add_primary_key(self, name, data_type, comment=""):
        """Append a primary-key property; returns ``self`` for chaining."""
        return self.add_property(
            name, data_type, is_primary_key=True, comment=comment
        )
class EdgeLabel(Label):
    """Label definition of an edge type.

    Besides properties, an edge label carries a list of :class:`Relation`
    (source vertex label, destination vertex label) pairs, built with the
    fluent ``source(...).destination(...)`` calls.
    """

    __slots__ = ["_relations"]

    def __init__(self, name, label_id=0):
        super().__init__(name, label_id)
        self._relations: List[Relation] = []

    @property
    def type_enum(self):
        # Encoded into TypeDefPb.type_enum by Label.as_type_def().
        return graph_def_pb2.TypeEnumPb.EDGE

    def source(self, label):
        """Start a new relation whose source vertex label is ``label``.

        The destination stays empty until :meth:`destination` is called.
        Returns ``self`` for chaining.
        """
        self._relations.append(Relation(label, ""))
        return self

    def destination(self, label):
        """Set the destination of the most recently started relation.

        Returns ``self`` for chaining.

        Raises:
            AssertionError: If no relation was started with :meth:`source`, or
                the last relation already has a destination. (These checks are
                skipped when Python runs with ``-O``.)
        """
        assert (
            self._relations
        ), "Found empty relation, maybe you should use `source` first."
        assert not self._relations[-1].destination, "An destination is already exists."
        # Relation is an immutable namedtuple: replace the last entry with a copy.
        self._relations[-1] = self._relations[-1]._replace(destination=label)
        return self

    @property
    def relations(self) -> List[Relation]:
        """The relation list itself (not a copy)."""
        return self._relations

    def __repr__(self) -> str:
        s = super().__repr__()
        if self._relations:
            # The base repr ends with the comment line; without this newline the
            # relations were glued onto the comment text.
            s += f"\nRelations: {self.relations}"
        return s

    def to_dict(self) -> dict:
        """Label dict plus ``vertex_type_pair_relations`` (source/destination pairs)."""
        sd = super().to_dict()
        relations = []
        for r in self._relations:
            relations.append(
                {"source_vertex": r.source, "destination_vertex": r.destination}
            )
        sd.update({"vertex_type_pair_relations": relations})
        return sd
class GraphSchema:
    """Schema of a graph, plus a queue of pending schema-change (DDL) operations.

    The current schema is decoded from a ``GraphDefPb`` by :meth:`from_graph_def`.
    Labels live in ``_vertex_labels`` / ``_edge_labels`` with parallel 1/0
    validity flags; only valid labels are exposed. Methods such as
    :meth:`add_vertex_label`, :meth:`add_edge_label` and :meth:`drop` only queue
    changes, which :meth:`update` submits as one batch.

    Attributes:
        oid_type: Original ID type (rendered by ``__repr__`` as a ``DataTypePb``
            name); None when unknown.
        vid_type: Internal ID representation; None when unknown.
        vdata_type: Type of the data held by each vertex (simple graph only).
        edata_type: Type of the data held by each edge (simple graph only).
        vertex_labels (list): Names of the valid vertex labels.
        edge_labels (list): Names of the valid edge labels.
        edge_relationships (list(list(tuple))): Source label and destination
            label pairs of each valid edge label.
    """

    def __init__(self):
        # Connection used by update() to submit DDL batches. It is never assigned
        # in this class; presumably set by the owning object — TODO confirm.
        self._conn = None
        self._oid_type = None
        self._vid_type = None
        # simple graph only
        self._vdata_type = graph_def_pb2.UNKNOWN
        self._edata_type = graph_def_pb2.UNKNOWN
        # Current labels, addressed by position.
        self._vertex_labels: List[VertexLabel] = []
        self._edge_labels: List[EdgeLabel] = []
        # Pending DDL operations, consumed by _prepare_batch_rpc() / update().
        self._vertex_labels_to_add: List[VertexLabel] = []
        self._edge_labels_to_add: List[EdgeLabel] = []
        self._vertex_labels_to_drop: List[VertexLabel] = []
        self._edge_labels_to_drop: List[EdgeLabel] = []
        self._vertex_labels_to_add_property: List[VertexLabel] = []
        self._edge_labels_to_add_property: List[EdgeLabel] = []
        # Parallel to _vertex_labels / _edge_labels: 1 indicates valid, 0 invalid.
        self._valid_vertices = []
        self._valid_edges = []
        # label name -> position in _vertex_labels / _edge_labels
        self._v_label_index = {}
        self._e_label_index = {}

    def from_graph_def(self, graph_def):
        """Populate this schema from a ``GraphDefPb`` and return ``self``.

        The decoder is chosen by the message packed into ``graph_def.extension``:
        ``VineyardInfoPb``, ``MutableGraphInfoPb``, or otherwise the
        store-service layout (``type_defs`` / ``edge_kinds``).
        """
        if graph_def.extension.Is(graph_def_pb2.VineyardInfoPb.DESCRIPTOR):
            return self._from_vineyard(graph_def)
        if graph_def.extension.Is(graph_def_pb2.MutableGraphInfoPb.DESCRIPTOR):
            return self._from_mutable_graph(graph_def)
        return self._from_store_service(graph_def)

    def _from_store_service(self, graph_def):
        """Decode labels from ``graph_def.type_defs`` and ``graph_def.edge_kinds``.

        Args:
            graph_def: ``GraphDefPb`` proto message defined in ``proto/graph_def.proto``.

        Returns:
            ``self``.

        Raises:
            KeyError: If an edge kind refers to a label id absent from ``type_defs``.
        """
        self.clear()
        id_to_label = {}
        for type_def_pb in graph_def.type_defs:
            id_to_label[type_def_pb.label_id.id] = type_def_pb.label
        # edge label name -> [(src vertex label, dst vertex label), ...]
        edge_kinds = {}
        for kind in graph_def.edge_kinds:
            edge_label = id_to_label[kind.edge_label_id.id]
            edge_kinds.setdefault(edge_label, []).append(
                (kind.src_vertex_label, kind.dst_vertex_label)
            )
        for type_def_pb in graph_def.type_defs:
            if type_def_pb.type_enum == graph_def_pb2.VERTEX:
                self._v_label_index[type_def_pb.label] = len(self._vertex_labels)
                self._vertex_labels.append(VertexLabel.from_type_def(type_def_pb))
                self._valid_vertices.append(1)
            else:
                label = EdgeLabel.from_type_def(type_def_pb)
                for src, dst in edge_kinds.get(label.label, []):
                    label.source(src).destination(dst)
                self._e_label_index[type_def_pb.label] = len(self._edge_labels)
                self._edge_labels.append(label)
                self._valid_edges.append(1)
        return self

    def _from_vineyard(self, graph_def):
        """Decode a schema from the ``VineyardInfoPb`` in ``graph_def.extension``.

        Returns:
            ``self``.

        Raises:
            ValueError: If ``property_schema_json`` cannot be parsed or does not
                have the expected layout.
        """
        self.clear()
        vy_info = graph_def_pb2.VineyardInfoPb()
        graph_def.extension.Unpack(vy_info)
        self._oid_type = vy_info.oid_type
        self._vid_type = vy_info.vid_type
        # simple graph schema.
        if vy_info.vdata_type:
            self._vdata_type = unify_type(vy_info.vdata_type)
        if vy_info.edata_type:
            self._edata_type = unify_type(vy_info.edata_type)
        # property graph schema
        if vy_info.property_schema_json:

            def add_common_attributes(entry, item):
                # Properties keep their JSON order; validity flags come verbatim.
                for prop in item["propertyDefList"]:
                    entry.add_property(prop["name"], unify_type(prop["data_type"]))
                entry._valid_props = item["valid_properties"]

            try:
                schema = json.loads(vy_info.property_schema_json)
                if schema:
                    for item in schema["types"]:
                        if item["type"] == "VERTEX":
                            entry = VertexLabel(item["label"], item["id"])
                            # Label ids are expected to be dense and ordered.
                            assert entry.id == len(self._vertex_labels)
                            add_common_attributes(entry, item)
                            self._vertex_labels.append(entry)
                            self._v_label_index[entry.label] = entry.id
                        elif item["type"] == "EDGE":
                            entry = EdgeLabel(item["label"], item["id"])
                            assert entry.id == len(self._edge_labels)
                            for rel in item["rawRelationShips"]:
                                entry.source(rel["srcVertexLabel"]).destination(
                                    rel["dstVertexLabel"]
                                )
                            add_common_attributes(entry, item)
                            self._edge_labels.append(entry)
                            self._e_label_index[entry.label] = entry.id
                    self._valid_vertices = schema["valid_vertices"]
                    self._valid_edges = schema["valid_edges"]
            except Exception as e:
                raise ValueError("Invalid property graph schema") from e
        return self

    def _from_mutable_graph(self, graph_def):
        """Read simple-graph data types from the ``MutableGraphInfoPb`` extension.

        NOTE: unlike the other decoders this does not call ``clear()``; labels
        and id types already held by this schema are kept.

        Returns:
            ``self``, like the other decoders dispatched by ``from_graph_def``.
        """
        graph_info = graph_def_pb2.MutableGraphInfoPb()
        graph_def.extension.Unpack(graph_info)
        # simple graph schema
        if graph_info.vdata_type:
            self._vdata_type = unify_type(graph_info.vdata_type)
        if graph_info.edata_type:
            self._edata_type = unify_type(graph_info.edata_type)
        return self

    def __repr__(self):
        s = ""
        if self._oid_type is not None:
            s += f"oid_type: {graph_def_pb2.DataTypePb.Name(self._oid_type)}\n"
        # Guard on vid_type itself (previously checked oid_type by mistake).
        if self._vid_type is not None:
            s += f"vid_type: {graph_def_pb2.DataTypePb.Name(self._vid_type)}\n"
        if (
            self._vdata_type != graph_def_pb2.UNKNOWN
            and self._edata_type != graph_def_pb2.UNKNOWN
        ):
            s += f"vdata_type: {graph_def_pb2.DataTypePb.Name(self._vdata_type)}\n"
            s += f"edata_type: {graph_def_pb2.DataTypePb.Name(self._edata_type)}\n"
        for entry in self._valid_vertex_labels():
            s += f"type: VERTEX\n{str(entry)}\n"
        for entry in self._valid_edge_labels():
            s += f"type: EDGE\n{str(entry)}\n"
        return s

    def __str__(self):
        return self.__repr__()

    def to_dict(self) -> dict:
        """Return ``{"vertex_types": [...], "edge_types": [...]}`` of valid labels."""
        vertices = [entry.to_dict() for entry in self._valid_vertex_labels()]
        edges = [entry.to_dict() for entry in self._valid_edge_labels()]
        return {"vertex_types": vertices, "edge_types": edges}

    def from_dict(self, input: dict):
        """Queue vertex/edge labels described by a ``to_dict``-style dict.

        The labels are appended to the pending "to add" queues; they take effect
        on :meth:`update`.

        Raises:
            RuntimeError: If this schema already holds labels, or if the dict is
                malformed (the schema is cleared in that case).
        """
        if self._vertex_labels or self._edge_labels:
            raise RuntimeError("Cannot load schema from dict within a non-empty graph.")
        try:
            self.clear()
            vertices: list[dict] = input["vertex_types"]
            edges: list[dict] = input["edge_types"]
            for vertex in vertices:
                label = VertexLabel(vertex["type_name"])
                label.set_comment(vertex.get("description", ""))
                primary_keys = vertex.get("primary_keys", [])
                for prop in vertex["properties"]:
                    is_primary_key = prop["property_name"] in primary_keys
                    label = label.add_property(
                        prop["property_name"],
                        unified_type_to_data_type(prop["property_type"]),
                        is_primary_key,
                        prop.get("description", ""),
                    )
                self._vertex_labels_to_add.append(label)
            for edge in edges:
                label = EdgeLabel(edge["type_name"])
                label.set_comment(edge.get("description", ""))
                primary_keys = edge.get("primary_keys", [])
                for prop in edge["properties"]:
                    is_primary_key = prop["property_name"] in primary_keys
                    label = label.add_property(
                        prop["property_name"],
                        unified_type_to_data_type(prop["property_type"]),
                        is_primary_key,
                        prop.get("description", ""),
                    )
                for rel in edge["vertex_type_pair_relations"]:
                    label = label.source(rel["source_vertex"]).destination(
                        rel["destination_vertex"]
                    )
                self._edge_labels_to_add.append(label)
        except Exception as e:
            self.clear()
            raise RuntimeError("Construct schema from dict failed!") from e

    @property
    def oid_type(self):
        return self._oid_type

    @property
    def vid_type(self):
        return self._vid_type

    @property
    def vdata_type(self):
        # NB: simple graph only contain a single vertex property.
        return self._vdata_type

    @property
    def edata_type(self):
        # NB: simple graph only contain a single edge property.
        return self._edata_type

    def _valid_vertex_labels(self):
        """Iterator over vertex labels whose validity flag is truthy."""
        return itertools.compress(self._vertex_labels, self._valid_vertices)

    def _valid_edge_labels(self):
        """Iterator over edge labels whose validity flag is truthy."""
        return itertools.compress(self._edge_labels, self._valid_edges)

    @property
    def vertex_labels(self):
        return [entry.label for entry in self._valid_vertex_labels()]

    @property
    def edge_labels(self):
        return [entry.label for entry in self._valid_edge_labels()]

    @property
    def edge_relationships(self):
        return [entry.relations for entry in self._valid_edge_labels()]

    def get_relationships(self, label):
        """Return the relations of edge label ``label``.

        Raises:
            KeyError: If the label is unknown.
            ValueError: If the label is marked invalid.
        """
        if label not in self._e_label_index:
            raise KeyError(f"{label} not exists.")
        label_id = self._e_label_index[label]
        if not self._valid_edges[label_id]:
            raise ValueError(f"{label} not exists.")
        return self._edge_labels[label_id].relations

    @property
    def vertex_label_num(self):
        return sum(self._valid_vertices)

    @property
    def edge_label_num(self):
        return sum(self._valid_edges)

    def get_vertex_properties(self, label):
        return self._vertex_labels[self.get_vertex_label_id(label)].properties

    def get_edge_properties(self, label):
        return self._edge_labels[self.get_edge_label_id(label)].properties

    def vertex_properties_num(self, label):
        return len(self._vertex_labels[self.get_vertex_label_id(label)].properties)

    def edge_properties_num(self, label):
        return len(self._edge_labels[self.get_edge_label_id(label)].properties)

    def get_vertex_label_id(self, label):
        """Position of vertex label ``label``; KeyError if unknown, ValueError if invalid."""
        if label not in self._v_label_index:
            raise KeyError(f"{label} not exists.")
        idx = self._v_label_index[label]
        if not self._valid_vertices[idx]:
            raise ValueError(f"Vertex {label} not exists in graph")
        return idx

    def get_edge_label_id(self, label):
        """Position of edge label ``label``; KeyError if unknown, ValueError if invalid."""
        if label not in self._e_label_index:
            raise KeyError(f"{label} not exists.")
        idx = self._e_label_index[label]
        if not self._valid_edges[idx]:
            raise ValueError(f"Edge {label} not exists in graph")
        return idx

    def get_vertex_property_id(self, label, prop):
        return self._vertex_labels[self.get_vertex_label_id(label)].get_property_id(
            prop
        )

    def vertex_property_exists(self, label, prop):
        return self._vertex_labels[self.get_vertex_label_id(label)].property_exists(
            prop
        )

    def get_edge_property_id(self, label, prop):
        return self._edge_labels[self.get_edge_label_id(label)].get_property_id(prop)

    def edge_property_exists(self, label, prop):
        return self._edge_labels[self.get_edge_label_id(label)].property_exists(prop)

    def clear(self):
        """Reset types, labels, indexes and all pending operations (not ``_conn``)."""
        self._oid_type = None
        self._vid_type = None
        self._vdata_type = graph_def_pb2.UNKNOWN
        self._edata_type = graph_def_pb2.UNKNOWN
        self._vertex_labels.clear()
        self._edge_labels.clear()
        self._vertex_labels_to_add.clear()
        self._vertex_labels_to_drop.clear()
        self._edge_labels_to_add.clear()
        self._edge_labels_to_drop.clear()
        self._vertex_labels_to_add_property.clear()
        self._edge_labels_to_add_property.clear()
        self._valid_vertices.clear()
        self._valid_edges.clear()
        self._v_label_index.clear()
        self._e_label_index.clear()

    def signature(self):
        """SHA-256 hex digest of ``repr(self)``."""
        return hashlib.sha256(repr(self).encode("utf-8")).hexdigest()

    def add_vertex_label(self, label, vid_field=None, properties=None, comment=""):
        """Queue creation of a vertex label and return it for further chaining.

        Args:
            label: Label name.
            vid_field: Optional tuple unpacked into ``add_primary_key``.
            properties: Optional iterable of tuples unpacked into ``add_property``.
            comment: Label description.
        """
        item = VertexLabel(label)
        item.set_comment(comment)
        if vid_field:
            item = item.add_primary_key(*vid_field)
        if properties:
            for prop in properties:
                item = item.add_property(*prop)
        self._vertex_labels_to_add.append(item)
        return self._vertex_labels_to_add[-1]

    def add_edge_label(
        self, label, src_label=None, dst_label=None, properties=None, comment=""
    ):
        """Queue creation of an edge label and return it for further chaining.

        ``src_label``/``dst_label`` start its first relation; ``properties`` is an
        optional iterable of tuples unpacked into ``add_property``.
        """
        item = EdgeLabel(label)
        item.set_comment(comment)
        if src_label:
            item = item.source(src_label)
        if dst_label:
            item = item.destination(dst_label)
        if properties:
            for prop in properties:
                item = item.add_property(*prop)
        self._edge_labels_to_add.append(item)
        return self._edge_labels_to_add[-1]

    def add_vertex_properties(self, label, properties=None):
        """Queue new properties for an existing vertex label; returns the carrier label."""
        item = VertexLabel(label)
        if properties is not None:
            for prop in properties:
                item = item.add_property(*prop)
        self._vertex_labels_to_add_property.append(item)
        return self._vertex_labels_to_add_property[-1]

    def add_edge_properties(self, label, properties=None):
        """Queue new properties for an existing edge label; returns the carrier label."""
        item = EdgeLabel(label)
        if properties is not None:
            for prop in properties:
                item = item.add_property(*prop)
        self._edge_labels_to_add_property.append(item)
        return self._edge_labels_to_add_property[-1]

    def drop(self, label, src_label=None, dst_label=None):
        """Queue removal of a vertex label, an edge label, or a single edge kind.

        Args:
            label: Name of an existing vertex or edge label.
            src_label, dst_label: For an edge label, pass both to drop only the
                (src_label, dst_label) kind, or neither to drop the whole edge
                label. Must be omitted for a vertex label.

        Raises:
            ValueError: If ``label`` is unknown, if source/destination are given
                for a vertex label, or if only one of them is given for an edge.
        """
        for item in self._vertex_labels:
            if label == item.label:
                if src_label is not None or dst_label is not None:
                    raise ValueError(
                        "Vertex label should not have source and destination."
                    )
                self._vertex_labels_to_drop.append(VertexLabel(label))
                return
        for item in self._edge_labels:
            if label == item.label:
                if (src_label is None) != (dst_label is None):
                    # A lone source or destination would otherwise fall through
                    # to dropping the entire edge label.
                    raise ValueError(
                        "Both source and destination are required to drop an edge relation."
                    )
                label_to_drop = EdgeLabel(label)
                if src_label is not None:
                    label_to_drop.source(src_label).destination(dst_label)
                self._edge_labels_to_drop.append(label_to_drop)
                return
        raise ValueError(f"Label {label} not found.")

    def drop_all(self):
        """Queue removal of every edge kind, edge label and vertex label."""
        for item in self._edge_labels:
            for rel in item.relations:
                self._edge_labels_to_drop.append(
                    EdgeLabel(item.label)
                    .source(rel.source)
                    .destination(rel.destination)
                )
            self._edge_labels_to_drop.append(EdgeLabel(item.label))
        for item in self._vertex_labels:
            self._vertex_labels_to_drop.append(VertexLabel(item.label))

    def _prepare_batch_rpc(self):
        """Translate all pending operations into one ``BatchSubmitRequest``.

        Order: create vertex types, create edge types with their kinds, drop
        edge kinds/types, add vertex then edge properties, drop vertex types.
        """
        requests = ddl_service_pb2.BatchSubmitRequest()
        for item in self._vertex_labels_to_add:
            type_pb = item.as_type_def()
            requests.value.add().create_vertex_type_request.type_def.CopyFrom(type_pb)
        for item in self._edge_labels_to_add:
            type_pb = item.as_type_def()
            requests.value.add().create_edge_type_request.type_def.CopyFrom(type_pb)
            for rel in item.relations:
                assert rel.source and rel.destination, "Invalid relation"
                request = ddl_service_pb2.AddEdgeKindRequest()
                request.edge_label = item.label
                request.src_vertex_label = rel.source
                request.dst_vertex_label = rel.destination
                requests.value.add().add_edge_kind_request.CopyFrom(request)
        for item in self._edge_labels_to_drop:
            if item.relations:
                # A drop entry with a relation removes only that edge kind.
                request = ddl_service_pb2.RemoveEdgeKindRequest()
                request.edge_label = item.label
                request.src_vertex_label = item.relations[0].source
                request.dst_vertex_label = item.relations[0].destination
                requests.value.add().remove_edge_kind_request.CopyFrom(request)
            else:
                requests.value.add().drop_edge_type_request.label = item.label
        for item in self._vertex_labels_to_add_property:
            type_pb = item.as_type_def()
            requests.value.add().add_vertex_type_properties_request.type_def.CopyFrom(
                type_pb
            )
        for item in self._edge_labels_to_add_property:
            type_pb = item.as_type_def()
            requests.value.add().add_edge_type_properties_request.type_def.CopyFrom(
                type_pb
            )
        for item in self._vertex_labels_to_drop:
            requests.value.add().drop_vertex_type_request.label = item.label
        return requests

    def update(self):
        """Submit all pending operations through ``_conn`` and reload the schema.

        The pending queues are cleared before submission, so they are lost if
        ``submit`` raises. Returns ``self``.
        """
        requests = self._prepare_batch_rpc()
        self._vertex_labels_to_add.clear()
        self._edge_labels_to_add.clear()
        self._vertex_labels_to_drop.clear()
        self._edge_labels_to_drop.clear()
        self._vertex_labels_to_add_property.clear()
        self._edge_labels_to_add_property.clear()
        response = self._conn.submit(requests)
        self.from_graph_def(response.graph_def)
        return self

    def create_and_replace(self, vineyard_schema):
        # Not implemented: currently a no-op.
        pass