func()

in internal/queue/eventmesh_queue.go [132:150]


// handler processes a single message received from the EventMesh subscription.
//
// It first calls metrics.Dec on this queue's size metric, then converts the
// message into a workflow task with toWorkflowTask and persists it:
//
//   - a task whose ID is non-zero is written with UpdateTaskInstance;
//   - any other task is treated as new and written with InsertTaskInstance.
//
// Return value:
//
//   - nil when the task was persisted;
//   - the conversion error when toWorkflowTask fails;
//   - the update error when UpdateTaskInstance fails;
//   - nil when InsertTaskInstance fails — that failure is only logged, which
//     is the pre-existing behavior of this path and is kept as is.
func (q *eventMeshQueue) handler(message *sdk_pb.SimpleMessage) interface{} {
	metrics.Dec(constants.MetricsTaskQueue, fmt.Sprintf("%s_%s", q.Name(), constants.MetricsQueueSize))
	workflowTask, err := q.toWorkflowTask(message)
	if err != nil {
		return err
	}
	log.Get(constants.LogQueue).Infof("receive task from EventMesh queue, task=%s", gconv.String(workflowTask))

	if workflowTask.ID != 0 {
		// Existing task. The error gets its own name on purpose: re-declaring
		// `err` in the if-initializer would shadow the outer variable, and a
		// later `return err` would then hand back the outer (nil) value and
		// silently drop the update failure.
		if updateErr := q.workflowDAL.UpdateTaskInstance(dal.GetDalClient(), workflowTask); updateErr != nil {
			log.Get(constants.LogQueue).Errorf("EventMesh task queue observe UpdateTaskInstance error=%v", updateErr)
			return updateErr
		}
		return nil
	}

	// new task
	if insertErr := q.workflowDAL.InsertTaskInstance(context.Background(), workflowTask); insertErr != nil {
		log.Get(constants.LogQueue).Errorf("EventMesh task queue observe InsertTaskInstance error=%v", insertErr)
	}
	return nil
}