in app/pubsub-integration/pubsub/pubsub.go [149:171]
// Publish encodes data as JSON using the topic's Avro codec, publishes the
// encoded message to the topic, and blocks until the publish result is
// available.
//
// data should comply with the Avro schema of the topic. If encoding fails the
// message is not published and the returned error wraps the encoding error.
//
// On success it returns the message ID reported by the publish result. On
// failure it returns an empty ID and an error wrapping the underlying cause.
func (t *pubsubTopic) Publish(ctx context.Context, data map[string]interface{}) (string, error) {
	payload, err := avro.EncodeToJSON(t.codec, data)
	if err != nil {
		// Keep the cause in the chain so callers can see why the message
		// was rejected and inspect it with errors.Is / errors.As.
		return "", fmt.Errorf("ignore invalid message: %v, err: %w", data, err)
	}

	start := time.Now()
	// Publish the encoded message, then wait for the result of publishing.
	result := t.topic.Publish(ctx, &pubsub.Message{Data: payload})
	id, err := result.Get(ctx)
	elapsed := time.Since(start)
	if err != nil {
		// %s renders the encoded payload as text; %v would print the
		// byte slice as a list of decimal numbers.
		return "", fmt.Errorf("fail to publish message: %s to topic: %v, err: %w", payload, t.topic, err)
	}

	// Only report an ID once the publish is known to have succeeded.
	log.Printf("publish message id: %v, elapsed: %v", id, elapsed)
	return id, nil
}