in internal/queue/eventmesh_queue.go [132:150]
func (q *eventMeshQueue) handler(message *sdk_pb.SimpleMessage) interface{} {
metrics.Dec(constants.MetricsTaskQueue, fmt.Sprintf("%s_%s", q.Name(), constants.MetricsQueueSize))
workflowTask, err := q.toWorkflowTask(message)
if err != nil {
return err
}
log.Get(constants.LogQueue).Infof("receive task from EventMesh queue, task=%s", gconv.String(workflowTask))
if workflowTask.ID != 0 {
if err := q.workflowDAL.UpdateTaskInstance(dal.GetDalClient(), workflowTask); err != nil {
log.Get(constants.LogQueue).Errorf("EventMesh task queue observe UpdateTaskInstance error=%v", err)
}
return err
}
// new task
if err := q.workflowDAL.InsertTaskInstance(context.Background(), workflowTask); err != nil {
log.Get(constants.LogQueue).Errorf("EventMesh task queue observe InsertTaskInstance error=%v", err)
}
return nil
}