// interactive_engine/executor/store/bmcsr/src/graph_modifier.rs

// //! Copyright 2020 Alibaba Group Holding Limited. //! //! Licensed under the Apache License, Version 2.0 (the "License"); //! you may not use this file except in compliance with the License. //! You may obtain a copy of the License at //! //! http://www.apache.org/licenses/LICENSE-2.0 //! //! Unless required by applicable law or agreed to in writing, software //! distributed under the License is distributed on an "AS IS" BASIS, //! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. //! See the License for the specific language governing permissions and //! limitations under the License. use std::any::Any; use std::collections::{HashMap, HashSet}; use std::fmt::{Debug, Display, Formatter}; use std::fs::File; use std::io::{BufReader, Write}; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::time::Instant; use csv::ReaderBuilder; use pegasus_common::codec::{Decode, Encode}; use pegasus_common::io::{ReadExt, WriteExt}; use rayon::prelude::*; use rust_htslib::bgzf::Reader as GzReader; use crate::bmscsr::BatchMutableSingleCsr; use crate::col_table::{parse_properties, parse_properties_by_mappings, ColTable}; use crate::columns::*; use crate::columns::*; use crate::csr::CsrTrait; use crate::date::Date; use crate::date_time::DateTime; use crate::error::GDBResult; use crate::graph::{Direction, IndexType}; use crate::graph_db::GraphDB; use crate::graph_loader::{get_files_list, get_files_list_beta}; use crate::ldbc_parser::{LDBCEdgeParser, LDBCVertexParser}; use crate::schema::{CsrGraphSchema, InputSchema, Schema}; use crate::types::{DefaultId, LabelId}; #[derive(Clone, Copy)] pub enum WriteType { Insert, Delete, Set, } #[derive(Clone)] pub struct ColumnInfo { index: i32, name: String, data_type: DataType, } impl ColumnInfo { pub fn index(&self) -> i32 { self.index } pub fn name(&self) -> &String { &self.name } pub fn data_type(&self) -> DataType { self.data_type } } #[derive(Clone)] pub struct ColumnMappings { column: ColumnInfo, 
property_name: String,
}

impl ColumnMappings {
    /// Builds a mapping from the input column (`index`, `name`, `data_type`)
    /// to the graph property called `property_name`.
    pub fn new(index: i32, name: String, data_type: DataType, property_name: String) -> Self {
        ColumnMappings { column: ColumnInfo { index, name, data_type }, property_name }
    }
}

impl Encode for ColumnMappings {
    fn write_to<W: WriteExt>(&self, writer: &mut W) -> std::io::Result<()> {
        // Bug fix: the original discarded this io::Result; propagate it like
        // the writes below.
        writer.write_i32(self.column.index)?;
        self.column.name.write_to(writer)?;
        self.column.data_type.write_to(writer)?;
        self.property_name.write_to(writer)?;
        Ok(())
    }
}

impl Decode for ColumnMappings {
    fn read_from<R: ReadExt>(reader: &mut R) -> std::io::Result<Self> {
        let index = reader.read_i32()?;
        let name = String::read_from(reader)?;
        let data_type = DataType::read_from(reader)?;
        let property_name = String::read_from(reader)?;
        Ok(ColumnMappings { column: ColumnInfo { index, name, data_type }, property_name })
    }
}

impl ColumnMappings {
    pub fn column(&self) -> &ColumnInfo {
        &self.column
    }

    pub fn property_name(&self) -> &String {
        &self.property_name
    }
}

/// Where the rows of an `Input` come from.
#[derive(Clone, Copy, PartialEq)]
pub enum DataSource {
    File,
    Memory,
}

/// CSV-like file description: delimiter, header, quoting and location.
#[derive(Clone)]
pub struct FileInput {
    pub delimiter: String,
    pub header_row: bool,
    pub quoting: bool,
    pub quote_char: String,
    pub double_quote: bool,
    pub escape_char: String,
    pub block_size: String,
    pub location: String,
}

impl Encode for FileInput {
    fn write_to<W: WriteExt>(&self, writer: &mut W) -> std::io::Result<()> {
        self.delimiter.write_to(writer)?;
        self.header_row.write_to(writer)?;
        self.quoting.write_to(writer)?;
        self.quote_char.write_to(writer)?;
        self.double_quote.write_to(writer)?;
        self.escape_char.write_to(writer)?;
        self.block_size.write_to(writer)?;
        self.location.write_to(writer)?;
        Ok(())
    }
}

impl Decode for FileInput {
    fn read_from<R: ReadExt>(reader: &mut R) -> std::io::Result<Self> {
        let delimiter = String::read_from(reader)?;
        let header_row = bool::read_from(reader)?;
        let quoting = bool::read_from(reader)?;
        let quote_char = String::read_from(reader)?;
        let double_quote = bool::read_from(reader)?;
        let escape_char = String::read_from(reader)?;
        let block_size = String::read_from(reader)?;
        let location = String::read_from(reader)?;
        Ok(FileInput {
            delimiter,
            header_row,
            quoting,
            quote_char,
            double_quote,
            escape_char,
            block_size,
            location,
        })
    }
}

impl FileInput {
    /// Convenience constructor; the remaining fields take the defaults used
    /// throughout this crate (quoting with `'`, no escape char, 4Mb blocks).
    pub fn new(delimiter: String, header_row: bool, location: String) -> Self {
        FileInput {
            delimiter,
            header_row,
            quoting: true,
            quote_char: "'".to_string(),
            double_quote: true,
            escape_char: "".to_string(),
            block_size: "4Mb".to_string(),
            location,
        }
    }
}

/// Serializes a concrete column as a one-byte type tag followed by its data.
/// The tags must stay in sync with `read_column`.
fn write_column<W: WriteExt>(column: &Box<dyn Column>, writer: &mut W) -> std::io::Result<()> {
    // Bug fix throughout this function: the original ignored every io::Result
    // returned by the writer; they are now propagated with `?`.
    if let Some(int32_column) = column.as_any().downcast_ref::<Int32Column>() {
        writer.write_u8(0)?;
        writer.write_u64(column.len() as u64)?;
        for i in int32_column.data.iter() {
            writer.write_i32(*i)?;
        }
    }
    if let Some(uint32_column) = column.as_any().downcast_ref::<UInt32Column>() {
        writer.write_u8(1)?;
        writer.write_u64(column.len() as u64)?;
        for i in uint32_column.data.iter() {
            writer.write_u32(*i)?;
        }
    }
    if let Some(int64_column) = column.as_any().downcast_ref::<Int64Column>() {
        writer.write_u8(2)?;
        writer.write_u64(column.len() as u64)?;
        for i in int64_column.data.iter() {
            writer.write_i64(*i)?;
        }
    }
    if let Some(uint64_column) = column.as_any().downcast_ref::<UInt64Column>() {
        writer.write_u8(3)?;
        writer.write_u64(column.len() as u64)?;
        for i in uint64_column.data.iter() {
            writer.write_u64(*i)?;
        }
    }
    if let Some(id_column) = column.as_any().downcast_ref::<IDColumn>() {
        writer.write_u8(4)?;
        writer.write_u64(column.len() as u64)?;
        for i in id_column.data.iter() {
            writer.write_u64(*i as u64)?;
        }
    }
    if let Some(double_column) = column.as_any().downcast_ref::<DoubleColumn>() {
        writer.write_u8(5)?;
        writer.write_u64(column.len() as u64)?;
        for i in double_column.data.iter() {
            writer.write_f64(*i)?;
        }
    }
    if let Some(string_column) = column.as_any().downcast_ref::<StringColumn>() {
        writer.write_u8(6)?;
        writer.write_u64(column.len() as u64)?;
        for i in string_column.data.iter() {
            i.write_to(writer)?;
        }
    }
    if let
Some(lc_string_column) = column.as_any().downcast_ref::<LCStringColumn>() {
        // Tag 7: dictionary-encoded strings — dictionary first, then codes.
        writer.write_u8(7)?;
        writer.write_u64(lc_string_column.list.len() as u64)?;
        for i in lc_string_column.list.iter() {
            i.write_to(writer)?;
        }
        writer.write_u64(lc_string_column.data.len() as u64)?;
        for i in lc_string_column.data.iter() {
            writer.write_u16(*i)?;
        }
    }
    if let Some(date_column) = column.as_any().downcast_ref::<DateColumn>() {
        writer.write_u8(8)?;
        writer.write_u64(column.len() as u64)?;
        for i in date_column.data.iter() {
            writer.write_i32(i.to_i32())?;
        }
    }
    if let Some(datetime_column) = column.as_any().downcast_ref::<DateTimeColumn>() {
        writer.write_u8(9)?;
        writer.write_u64(column.len() as u64)?;
        for i in datetime_column.data.iter() {
            writer.write_i64(i.to_i64())?;
        }
    }
    Ok(())
}

/// Deserializes a column previously written by `write_column`, dispatching on
/// the leading one-byte type tag.
///
/// Panics on an unknown tag, which indicates a corrupted or incompatible
/// stream.
fn read_column<R: ReadExt>(reader: &mut R) -> std::io::Result<Box<dyn Column>> {
    let data: Box<dyn Column> = match reader.read_u8()? {
        0 => {
            let data_len = reader.read_u64()? as usize;
            let mut data = ColumnContainer::<i32>::with_capacity(data_len);
            for _ in 0..data_len {
                data.push(reader.read_i32()?);
            }
            Box::new(Int32Column { data })
        }
        1 => {
            let data_len = reader.read_u64()? as usize;
            let mut data = ColumnContainer::<u32>::with_capacity(data_len);
            for _ in 0..data_len {
                data.push(reader.read_u32()?);
            }
            Box::new(UInt32Column { data })
        }
        2 => {
            let data_len = reader.read_u64()? as usize;
            let mut data = ColumnContainer::<i64>::with_capacity(data_len);
            for _ in 0..data_len {
                data.push(reader.read_i64()?);
            }
            Box::new(Int64Column { data })
        }
        3 => {
            let data_len = reader.read_u64()? as usize;
            let mut data = ColumnContainer::<u64>::with_capacity(data_len);
            for _ in 0..data_len {
                data.push(reader.read_u64()?);
            }
            Box::new(UInt64Column { data })
        }
        4 => {
            let data_len = reader.read_u64()? as usize;
            let mut data = ColumnContainer::<usize>::with_capacity(data_len);
            for _ in 0..data_len {
                data.push(reader.read_u64()? as usize);
            }
            Box::new(IDColumn { data })
        }
        5 => {
            let data_len = reader.read_u64()? as usize;
            let mut data = ColumnContainer::<f64>::with_capacity(data_len);
            for _ in 0..data_len {
                data.push(reader.read_f64()?);
            }
            Box::new(DoubleColumn { data })
        }
        6 => {
            let data_len = reader.read_u64()? as usize;
            let mut data = ColumnContainer::<String>::with_capacity(data_len);
            for _ in 0..data_len {
                data.push(String::read_from(reader)?);
            }
            Box::new(StringColumn { data })
        }
        7 => {
            // Dictionary-encoded strings: rebuild both the ordered list and
            // the reverse lookup table, then read the u16 codes.
            let mut list = Vec::<String>::new();
            let mut table = HashMap::<String, u16>::new();
            let list_len = reader.read_u64()? as usize;
            for i in 0..list_len {
                let name = String::read_from(reader)?;
                list.push(name.clone());
                table.insert(name, i as u16);
            }
            let data_len = reader.read_u64()? as usize;
            let mut data = ColumnContainer::<u16>::with_capacity(data_len);
            for _ in 0..data_len {
                data.push(reader.read_u16()?);
            }
            Box::new(LCStringColumn { data, table, list })
        }
        8 => {
            let data_len = reader.read_u64()? as usize;
            let mut data = ColumnContainer::<Date>::with_capacity(data_len);
            for _ in 0..data_len {
                data.push(Date::from_i32(reader.read_i32()?));
            }
            Box::new(DateColumn { data })
        }
        9 => {
            let data_len = reader.read_u64()? as usize;
            let mut data = ColumnContainer::<DateTime>::with_capacity(data_len);
            for _ in 0..data_len {
                data.push(DateTime::new(reader.read_i64()?));
            }
            Box::new(DateTimeColumn { data })
        }
        _ => panic!("Unknown column type"),
    };
    Ok(data)
}

/// Deep-copies a type-erased column by downcasting to each known concrete
/// column type in turn. Panics if the concrete type is not recognized.
fn clone_column(input: &Box<dyn Column>) -> Box<dyn Column> {
    if let Some(int32_column) = input.as_any().downcast_ref::<Int32Column>() {
        Box::new(Int32Column::clone_from(int32_column))
    } else if let Some(uint32_column) = input.as_any().downcast_ref::<UInt32Column>() {
        Box::new(UInt32Column::clone_from(uint32_column))
    } else if let Some(int64_column) = input.as_any().downcast_ref::<Int64Column>() {
        Box::new(Int64Column::clone_from(int64_column))
    } else if let Some(uint64_column) = input.as_any().downcast_ref::<UInt64Column>() {
        Box::new(UInt64Column::clone_from(uint64_column))
    } else if let Some(id_column) = input.as_any().downcast_ref::<IDColumn>() {
        Box::new(IDColumn::clone_from(id_column))
    } else if let Some(double_column) = input.as_any().downcast_ref::<DoubleColumn>() {
        Box::new(DoubleColumn::clone_from(double_column))
    } else if let Some(string_column) = input.as_any().downcast_ref::<StringColumn>() {
        Box::new(StringColumn::clone_from(string_column))
    } else if let Some(lc_string_column) = input.as_any().downcast_ref::<LCStringColumn>() {
        Box::new(LCStringColumn::clone_from(lc_string_column))
    } else if let Some(date_column) = input.as_any().downcast_ref::<DateColumn>() {
        Box::new(DateColumn::clone_from(date_column))
    } else if let Some(datetime_column) = input.as_any().downcast_ref::<DateTimeColumn>() {
        Box::new(DateTimeColumn::clone_from(datetime_column))
    } else {
        panic!("Unknown column type")
    }
}

/// A named, typed column of data.
pub struct ColumnMetadata {
    data: Box<dyn Column>,
    column_name: String,
    data_type: DataType,
}

impl Encode for ColumnMetadata {
    fn write_to<W: WriteExt>(&self, writer: &mut W) -> std::io::Result<()> {
        write_column(&self.data, writer)?;
        self.column_name.write_to(writer)?;
        self.data_type.write_to(writer)?;
        Ok(())
    }
}

impl Decode for
ColumnMetadata {
    fn read_from<R: ReadExt>(reader: &mut R) -> std::io::Result<Self> {
        let data: Box<dyn Column> = read_column(reader)?;
        let column_name = String::read_from(reader)?;
        let data_type = DataType::read_from(reader)?;
        Ok(ColumnMetadata { data, column_name, data_type })
    }
}

impl Clone for ColumnMetadata {
    fn clone(&self) -> Self {
        // Manual impl because Box<dyn Column> cannot derive Clone.
        let data = clone_column(&self.data);
        ColumnMetadata { data, column_name: self.column_name.clone(), data_type: self.data_type.clone() }
    }
}

impl ColumnMetadata {
    pub fn new(data: Box<dyn Column>, column_name: String, data_type: DataType) -> Self {
        ColumnMetadata { data, column_name, data_type }
    }

    pub fn data(&self) -> &Box<dyn Column> {
        &self.data
    }

    /// Moves the column data out, leaving an empty Int32Column placeholder.
    pub fn take_data(&mut self) -> Box<dyn Column> {
        std::mem::replace(&mut self.data, Box::new(Int32Column::new()))
    }

    pub fn column_name(&self) -> String {
        self.column_name.clone()
    }

    pub fn data_type(&self) -> DataType {
        self.data_type
    }
}

/// A set of named columns, used as the in-memory data source of an `Input`.
#[derive(Clone)]
pub struct DataFrame {
    columns: Vec<ColumnMetadata>,
}

impl Encode for DataFrame {
    fn write_to<W: WriteExt>(&self, writer: &mut W) -> std::io::Result<()> {
        self.columns.write_to(writer)?;
        Ok(())
    }
}

impl Decode for DataFrame {
    fn read_from<R: ReadExt>(reader: &mut R) -> std::io::Result<Self> {
        let columns = Vec::<ColumnMetadata>::read_from(reader)?;
        Ok(DataFrame { columns })
    }
}

impl DataFrame {
    /// Single-column frame of global vertex ids (u64), named "id".
    pub fn new_vertices_ids(data: Vec<u64>) -> Self {
        let columns = vec![ColumnMetadata::new(Box::new(UInt64Column { data }), "id".to_string(), DataType::ID)];
        DataFrame { columns }
    }

    /// Single-column frame of edge ids (usize), named "id".
    pub fn new_edges_ids(data: Vec<usize>) -> Self {
        let columns = vec![ColumnMetadata::new(Box::new(IDColumn { data }), "id".to_string(), DataType::ID)];
        DataFrame { columns }
    }

    pub fn add_column(&mut self, column: ColumnMetadata) {
        self.columns.push(column);
    }

    pub fn columns(&self) -> &Vec<ColumnMetadata> {
        &self.columns
    }

    /// Moves all columns out, leaving the frame empty.
    pub fn take_columns(&mut self) -> Vec<ColumnMetadata> {
        std::mem::replace(&mut self.columns, Vec::new())
    }
}

/// One input of a write operation: either a file or an in-memory DataFrame.
#[derive(Clone)]
pub struct Input {
    data_source: DataSource,
    file_input: Option<FileInput>,
    memory_data: Option<DataFrame>,
}

impl Encode for Input {
    fn write_to<W: WriteExt>(&self, writer: &mut W) -> std::io::Result<()> {
        // Bug fix: the original discarded the io::Result produced by the
        // match; it is now propagated.
        match self.data_source {
            DataSource::File => writer.write_u8(0),
            DataSource::Memory => writer.write_u8(1),
        }?;
        self.file_input.write_to(writer)?;
        self.memory_data.write_to(writer)?;
        Ok(())
    }
}

impl Decode for Input {
    fn read_from<R: ReadExt>(reader: &mut R) -> std::io::Result<Self> {
        let data_source = match reader.read_u8()? {
            0 => DataSource::File,
            1 => DataSource::Memory,
            _ => panic!("Unknown DataSource type"),
        };
        let file_input = Option::<FileInput>::read_from(reader)?;
        let memory_data = Option::<DataFrame>::read_from(reader)?;
        Ok(Input { data_source, file_input, memory_data })
    }
}

impl Input {
    pub fn data_source(&self) -> DataSource {
        self.data_source
    }

    pub fn file_input(&self) -> Option<&FileInput> {
        self.file_input.as_ref()
    }

    pub fn memory_data(&self) -> Option<&DataFrame> {
        self.memory_data.as_ref()
    }

    pub fn take_memory_data(&mut self) -> Option<DataFrame> {
        self.memory_data.take()
    }

    pub fn file(file: FileInput) -> Self {
        Input { data_source: DataSource::File, file_input: Some(file), memory_data: None }
    }

    pub fn memory(memory_data: DataFrame) -> Self {
        Input { data_source: DataSource::Memory, file_input: None, memory_data: Some(memory_data) }
    }
}

/// How vertex inputs map onto a vertex label's properties.
#[derive(Clone)]
pub struct VertexMappings {
    label_id: LabelId,
    inputs: Vec<Input>,
    column_mappings: Vec<ColumnMappings>,
}

impl Encode for VertexMappings {
    fn write_to<W: WriteExt>(&self, writer: &mut W) -> std::io::Result<()> {
        // Bug fix: propagate the previously ignored io::Result.
        writer.write_u8(self.label_id)?;
        self.inputs.write_to(writer)?;
        self.column_mappings.write_to(writer)?;
        Ok(())
    }
}

impl Decode for VertexMappings {
    fn read_from<R: ReadExt>(reader: &mut R) -> std::io::Result<Self> {
        let label_id = reader.read_u8()?;
        let inputs = Vec::<Input>::read_from(reader)?;
        let column_mappings = Vec::<ColumnMappings>::read_from(reader)?;
        Ok(VertexMappings { label_id, inputs, column_mappings })
    }
}

impl VertexMappings {
    pub fn new(label_id: LabelId, inputs: Vec<Input>, column_mappings: Vec<ColumnMappings>) -> Self {
        VertexMappings { label_id, inputs, column_mappings }
    }

    pub fn vertex_label(&self) -> LabelId {
        self.label_id
    }

    pub fn inputs(&self) -> &Vec<Input> {
        &self.inputs
    }

    pub fn take_inputs(&mut self) -> Vec<Input> {
        std::mem::replace(&mut self.inputs, Vec::new())
    }

    pub fn column_mappings(&self) -> &Vec<ColumnMappings> {
        &self.column_mappings
    }
}

/// How edge inputs map onto an edge triple (src, edge, dst) and its
/// properties.
#[derive(Clone)]
pub struct EdgeMappings {
    src_label: LabelId,
    edge_label: LabelId,
    dst_label: LabelId,
    inputs: Vec<Input>,
    src_column_mappings: Vec<ColumnMappings>,
    dst_column_mappings: Vec<ColumnMappings>,
    column_mappings: Vec<ColumnMappings>,
}

impl Encode for EdgeMappings {
    fn write_to<W: WriteExt>(&self, writer: &mut W) -> std::io::Result<()> {
        // Bug fix: propagate the three previously ignored io::Results.
        writer.write_u8(self.src_label)?;
        writer.write_u8(self.edge_label)?;
        writer.write_u8(self.dst_label)?;
        self.inputs.write_to(writer)?;
        self.src_column_mappings.write_to(writer)?;
        self.dst_column_mappings.write_to(writer)?;
        self.column_mappings.write_to(writer)?;
        Ok(())
    }
}

impl Decode for EdgeMappings {
    fn read_from<R: ReadExt>(reader: &mut R) -> std::io::Result<Self> {
        let src_label = reader.read_u8()?;
        let edge_label = reader.read_u8()?;
        let dst_label = reader.read_u8()?;
        let inputs = Vec::<Input>::read_from(reader)?;
        let src_column_mappings = Vec::<ColumnMappings>::read_from(reader)?;
        let dst_column_mappings = Vec::<ColumnMappings>::read_from(reader)?;
        let column_mappings = Vec::<ColumnMappings>::read_from(reader)?;
        Ok(EdgeMappings {
            src_label,
            edge_label,
            dst_label,
            inputs,
            src_column_mappings,
            dst_column_mappings,
            column_mappings,
        })
    }
}

impl EdgeMappings {
    pub fn new(
        src_label: LabelId, edge_label: LabelId, dst_label: LabelId, inputs: Vec<Input>,
        src_column_mappings: Vec<ColumnMappings>, dst_column_mappings: Vec<ColumnMappings>,
        column_mappings: Vec<ColumnMappings>,
    ) -> Self {
        EdgeMappings {
            src_label,
            edge_label,
            dst_label,
            inputs,
            src_column_mappings,
            dst_column_mappings,
            column_mappings,
        }
    }

    pub fn src_label(&self) -> LabelId {
        self.src_label
    }

    pub fn edge_label(&self) -> LabelId {
        self.edge_label
    }

    pub fn dst_label(&self) -> LabelId {
        self.dst_label
    }

    pub fn inputs(&self) -> &Vec<Input> {
        &self.inputs
    }

    pub fn take_inputs(&mut self) -> Vec<Input> {
        std::mem::replace(&mut self.inputs, Vec::new())
    }

    pub fn src_column_mappings(&self) -> &Vec<ColumnMappings> {
        &self.src_column_mappings
    }

    pub fn dst_column_mappings(&self) -> &Vec<ColumnMappings> {
        &self.dst_column_mappings
    }

    pub fn column_mappings(&self) -> &Vec<ColumnMappings> {
        &self.column_mappings
    }
}

/// A single insert/delete/set request against vertices and/or edges.
#[derive(Clone)]
pub struct WriteOperation {
    write_type: WriteType,
    vertex_mappings: Option<VertexMappings>,
    edge_mappings: Option<EdgeMappings>,
}

impl Debug for WriteOperation {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "This is a write operation")
    }
}

impl Encode for WriteOperation {
    fn write_to<W: WriteExt>(&self, writer: &mut W) -> std::io::Result<()> {
        // Bug fix: the original discarded the io::Result produced by the
        // match; it is now propagated.
        match self.write_type {
            WriteType::Insert => writer.write_u8(0),
            WriteType::Delete => writer.write_u8(1),
            WriteType::Set => writer.write_u8(2),
        }?;
        self.vertex_mappings.write_to(writer)?;
        self.edge_mappings.write_to(writer)?;
        Ok(())
    }
}

impl Decode for WriteOperation {
    fn read_from<R: ReadExt>(reader: &mut R) -> std::io::Result<Self> {
        let write_type = match reader.read_u8()? {
            0 => WriteType::Insert,
            1 => WriteType::Delete,
            2 => WriteType::Set,
            _ => panic!("Unknown write type"),
        };
        let vertex_mappings = Option::<VertexMappings>::read_from(reader)?;
        let edge_mappings = Option::<EdgeMappings>::read_from(reader)?;
        Ok(WriteOperation { write_type, vertex_mappings, edge_mappings })
    }
}

// SAFETY(review): these assert thread-safety for the Box<dyn Column> data
// reachable through the mappings — confirm every Column implementation is
// actually safe to share/send across threads.
unsafe impl Send for WriteOperation {}

unsafe impl Sync for WriteOperation {}

impl WriteOperation {
    pub fn insert_vertices(vertex_mappings: VertexMappings) -> Self {
        WriteOperation {
            write_type: WriteType::Insert,
            vertex_mappings: Some(vertex_mappings),
            edge_mappings: None,
        }
    }

    pub fn insert_edges(edge_mappings: EdgeMappings) -> Self {
        WriteOperation {
            write_type: WriteType::Insert,
            vertex_mappings: None,
            edge_mappings: Some(edge_mappings),
        }
    }

    pub fn delete_vertices(vertex_mappings: VertexMappings) -> Self {
        WriteOperation {
            write_type: WriteType::Delete,
            vertex_mappings: Some(vertex_mappings),
            edge_mappings: None,
        }
    }

    pub fn delete_edges(edge_mappings: EdgeMappings) -> Self {
        WriteOperation {
            write_type: WriteType::Delete,
            vertex_mappings: None,
            edge_mappings: Some(edge_mappings),
        }
    }

    pub fn set_vertices(vertex_mappings: VertexMappings) -> Self {
        WriteOperation {
            write_type: WriteType::Set,
            vertex_mappings: Some(vertex_mappings),
            edge_mappings: None,
        }
    }

    pub fn set_edges(edge_mappings: EdgeMappings) -> Self {
        WriteOperation {
            write_type: WriteType::Set,
            vertex_mappings: None,
            edge_mappings: Some(edge_mappings),
        }
    }

    pub fn write_type(&self) -> WriteType {
        self.write_type
    }

    pub fn has_vertex_mappings(&self) -> bool {
        self.vertex_mappings.is_some()
    }

    pub fn vertex_mappings(&self) -> Option<&VertexMappings> {
        self.vertex_mappings.as_ref()
    }

    pub fn take_vertex_mappings(&mut self) -> Option<VertexMappings> {
        self.vertex_mappings.take()
    }

    pub fn has_edge_mappings(&self) -> bool {
        self.edge_mappings.is_some()
    }

    pub fn edge_mappings(&self) -> Option<&EdgeMappings> {
        self.edge_mappings.as_ref()
    }

    pub fn take_edge_mappings(&mut self) -> Option<EdgeMappings> {
self.edge_mappings.take()
    }
}

/// A column of data tagged with the query alias it belongs to.
pub struct AliasData {
    pub alias_index: i32,
    pub column_data: Box<dyn Column>,
}

impl Debug for AliasData {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "Alias index: {}, data: {:?}", self.alias_index, self.column_data)
    }
}

impl Encode for AliasData {
    fn write_to<W: WriteExt>(&self, writer: &mut W) -> std::io::Result<()> {
        writer.write_i32(self.alias_index)?;
        write_column(&self.column_data, writer)?;
        Ok(())
    }
}

impl Decode for AliasData {
    fn read_from<R: ReadExt>(reader: &mut R) -> std::io::Result<Self> {
        let alias_index = reader.read_i32()?;
        let column_data = read_column(reader)?;
        Ok(AliasData { alias_index, column_data })
    }
}

impl Clone for AliasData {
    fn clone(&self) -> Self {
        let column_data = clone_column(&self.column_data);
        AliasData { alias_index: self.alias_index, column_data }
    }
}

// SAFETY(review): asserts thread-safety for the Box<dyn Column> payload —
// confirm every Column implementation is safe to share/send across threads.
unsafe impl Send for AliasData {}

unsafe impl Sync for AliasData {}

/// Applies a batch of write operations to `graph`.
///
/// In-memory vertex deletions are not applied one by one: their id columns
/// are merged per label into `merged_delete_vertices_data` first and flushed
/// in a single `delete_vertices` call per label at the end, after all other
/// operations ran.
pub fn apply_write_operations(
    graph: &mut GraphDB<usize, usize>, mut write_operations: Vec<WriteOperation>, parallel: u32,
) {
    let mut merged_delete_vertices_data: HashMap<LabelId, Vec<u64>> = HashMap::new();
    for mut write_op in write_operations.drain(..) {
        match write_op.write_type() {
            WriteType::Insert => {
                if let Some(vertex_mappings) = write_op.take_vertex_mappings() {
                    let vertex_label = vertex_mappings.vertex_label();
                    let inputs = vertex_mappings.inputs();
                    let column_mappings = vertex_mappings.column_mappings();
                    for input in inputs.iter() {
                        insert_vertices(graph, vertex_label, input, column_mappings, parallel);
                    }
                }
                if let Some(edge_mappings) = write_op.take_edge_mappings() {
                    let src_label = edge_mappings.src_label();
                    let edge_label = edge_mappings.edge_label();
                    let dst_label = edge_mappings.dst_label();
                    let inputs = edge_mappings.inputs();
                    let src_column_mappings = edge_mappings.src_column_mappings();
                    let dst_column_mappings = edge_mappings.dst_column_mappings();
                    let column_mappings = edge_mappings.column_mappings();
                    for input in inputs.iter() {
                        insert_edges(
                            graph,
                            src_label,
                            edge_label,
                            dst_label,
                            input,
                            src_column_mappings,
                            dst_column_mappings,
                            column_mappings,
                            parallel,
                        );
                    }
                }
            }
            WriteType::Delete => {
                if let Some(mut vertex_mappings) = write_op.take_vertex_mappings() {
                    let vertex_label = vertex_mappings.vertex_label();
                    let inputs = vertex_mappings.take_inputs();
                    let column_mappings = vertex_mappings.column_mappings();
                    for mut input in inputs.into_iter() {
                        // In-memory deletions are buffered per label and
                        // flushed after the loop; everything else is applied
                        // immediately. (The original re-checked the data
                        // source inside the Memory match arm, which is always
                        // true and has been simplified away.)
                        if input.data_source() == DataSource::Memory {
                            // Locate the "id" column among the mappings;
                            // stays -1 if absent, which makes the expect()
                            // below fire.
                            let mut id_col = -1;
                            for column_mapping in column_mappings {
                                let column = column_mapping.column();
                                let column_index = column.index();
                                let property_name = column_mapping.property_name();
                                if property_name == "id" {
                                    id_col = column_index;
                                    break;
                                }
                            }
                            let mut memory_data = input.take_memory_data().unwrap();
                            let mut data = memory_data.take_columns();
                            let vertex_id_column = data
                                .get_mut(id_col as usize)
                                .expect("Failed to get id column");
                            let data = vertex_id_column.take_data();
                            if let Some(uint64_column) = data.as_any().downcast_ref::<UInt64Column>() {
                                // entry API: one lookup, no temporary clone
                                // (the original cloned the column and then
                                // appended or inserted it).
                                merged_delete_vertices_data
                                    .entry(vertex_label)
                                    .or_default()
                                    .extend_from_slice(&uint64_column.data);
                            } else {
                                panic!("Unknown data type");
                            }
                            continue;
                        }
                        delete_vertices(graph, vertex_label, &input, column_mappings, parallel);
                    }
                }
                if let Some(edge_mappings) = write_op.take_edge_mappings() {
                    let src_label = edge_mappings.src_label();
                    let edge_label = edge_mappings.edge_label();
                    let dst_label = edge_mappings.dst_label();
                    let inputs = edge_mappings.inputs();
                    let src_column_mappings = edge_mappings.src_column_mappings();
                    let dst_column_mappings = edge_mappings.dst_column_mappings();
                    let column_mappings = edge_mappings.column_mappings();
                    for input in inputs.iter() {
                        delete_edges(
                            graph,
                            src_label,
                            edge_label,
                            dst_label,
                            input,
                            src_column_mappings,
                            dst_column_mappings,
                            column_mappings,
                            parallel,
                        );
                    }
                }
            }
            WriteType::Set => {
                if let Some(mut vertex_mappings) = write_op.take_vertex_mappings() {
                    let vertex_label = vertex_mappings.vertex_label();
                    let mut inputs = vertex_mappings.take_inputs();
                    let column_mappings = vertex_mappings.column_mappings();
                    for input in inputs.drain(..) {
                        set_vertices(graph, vertex_label, input, column_mappings, parallel);
                    }
                }
                if let Some(mut edge_mappings) = write_op.take_edge_mappings() {
                    let src_label = edge_mappings.src_label();
                    let edge_label = edge_mappings.edge_label();
                    let dst_label = edge_mappings.dst_label();
                    let mut inputs = edge_mappings.take_inputs();
                    let src_column_mappings = edge_mappings.src_column_mappings();
                    let dst_column_mappings = edge_mappings.dst_column_mappings();
                    let column_mappings = edge_mappings.column_mappings();
                    for input in inputs.drain(..) {
                        set_edges(
                            graph,
                            src_label,
                            edge_label,
                            dst_label,
                            input,
                            src_column_mappings,
                            dst_column_mappings,
                            column_mappings,
                            parallel,
                        );
                    }
                }
            }
        }
    }
    // Flush the buffered in-memory vertex deletions, one call per label.
    for (vertex_label, vertex_ids) in merged_delete_vertices_data.into_iter() {
        let column_mappings = vec![ColumnMappings::new(0, "id".to_string(), DataType::ID, "id".to_string())];
        let input = Input::memory(DataFrame::new_vertices_ids(vertex_ids));
        delete_vertices(graph, vertex_label, &input, &column_mappings, parallel);
    }
}

/// Inserts vertices of `vertex_label` described by `input` into `graph`.
/// File inputs are delegated to `GraphModifier`; in-memory inputs are not
/// implemented yet.
fn insert_vertices<G, I>(
    graph: &mut GraphDB<G, I>, vertex_label: LabelId, input: &Input,
    column_mappings: &Vec<ColumnMappings>, parallel: u32,
) where
    I: Send + Sync + IndexType,
    G: FromStr + Send + Sync + IndexType + Eq,
{
    // property name -> (input column index, data type); max_col tracks the
    // widest referenced input column so the mapping table can be sized.
    let mut column_map = HashMap::new();
    let mut max_col = 0;
    for column_mapping in column_mappings {
        let column = column_mapping.column();
        let column_index = column.index();
        let data_type = column.data_type();
        let property_name = column_mapping.property_name();
        column_map.insert(property_name.clone(), (column_index, data_type));
        if column_index >= max_col {
            max_col = column_index + 1;
        }
    }
    let mut id_col = -1;
    if let Some((column_index, _)) = column_map.get("id") {
        id_col = *column_index;
    }
    match input.data_source() {
        DataSource::File => {
            if let Some(file_input) = input.file_input() {
                let file_location = &file_input.location;
                let path = Path::new(file_location);
                let input_dir = path
                    .parent()
                    .unwrap_or(Path::new(""))
                    .to_str()
                    .unwrap()
                    .to_string();
                let filename = path
                    .file_name()
                    .expect("Can not find filename")
                    .to_str()
                    .unwrap_or("")
                    .to_string();
                let filenames = vec![filename];
                let mut modifier = GraphModifier::new(input_dir);
                if file_input.header_row {
                    modifier.skip_header();
                }
                modifier.parallel(parallel);
                // mappings[input column] = schema property index, -1 = unused.
                let mut mappings = vec![-1; max_col as usize];
                if let Some(vertex_header) = graph
                    .graph_schema
                    .get_vertex_header(vertex_label)
                {
                    for (i, (property_name, data_type)) in vertex_header.iter().enumerate() {
                        if let Some((column_index, column_data_type)) =
column_map.get(property_name) {
                            mappings[*column_index as usize] = i as i32;
                        }
                    }
                } else {
                    panic!("vertex label {} not found", vertex_label)
                }
                modifier
                    .apply_vertices_insert_with_filename(graph, vertex_label, &filenames, id_col, &mappings)
                    .unwrap();
            }
        }
        DataSource::Memory => {
            if let Some(memory_data) = input.memory_data() {
                todo!()
            }
        }
    }
}

/// Inserts edges (`src_label`)-[`edge_label`]->(`dst_label`) described by
/// `input` into `graph`. File inputs are delegated to `GraphModifier`;
/// in-memory inputs are not implemented yet.
pub fn insert_edges<G, I>(
    graph: &mut GraphDB<G, I>, src_label: LabelId, edge_label: LabelId, dst_label: LabelId,
    input: &Input, src_vertex_mappings: &Vec<ColumnMappings>,
    dst_vertex_mappings: &Vec<ColumnMappings>, column_mappings: &Vec<ColumnMappings>, parallel: u32,
) where
    I: Send + Sync + IndexType,
    G: FromStr + Send + Sync + IndexType + Eq,
{
    // property name -> (input column index, data type); the endpoint id
    // columns are registered under the synthetic names "src_id"/"dst_id".
    let mut column_map = HashMap::new();
    let mut max_col = 0;
    for column_mapping in src_vertex_mappings {
        let column = column_mapping.column();
        let column_index = column.index();
        let data_type = column.data_type();
        let property_name = column_mapping.property_name();
        if property_name == "id" {
            column_map.insert("src_id".to_string(), (column_index, data_type));
        }
        if column_index >= max_col {
            max_col = column_index + 1;
        }
    }
    for column_mapping in dst_vertex_mappings {
        let column = column_mapping.column();
        let column_index = column.index();
        let data_type = column.data_type();
        let property_name = column_mapping.property_name();
        if property_name == "id" {
            column_map.insert("dst_id".to_string(), (column_index, data_type));
        }
        if column_index >= max_col {
            max_col = column_index + 1;
        }
    }
    for column_mapping in column_mappings {
        let column = column_mapping.column();
        let column_index = column.index();
        let data_type = column.data_type();
        let property_name = column_mapping.property_name();
        column_map.insert(property_name.clone(), (column_index, data_type));
        if column_index >= max_col {
            max_col = column_index + 1;
        }
    }
    // -1 means "endpoint id column not present in the input".
    let mut src_id_col = -1;
    let mut dst_id_col = -1;
    if let Some((column_index, _)) = column_map.get("src_id") {
        src_id_col = *column_index;
    }
    if let Some((column_index, _)) = column_map.get("dst_id") {
        dst_id_col = *column_index;
    }
    match input.data_source() {
        DataSource::File => {
            if let Some(file_input) = input.file_input() {
                let file_location = &file_input.location;
                let path = Path::new(file_location);
                let input_dir = path
                    .parent()
                    .unwrap_or(Path::new(""))
                    .to_str()
                    .unwrap()
                    .to_string();
                let filename = path
                    .file_name()
                    .expect("Can not find filename")
                    .to_str()
                    .unwrap_or("")
                    .to_string();
                let filenames = vec![filename];
                let mut modifier = GraphModifier::new(input_dir);
                if file_input.header_row {
                    modifier.skip_header();
                }
                modifier.parallel(parallel);
                // mappings[input column] = schema property index, -1 = unused.
                let mut mappings = vec![-1; max_col as usize];
                if let Some(edge_header) = graph
                    .graph_schema
                    .get_edge_header(src_label, edge_label, dst_label)
                {
                    for (i, (property_name, _)) in edge_header.iter().enumerate() {
                        if let Some((column_index, _)) = column_map.get(property_name) {
                            mappings[*column_index as usize] = i as i32;
                        }
                    }
                } else {
                    panic!("edge label {}_{}_{} not found", src_label, edge_label, dst_label)
                }
                modifier
                    .apply_edges_insert_with_filename(
                        graph, src_label, edge_label, dst_label, &filenames, src_id_col, dst_id_col,
                        &mappings,
                    )
                    .unwrap();
            }
        }
        DataSource::Memory => {
            if let Some(memory_data) = input.memory_data() {
                todo!()
            }
        }
    }
}

/// Deletes the vertices of `vertex_label` listed in `input`. File inputs go
/// through `GraphModifier`; in-memory inputs expect a u64 "id" column and are
/// resolved via `delete_vertices_by_ids`.
pub fn delete_vertices(
    graph: &mut GraphDB<usize, usize>, vertex_label: LabelId, input: &Input,
    column_mappings: &Vec<ColumnMappings>, parallel: u32,
) {
    let mut column_map = HashMap::new();
    for column_mapping in column_mappings {
        let column = column_mapping.column();
        let column_index = column.index();
        let data_type = column.data_type();
        let property_name = column_mapping.property_name();
        column_map.insert(property_name.clone(), (column_index, data_type));
    }
    // Stays -1 when no "id" mapping exists; the Memory branch then panics in
    // its expect() below.
    let mut id_col = -1;
    if let Some((column_index, _)) = column_map.get("id") {
        id_col = *column_index;
    }
    match input.data_source() {
        DataSource::File => {
            if let Some(file_input) = input.file_input() {
                let file_location = &file_input.location;
                let path = Path::new(file_location);
                let input_dir = path
                    .parent()
                    .unwrap_or(Path::new(""))
                    .to_str()
                    .unwrap()
                    .to_string();
                let filename = path
                    .file_name()
                    .expect("Can not find filename")
                    .to_str()
                    .unwrap_or("")
                    .to_string();
                let filenames = vec![filename];
                let mut modifier = GraphModifier::new(input_dir);
                if file_input.header_row {
                    modifier.skip_header();
                }
                modifier.parallel(parallel);
                modifier
                    .apply_vertices_delete_with_filename(graph, vertex_label, &filenames, id_col)
                    .unwrap();
            }
        }
        DataSource::Memory => {
            if let Some(memory_data) = input.memory_data() {
                let data = memory_data.columns();
                let vertex_id_column = data
                    .get(id_col as usize)
                    .expect("Failed to get id column");
                if let Some(uint64_column) = vertex_id_column
                    .data()
                    .as_any()
                    .downcast_ref::<UInt64Column>()
                {
                    let data = uint64_column
                        .data
                        .iter()
                        .map(|&x| x as usize)
                        .collect();
                    delete_vertices_by_ids(graph, vertex_label, &data, parallel);
                }
            }
        }
    }
}

/// Deletes edges (`src_label`)-[`edge_label`]->(`dst_label`) listed in
/// `input`; endpoint id columns are registered as "src_id"/"dst_id".
pub fn delete_edges(
    graph: &mut GraphDB<usize, usize>, src_label: LabelId, edge_label: LabelId, dst_label: LabelId,
    input: &Input, src_vertex_mappings: &Vec<ColumnMappings>,
    dst_vertex_mappings: &Vec<ColumnMappings>, column_mappings: &Vec<ColumnMappings>, parallel: u32,
) {
    let mut column_map = HashMap::new();
    for column_mapping in src_vertex_mappings {
        let column = column_mapping.column();
        let column_index = column.index();
        let data_type = column.data_type();
        let property_name = column_mapping.property_name();
        if property_name == "id" {
            column_map.insert("src_id".to_string(), (column_index, data_type));
        }
    }
    for column_mapping in dst_vertex_mappings {
        let column = column_mapping.column();
        let column_index = column.index();
        let data_type = column.data_type();
        let property_name = column_mapping.property_name();
        if property_name == "id" {
            column_map.insert("dst_id".to_string(), (column_index, data_type));
        }
    }
    for column_mapping in column_mappings {
        let column = column_mapping.column();
        let column_index = column.index();
        let data_type = column.data_type();
        let property_name =
column_mapping.property_name(); column_map.insert(property_name.clone(), (column_index, data_type)); } let mut src_id_col = -1; let mut dst_id_col = -1; if let Some((column_index, _)) = column_map.get("src_id") { src_id_col = *column_index; } if let Some((column_index, _)) = column_map.get("dst_id") { dst_id_col = *column_index; } match input.data_source() { DataSource::File => { if let Some(file_input) = input.file_input() { let file_location = &file_input.location; let path = Path::new(file_location); let input_dir = path .parent() .unwrap_or(Path::new("")) .to_str() .unwrap() .to_string(); let filename = path .file_name() .expect("Can not find filename") .to_str() .unwrap_or("") .to_string(); let filenames = vec![filename]; let mut modifier = GraphModifier::new(input_dir); if file_input.header_row { modifier.skip_header(); } modifier.parallel(parallel); modifier .apply_edges_delete_with_filename( graph, src_label, edge_label, dst_label, &filenames, src_id_col, dst_id_col, ) .unwrap(); } } DataSource::Memory => { if let Some(memory_data) = input.memory_data() { todo!() } } } } pub fn delete_vertices_by_ids<G, I>( graph: &mut GraphDB<G, I>, vertex_label: LabelId, global_ids: &Vec<G>, parallel: u32, ) where I: Send + Sync + IndexType, G: FromStr + Send + Sync + IndexType + Eq, { let mut lids = HashSet::new(); for v in global_ids.iter() { if v.index() as u64 == u64::MAX { continue; } if let Some(internal_id) = graph.vertex_map.get_internal_id(*v) { lids.insert(internal_id.1); } } let vertex_label_num = graph.vertex_label_num; let edge_label_num = graph.edge_label_num; for e_label_i in 0..edge_label_num { for src_label_i in 0..vertex_label_num { if graph .graph_schema .get_edge_header(src_label_i as LabelId, e_label_i as LabelId, vertex_label as LabelId) .is_none() { continue; } let index = graph.edge_label_to_index( src_label_i as LabelId, vertex_label as LabelId, e_label_i as LabelId, Direction::Outgoing, ); let mut ie_csr = std::mem::replace(&mut graph.ie[index], 
Box::new(BatchMutableSingleCsr::new())); let mut ie_prop = graph.ie_edge_prop_table.remove(&index); let mut oe_csr = std::mem::replace(&mut graph.oe[index], Box::new(BatchMutableSingleCsr::new())); let mut oe_prop = graph.oe_edge_prop_table.remove(&index); let mut ie_to_delete = Vec::new(); for v in lids.iter() { if let Some(ie_list) = ie_csr.get_edges(*v) { for e in ie_list { ie_to_delete.push((*e, *v)); } } } ie_csr.delete_vertices(&lids); if let Some(table) = oe_prop.as_mut() { oe_csr.parallel_delete_edges_with_props(&ie_to_delete, false, table, parallel); } else { oe_csr.parallel_delete_edges(&ie_to_delete, false, parallel); } graph.ie[index] = ie_csr; if let Some(table) = ie_prop { graph.ie_edge_prop_table.insert(index, table); } graph.oe[index] = oe_csr; if let Some(table) = oe_prop { graph.oe_edge_prop_table.insert(index, table); } } for dst_label_i in 0..vertex_label_num { if graph .graph_schema .get_edge_header(vertex_label as LabelId, e_label_i as LabelId, dst_label_i as LabelId) .is_none() { continue; } let index = graph.edge_label_to_index( vertex_label as LabelId, dst_label_i as LabelId, e_label_i as LabelId, Direction::Outgoing, ); let mut ie_csr = std::mem::replace(&mut graph.ie[index], Box::new(BatchMutableSingleCsr::new())); let mut ie_prop = graph.ie_edge_prop_table.remove(&index); let mut oe_csr = std::mem::replace(&mut graph.oe[index], Box::new(BatchMutableSingleCsr::new())); let mut oe_prop = graph.oe_edge_prop_table.remove(&index); let mut oe_to_delete = Vec::new(); for v in lids.iter() { if let Some(oe_list) = oe_csr.get_edges(*v) { for e in oe_list { oe_to_delete.push((*v, *e)); } } } oe_csr.delete_vertices(&lids); if let Some(table) = ie_prop.as_mut() { ie_csr.parallel_delete_edges_with_props(&oe_to_delete, true, table, parallel); } else { ie_csr.parallel_delete_edges(&oe_to_delete, true, parallel); } graph.ie[index] = ie_csr; if let Some(table) = ie_prop { graph.ie_edge_prop_table.insert(index, table); } graph.oe[index] = oe_csr; if let 
Some(table) = oe_prop {
                graph.oe_edge_prop_table.insert(index, table);
            }
        }
    }
    // Finally drop the vertices themselves from the vertex map.
    for v in lids.iter() {
        graph.vertex_map.remove_vertex(vertex_label, v);
    }
}

/// Overwrite (or create) secondary index properties for a set of vertices.
///
/// The "id" mapping selects the key column; every other mapping names a
/// property column to be written through `set_vertex_index_prop`.
pub fn set_vertices(
    graph: &mut GraphDB<usize, usize>,
    vertex_label: LabelId,
    mut input: Input,
    column_mappings: &Vec<ColumnMappings>,
    parallel: u32,
) {
    let mut column_map = HashMap::new();
    for column_mapping in column_mappings {
        let column = column_mapping.column();
        column_map.insert(
            column_mapping.property_name().clone(),
            (column.index(), column.data_type()),
        );
    }
    let mut id_col = -1;
    if let Some((column_index, _)) = column_map.get("id") {
        id_col = *column_index;
    }
    match input.data_source() {
        DataSource::File => {
            todo!()
        }
        DataSource::Memory => {
            if let Some(mut memory_data) = input.take_memory_data() {
                let mut column_data = memory_data.take_columns();
                let id_column = column_data
                    .get_mut(id_col as usize)
                    .expect("Failed to find id column");
                let mut data = id_column.take_data();
                // Accept either internal ids directly or u64 global ids
                // (which are translated through the vertex map).
                let global_ids = {
                    if let Some(id_column) = data.as_any().downcast_ref::<IDColumn>() {
                        id_column.data.clone()
                    } else if let Some(uint64_column) = data.as_any().downcast_ref::<UInt64Column>() {
                        let mut lid = vec![];
                        for i in uint64_column.data.iter() {
                            lid.push(graph.get_internal_id(*i as usize));
                        }
                        lid
                    } else {
                        panic!("DataType of id col is not VertexId")
                    }
                };
                for (k, v) in column_map.iter() {
                    if k == "id" {
                        continue;
                    }
                    let column_index = v.0;
                    let column_data_type = v.1;
                    graph.init_vertex_index_prop(k.clone(), vertex_label, column_data_type);
                    let column = column_data
                        .get_mut(column_index as usize)
                        .expect("Failed to find column");
                    graph.set_vertex_index_prop(k.clone(), vertex_label, &global_ids, column.take_data());
                }
            }
        }
    }
}

/// Overwrite (or create) secondary index properties on edges of type
/// `src_label -[edge_label]-> dst_label`, keyed by per-direction offsets.
pub fn set_edges(
    graph: &mut GraphDB<usize, usize>,
    src_label: LabelId,
    edge_label: LabelId,
    dst_label: LabelId,
    mut input: Input,
    src_vertex_mappings: &Vec<ColumnMappings>,
    dst_vertex_mappings: &Vec<ColumnMappings>,
    column_mappings: &Vec<ColumnMappings>,
    parallel: u32,
) {
    let mut column_map = HashMap::new();
    for column_mapping in column_mappings {
        let column = column_mapping.column();
        column_map.insert(
            column_mapping.property_name().clone(),
            (column.index(), column.data_type()),
        );
    }
    match input.data_source() {
        DataSource::File => {
            todo!()
        }
        DataSource::Memory => {
            if let Some(mut memory_data) = input.take_memory_data() {
                let mut column_data = memory_data.take_columns();
                if !src_vertex_mappings.is_empty() {
                    // First src mapping column carries the edge offsets.
                    let offset_col_id = src_vertex_mappings[0].column().index();
                    let offset_column = column_data
                        .get_mut(offset_col_id as usize)
                        .expect("Failed to find id column");
                    let mut data = offset_column.take_data();
                    let offsets = {
                        if let Some(id_column) = data.as_any().downcast_ref::<IDColumn>() {
                            id_column.data.clone()
                        } else {
                            panic!("DataType of id col is not VertexId")
                        }
                    };
                    for (k, v) in column_map.iter() {
                        let column_index = v.0;
                        let column_data_type = v.1;
                        graph.init_edge_index_prop(
                            k.clone(),
                            src_label,
                            edge_label,
                            dst_label,
                            column_data_type,
                        );
                        let mut column = column_data
                            .get_mut(column_index as usize)
                            .expect("Failed to find column");
                        graph.set_edge_index_prop(
                            k.clone(),
                            src_label,
                            edge_label,
                            dst_label,
                            None,
                            None,
                            Some(&offsets),
                            Some(column.take_data()),
                        );
                    }
                }
                if !dst_vertex_mappings.is_empty() {
                    let offset_col_id = dst_vertex_mappings[0].column().index();
                    let offset_column = column_data
                        .get_mut(offset_col_id as usize)
                        .expect("Failed to find id column");
                    let mut data = offset_column.take_data();
                    let offsets = {
                        if let Some(id_column) = data.as_any().downcast_ref::<IDColumn>() {
                            id_column.data.clone()
                        } else {
                            panic!("DataType of id col is not VertexId")
                        }
                    };
                    for (k, v) in column_map.iter() {
                        let column_index = v.0;
                        let column_data_type = v.1;
                        graph.init_edge_index_prop(
                            k.clone(),
                            src_label,
                            edge_label,
                            dst_label,
                            column_data_type,
                        );
                        let mut column = column_data
                            .get_mut(column_index as usize)
                            .expect("Failed to find column");
                        // Note the swapped argument pair relative to the src
                        // branch: dst offsets/data occupy the first slot pair.
                        graph.set_edge_index_prop(
                            k.clone(),
                            src_label,
                            edge_label,
                            dst_label,
                            Some(&offsets),
                            Some(column.take_data()),
                            None,
                            None,
                        );
                    }
                }
            }
        }
    }
}

/// Stream every record of a `.csv` or `.csv.gz` file through `process_row`.
/// Files with other extensions are silently ignored.
fn process_csv_rows<F>(path: &PathBuf, mut process_row: F, skip_header: bool, delim: u8)
where
    F: FnMut(&csv::StringRecord),
{
    if let Some(path_str) = path.clone().to_str() {
        if path_str.ends_with(".csv.gz") {
            if let Ok(gz_reader) = GzReader::from_path(&path) {
                let mut rdr = ReaderBuilder::new()
                    .delimiter(delim)
                    .buffer_capacity(4096)
                    .comment(Some(b'#'))
                    .flexible(true)
                    .has_headers(skip_header)
                    .from_reader(gz_reader);
                for result in rdr.records() {
                    if let Ok(record) = result {
                        process_row(&record);
                    }
                }
            }
        } else if path_str.ends_with(".csv") {
            if let Ok(file) = File::open(&path) {
                let reader = BufReader::new(file);
                let mut rdr = ReaderBuilder::new()
                    .delimiter(delim)
                    .buffer_capacity(4096)
                    .comment(Some(b'#'))
                    .flexible(true)
                    .has_headers(skip_header)
                    .from_reader(reader);
                for result in rdr.records() {
                    if let Ok(record) = result {
                        process_row(&record);
                    }
                }
            }
        }
    }
}

/// Expands an LDBC delete batch into the full transitive set of deletions
/// (persons -> their forums/posts/comments -> reply chains) and writes the
/// result under `extra_deletes/`.
pub struct DeleteGenerator<G: FromStr + Send + Sync + IndexType + std::fmt::Display = DefaultId> {
    input_dir: PathBuf,
    delim: u8,
    skip_header: bool,
    // (deletionDate, global id) pairs per label, in discovery order.
    persons: Vec<(String, G)>,
    comments: Vec<(String, G)>,
    posts: Vec<(String, G)>,
    forums: Vec<(String, G)>,
    // Ids that were explicitly listed in the input batch (used for dedup).
    person_set: HashSet<G>,
    comment_set: HashSet<G>,
    post_set: HashSet<G>,
    forum_set: HashSet<G>,
}

impl<G: FromStr + Send + Sync + IndexType + Eq + std::fmt::Display> DeleteGenerator<G> {
    pub fn new(input_dir: &PathBuf) -> DeleteGenerator<G> {
        Self {
            input_dir: input_dir.clone(),
            delim: b'|',
            skip_header: false,
            persons: vec![],
            comments: vec![],
            posts: vec![],
            forums: vec![],
            person_set: HashSet::new(),
            comment_set: HashSet::new(),
            post_set: HashSet::new(),
            forum_set: HashSet::new(),
        }
    }

    /// Load `(deletionDate, global_id)` pairs from all csv/csv.gz files under
    /// `input_prefix`; column 0 is the date, column 1 the vertex id.
    fn load_vertices(&self, input_prefix: PathBuf, label: LabelId) -> Vec<(String, G)> {
        let mut ret = vec![];
        let suffixes = vec!["*.csv.gz".to_string(),
"*.csv".to_string()];
        let files = get_files_list(&input_prefix, &suffixes);
        if files.is_err() {
            warn!(
                "Get vertex files {:?}/{:?} failed: {:?}",
                &input_prefix,
                &suffixes,
                files.err().unwrap()
            );
            return ret;
        }
        let files = files.unwrap();
        if files.is_empty() {
            return ret;
        }
        // Vertex id lives in column 1; column 0 is the deletion date.
        let parser = LDBCVertexParser::<G>::new(label, 1);
        for file in files {
            process_csv_rows(
                &file,
                |record| {
                    let vertex_meta = parser.parse_vertex_meta(&record);
                    ret.push((record.get(0).unwrap().to_string(), vertex_meta.global_id));
                },
                self.skip_header,
                self.delim,
            );
        }
        ret
    }

    pub fn with_delimiter(mut self, delim: u8) -> Self {
        self.delim = delim;
        self
    }

    pub fn skip_header(&mut self) {
        self.skip_header = true;
    }

    /// For each person scheduled for deletion, queue its authored comments and
    /// posts, plus moderated "Album"/"Wall" forums, for cascading deletion.
    fn iterate_persons<I>(&mut self, graph: &GraphDB<G, I>)
    where
        I: Send + Sync + IndexType,
    {
        let person_label = graph
            .graph_schema
            .get_vertex_label_id("PERSON")
            .unwrap();
        let comment_label = graph
            .graph_schema
            .get_vertex_label_id("COMMENT")
            .unwrap();
        let post_label = graph
            .graph_schema
            .get_vertex_label_id("POST")
            .unwrap();
        let forum_label = graph
            .graph_schema
            .get_vertex_label_id("FORUM")
            .unwrap();
        let hasCreator_label = graph
            .graph_schema
            .get_edge_label_id("HASCREATOR")
            .unwrap();
        let hasModerator_label = graph
            .graph_schema
            .get_edge_label_id("HASMODERATOR")
            .unwrap();
        let comment_hasCreator_person =
            graph.get_sub_graph(person_label, hasCreator_label, comment_label, Direction::Incoming);
        let post_hasCreator_person =
            graph.get_sub_graph(person_label, hasCreator_label, post_label, Direction::Incoming);
        let forum_hasModerator_person =
            graph.get_sub_graph(person_label, hasModerator_label, forum_label, Direction::Incoming);
        let forum_title_column = graph.vertex_prop_table[forum_label as usize]
            .get_column_by_name("title")
            .as_any()
            .downcast_ref::<StringColumn>()
            .unwrap();
        for (dt, id) in self.persons.iter() {
            if let Some((got_label, lid)) = graph.vertex_map.get_internal_id(*id) {
                if got_label != person_label {
                    warn!("Vertex {} is not a person", LDBCVertexParser::<G>::get_original_id(*id));
                    continue;
                }
                for e in comment_hasCreator_person
                    .get_adj_list(lid)
                    .unwrap()
                {
                    let oid = graph
                        .vertex_map
                        .get_global_id(comment_label, *e)
                        .unwrap();
                    self.comments.push((dt.clone(), oid));
                }
                for e in post_hasCreator_person
                    .get_adj_list(lid)
                    .unwrap()
                {
                    let oid = graph
                        .vertex_map
                        .get_global_id(post_label, *e)
                        .unwrap();
                    self.posts.push((dt.clone(), oid));
                }
                for e in forum_hasModerator_person
                    .get_adj_list(lid)
                    .unwrap()
                {
                    let title = forum_title_column.get(e.index()).unwrap();
                    let title_string = title.to_string();
                    // Personal forums (Albums and Walls) die with their owner.
                    if title_string.starts_with("Album") || title_string.starts_with("Wall") {
                        let oid = graph
                            .vertex_map
                            .get_global_id(forum_label, *e)
                            .unwrap();
                        self.forums.push((dt.clone(), oid));
                    }
                }
            } else {
                warn!("Vertex Person - {} does not exist", LDBCVertexParser::<G>::get_original_id(*id));
                continue;
            }
        }
    }

    /// Queue all posts contained in forums that are scheduled for deletion.
    fn iterate_forums<I>(&mut self, graph: &GraphDB<G, I>)
    where
        I: Send + Sync + IndexType,
    {
        let forum_label = graph
            .graph_schema
            .get_vertex_label_id("FORUM")
            .unwrap();
        let post_label = graph
            .graph_schema
            .get_vertex_label_id("POST")
            .unwrap();
        let containerOf_label = graph
            .graph_schema
            .get_edge_label_id("CONTAINEROF")
            .unwrap();
        let forum_containerOf_post =
            graph.get_sub_graph(forum_label, containerOf_label, post_label, Direction::Outgoing);
        for (dt, id) in self.forums.iter() {
            if let Some((got_label, lid)) = graph.vertex_map.get_internal_id(*id) {
                if got_label != forum_label {
                    warn!("Vertex {} is not a forum", LDBCVertexParser::<G>::get_original_id(*id));
                    continue;
                }
                for e in forum_containerOf_post
                    .get_adj_list(lid)
                    .unwrap()
                {
                    let oid = graph
                        .vertex_map
                        .get_global_id(post_label, *e)
                        .unwrap();
                    self.posts.push((dt.clone(), oid));
                }
            } else {
                warn!("Vertex Forum - {} does not exist", LDBCVertexParser::<G>::get_original_id(*id));
                continue;
            }
        }
    }

    /// Queue the direct comment replies of every post scheduled for deletion.
    fn iterate_posts<I>(&mut self, graph: &GraphDB<G, I>)
    where
        I: Send + Sync + IndexType,
    {
        let post_label = graph
            .graph_schema
            .get_vertex_label_id("POST")
            .unwrap();
        let comment_label = graph
            .graph_schema
            .get_vertex_label_id("COMMENT")
            .unwrap();
        let replyOf_label = graph
            .graph_schema
            .get_edge_label_id("REPLYOF")
            .unwrap();
        let comment_replyOf_post =
            graph.get_sub_graph(post_label, replyOf_label, comment_label, Direction::Incoming);
        for (dt, id) in self.posts.iter() {
            if let Some((got_label, lid)) = graph.vertex_map.get_internal_id(*id) {
                if got_label != post_label {
                    warn!("Vertex {} is not a post", LDBCVertexParser::<G>::get_original_id(*id));
                    continue;
                }
                for e in comment_replyOf_post.get_adj_list(lid).unwrap() {
                    let oid = graph
                        .vertex_map
                        .get_global_id(comment_label, *e)
                        .unwrap();
                    self.comments.push((dt.clone(), oid));
                }
            } else {
                warn!("Vertex Post - {} does not exist", LDBCVertexParser::<G>::get_original_id(*id));
                continue;
            }
        }
    }

    /// Close the comment set under the reply-of relation.  `self.comments`
    /// grows while it is being walked, so an index-based loop is required.
    fn iterate_comments<I>(&mut self, graph: &GraphDB<G, I>)
    where
        I: Send + Sync + IndexType,
    {
        let comment_label = graph
            .graph_schema
            .get_vertex_label_id("COMMENT")
            .unwrap();
        let replyOf_label = graph
            .graph_schema
            .get_edge_label_id("REPLYOF")
            .unwrap();
        let comment_replyOf_comment =
            graph.get_sub_graph(comment_label, replyOf_label, comment_label, Direction::Incoming);
        let mut index = 0;
        while index < self.comments.len() {
            let (dt, id) = self.comments[index].clone();
            if let Some((got_label, lid)) = graph.vertex_map.get_internal_id(id) {
                if got_label != comment_label {
                    warn!("Vertex {} is not a comment", LDBCVertexParser::<G>::get_original_id(id));
                    index += 1;
                    continue;
                }
                for e in comment_replyOf_comment
                    .get_adj_list(lid)
                    .unwrap()
                {
                    let oid = graph
                        .vertex_map
                        .get_global_id(comment_label, *e)
                        .unwrap();
                    self.comments.push((dt.clone(), oid));
                }
                index += 1;
            } else {
                warn!("Vertex Comment - {} does not exist", LDBCVertexParser::<G>::get_original_id(id));
                index += 1;
                continue;
            }
        }
    }

    /// Load the delete batch `batch_id`, expand it transitively, and write the
    /// extra (derived) deletions to `extra_deletes/dynamic/<Label>/batch_id=…`.
    pub fn generate<I>(&mut self, graph: &GraphDB<G, I>, batch_id: &str)
    where
        I: Send + Sync + IndexType,
    {
        let output_dir = self
            .input_dir
            .join("extra_deletes")
            .join("dynamic");
std::fs::create_dir_all(&output_dir).unwrap();
        let prefix = self.input_dir.join("deletes").join("dynamic");
        // Load the explicit deletes for each of the four dynamic labels and
        // remember them in the corresponding dedup sets.
        let person_label = graph
            .graph_schema
            .get_vertex_label_id("PERSON")
            .unwrap();
        self.persons = self.load_vertices(
            prefix
                .clone()
                .join("Person")
                .join(format!("batch_id={}", batch_id)),
            person_label,
        );
        self.person_set = self.persons.iter().map(|(_, id)| *id).collect();
        let comment_label = graph
            .graph_schema
            .get_vertex_label_id("COMMENT")
            .unwrap();
        self.comments = self.load_vertices(
            prefix
                .clone()
                .join("Comment")
                .join(format!("batch_id={}", batch_id)),
            comment_label,
        );
        self.comment_set = self
            .comments
            .iter()
            .map(|(_, id)| *id)
            .collect();
        let post_label = graph
            .graph_schema
            .get_vertex_label_id("POST")
            .unwrap();
        self.posts = self.load_vertices(
            prefix
                .clone()
                .join("Post")
                .join(format!("batch_id={}", batch_id)),
            post_label,
        );
        self.post_set = self.posts.iter().map(|(_, id)| *id).collect();
        let forum_label = graph
            .graph_schema
            .get_vertex_label_id("FORUM")
            .unwrap();
        self.forums = self.load_vertices(
            prefix
                .clone()
                .join("Forum")
                .join(format!("batch_id={}", batch_id)),
            forum_label,
        );
        self.forum_set = self.forums.iter().map(|(_, id)| *id).collect();
        // Expand the explicit deletes to the full transitive closure.
        self.iterate_persons(graph);
        self.iterate_forums(graph);
        self.iterate_posts(graph);
        self.iterate_comments(graph);
        // Emit only the *derived* deletions (ids not already in the input
        // batch); each id is inserted into the set on first sight so it is
        // written at most once.
        let batch_dir = format!("batch_id={}", batch_id);
        let person_dir_path = output_dir
            .clone()
            .join("Person")
            .join(&batch_dir);
        std::fs::create_dir_all(&person_dir_path).unwrap();
        let mut person_file = File::create(person_dir_path.join("part-0.csv")).unwrap();
        writeln!(person_file, "deletionDate|id").unwrap();
        for (dt, id) in self.persons.iter() {
            if !self.person_set.contains(id) {
                self.person_set.insert(*id);
                writeln!(person_file, "{}|{}", dt, LDBCVertexParser::<G>::get_original_id(*id)).unwrap();
            }
        }
        let forum_dir_path = output_dir
            .clone()
            .join("Forum")
            .join(&batch_dir);
        std::fs::create_dir_all(&forum_dir_path).unwrap();
        let mut forum_file = File::create(forum_dir_path.join("part-0.csv")).unwrap();
        writeln!(forum_file, "deletionDate|id").unwrap();
        for (dt, id) in self.forums.iter() {
            if !self.forum_set.contains(id) {
                self.forum_set.insert(*id);
                writeln!(forum_file, "{}|{}", dt, LDBCVertexParser::<G>::get_original_id(*id)).unwrap();
            }
        }
        let post_dir_path = output_dir.clone().join("Post").join(&batch_dir);
        std::fs::create_dir_all(&post_dir_path).unwrap();
        let mut post_file = File::create(post_dir_path.join("part-0.csv")).unwrap();
        writeln!(post_file, "deletionDate|id").unwrap();
        for (dt, id) in self.posts.iter() {
            if !self.post_set.contains(id) {
                self.post_set.insert(*id);
                writeln!(post_file, "{}|{}", dt, LDBCVertexParser::<G>::get_original_id(*id)).unwrap();
            }
        }
        let comment_dir_path = output_dir
            .clone()
            .join("Comment")
            .join(&batch_dir);
        std::fs::create_dir_all(&comment_dir_path).unwrap();
        let mut comment_file = File::create(comment_dir_path.join("part-0.csv")).unwrap();
        writeln!(comment_file, "deletionDate|id").unwrap();
        for (dt, id) in self.comments.iter() {
            if !self.comment_set.contains(id) {
                self.comment_set.insert(*id);
                writeln!(comment_file, "{}|{}", dt, LDBCVertexParser::<G>::get_original_id(*id)).unwrap();
            }
        }
    }
}

/// Applies insert/delete batches from an input directory to a `GraphDB`.
pub struct GraphModifier {
    input_dir: PathBuf,
    delim: u8,
    skip_header: bool,
    parallel: u32,
}

/// A CSR pair (plus optional property tables) temporarily taken out of the
/// graph so it can be mutated without borrowing the `GraphDB` itself.
struct CsrRep<I> {
    src_label: LabelId,
    edge_label: LabelId,
    dst_label: LabelId,
    ie_csr: Box<dyn CsrTrait<I>>,
    ie_prop: Option<ColTable>,
    oe_csr: Box<dyn CsrTrait<I>>,
    oe_prop: Option<ColTable>,
}

impl GraphModifier {
    pub fn new<D: AsRef<Path>>(input_dir: D) -> GraphModifier {
        Self { input_dir: input_dir.as_ref().to_path_buf(), delim: b'|', skip_header: false, parallel: 0 }
    }

    pub fn with_delimiter(mut self, delim: u8) -> Self {
        self.delim = delim;
        self
    }

    pub fn skip_header(&mut self) {
        self.skip_header = true;
    }

    pub fn parallel(&mut self, parallel: u32) {
        self.parallel = parallel;
    }

    /// Take one (src, dst, edge) CSR pair out of the graph, leaving empty
    /// placeholders behind; restore it later with `set_csr`.
    fn take_csr<G, I>(
        &self,
        graph: &mut GraphDB<G, I>,
        src_label_i: LabelId,
        dst_label_i: LabelId,
        e_label_i:
LabelId,
    ) -> CsrRep<I>
    where
        I: Send + Sync + IndexType,
        G: FromStr + Send + Sync + IndexType + Eq,
    {
        let index = graph.edge_label_to_index(src_label_i, dst_label_i, e_label_i, Direction::Outgoing);
        CsrRep {
            src_label: src_label_i,
            edge_label: e_label_i,
            dst_label: dst_label_i,
            ie_csr: std::mem::replace(&mut graph.ie[index], Box::new(BatchMutableSingleCsr::new())),
            ie_prop: graph.ie_edge_prop_table.remove(&index),
            oe_csr: std::mem::replace(&mut graph.oe[index], Box::new(BatchMutableSingleCsr::new())),
            oe_prop: graph.oe_edge_prop_table.remove(&index),
        }
    }

    /// Take every CSR whose edge type has `label` as either endpoint.
    fn take_csrs_with_label<G, I>(&self, graph: &mut GraphDB<G, I>, label: LabelId) -> Vec<CsrRep<I>>
    where
        I: Send + Sync + IndexType,
        G: FromStr + Send + Sync + IndexType + Eq,
    {
        let vertex_label_num = graph.vertex_label_num;
        let edge_label_num = graph.edge_label_num;
        let mut results = vec![];
        for e_label_i in 0..edge_label_num {
            for label_i in 0..vertex_label_num {
                // Edge types with `label` as the source endpoint.
                if graph
                    .graph_schema
                    .get_edge_header(label as LabelId, e_label_i as LabelId, label_i as LabelId)
                    .is_some()
                {
                    let index = graph.edge_label_to_index(
                        label as LabelId,
                        label_i as LabelId,
                        e_label_i as LabelId,
                        Direction::Outgoing,
                    );
                    results.push(CsrRep {
                        src_label: label as LabelId,
                        edge_label: e_label_i as LabelId,
                        dst_label: label_i as LabelId,
                        ie_csr: std::mem::replace(
                            &mut graph.ie[index],
                            Box::new(BatchMutableSingleCsr::new()),
                        ),
                        ie_prop: graph.ie_edge_prop_table.remove(&index),
                        oe_csr: std::mem::replace(
                            &mut graph.oe[index],
                            Box::new(BatchMutableSingleCsr::new()),
                        ),
                        oe_prop: graph.oe_edge_prop_table.remove(&index),
                    });
                }
                // Edge types with `label` as the destination; the self-loop
                // case (label_i == label) was already taken above.
                if graph
                    .graph_schema
                    .get_edge_header(label_i as LabelId, e_label_i as LabelId, label as LabelId)
                    .is_some()
                {
                    if label_i as LabelId != label {
                        let index = graph.edge_label_to_index(
                            label_i as LabelId,
                            label as LabelId,
                            e_label_i as LabelId,
                            Direction::Outgoing,
                        );
                        results.push(CsrRep {
                            src_label: label_i as LabelId,
                            edge_label: e_label_i as LabelId,
                            dst_label: label as LabelId,
                            ie_csr: std::mem::replace(
                                &mut graph.ie[index],
                                Box::new(BatchMutableSingleCsr::new()),
                            ),
                            ie_prop: graph.ie_edge_prop_table.remove(&index),
                            oe_csr: std::mem::replace(
                                &mut graph.oe[index],
                                Box::new(BatchMutableSingleCsr::new()),
                            ),
                            oe_prop: graph.oe_edge_prop_table.remove(&index),
                        });
                    }
                }
            }
        }
        results
    }

    /// Take every CSR defined by the schema (all label triplets).
    fn take_csrs<G, I>(&self, graph: &mut GraphDB<G, I>) -> Vec<CsrRep<I>>
    where
        I: Send + Sync + IndexType,
        G: FromStr + Send + Sync + IndexType + Eq,
    {
        let vertex_label_num = graph.vertex_label_num;
        let edge_label_num = graph.edge_label_num;
        let mut results = vec![];
        for e_label_i in 0..edge_label_num {
            for src_label_i in 0..vertex_label_num {
                for dst_label_i in 0..vertex_label_num {
                    if graph
                        .graph_schema
                        .get_edge_header(
                            src_label_i as LabelId,
                            e_label_i as LabelId,
                            dst_label_i as LabelId,
                        )
                        .is_none()
                    {
                        continue;
                    }
                    let index = graph.edge_label_to_index(
                        src_label_i as LabelId,
                        dst_label_i as LabelId,
                        e_label_i as LabelId,
                        Direction::Outgoing,
                    );
                    results.push(CsrRep {
                        src_label: src_label_i as LabelId,
                        edge_label: e_label_i as LabelId,
                        dst_label: dst_label_i as LabelId,
                        ie_csr: std::mem::replace(
                            &mut graph.ie[index],
                            Box::new(BatchMutableSingleCsr::new()),
                        ),
                        ie_prop: graph.ie_edge_prop_table.remove(&index),
                        oe_csr: std::mem::replace(
                            &mut graph.oe[index],
                            Box::new(BatchMutableSingleCsr::new()),
                        ),
                        oe_prop: graph.oe_edge_prop_table.remove(&index),
                    });
                }
            }
        }
        results
    }

    /// Put a single taken CSR (and its property tables) back into the graph.
    fn set_csr<G, I>(&self, graph: &mut GraphDB<G, I>, reps: CsrRep<I>)
    where
        I: Send + Sync + IndexType,
        G: FromStr + Send + Sync + IndexType + Eq,
    {
        let index =
            graph.edge_label_to_index(reps.src_label, reps.dst_label, reps.edge_label, Direction::Outgoing);
        graph.ie[index] = reps.ie_csr;
        if let Some(table) = reps.ie_prop {
            graph.ie_edge_prop_table.insert(index, table);
        }
        graph.oe[index] = reps.oe_csr;
        if let Some(table) = reps.oe_prop {
            graph.oe_edge_prop_table.insert(index, table);
        }
    }

    /// Put a batch of taken CSRs back into the graph.
    fn set_csrs<G, I>(&self, graph: &mut GraphDB<G, I>, mut reps: Vec<CsrRep<I>>)
    where
        I: Send + Sync + IndexType,
        G: FromStr + Send + Sync + IndexType + Eq,
    {
        for result in reps.drain(..) {
            let index = graph.edge_label_to_index(
                result.src_label,
                result.dst_label,
                result.edge_label,
                Direction::Outgoing,
            );
            graph.ie[index] = result.ie_csr;
            if let Some(table) = result.ie_prop {
                graph.ie_edge_prop_table.insert(index, table);
            }
            graph.oe[index] = result.oe_csr;
            if let Some(table) = result.oe_prop {
                graph.oe_edge_prop_table.insert(index, table);
            }
        }
    }

    /// Delete from one CSR pair: explicit edge deletes read from
    /// `edge_file_strings` plus all edges incident to vertices in
    /// `delete_sets` (indexed by label).
    fn parallel_delete_rep<G, I>(
        &self,
        input: &mut CsrRep<I>,
        graph: &GraphDB<G, I>,
        edge_file_strings: &Vec<String>,
        input_header: &[(String, DataType)],
        delete_sets: &Vec<HashSet<I>>,
        p: u32,
    ) where
        G: FromStr + Send + Sync + IndexType + Eq,
        I: Send + Sync + IndexType,
    {
        let src_label = input.src_label;
        let edge_label = input.edge_label;
        let dst_label = input.dst_label;
        let graph_header = graph
            .graph_schema
            .get_edge_header(src_label, edge_label, dst_label);
        if graph_header.is_none() {
            return;
        }
        let src_delete_set = &delete_sets[src_label as usize];
        let dst_delete_set = &delete_sets[dst_label as usize];
        let mut delete_edge_set = Vec::new();
        // Locate the endpoint columns in the input header (defaults 0/1).
        let mut src_col_id = 0;
        let mut dst_col_id = 1;
        for (index, (n, _)) in input_header.iter().enumerate() {
            if n == "start_id" {
                src_col_id = index;
            }
            if n == "end_id" {
                dst_col_id = index;
            }
        }
        let mut parser = LDBCEdgeParser::<G>::new(src_label, dst_label, edge_label);
        parser.with_endpoint_col_id(src_col_id, dst_col_id);
        let edge_files = get_files_list(&self.input_dir.clone(), edge_file_strings);
        if edge_files.is_err() {
            return;
        }
        let edge_files = edge_files.unwrap();
        for edge_file in edge_files.iter() {
            process_csv_rows(
                edge_file,
                |record| {
                    let edge_meta = parser.parse_edge_meta(&record);
                    if let Some((got_src_label, src_lid)) = graph
                        .vertex_map
                        .get_internal_id(edge_meta.src_global_id)
                    {
                        if let Some((got_dst_label, dst_lid)) = graph
                            .vertex_map
                            .get_internal_id(edge_meta.dst_global_id)
                        {
                            if got_src_label != src_label || got_dst_label != dst_label {
                                return;
                            }
                            // Edges whose endpoint is being deleted anyway are
                            // handled by the vertex-deletion sweep below.
                            if src_delete_set.contains(&src_lid) ||
dst_delete_set.contains(&dst_lid) {
                                return;
                            }
                            delete_edge_set.push((src_lid, dst_lid));
                        }
                    }
                },
                self.skip_header,
                self.delim,
            );
        }
        if src_delete_set.is_empty() && dst_delete_set.is_empty() && delete_edge_set.is_empty() {
            return;
        }
        // Collect edges incident to deleted endpoints so the opposite-direction
        // CSR can be patched up symmetrically.
        let mut oe_to_delete = Vec::new();
        let mut ie_to_delete = Vec::new();
        for v in src_delete_set.iter() {
            if let Some(oe_list) = input.oe_csr.get_edges(*v) {
                for e in oe_list {
                    if !dst_delete_set.contains(e) {
                        oe_to_delete.push((*v, *e));
                    }
                }
            }
        }
        for v in dst_delete_set.iter() {
            if let Some(ie_list) = input.ie_csr.get_edges(*v) {
                for e in ie_list {
                    if !src_delete_set.contains(e) {
                        ie_to_delete.push((*e, *v));
                    }
                }
            }
        }
        input.oe_csr.delete_vertices(src_delete_set);
        if let Some(table) = input.oe_prop.as_mut() {
            input
                .oe_csr
                .parallel_delete_edges_with_props(&delete_edge_set, false, table, p);
            input
                .oe_csr
                .parallel_delete_edges_with_props(&ie_to_delete, false, table, p);
        } else {
            input
                .oe_csr
                .parallel_delete_edges(&delete_edge_set, false, p);
            input
                .oe_csr
                .parallel_delete_edges(&ie_to_delete, false, p);
        }
        input.ie_csr.delete_vertices(dst_delete_set);
        if let Some(table) = input.ie_prop.as_mut() {
            input
                .ie_csr
                .parallel_delete_edges_with_props(&delete_edge_set, true, table, p);
            input
                .ie_csr
                .parallel_delete_edges_with_props(&oe_to_delete, true, table, p);
        } else {
            input
                .ie_csr
                .parallel_delete_edges(&delete_edge_set, true, p);
            input
                .ie_csr
                .parallel_delete_edges(&oe_to_delete, true, p);
        }
    }

    /// Delete the vertices listed (column `id_col`) in `filenames`, together
    /// with all of their incident edges.
    pub fn apply_vertices_delete_with_filename<G, I>(
        &mut self,
        graph: &mut GraphDB<G, I>,
        label: LabelId,
        filenames: &Vec<String>,
        id_col: i32,
    ) -> GDBResult<()>
    where
        G: FromStr + Send + Sync + IndexType + Eq,
        I: Send + Sync + IndexType,
    {
        let mut delete_sets = vec![HashSet::new(); graph.vertex_label_num as usize];
        let mut delete_set = HashSet::new();
        info!("Deleting vertex - {}", graph.graph_schema.vertex_label_names()[label as usize]);
        let vertex_files_prefix = self.input_dir.clone();
        let vertex_files = get_files_list(&vertex_files_prefix, filenames).unwrap();
        if vertex_files.is_empty() {
            return Ok(());
        }
        let parser = LDBCVertexParser::<G>::new(label as LabelId, id_col as usize);
        for vertex_file in vertex_files.iter() {
            process_csv_rows(
                vertex_file,
                |record| {
                    let vertex_meta = parser.parse_vertex_meta(&record);
                    let (got_label, lid) = graph
                        .vertex_map
                        .get_internal_id(vertex_meta.global_id)
                        .unwrap();
                    if got_label == label as LabelId {
                        delete_set.insert(lid);
                    }
                },
                self.skip_header,
                self.delim,
            );
        }
        delete_sets[label as usize] = delete_set;
        // Sweep every CSR that can touch this label; no explicit edge files.
        let mut input_reps = self.take_csrs_with_label(graph, label);
        input_reps.iter_mut().for_each(|rep| {
            let edge_file_strings = vec![];
            let input_header = graph
                .graph_schema
                .get_edge_header(rep.src_label, rep.edge_label, rep.dst_label)
                .unwrap();
            self.parallel_delete_rep(
                rep,
                graph,
                &edge_file_strings,
                &input_header,
                &delete_sets,
                self.parallel,
            );
        });
        self.set_csrs(graph, input_reps);
        let delete_set = &delete_sets[label as usize];
        for v in delete_set.iter() {
            graph.vertex_map.remove_vertex(label, v);
        }
        Ok(())
    }

    /// Delete the edges listed in `filenames`; the endpoint ids sit in the
    /// given columns of each record.
    pub fn apply_edges_delete_with_filename<G, I>(
        &mut self,
        graph: &mut GraphDB<G, I>,
        src_label: LabelId,
        edge_label: LabelId,
        dst_label: LabelId,
        filenames: &Vec<String>,
        src_id_col: i32,
        dst_id_col: i32,
    ) -> GDBResult<()>
    where
        G: FromStr + Send + Sync + IndexType + Eq,
        I: Send + Sync + IndexType,
    {
        let mut input_resp = self.take_csr(graph, src_label, dst_label, edge_label);
        // Synthesize a header that only declares the two endpoint columns.
        let mut input_header: Vec<(String, DataType)> = vec![];
        input_header.resize(
            std::cmp::max(src_id_col as usize, dst_id_col as usize) + 1,
            ("".to_string(), DataType::NULL),
        );
        input_header[src_id_col as usize] = ("start_id".to_string(), DataType::ID);
        input_header[dst_id_col as usize] = ("end_id".to_string(), DataType::ID);
        // No vertex deletions accompany an explicit edge-delete batch.
        let delete_sets = vec![HashSet::new(); graph.vertex_label_num as usize];
        self.parallel_delete_rep(
            &mut input_resp,
            graph,
            filenames,
            &input_header,
            &delete_sets,
            self.parallel,
        );
        self.set_csr(graph, input_resp);
        Ok(())
    }

    /// Apply all vertex and edge deletions described by `delete_schema`.
    fn apply_deletes<G, I>(
        &mut self,
        graph:
&mut GraphDB<G, I>,
        delete_schema: &InputSchema,
    ) -> GDBResult<()>
    where
        G: FromStr + Send + Sync + IndexType + Eq,
        I: Send + Sync + IndexType,
    {
        let vertex_label_num = graph.vertex_label_num;
        // Phase 1: parse each label's delete files into a set of internal ids.
        let mut delete_sets = vec![];
        for v_label_i in 0..vertex_label_num {
            let mut delete_set = HashSet::new();
            if let Some(vertex_file_strings) = delete_schema.get_vertex_file(v_label_i as LabelId) {
                if !vertex_file_strings.is_empty() {
                    info!(
                        "Deleting vertex - {}",
                        graph.graph_schema.vertex_label_names()[v_label_i as usize]
                    );
                    let vertex_files_prefix = self.input_dir.clone();
                    let vertex_files = get_files_list_beta(&vertex_files_prefix, &vertex_file_strings);
                    if vertex_files.is_empty() {
                        delete_sets.push(delete_set);
                        continue;
                    }
                    let input_header = delete_schema
                        .get_vertex_header(v_label_i as LabelId)
                        .unwrap();
                    let mut id_col = 0;
                    for (index, (n, _)) in input_header.iter().enumerate() {
                        if n == "id" {
                            id_col = index;
                            break;
                        }
                    }
                    let parser = LDBCVertexParser::<G>::new(v_label_i as LabelId, id_col);
                    for vertex_file in vertex_files.iter() {
                        process_csv_rows(
                            vertex_file,
                            |record| {
                                let vertex_meta = parser.parse_vertex_meta(&record);
                                let (got_label, lid) = graph
                                    .vertex_map
                                    .get_internal_id(vertex_meta.global_id)
                                    .unwrap();
                                if got_label == v_label_i as LabelId {
                                    delete_set.insert(lid);
                                }
                            },
                            self.skip_header,
                            self.delim,
                        );
                    }
                }
            }
            delete_sets.push(delete_set);
        }
        // Phase 2: purge edges — explicit edge-delete files plus all edges
        // incident to any deleted vertex.
        let mut input_reps = self.take_csrs(graph);
        input_reps.iter_mut().for_each(|rep| {
            let default_vec: Vec<String> = vec![];
            let edge_file_strings = delete_schema
                .get_edge_file(rep.src_label, rep.edge_label, rep.dst_label)
                .unwrap_or_else(|| &default_vec);
            let input_header = delete_schema
                .get_edge_header(rep.src_label, rep.edge_label, rep.dst_label)
                .unwrap_or_else(|| &[]);
            self.parallel_delete_rep(
                rep,
                graph,
                &edge_file_strings,
                &input_header,
                &delete_sets,
                self.parallel,
            );
        });
        self.set_csrs(graph, input_reps);
        // Phase 3: drop the vertices themselves.
        for v_label_i in 0..vertex_label_num {
            let delete_set = &delete_sets[v_label_i as usize];
            if delete_set.is_empty() {
                continue;
            }
            for v in delete_set.iter() {
                graph
                    .vertex_map
                    .remove_vertex(v_label_i as LabelId, v);
            }
        }
        Ok(())
    }

    /// Insert the vertices listed in `filenames`; `mappings[i]` gives the
    /// graph-schema property index fed by input column `i` (-1 = ignored).
    pub fn apply_vertices_insert_with_filename<G, I>(
        &mut self,
        graph: &mut GraphDB<G, I>,
        label: LabelId,
        filenames: &Vec<String>,
        id_col: i32,
        mappings: &Vec<i32>,
    ) -> GDBResult<()>
    where
        I: Send + Sync + IndexType,
        G: FromStr + Send + Sync + IndexType + Eq,
    {
        let graph_header = graph
            .graph_schema
            .get_vertex_header(label as LabelId)
            .unwrap();
        let header = graph_header.to_vec();
        let parser = LDBCVertexParser::<G>::new(label as LabelId, id_col as usize);
        let vertex_files_prefix = self.input_dir.clone();
        let vertex_files = get_files_list(&vertex_files_prefix, filenames);
        if vertex_files.is_err() {
            warn!(
                "Get vertex files {:?}/{:?} failed: {:?}",
                &vertex_files_prefix,
                filenames,
                vertex_files.err().unwrap()
            );
            return Ok(());
        }
        let vertex_files = vertex_files.unwrap();
        if vertex_files.is_empty() {
            return Ok(());
        }
        for vertex_file in vertex_files.iter() {
            process_csv_rows(
                vertex_file,
                |record| {
                    let vertex_meta = parser.parse_vertex_meta(&record);
                    // Rows whose properties fail to parse are skipped silently.
                    if let Ok(properties) = parse_properties_by_mappings(&record, &header, mappings) {
                        graph.insert_vertex(vertex_meta.label, vertex_meta.global_id, Some(properties));
                    }
                },
                self.skip_header,
                self.delim,
            );
        }
        Ok(())
    }

    /// Insert vertices for every label that `input_schema` provides files for.
    fn apply_vertices_inserts<G, I>(
        &mut self,
        graph: &mut GraphDB<G, I>,
        input_schema: &InputSchema,
    ) -> GDBResult<()>
    where
        I: Send + Sync + IndexType,
        G: FromStr + Send + Sync + IndexType + Eq,
    {
        let v_label_num = graph.vertex_label_num;
        for v_label_i in 0..v_label_num {
            if let Some(vertex_file_strings) = input_schema.get_vertex_file(v_label_i as LabelId) {
                if vertex_file_strings.is_empty() {
                    continue;
                }
                let input_header = input_schema
                    .get_vertex_header(v_label_i as LabelId)
                    .unwrap();
                let graph_header = graph
                    .graph_schema
                    .get_vertex_header(v_label_i as LabelId)
                    .unwrap();
                // Select only the input columns that the graph schema keeps.
                let mut keep_set = HashSet::new();
                for pair in graph_header {
                    keep_set.insert(pair.0.clone());
                }
                let mut selected = vec![false; input_header.len()];
                let mut id_col_id = 0;
                for (index, (n, _)) in input_header.iter().enumerate() {
                    if keep_set.contains(n) {
                        selected[index] = true;
                    }
                    if n == "id" {
                        id_col_id = index;
                    }
                }
                let parser = LDBCVertexParser::<G>::new(v_label_i as LabelId, id_col_id);
                let vertex_files_prefix = self.input_dir.clone();
                let vertex_files = get_files_list(&vertex_files_prefix, &vertex_file_strings);
                if vertex_files.is_err() {
                    warn!(
                        "Get vertex files {:?}/{:?} failed: {:?}",
                        &vertex_files_prefix,
                        &vertex_file_strings,
                        vertex_files.err().unwrap()
                    );
                    continue;
                }
                let vertex_files = vertex_files.unwrap();
                if vertex_files.is_empty() {
                    continue;
                }
                for vertex_file in vertex_files.iter() {
                    process_csv_rows(
                        vertex_file,
                        |record| {
                            let vertex_meta = parser.parse_vertex_meta(&record);
                            if let Ok(properties) =
                                parse_properties(&record, input_header, selected.as_slice())
                            {
                                graph.insert_vertex(
                                    vertex_meta.label,
                                    vertex_meta.global_id,
                                    Some(properties),
                                );
                            }
                        },
                        self.skip_header,
                        self.delim,
                    );
                }
            }
        }
        Ok(())
    }

    /// Read edge files and return the endpoint pairs plus, when the schema
    /// declares edge properties, a populated property table.
    fn load_insert_edges<G>(
        &self,
        src_label: LabelId,
        edge_label: LabelId,
        dst_label: LabelId,
        input_header: &[(String, DataType)],
        graph_schema: &CsrGraphSchema,
        files: &Vec<PathBuf>,
    ) -> GDBResult<(Vec<(G, G)>, Option<ColTable>)>
    where
        G: FromStr + Send + Sync + IndexType + Eq,
    {
        let mut edges = vec![];
        let graph_header = graph_schema
            .get_edge_header(src_label, edge_label, dst_label)
            .unwrap();
        let mut table_header = vec![];
        let mut keep_set = HashSet::new();
        for pair in graph_header {
            table_header.push((pair.1.clone(), pair.0.clone()));
            keep_set.insert(pair.0.clone());
        }
        let mut selected = vec![false; input_header.len()];
        let mut src_col_id = 0;
        let mut dst_col_id = 1;
        for (index, (n, _)) in input_header.iter().enumerate() {
            if keep_set.contains(n) {
                selected[index] = true;
            }
            if n == "start_id" {
                src_col_id = index;
            }
            if n == "end_id" {
                dst_col_id = index;
            }
        }
        let mut parser = LDBCEdgeParser::<G>::new(src_label, dst_label, edge_label);
parser.with_endpoint_col_id(src_col_id, dst_col_id); if table_header.is_empty() { for file in files.iter() { process_csv_rows( file, |record| { let edge_meta = parser.parse_edge_meta(&record); edges.push((edge_meta.src_global_id, edge_meta.dst_global_id)); }, self.skip_header, self.delim, ); } Ok((edges, None)) } else { let mut prop_table = ColTable::new(table_header); for file in files.iter() { process_csv_rows( file, |record| { let edge_meta = parser.parse_edge_meta(&record); let properties = parse_properties(&record, input_header, selected.as_slice()).unwrap(); edges.push((edge_meta.src_global_id, edge_meta.dst_global_id)); prop_table.push(&properties); }, self.skip_header, self.delim, ) } Ok((edges, Some(prop_table))) } } fn parallel_insert_rep<G, I>( &self, input: &mut CsrRep<I>, graph: &GraphDB<G, I>, edge_file_strings: &Vec<String>, input_header: &[(String, DataType)], p: u32, ) where G: FromStr + Send + Sync + IndexType + Eq, I: Send + Sync + IndexType, { let t = Instant::now(); let src_label = input.src_label; let edge_label = input.edge_label; let dst_label = input.dst_label; let graph_header = graph .graph_schema .get_edge_header(src_label, edge_label, dst_label); if graph_header.is_none() { return; } if edge_file_strings.is_empty() { return; } let edge_files = get_files_list(&self.input_dir.clone(), edge_file_strings); if edge_files.is_err() { return; } let edge_files = edge_files.unwrap(); if edge_files.is_empty() { return; } let (edges, table) = self .load_insert_edges::<G>( src_label, edge_label, dst_label, input_header, &graph.graph_schema, &edge_files, ) .unwrap(); let parsed_edges: Vec<(I, I)> = edges .par_iter() .map(|(src, dst)| { let (got_src_label, src_lid) = graph.vertex_map.get_internal_id(*src).unwrap(); let (got_dst_label, dst_lid) = graph.vertex_map.get_internal_id(*dst).unwrap(); if got_src_label != src_label || got_dst_label != dst_label { warn!("insert edges with wrong label"); (<I as IndexType>::max(), <I as IndexType>::max()) } else 
{ (src_lid, dst_lid) } }) .collect(); let new_src_num = graph.vertex_map.vertex_num(src_label); input.oe_prop = if let Some(old_table) = input.oe_prop.take() { Some(input.oe_csr.insert_edges_with_prop( new_src_num, &parsed_edges, table.as_ref().unwrap(), false, p, old_table, )) } else { input .oe_csr .insert_edges(new_src_num, &parsed_edges, false, p); None }; let new_dst_num = graph.vertex_map.vertex_num(dst_label); input.ie_prop = if let Some(old_table) = input.ie_prop.take() { Some(input.ie_csr.insert_edges_with_prop( new_dst_num, &parsed_edges, table.as_ref().unwrap(), true, p, old_table, )) } else { input .ie_csr .insert_edges(new_dst_num, &parsed_edges, true, p); None }; println!( "insert edge (parallel{}): {} - {} - {}: {}", p, graph.graph_schema.vertex_label_names()[src_label as usize], graph.graph_schema.edge_label_names()[edge_label as usize], graph.graph_schema.vertex_label_names()[dst_label as usize], t.elapsed().as_secs_f32(), ); } pub fn apply_edges_insert_with_filename<G, I>( &mut self, graph: &mut GraphDB<G, I>, src_label: LabelId, edge_label: LabelId, dst_label: LabelId, filenames: &Vec<String>, src_id_col: i32, dst_id_col: i32, mappings: &Vec<i32>, ) -> GDBResult<()> where I: Send + Sync + IndexType, G: FromStr + Send + Sync + IndexType + Eq, { let mut parser = LDBCEdgeParser::<G>::new(src_label, dst_label, edge_label); parser.with_endpoint_col_id(src_id_col as usize, dst_id_col as usize); let edge_files_prefix = self.input_dir.clone(); let edge_files = get_files_list(&edge_files_prefix, filenames); if edge_files.is_err() { warn!( "Get vertex files {:?}/{:?} failed: {:?}", &edge_files_prefix, filenames, edge_files.err().unwrap() ); return Ok(()); } let edge_files = edge_files.unwrap(); let mut input_reps = self.take_csr(graph, src_label, dst_label, edge_label); let mut edges = vec![]; let graph_header = graph .graph_schema .get_edge_header(src_label, edge_label, dst_label) .unwrap(); let mut table_header = vec![]; for pair in graph_header { 
table_header.push((pair.1.clone(), pair.0.clone())); } let mut prop_table = ColTable::new(table_header.clone()); if table_header.is_empty() { for file in edge_files { process_csv_rows( &file, |record| { let edge_meta = parser.parse_edge_meta(&record); edges.push((edge_meta.src_global_id, edge_meta.dst_global_id)); }, self.skip_header, self.delim, ); } } else { for file in edge_files { process_csv_rows( &file, |record| { let edge_meta = parser.parse_edge_meta(&record); edges.push((edge_meta.src_global_id, edge_meta.dst_global_id)); if let Ok(properties) = parse_properties_by_mappings(&record, &graph_header, mappings) { prop_table.push(&properties); } }, self.skip_header, self.delim, ) } } let parsed_edges: Vec<(I, I)> = edges .par_iter() .map(|(src, dst)| { let (got_src_label, src_lid) = graph.vertex_map.get_internal_id(*src).unwrap(); let (got_dst_label, dst_lid) = graph.vertex_map.get_internal_id(*dst).unwrap(); if got_src_label != src_label || got_dst_label != dst_label { warn!("insert edges with wrong label"); (<I as IndexType>::max(), <I as IndexType>::max()) } else { (src_lid, dst_lid) } }) .collect(); let new_src_num = graph.vertex_map.vertex_num(src_label); input_reps.oe_prop = if let Some(old_table) = input_reps.oe_prop.take() { Some(input_reps.oe_csr.insert_edges_with_prop( new_src_num, &parsed_edges, &prop_table, false, self.parallel, old_table, )) } else { input_reps .oe_csr .insert_edges(new_src_num, &parsed_edges, false, self.parallel); None }; let new_dst_num = graph.vertex_map.vertex_num(dst_label); input_reps.ie_prop = if let Some(old_table) = input_reps.ie_prop.take() { Some(input_reps.ie_csr.insert_edges_with_prop( new_dst_num, &parsed_edges, &prop_table, true, self.parallel, old_table, )) } else { input_reps .ie_csr .insert_edges(new_dst_num, &parsed_edges, true, self.parallel); None }; self.set_csr(graph, input_reps); Ok(()) } fn apply_edges_inserts<G, I>( &mut self, graph: &mut GraphDB<G, I>, input_schema: &InputSchema, ) -> GDBResult<()> where 
I: Send + Sync + IndexType, G: FromStr + Send + Sync + IndexType + Eq, { let mut input_reps = self.take_csrs(graph); for ir in input_reps.iter_mut() { let edge_files = input_schema.get_edge_file(ir.src_label, ir.edge_label, ir.dst_label); if edge_files.is_none() { continue; } let input_header = input_schema .get_edge_header(ir.src_label, ir.edge_label, ir.dst_label) .unwrap(); self.parallel_insert_rep(ir, graph, edge_files.unwrap(), input_header, self.parallel); } self.set_csrs(graph, input_reps); Ok(()) } pub fn insert<G, I>(&mut self, graph: &mut GraphDB<G, I>, insert_schema: &InputSchema) -> GDBResult<()> where I: Send + Sync + IndexType, G: FromStr + Send + Sync + IndexType + Eq, { self.apply_vertices_inserts(graph, &insert_schema)?; self.apply_edges_inserts(graph, &insert_schema)?; Ok(()) } pub fn delete<G, I>(&mut self, graph: &mut GraphDB<G, I>, delete_schema: &InputSchema) -> GDBResult<()> where I: Send + Sync + IndexType, G: FromStr + Send + Sync + IndexType + Eq, { self.apply_deletes(graph, &delete_schema)?; Ok(()) } }