vermeer/apps/graphio/load_graph.go (358 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with this
work for additional information regarding copyright ownership. The ASF
licenses this file to You under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
*/
package graphio
import (
"errors"
"fmt"
"sync"
"time"
"vermeer/apps/common"
"vermeer/apps/options"
"vermeer/apps/serialize"
"vermeer/apps/structure"
"github.com/sirupsen/logrus"
)
// Status values stored in LoadPartition.Status.
const (
// LoadPartStatusPrepared is the status LoadPartition.Init assigns to a new partition.
LoadPartStatusPrepared = "prepared"
// LoadPartStatusLoading is set by FetchPreparedPart when it hands a partition out.
LoadPartStatusLoading = "loading"
// LoadPartStatusDone is set by LoadTaskDone for the reported partition.
LoadPartStatusDone = "done"
)
// Load type identifiers. MakeLoader, MakeWriter and LoadGraphTask.MakeTask look
// up LoadMakers by a load-type string; only LoadTypeNone is registered in this
// file.
const (
// LoadTypeLocal makes FetchPreparedPart hand out only partitions whose IpAddr
// equals the requesting worker's IP.
LoadTypeLocal = "local"
LoadTypeHdfs = "hdfs"
LoadTypeHugegraph = "hugegraph"
LoadTypeAFS = "afs"
// LoadTypeNone is the key under which init registers GraphLoadMakerBase.
LoadTypeNone = "none"
)
// FetchEof is returned by FetchPreparedPart when the task is already in the
// loaded state or no prepared partition matches the request.
var FetchEof = errors.New("EOF")
// LoadMakers maps a load-type string to the GraphLoadMaker used for it.
// This file registers only LoadTypeNone (in init).
// NOTE(review): other load types are presumably registered elsewhere — confirm.
var LoadMakers = map[string]GraphLoadMaker{}
// GraphLoadMaker is the factory registered in LoadMakers for one load type.
type GraphLoadMaker interface {
// CreateGraphLoader returns a new GraphLoader for this load type.
CreateGraphLoader() GraphLoader
// CreateGraphWriter returns a new GraphWriter for this load type.
CreateGraphWriter() GraphWriter
// MakeTasks builds the load partitions for the task identified by taskID
// from the task parameters.
MakeTasks(params map[string]string, taskID int32) ([]LoadPartition, error)
}
// GraphLoadMakerBase is the default GraphLoadMaker. It hands out the no-op
// loader and writer defined in this file and cannot build load partitions.
type GraphLoadMakerBase struct{}

// init registers GraphLoadMakerBase in LoadMakers under LoadTypeNone.
func init() {
	LoadMakers[LoadTypeNone] = new(GraphLoadMakerBase)
}

// CreateGraphLoader returns a new GraphLoaderBase.
func (g *GraphLoadMakerBase) CreateGraphLoader() GraphLoader {
	return new(GraphLoaderBase)
}

// CreateGraphWriter returns a new GraphWriterBase.
func (g *GraphLoadMakerBase) CreateGraphWriter() GraphWriter {
	return new(GraphWriterBase)
}

// MakeTasks always fails with a "not implemented" error and a nil slice.
func (g *GraphLoadMakerBase) MakeTasks(params map[string]string, taskID int32) ([]LoadPartition, error) {
	var noParts []LoadPartition
	return noParts, errors.New("not implemented")
}
// GraphLoader is the reader side of a load type: it is initialised from string
// parameters and a property schema and then asked for vertices and edges.
// NOTE(review): how end-of-input is signalled by ReadVertex/ReadEdge is not
// visible in this file — confirm against the concrete loaders.
type GraphLoader interface {
// Init prepares the loader from params and the property schema.
Init(params map[string]string, schema structure.PropertySchema) error
// ReadVertex presumably fills vertex and property with the next vertex record.
ReadVertex(vertex *structure.Vertex, property *structure.PropertyValue) error
// ReadEdge presumably fills edge and property with the next edge record.
ReadEdge(edge *structure.Edge, property *structure.PropertyValue) error
// Name returns the loader's name.
Name() string
// ReadCount returns a count of records read (presumably; verify per loader).
ReadCount() int
// Close releases whatever the loader holds.
Close()
}
// GraphLoaderBase is the no-op GraphLoader returned by GraphLoadMakerBase.
type GraphLoaderBase struct{}
// Init ignores its arguments and returns nil.
func (g *GraphLoaderBase) Init(params map[string]string, schema structure.PropertySchema) error {
return nil
}
// ReadVertex leaves vertex and property untouched and returns nil.
func (g *GraphLoaderBase) ReadVertex(vertex *structure.Vertex, property *structure.PropertyValue) error {
return nil
}
// ReadEdge leaves edge and property untouched and returns nil.
func (g *GraphLoaderBase) ReadEdge(edge *structure.Edge, property *structure.PropertyValue) error {
return nil
}
// Name returns the fixed name "none".
func (g *GraphLoaderBase) Name() string {
return "none"
}
// ReadCount always returns 0.
func (g *GraphLoaderBase) ReadCount() int {
return 0
}
// Close does nothing.
func (g *GraphLoaderBase) Close() {}
// GraphWriter is the output side of a load type: it is initialised from a
// WriterInitInfo and then receives vertex values or a statistics map.
type GraphWriter interface {
// Init prepares the writer from info.
Init(info WriterInitInfo) error
// WriteVertex writes one vertex value; it has no error return.
WriteVertex(info WriteVertexValue)
// WriteCount returns a count of written records (presumably; verify per writer).
WriteCount() int
// WriteStatistics writes the given statistics map.
WriteStatistics(statistics map[string]any) error
// Close releases whatever the writer holds.
Close()
}
// GraphWriterBase is the no-op GraphWriter returned by GraphLoadMakerBase.
type GraphWriterBase struct {
}
// Init ignores info and returns nil.
func (g *GraphWriterBase) Init(info WriterInitInfo) error {
return nil
}
// WriteVertex discards the value.
func (g *GraphWriterBase) WriteVertex(info WriteVertexValue) {}
// WriteCount always returns 0.
func (g *GraphWriterBase) WriteCount() int {
return 0
}
// WriteStatistics discards the statistics and returns nil.
func (g *GraphWriterBase) WriteStatistics(statistics map[string]any) error {
return nil
}
// Close does nothing.
func (g *GraphWriterBase) Close() {}
// WriteMode is the mode carried in WriterInitInfo.Mode.
type WriteMode int
const (
// WriteModeVertexValue has the value 0.
// NOTE(review): presumably selects per-vertex output via WriteVertex — confirm.
WriteModeVertexValue WriteMode = iota
// WriteModeStatistics has the value 1.
// NOTE(review): presumably selects output via WriteStatistics — confirm.
WriteModeStatistics
)
// WriterInitInfo is the argument passed to GraphWriter.Init.
// NOTE(review): field meanings below are inferred from names only; nothing in
// this file reads them — confirm against the concrete writers.
type WriterInitInfo struct {
// Params holds string key/value parameters for the writer.
Params map[string]string
// Mode is the WriteMode for this writer.
Mode WriteMode
// PartID presumably identifies the output partition.
PartID int
// MaxID presumably bounds the ids being written.
MaxID int
// HgVertexSchema is a property schema (presumably for HugeGraph output).
HgVertexSchema structure.PropertySchema
// OutputType presumably names the output type.
OutputType string
}
// WriteVertexValue is the argument passed to GraphWriter.WriteVertex.
type WriteVertexValue struct {
// VertexID is the vertex identifier as a string.
VertexID string
// Value is the serializable value to write for the vertex.
Value serialize.MarshalAble
// HgLabel is presumably the HugeGraph label of the vertex — TODO confirm.
HgLabel string
}
// MakeLoader returns a new GraphLoader from the maker registered in LoadMakers
// under loadType. When no maker is registered it logs an error and returns nil.
func MakeLoader(loadType string) GraphLoader {
	if maker, found := LoadMakers[loadType]; found {
		return maker.CreateGraphLoader()
	}
	logrus.Errorf("no matched loader: %s", loadType)
	return nil
}
// MakeWriter returns a new GraphWriter from the maker registered in LoadMakers
// under outType. When no maker is registered it logs an error and returns nil.
func MakeWriter(outType string) GraphWriter {
	if maker, found := LoadMakers[outType]; found {
		return maker.CreateGraphWriter()
	}
	logrus.Errorf("no matched writer: %s", outType)
	return nil
}
// LoadGraphTask is the per-task load state kept by both loadGraphMaster and
// loadGraphWorker.
type LoadGraphTask struct {
// Task is the task this load state belongs to.
Task *structure.TaskInfo
// loadParts holds the partitions returned by the maker's MakeTasks on the
// master; InstallTask creates it as an empty slice on the worker.
loadParts []LoadPartition
// LoadType is the key used to look up LoadMakers; MakeLoadTasks reads it from
// the "load.type" task parameter.
LoadType string
// Parallel is allocated by MakeTask and InstallTask; its use is outside this file.
Parallel *int32
// LoadWg and RecvWg are allocated on creation; what they wait on is outside
// this file.
LoadWg *sync.WaitGroup
RecvWg *common.SpinWaiter
// SendCount and RecvCount hold one counter per entry of Task.Workers, keyed
// by worker name.
SendCount map[string]*int32
RecvCount map[string]*int32
// Locker is allocated on creation; no code in this file locks it.
Locker *sync.Mutex
}
// GetPartition returns a copy of the partition stored at slice index partIdx.
// The copy shares its Params map with the stored partition. An index outside
// the slice panics.
func (lt *LoadGraphTask) GetPartition(partIdx int32) LoadPartition {
	part := lt.loadParts[partIdx]
	return part
}
// MakeTask builds the task's load partitions using the maker registered in
// LoadMakers under lt.LoadType, then copies the "load.use_property" and
// "load.delimiter" task parameters into every partition's Params.
//
// It returns an error when no maker is registered for lt.LoadType or when the
// maker's MakeTasks fails. lt.Parallel is (re)allocated in every case.
func (lt *LoadGraphTask) MakeTask() error {
	lt.Parallel = new(int32)
	maker, ok := LoadMakers[lt.LoadType]
	if !ok {
		return fmt.Errorf("no matched load type: %s", lt.LoadType)
	}
	var err error
	lt.loadParts, err = maker.MakeTasks(lt.Task.Params, lt.Task.ID)
	if err != nil {
		return err
	}
	for i := range lt.loadParts {
		part := &lt.loadParts[i]
		// A maker may return a partition without a Params map; writing to a nil
		// map panics, so allocate one before copying the shared parameters.
		if part.Params == nil {
			part.Params = make(map[string]string, 2)
		}
		part.Params["load.use_property"] = lt.Task.Params["load.use_property"]
		part.Params["load.delimiter"] = lt.Task.Params["load.delimiter"]
	}
	return nil
}
//func (lt *LoadGraphTask) Install() {
// ctx := context.Background()
// parallel := options.GetInt(lt.Task.Params, "load.parallel")
// for i := 0; i < parallel; i++ {
// go lt.Run(ctx)
// }
//}
//
//func (lt *LoadGraphTask) Run(ctx context.WContext) {
// _ = ctx
// for {
// _, err := lt.handler.FetchPartition(lt.Task.Id)
// if err != nil {
// if err == FetchEof {
// logrus.Infof("fetch partition eof")
// break
// }
// logrus.Infof("fetch partition eof")
// }
// _ = lt.handler.ScatterGraph(lt.Task.Id, nil)
// }
//}
// SetPartStatus updates, in place, the status of the partition at slice index
// partIdx (see LoadPartition.SetStatus). An index outside the slice panics.
func (lt *LoadGraphTask) SetPartStatus(partIdx int32, status string) {
	part := &lt.loadParts[partIdx]
	part.SetStatus(status)
}
// FreeMemory drops the task's references to its partitions, per-worker
// counters and both waiters by setting them to nil. Task, LoadType, Parallel
// and Locker are left as they are.
func (lt *LoadGraphTask) FreeMemory() {
	lt.loadParts, lt.SendCount, lt.RecvCount = nil, nil, nil
	lt.RecvWg, lt.LoadWg = nil, nil
}
// Partition kinds stored in LoadPartition.Type. FetchPreparedPart hands out
// vertex partitions while the task is in the load-vertex state and edge
// partitions while it is in the load-edge state.
const (
LoadPartTypeVertex = "vertex"
LoadPartTypeEdge = "edge"
)
// LoadPartition describes one unit of load work handed to a worker.
type LoadPartition struct {
// Id is the partition id. LoadTaskDone uses id-1 as the slice index, so ids
// are presumably 1-based — confirm against the makers.
Id int32
// TaskId is the id of the owning task.
TaskId int32
// CreateTime is set by Init.
CreateTime time.Time
// UpdateTime is set by Init and refreshed by SetStatus.
UpdateTime time.Time
// Status is one of the LoadPartStatus* values.
Status string
// Type is LoadPartTypeVertex or LoadPartTypeEdge.
Type string
// IpAddr is compared with the requesting worker's IP when the load type is
// LoadTypeLocal.
IpAddr string
// Params holds the partition's string parameters; MakeTask adds
// "load.use_property" and "load.delimiter" to it.
Params map[string]string
}
// Init sets the partition's id, owning task id and type, marks it as
// LoadPartStatusPrepared, and stamps CreateTime and UpdateTime.
//
// Both timestamps come from a single clock reading so that a freshly
// initialised partition has CreateTime equal to UpdateTime. Params and IpAddr
// are not touched.
func (lp *LoadPartition) Init(id int32, taskId int32, partType string) {
	now := time.Now()
	lp.Id = id
	lp.TaskId = taskId
	lp.CreateTime = now
	lp.UpdateTime = now
	lp.Status = LoadPartStatusPrepared
	lp.Type = partType
}
// SetStatus stores the new status and refreshes UpdateTime to the current time.
func (lp *LoadPartition) SetStatus(status string) {
	lp.UpdateTime = time.Now()
	lp.Status = status
}
// --------------------LoadGraphMaster---------------------
// LoadGraphMaster is the package-level loadGraphMaster instance. Its task map
// is allocated by Init.
var LoadGraphMaster = &loadGraphMaster{}
// loadGraphMaster holds the master's load tasks keyed by task id. Its methods
// other than Init take locker before touching loadTasks.
type loadGraphMaster struct {
//LoadGraphSpaceMap map[string]*loadGraphMasterSpace
//sync.Mutex
// loadTasks maps task id to its load state; allocated by Init.
loadTasks map[int32]*LoadGraphTask
// locker guards loadTasks.
locker sync.RWMutex
}
// type LoadGraphMasterSpace struct {
// loadTasks map[int32]LoadGraphTask
// locker sync.RWMutex
// }
// Init replaces the task map with a fresh empty one. It does not take the lock.
func (lm *loadGraphMaster) Init() {
	lm.loadTasks = map[int32]*LoadGraphTask{}
}
// func (lm *loadGraphMasterSpace) Init() {
// lm.locker = sync.RWMutex{}
// lm.loadTasks = make(map[int32]LoadGraphTask, 0)
// }
// func (lm *loadGraphMaster) AddSpace(spaceName string) error {
// lm.Lock()
// defer lm.Unlock()
// if lm.LoadGraphSpaceMap[spaceName] != nil {
// return fmt.Errorf("LoadGraphMaster space exists:%s", spaceName)
// }
// loadGraphMasterSpace := &LoadGraphMasterSpace{}
// loadGraphMasterSpace.Init()
// lm.LoadGraphSpaceMap[spaceName] = loadGraphMasterSpace
// return nil
// }
// MakeLoadTasks creates and registers the load state for taskInfo.
//
// It allocates the waiters, the mutex and one send/receive counter per entry
// of taskInfo.Workers, reads the load type from the "load.type" parameter and
// builds the partitions via LoadGraphTask.MakeTask. On success the task is
// stored under taskInfo.ID and its state is set to TaskStateLoadVertex.
//
// Returns:
//   - the already registered task and an error when taskInfo.ID is already known;
//   - the unregistered task and MakeTask's error when building partitions fails;
//   - the new task and nil otherwise.
func (lm *loadGraphMaster) MakeLoadTasks(taskInfo *structure.TaskInfo) (*LoadGraphTask, error) {
	lm.locker.Lock()
	defer lm.locker.Unlock()

	if existing, ok := lm.loadTasks[taskInfo.ID]; ok {
		// Build the error once and log it as a value: passing a message as the
		// format argument of Errorf is flagged by go vet, and Sprintln would
		// put a trailing newline into the error text.
		err := errors.New("MakeLoadTasks error: task exists.")
		logrus.Error(err)
		return existing, err
	}

	workerCount := len(taskInfo.Workers)
	loadTask := &LoadGraphTask{
		Task:      taskInfo,
		LoadWg:    &sync.WaitGroup{},
		RecvWg:    new(common.SpinWaiter),
		SendCount: make(map[string]*int32, workerCount),
		RecvCount: make(map[string]*int32, workerCount),
		Locker:    &sync.Mutex{},
		LoadType:  options.GetString(taskInfo.Params, "load.type"),
	}
	for _, worker := range taskInfo.Workers {
		loadTask.SendCount[worker.Name] = new(int32)
		loadTask.RecvCount[worker.Name] = new(int32)
	}

	if err := loadTask.MakeTask(); err != nil {
		return loadTask, err
	}

	// Writing to a nil map panics; tolerate a master whose Init was never called.
	if lm.loadTasks == nil {
		lm.loadTasks = make(map[int32]*LoadGraphTask)
	}
	lm.loadTasks[taskInfo.ID] = loadTask
	// NOTE(review): State is assigned directly here while LoadTaskDone goes
	// through Task.SetState — confirm the direct write is intentional.
	loadTask.Task.State = structure.TaskStateLoadVertex
	return loadTask, nil
}
// GetLoadTask returns the load state registered for taskID. When the task is
// unknown it logs an error and returns nil.
//
// The lookup only reads the map, so it takes the read lock and does not block
// other readers.
func (lm *loadGraphMaster) GetLoadTask(taskID int32) *LoadGraphTask {
	lm.locker.RLock()
	defer lm.locker.RUnlock()
	if task, ok := lm.loadTasks[taskID]; ok {
		return task
	}
	logrus.Errorf("GetLoadTask task not exists: %d", taskID)
	return nil
}
// FetchPreparedPart hands out the next partition of task taskID that is still
// in LoadPartStatusPrepared, marking it LoadPartStatusLoading before returning
// a copy of it.
//
// Partitions are scanned in slice order. For LoadTypeLocal tasks, partitions
// whose IpAddr differs from workerIP are skipped. Only vertex partitions are
// eligible in the load-vertex state and only edge partitions in the load-edge
// state.
//
// Errors:
//   - an error naming the task id when the task is unknown;
//   - FetchEof when the task is already loaded or no eligible partition is left;
//   - a "fetch task status error" (also logged) when a partition that passed
//     the IP filter is reached while the task is in any other state.
func (lm *loadGraphMaster) FetchPreparedPart(taskID int32, workerIP string) (LoadPartition, error) {
	lm.locker.Lock()
	defer lm.locker.Unlock()

	task, found := lm.loadTasks[taskID]
	if !found {
		return LoadPartition{}, fmt.Errorf("load task not exists: %d", taskID)
	}
	if task.Task.State == structure.TaskStateLoaded {
		return LoadPartition{}, FetchEof
	}

	for idx := range task.loadParts {
		part := &task.loadParts[idx]
		if task.LoadType == LoadTypeLocal && part.IpAddr != workerIP {
			continue
		}

		// The state check is kept inside the loop on purpose: it is only
		// reached for partitions that passed the IP filter above.
		switch task.Task.State {
		case structure.TaskStateLoadVertex:
			if part.Type != LoadPartTypeVertex {
				continue
			}
		case structure.TaskStateLoadEdge:
			if part.Type != LoadPartTypeEdge {
				continue
			}
		default:
			logrus.Errorf("fetch task status error: %s", task.Task.State)
			return LoadPartition{}, fmt.Errorf("fetch task status error: %s", task.Task.State)
		}

		if part.Status == LoadPartStatusPrepared {
			task.SetPartStatus(int32(idx), LoadPartStatusLoading)
			return *part, nil
		}
	}
	return LoadPartition{}, FetchEof
}
// LoadTaskDone marks partition partID of task taskID as LoadPartStatusDone and
// advances the task state once a phase is complete:
//   - in the load-vertex state, when every vertex partition is done, the state
//     becomes TaskStateLoadScatter;
//   - in the load-edge state, when every partition is done, the state becomes
//     TaskStateLoaded.
//
// An unknown taskID is ignored. partID is converted to a slice index as
// partID-1 (partition ids are presumably 1-based — confirm against the makers);
// an id outside the partition slice is logged and ignored instead of panicking
// on the slice access.
func (lm *loadGraphMaster) LoadTaskDone(taskID, partID int32) {
	lm.locker.Lock()
	defer lm.locker.Unlock()

	loadTask, ok := lm.loadTasks[taskID]
	if !ok {
		return
	}

	partIdx := partID - 1
	if partIdx < 0 || int(partIdx) >= len(loadTask.loadParts) {
		logrus.Errorf("LoadTaskDone partition out of range: task=%d part=%d", taskID, partID)
		return
	}
	loadTask.SetPartStatus(partIdx, LoadPartStatusDone)

	switch loadTask.Task.State {
	case structure.TaskStateLoadVertex:
		for i := range loadTask.loadParts {
			part := &loadTask.loadParts[i]
			if part.Type == LoadPartTypeVertex && part.Status != LoadPartStatusDone {
				return
			}
		}
		loadTask.Task.SetState(structure.TaskStateLoadScatter)
	case structure.TaskStateLoadEdge:
		for i := range loadTask.loadParts {
			if loadTask.loadParts[i].Status != LoadPartStatusDone {
				return
			}
		}
		loadTask.Task.SetState(structure.TaskStateLoaded)
	}
}
// CheckLoadTaskStatus reports whether every partition of task taskID is in
// LoadPartStatusDone. It returns false for an unknown task and true for a task
// with no partitions.
//
// The check only reads state, so it takes the read lock; partitions are
// visited by index to avoid copying each LoadPartition.
func (lm *loadGraphMaster) CheckLoadTaskStatus(taskID int32) bool {
	lm.locker.RLock()
	defer lm.locker.RUnlock()

	loadTask, ok := lm.loadTasks[taskID]
	if !ok {
		return false
	}
	for i := range loadTask.loadParts {
		if loadTask.loadParts[i].Status != LoadPartStatusDone {
			return false
		}
	}
	return true
}
// DeleteTask removes the load state registered for taskID, if any, while
// holding the write lock.
func (lm *loadGraphMaster) DeleteTask(taskID int32) {
	lm.locker.Lock()
	delete(lm.loadTasks, taskID)
	lm.locker.Unlock()
}
// --------------------LoadGraphWorker--------------------
// LoadGraphWorker is the package-level loadGraphWorker instance. Its task map
// is allocated by Init.
var LoadGraphWorker = &loadGraphWorker{}
// loadGraphWorker holds the worker's load tasks keyed by task id.
type loadGraphWorker struct {
// loadTasks maps task id to its load state; allocated by Init.
loadTasks map[int32]*LoadGraphTask
// locker guards loadTasks.
locker sync.RWMutex
}
// Init replaces the task map with a fresh empty one. It does not take the lock.
func (lw *loadGraphWorker) Init() {
	lw.loadTasks = map[int32]*LoadGraphTask{}
}
// GetLoadTask returns the load state installed for taskID, or nil when none is
// installed. Unlike the master's GetLoadTask it does not log a miss.
//
// The lookup only reads the map, so it takes the read lock.
func (lw *loadGraphWorker) GetLoadTask(taskID int32) *LoadGraphTask {
	lw.locker.RLock()
	defer lw.locker.RUnlock()
	return lw.loadTasks[taskID]
}
// InstallTask returns the worker-side load state for taskInfo, creating and
// registering it on first use. A task that is already installed is returned
// unchanged.
//
// A new state gets an empty partition slice, freshly allocated Parallel
// counter, waiters and mutex, and one send/receive counter per entry of
// taskInfo.Workers. LoadType is left empty.
func (lw *loadGraphWorker) InstallTask(taskInfo *structure.TaskInfo) *LoadGraphTask {
	lw.locker.Lock()
	defer lw.locker.Unlock()

	// Single lookup: reuse the value found instead of indexing the map again.
	if existing, ok := lw.loadTasks[taskInfo.ID]; ok {
		return existing
	}

	workerCount := len(taskInfo.Workers)
	loadTask := &LoadGraphTask{
		Task:      taskInfo,
		loadParts: make([]LoadPartition, 0),
		Parallel:  new(int32),
		LoadWg:    &sync.WaitGroup{},
		RecvWg:    new(common.SpinWaiter),
		SendCount: make(map[string]*int32, workerCount),
		RecvCount: make(map[string]*int32, workerCount),
		Locker:    &sync.Mutex{},
	}
	for _, worker := range taskInfo.Workers {
		loadTask.SendCount[worker.Name] = new(int32)
		loadTask.RecvCount[worker.Name] = new(int32)
	}

	// Writing to a nil map panics; tolerate a worker whose Init was never called.
	if lw.loadTasks == nil {
		lw.loadTasks = make(map[int32]*LoadGraphTask)
	}
	lw.loadTasks[taskInfo.ID] = loadTask
	return loadTask
}
// RunTask is a placeholder: it inspects info.Type but performs no work for any
// value.
// NOTE(review): info.Type is compared against load-type constants here —
// confirm that the task type is really meant to carry a load type.
func (lw *loadGraphWorker) RunTask(info structure.TaskInfo) {
	switch info.Type {
	case LoadTypeLocal:
		// nothing to do yet
	case LoadTypeHdfs:
		// nothing to do yet
	}
}
// DeleteTask removes the load state installed for taskID, if any, while
// holding the write lock.
func (lw *loadGraphWorker) DeleteTask(taskID int32) {
	lw.locker.Lock()
	delete(lw.loadTasks, taskID)
	lw.locker.Unlock()
}