internal/events/pubsub/pubsub.go (58 lines of code) (raw):
// Package pubsub contains utilities for handling Google Cloud Pub/Sub events.
package pubsub
import (
"fmt"
"regexp"
"strings"
"time"
"cloud.google.com/go/functions/metadata"
"github.com/GoogleCloudPlatform/functions-framework-go/internal/fftypes"
)
const (
pubsubEventType = "google.pubsub.topic.publish"
pubsubMessageType = "type.googleapis.com/google.pubusb.v1.PubsubMessage"
pubsubService = "pubsub.googleapis.com"
)
// LegacyPushSubscriptionEvent is the event payload for legacy Cloud Pub/Sub
// push subscription triggers (https://cloud.google.com/functions/docs/calling/pubsub#legacy_cloud_pubsub_triggers).
// This matched the event payload that is sent by Pub/Sub to HTTP push
// subscription endpoints (https://cloud.google.com/pubsub/docs/push#receiving_messages).
type LegacyPushSubscriptionEvent struct {
Subscription string `json:"subscription"`
Message `json:"message"`
}
// Message represents a Pub/Sub message.
type Message struct {
// ID identifies this message.
// This ID is assigned by the server and is populated for Messages obtained from a subscription.
// This field is read-only.
ID string `json:"messageId"`
// Data is the actual data in the message.
Data []byte `json:"data"`
// Attributes represents the key-value pairs the current message
// is labelled with.
Attributes map[string]string `json:"attributes"`
// The time at which the message was published.
// This is populated by the server for Messages obtained from a subscription.
// This field is read-only.
PublishTime time.Time `json:"publishTime"`
}
// ExtractTopicFromRequestPath extracts a Pub/Sub topic from a URL request path.
func ExtractTopicFromRequestPath(path string) (string, error) {
re := regexp.MustCompile(`(projects\/[^/?]+\/topics\/[^/?]+)/*`)
matches := re.FindStringSubmatch(path)
if matches == nil {
// Prevent log injection by removing newline characters in path
replacer := strings.NewReplacer("\n", "", "\r", "")
escapedPath := replacer.Replace(path)
return "", fmt.Errorf("failed to extract Pub/Sub topic name from the URL request path: %q, configure your subscription's push endpoint to use the following path pattern: 'projects/PROJECT_NAME/topics/TOPIC_NAME'",
escapedPath)
}
// Index 0 is the entire input string matched, index 1 is the first submatch
return matches[1], nil
}
// ToBackgroundEvent converts the event to the standard BackgroundEvent format
// for Background Functions.
func (e *LegacyPushSubscriptionEvent) ToBackgroundEvent(topic string) *fftypes.BackgroundEvent {
timestamp := e.Message.PublishTime
if timestamp.IsZero() {
timestamp = time.Now()
}
return &fftypes.BackgroundEvent{
Metadata: &metadata.Metadata{
EventID: e.ID,
Timestamp: timestamp,
EventType: pubsubEventType,
Resource: &metadata.Resource{
Name: topic,
Type: pubsubMessageType,
Service: pubsubService,
},
},
Data: map[string]interface{}{
"@type": pubsubMessageType,
"data": e.Message.Data,
"attributes": e.Message.Attributes,
},
}
}