in vermeer/apps/worker/compute_bl.go [485:612]
func (cb *ComputeBl) output(graph *structure.VermeerGraph, computeTask *compute.ComputerTask, parallel int) error {
wIdx := graph.GetSelfIndex()
worker := graph.GetSelfWorker()
outputType := options.GetString(computeTask.Task.Params, "output.type")
if outputType == graphio.LoadTypeHugegraph {
valueType, ok := graph.Data.VertexProperty.GetValueType("label")
if !ok {
logrus.Errorf("Hugegraph vertex property label not exist")
cb.SetStatusError(computeTask.Task.ID, "Hugegraph vertex property label not exist")
return fmt.Errorf("hugegraph vertex property label not exist")
}
if valueType != structure.ValueTypeString {
logrus.Errorf("Hugegraph vertex property label not string, get type: %v", valueType)
cb.SetStatusError(computeTask.Task.ID, "Hugegraph vertex property label not string")
return fmt.Errorf("hugegraph vertex property label not string")
}
pdAddr, err := common.FindValidPD(options.GetSliceString(computeTask.Task.Params, "output.hg_pd_peers"))
if err != nil {
logrus.Errorf("find valid pd error:%v", err)
cb.SetStatusError(computeTask.Task.ID, fmt.Sprintf("find valid pd error:%v", err))
return fmt.Errorf("find valid pd error:%w", err)
}
serverAddr, err := common.FindServerAddr(pdAddr,
options.GetString(computeTask.Task.Params, "output.hugegraph_name"),
options.GetString(computeTask.Task.Params, "output.hugegraph_username"),
options.GetString(computeTask.Task.Params, "output.hugegraph_password"))
if err != nil {
logrus.Errorf("find server address error:%v", err)
cb.SetStatusError(computeTask.Task.ID, fmt.Sprintf("find server address error:%v", err))
return fmt.Errorf("find server address error:%w", err)
}
computeTask.Task.Params["output.hugegraph_server"] = serverAddr
}
if options.GetInt(computeTask.Task.Params, "output.need_statistics") == 1 {
cb.statistics(computeTask, graph)
}
var uploadVertexValues [][]*pb.VertexValue
needQuery := options.GetInt(computeTask.Task.Params, "output.need_query") == 1
if needQuery {
uploadVertexValues = make([][]*pb.VertexValue, parallel)
for i := range uploadVertexValues {
uploadVertexValues[i] = make([]*pb.VertexValue, 0, graph.VertexCount/int64(parallel))
}
}
useOutputFilter := false
outputExprStr := options.GetString(computeTask.Task.Params, "output.filter_expr")
filteroutputProps := options.GetSliceString(computeTask.Task.Params, "output.filter_properties")
vertexFilters := make([]*compute.VertexFilter, parallel)
if outputExprStr != "" && len(filteroutputProps) > 0 {
useOutputFilter = true
for i := range vertexFilters {
vertexFilters[i] = &compute.VertexFilter{}
err := vertexFilters[i].Init(outputExprStr, filteroutputProps, graph.Data.VertexProperty)
if err != nil {
logrus.Errorf("output filter init error:%v", err)
useOutputFilter = false
}
}
}
part := int(worker.VertexCount)/parallel + 1
cId := int(worker.VertIdStart)
wg := sync.WaitGroup{}
for i := 0; i < parallel; i++ {
wg.Add(1)
go func(partId int, bId int, eId int) {
defer func() {
if r := recover(); r != nil {
cb.SetStatusError(computeTask.Task.ID, fmt.Sprintf("RunOutput panic recover panic:%v, stack message: %s",
r, common.GetCurrentGoroutineStack()))
logrus.Errorf("RunOutput panic recover taskID:%v, pID:%v, panic:%v, stack message: %s",
computeTask.Task.ID, partId, r, common.GetCurrentGoroutineStack())
}
}()
defer wg.Done()
if eId > int(worker.VertIdStart+worker.VertexCount) {
eId = int(worker.VertIdStart + worker.VertexCount)
}
writer := graphio.MakeWriter(outputType)
writerInitInfo := graphio.WriterInitInfo{
Params: computeTask.Task.Params,
PartID: wIdx*parallel + partId,
MaxID: len(graph.Workers) * parallel,
Mode: graphio.WriteModeVertexValue,
}
if outputType == graphio.LoadTypeHugegraph {
writerInitInfo.OutputType = computeTask.ComputeWorker.OutputValueType()
writerInitInfo.HgVertexSchema = graph.Data.VertexPropertySchema
}
err := writer.Init(writerInitInfo)
if err != nil {
logrus.Errorf("writer init error:%v", err)
cb.SetStatusError(computeTask.Task.ID, fmt.Sprintf("writer init error:%v", err))
return
}
defer writer.Close()
for vId := bId; vId < eId; vId++ {
if useOutputFilter && !vertexFilters[partId].Filter(uint32(vId)) {
continue
}
writeVertexInfo := graphio.WriteVertexValue{
VertexID: graph.Data.Vertex.GetVertex(uint32(vId)).ID,
Value: computeTask.ComputeWorker.VertexValue(uint32(vId)),
}
if needQuery {
uploadVertexValues[partId] = append(uploadVertexValues[partId], &pb.VertexValue{
ID: writeVertexInfo.VertexID,
Value: writeVertexInfo.Value.ToString(),
})
}
if outputType == graphio.LoadTypeHugegraph {
writeVertexInfo.HgLabel = string(graph.Data.VertexProperty.GetStringValue("label", uint32(vId)))
}
writer.WriteVertex(writeVertexInfo)
}
}(i, cId+i*part, cId+((i+1)*part))
}
wg.Wait()
cb.dealQuery(needQuery, uploadVertexValues, computeTask.Task.ID)
return nil
}