in internal/queue/eventmesh_queue.go [91:112]
// Publish converts every non-nil entry of tasks with q.toEventMeshMessage and
// hands the result to q.grpcClient.Publish, walking the slice in order.
//
// Nil entries are skipped, and a nil or empty slice publishes nothing and
// returns nil. The first conversion or publish failure is logged and returned
// immediately: entries published before it stay published and the remaining
// entries are not attempted. After each successful publish the counter named
// "<q.Name()>_<constants.MetricsQueueSize>" is incremented under
// constants.MetricsTaskQueue.
func (q *eventMeshQueue) Publish(tasks []*model.WorkflowTaskInstance) error {
	// Both failure paths report with the same log format.
	const failureFormat = "EventMesh task queue, fail to publish task, error=%v"

	// Ranging over a nil or empty slice runs zero iterations, so no separate
	// emptiness guard is required.
	for idx := range tasks {
		current := tasks[idx]
		if current == nil {
			continue
		}

		msg, err := q.toEventMeshMessage(current)
		if err != nil {
			log.Get(constants.LogQueue).Errorf(failureFormat, err)
			return err
		}

		// The publish response is not used; only the error matters here.
		if _, err = q.grpcClient.Publish(context.Background(), msg); err != nil {
			log.Get(constants.LogQueue).Errorf(failureFormat, err)
			return err
		}

		counter := fmt.Sprintf("%s_%s", q.Name(), constants.MetricsQueueSize)
		metrics.Inc(constants.MetricsTaskQueue, counter)
	}

	return nil
}