app/metrics/processor/processor.go (71 lines of code) (raw):

// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package processor handles the received event and generates metrics package processor import ( "context" "google/jss/pubsub-integration/metrics" "google/jss/pubsub-integration/metrics/config" "google/jss/pubsub-integration/pubsub" "log" "math/rand" "time" ) // Creates a Cloud Pub/Sub subscription to receive event and then generate metrics func Start(ctx context.Context, factory metrics.Factory) error { client, err := pubsub.Service.NewClient(ctx, nil) if err != nil { return err } defer client.Close() // nolint: errcheck // The subscription to receive event sub := client.NewSubscription(config.Config.EventSubscription, config.Config.EventCodec, config.Config.SubscriberNumGoroutines, config.Config.SubscriberMaxOutstanding) // The topic to publish the metrics converted from received event metricsTopic := client.NewTopic(config.Config.MetricsTopic, config.Config.MetricsCodec, config.Config.PublisherBatchSize, config.Config.PublisherNumGoroutines, 0) defer metricsTopic.Stop() // The handler to handles the received event, generate and publish metrics to the metrics topic handler := eventHandler(metricsTopic, factory) // Start to handle received event using given handler. // It does not return until the context is done for { if err := sub.Receive(ctx, handler); err != nil { log.Printf("sub.Receive: %v", err) } select { case <-ctx.Done(): log.Printf("context done, subscriber stopped") return nil default: waitTime := 30 * time.Second log.Printf("waiting %v for retry", waitTime) time.Sleep(waitTime) } } } // eventHandler creates the event message handler for subscriber to handle the received event // The handler receives event message and generates metrics using the given metrics factory // It acks the message and publishes the metrics to the metrics topic if it generates metrics successfully or nacks if it does not func eventHandler(metricsTopic pubsub.Topic, factory metrics.Factory) pubsub.MessageHandler { // factory: the metrics factory to generate metrics from the received event return func(ctx context.Context, message *pubsub.Message) { log.Printf("processing event ID: %v, data: %v", message.ID, message.Data) processingTime := ProcessingTime() time.Sleep(processingTime) // Simulate processing time ackTime := time.Now() metrics, err := factory(message.Data, message.PublishTime, ackTime, processingTime) if err != nil { log.Printf("nack the event ID: %v, error: %v", message.ID, err) message.Nack() return } log.Printf("event ID: %v converted to metrics: %v", message.ID, metrics) id, err := metricsTopic.Publish(ctx, metrics) if err != nil { log.Println(err) } else { log.Printf("event ID: %v is processed and published to metiric topic as message ID: %v", message.ID, id) } log.Printf("ack the event ID: %v", message.ID) message.Ack() } } var random = rand.New(rand.NewSource(time.Now().UnixNano())) const proesssTimeMin = 0.1 const processTimeMax = 0.3 const processTimeMean = (proesssTimeMin + processTimeMax) / 2 const processTimeStdDev = (processTimeMax - processTimeMean) / 3.29 // 3.29 is the z-score for 99.9% confidence interval // ProcessingTime returns a normal distributed random processing time to simulate the time used to process an event // It is between 0.1 and 0.5 seconds and 99.9% of the time between 0.1 and 0.3 seconds func ProcessingTime() time.Duration { for { seconds := random.NormFloat64()*processTimeStdDev + processTimeMean if seconds >= proesssTimeMin && seconds <= 5.0 { return time.Duration(seconds * float64(time.Second)) } } }