interactive_engine/executor/store/mcsr/src/schema.rs (539 lines of code) (raw):
//
//! Copyright 2020 Alibaba Group Holding Limited.
//!
//! Licensed under the Apache License, Version 2.0 (the "License");
//! you may not use this file except in compliance with the License.
//! You may obtain a copy of the License at
//!
//! http://www.apache.org/licenses/LICENSE-2.0
//!
//! Unless required by applicable law or agreed to in writing, software
//! distributed under the License is distributed on an "AS IS" BASIS,
//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//! See the License for the specific language governing permissions and
//! limitations under the License.
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::fs::File;
use std::hash::Hash;
use std::path::Path;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use crate::columns::DataType;
use crate::types::*;
/// The starting id field in an edge file
pub const START_ID_FIELD: &'static str = "start_id";
/// The end id field in an edge file
pub const END_ID_FIELD: &'static str = "end_id";
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
pub enum PartitionType {
Dynamic,
Static,
Null,
}
impl<'a> From<&'a str> for PartitionType {
fn from(_token: &'a str) -> Self {
let token_str = _token.to_uppercase();
let token = token_str.as_str();
if token == "DYNAMIC" {
PartitionType::Dynamic
} else if token == "STATIC" {
PartitionType::Static
} else {
error!("Unsupported type {:?}", token);
PartitionType::Null
}
}
}
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
pub enum EdgeStrategy {
Single,
Multiple,
Null,
}
impl<'a> From<&'a str> for EdgeStrategy {
fn from(_token: &'a str) -> Self {
let token_str = _token.to_uppercase();
let token = token_str.as_str();
if token == "SINGLE" {
EdgeStrategy::Single
} else if token == "MULTI" {
EdgeStrategy::Multiple
} else {
error!("Unsupported type {:?}", token);
EdgeStrategy::Null
}
}
}
/// An edge's label is consisted of three elements:
/// edge_label, src_vertex_label and dst_vertex_label.
#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub struct EdgeLabelTuple {
pub edge_label: LabelId,
pub src_vertex_label: LabelId,
pub dst_vertex_label: LabelId,
}
pub trait Schema {
/// Get the header for the certain type of vertex if any
fn get_vertex_header(&self, vertex_type_id: LabelId) -> Option<&[(String, DataType)]>;
/// Get the header for the certain type of edge if any
fn get_edge_header(
&self, src_label: LabelId, edge_label: LabelId, dst_label: LabelId,
) -> Option<&[(String, DataType)]>;
/// Get the schema for the certain type of vertex if any.
fn get_vertex_schema(&self, vertex_type_id: LabelId) -> Option<&HashMap<String, (DataType, usize)>>;
/// Get the schema for the certain
/// type of edge if any.
fn get_edge_schema(
&self, edge_type_id: (LabelId, LabelId, LabelId),
) -> Option<&HashMap<String, (DataType, usize)>>;
/// Get a certain vertex type's id if any
fn get_vertex_label_id(&self, vertex_type: &str) -> Option<LabelId>;
/// Get a certain edge type's id
fn get_edge_label_id(&self, edge_type: &str) -> Option<LabelId>;
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct CsrGraphSchema {
/// Map from vertex types to labelid
pub vertex_type_to_id: HashMap<String, LabelId>,
/// Map from edge types to `EdgeLabelTuple`
pub edge_type_to_id: HashMap<String, LabelId>,
/// Map from vertex/edge (labelid) to its property name, data types and index in the row
vertex_prop_meta: HashMap<LabelId, HashMap<String, (DataType, usize)>>,
vertex_prop_vec: HashMap<LabelId, Vec<(String, DataType)>>,
vertex_partition_type: HashMap<LabelId, PartitionType>,
edge_prop_meta: HashMap<(LabelId, LabelId, LabelId), HashMap<String, (DataType, usize)>>,
edge_prop_vec: HashMap<(LabelId, LabelId, LabelId), Vec<(String, DataType)>>,
edge_single_ie: HashSet<(LabelId, LabelId, LabelId)>,
edge_single_oe: HashSet<(LabelId, LabelId, LabelId)>,
}
impl CsrGraphSchema {
pub fn vertex_label_names(&self) -> Vec<String> {
let mut ret = vec![];
let vertex_label_num = self.vertex_type_to_id.len();
for _ in 0..vertex_label_num {
ret.push(String::new());
}
for pair in self.vertex_type_to_id.iter() {
ret[*pair.1 as usize] = pair.0.clone();
}
ret
}
pub fn edge_label_names(&self) -> Vec<String> {
let mut ret = vec![];
let edge_label_num = self.edge_type_to_id.len();
for _ in 0..edge_label_num {
ret.push(String::new());
}
for pair in self.edge_type_to_id.iter() {
ret[*pair.1 as usize] = pair.0.clone();
}
ret
}
pub fn is_static_vertex(&self, vertex_label: LabelId) -> bool {
*self
.vertex_partition_type
.get(&vertex_label)
.unwrap()
== PartitionType::Static
}
pub fn is_single_ie(&self, src_label: LabelId, edge_label: LabelId, dst_label: LabelId) -> bool {
if self
.edge_single_ie
.contains(&(src_label, edge_label, dst_label))
{
true
} else {
false
}
}
pub fn is_single_oe(&self, src_label: LabelId, edge_label: LabelId, dst_label: LabelId) -> bool {
if self
.edge_single_oe
.contains(&(src_label, edge_label, dst_label))
{
true
} else {
false
}
}
pub fn desc(&self) {
info!(
"vertex label num: {}, edge label num: {}",
self.vertex_type_to_id.len(),
self.edge_type_to_id.len()
);
for pair in self.vertex_type_to_id.iter() {
info!("vertex label: {}, id: {}", pair.0.clone(), pair.1);
}
for pair in self.edge_type_to_id.iter() {
info!("edge label: {}, id: {}", pair.0.clone(), pair.1);
}
}
pub fn from_json_file<P: AsRef<Path>>(path: P) -> std::io::Result<Self> {
let file = File::open(path)?;
let schema_json =
serde_json::from_reader::<File, CsrGraphSchemaJson>(file).map_err(std::io::Error::from)?;
Ok(CsrGraphSchema::from(&schema_json))
}
pub fn to_json_file<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
let file = File::create(path)?;
let schema_json = CsrGraphSchemaJson::from(self);
serde_json::to_writer_pretty::<File, CsrGraphSchemaJson>(file, &schema_json)
.map_err(std::io::Error::from)
}
/// Get a certain edge type's id, together with its start- and edge- vertices's type
/// while giving the `full_edge_type` that is "<src_vertex_label>_<edge_label>_<dst_vertex_label>"
pub fn get_edge_label_tuple(&self, full_edge_type: &str) -> Option<EdgeLabelTuple> {
let mut parts = full_edge_type.split("_");
let src_label_id =
if let Some(src_label) = parts.next() { self.get_vertex_label_id(src_label) } else { None };
let edge_label_id =
if let Some(edge_label) = parts.next() { self.get_edge_label_id(edge_label) } else { None };
let dst_label_id =
if let Some(dst_label) = parts.next() { self.get_vertex_label_id(dst_label) } else { None };
if src_label_id.is_some() && edge_label_id.is_some() && dst_label_id.is_some() {
Some(EdgeLabelTuple {
edge_label: edge_label_id.unwrap(),
src_vertex_label: src_label_id.unwrap(),
dst_vertex_label: dst_label_id.unwrap(),
})
} else {
None
}
}
}
fn is_map_eq<K: PartialEq + Ord + Debug + Hash, V: PartialEq + Ord + Debug>(
map1: &HashMap<K, V>, map2: &HashMap<K, V>,
) -> bool {
map1.iter().sorted().eq(map2.iter().sorted())
}
impl PartialEq for CsrGraphSchema {
fn eq(&self, other: &Self) -> bool {
let mut is_eq = is_map_eq(&self.vertex_type_to_id, &other.vertex_type_to_id)
&& is_map_eq(&self.edge_type_to_id, &other.edge_type_to_id)
&& is_map_eq(&self.vertex_prop_vec, &other.vertex_prop_vec)
&& is_map_eq(&self.edge_prop_vec, &other.edge_prop_vec)
&& self.vertex_prop_meta.len() == other.vertex_prop_meta.len()
&& self.edge_prop_meta.len() == other.edge_prop_meta.len();
if is_eq {
for ((k1, v1), (k2, v2)) in self
.vertex_prop_meta
.iter()
.sorted_by(|e1, e2| e1.0.cmp(e2.0))
.zip(
other
.vertex_prop_meta
.iter()
.sorted_by(|e1, e2| e1.0.cmp(e2.0)),
)
{
is_eq = k1 == k2 && is_map_eq(v1, v2);
if !is_eq {
break;
}
}
for ((k1, v1), (k2, v2)) in self
.edge_prop_meta
.iter()
.sorted_by(|e1, e2| e1.0.cmp(e2.0))
.zip(
other
.edge_prop_meta
.iter()
.sorted_by(|e1, e2| e1.0.cmp(e2.0)),
)
{
is_eq = k1 == k2 && is_map_eq(v1, v2);
if !is_eq {
break;
}
}
}
is_eq
}
}
impl<'a> From<&'a CsrGraphSchemaJson> for CsrGraphSchema {
fn from(schema_json: &'a CsrGraphSchemaJson) -> Self {
let mut vertex_type_to_id = HashMap::new();
let mut vertex_partition_type = HashMap::new();
let mut vertex_label = 0 as LabelId;
for vertex_info in &schema_json.vertex {
vertex_type_to_id.insert(vertex_info.label.clone(), vertex_label);
vertex_partition_type.insert(vertex_label, vertex_info.partition_type.clone());
vertex_label += 1;
}
let mut edge_type_to_id = HashMap::new();
let mut edge_label = 0 as LabelId;
for edge_info in &schema_json.edge {
if !edge_type_to_id.contains_key(&edge_info.label) {
edge_type_to_id.insert(edge_info.label.clone(), edge_label);
edge_label += 1;
}
}
let mut vertex_prop_meta: HashMap<LabelId, HashMap<String, (DataType, usize)>> =
HashMap::with_capacity(schema_json.vertex.len());
let mut vertex_prop_vec: HashMap<LabelId, Vec<(String, DataType)>> =
HashMap::with_capacity(schema_json.vertex.len());
let mut edge_prop_meta: HashMap<(LabelId, LabelId, LabelId), HashMap<String, (DataType, usize)>> =
HashMap::with_capacity(schema_json.edge.len());
let mut edge_prop_vec: HashMap<(LabelId, LabelId, LabelId), Vec<(String, DataType)>> =
HashMap::with_capacity(schema_json.edge.len());
let mut edge_single_ie = HashSet::new();
let mut edge_single_oe = HashSet::new();
for vertex_info in &schema_json.vertex {
let label_id = vertex_type_to_id[&vertex_info.label];
let vertex_map = vertex_prop_meta
.entry(label_id)
.or_insert_with(HashMap::new);
let vertex_vec = vertex_prop_vec
.entry(label_id)
.or_insert_with(Vec::new);
for (index, column) in vertex_info.properties.iter().enumerate() {
vertex_map.insert(column.name.clone(), (column.data_type.clone(), index));
vertex_vec.push((column.name.clone(), column.data_type.clone()));
}
}
for edge_info in &schema_json.edge {
let src_label_id = vertex_type_to_id[&edge_info.src_label];
let dst_label_id = vertex_type_to_id[&edge_info.dst_label];
let label_id = edge_type_to_id[&edge_info.label];
let edge_map = edge_prop_meta
.entry((src_label_id, label_id, dst_label_id))
.or_insert_with(HashMap::new);
let edge_vec = edge_prop_vec
.entry((src_label_id, label_id, dst_label_id))
.or_insert_with(Vec::new);
if edge_info.ie_strategy.is_some()
&& *edge_info.ie_strategy.as_ref().unwrap() == EdgeStrategy::Single
{
edge_single_ie.insert((src_label_id, label_id, dst_label_id));
}
if edge_info.oe_strategy.is_some()
&& *edge_info.oe_strategy.as_ref().unwrap() == EdgeStrategy::Single
{
edge_single_oe.insert((src_label_id, label_id, dst_label_id));
}
if let Some(properties) = &edge_info.properties {
for (index, column) in properties.iter().enumerate() {
edge_map.insert(column.name.clone(), (column.data_type.clone(), index));
edge_vec.push((column.name.clone(), column.data_type.clone()));
}
}
}
Self {
vertex_type_to_id,
edge_type_to_id,
vertex_prop_meta,
vertex_prop_vec,
vertex_partition_type,
edge_prop_meta,
edge_prop_vec,
edge_single_ie,
edge_single_oe,
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct InputSchema {
/// Map from vertex label id to headers in input file
vertex_headers: HashMap<LabelId, Vec<(String, DataType)>>,
/// Map from src_vertex, edge, dst_vertex label id to headers in input file
edge_headers: HashMap<(LabelId, LabelId, LabelId), Vec<(String, DataType)>>,
/// Map for vertex label id to input file
vertex_files: HashMap<LabelId, Vec<String>>,
/// Map for src_vertex, edge, dst_vertex label id to input file
edge_files: HashMap<(LabelId, LabelId, LabelId), Vec<String>>,
}
impl InputSchema {
pub fn get_vertex_header(&self, vertex_label: LabelId) -> Option<&[(String, DataType)]> {
self.vertex_headers
.get(&vertex_label)
.map(|vec| vec.as_slice())
}
pub fn get_edge_header(
&self, src_label: LabelId, edge_label: LabelId, dst_label: LabelId,
) -> Option<&[(String, DataType)]> {
self.edge_headers
.get(&(src_label, edge_label, dst_label))
.map(|vec| vec.as_slice())
}
pub fn get_vertex_file(&self, vertex_label: LabelId) -> Option<&Vec<String>> {
self.vertex_files.get(&vertex_label)
}
pub fn get_edge_file(
&self, src_label: LabelId, edge_label: LabelId, dst_label: LabelId,
) -> Option<&Vec<String>> {
self.edge_files
.get(&(src_label, edge_label, dst_label))
}
pub fn from_json_file<P: AsRef<Path>>(path: P, graph_schema: &CsrGraphSchema) -> std::io::Result<Self> {
let file = File::open(path)?;
let input_json =
serde_json::from_reader::<File, InputSchemaJson>(file).map_err(std::io::Error::from)?;
let mut vertex_headers = HashMap::new();
let mut vertex_files = HashMap::new();
for vertex in &input_json.vertex {
if let Some(vertex_label) = graph_schema
.vertex_type_to_id
.get(&vertex.label)
{
let mut properties = vec![];
for column in &vertex.columns {
properties.push((column.name.clone(), column.data_type.clone()));
}
vertex_headers.insert(*vertex_label, properties);
vertex_files.insert(*vertex_label, vertex.files.clone());
}
}
let mut edge_headers = HashMap::new();
let mut edge_files = HashMap::new();
for edge in &input_json.edge {
if let (Some(src_label), Some(edge_label), Some(dst_label)) = (
graph_schema
.vertex_type_to_id
.get(&edge.src_label),
graph_schema.edge_type_to_id.get(&edge.label),
graph_schema
.vertex_type_to_id
.get(&edge.dst_label),
) {
let mut properties = vec![];
for column in &edge.columns {
properties.push((column.name.clone(), column.data_type.clone()));
}
edge_headers.insert((*src_label, *edge_label, *dst_label), properties);
edge_files.insert((*src_label, *edge_label, *dst_label), edge.files.clone());
}
}
Ok(InputSchema { vertex_headers, edge_headers, vertex_files, edge_files })
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
struct ColumnInfo {
name: String,
data_type: DataType,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
struct InputVertex {
label: String,
columns: Vec<ColumnInfo>,
files: Vec<String>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
struct InputEdge {
src_label: String,
dst_label: String,
label: String,
columns: Vec<ColumnInfo>,
files: Vec<String>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
struct VertexInfo {
label: String,
partition_type: PartitionType,
properties: Vec<ColumnInfo>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
struct EdgeInfo {
src_label: String,
dst_label: String,
label: String,
#[serde(skip_serializing_if = "Option::is_none")]
ie_strategy: Option<EdgeStrategy>,
#[serde(skip_serializing_if = "Option::is_none")]
oe_strategy: Option<EdgeStrategy>,
#[serde(skip_serializing_if = "Option::is_none")]
properties: Option<Vec<ColumnInfo>>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
struct CsrGraphSchemaJson {
vertex: Vec<VertexInfo>,
edge: Vec<EdgeInfo>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
struct InputSchemaJson {
vertex: Vec<InputVertex>,
edge: Vec<InputEdge>,
}
impl<'a> From<&'a CsrGraphSchema> for CsrGraphSchemaJson {
fn from(schema: &'a CsrGraphSchema) -> Self {
let vertex_label_num = schema.vertex_type_to_id.len();
let edge_label_num = schema.edge_type_to_id.len();
let mut vertex_info_vec = vec![
VertexInfo {
label: "".to_string(),
partition_type: PartitionType::Dynamic,
properties: vec![]
};
vertex_label_num
];
let mut edge_info_vec = vec![];
let mut vertex_names = vec!["".to_string(); vertex_label_num];
let mut edge_names = vec!["".to_string(); edge_label_num];
for (vertex_label, label) in &schema.vertex_type_to_id {
vertex_names[*label as usize] = vertex_label.clone();
let partition_type = schema.vertex_partition_type.get(label).unwrap();
if let Some(column) = schema.vertex_prop_vec.get(label) {
let mut properties = vec![];
for (col_name, data_type) in column {
properties.push(ColumnInfo { name: col_name.clone(), data_type: data_type.clone() });
}
vertex_info_vec[*label as usize] = VertexInfo {
label: vertex_label.clone(),
partition_type: partition_type.clone(),
properties: properties,
}
}
}
for (edge_label, label) in &schema.edge_type_to_id {
edge_names[*label as usize] = edge_label.clone();
}
for ((src_label, label, dst_label), columns) in &schema.edge_prop_vec {
let src_label_name = vertex_names[*src_label as usize].clone();
let label_name = edge_names[*label as usize].clone();
let dst_label_name = vertex_names[*dst_label as usize].clone();
let ie_strategy = if schema
.edge_single_ie
.contains(&(*src_label, *label, *dst_label))
{
Some(EdgeStrategy::Single)
} else {
None
};
let oe_strategy = if schema
.edge_single_oe
.contains(&(*src_label, *label, *dst_label))
{
Some(EdgeStrategy::Single)
} else {
None
};
if columns.len() > 0 {
let mut properties = vec![];
for (col_name, data_type) in columns {
properties.push(ColumnInfo { name: col_name.clone(), data_type: data_type.clone() });
}
edge_info_vec.push(EdgeInfo {
src_label: src_label_name,
dst_label: dst_label_name,
label: label_name,
ie_strategy: ie_strategy,
oe_strategy: oe_strategy,
properties: Some(properties),
});
} else {
edge_info_vec.push(EdgeInfo {
src_label: src_label_name,
dst_label: dst_label_name,
label: label_name,
ie_strategy: ie_strategy,
oe_strategy: oe_strategy,
properties: None,
});
}
}
edge_info_vec
.sort_by(|a, b| schema.edge_type_to_id[&a.label].cmp(&schema.edge_type_to_id[&b.label]));
Self { vertex: vertex_info_vec, edge: edge_info_vec }
}
}
impl Schema for CsrGraphSchema {
fn get_vertex_header(&self, vertex_type_id: LabelId) -> Option<&[(String, DataType)]> {
self.vertex_prop_vec
.get(&vertex_type_id)
.map(|vec| vec.as_slice())
}
fn get_edge_header(
&self, src_label: LabelId, edge_label: LabelId, dst_label: LabelId,
) -> Option<&[(String, DataType)]> {
self.edge_prop_vec
.get(&(src_label, edge_label, dst_label))
.map(|vec| vec.as_slice())
}
fn get_vertex_schema(&self, vertex_type_id: LabelId) -> Option<&HashMap<String, (DataType, usize)>> {
self.vertex_prop_meta.get(&vertex_type_id)
}
fn get_edge_schema(
&self, edge_type_id: (LabelId, LabelId, LabelId),
) -> Option<&HashMap<String, (DataType, usize)>> {
self.edge_prop_meta.get(&edge_type_id)
}
fn get_vertex_label_id(&self, vertex_type: &str) -> Option<LabelId> {
self.vertex_type_to_id.get(vertex_type).cloned()
}
fn get_edge_label_id(&self, edge_type: &str) -> Option<LabelId> {
self.edge_type_to_id.get(edge_type).cloned()
}
}