func()

in vermeer/apps/worker/compute_bl.go [485:612]


// output writes the computed vertex values of this worker's vertex range
// through a graphio writer chosen by the task parameter "output.type".
//
// Steps, in order:
//   - For Hugegraph output, the "label" vertex property must exist and be a
//     string; a PD address and the server address are resolved and the server
//     address is stored in the task params under "output.hugegraph_server".
//   - If "output.need_statistics" is 1, cb.statistics is invoked.
//   - If "output.filter_expr" and "output.filter_properties" are both set, one
//     vertex filter per goroutine is initialised; if initialisation fails the
//     filter is disabled and every vertex is written.
//   - The range [VertIdStart, VertIdStart+VertexCount) of the local worker is
//     split into `parallel` contiguous slices, each written by its own
//     goroutine with its own writer. When "output.need_query" is 1 the written
//     values are also collected per slice and handed to cb.dealQuery.
//
// A non-nil error is returned when Hugegraph validation or address lookup
// fails, or when any goroutine fails to initialise its writer or panics. In
// all of these cases the task status has already been set through
// cb.SetStatusError, and cb.dealQuery is not called.
func (cb *ComputeBl) output(graph *structure.VermeerGraph, computeTask *compute.ComputerTask, parallel int) error {
	selfIdx := graph.GetSelfIndex()
	worker := graph.GetSelfWorker()
	outputType := options.GetString(computeTask.Task.Params, "output.type")
	toHugegraph := outputType == graphio.LoadTypeHugegraph

	if toHugegraph {
		valueType, ok := graph.Data.VertexProperty.GetValueType("label")
		if !ok {
			logrus.Errorf("Hugegraph vertex property label not exist")
			cb.SetStatusError(computeTask.Task.ID, "Hugegraph vertex property label not exist")
			return fmt.Errorf("hugegraph vertex property label not exist")
		}
		if valueType != structure.ValueTypeString {
			logrus.Errorf("Hugegraph vertex property label not string, get type: %v", valueType)
			cb.SetStatusError(computeTask.Task.ID, "Hugegraph vertex property label not string")
			return fmt.Errorf("hugegraph vertex property label not string")
		}

		pdAddr, err := common.FindValidPD(options.GetSliceString(computeTask.Task.Params, "output.hg_pd_peers"))
		if err != nil {
			logrus.Errorf("find valid pd error:%v", err)
			cb.SetStatusError(computeTask.Task.ID, fmt.Sprintf("find valid pd error:%v", err))
			return fmt.Errorf("find valid pd error:%w", err)
		}

		serverAddr, err := common.FindServerAddr(pdAddr,
			options.GetString(computeTask.Task.Params, "output.hugegraph_name"),
			options.GetString(computeTask.Task.Params, "output.hugegraph_username"),
			options.GetString(computeTask.Task.Params, "output.hugegraph_password"))
		if err != nil {
			logrus.Errorf("find server address error:%v", err)
			cb.SetStatusError(computeTask.Task.ID, fmt.Sprintf("find server address error:%v", err))
			return fmt.Errorf("find server address error:%w", err)
		}
		// Written before any goroutine starts, so the writers only read the params.
		computeTask.Task.Params["output.hugegraph_server"] = serverAddr
	}

	if options.GetInt(computeTask.Task.Params, "output.need_statistics") == 1 {
		cb.statistics(computeTask, graph)
	}

	// Number of vertex IDs handled by each goroutine; the +1 covers the
	// remainder, and the last slice is clamped to the worker's range below.
	part := int(worker.VertexCount)/parallel + 1
	firstID := int(worker.VertIdStart)

	var uploadVertexValues [][]*pb.VertexValue
	needQuery := options.GetInt(computeTask.Task.Params, "output.need_query") == 1
	if needQuery {
		uploadVertexValues = make([][]*pb.VertexValue, parallel)
		for i := range uploadVertexValues {
			// A goroutine appends at most `part` entries, so this is a tight bound.
			uploadVertexValues[i] = make([]*pb.VertexValue, 0, part)
		}
	}

	useOutputFilter := false
	outputExprStr := options.GetString(computeTask.Task.Params, "output.filter_expr")
	filterOutputProps := options.GetSliceString(computeTask.Task.Params, "output.filter_properties")
	vertexFilters := make([]*compute.VertexFilter, parallel)
	if outputExprStr != "" && len(filterOutputProps) > 0 {
		useOutputFilter = true
		for i := range vertexFilters {
			vertexFilters[i] = &compute.VertexFilter{}
			if err := vertexFilters[i].Init(outputExprStr, filterOutputProps, graph.Data.VertexProperty); err != nil {
				logrus.Errorf("output filter init error:%v", err)
				// Filtering is all-or-nothing: once one filter cannot be built the
				// remaining ones are never consulted, so stop initialising them.
				useOutputFilter = false
				break
			}
		}
	}

	var (
		wg       sync.WaitGroup
		errOnce  sync.Once
		firstErr error
	)
	// recordFailure keeps the first goroutine failure so it can be returned
	// after wg.Wait.
	recordFailure := func(err error) {
		errOnce.Do(func() { firstErr = err })
	}

	for i := 0; i < parallel; i++ {
		wg.Add(1)
		go func(partID int, begin int, end int) {
			// Registered first so it runs last: a panic or init failure is fully
			// recorded before wg.Wait can return.
			defer wg.Done()
			defer func() {
				r := recover()
				if r == nil {
					return
				}
				stack := common.GetCurrentGoroutineStack()
				cb.SetStatusError(computeTask.Task.ID, fmt.Sprintf("RunOutput panic recover panic:%v, stack message: %s",
					r, stack))
				logrus.Errorf("RunOutput panic recover taskID:%v, pID:%v, panic:%v, stack message: %s",
					computeTask.Task.ID, partID, r, stack)
				recordFailure(fmt.Errorf("output panic in part %d: %v", partID, r))
			}()

			// Clamp the slice to the end of this worker's vertex range.
			if end > int(worker.VertIdStart+worker.VertexCount) {
				end = int(worker.VertIdStart + worker.VertexCount)
			}

			initInfo := graphio.WriterInitInfo{
				Params: computeTask.Task.Params,
				PartID: selfIdx*parallel + partID,
				MaxID:  len(graph.Workers) * parallel,
				Mode:   graphio.WriteModeVertexValue,
			}
			if toHugegraph {
				initInfo.OutputType = computeTask.ComputeWorker.OutputValueType()
				initInfo.HgVertexSchema = graph.Data.VertexPropertySchema
			}

			writer := graphio.MakeWriter(outputType)
			if err := writer.Init(initInfo); err != nil {
				logrus.Errorf("writer init error:%v", err)
				cb.SetStatusError(computeTask.Task.ID, fmt.Sprintf("writer init error:%v", err))
				recordFailure(fmt.Errorf("writer init error:%w", err))
				return
			}
			defer writer.Close()

			for id := begin; id < end; id++ {
				vertexID := uint32(id)
				if useOutputFilter && !vertexFilters[partID].Filter(vertexID) {
					continue
				}
				record := graphio.WriteVertexValue{
					VertexID: graph.Data.Vertex.GetVertex(vertexID).ID,
					Value:    computeTask.ComputeWorker.VertexValue(vertexID),
				}
				if needQuery {
					// Each goroutine appends only to its own slot, so no locking is needed.
					uploadVertexValues[partID] = append(uploadVertexValues[partID], &pb.VertexValue{
						ID:    record.VertexID,
						Value: record.Value.ToString(),
					})
				}
				if toHugegraph {
					record.HgLabel = string(graph.Data.VertexProperty.GetStringValue("label", vertexID))
				}
				writer.WriteVertex(record)
			}
		}(i, firstID+i*part, firstID+(i+1)*part)
	}
	wg.Wait()

	// The task status was already set to error by the failing goroutine; surface
	// the failure to the caller like the validation paths above do.
	if firstErr != nil {
		return firstErr
	}

	cb.dealQuery(needQuery, uploadVertexValues, computeTask.Task.ID)
	return nil
}