interactive_engine/executor/store/bmcsr/src/columns.rs (1,538 lines of code) (raw):
//
//! Copyright 2020 Alibaba Group Holding Limited.
//!
//! Licensed under the Apache License, Version 2.0 (the "License");
//! you may not use this file except in compliance with the License.
//! You may obtain a copy of the License at
//!
//! http://www.apache.org/licenses/LICENSE-2.0
//!
//! Unless required by applicable law or agreed to in writing, software
//! distributed under the License is distributed on an "AS IS" BASIS,
//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//! See the License for the specific language governing permissions and
//! limitations under the License.
use std::any::Any;
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::fs::File;
use std::io::{BufReader, BufWriter, Read, Write};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use dyn_type::object::RawType;
use dyn_type::CastError;
#[cfg(feature = "hugepage_table")]
use huge_container::HugeVec;
use pegasus_common::codec::{Decode, Encode};
use pegasus_common::io::{ReadExt, WriteExt};
use serde::{Deserialize, Serialize};
use crate::date::Date;
use crate::date_time::DateTime;
use crate::types::DefaultId;
#[cfg(feature = "hugepage_table")]
pub type ColumnContainer<T> = HugeVec<T>;
#[cfg(not(feature = "hugepage_table"))]
pub type ColumnContainer<T> = Vec<T>;
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
pub enum DataType {
Int32 = 1,
UInt32 = 2,
Int64 = 3,
UInt64 = 4,
Double = 5,
String = 6,
Date = 7,
DateTime = 8,
LCString = 9,
ID = 10,
NULL = 0,
}
impl Encode for DataType {
fn write_to<W: WriteExt>(&self, writer: &mut W) -> std::io::Result<()> {
match *self {
DataType::NULL => writer.write_u8(0),
DataType::Int32 => writer.write_u8(1),
DataType::UInt32 => writer.write_u8(2),
DataType::Int64 => writer.write_u8(3),
DataType::UInt64 => writer.write_u8(4),
DataType::Double => writer.write_u8(5),
DataType::String => writer.write_u8(6),
DataType::Date => writer.write_u8(7),
DataType::DateTime => writer.write_u8(8),
DataType::LCString => writer.write_u8(9),
DataType::ID => writer.write_u8(10),
};
Ok(())
}
}
impl Decode for DataType {
fn read_from<R: ReadExt>(reader: &mut R) -> std::io::Result<Self> {
let data_type = match reader.read_u8()? {
0 => DataType::NULL,
1 => DataType::Int32,
2 => DataType::UInt32,
3 => DataType::Int64,
4 => DataType::UInt64,
5 => DataType::Double,
6 => DataType::String,
7 => DataType::Date,
8 => DataType::DateTime,
9 => DataType::LCString,
10 => DataType::ID,
_ => panic!("Unknown data type"),
};
Ok(data_type)
}
}
impl DataType {
pub fn from_i32(n: i32) -> Option<Self> {
match n {
0 => Some(Self::NULL),
1 => Some(Self::Int32),
2 => Some(Self::UInt32),
3 => Some(Self::Int64),
4 => Some(Self::UInt64),
5 => Some(Self::Double),
6 => Some(Self::String),
7 => Some(Self::Date),
8 => Some(Self::DateTime),
9 => Some(Self::LCString),
10 => Some(Self::ID),
_ => None,
}
}
pub fn to_i32(&self) -> i32 {
match self {
Self::NULL => 0,
Self::Int32 => 1,
Self::UInt32 => 2,
Self::Int64 => 3,
Self::UInt64 => 4,
Self::Double => 5,
Self::String => 6,
Self::Date => 7,
Self::DateTime => 8,
Self::LCString => 9,
Self::ID => 10,
}
}
}
impl<'a> From<&'a str> for DataType {
fn from(_token: &'a str) -> Self {
info!("token = {}", _token);
let token_str = _token.to_uppercase();
let token = token_str.as_str();
if token == "INT32" {
DataType::Int32
} else if token == "UINT32" {
DataType::UInt32
} else if token == "INT64" {
DataType::Int64
} else if token == "UINT64" {
DataType::UInt64
} else if token == "DOUBLE" {
DataType::Double
} else if token == "STRING" {
DataType::String
} else if token == "DATE" {
DataType::Date
} else if token == "DATETIME" {
DataType::DateTime
} else if token == "ID" {
DataType::ID
} else if token == "LCString" {
DataType::LCString
} else {
error!("Unsupported type {:?}", token);
DataType::NULL
}
}
}
#[derive(Clone)]
pub enum Item {
Boolean(bool),
Int32(i32),
UInt32(u32),
Int64(i64),
UInt64(u64),
Float(f32),
Double(f64),
String(String),
Date(Date),
DateTime(DateTime),
VertexId(usize),
EdgeId((u64, u64)),
Null,
}
#[derive(Clone)]
pub enum RefItem<'a> {
Boolean(&'a bool),
Int32(&'a i32),
UInt32(&'a u32),
Int64(&'a i64),
UInt64(&'a u64),
Float(&'a f32),
Double(&'a f64),
Date(&'a Date),
DateTime(&'a DateTime),
VertexId(&'a usize),
EdgeId((&'a u64, &'a u64)),
String(&'a String),
Null,
}
impl<'a> RefItem<'a> {
pub fn to_owned(self) -> Item {
match self {
RefItem::Boolean(v) => Item::Boolean(*v),
RefItem::Int32(v) => Item::Int32(*v),
RefItem::UInt32(v) => Item::UInt32(*v),
RefItem::Int64(v) => Item::Int64(*v),
RefItem::UInt64(v) => Item::UInt64(*v),
RefItem::Float(v) => Item::Float(*v),
RefItem::Double(v) => Item::Double(*v),
RefItem::Date(v) => Item::Date(*v),
RefItem::DateTime(v) => Item::DateTime(*v),
RefItem::VertexId(v) => Item::VertexId(*v),
RefItem::EdgeId((src, dst)) => Item::EdgeId((*src, *dst)),
RefItem::String(v) => Item::String(v.clone()),
RefItem::Null => Item::Null,
}
}
}
pub trait ConvertItem {
fn to_ref_item(&self) -> RefItem;
fn from_item(v: Item) -> Self;
}
impl ConvertItem for i32 {
fn to_ref_item(&self) -> RefItem {
RefItem::Int32(self)
}
fn from_item(v: Item) -> Self {
match v {
Item::Int32(v) => v,
_ => 0,
}
}
}
impl ConvertItem for DateTime {
fn to_ref_item(&self) -> RefItem {
RefItem::DateTime(self)
}
fn from_item(v: Item) -> Self {
match v {
Item::DateTime(v) => v,
_ => DateTime::empty(),
}
}
}
impl ConvertItem for () {
fn to_ref_item(&self) -> RefItem {
RefItem::Null
}
fn from_item(_v: Item) -> Self {
()
}
}
impl Debug for Item {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Item::Int32(v) => {
write!(f, "int32[{}]", v)
}
Item::UInt32(v) => {
write!(f, "uint32[{}]", v)
}
Item::Int64(v) => {
write!(f, "int64[{}]", v)
}
Item::UInt64(v) => {
write!(f, "uint64[{}]", v)
}
Item::Double(v) => {
write!(f, "double[{}]", v)
}
Item::Date(v) => {
write!(f, "date[{}]", v.to_string())
}
Item::DateTime(v) => {
write!(f, "datetime[{}]", v.to_string())
}
Item::VertexId(v) => {
write!(f, "id[{}]", v)
}
Item::String(v) => {
write!(f, "string[{}]", v)
}
_ => {
write!(f, "")
}
}
}
}
impl ToString for Item {
fn to_string(&self) -> String {
match self {
Item::Int32(v) => v.to_string(),
Item::UInt32(v) => v.to_string(),
Item::Int64(v) => v.to_string(),
Item::UInt64(v) => v.to_string(),
Item::Double(v) => v.to_string(),
Item::Date(v) => v.to_string(),
Item::DateTime(v) => v.to_string(),
Item::VertexId(v) => v.to_string(),
Item::String(v) => v.to_string(),
_ => "".to_string(),
}
}
}
impl<'a> Debug for RefItem<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
RefItem::Int32(v) => {
write!(f, "int32[{}]", v)
}
RefItem::UInt32(v) => {
write!(f, "uint32[{}]", v)
}
RefItem::Int64(v) => {
write!(f, "int64[{}]", v)
}
RefItem::UInt64(v) => {
write!(f, "uint64[{}]", v)
}
RefItem::Double(v) => {
write!(f, "double[{}]", v)
}
RefItem::Date(v) => {
write!(f, "date[{}]", v.to_string())
}
RefItem::DateTime(v) => {
write!(f, "datetime[{}]", v.to_string())
}
RefItem::VertexId(v) => {
write!(f, "id[{}]", v)
}
RefItem::String(v) => {
write!(f, "string[{}]", v)
}
_ => {
write!(f, "")
}
}
}
}
impl<'a> ToString for RefItem<'a> {
fn to_string(&self) -> String {
match self {
RefItem::Int32(v) => v.to_string(),
RefItem::UInt32(v) => v.to_string(),
RefItem::Int64(v) => v.to_string(),
RefItem::UInt64(v) => v.to_string(),
RefItem::Double(v) => v.to_string(),
RefItem::Date(v) => v.to_string(),
RefItem::DateTime(v) => v.to_string(),
RefItem::VertexId(v) => v.to_string(),
RefItem::String(v) => v.to_string(),
_ => "".to_string(),
}
}
}
impl<'a> RefItem<'a> {
#[inline]
pub fn as_u64(&self) -> Result<u64, CastError> {
match self {
RefItem::Int32(v) => Ok(**v as u64),
RefItem::UInt32(v) => Ok(**v as u64),
RefItem::Int64(v) => Ok(**v as u64),
RefItem::UInt64(v) => Ok(**v),
RefItem::Double(v) => Ok(**v as u64),
RefItem::Date(_) => Ok(0_u64),
RefItem::DateTime(v) => Ok(v.to_i64() as u64),
RefItem::VertexId(v) => Ok(**v as u64),
RefItem::String(_) => Err(CastError::new::<u64>(RawType::String)),
_ => Ok(0_u64),
}
}
#[inline]
pub fn as_i32(&self) -> Result<i32, CastError> {
match self {
RefItem::Int32(v) => Ok(**v),
RefItem::UInt32(v) => Ok(**v as i32),
RefItem::Int64(v) => Ok(**v as i32),
RefItem::UInt64(v) => Ok(**v as i32),
RefItem::Double(v) => Ok(**v as i32),
RefItem::Date(_) => Ok(0),
RefItem::DateTime(_) => Ok(0),
RefItem::VertexId(v) => Ok(**v as i32),
RefItem::String(_) => Err(CastError::new::<i32>(RawType::String)),
_ => Ok(0),
}
}
#[inline]
pub fn as_str(&self) -> Result<Cow<'_, str>, CastError> {
match self {
RefItem::String(str) => Ok(Cow::Borrowed(*str)),
_ => Err(CastError::new::<String>(RawType::Unknown)),
}
}
#[inline]
pub fn as_datetime(&self) -> Result<DateTime, CastError> {
match self {
RefItem::Int32(_) => Err(CastError::new::<u64>(RawType::Integer)),
RefItem::UInt32(_) => Err(CastError::new::<u64>(RawType::Integer)),
RefItem::Int64(_) => Err(CastError::new::<u64>(RawType::Long)),
RefItem::UInt64(_) => Err(CastError::new::<u64>(RawType::Long)),
RefItem::Double(_) => Err(CastError::new::<u64>(RawType::Float)),
RefItem::Date(_) => Err(CastError::new::<u64>(RawType::Unknown)),
RefItem::DateTime(v) => Ok(**v),
RefItem::VertexId(_) => Err(CastError::new::<u64>(RawType::Long)),
RefItem::String(_) => Err(CastError::new::<u64>(RawType::String)),
_ => Err(CastError::new::<u64>(RawType::Unknown)),
}
}
}
pub trait Column: Debug {
fn get_type(&self) -> DataType;
fn get(&self, index: usize) -> Option<RefItem>;
fn set(&mut self, index: usize, val: Item);
fn push(&mut self, val: Item);
fn len(&self) -> usize;
fn as_any(&self) -> &dyn Any;
fn set_column_batch(&mut self, index: &Vec<usize>, col: &Box<dyn Column>);
fn set_column_elem(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize);
fn move_elem(&mut self, from: usize, to: usize);
fn copy_range(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize, num: usize);
fn resize(&mut self, size: usize);
fn serialize(&self, writer: &mut BufWriter<File>) -> std::io::Result<()>;
fn deserialize(&mut self, reader: &mut BufReader<File>) -> std::io::Result<()>;
}
pub struct Int32Column {
pub data: ColumnContainer<i32>,
}
unsafe impl Send for Int32Column {}
unsafe impl Sync for Int32Column {}
impl Int32Column {
pub fn new() -> Self {
Self { data: ColumnContainer::new() }
}
pub fn is_same(&self, other: &Self) -> bool {
if self.data.len() != other.data.len() {
return false;
}
let num = self.data.len();
for k in 0..num {
if self.data[k] != other.data[k] {
return false;
}
}
return true;
}
#[cfg(feature = "hugepage_table")]
pub fn from(data: HugeVec<i32>) -> Int32Column {
Int32Column { data }
}
#[cfg(not(feature = "hugepage_table"))]
pub fn from(data: Vec<i32>) -> Int32Column {
Int32Column { data }
}
pub fn clone_from(other: &Int32Column) -> Int32Column {
Int32Column { data: other.data.clone() }
}
}
impl Debug for Int32Column {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Int32Column: {:?}", self.data)
}
}
impl Column for Int32Column {
fn get_type(&self) -> DataType {
DataType::Int32
}
fn get(&self, index: usize) -> Option<RefItem> {
self.data.get(index).map(|x| RefItem::Int32(x))
}
fn set(&mut self, index: usize, val: Item) {
match val {
Item::Int32(v) => {
self.data[index] = v;
}
_ => {
self.data[index] = 0;
}
}
}
fn push(&mut self, val: Item) {
match val {
Item::Int32(v) => {
self.data.push(v);
}
_ => {
self.data.push(0);
}
}
}
fn len(&self) -> usize {
self.data.len()
}
fn as_any(&self) -> &dyn Any {
self
}
fn deserialize(&mut self, reader: &mut BufReader<File>) -> std::io::Result<()> {
let row_num = reader.read_u64::<LittleEndian>()? as usize;
let mut data = ColumnContainer::<i32>::with_capacity(row_num);
for _ in 0..row_num {
data.push(reader.read_i32::<LittleEndian>()?);
}
self.data = data;
Ok(())
}
fn serialize(&self, writer: &mut BufWriter<File>) -> std::io::Result<()> {
writer.write_u64::<LittleEndian>(self.data.len() as u64)?;
for v in self.data.iter() {
writer.write_i32::<LittleEndian>(*v)?;
}
Ok(())
}
fn resize(&mut self, size: usize) {
self.data.resize(size, 0);
}
fn set_column_batch(&mut self, index: &Vec<usize>, col: &Box<dyn Column>) {
if col.as_any().is::<Self>() {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
for (index, i) in index.iter().enumerate() {
self.data[*i] = casted_col.data[index];
}
}
}
fn set_column_elem(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize) {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
self.data[self_index] = casted_col.data[col_index];
}
fn move_elem(&mut self, from: usize, to: usize) {
self.data[to] = self.data[from];
}
fn copy_range(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize, num: usize) {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
self.data[self_index..self_index + num]
.copy_from_slice(&casted_col.data[col_index..col_index + num]);
}
}
pub struct UInt32Column {
pub data: ColumnContainer<u32>,
}
unsafe impl Send for UInt32Column {}
unsafe impl Sync for UInt32Column {}
impl UInt32Column {
pub fn new() -> Self {
Self { data: ColumnContainer::new() }
}
pub fn is_same(&self, other: &Self) -> bool {
if self.data.len() != other.data.len() {
return false;
}
let num = self.data.len();
for k in 0..num {
if self.data[k] != other.data[k] {
return false;
}
}
return true;
}
#[cfg(feature = "hugepage_table")]
pub fn from(data: HugeVec<u32>) -> UInt32Column {
UInt32Column { data }
}
#[cfg(not(feature = "hugepage_table"))]
pub fn from(data: Vec<u32>) -> UInt32Column {
UInt32Column { data }
}
pub fn clone_from(other: &UInt32Column) -> UInt32Column {
UInt32Column { data: other.data.clone() }
}
}
impl Debug for UInt32Column {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "UInt32Column: {:?}", self.data)
}
}
impl Column for UInt32Column {
fn get_type(&self) -> DataType {
DataType::UInt32
}
fn get(&self, index: usize) -> Option<RefItem> {
self.data.get(index).map(|x| RefItem::UInt32(x))
}
fn set(&mut self, index: usize, val: Item) {
match val {
Item::UInt32(v) => {
self.data[index] = v;
}
_ => {
self.data[index] = 0;
}
}
}
fn push(&mut self, val: Item) {
match val {
Item::UInt32(v) => {
self.data.push(v);
}
_ => {
self.data.push(0);
}
}
}
fn len(&self) -> usize {
self.data.len()
}
fn as_any(&self) -> &dyn Any {
self
}
fn deserialize(&mut self, reader: &mut BufReader<File>) -> std::io::Result<()> {
let row_num = reader.read_u64::<LittleEndian>()? as usize;
let mut data = ColumnContainer::<u32>::with_capacity(row_num);
for _ in 0..row_num {
data.push(reader.read_u32::<LittleEndian>()?);
}
self.data = data;
Ok(())
}
fn serialize(&self, writer: &mut BufWriter<File>) -> std::io::Result<()> {
writer.write_u64::<LittleEndian>(self.data.len() as u64)?;
for v in self.data.iter() {
writer.write_u32::<LittleEndian>(*v)?;
}
Ok(())
}
fn resize(&mut self, size: usize) {
self.data.resize(size, 0);
}
fn set_column_batch(&mut self, index: &Vec<usize>, col: &Box<dyn Column>) {
if col.as_any().is::<Self>() {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
for (index, i) in index.iter().enumerate() {
self.data[*i] = casted_col.data[index];
}
}
}
fn set_column_elem(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize) {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
self.data[self_index] = casted_col.data[col_index];
}
fn move_elem(&mut self, from: usize, to: usize) {
self.data[to] = self.data[from];
}
fn copy_range(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize, num: usize) {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
self.data[self_index..self_index + num]
.copy_from_slice(&casted_col.data[col_index..col_index + num]);
}
}
pub struct Int64Column {
pub data: ColumnContainer<i64>,
}
unsafe impl Send for Int64Column {}
unsafe impl Sync for Int64Column {}
impl Int64Column {
pub fn new() -> Self {
Self { data: ColumnContainer::new() }
}
pub fn is_same(&self, other: &Self) -> bool {
if self.data.len() != other.data.len() {
return false;
}
let num = self.data.len();
for k in 0..num {
if self.data[k] != other.data[k] {
return false;
}
}
return true;
}
#[cfg(feature = "hugepage_table")]
pub fn from(data: HugeVec<i64>) -> Int64Column {
Int64Column { data }
}
#[cfg(not(feature = "hugepage_table"))]
pub fn from(data: Vec<i64>) -> Int64Column {
Int64Column { data }
}
pub fn clone_from(other: &Int64Column) -> Int64Column {
Int64Column { data: other.data.clone() }
}
}
impl Debug for Int64Column {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Int64Column: {:?}", self.data)
}
}
impl Column for Int64Column {
fn get_type(&self) -> DataType {
DataType::Int32
}
fn get(&self, index: usize) -> Option<RefItem> {
self.data.get(index).map(|x| RefItem::Int64(x))
}
fn set(&mut self, index: usize, val: Item) {
match val {
Item::Int64(v) => {
self.data[index] = v;
}
_ => {
self.data[index] = 0;
}
}
}
fn push(&mut self, val: Item) {
match val {
Item::Int64(v) => {
self.data.push(v);
}
_ => {
self.data.push(0);
}
}
}
fn len(&self) -> usize {
self.data.len()
}
fn as_any(&self) -> &dyn Any {
self
}
fn deserialize(&mut self, reader: &mut BufReader<File>) -> std::io::Result<()> {
let row_num = reader.read_u64::<LittleEndian>()? as usize;
let mut data = ColumnContainer::<i64>::with_capacity(row_num);
for _ in 0..row_num {
data.push(reader.read_i64::<LittleEndian>()?);
}
self.data = data;
Ok(())
}
fn serialize(&self, writer: &mut BufWriter<File>) -> std::io::Result<()> {
writer.write_u64::<LittleEndian>(self.data.len() as u64)?;
for v in self.data.iter() {
writer.write_i64::<LittleEndian>(*v)?;
}
Ok(())
}
fn resize(&mut self, size: usize) {
self.data.resize(size, 0);
}
fn set_column_batch(&mut self, index: &Vec<usize>, col: &Box<dyn Column>) {
if col.as_any().is::<Self>() {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
for (index, i) in index.iter().enumerate() {
self.data[*i] = casted_col.data[index];
}
}
}
fn set_column_elem(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize) {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
self.data[self_index] = casted_col.data[col_index];
}
fn move_elem(&mut self, from: usize, to: usize) {
self.data[to] = self.data[from];
}
fn copy_range(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize, num: usize) {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
self.data[self_index..self_index + num]
.copy_from_slice(&casted_col.data[col_index..col_index + num]);
}
}
pub struct UInt64Column {
pub data: ColumnContainer<u64>,
}
unsafe impl Send for UInt64Column {}
unsafe impl Sync for UInt64Column {}
impl UInt64Column {
pub fn new() -> Self {
Self { data: ColumnContainer::new() }
}
pub fn is_same(&self, other: &Self) -> bool {
if self.data.len() != other.data.len() {
return false;
}
let num = self.data.len();
for k in 0..num {
if self.data[k] != other.data[k] {
return false;
}
}
return true;
}
#[cfg(feature = "hugepage_table")]
pub fn from(data: HugeVec<u64>) -> UInt64Column {
UInt64Column { data }
}
#[cfg(not(feature = "hugepage_table"))]
pub fn from(data: Vec<u64>) -> UInt64Column {
UInt64Column { data }
}
pub fn clone_from(other: &UInt64Column) -> UInt64Column {
UInt64Column { data: other.data.clone() }
}
}
impl Debug for UInt64Column {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "UInt64Column: {:?}", self.data)
}
}
impl Column for UInt64Column {
fn get_type(&self) -> DataType {
DataType::UInt64
}
fn get(&self, index: usize) -> Option<RefItem> {
self.data.get(index).map(|x| RefItem::UInt64(x))
}
fn set(&mut self, index: usize, val: Item) {
match val {
Item::UInt64(v) => {
self.data[index] = v;
}
_ => {
self.data[index] = 0;
}
}
}
fn push(&mut self, val: Item) {
match val {
Item::UInt64(v) => {
self.data.push(v);
}
_ => {
self.data.push(0);
}
}
}
fn len(&self) -> usize {
self.data.len()
}
fn as_any(&self) -> &dyn Any {
self
}
fn deserialize(&mut self, reader: &mut BufReader<File>) -> std::io::Result<()> {
let row_num = reader.read_u64::<LittleEndian>()? as usize;
let mut data = ColumnContainer::<u64>::with_capacity(row_num);
for _ in 0..row_num {
data.push(reader.read_u64::<LittleEndian>()?);
}
self.data = data;
Ok(())
}
fn serialize(&self, writer: &mut BufWriter<File>) -> std::io::Result<()> {
writer.write_u64::<LittleEndian>(self.data.len() as u64)?;
for v in self.data.iter() {
writer.write_u64::<LittleEndian>(*v)?;
}
Ok(())
}
fn resize(&mut self, size: usize) {
self.data.resize(size, 0);
}
fn set_column_batch(&mut self, index: &Vec<usize>, col: &Box<dyn Column>) {
if col.as_any().is::<Self>() {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
for (index, i) in index.iter().enumerate() {
self.data[*i] = casted_col.data[index];
}
}
}
fn set_column_elem(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize) {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
self.data[self_index] = casted_col.data[col_index];
}
fn move_elem(&mut self, from: usize, to: usize) {
self.data[to] = self.data[from];
}
fn copy_range(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize, num: usize) {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
self.data[self_index..self_index + num]
.copy_from_slice(&casted_col.data[col_index..col_index + num]);
}
}
pub struct IDColumn {
pub data: ColumnContainer<DefaultId>,
}
unsafe impl Send for IDColumn {}
unsafe impl Sync for IDColumn {}
impl IDColumn {
pub fn new() -> Self {
Self { data: ColumnContainer::new() }
}
#[cfg(feature = "hugepage_table")]
pub fn from(data: HugeVec<usize>) -> IDColumn {
IDColumn { data }
}
#[cfg(not(feature = "hugepage_table"))]
pub fn from(data: Vec<usize>) -> IDColumn {
IDColumn { data }
}
pub fn clone_from(other: &IDColumn) -> IDColumn {
IDColumn { data: other.data.clone() }
}
}
impl Debug for IDColumn {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "IDColumn: {:?}", self.data)
}
}
impl Column for IDColumn {
fn get_type(&self) -> DataType {
DataType::ID
}
fn get(&self, index: usize) -> Option<RefItem> {
self.data
.get(index)
.map(|x| RefItem::VertexId(x))
}
fn set(&mut self, index: usize, val: Item) {
match val {
Item::VertexId(v) => {
self.data[index] = v;
}
_ => {
self.data[index] = 0;
}
}
}
fn push(&mut self, val: Item) {
match val {
Item::VertexId(v) => {
self.data.push(v);
}
_ => {
self.data.push(0);
}
}
}
fn len(&self) -> usize {
self.data.len()
}
fn as_any(&self) -> &dyn Any {
self
}
fn deserialize(&mut self, reader: &mut BufReader<File>) -> std::io::Result<()> {
let row_num = reader.read_u64::<LittleEndian>()? as usize;
let mut data = ColumnContainer::<DefaultId>::with_capacity(row_num);
for _ in 0..row_num {
data.push(reader.read_u64::<LittleEndian>()? as DefaultId);
}
self.data = data;
Ok(())
}
fn serialize(&self, writer: &mut BufWriter<File>) -> std::io::Result<()> {
writer.write_u64::<LittleEndian>(self.data.len() as u64)?;
for v in self.data.iter() {
writer.write_u64::<LittleEndian>(*v as u64)?;
}
Ok(())
}
fn resize(&mut self, size: usize) {
self.data.resize(size, 0);
}
fn set_column_batch(&mut self, index: &Vec<usize>, col: &Box<dyn Column>) {
if col.as_any().is::<Self>() {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
for (index, i) in index.iter().enumerate() {
self.data[*i] = casted_col.data[index];
}
}
}
fn set_column_elem(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize) {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
self.data[self_index] = casted_col.data[col_index];
}
fn move_elem(&mut self, from: usize, to: usize) {
self.data[to] = self.data[from];
}
fn copy_range(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize, num: usize) {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
self.data[self_index..self_index + num]
.copy_from_slice(&casted_col.data[col_index..col_index + num]);
}
}
pub struct DoubleColumn {
pub data: ColumnContainer<f64>,
}
unsafe impl Send for DoubleColumn {}
unsafe impl Sync for DoubleColumn {}
impl DoubleColumn {
pub fn new() -> Self {
Self { data: ColumnContainer::new() }
}
#[cfg(feature = "hugepage_table")]
pub fn from(data: HugeVec<f64>) -> DoubleColumn {
DoubleColumn { data }
}
#[cfg(not(feature = "hugepage_table"))]
pub fn from(data: Vec<f64>) -> DoubleColumn {
DoubleColumn { data }
}
pub fn clone_from(other: &DoubleColumn) -> DoubleColumn {
DoubleColumn { data: other.data.clone() }
}
}
impl Debug for DoubleColumn {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "DoubleColumn: {:?}", self.data)
}
}
impl Column for DoubleColumn {
fn get_type(&self) -> DataType {
DataType::Double
}
fn get(&self, index: usize) -> Option<RefItem> {
self.data.get(index).map(|x| RefItem::Double(x))
}
fn set(&mut self, index: usize, val: Item) {
match val {
Item::Double(v) => {
self.data[index] = v;
}
_ => {
self.data[index] = 0_f64;
}
}
}
fn push(&mut self, val: Item) {
match val {
Item::Double(v) => {
self.data.push(v);
}
_ => {
self.data.push(0_f64);
}
}
}
fn len(&self) -> usize {
self.data.len()
}
fn as_any(&self) -> &dyn Any {
self
}
fn deserialize(&mut self, reader: &mut BufReader<File>) -> std::io::Result<()> {
let row_num = reader.read_u64::<LittleEndian>()? as usize;
let mut data = ColumnContainer::<f64>::with_capacity(row_num);
for _ in 0..row_num {
data.push(reader.read_f64::<LittleEndian>()?);
}
self.data = data;
Ok(())
}
fn serialize(&self, writer: &mut BufWriter<File>) -> std::io::Result<()> {
writer.write_u64::<LittleEndian>(self.data.len() as u64)?;
for v in self.data.iter() {
writer.write_f64::<LittleEndian>(*v)?;
}
Ok(())
}
fn resize(&mut self, size: usize) {
self.data.resize(size, 0.0);
}
fn set_column_batch(&mut self, index: &Vec<usize>, col: &Box<dyn Column>) {
if col.as_any().is::<Self>() {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
for (index, i) in index.iter().enumerate() {
self.data[*i] = casted_col.data[index];
}
}
}
fn set_column_elem(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize) {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
self.data[self_index] = casted_col.data[col_index];
}
fn move_elem(&mut self, from: usize, to: usize) {
self.data[to] = self.data[from];
}
fn copy_range(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize, num: usize) {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
self.data[self_index..self_index + num]
.copy_from_slice(&casted_col.data[col_index..col_index + num]);
}
}
pub struct StringColumn {
pub data: Vec<String>,
}
unsafe impl Send for StringColumn {}
unsafe impl Sync for StringColumn {}
impl StringColumn {
pub fn new() -> Self {
Self { data: Vec::new() }
}
#[cfg(feature = "hugepage_table")]
pub fn from(data: HugeVec<String>) -> StringColumn {
StringColumn { data }
}
#[cfg(not(feature = "hugepage_table"))]
pub fn from(data: Vec<String>) -> StringColumn {
StringColumn { data }
}
pub fn clone_from(other: &StringColumn) -> StringColumn {
StringColumn { data: other.data.clone() }
}
}
impl Debug for StringColumn {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "StringColumn: {:?}", self.data)
}
}
impl Column for StringColumn {
fn get_type(&self) -> DataType {
DataType::String
}
fn get(&self, index: usize) -> Option<RefItem> {
self.data.get(index).map(|x| RefItem::String(x))
}
fn set(&mut self, index: usize, val: Item) {
match val {
Item::String(v) => {
self.data[index] = v;
}
_ => {
self.data[index] = String::from("");
}
}
}
fn push(&mut self, val: Item) {
match val {
Item::String(v) => {
self.data.push(v);
}
_ => {
self.data.push(String::from(""));
}
}
}
fn len(&self) -> usize {
self.data.len()
}
fn as_any(&self) -> &dyn Any {
self
}
fn deserialize(&mut self, reader: &mut BufReader<File>) -> std::io::Result<()> {
let row_num = reader.read_u64::<LittleEndian>()? as usize;
let mut data = Vec::<String>::with_capacity(row_num);
for _ in 0..row_num {
let length = reader.read_i32::<LittleEndian>()?;
let mut string_bytes = vec![0u8; length as usize];
reader.read_exact(&mut string_bytes)?;
data.push(String::from_utf8(string_bytes).unwrap());
}
self.data = data;
Ok(())
}
fn serialize(&self, writer: &mut BufWriter<File>) -> std::io::Result<()> {
writer.write_u64::<LittleEndian>(self.data.len() as u64)?;
for v in self.data.iter() {
writer.write_i32::<LittleEndian>(v.len() as i32)?;
writer.write_all(v.as_bytes())?;
}
Ok(())
}
fn resize(&mut self, size: usize) {
self.data.resize(size, String::new());
}
fn set_column_batch(&mut self, index: &Vec<usize>, col: &Box<dyn Column>) {
if col.as_any().is::<Self>() {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
for (index, i) in index.iter().enumerate() {
self.data[*i] = casted_col.data[index].clone();
}
}
}
fn set_column_elem(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize) {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
self.data[self_index] = casted_col.data[col_index].clone();
}
fn move_elem(&mut self, from: usize, to: usize) {
self.data[to] = self.data[from].clone();
}
fn copy_range(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize, num: usize) {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
for i in 0..num {
self.data[self_index + i] = casted_col.data[col_index + i].clone();
}
}
}
pub struct LCStringColumn {
pub data: ColumnContainer<u16>,
pub table: HashMap<String, u16>,
pub list: Vec<String>,
}
unsafe impl Send for LCStringColumn {}
unsafe impl Sync for LCStringColumn {}
impl LCStringColumn {
pub fn new() -> Self {
Self { data: ColumnContainer::new(), table: HashMap::new(), list: Vec::new() }
}
pub fn is_same(&self, other: &Self) -> bool {
if self.data.len() != other.data.len() {
return false;
}
let num = self.data.len();
if self.list != other.list {
return false;
}
for k in 0..num {
if self.data[k] != other.data[k] {
return false;
}
}
return true;
}
#[cfg(feature = "hugepage_table")]
pub fn from(data: HugeVec<u16>, table: HashMap<String, u16>, list: Vec<String>) -> LCStringColumn {
LCStringColumn { data, table, list }
}
#[cfg(not(feature = "hugepage_table"))]
pub fn from(data: Vec<u16>, table: HashMap<String, u16>, list: Vec<String>) -> LCStringColumn {
LCStringColumn { data, table, list }
}
pub fn clone_from(other: &LCStringColumn) -> LCStringColumn {
LCStringColumn { data: other.data.clone(), table: other.table.clone(), list: other.list.clone() }
}
}
impl Debug for LCStringColumn {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "LCStringColumn: {:?},{:?},{:?}", self.data, self.table, self.list)
}
}
impl Column for LCStringColumn {
fn get_type(&self) -> DataType {
DataType::LCString
}
fn get(&self, index: usize) -> Option<RefItem> {
self.data
.get(index)
.map(|x| RefItem::String(&self.list[*x as usize]))
}
fn set(&mut self, index: usize, val: Item) {
let value = match val {
Item::String(v) => v,
_ => "".to_string(),
};
if let Some(v) = self.table.get(&value) {
self.data[index] = *v;
} else {
assert!(self.list.len() < 65535);
let cur = self.list.len() as u16;
self.list.push(value.clone());
self.table.insert(value, cur);
self.data[index] = cur;
}
}
fn push(&mut self, val: Item) {
let value = match val {
Item::String(v) => v,
_ => "".to_string(),
};
if let Some(v) = self.table.get(&value) {
self.data.push(*v);
} else {
assert!(self.list.len() < 65535);
let cur = self.list.len() as u16;
self.list.push(value.clone());
self.table.insert(value, cur);
self.data.push(cur);
}
}
fn len(&self) -> usize {
self.data.len()
}
fn as_any(&self) -> &dyn Any {
self
}
fn deserialize(&mut self, reader: &mut BufReader<File>) -> std::io::Result<()> {
let row_num = reader.read_u64::<LittleEndian>()? as usize;
let mut data = ColumnContainer::<u16>::with_capacity(row_num);
for _ in 0..row_num {
data.push(reader.read_u16::<LittleEndian>()?);
}
let list_size = reader.read_u16::<LittleEndian>()? as usize;
let mut list = Vec::<String>::with_capacity(list_size);
let mut table = HashMap::new();
for i in 0..list_size {
let length = reader.read_i32::<LittleEndian>()?;
let mut string_bytes = vec![0u8; length as usize];
reader.read_exact(&mut string_bytes)?;
let parsed_string = String::from_utf8(string_bytes).unwrap();
list.push(parsed_string.clone());
table.insert(parsed_string, i as u16);
}
self.data = data;
self.table = table;
self.list = list;
Ok(())
}
fn serialize(&self, writer: &mut BufWriter<File>) -> std::io::Result<()> {
writer.write_u64::<LittleEndian>(self.data.len() as u64)?;
for v in self.data.iter() {
writer.write_u16::<LittleEndian>(*v)?;
}
writer.write_u16::<LittleEndian>(self.list.len() as u16)?;
for s in self.list.iter() {
let length = s.len() as i32;
writer.write_i32::<LittleEndian>(length)?;
writer.write_all(s.as_bytes())?;
}
Ok(())
}
fn resize(&mut self, size: usize) {
self.data.resize(size, 0);
}
fn set_column_batch(&mut self, index: &Vec<usize>, col: &Box<dyn Column>) {
if col.as_any().is::<Self>() {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
for (index, i) in index.iter().enumerate() {
let val = casted_col.list[casted_col.data[index] as usize].clone();
if let Some(idx) = self.table.get(&val) {
self.data[*i] = *idx;
} else {
let idx = self.table.len() as u16;
self.list.push(val.clone());
self.table.insert(val, idx);
self.data[*i] = idx;
}
}
}
}
fn set_column_elem(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize) {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
let val = casted_col.list[casted_col.data[col_index] as usize].clone();
if let Some(idx) = self.table.get(&val) {
self.data[self_index] = *idx;
} else {
let idx = self.table.len() as u16;
self.list.push(val.clone());
self.table.insert(val, idx);
self.data[self_index] = idx;
}
}
fn move_elem(&mut self, from: usize, to: usize) {
self.data[to] = self.data[from];
}
fn copy_range(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize, num: usize) {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
for i in 0..num {
let val = casted_col.list[casted_col.data[col_index + i] as usize].clone();
if let Some(idx) = self.table.get(&val) {
self.data[self_index + i] = *idx;
} else {
let idx = self.table.len() as u16;
self.list.push(val.clone());
self.table.insert(val, idx);
self.data[self_index + i] = idx;
}
}
}
}
pub struct DateColumn {
pub data: ColumnContainer<Date>,
}
unsafe impl Send for DateColumn {}
unsafe impl Sync for DateColumn {}
impl DateColumn {
pub fn new() -> Self {
Self { data: ColumnContainer::new() }
}
pub fn is_same(&self, other: &Self) -> bool {
if self.data.len() != other.data.len() {
return false;
}
let num = self.data.len();
for k in 0..num {
if self.data[k].to_i32() != other.data[k].to_i32() {
return false;
}
}
return true;
}
#[cfg(feature = "hugepage_table")]
pub fn from(data: HugeVec<Date>) -> DateColumn {
DateColumn { data }
}
#[cfg(not(feature = "hugepage_table"))]
pub fn from(data: Vec<Date>) -> DateColumn {
DateColumn { data }
}
pub fn clone_from(other: &DateColumn) -> DateColumn {
DateColumn { data: other.data.clone() }
}
}
impl Debug for DateColumn {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "DateColumn: {:?}", self.data)
}
}
impl Column for DateColumn {
fn get_type(&self) -> DataType {
DataType::Date
}
fn get(&self, index: usize) -> Option<RefItem> {
self.data.get(index).map(|x| RefItem::Date(x))
}
fn set(&mut self, index: usize, val: Item) {
match val {
Item::Date(v) => {
self.data[index] = v;
}
_ => {
self.data[index] = Date::empty();
}
}
}
fn push(&mut self, val: Item) {
match val {
Item::Date(v) => {
self.data.push(v);
}
_ => {
self.data.push(Date::empty());
}
}
}
fn len(&self) -> usize {
self.data.len()
}
fn as_any(&self) -> &dyn Any {
self
}
fn deserialize(&mut self, reader: &mut BufReader<File>) -> std::io::Result<()> {
let row_num = reader.read_u64::<LittleEndian>()? as usize;
let mut data = ColumnContainer::<Date>::with_capacity(row_num);
for _ in 0..row_num {
data.push(Date::from_i32(reader.read_i32::<LittleEndian>()?));
}
self.data = data;
Ok(())
}
fn serialize(&self, writer: &mut BufWriter<File>) -> std::io::Result<()> {
writer.write_u64::<LittleEndian>(self.data.len() as u64)?;
for v in self.data.iter() {
writer.write_i32::<LittleEndian>(v.to_i32())?;
}
Ok(())
}
fn resize(&mut self, size: usize) {
self.data.resize(size, Date::empty());
}
fn set_column_batch(&mut self, index: &Vec<usize>, col: &Box<dyn Column>) {
if col.as_any().is::<Self>() {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
for (index, i) in index.iter().enumerate() {
self.data[*i] = casted_col.data[index];
}
}
}
fn set_column_elem(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize) {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
self.data[self_index] = casted_col.data[col_index];
}
fn move_elem(&mut self, from: usize, to: usize) {
self.data[to] = self.data[from];
}
fn copy_range(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize, num: usize) {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
self.data[self_index..self_index + num]
.copy_from_slice(&casted_col.data[col_index..col_index + num]);
}
}
pub struct DateTimeColumn {
pub data: ColumnContainer<DateTime>,
}
unsafe impl Send for DateTimeColumn {}
unsafe impl Sync for DateTimeColumn {}
impl DateTimeColumn {
pub fn new() -> Self {
Self { data: ColumnContainer::new() }
}
pub fn is_same(&self, other: &Self) -> bool {
if self.data.len() != other.data.len() {
return false;
}
let num = self.data.len();
for k in 0..num {
if self.data[k].to_i64() != other.data[k].to_i64() {
return false;
}
}
return true;
}
#[cfg(feature = "hugepage_table")]
pub fn from(data: HugeVec<DateTime>) -> DateTimeColumn {
DateTimeColumn { data }
}
#[cfg(not(feature = "hugepage_table"))]
pub fn from(data: Vec<DateTime>) -> DateTimeColumn {
DateTimeColumn { data }
}
pub fn clone_from(other: &DateTimeColumn) -> DateTimeColumn {
DateTimeColumn { data: other.data.clone() }
}
}
impl Debug for DateTimeColumn {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "DateTimeColumn: {:?}", self.data)
}
}
impl Column for DateTimeColumn {
fn get_type(&self) -> DataType {
DataType::DateTime
}
fn get(&self, index: usize) -> Option<RefItem> {
self.data
.get(index)
.map(|x| RefItem::DateTime(x))
}
fn set(&mut self, index: usize, val: Item) {
match val {
Item::DateTime(v) => {
self.data[index] = v;
}
_ => {
self.data[index] = DateTime::empty();
}
}
}
fn push(&mut self, val: Item) {
match val {
Item::DateTime(v) => {
self.data.push(v);
}
_ => {
self.data.push(DateTime::empty());
}
}
}
fn len(&self) -> usize {
self.data.len()
}
fn as_any(&self) -> &dyn Any {
self
}
fn deserialize(&mut self, reader: &mut BufReader<File>) -> std::io::Result<()> {
let row_num = reader.read_u64::<LittleEndian>()? as usize;
let mut data = ColumnContainer::<DateTime>::with_capacity(row_num);
for _ in 0..row_num {
data.push(DateTime::new(reader.read_i64::<LittleEndian>()?));
}
self.data = data;
Ok(())
}
fn serialize(&self, writer: &mut BufWriter<File>) -> std::io::Result<()> {
writer.write_u64::<LittleEndian>(self.data.len() as u64)?;
for v in self.data.iter() {
writer.write_i64::<LittleEndian>(v.to_i64())?;
}
Ok(())
}
fn resize(&mut self, size: usize) {
self.data.resize(size, DateTime::empty());
}
fn set_column_batch(&mut self, index: &Vec<usize>, col: &Box<dyn Column>) {
if col.as_any().is::<Self>() {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
for (index, i) in index.iter().enumerate() {
self.data[*i] = casted_col.data[index];
}
}
}
fn set_column_elem(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize) {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
self.data[self_index] = casted_col.data[col_index];
}
fn move_elem(&mut self, from: usize, to: usize) {
self.data[to] = self.data[from];
}
fn copy_range(&mut self, self_index: usize, col: &Box<dyn Column>, col_index: usize, num: usize) {
let casted_col = col.as_any().downcast_ref::<Self>().unwrap();
self.data[self_index..self_index + num]
.copy_from_slice(&casted_col.data[col_index..col_index + num]);
}
}