app/eventgen/generator/publishers/publishers.go (128 lines of code) (raw):

// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package publishers create group of publishers to publish message package publishers import ( "context" "google/jss/pubsub-integration/pubsub" "log" "strconv" "sync" "time" ) // NewMessage is the function to generate new message type NewMessage func() map[string]interface{} // Publishers is the group of publishers type Publishers struct { pubsub.Topic newMessage NewMessage publishers []*publisher timeout time.Duration sync.Locker waitFinish *sync.Cond } // NewPublishers creates the publishers group for publishing message concurrently. // The publishers that have been added will publish messages generated by newMessage function continuously until timeout. func NewPublishers(topic pubsub.Topic, newMessage NewMessage, timeout time.Duration) *Publishers { var mux sync.Mutex return &Publishers{ Topic: topic, newMessage: newMessage, timeout: timeout, Locker: &mux, waitFinish: sync.NewCond(&mux), } } // Adds or removes the publishers based on the given number. // The publishers will start publishing messages when it is added. func (pbrs *Publishers) Add(ctx context.Context, number int) { pbrs.Lock() defer pbrs.Unlock() if number < 0 { // Remove publishers newLen := len(pbrs.publishers) + number if newLen < 0 { newLen = 0 } stopPubs := pbrs.publishers[newLen:] log.Printf("stopping %v publishers", len(stopPubs)) for _, p := range stopPubs { p.Stop() } pbrs.publishers = pbrs.publishers[:newLen] } else { // Add publishers log.Printf("starting %v publishers", number) for i := 0; i < number; i++ { name := pbrs.Topic.GetID() + "-publisher-" + strconv.Itoa(len(pbrs.publishers)) pbrs.addOne(ctx, name) } } } func (pbrs *Publishers) addOne(ctx context.Context, name string) { pbrs.publishers = append(pbrs.publishers, runPublisher(ctx, name, pbrs)) } func (pbrs *Publishers) remove(pbr *publisher) { pbrs.Lock() defer pbrs.Unlock() pbrs.publishers = remove(pbrs.publishers, pbr) if len(pbrs.publishers) == 0 { pbrs.waitFinish.Broadcast() // All publisers are stopped, release all waiting routings } } func remove(slice []*publisher, element *publisher) []*publisher { for i, e := range slice { if e == element { return append(slice[:i], slice[i+1:]...) } } return slice } // Stops all publishers func (pbrs *Publishers) Stop() { pbrs.Lock() defer pbrs.Unlock() for _, pbr := range pbrs.publishers { pbr.Stop() } } // WaitFinish waits until all publishers are stopped func (pbrs *Publishers) WaitFinish() { pbrs.Lock() defer pbrs.Unlock() for len(pbrs.publishers) > 0 { pbrs.waitFinish.Wait() // Waiting until no running publishers } } type publisher struct { *Publishers name string cancel context.CancelFunc } func runPublisher(ctx context.Context, name string, publishers *Publishers) *publisher { pbr := &publisher{ Publishers: publishers, name: name, } pbr.run(ctx) return pbr } // Starts to run the publisher unitl ctx done func (pbr *publisher) run(ctx context.Context) { var pbrCtx context.Context if pbr.timeout > 0 { pbrCtx, pbr.cancel = context.WithTimeout(ctx, pbr.timeout) } else { pbrCtx, pbr.cancel = context.WithCancel(ctx) } // Create new thread to publish until pbrCtx done go func() { defer pbr.finish() log.Printf("%v: started", pbr.name) for { select { case <-pbrCtx.Done(): log.Printf("%v: context done, stopped", pbr.name) return default: msg := pbr.newMessage() if id, err := pbr.Publish(ctx, msg); err != nil { log.Printf("%v: err: %v", pbr.name, err) } else { log.Printf("%v: published message ID: %v", pbr.name, id) } } } }() } // Removes itself from publishers func (pbr *publisher) finish() { pbr.Publishers.remove(pbr) } // Stops the publisher gracefully func (pbr *publisher) Stop() { pbr.cancel() }