internal/output/gcppubsub/gcppubsub.go (143 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more agreements.
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
package gcppubsub
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"cloud.google.com/go/pubsub"
"google.golang.org/api/iterator"
"github.com/elastic/stream/internal/output"
)
func init() {
output.Register("gcppubsub", New)
}
type Output struct {
opts *output.Options
client *pubsub.Client
cancelFunc func()
}
func New(opts *output.Options) (output.Output, error) {
if opts.Addr == "" {
return nil, errors.New("emulator address is required")
}
os.Setenv("PUBSUB_EMULATOR_HOST", opts.Addr)
ctx, cancel := context.WithCancel(context.Background())
client, err := pubsub.NewClient(ctx, opts.GCPPubsubOptions.Project)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create client: %w", err)
}
return &Output{opts: opts, client: client, cancelFunc: cancel}, nil
}
func (o *Output) DialContext(ctx context.Context) error {
// Disable HTTP keep-alives to ensure no extra goroutines hang around.
httpClient := http.Client{Transport: &http.Transport{DisableKeepAlives: true}}
// Sanity check the emulator.
resp, err := httpClient.Get("http://" + o.opts.Addr)
if err != nil {
return err
}
defer resp.Body.Close()
_, err = io.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %v", resp.StatusCode)
}
if o.opts.GCPPubsubOptions.Clear {
if err := o.clear(); err != nil {
return err
}
}
if err := o.createTopic(); err != nil {
return err
}
if err := o.createSubscription(); err != nil {
return err
}
return nil
}
func (o *Output) Close() error {
o.client.Topic(o.opts.GCPPubsubOptions.Topic).Stop()
o.cancelFunc()
return nil
}
func (o *Output) Write(b []byte) (int, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
topic := o.client.Topic(o.opts.GCPPubsubOptions.Topic)
result := topic.Publish(ctx, &pubsub.Message{Data: b})
// Wait for message to publish and get assigned ID.
if _, err := result.Get(ctx); err != nil {
return 0, err
}
return len(b), nil
}
func (o *Output) clear() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Clear all topics.
topics := o.client.Topics(ctx)
for {
topic, err := topics.Next()
if errors.Is(err, iterator.Done) {
break
}
if err != nil {
return err
}
if err = topic.Delete(ctx); err != nil {
return fmt.Errorf("failed to delete topic %v: %w", topic.ID(), err)
}
}
// Clear all subscriptions.
subs := o.client.Subscriptions(ctx)
for {
sub, err := subs.Next()
if errors.Is(err, iterator.Done) {
break
}
if err != nil {
return err
}
if err = sub.Delete(ctx); err != nil {
return fmt.Errorf("failed to delete subscription %v: %w", sub.ID(), err)
}
}
return nil
}
func (o *Output) createTopic() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
topic := o.client.Topic(o.opts.GCPPubsubOptions.Topic)
exists, err := topic.Exists(ctx)
if err != nil {
return fmt.Errorf("failed to check if topic exists: %w", err)
}
if !exists {
if _, err := o.client.CreateTopic(ctx, o.opts.GCPPubsubOptions.Topic); err != nil {
return fmt.Errorf("failed to create the topic: %w", err)
}
}
return nil
}
func (o *Output) createSubscription() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sub := o.client.Subscription(o.opts.GCPPubsubOptions.Subscription)
exists, err := sub.Exists(ctx)
if err != nil {
return fmt.Errorf("failed to check if sub exists: %w", err)
}
if !exists {
_, err := o.client.CreateSubscription(
ctx,
o.opts.GCPPubsubOptions.Subscription,
pubsub.SubscriptionConfig{
Topic: o.client.Topic(o.opts.GCPPubsubOptions.Topic),
},
)
if err != nil {
return fmt.Errorf("failed to create subscription: %w", err)
}
}
return nil
}