in internal/queue/in_memory_queue.go [82:97]
func (q *inMemoryQueue) handle(item rxgo.Item) {
v, ok := item.V.(*model.WorkflowTaskInstance)
if !ok {
return
}
log.Get(constants.LogQueue).Infof("handle=%s", gconv.String(v))
if v.ID != 0 {
if err := q.workflowDAL.UpdateTaskInstance(dal.GetDalClient(), v); err != nil {
log.Get(constants.LogQueue).Errorf("Observe UpdateTaskInstance error=%v", err)
}
return
}
if err := q.workflowDAL.InsertTaskInstance(context.Background(), v); err != nil {
log.Get(constants.LogQueue).Errorf("Observe InsertTaskInstance error=%v", err)
}
}