func()

in plugins/outputs/awscsm/providers/config_provider.go [161:267]


func (c *csmConfigProvider) interval(interval time.Duration) {
	// perform the very first publishing configuration query within at most a minute of Start
	startupInterval := interval
	if interval.Minutes() >= 1.0 {
		startupInterval = time.Duration(rand.Int63n(int64(60 * time.Second)))
	}

	t := time.NewTimer(startupInterval)

	cfg := AgentConfig{}
	queryIntervalInMinutes := int64(interval.Minutes())

	for {
		updated := false

		select {
		case <-c.done:
			return
		case <-t.C:
			log.Println("D! Output awscsm config provider ticking")
			cfg = c.agentConfigHandler.Get()
			out, err := c.client.GetPublishingConfiguration(nil)
			endpoint := ""
			apiCallTimestamp := time.Now()

			if svc, ok := c.client.(*csm.CSM); ok {
				endpoint = svc.Client.Endpoint
			}

			metametrics.MetricListener.CountSuccess(opPublishingConfigurationKey, err == nil, apiCallTimestamp, endpoint)

			if err != nil {
				log.Println("E!", err)
				break
			}

			if err := validateConfiguration(out); err != nil {
				log.Println("E!", err)
				break
			}

			queryIntervalInMinutes = *(out.QueryIntervalInMinutes)
			cfg.Endpoint = aws.StringValue(out.Endpoint)
			cfg.Status = aws.StringValue(out.Status)
			updated = true

			if cfg.SchemaVersion == *out.SchemaVersion {
				break
			}

			if cfg.IsTerminated() {
				break
			}

			cfg.SchemaVersion = aws.StringValue(out.SchemaVersion)
			schema, err := c.client.GetPublishingSchema(&csm.GetPublishingSchemaInput{
				SchemaVersion: out.SchemaVersion,
			})

			metametrics.MetricListener.CountSuccess(opPublishingSchemaKey, err == nil, apiCallTimestamp, endpoint)

			if err != nil {
				log.Println("E!", err)
				break
			}

			if err := validateSchema(schema); err != nil {
				log.Println("E!", err)
				break
			}

			cfg.Definitions.clear()
			cfg.Definitions.add(schema)

			cfg.Limits = Limits{
				MaxCompressedSampleSize:         int(aws.Int64Value(schema.ServiceLimits.CompressedEventSamplesSizeLimit)),
				MaxUncompressedSampleSize:       int(aws.Int64Value(schema.ServiceLimits.UncompressedSamplesLengthLimit)),
				MaxSEHBuckets:                   int(aws.Int64Value(schema.ServiceLimits.SehBucketLimit)),
				MaxFrequencyDistributionKeySize: int(aws.Int64Value(schema.ServiceLimits.FrequencyDistributionEntryKeySizeLimit)),
				MaxAggregationKeyValueSize:      int(aws.Int64Value(schema.ServiceLimits.SdkAggregationKeyEntryValueSizeLimit)),
				MaxFrequencyDistributionSize:    int(aws.Int64Value(schema.ServiceLimits.FrequencyMetricDistributionSizeLimit)),
				MaxRecords:                      int(aws.Int64Value(schema.ServiceLimits.SdkMonitoringRecordsLimit)),
				MaxPublishingMetricsPerCall:     int(aws.Int64Value(schema.ServiceLimits.PublishingMetricsLimit)),
			}
			log.Println("Config updated to", cfg)
		}

		if updated {
			c.agentConfigHandler.container.Store(cfg)
		}

		if cfg.IsTerminated() {
			continue
		}

		if queryIntervalInMinutes <= 1 {
			queryIntervalInMinutes = FallbackQueryInterval
		}

		timer := int64(time.Duration(queryIntervalInMinutes) * time.Minute)
		timerHalf := timer / 2
		interval = time.Duration(rand.Int63n(2 * timerHalf))
		interval += time.Duration(timerHalf)

		t.Reset(interval)
	}
}