vermeer/apps/structure/property.go (848 lines of code) (raw):

/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package structure import ( "encoding/binary" "fmt" "strconv" "strings" "vermeer/apps/common" "vermeer/apps/options" store "vermeer/apps/protos/hugegraph-store-grpc" "vermeer/apps/serialize" "github.com/sirupsen/logrus" ) const ( ValueTypeFloat32 ValueType = iota ValueTypeInt32 ValueTypeString ValueTypeUnknow ) type ValueType uint16 type PSchema struct { VType ValueType PropKey string } type HugegraphPSchema struct { //key is the hugegraph property_id, value is the vermeer property_id PropIndex map[int]int //key is the hugegraph label_id, value is hugegraph label_name Labels map[int]string } type PropertySchema struct { Schema []*PSchema HgPSchema *HugegraphPSchema } func (ps *PropertySchema) Init(schema map[string]string) { ps.Schema = make([]*PSchema, len(schema)) for k, v := range schema { vs := strings.Split(v, ",") if len(vs) < 2 { logrus.Errorf("Schema string Length Not enough,key:%v ", k) continue } vType, err := strconv.Atoi(vs[0]) if err != nil { return } index, err := strconv.Atoi(vs[1]) if err != nil { return } if index >= len(schema) { logrus.Errorf("Schema index out of range,index:%v", index) continue } ps.Schema[index] = &PSchema{} ps.Schema[index].PropKey = k ps.Schema[index].VType = ValueType(vType) } } func (ps *PropertySchema) initFromHugegraphVertex(hgSchema map[string]any, useProperty bool, vertexPropIDs []string) error { //从hugegraph中读取schema //edge和vertex的label和property不相同,需要分开读取。 ps.HgPSchema = &HugegraphPSchema{} vertexPropIDMap := make(map[int]struct{}) useVertexPropFilter := len(vertexPropIDs) > 0 if useVertexPropFilter { useVertexPropFilter = false for _, propID := range vertexPropIDs { if propID == "" { continue } iLabel, err := strconv.ParseInt(strings.TrimSpace(propID), 10, 64) if err != nil { logrus.Errorf("property schema label type not int :%v", propID) continue } else { useVertexPropFilter = true } vertexPropIDMap[int(iLabel)] = struct{}{} } } vertexLabels, ok := hgSchema["vertexlabels"].([]any) if !ok { return fmt.Errorf("get vertexlabels from hugegraph not correct %v", hgSchema["vertexlabels"]) } ps.HgPSchema.Labels = make(map[int]string) properties := make(map[string]struct{}) for _, vLabelAny := range vertexLabels { vLabel, ok := vLabelAny.(map[string]any) if !ok { return fmt.Errorf("get vertex label not correct %v", vLabelAny) } id, ok := vLabel["id"].(float64) if !ok { return fmt.Errorf("get vertex label id not correct %v", vLabel["id"]) } name, ok := vLabel["name"].(string) if !ok { return fmt.Errorf("get vertex label name not correct %v", vLabel["id"]) } ps.HgPSchema.Labels[int(id)] = name labelProperties, ok := vLabel["properties"].([]any) if !ok { return fmt.Errorf("get vertex label properties not correct %v", vLabel["properties"]) } for _, labelProperty := range labelProperties { propString, ok := labelProperty.(string) if !ok { return fmt.Errorf("get vertex label property not correct %v", labelProperty) } properties[propString] = struct{}{} } } if !useProperty { ps.Schema = make([]*PSchema, 1) ps.Schema[0] = &PSchema{ VType: ValueTypeString, PropKey: "label", } } else { propKeys, ok := hgSchema["propertykeys"].([]any) if !ok { return fmt.Errorf("get propertykeys from hugegraph not correct %v", hgSchema["propertykeys"]) } ps.Schema = make([]*PSchema, 1, len(properties)+1) ps.Schema[0] = &PSchema{ VType: ValueTypeString, PropKey: "label", } ps.HgPSchema.PropIndex = make(map[int]int, len(properties)) idx := 1 for _, propKeyAny := range propKeys { propKey, ok := propKeyAny.(map[string]any) if !ok { return fmt.Errorf("get property key not correct %v", propKeyAny) } propKeyName, ok := propKey["name"].(string) if !ok { return fmt.Errorf("get property name not correct %v", propKey["name"]) } if _, ok := properties[propKeyName]; !ok { continue } propID, ok := propKey["id"].(float64) if !ok { return fmt.Errorf("get property id not correct %v", propKey["id"]) } if _, ok := vertexPropIDMap[int(propID)]; useVertexPropFilter && !ok { continue } ps.Schema = append(ps.Schema, &PSchema{}) ps.Schema[idx].PropKey = propKeyName switch propKey["data_type"].(string) { case "INT", "LONG": ps.Schema[idx].VType = ValueTypeInt32 case "FLOAT", "DOUBLE": ps.Schema[idx].VType = ValueTypeFloat32 case "STRING", "BYTE", "DATE", "BOOL", "TEXT", "BOOLEAN": ps.Schema[idx].VType = ValueTypeString default: logrus.Errorf("hugegraph data_type:%v not match", propKey["data_type"].(string)) } ps.HgPSchema.PropIndex[int(propID)] = idx idx++ } } return nil } func (ps *PropertySchema) initFromHugegraphEdge(hgSchema map[string]any, useProperty bool, edgePropIDs []string) error { if !useProperty { return nil } ps.HgPSchema = &HugegraphPSchema{} edgePropIDMap := make(map[int]struct{}) useEdgePropFilter := len(edgePropIDs) > 0 if useEdgePropFilter { useEdgePropFilter = false for _, propID := range edgePropIDs { if propID == "" { continue } iLabel, err := strconv.ParseInt(strings.TrimSpace(propID), 10, 64) if err != nil { logrus.Errorf("property schema label type not int :%v", propID) continue } else { useEdgePropFilter = true } edgePropIDMap[int(iLabel)] = struct{}{} } } edgeLabels, ok := hgSchema["edgelabels"].([]any) if !ok { return fmt.Errorf("get edgelabels from hugegraph not correct %v", hgSchema["edgelabels"]) } ps.HgPSchema.Labels = make(map[int]string) properties := make(map[string]struct{}) for _, eLabelAny := range edgeLabels { eLabel, ok := eLabelAny.(map[string]any) if !ok { return fmt.Errorf("get edge label not correct %v", eLabelAny) } id, ok := eLabel["id"].(float64) if !ok { return fmt.Errorf("get edge label id not correct %v", eLabel["id"]) } name, ok := eLabel["name"].(string) if !ok { return fmt.Errorf("get edge label name not correct %v", eLabel["id"]) } edgeLabelType, ok := eLabel["edgelabel_type"].(string) if !ok { return fmt.Errorf("get edgelabel_type not correct %v", eLabel["edgelabel_type"]) } if edgeLabelType == "PARENT" { continue } ps.HgPSchema.Labels[int(id)] = name labelProperties, ok := eLabel["properties"].([]any) if !ok { return fmt.Errorf("get edge label properties not correct %v", eLabel["properties"]) } for _, labelProperty := range labelProperties { propString, ok := labelProperty.(string) if !ok { return fmt.Errorf("get edge label property not correct %v", labelProperty) } properties[propString] = struct{}{} } } propKeys, ok := hgSchema["propertykeys"].([]any) if !ok { return fmt.Errorf("get propertykeys from hugegraph not correct %v", hgSchema["propertykeys"]) } ps.Schema = make([]*PSchema, 1, len(properties)+1) ps.Schema[0] = &PSchema{ VType: ValueTypeString, PropKey: "label", } ps.HgPSchema.PropIndex = make(map[int]int, len(properties)) idx := 1 for _, propKeyAny := range propKeys { propKey, ok := propKeyAny.(map[string]any) if !ok { return fmt.Errorf("get property key not correct %v", propKeyAny) } propKeyName, ok := propKey["name"].(string) if !ok { return fmt.Errorf("get property name not correct %v", propKey["name"]) } if _, ok := properties[propKeyName]; !ok { continue } propID, ok := propKey["id"].(float64) if !ok { return fmt.Errorf("get property id not correct %v", propKey["id"]) } if _, ok := edgePropIDMap[int(propID)]; useEdgePropFilter && !ok { continue } ps.Schema = append(ps.Schema, &PSchema{}) ps.Schema[idx].PropKey = propKeyName switch propKey["data_type"].(string) { case "INT", "LONG": ps.Schema[idx].VType = ValueTypeInt32 case "FLOAT", "DOUBLE": ps.Schema[idx].VType = ValueTypeFloat32 case "STRING", "BYTE", "DATE", "BOOL", "TEXT", "BOOLEAN": ps.Schema[idx].VType = ValueTypeString default: logrus.Errorf("hugegraph data_type:%v not match", propKey["data_type"].(string)) } ps.HgPSchema.PropIndex[int(propID)] = idx idx++ } return nil } func (ps *PropertySchema) Marshal(buffer []byte) (int, error) { offset := 0 schemaCount := len(ps.Schema) binary.BigEndian.PutUint32(buffer[offset:], uint32(schemaCount)) offset += 4 for i := 0; i < schemaCount; i++ { binary.BigEndian.PutUint16(buffer[offset:], uint16(ps.Schema[i].VType)) offset += 2 propKey := serialize.SString(ps.Schema[i].PropKey) n, err := propKey.Marshal(buffer[offset:]) if err != nil { return 0, err } offset += n } if ps.HgPSchema == nil { binary.BigEndian.PutUint32(buffer[offset:], 0) offset += 4 return offset, nil } binary.BigEndian.PutUint32(buffer[offset:], 1) offset += 4 hgPSchemaPropIndexCount := len(ps.HgPSchema.PropIndex) binary.BigEndian.PutUint32(buffer[offset:], uint32(hgPSchemaPropIndexCount)) offset += 4 for k, v := range ps.HgPSchema.PropIndex { kk := serialize.SInt32(k) n, err := kk.Marshal(buffer[offset:]) if err != nil { return 0, err } offset += n vv := serialize.SInt32(v) n, err = vv.Marshal(buffer[offset:]) if err != nil { return 0, err } offset += n } hgPSchemaLabelsCount := len(ps.HgPSchema.Labels) binary.BigEndian.PutUint32(buffer[offset:], uint32(hgPSchemaLabelsCount)) offset += 4 for k, v := range ps.HgPSchema.Labels { kk := serialize.SInt32(k) n, err := kk.Marshal(buffer[offset:]) if err != nil { return 0, err } offset += n vv := serialize.SString(v) n, err = vv.Marshal(buffer[offset:]) if err != nil { return 0, err } offset += n } return offset, nil } func (ps *PropertySchema) Unmarshal(buffer []byte) (int, error) { offset := 0 schemaCount := binary.BigEndian.Uint32(buffer[offset:]) offset += 4 (*ps).Schema = make([]*PSchema, schemaCount) for i := range (*ps).Schema { vType := ValueType(binary.BigEndian.Uint16(buffer[offset:])) offset += 2 var propKey serialize.SString n, err := propKey.Unmarshal(buffer[offset:]) if err != nil { return 0, err } offset += n schema := new(PSchema) schema.VType = vType schema.PropKey = string(propKey) (*ps).Schema[i] = schema } hasHgPSchema := binary.BigEndian.Uint32(buffer[offset:]) == 1 offset += 4 if !hasHgPSchema { return offset, nil } hgPSchemaPropIndexCount := binary.BigEndian.Uint32(buffer[offset:]) offset += 4 hgPSchema := new(HugegraphPSchema) hgPSchema.PropIndex = make(map[int]int, hgPSchemaPropIndexCount) for i := 0; i < int(hgPSchemaPropIndexCount); i++ { var k serialize.SInt32 n, err := k.Unmarshal(buffer[offset:]) if err != nil { return 0, err } offset += n var v serialize.SInt32 n, err = v.Unmarshal(buffer[offset:]) if err != nil { return 0, err } offset += n hgPSchema.PropIndex[int(k)] = int(v) } hgPSchemaLabelsCount := binary.BigEndian.Uint32(buffer[offset:]) offset += 4 hgPSchema.Labels = make(map[int]string, hgPSchemaLabelsCount) for i := 0; i < int(hgPSchemaLabelsCount); i++ { var k serialize.SInt32 n, err := k.Unmarshal(buffer[offset:]) if err != nil { return 0, err } offset += n var v serialize.SString n, err = v.Unmarshal(buffer[offset:]) if err != nil { return 0, err } offset += n hgPSchema.Labels[int(k)] = string(v) } ps.HgPSchema = hgPSchema return offset, nil } func (ps *PropertySchema) ToString() string { return "" } func (ps *PropertySchema) PredictSize() int { size := 0 size += 4 for _, v := range ps.Schema { size += 2 propKey := serialize.SString(v.PropKey) size += propKey.PredictSize() } size += 4 if ps.HgPSchema == nil { return size } size += len(ps.HgPSchema.PropIndex) * 8 size += 4 for _, v := range ps.HgPSchema.Labels { size += 4 vv := serialize.SString(v) size += vv.PredictSize() } size += 4 return size } func GetSchemaFromHugegraph(params map[string]string) (PropertySchema, PropertySchema, error) { pdIPAddress := options.GetSliceString(params, "load.hg_pd_peers") hgName := options.GetString(params, "load.hugegraph_name") username := options.GetString(params, "load.hugegraph_username") password := options.GetString(params, "load.hugegraph_password") useProperty := options.GetInt(params, "load.use_property") == 1 vertexPropIDs := strings.Split(options.GetString(params, "load.vertex_property"), ",") edgePropIDs := strings.Split(options.GetString(params, "load.edge_property"), ",") logrus.Infof("vertex props:%v , edge props:%v", vertexPropIDs, edgePropIDs) pdAddr, err := common.FindValidPD(pdIPAddress) if err != nil { logrus.Errorf("find valid pd failed, %v", err.Error()) return PropertySchema{}, PropertySchema{}, err } serverAddr, err := common.FindServerAddr(pdAddr, hgName, username, password) if err != nil { logrus.Errorf("find server addr failed, %v", err.Error()) return PropertySchema{}, PropertySchema{}, err } propertyFromHg, err := common.GetHugegraphSchema(serverAddr, hgName, username, password) if err != nil { logrus.Errorf("get hugegraph schema failed, %v", err.Error()) return PropertySchema{}, PropertySchema{}, err } vertexSchema := PropertySchema{} edgeSchema := PropertySchema{} err = vertexSchema.initFromHugegraphVertex(propertyFromHg, useProperty, vertexPropIDs) if err != nil { logrus.Errorf("init vertex schema failed, %v", err.Error()) return PropertySchema{}, PropertySchema{}, err } err = edgeSchema.initFromHugegraphEdge(propertyFromHg, useProperty, edgePropIDs) if err != nil { logrus.Errorf("init edge schema failed, %v", err.Error()) return PropertySchema{}, PropertySchema{}, err } return vertexSchema, edgeSchema, nil } type PropertyValue []serialize.MarshalAble func (p *PropertyValue) Init(schema PropertySchema) { *p = make([]serialize.MarshalAble, len(schema.Schema)) for i, v := range schema.Schema { switch v.VType { case ValueTypeInt32: value := serialize.SInt32(0) (*p)[i] = &value case ValueTypeFloat32: value := serialize.SFloat32(0) (*p)[i] = &value case ValueTypeString: value := serialize.SString("") (*p)[i] = &value default: logrus.Errorf("PropertyValue Init err, No matching type:%v", v.VType) } } } func (p *PropertyValue) Marshal(buffer []byte) (int, error) { offset := 0 binary.BigEndian.PutUint32(buffer, uint32(len(*p))) offset += 4 for _, v := range *p { n, err := v.Marshal(buffer[offset:]) if err != nil { return 0, err } offset += n } return offset, nil } func (p *PropertyValue) Unmarshal(buffer []byte) (int, error) { offset := 0 length := binary.BigEndian.Uint32(buffer) offset += 4 for i := 0; i < int(length); i++ { n, err := (*p)[i].Unmarshal(buffer[offset:]) if err != nil { return 0, err } offset += n } return offset, nil } func (p *PropertyValue) ToString() string { //TODO return "" } func (p *PropertyValue) PredictSize() int { return 0 } func (p *PropertyValue) CopyProperty(schema PropertySchema) PropertyValue { newPropertyValue := make([]serialize.MarshalAble, len(*p)) for i, v := range schema.Schema { switch v.VType { case ValueTypeInt32: value := *(*p)[i].(*serialize.SInt32) newPropertyValue[i] = &value case ValueTypeFloat32: value := *(*p)[i].(*serialize.SFloat32) newPropertyValue[i] = &value case ValueTypeString: value := *(*p)[i].(*serialize.SString) newPropertyValue[i] = &value default: logrus.Errorf("PropertyValue Init err, No matching type:%v", v.VType) } } return newPropertyValue } func (p *PropertyValue) LoadFromString(str string, schema PropertySchema) { strs := make([]string, 0) hasQuotationMark := false split := 0 for i := range str { if !hasQuotationMark && str[i] == ',' { strs = append(strs, strings.TrimSpace(str[split:i])) split = i + 1 } if str[i] == '"' { hasQuotationMark = !hasQuotationMark } } if split < len(str) { strs = append(strs, strings.TrimSpace(str[split:])) } for i, v := range schema.Schema { if i >= len(strs) { continue } switch v.VType { case ValueTypeInt32: n, err := strconv.Atoi(strs[i]) if err != nil { logrus.Errorf("Property LoadFromString Atoi err:%v", err) continue } value := serialize.SInt32(n) (*p)[i] = &value case ValueTypeFloat32: n, err := strconv.ParseFloat(strs[i], 32) if err != nil { logrus.Errorf("Property LoadFromString ParseFloat err:%v", err) continue } value := serialize.SFloat32(n) (*p)[i] = &value case ValueTypeString: value := serialize.SString(strs[i]) (*p)[i] = &value } } } func (p *PropertyValue) LoadFromHugegraph(prop []*store.Property, schema PropertySchema) { for _, prop := range prop { index, ok := schema.HgPSchema.PropIndex[int(prop.Label)] if !ok { logrus.Warnf("property index not exist:%v", prop.Label) continue } switch schema.Schema[index].VType { case ValueTypeInt32: valueInt, err := common.VariantToInt(prop.Value) if err != nil { logrus.Infof("VariantToInt error:%v", err) } value := serialize.SInt32(valueInt) (*p)[index] = &value case ValueTypeFloat32: valueFloat, err := common.VariantToFloat(prop.Value) if err != nil { logrus.Infof("VariantToFloat error:%v", err) } value := serialize.SFloat32(valueFloat) (*p)[index] = &value case ValueTypeString: valueString, err := common.VariantToString(prop.Value) if err != nil { logrus.Infof("VariantToString error:%v", err) } value := serialize.SString(valueString) (*p)[index] = &value } } } type VValues struct { VType ValueType Values interface{} } type VertexProperties map[string]*VValues func (vp *VertexProperties) Init(schema PropertySchema) { *vp = make(map[string]*VValues, len(schema.Schema)) for _, v := range schema.Schema { (*vp)[v.PropKey] = &VValues{} (*vp)[v.PropKey].VType = v.VType switch v.VType { case ValueTypeInt32: (*vp)[v.PropKey].Values = make([]serialize.SInt32, 0) case ValueTypeFloat32: (*vp)[v.PropKey].Values = make([]serialize.SFloat32, 0) case ValueTypeString: (*vp)[v.PropKey].Values = make([]serialize.SString, 0) } } } func (vp *VertexProperties) AppendProp(prop PropertyValue, schema PropertySchema) { values := prop.CopyProperty(schema) for i, v := range schema.Schema { switch v.VType { case ValueTypeInt32: (*vp)[v.PropKey].Values = append((*vp)[v.PropKey].Values.([]serialize.SInt32), *values[i].(*serialize.SInt32)) case ValueTypeFloat32: (*vp)[v.PropKey].Values = append((*vp)[v.PropKey].Values.([]serialize.SFloat32), *values[i].(*serialize.SFloat32)) case ValueTypeString: (*vp)[v.PropKey].Values = append((*vp)[v.PropKey].Values.([]serialize.SString), *values[i].(*serialize.SString)) } } } func (vp *VertexProperties) AppendProps(prop VertexProperties) { for k := range *vp { switch (*vp)[k].VType { case ValueTypeInt32: (*vp)[k].Values = append((*vp)[k].Values.([]serialize.SInt32), prop[k].Values.([]serialize.SInt32)...) case ValueTypeFloat32: (*vp)[k].Values = append((*vp)[k].Values.([]serialize.SFloat32), prop[k].Values.([]serialize.SFloat32)...) case ValueTypeString: (*vp)[k].Values = append((*vp)[k].Values.([]serialize.SString), prop[k].Values.([]serialize.SString)...) } } } func (vp *VertexProperties) GetValueType(propKey string) (ValueType, bool) { prop, ok := (*vp)[propKey] return prop.VType, ok } func (vp *VertexProperties) GetValue(propKey string, idx uint32) serialize.MarshalAble { var value serialize.MarshalAble switch (*vp)[propKey].VType { case ValueTypeInt32: value = &(*vp)[propKey].Values.([]serialize.SInt32)[idx] case ValueTypeFloat32: value = &(*vp)[propKey].Values.([]serialize.SFloat32)[idx] case ValueTypeString: value = &(*vp)[propKey].Values.([]serialize.SString)[idx] } return value } func (vp *VertexProperties) SetValue(propKey string, idx uint32, value serialize.MarshalAble) { switch (*vp)[propKey].VType { case ValueTypeInt32: sInt32 := value.(*serialize.SInt32) (*vp)[propKey].Values.([]serialize.SInt32)[idx] = *sInt32 case ValueTypeFloat32: sFloat32 := value.(*serialize.SFloat32) (*vp)[propKey].Values.([]serialize.SFloat32)[idx] = *sFloat32 case ValueTypeString: sString := value.(*serialize.SString) (*vp)[propKey].Values.([]serialize.SString)[idx] = *sString } } func (vp *VertexProperties) GetInt32Value(propKey string, idx uint32) serialize.SInt32 { value := (*vp)[propKey].Values.([]serialize.SInt32)[idx] return value } func (vp *VertexProperties) GetStringValue(propKey string, idx uint32) serialize.SString { value := (*vp)[propKey].Values.([]serialize.SString)[idx] return value } func (vp *VertexProperties) GetFloat32Value(propKey string, idx uint32) serialize.SFloat32 { value := (*vp)[propKey].Values.([]serialize.SFloat32)[idx] return value } func (vp *VertexProperties) GetValues(propKey string, idx uint32) (any, error) { var value any _, ok := (*vp)[propKey] // if idx < 0 { // return nil, fmt.Errorf("idx:%v out of range", idx) // } if !ok { return nil, fmt.Errorf("property key:%v not exist", propKey) } switch (*vp)[propKey].VType { case ValueTypeInt32: if int(idx) >= len((*vp)[propKey].Values.([]serialize.SInt32)) { return nil, fmt.Errorf("idx:%v out of range", idx) } value = (*vp)[propKey].Values.([]serialize.SInt32)[idx] case ValueTypeFloat32: if int(idx) >= len((*vp)[propKey].Values.([]serialize.SFloat32)) { return nil, fmt.Errorf("idx:%v out of range", idx) } value = (*vp)[propKey].Values.([]serialize.SFloat32)[idx] case ValueTypeString: if int(idx) >= len((*vp)[propKey].Values.([]serialize.SString)) { return nil, fmt.Errorf("idx:%v out of range", idx) } value = (*vp)[propKey].Values.([]serialize.SString)[idx] } return value, nil } func (vp *VertexProperties) Recast(totalCount int64, vertStart uint32, schema PropertySchema) { //VertexProperty Recast // var oldVertexProp VertexProperties oldVertexProp := make(map[string]*VValues, len(schema.Schema)) for _, k := range schema.Schema { oldVertexProp[k.PropKey] = &VValues{ VType: (*vp)[k.PropKey].VType, Values: (*vp)[k.PropKey].Values, } switch (*vp)[k.PropKey].VType { case ValueTypeInt32: (*vp)[k.PropKey].Values = make([]serialize.SInt32, totalCount) for i, v := range oldVertexProp[k.PropKey].Values.([]serialize.SInt32) { (*vp)[k.PropKey].Values.([]serialize.SInt32)[vertStart+uint32(i)] = v } case ValueTypeFloat32: (*vp)[k.PropKey].Values = make([]serialize.SFloat32, totalCount) for i, v := range oldVertexProp[k.PropKey].Values.([]serialize.SFloat32) { (*vp)[k.PropKey].Values.([]serialize.SFloat32)[vertStart+uint32(i)] = v } case ValueTypeString: (*vp)[k.PropKey].Values = make([]serialize.SString, totalCount) for i, v := range oldVertexProp[k.PropKey].Values.([]serialize.SString) { (*vp)[k.PropKey].Values.([]serialize.SString)[vertStart+uint32(i)] = v } } } } type EdgeProperties map[string]*VValues func (ep *EdgeProperties) Init(schema PropertySchema, vertexCount uint32) { *ep = make(map[string]*VValues, len(schema.Schema)) for _, v := range schema.Schema { (*ep)[v.PropKey] = &VValues{} (*ep)[v.PropKey].VType = v.VType switch v.VType { case ValueTypeInt32: (*ep)[v.PropKey].Values = make([][]serialize.SInt32, vertexCount) case ValueTypeFloat32: (*ep)[v.PropKey].Values = make([][]serialize.SFloat32, vertexCount) case ValueTypeString: (*ep)[v.PropKey].Values = make([][]serialize.SString, vertexCount) } } } func (ep *EdgeProperties) GetInt32Value(propKey string, vertID, idx uint32) serialize.SInt32 { value := (*ep)[propKey].Values.([][]serialize.SInt32)[vertID][idx] return value } func (ep *EdgeProperties) GetStringValue(propKey string, vertID, idx uint32) serialize.SString { value := (*ep)[propKey].Values.([][]serialize.SString)[vertID][idx] return value } func (ep *EdgeProperties) GetFloat32Value(propKey string, vertID, idx uint32) serialize.SFloat32 { value := (*ep)[propKey].Values.([][]serialize.SFloat32)[vertID][idx] return value } func (ep *EdgeProperties) GetValue(propKey string, vertID, idx uint32) (serialize.MarshalAble, error) { var value serialize.MarshalAble // if idx < 0 { // return nil, fmt.Errorf("idx:%v out of range", idx) // } _, ok := (*ep)[propKey] if !ok { return nil, fmt.Errorf("property key:%v not exist", propKey) } switch (*ep)[propKey].VType { case ValueTypeInt32: if int(idx) >= len((*ep)[propKey].Values.([][]serialize.SInt32)[vertID]) { return nil, fmt.Errorf("idx:%v out of range", idx) } value = &(*ep)[propKey].Values.([][]serialize.SInt32)[vertID][idx] case ValueTypeFloat32: if int(idx) >= len((*ep)[propKey].Values.([][]serialize.SFloat32)[vertID]) { return nil, fmt.Errorf("idx:%v out of range", idx) } value = &(*ep)[propKey].Values.([][]serialize.SFloat32)[vertID][idx] case ValueTypeString: if int(idx) >= len((*ep)[propKey].Values.([][]serialize.SString)[vertID]) { return nil, fmt.Errorf("idx:%v out of range", idx) } value = &(*ep)[propKey].Values.([][]serialize.SString)[vertID][idx] } return value, nil } func (ep *EdgeProperties) GetValueType(propKey string) (ValueType, bool) { prop, ok := (*ep)[propKey] if !ok { return ValueTypeUnknow, false } return prop.VType, ok } func (ep *EdgeProperties) AppendProp(prop PropertyValue, inIdx uint32, schema PropertySchema) { values := prop.CopyProperty(schema) for i, v := range schema.Schema { switch v.VType { case ValueTypeInt32: (*ep)[v.PropKey].Values.([][]serialize.SInt32)[inIdx] = append((*ep)[v.PropKey].Values.([][]serialize.SInt32)[inIdx], *values[i].(*serialize.SInt32)) case ValueTypeFloat32: (*ep)[v.PropKey].Values.([][]serialize.SFloat32)[inIdx] = append((*ep)[v.PropKey].Values.([][]serialize.SFloat32)[inIdx], *values[i].(*serialize.SFloat32)) case ValueTypeString: (*ep)[v.PropKey].Values.([][]serialize.SString)[inIdx] = append((*ep)[v.PropKey].Values.([][]serialize.SString)[inIdx], *values[i].(*serialize.SString)) } } } func (ep *EdgeProperties) OptimizeMemory() { for k, v := range *ep { switch v.VType { case ValueTypeInt32: for i := range (*ep)[k].Values.([][]serialize.SInt32) { values := make([]serialize.SInt32, 0, len((*ep)[k].Values.([][]serialize.SInt32)[i])) values = append(values, (*ep)[k].Values.([][]serialize.SInt32)[i]...) (*ep)[k].Values.([][]serialize.SInt32)[i] = values } case ValueTypeFloat32: for i := range (*ep)[k].Values.([][]serialize.SFloat32) { values := make([]serialize.SFloat32, 0, len((*ep)[k].Values.([][]serialize.SFloat32)[i])) values = append(values, (*ep)[k].Values.([][]serialize.SFloat32)[i]...) (*ep)[k].Values.([][]serialize.SFloat32)[i] = values } case ValueTypeString: for i := range (*ep)[k].Values.([][]serialize.SString) { values := make([]serialize.SString, 0, len((*ep)[k].Values.([][]serialize.SString)[i])) values = append(values, (*ep)[k].Values.([][]serialize.SString)[i]...) (*ep)[k].Values.([][]serialize.SString)[i] = values } } } }