app/pubsub-integration/pubsub/pubsub.go (153 lines of code) (raw):
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package pubsub provides an API to publish and receive Cloud Pub/Sub messages.
package pubsub
import (
"context"
"fmt"
"google/jss/pubsub-integration/avro"
"google/jss/pubsub-integration/pubsub/config"
"log"
"time"
"cloud.google.com/go/pubsub"
vkit "cloud.google.com/go/pubsub/apiv1"
"github.com/googleapis/gax-go/v2"
"github.com/linkedin/goavro/v2"
"google.golang.org/grpc/codes"
)
// service is the factory abstraction behind the package-level Service variable.
type service interface {
// NewClient builds a Client from the given context and client configuration.
NewClient(context.Context, *pubsub.ClientConfig) (Client, error)
}
// Service is the package-level factory used to create Cloud Pub/Sub clients.
// It is declared with the unexported service interface type and initialised
// with a *pubsubService.
var Service service = new(pubsubService)
// pubsubService is the stateless implementation of the service interface;
// its NewClient method creates real Cloud Pub/Sub clients.
type pubsubService struct {
}
// NewClientBackoffConfig builds a Cloud Pub/Sub client config whose Publish
// call retries with backoff.
//
// initial is the first retry period and max is the upper bound of the retry
// period. Only the publisher's Publish call options are populated; every
// other field of the returned config is left at its zero value.
func NewClientBackoffConfig(initial time.Duration, max time.Duration) *pubsub.ClientConfig {
	// A new retryer is produced on every invocation of this factory.
	newRetryer := func() gax.Retryer {
		// gRPC status codes for which a Publish call is retried.
		retryable := []codes.Code{
			codes.Aborted,
			codes.Canceled,
			codes.Internal,
			codes.ResourceExhausted,
			codes.Unknown,
			codes.Unavailable,
			codes.DeadlineExceeded,
		}
		backoff := gax.Backoff{
			Initial: initial,
			Max:     max,
		}
		return gax.OnCodes(retryable, backoff)
	}

	publisherOpts := &vkit.PublisherCallOptions{
		Publish: []gax.CallOption{gax.WithRetry(newRetryer)},
	}
	return &pubsub.ClientConfig{PublisherCallOptions: publisherOpts}
}
// NewClient creates a Client for Cloud Pub/Sub handling in the project named
// by config.Config.Project.
//
// clientCfg is handed to pubsub.NewClientWithConfig unchanged; this function
// does not substitute a backoff config of its own when clientCfg is nil.
// Use NewClientBackoffConfig to build a config with publish retries.
//
// It returns a nil Client and the underlying error if the client cannot be
// created.
func (*pubsubService) NewClient(ctx context.Context, clientCfg *pubsub.ClientConfig) (Client, error) {
	client, err := pubsub.NewClientWithConfig(ctx, config.Config.Project, clientCfg)
	if err != nil {
		return nil, err
	}
	// Return a literal nil on success rather than the (nil) err variable.
	return &pubsubClient{client: client}, nil
}
// Client is the interface of the Cloud Pub/Sub client for Pub/Sub handling.
type Client interface {
// NewTopic returns a Topic for publishing. In the pubsubClient implementation
// the arguments are, in order: topic ID, Avro codec, batch size, number of
// goroutines and maximum outstanding messages.
NewTopic(string, *goavro.Codec, int, int, int) Topic
// NewSubscription returns a Subscription for receiving. In the pubsubClient
// implementation the arguments are, in order: subscription ID, Avro codec,
// number of goroutines and maximum outstanding messages.
NewSubscription(string, *goavro.Codec, int, int) *Subscription
// Close releases the client.
Close() error
}
// pubsubClient implements Client on top of a Cloud Pub/Sub client.
type pubsubClient struct {
// client is the underlying Cloud Pub/Sub client used to obtain topics and subscriptions.
client *pubsub.Client
}
// NewTopic returns a Topic that publishes to topicID and encodes messages
// with codec.
//
// batchSize, numGoroutines and maxOutstanding each override the matching
// publish setting only when positive; otherwise the value already present on
// the topic handle is kept. The flow-control behaviour is always set to
// pubsub.FlowControlBlock.
func (c *pubsubClient) NewTopic(topicID string, codec *goavro.Codec, batchSize int, numGoroutines int, maxOutstanding int) Topic {
	handle := c.client.Topic(topicID)
	settings := &handle.PublishSettings
	flow := &settings.FlowControlSettings

	flow.LimitExceededBehavior = pubsub.FlowControlBlock
	if maxOutstanding > 0 {
		flow.MaxOutstandingMessages = maxOutstanding
	}
	if numGoroutines > 0 {
		settings.NumGoroutines = numGoroutines // default is 25 * GOMAXPROCS
	}
	if batchSize > 0 {
		settings.CountThreshold = batchSize
	}

	return &pubsubTopic{
		codec: codec,
		topic: handle,
		id:    topicID,
	}
}
// NewSubscription returns a Subscription that receives from the subscription
// named ID and decodes messages with codec.
//
// numGoroutines and maxOutstanding each override the matching receive
// setting only when positive; otherwise the value already present on the
// subscription handle is kept.
func (c *pubsubClient) NewSubscription(ID string, codec *goavro.Codec, numGoroutines int, maxOutstanding int) *Subscription {
	handle := c.client.Subscription(ID)
	settings := &handle.ReceiveSettings

	if maxOutstanding > 0 {
		settings.MaxOutstandingMessages = maxOutstanding
	}
	if numGoroutines > 0 {
		settings.NumGoroutines = numGoroutines // default is 10
	}

	return &Subscription{
		codec:        codec,
		subscription: handle,
		ID:           ID,
	}
}
// Close logs the underlying Cloud Pub/Sub client and then closes it,
// returning whatever error that close reports.
func (c *pubsubClient) Close() error {
	underlying := c.client
	log.Printf("close client: %v", underlying)
	return underlying.Close()
}
// Topic is used to publish messages to a topic.
type Topic interface {
// Publish publishes the given message data and returns a message ID and/or an error.
Publish(context.Context, map[string]interface{}) (string, error)
// GetID returns the ID of the topic.
GetID() string
// Stop stops the topic.
Stop()
}
// pubsubTopic implements Topic on top of a Cloud Pub/Sub topic handle.
type pubsubTopic struct {
// id is the topic ID this value was created with; it is what GetID returns.
id string
// topic is the underlying Cloud Pub/Sub topic handle used for publishing.
topic *pubsub.Topic
// codec is the Avro codec used to encode message data before publishing.
codec *goavro.Codec
}
// Publish encodes data with the topic's Avro codec, publishes the encoded
// payload and waits for the publish result.
//
// data is the message content; it should comply with the Avro schema of the
// topic. On success the message ID obtained from the publish result is
// returned with a nil error.
//
// If encoding fails nothing is published and the returned error wraps the
// encoding error. If publishing fails the returned error wraps the publish
// error.
func (t *pubsubTopic) Publish(ctx context.Context, data map[string]interface{}) (string, error) {
	// Encode message data by the avro schema of the topic.
	payload, err := avro.EncodeToJSON(t.codec, data)
	if err != nil {
		// Keep the cause in the chain so callers can see why the message was rejected.
		return "", fmt.Errorf("ignore invalid message: %v, err: %w", data, err)
	}

	start := time.Now()
	// Publish the encoded message, then wait for its result.
	result := t.topic.Publish(ctx, &pubsub.Message{Data: payload})
	id, err := result.Get(ctx)
	log.Printf("publish message id: %v, elapsed: %v", id, time.Since(start))
	if err != nil {
		// %s prints the payload as JSON text; %v on a byte slice would print a list of byte values.
		return id, fmt.Errorf("fail to publish message: %s to topic: %v, err: %w", payload, t.topic, err)
	}
	return id, nil
}
// GetID returns the topic ID stored on this pubsubTopic.
func (t *pubsubTopic) GetID() string {
return t.id
}
// Stop logs the topic ID and then calls Stop on the underlying topic handle.
func (t *pubsubTopic) Stop() {
	log.Printf("stop topic: %v", t.id)
	t.topic.Stop()
}
// Subscription is used to receive messages.
type Subscription struct {
// ID is the subscription ID this value was created with.
ID string
// subscription is the underlying Cloud Pub/Sub subscription handle that Receive delegates to.
subscription *pubsub.Subscription
// codec is the Avro codec used to decode the data of received messages.
codec *goavro.Codec
}
// MessageHandler is the callback that handles a received message.
// Subscription.Receive invokes it with the callback context and the decoded Message.
type MessageHandler func(context.Context, *Message)
// Message contains the message content decoded by the avro schema.
type Message struct {
// Message is the embedded Cloud Pub/Sub message the content was decoded from;
// its methods and fields (for example ID and Nack) are promoted to this type.
*pubsub.Message
// Data is the decoded message content. It shadows the embedded message's
// Data field, which stays reachable as Message.Data on the embedded value.
Data map[string]interface{}
}
// Receive starts to receive messages.
//
// Each received message is decoded with the subscription's Avro codec and
// handed to handler as a *Message. A message that fails to decode is logged
// together with the decode error, nacked, and never reaches handler.
// Receive does not acknowledge decoded messages itself.
// NOTE(review): handler is presumably expected to Ack or Nack them; confirm against callers.
//
// It blocks until ctx is done, or the service returns a non-retryable error.
// The way to terminate a Receive is to cancel its context.
func (sub *Subscription) Receive(ctx context.Context, handler MessageHandler) error {
	return sub.subscription.Receive(ctx, func(ctx context.Context, pubsubMessage *pubsub.Message) {
		log.Printf("got Cloud Pub/Sub message ID: %v", pubsubMessage.ID)

		data, err := avro.DecodeFromJSON(sub.codec, pubsubMessage.Data)
		if err != nil {
			// Record why decoding failed before rejecting the message.
			log.Printf("failed to check schema, message: %v, err: %v", pubsubMessage.ID, err)
			pubsubMessage.Nack()
			return
		}

		handler(ctx, &Message{
			Message: pubsubMessage,
			Data:    data,
		})
	})
}