app/eventgen/generator/generator.go (87 lines of code) (raw):

// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package generator creates event message and publish to event topic package generator import ( "context" "errors" "google/jss/pubsub-integration/eventgen/config" "google/jss/pubsub-integration/eventgen/generator/publishers" "google/jss/pubsub-integration/pubsub" "log" "sync" "time" "github.com/linkedin/goavro/v2" ) type generator struct { client pubsub.Client topic pubsub.Topic publishers *publishers.Publishers cancel context.CancelFunc } // Initializes the Cloud Pub/Sub client and the topic for event generator func newGenerator(topicID string, codec *goavro.Codec, batchSize int, numGoroutines int, maxOutstanding int) (*generator, error) { var g generator backoff := pubsub.NewClientBackoffConfig(config.Config.PublisherRetryInit, config.Config.PublisherRetryTotal) client, err := pubsub.Service.NewClient(context.Background(), backoff) if err != nil { log.Printf("fail to connect to Cloud Pub/Sub, err: %v", err) return nil, err } g.client = client g.topic = client.NewTopic(topicID, codec, batchSize, numGoroutines, maxOutstanding) return &g, nil } // Creates the publisher group and starts to publish events func (g *generator) Run(event publishers.NewMessage, numPublishers int, timeout time.Duration) { log.Printf("run event generator with numPublishers: %v, timeout: %v", numPublishers, timeout) ctx, cancel := context.WithCancel(context.Background()) g.cancel = cancel pbrs := publishers.NewPublishers(g.topic, event, timeout) pbrs.Add(ctx, numPublishers) g.publishers = pbrs // Wait for all publishers to finish and then release the resources in another thread go func() { pbrs.WaitFinish() g.release() }() } // Stops the event generator and then release its resources func (g *generator) Stop() { if g.publishers != nil { g.publishers.Stop() // Resources will be released after all publishers have finished g.cancel() // Force stop the generator } else { g.release() } } // Stops the topic and then close the Cloud Pub/Sub client func (g *generator) release() { g.topic.Stop() if err := g.client.Close(); err != nil { log.Printf("fail to close Cloud Pub/Sub client, err: %v", err) } mux.Lock() defer mux.Unlock() if running == g { running = nil } } var mux sync.Mutex // Protects the running singleon var running *generator // This is a singleton. Only one generator can be running at a time // Generates and publishes an event to a Cloud Pub/Sub topic func Start(event publishers.NewMessage, numPublishers int, timeout time.Duration) error { mux.Lock() defer mux.Unlock() if running != nil { return errors.New("there is already an running generator") } g, err := newGenerator(config.Config.EventTopic, config.Config.EventCodec, config.Config.PublisherBatchSize, config.Config.PublisherNumGoroutines, config.Config.PublisherMaxOutstanding) if err != nil { return err } g.Run(event, numPublishers, timeout) running = g return nil } // Stops the event generation func Stop() { mux.Lock() defer mux.Unlock() if running == nil { log.Printf("there is no running generator") return } running.Stop() running = nil }