python/graphscope/framework/graph_utils.py (346 lines of code) (raw):

#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # from typing import Any from typing import Mapping from typing import Sequence from typing import Tuple from typing import Union import numpy as np import pandas as pd try: import vineyard except (ImportError, TypeError): vineyard = None from graphscope.framework import utils from graphscope.framework.errors import check_argument from graphscope.framework.loader import Loader from graphscope.proto import attr_value_pb2 from graphscope.proto import types_pb2 if vineyard is not None: VineyardObjectTypes = (vineyard.Object, vineyard.ObjectID, vineyard.ObjectName) LoaderVariants = Union[ Loader, str, Sequence[np.ndarray], pd.DataFrame, vineyard.Object, vineyard.ObjectID, vineyard.ObjectName, ] else: VineyardObjectTypes = () LoaderVariants = Union[ Loader, str, Sequence[np.ndarray], pd.DataFrame, ] class VertexLabel(object): """Holds meta informations about a single vertex label.""" def __init__( self, label: str, loader: Any, properties: Union[Sequence, None] = None, vid_field: Union[str, int] = 0, session_id=None, id_type: str = "int64_t", vformat=None, ): self.label = label # loader to take various data source if isinstance(loader, Loader): self.loader = loader else: self.loader = Loader(loader) # raw properties passed by user parameters self.raw_properties = properties # finally properties for constructing graph self.properties = [] # column index or property name used as id field self.vid_field = vid_field # type of vertex original id # should be consistent with the original graph self.id_type = id_type self._session_id = session_id self._vformat = vformat # normalize properties # add vid to property list self.add_property(str(self.vid_field), self.id_type) if self.raw_properties: self.add_properties(self.raw_properties) elif self.loader.deduced_properties: self.add_properties(self.loader.deduced_properties) # set selected columns to loader self.loader.select_columns( self.properties, include_all=bool(self.raw_properties is None) ) def __str__(self) -> str: s = "\ntype: VertexLabel" s += "\nlabel: " + self.label s += "\nproperties: " + str(self.properties) s += "\nvid: " + str(self.vid_field) s += "\nid_type: " + self.id_type s += "\nloader: " + repr(self.loader) return s def __repr__(self) -> str: return self.__str__() def add_property(self, prop: str, dtype=None) -> None: """prop is a str, representing name. It can optionally have a type.""" self.properties.append((prop, utils.unify_type(dtype))) def add_properties(self, properties: Sequence) -> None: for prop in properties: if isinstance(prop, str): self.add_property(prop) else: self.add_property(prop[0], prop[1]) def attr(self) -> Sequence[attr_value_pb2.Chunk]: chunk = attr_value_pb2.Chunk() chunk.attr[types_pb2.CHUNK_NAME].CopyFrom(utils.s_to_attr("vertex")) chunk.attr[types_pb2.CHUNK_TYPE].CopyFrom(utils.s_to_attr("loader")) chunk.attr[types_pb2.LABEL].CopyFrom(utils.s_to_attr(self.label)) chunk.attr[types_pb2.VID].CopyFrom(utils.s_to_attr(str(self.vid_field))) if isinstance(self._vformat, str): chunk.attr[types_pb2.VFORMAT].CopyFrom(utils.s_to_attr(self._vformat)) # loader for k, v in self.loader.get_attr().items(): # raw bytes for pandas/numpy data if k == types_pb2.VALUES: chunk.buffer = v else: chunk.attr[k].CopyFrom(v) return [chunk] class EdgeSubLabel(object): """Hold meta informations of a single relationship. i.e. src_label -> edge_label -> dst_label """ def __init__( self, loader, properties=None, src_label: str = "_", dst_label: str = "_", src_field: Union[str, int] = 0, dst_field: Union[str, int] = 1, load_strategy="both_out_in", id_type: str = "int64_t", eformat=None, ): if isinstance(loader, Loader): self.loader = loader else: self.loader = Loader(loader) # raw properties passed by user parameters self.raw_properties = properties # finally properties for constructing graph self.properties = [] # type of vertex original id # should be consistent with the original graph self.id_type = id_type self.src_label = src_label self.dst_label = dst_label self.src_field = src_field self.dst_field = dst_field # check available check_argument( load_strategy in ("only_out", "only_in", "both_out_in"), "invalid load strategy: " + load_strategy, ) self.load_strategy = load_strategy if (isinstance(self.src_field, int) and isinstance(self.dst_field, str)) or ( isinstance(self.src_field, str) and isinstance(self.dst_field, int) ): print("src field", self.src_field, "dst_field", self.dst_field) raise SyntaxError( "Source vid and destination vid must have same formats, both use name or both use index" ) # normalize properties # add src/dst to property list self.add_property(str(self.src_field), self.id_type) self.add_property(str(self.dst_field), self.id_type) if self.raw_properties: self.add_properties(self.raw_properties) elif self.loader.deduced_properties: self.add_properties(self.loader.deduced_properties) # set selected columns to loader self.loader.select_columns( self.properties, include_all=bool(self.raw_properties is None) ) self._eformat = eformat def __str__(self) -> str: s = "\ntype: EdgeSubLabel" s += "\nsource_label: " + self.src_label s += "\ndestination_label: " + self.dst_label s += "\nproperties: " + str(self.properties) s += "\nloader: " + repr(self.loader) return s def __repr__(self) -> str: return self.__str__() def add_property(self, prop: str, dtype=None) -> None: """prop is a str, representing name. It can optionally have a type.""" self.properties.append((prop, utils.unify_type(dtype))) def add_properties(self, properties: Sequence) -> None: for prop in properties: if isinstance(prop, str): self.add_property(prop) else: self.add_property(prop[0], prop[1]) def get_attr(self) -> attr_value_pb2.Chunk: chunk = attr_value_pb2.Chunk() chunk.attr[types_pb2.SUB_LABEL].CopyFrom( utils.s_to_attr("{}_{}".format(self.src_label, self.dst_label)) ) chunk.attr[types_pb2.SRC_LABEL].CopyFrom(utils.s_to_attr(self.src_label)) chunk.attr[types_pb2.DST_LABEL].CopyFrom(utils.s_to_attr(self.dst_label)) chunk.attr[types_pb2.LOAD_STRATEGY].CopyFrom( utils.s_to_attr(self.load_strategy) ) chunk.attr[types_pb2.SRC_VID].CopyFrom(utils.s_to_attr(str(self.src_field))) chunk.attr[types_pb2.DST_VID].CopyFrom(utils.s_to_attr(str(self.dst_field))) if isinstance(self._eformat, str): chunk.attr[types_pb2.EFORMAT].CopyFrom(utils.s_to_attr(self._eformat)) # loader for k, v in self.loader.get_attr().items(): # raw bytes for pandas/numpy data if k == types_pb2.VALUES: chunk.buffer = v else: chunk.attr[k].CopyFrom(v) return chunk class EdgeLabel(object): """Hold meta informations of an edge label. An Edge label may be consist of a few `EdgeSubLabel`s. i.e. src_label1 -> edge_label -> dst_label1 src_label2 -> edge_label -> dst_label2 src_label3 -> edge_label -> dst_label3 """ def __init__(self, label: str, id_type: str, session_id=None): self.label = label # type of vertex original id # should be consistent with the original graph self.id_type = id_type self.sub_labels = {} self._session_id = session_id def __str__(self): s = "\ntype: EdgeLabel" s += "\nlabel: " + self.label s += "\nsub_labels: " for sub_label in self.sub_labels.values(): s += "\n" s += str(sub_label) return s def __repr__(self): return self.__str__() def add_sub_label(self, sub_label): src = sub_label.src_label dst = sub_label.dst_label if (src, dst) in self.sub_labels: raise ValueError( f"The relationship {src} -> {self.label} <- {dst} already existed in graph." ) self.sub_labels[(src, dst)] = sub_label def attr(self) -> Sequence[attr_value_pb2.Chunk]: chunk_list = [] for sub_label in self.sub_labels.values(): chunk = sub_label.get_attr() chunk.attr[types_pb2.CHUNK_NAME].CopyFrom(utils.s_to_attr("edge")) chunk.attr[types_pb2.CHUNK_TYPE].CopyFrom(utils.s_to_attr("loader")) chunk.attr[types_pb2.LABEL].CopyFrom(utils.s_to_attr(self.label)) chunk_list.append(chunk) return chunk_list def _convert_array_to_deprecated_form(items): compat_items = [] # for i in range(len(items)): for i, item in enumerate(items): if i < 2: compat_items.append(item) elif i == 2: if isinstance(item, (int, str)) and isinstance(items[i + 1], (int, str)): compat_items.append("_") compat_items.append("_") compat_items.append(item) compat_items.append(items[i + 1]) else: assert len(item) == 2 and len(items[i + 1]) == 2 compat_items.append(item[1]) compat_items.append(items[i + 1][1]) compat_items.append(item[0]) compat_items.append(items[i + 1][0]) elif i == 3: pass else: compat_items.append(item) return compat_items def _convert_dict_to_compat_form(items): if "source" in items: if isinstance(items["source"], (int, str)): items["src_label"] = "_" items["src_field"] = items["source"] else: assert len(items["source"]) == 2 items["src_label"] = items["source"][1] items["src_field"] = items["source"][0] items.pop("source") if "destination" in items: if isinstance(items["destination"], (int, str)): items["dst_label"] = "_" items["dst_field"] = items["destination"] else: assert len(items["destination"]) == 2 items["dst_label"] = items["destination"][1] items["dst_field"] = items["destination"][0] items.pop("destination") return items def normalize_parameter_edges( edges: Union[ Mapping[str, Union[Sequence, LoaderVariants, Mapping]], Tuple, LoaderVariants ], id_type: str, eformat: Union[str, None] = None, ): """Normalize parameters user passed in. Since parameters are very flexible, we need to be careful about it. Args: edges (Union[ Mapping[str, Union[Sequence, LoaderVariants, Mapping]], Tuple, LoaderVariants ]): Edges definition. id_type (str): Type of vertex original id. """ def process_sub_label(items): if isinstance(items, (Loader, str, pd.DataFrame, *VineyardObjectTypes)): return EdgeSubLabel( items, None, "_", "_", 0, 1, id_type=id_type, eformat=eformat ) elif isinstance(items, Sequence): if all([isinstance(item, np.ndarray) for item in items]): return EdgeSubLabel( items, None, "_", "_", 0, 1, id_type=id_type, eformat=eformat ) else: check_argument(len(items) < 6, "Too many arguments for a edge label") compat_items = _convert_array_to_deprecated_form(items) return EdgeSubLabel(*compat_items, id_type=id_type, eformat=eformat) elif isinstance(items, Mapping): items = _convert_dict_to_compat_form(items) return EdgeSubLabel(**items, id_type=id_type, eformat=eformat) else: raise SyntaxError("Wrong format of e sub label: " + str(items)) def process_label(label, items): e_label = EdgeLabel(label, id_type) if isinstance(items, (Loader, str, pd.DataFrame, *VineyardObjectTypes)): e_label.add_sub_label(process_sub_label(items)) elif isinstance(items, Sequence): if isinstance( items[0], (Loader, str, pd.DataFrame, *VineyardObjectTypes, np.ndarray) ): e_label.add_sub_label(process_sub_label(items)) else: for item in items: e_label.add_sub_label(process_sub_label(item)) elif isinstance(items, Mapping): e_label.add_sub_label(process_sub_label(items)) else: raise SyntaxError("Wrong format of e label: " + str(items)) return e_label e_labels = [] if edges is None: raise ValueError("Edges should be None") if isinstance(edges, Mapping): for label, attr in edges.items(): e_labels.append(process_label(label, attr)) else: e_labels.append(process_label("_", edges)) return e_labels def normalize_parameter_vertices( vertices: Union[ Mapping[str, Union[Sequence, LoaderVariants, Mapping]], Tuple, LoaderVariants, None, ], id_type: str, vformat: Union[str, None] = None, ): """Normalize parameters user passed in. Since parameters are very flexible, we need to be careful about it. Args: vertices (Union[ Mapping[str, Union[Sequence, LoaderVariants, Mapping]], Tuple, LoaderVariants, None, ]): Vertices definition. id_type (str): Type of vertex original id. """ def process_label(label, items): if isinstance(items, (Loader, str, pd.DataFrame, *VineyardObjectTypes)): return VertexLabel( label=label, id_type=id_type, loader=items, vformat=vformat ) elif isinstance(items, Sequence): if all([isinstance(item, np.ndarray) for item in items]): return VertexLabel( label=label, id_type=id_type, loader=items, vformat=vformat ) else: check_argument(len(items) < 4, "Too many arguments for a vertex label") return VertexLabel(label, *items, id_type=id_type, vformat=vformat) elif isinstance(items, Mapping): if "vid" in items: items["vid_field"] = items["vid"] items.pop("vid") return VertexLabel(label, id_type=id_type, vformat=vformat, **items) else: raise RuntimeError("Wrong format of v label: " + str(items)) v_labels = [] if vertices is None: return v_labels if isinstance(vertices, Mapping): for label, attr in vertices.items(): v_labels.append(process_label(label, attr)) else: v_labels.append(process_label("_", vertices)) return v_labels