vermeer/apps/structure/graph_meta.go (802 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with this
work for additional information regarding copyright ownership. The ASF
licenses this file to You under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
*/
package structure
import (
"encoding/json"
"errors"
"os"
"path"
"strconv"
"sync"
"time"
"vermeer/apps/common"
"github.com/sirupsen/logrus"
"vermeer/apps/serialize"
)
type GraphMeta struct {
Name string `json:"name,omitempty"`
VertIDStart uint32 `json:"vert_id_start,omitempty"`
VertexCount uint32 `json:"vertex_count,omitempty"`
TotalVertex serialize.PartsMeta `json:"total_vertex,omitempty"`
//BothEdges serialize.PartsMeta `json:"both_edges,omitempty"`
InEdges serialize.PartsMeta `json:"in_edges,omitempty"`
OutEdges serialize.PartsMeta `json:"out_edges,omitempty"`
OutDegree serialize.PartsMeta `json:"out_degree,omitempty"`
VertexLongIDMap serialize.PartsMeta `json:"vertex_long_id_map,omitempty"`
VertexPropertySchema serialize.PartsMeta `json:"vertex_property_schema,omitempty"`
VertexProperty map[string]serialize.PartsMeta `json:"vertex_property,omitempty"`
InEdgesPropertySchema serialize.PartsMeta `json:"in_edges_property_schema,omitempty"`
InEdgesProperty map[string]serialize.PartsMeta `json:"in_edges_property,omitempty"`
}
func SerializeToFile(m serialize.MarshalAble, size int, fileName string, group *sync.WaitGroup) {
defer func() {
group.Done()
if r := recover(); r != nil {
logrus.Errorf("SerializeToFile recover panic:%v, stack message: %s",
r, common.GetCurrentGoroutineStack())
}
}()
t := time.Now()
buffer := make([]byte, size)
_, err := m.Marshal(buffer)
if err != nil {
logrus.Errorf("SerializeToFile error,fileName=%v,err=%v", fileName, err)
}
err = writeToFile(fileName, buffer)
if err != nil {
logrus.Errorf("SerializeToFile error,fileName=%v,err=%v", fileName, err)
}
logrus.Debugf("SerializeToFile cost=%v,fileName=%v", time.Since(t), fileName)
}
func writeToFile(fileName string, buffer []byte) error {
f, err := os.OpenFile(fileName, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.FileMode(0660))
if err != nil {
return err
}
defer f.Close()
writer := serialize.NewWriter(f)
_, err = writer.Write(buffer)
if err != nil {
return err
}
err = writer.Flush()
if err != nil {
return err
}
return nil
}
func DeserializeFromFile(m serialize.MarshalAble, fileName string, group *sync.WaitGroup) {
defer func() {
group.Done()
if r := recover(); r != nil {
logrus.Errorf("DeserializeFromFile recover panic:%v, stack message: %s",
r, common.GetCurrentGoroutineStack())
}
}()
t := time.Now()
buffer, err := readFromFile(fileName)
if err != nil {
logrus.Errorf("DeserializeFromFile error fileName=%v,error=%v", fileName, err)
}
_, err = m.Unmarshal(buffer)
if err != nil {
logrus.Errorf("DeserializeFromFile Unmarshal error, fileName=%v,error=%v", fileName, err)
}
logrus.Debugf("DeserializeFromFile cost=%v,fileName=%s", time.Since(t), fileName)
}
func readFromFile(fileName string) ([]byte, error) {
f, err := os.Open(fileName)
if err != nil {
logrus.Errorf("open file err: %v", err)
return nil, err
}
reader := serialize.NewReader(f)
defer f.Close()
fi, err := f.Stat()
if err != nil {
logrus.Errorf("stat file err: %v", err)
return nil, err
}
fs := fi.Size()
buffer := make([]byte, fs)
_, err = reader.Read(buffer)
if err != nil {
logrus.Errorf("read file err: %v", err)
return nil, err
}
return buffer, nil
}
type SerializeTask struct {
m serialize.MarshalAble
size int
fileName string
}
const FileSizeLimit1 = 100 * 1024 * 1024
const FileSizeLimit2 = 20 * 1024 * 1024
func (gd *GraphData) Save(dir string) error {
if !common.IsFileOrDirExist(dir) {
err := os.MkdirAll(dir, os.ModePerm)
if err != nil {
return err
}
}
var graphMeta GraphMeta
graphMeta.Name = gd.graphName
graphMeta.VertIDStart = gd.VertIDStart
graphMeta.VertexCount = gd.VertexCount
wg := sync.WaitGroup{}
if gd.VertexProperty != nil {
wg.Add(1)
go gd.VertexProperty.save(&graphMeta, dir, &wg)
//saveVertexProperty(gd, &graphMeta, FileSizeLimit1, dir, &wg)
}
if gd.InEdgesProperty != nil {
wg.Add(1)
go gd.InEdgesProperty.save(&graphMeta, dir, &wg)
//go saveInEdgesProperty(gd, &graphMeta, FileSizeLimit1, dir, &wg)
}
if gd.Vertex != nil {
gd.Vertex.save(&graphMeta, dir, &wg)
}
if gd.Edges != nil {
gd.Edges.save(&graphMeta, dir, &wg)
//go saveInEdges(gd, &graphMeta, FileSizeLimit1, dir, &wg)
}
if gd.VertexPropertySchema.HgPSchema != nil || gd.VertexPropertySchema.Schema != nil {
wg.Add(1)
go gd.VertexPropertySchema.saveVertexPropertySchema(&graphMeta, dir, &wg)
//go saveVertexPropertySchema(gd, &graphMeta, dir, &wg)
}
if gd.InEdgesPropertySchema.HgPSchema != nil || gd.InEdgesPropertySchema.Schema != nil {
wg.Add(1)
go gd.InEdgesPropertySchema.saveInEdgesPropertySchema(&graphMeta, dir, &wg)
//go saveInEdgesPropertySchema(gd, &graphMeta, dir, &wg)
}
wg.Wait()
mb, err := json.Marshal(graphMeta)
if err != nil {
return err
}
err = writeToFile(path.Join(dir, "data_meta"), mb)
if err != nil {
return nil
}
return nil
}
func (ps *PropertySchema) saveInEdgesPropertySchema(graphMeta *GraphMeta, dir string, wg *sync.WaitGroup) {
defer wg.Done()
inEdgesPropertySchemaFile := "in_edges_property_schema"
wg.Add(1)
go SerializeToFile(ps, ps.PredictSize(), path.Join(dir, inEdgesPropertySchemaFile), wg)
var meta serialize.PartsMeta
meta.FilePrefix = inEdgesPropertySchemaFile
meta.PartNum = 1
graphMeta.InEdgesPropertySchema = meta
}
func (ps *PropertySchema) saveVertexPropertySchema(graphMeta *GraphMeta, dir string, wg *sync.WaitGroup) {
defer wg.Done()
vertexPropertySchemaFile := "vertex_property_schema"
wg.Add(1)
go SerializeToFile(ps, ps.PredictSize(), path.Join(dir, vertexPropertySchemaFile), wg)
var meta serialize.PartsMeta
meta.FilePrefix = vertexPropertySchemaFile
meta.PartNum = 1
graphMeta.VertexPropertySchema = meta
}
func (em *EdgeMem) save(graphMeta *GraphMeta, dir string, wg *sync.WaitGroup) {
if em.InEdges != nil {
wg.Add(1)
go em.saveInEdges(graphMeta, FileSizeLimit1, dir, wg)
}
if em.OutEdges != nil {
wg.Add(1)
go em.saveOutEdges(graphMeta, FileSizeLimit1, dir, wg)
}
if em.OutDegree != nil {
wg.Add(1)
go em.saveOutDegree(graphMeta, FileSizeLimit1, dir, wg)
}
}
func (em *EdgeMem) saveOutDegree(graphMeta *GraphMeta, fileLimit int, dir string, wg *sync.WaitGroup) {
defer wg.Done()
outDegree := serialize.SliceUint32(em.OutDegree)
tt := time.Now()
parts := outDegree.Partition(fileLimit)
logrus.Debugf("Serialize OutDegree Partition cost=%v", time.Since(tt))
filePrefix := "out_degree_"
for id, one := range parts {
part := outDegree[one.Start:one.End]
wg.Add(1)
go SerializeToFile(&part, one.Size, path.Join(dir, filePrefix)+strconv.Itoa(id), wg)
}
var meta serialize.PartsMeta
meta.FilePrefix = filePrefix
meta.PartNum = len(parts)
graphMeta.OutDegree = meta
}
func (em *EdgeMem) saveOutEdges(graphMeta *GraphMeta, fileLimit int, dir string, wg *sync.WaitGroup) {
defer wg.Done()
tt := time.Now()
parts := em.OutEdges.Partition(fileLimit)
logrus.Debugf("Serialize OutEdges Partition cost=%v", time.Since(tt))
filePrefix := "out_edges_"
for id, one := range parts {
part := em.OutEdges[one.Start:one.End]
wg.Add(1)
go SerializeToFile(&part, one.Size, path.Join(dir, filePrefix)+strconv.Itoa(id), wg)
}
var meta serialize.PartsMeta
meta.FilePrefix = filePrefix
meta.PartNum = len(parts)
graphMeta.OutEdges = meta
}
func (em *EdgeMem) saveInEdges(graphMeta *GraphMeta, fileLimit int, dir string, wg *sync.WaitGroup) {
defer wg.Done()
tt := time.Now()
parts := em.InEdges.Partition(fileLimit)
logrus.Debugf("Serialize InEdges Partition cost=%v", time.Since(tt))
filePrefix := "in_edges_"
for id, one := range parts {
part := em.InEdges[one.Start:one.End]
wg.Add(1)
go SerializeToFile(&part, one.Size, path.Join(dir, filePrefix)+strconv.Itoa(id), wg)
}
var meta serialize.PartsMeta
meta.FilePrefix = filePrefix
meta.PartNum = len(parts)
graphMeta.InEdges = meta
}
func (vm *VertexMem) save(graphMeta *GraphMeta, dir string, wg *sync.WaitGroup) {
if vm.VertexLongIDMap != nil {
wg.Add(1)
go vm.saveVertexLongIDMap(graphMeta, FileSizeLimit2, dir, wg)
}
if vm.TotalVertex != nil {
wg.Add(1)
go vm.saveTotalVertex(graphMeta, FileSizeLimit1, dir, wg)
}
}
func (vm *VertexMem) saveTotalVertex(graphMeta *GraphMeta, fileLimit int, dir string, wg *sync.WaitGroup) {
defer wg.Done()
totalVertex := SliceVertex(vm.TotalVertex)
tt := time.Now()
parts := totalVertex.Partition(fileLimit)
logrus.Debugf("Serialize TotalVertex Partition cost=%v", time.Since(tt))
filePrefix := "total_vertex_"
for id, one := range parts {
part := totalVertex[one.Start:one.End]
wg.Add(1)
go SerializeToFile(&part, one.Size, path.Join(dir, filePrefix)+strconv.Itoa(id), wg)
}
var meta serialize.PartsMeta
meta.FilePrefix = filePrefix
meta.PartNum = len(parts)
graphMeta.TotalVertex = meta
}
func (vm *VertexMem) saveVertexLongIDMap(graphMeta *GraphMeta, fileLimit int, dir string, wg *sync.WaitGroup) {
defer wg.Done()
length := len(vm.VertexLongIDMap)
t1 := time.Now()
s := make([]serialize.KVpair, length)
index := 0
for k, v := range vm.VertexLongIDMap {
entry := serialize.KVpair{K: k, V: v}
s[index] = entry
index++
}
logrus.Debugf("Serialize vertexLongIDMap map to slice cost=%v,s length=%v", time.Since(t1), len(s))
filePrefix := "vertex_long_id_map_"
vertexLongIDSlice := serialize.SliceKVpair(s)
parts := vertexLongIDSlice.Partition(fileLimit)
logrus.Debugf("Serialize vertexLongIDMap parts=%v", parts)
for id, one := range parts {
part := vertexLongIDSlice[one.Start:one.End]
wg.Add(1)
go SerializeToFile(&part, one.Size, path.Join(dir, filePrefix)+strconv.Itoa(id), wg)
}
var meta serialize.PartsMeta
meta.FilePrefix = filePrefix
meta.PartNum = len(parts)
graphMeta.VertexLongIDMap = meta
}
func (vp *VertexProperties) save(meta *GraphMeta, dir string, wg *sync.WaitGroup) {
defer wg.Done()
t := time.Now()
var vertexPropertyMeta = make(map[string]serialize.PartsMeta)
fileLimit := FileSizeLimit1
for k, v := range *vp {
var partMeta serialize.PartsMeta
filePrefix := "vertex_property_" + k + "_"
partMeta.FilePrefix = filePrefix
switch v.VType {
case ValueTypeInt32:
{
values := serialize.SliceInt32(v.Values.([]serialize.SInt32))
tt := time.Now()
parts := values.Partition(fileLimit)
logrus.Debugf("Serialize VertexProperty ValueTypeInt32 Partition cost=%v", time.Since(tt))
partMeta.PartNum = len(parts)
partMeta.ValueType = uint16(ValueTypeInt32)
for id, one := range parts {
part := values[one.Start:one.End]
wg.Add(1)
go SerializeToFile(&part, one.Size, path.Join(dir, filePrefix)+strconv.Itoa(id), wg)
}
}
case ValueTypeFloat32:
{
values := serialize.SliceFloat32(v.Values.([]serialize.SFloat32))
tt := time.Now()
parts := values.Partition(fileLimit)
logrus.Debugf("Serialize VertexProperty ValueTypeFloat32 Partition cost=%v", time.Since(tt))
partMeta.PartNum = len(parts)
partMeta.ValueType = uint16(ValueTypeFloat32)
for id, one := range parts {
part := values[one.Start:one.End]
wg.Add(1)
go SerializeToFile(&part, one.Size, path.Join(dir, filePrefix)+strconv.Itoa(id), wg)
}
}
case ValueTypeString:
{
values := serialize.SliceString(v.Values.([]serialize.SString))
tt := time.Now()
parts := values.Partition(fileLimit)
logrus.Debugf("Serialize VertexProperty ValueTypeString Partition cost=%v", time.Since(tt))
partMeta.PartNum = len(parts)
partMeta.ValueType = uint16(ValueTypeString)
for id, one := range parts {
part := values[one.Start:one.End]
wg.Add(1)
go SerializeToFile(&part, one.Size, path.Join(dir, filePrefix)+strconv.Itoa(id), wg)
}
}
}
vertexPropertyMeta[k] = partMeta
}
meta.VertexProperty = vertexPropertyMeta
logrus.Debugf("Serialize serializeVertexProperty2 cost=%v", time.Since(t))
}
func (ep *EdgeProperties) save(meta *GraphMeta, dir string, wg *sync.WaitGroup) {
defer wg.Done()
t := time.Now()
var inEdgesPropertyMeta = make(map[string]serialize.PartsMeta)
fileLimit := FileSizeLimit1
for k, v := range *ep {
var partMeta serialize.PartsMeta
filePrefix := "in_edges_property_" + k + "_"
partMeta.FilePrefix = filePrefix
switch v.VType {
case ValueTypeInt32:
{
values := serialize.TwoDimSliceInt32(v.Values.([][]serialize.SInt32))
tt := time.Now()
parts := values.Partition(fileLimit)
logrus.Debugf("Serialize InEdgesProperty ValueTypeInt32 Partition cost=%v", time.Since(tt))
partMeta.PartNum = len(parts)
partMeta.ValueType = uint16(ValueTypeInt32)
for id, one := range parts {
part := values[one.Start:one.End]
wg.Add(1)
go SerializeToFile(&part, one.Size, path.Join(dir, filePrefix)+strconv.Itoa(id), wg)
}
}
case ValueTypeFloat32:
{
values := serialize.TwoDimSliceFloat32(v.Values.([][]serialize.SFloat32))
tt := time.Now()
parts := values.Partition(fileLimit)
logrus.Debugf("Serialize InEdgesProperty ValueTypeFloat32 Partition cost=%v", time.Since(tt))
partMeta.PartNum = len(parts)
partMeta.ValueType = uint16(ValueTypeFloat32)
for id, one := range parts {
part := values[one.Start:one.End]
wg.Add(1)
go SerializeToFile(&part, one.Size, path.Join(dir, filePrefix)+strconv.Itoa(id), wg)
}
}
case ValueTypeString:
{
values := serialize.TwoDimSliceString(v.Values.([][]serialize.SString))
tt := time.Now()
parts := values.Partition(fileLimit)
logrus.Debugf("Serialize InEdgesProperty ValueTypeString Partition cost=%v", time.Since(tt))
partMeta.PartNum = len(parts)
partMeta.ValueType = uint16(ValueTypeString)
for id, one := range parts {
part := values[one.Start:one.End]
wg.Add(1)
go SerializeToFile(&part, one.Size, path.Join(dir, filePrefix)+strconv.Itoa(id), wg)
}
}
}
inEdgesPropertyMeta[k] = partMeta
}
meta.InEdgesProperty = inEdgesPropertyMeta
logrus.Debugf("Serialize serializeInEdgesProperty2 cost=%v", time.Since(t))
}
func (gd *GraphData) Load(dir string) error {
if len(gd.graphName) == 0 {
return errors.New("need db name")
}
dataDir := path.Join(dir, "data")
metaFile := path.Join(dataDir, "data_meta")
buffer, err := readFromFile(metaFile)
if err != nil {
return err
}
var meta GraphMeta
err = json.Unmarshal(buffer, &meta)
if err != nil {
return err
}
gd.graphName = meta.Name
gd.VertIDStart = meta.VertIDStart
gd.VertexCount = meta.VertexCount
wg := sync.WaitGroup{}
gd.Vertex.load(meta, dataDir, &wg)
gd.Edges.load(meta, dataDir, &wg)
if meta.VertexPropertySchema.PartNum != 0 {
wg.Add(1)
go gd.VertexPropertySchema.load(&wg, meta.VertexPropertySchema.FilePrefix, dataDir)
}
if meta.InEdgesPropertySchema.PartNum != 0 {
wg.Add(1)
go gd.InEdgesPropertySchema.load(&wg, meta.InEdgesPropertySchema.FilePrefix, dataDir)
}
if len(meta.VertexProperty) != 0 {
wg.Add(1)
go gd.VertexProperty.load(&wg, meta, dataDir)
}
if len(meta.InEdgesProperty) != 0 {
wg.Add(1)
go gd.InEdgesProperty.load(&wg, meta, dataDir)
}
wg.Wait()
return nil
}
func (ps *PropertySchema) load(wg *sync.WaitGroup, filePrefix string, dir string) {
defer wg.Done()
w := sync.WaitGroup{}
fileName := path.Join(dir, filePrefix)
w.Add(1)
go DeserializeFromFile(ps, fileName, &w)
w.Wait()
}
func (em *EdgeMem) load(meta GraphMeta, dataDir string, wg *sync.WaitGroup) {
if meta.InEdges.PartNum != 0 {
wg.Add(1)
go em.loadInEdges(wg, meta, dataDir)
}
if meta.OutEdges.PartNum != 0 {
wg.Add(1)
go em.loadOutEdges(wg, meta, dataDir)
}
if meta.OutDegree.PartNum != 0 {
wg.Add(1)
go em.loadOutDegree(wg, meta, dataDir)
}
}
func (em *EdgeMem) loadOutDegree(wg *sync.WaitGroup, meta GraphMeta, dir string) {
tt := time.Now()
defer wg.Done()
w := sync.WaitGroup{}
partsNum := meta.OutDegree.PartNum
p := make([]serialize.SliceUint32, partsNum)
for i := 0; i < partsNum; i++ {
fileName := path.Join(dir, meta.OutDegree.FilePrefix) + strconv.Itoa(i)
w.Add(1)
go DeserializeFromFile(&p[i], fileName, &w)
}
w.Wait()
ttt := time.Now()
length := 0
for _, s := range p {
length += len(s)
}
outDegree := make([]serialize.SUint32, length)
offset := 0
for _, s := range p {
n := copy(outDegree[offset:], s)
offset += n
}
em.OutDegree = outDegree
logrus.Debugf("Deserialize outDegree wait cost=%v", time.Since(ttt))
logrus.Debugf("Deserialize OutDegree cost=%v", time.Since(tt))
}
func (em *EdgeMem) loadOutEdges(wg *sync.WaitGroup, meta GraphMeta, dir string) {
tt := time.Now()
defer wg.Done()
w := sync.WaitGroup{}
partsNum := meta.OutEdges.PartNum
p := make([]serialize.TwoDimSliceUint32, partsNum)
for i := 0; i < partsNum; i++ {
fileName := path.Join(dir, meta.OutEdges.FilePrefix) + strconv.Itoa(i)
w.Add(1)
go DeserializeFromFile(&p[i], fileName, &w)
}
w.Wait()
ttt := time.Now()
length := 0
for _, s := range p {
length += len(s)
}
outEdges := make([]serialize.SliceUint32, length)
offset := 0
for _, s := range p {
n := copy(outEdges[offset:], s)
offset += n
}
em.OutEdges = outEdges
logrus.Debugf("Deserialize outEdges wait cost=%v", time.Since(ttt))
logrus.Debugf("Deserialize OutEdges cost=%v", time.Since(tt))
}
func (em *EdgeMem) loadInEdges(wg *sync.WaitGroup, meta GraphMeta, dir string) {
tt := time.Now()
defer wg.Done()
w := sync.WaitGroup{}
partsNum := meta.InEdges.PartNum
p := make([]serialize.TwoDimSliceUint32, partsNum)
for i := 0; i < partsNum; i++ {
fileName := path.Join(dir, meta.InEdges.FilePrefix) + strconv.Itoa(i)
w.Add(1)
go DeserializeFromFile(&p[i], fileName, &w)
}
w.Wait()
ttt := time.Now()
length := 0
for _, s := range p {
length += len(s)
}
inEdges := make([]serialize.SliceUint32, length)
offset := 0
for _, s := range p {
n := copy(inEdges[offset:], s)
offset += n
}
em.InEdges = inEdges
logrus.Debugf("Deserialize InEdges wait cost=%v", time.Since(ttt))
logrus.Debugf("Deserialize InEdges cost=%v", time.Since(tt))
}
func (vm *VertexMem) load(meta GraphMeta, dataDir string, wg *sync.WaitGroup) {
if meta.VertexLongIDMap.PartNum != 0 {
wg.Add(1)
go vm.loadVertexLongIDMap(wg, meta, dataDir)
}
if meta.TotalVertex.PartNum != 0 {
wg.Add(1)
go vm.loadTotalVertex(wg, meta, dataDir)
}
}
func (vm *VertexMem) loadTotalVertex(wg *sync.WaitGroup, meta GraphMeta, dir string) {
tt := time.Now()
defer wg.Done()
w := sync.WaitGroup{}
partsNum := meta.TotalVertex.PartNum
p := make([]SliceVertex, partsNum)
for i := 0; i < partsNum; i++ {
fileName := path.Join(dir, meta.TotalVertex.FilePrefix) + strconv.Itoa(i)
w.Add(1)
go DeserializeFromFile(&p[i], fileName, &w)
}
w.Wait()
length := 0
for _, s := range p {
length += len(s)
}
totalVertex := make([]Vertex, length)
offset := 0
for _, s := range p {
n := copy(totalVertex[offset:], s)
offset += n
}
vm.TotalVertex = totalVertex
logrus.Debugf("Deserialize TotalVertex cost=%v", time.Since(tt))
}
func (vm *VertexMem) loadVertexLongIDMap(wg *sync.WaitGroup, meta GraphMeta, dir string) {
defer wg.Done()
tt := time.Now()
w := sync.WaitGroup{}
partsNum := meta.VertexLongIDMap.PartNum
p := make([]serialize.SliceKVpair, partsNum)
for i := 0; i < partsNum; i++ {
fileName := path.Join(dir, meta.VertexLongIDMap.FilePrefix) + strconv.Itoa(i)
w.Add(1)
go DeserializeFromFile(&p[i], fileName, &w)
}
w.Wait()
length := 0
for _, s := range p {
length += len(s)
}
t1 := time.Now()
vertexLongIDMap := make(map[string]uint32, length)
for _, s := range p {
for _, v := range s {
vertexLongIDMap[v.K] = v.V
}
}
logrus.Debugf("Deserialize VertexLongIDMap build map cost=%v", time.Since(t1))
vm.VertexLongIDMap = vertexLongIDMap
logrus.Debugf("Deserialize VertexLongIDMap cost=%v", time.Since(tt))
}
func (vp *VertexProperties) load(wg *sync.WaitGroup, meta GraphMeta, dir string) {
defer wg.Done()
tt := time.Now()
vertexProperty := make(map[string]*VValues)
ww := sync.WaitGroup{}
rwLock := sync.RWMutex{}
for k, v := range meta.VertexProperty {
ww.Add(1)
go func(wg *sync.WaitGroup, kk string, vv serialize.PartsMeta) {
defer wg.Done()
vValues := new(VValues)
vValues.VType = ValueType(vv.ValueType)
switch vValues.VType {
case ValueTypeInt32:
{
partsNum := vv.PartNum
p := make([]serialize.SliceInt32, partsNum)
w := sync.WaitGroup{}
for i := 0; i < partsNum; i++ {
fileName := path.Join(dir, vv.FilePrefix) + strconv.Itoa(i)
w.Add(1)
go DeserializeFromFile(&p[i], fileName, &w)
}
w.Wait()
length := 0
for _, s := range p {
length += len(s)
}
values := make([]serialize.SInt32, length)
offset := 0
for _, s := range p {
n := copy(values[offset:], s)
offset += n
}
vValues.Values = values
}
case ValueTypeFloat32:
{
partsNum := vv.PartNum
p := make([]serialize.SliceFloat32, partsNum)
w := sync.WaitGroup{}
for i := 0; i < partsNum; i++ {
fileName := path.Join(dir, vv.FilePrefix) + strconv.Itoa(i)
w.Add(1)
go DeserializeFromFile(&p[i], fileName, &w)
}
w.Wait()
length := 0
for _, s := range p {
length += len(s)
}
values := make([]serialize.SFloat32, length)
offset := 0
for _, s := range p {
n := copy(values[offset:], s)
offset += n
}
vValues.Values = values
}
case ValueTypeString:
{
partsNum := vv.PartNum
p := make([]serialize.SliceString, partsNum)
w := sync.WaitGroup{}
for i := 0; i < partsNum; i++ {
fileName := path.Join(dir, vv.FilePrefix) + strconv.Itoa(i)
w.Add(1)
go DeserializeFromFile(&p[i], fileName, &w)
}
w.Wait()
length := 0
for _, s := range p {
length += len(s)
}
values := make([]serialize.SString, length)
offset := 0
for _, s := range p {
n := copy(values[offset:], s)
offset += n
}
vValues.Values = values
}
}
rwLock.Lock()
vertexProperty[kk] = vValues
rwLock.Unlock()
}(&ww, k, v)
}
ww.Wait()
*vp = vertexProperty
logrus.Debugf("Deserialize VertexProperty cost=%v", time.Since(tt))
}
func (ep *EdgeProperties) load(wg *sync.WaitGroup, meta GraphMeta, dir string) {
defer wg.Done()
tt := time.Now()
inEdgeProperty := make(map[string]*VValues)
ww := sync.WaitGroup{}
rwLock := sync.RWMutex{}
for k, v := range meta.InEdgesProperty {
ww.Add(1)
go func(wg *sync.WaitGroup, kk string, vv serialize.PartsMeta) {
defer wg.Done()
vValues := new(VValues)
vValues.VType = ValueType(vv.ValueType)
switch vValues.VType {
case ValueTypeInt32:
{
partsNum := vv.PartNum
p := make([]serialize.TwoDimSliceInt32, partsNum)
w := sync.WaitGroup{}
for i := 0; i < partsNum; i++ {
fileName := path.Join(dir, vv.FilePrefix) + strconv.Itoa(i)
w.Add(1)
go DeserializeFromFile(&p[i], fileName, &w)
}
w.Wait()
length := 0
for _, s := range p {
length += len(s)
}
values := make([][]serialize.SInt32, length)
offset := 0
for _, s := range p {
n := copy(values[offset:], s)
offset += n
}
vValues.Values = values
}
case ValueTypeFloat32:
{
partsNum := vv.PartNum
p := make([]serialize.TwoDimSliceFloat32, partsNum)
w := sync.WaitGroup{}
for i := 0; i < partsNum; i++ {
fileName := path.Join(dir, vv.FilePrefix) + strconv.Itoa(i)
w.Add(1)
go DeserializeFromFile(&p[i], fileName, &w)
}
w.Wait()
length := 0
for _, s := range p {
length += len(s)
}
values := make([][]serialize.SFloat32, length)
offset := 0
for _, s := range p {
n := copy(values[offset:], s)
offset += n
}
vValues.Values = values
}
case ValueTypeString:
{
partsNum := vv.PartNum
p := make([]serialize.TwoDimSliceString, partsNum)
w := sync.WaitGroup{}
for i := 0; i < partsNum; i++ {
fileName := path.Join(dir, vv.FilePrefix) + strconv.Itoa(i)
w.Add(1)
go DeserializeFromFile(&p[i], fileName, &w)
}
w.Wait()
length := 0
for _, s := range p {
length += len(s)
}
values := make([][]serialize.SString, length)
offset := 0
for _, s := range p {
n := copy(values[offset:], s)
offset += n
}
vValues.Values = values
}
}
rwLock.Lock()
inEdgeProperty[kk] = vValues
rwLock.Unlock()
}(&ww, k, v)
}
ww.Wait()
*ep = inEdgeProperty
logrus.Debugf("Deserialize InEdgesProperty cost=%v", time.Since(tt))
}
func (gd *GraphData) Remove(dir string) error {
err := os.RemoveAll(dir)
return err
}