func()

in internal/queue/eventmesh_queue.go [91:112]


// Publish converts every non-nil entry of tasks into an EventMesh message and
// sends it through the queue's gRPC client, one call per task, in slice order.
//
// Nil entries are skipped. Processing stops at the first task whose conversion
// or publish call fails: the error is logged on the queue logger and returned
// unchanged, and the remaining tasks are not attempted. Nothing in this method
// undoes publishes that already succeeded. After each successful publish,
// metrics.Inc is called under constants.MetricsTaskQueue with a key built from
// q.Name() and constants.MetricsQueueSize.
//
// A nil or empty slice is a no-op and returns nil.
func (q *eventMeshQueue) Publish(tasks []*model.WorkflowTaskInstance) error {
	// Ranging over a nil or empty slice executes zero iterations, so no
	// separate emptiness guard is required.
	for idx := range tasks {
		item := tasks[idx]
		if item == nil {
			continue
		}

		msg, convErr := q.toEventMeshMessage(item)
		if convErr != nil {
			log.Get(constants.LogQueue).Errorf("EventMesh task queue, fail to publish task, error=%v", convErr)
			return convErr
		}

		// NOTE(review): the call runs on a background context, so no
		// caller-supplied deadline or cancellation reaches it from here.
		if _, pubErr := q.grpcClient.Publish(context.Background(), msg); pubErr != nil {
			log.Get(constants.LogQueue).Errorf("EventMesh task queue, fail to publish task, error=%v", pubErr)
			return pubErr
		}

		// Count only the tasks that were actually handed to EventMesh.
		metricKey := fmt.Sprintf("%s_%s", q.Name(), constants.MetricsQueueSize)
		metrics.Inc(constants.MetricsTaskQueue, metricKey)
	}
	return nil
}