func()

in internal/queue/in_memory_queue.go [82:97]


func (q *inMemoryQueue) handle(item rxgo.Item) {
	v, ok := item.V.(*model.WorkflowTaskInstance)
	if !ok {
		return
	}
	log.Get(constants.LogQueue).Infof("handle=%s", gconv.String(v))
	if v.ID != 0 {
		if err := q.workflowDAL.UpdateTaskInstance(dal.GetDalClient(), v); err != nil {
			log.Get(constants.LogQueue).Errorf("Observe UpdateTaskInstance error=%v", err)
		}
		return
	}
	if err := q.workflowDAL.InsertTaskInstance(context.Background(), v); err != nil {
		log.Get(constants.LogQueue).Errorf("Observe InsertTaskInstance error=%v", err)
	}
}