func()

in app/pubsub-integration/pubsub/pubsub.go [149:171]


// Publish encodes data to JSON with the topic's Avro codec, publishes the
// encoded payload to the topic, and waits for the publish result.
//
// data must comply with the Avro schema of the topic; if encoding fails the
// message is not published and the encoding error is returned wrapped.
//
// On success it returns the message id reported by the publish result. On
// failure it returns an empty id and an error wrapping the underlying cause,
// so callers can inspect it with errors.Is / errors.As.
func (t *pubsubTopic) Publish(ctx context.Context, data map[string]interface{}) (string, error) {
	// Encode message data by the avro schema of the topic.
	payload, err := avro.EncodeToJSON(t.codec, data)
	if err != nil {
		// Keep the encoder's error in the chain: it says which field
		// violated the schema, which the data dump alone does not.
		return "", fmt.Errorf("ignore invalid message: %v, err: %w", data, err)
	}

	start := time.Now()
	// Publish the encoded message, then wait for the publish result.
	result := t.topic.Publish(ctx, &pubsub.Message{Data: payload})
	id, err := result.Get(ctx)
	elapsed := time.Since(start)
	if err != nil {
		// %s renders the payload as JSON text; %v on a []byte would print
		// a list of decimal byte values. The error is returned rather than
		// logged here so that it is reported at exactly one layer.
		return "", fmt.Errorf("fail to publish message: %s to topic: %v after %v, err: %w", payload, t.topic, elapsed, err)
	}

	// Only log an id once the publish is known to have succeeded.
	log.Printf("publish message id: %v, elapsed: %v", id, elapsed)
	return id, nil
}