in plugins/outputs/awscsm/providers/config_provider.go [161:267]
// interval runs the publishing-configuration polling loop. It returns only
// when a receive on c.done succeeds.
//
// The first query fires after interval, or, when interval is one minute or
// longer, after a random delay below 60 seconds. Every later query is
// scheduled at a random delay in [T/2, 3T/2), where T is the query interval
// taken from the most recent valid configuration response (initially the
// whole minutes of interval); values of one minute or less are replaced by
// FallbackQueryInterval.
//
// On each tick the current agent config is read from c.agentConfigHandler,
// refreshed from GetPublishingConfiguration and, when the schema version
// changed and the config is not terminated, from GetPublishingSchema. The
// result is stored back through c.agentConfigHandler.container.
func (c *csmConfigProvider) interval(interval time.Duration) {
	// perform the very first publishing configuration query within at most a minute of Start
	startupInterval := interval
	if interval.Minutes() >= 1.0 {
		startupInterval = time.Duration(rand.Int63n(int64(60 * time.Second)))
	}

	t := time.NewTimer(startupInterval)
	// Release the timer when the loop exits through c.done.
	defer t.Stop()

	cfg := AgentConfig{}
	queryIntervalInMinutes := int64(interval.Minutes())

	for {
		updated := false

		select {
		case <-c.done:
			return
		case <-t.C:
			// Every `break` below leaves the select only; execution continues
			// with the store/reschedule logic after it.
			log.Println("D! Output awscsm config provider ticking")
			cfg = c.agentConfigHandler.Get()

			out, err := c.client.GetPublishingConfiguration(nil)
			endpoint := ""
			apiCallTimestamp := time.Now()
			if svc, ok := c.client.(*csm.CSM); ok {
				endpoint = svc.Client.Endpoint
			}
			metametrics.MetricListener.CountSuccess(opPublishingConfigurationKey, err == nil, apiCallTimestamp, endpoint)
			if err != nil {
				log.Println("E!", err)
				break
			}
			if err := validateConfiguration(out); err != nil {
				log.Println("E!", err)
				break
			}

			// Nil-safe reads: a missing interval yields 0 and is replaced by
			// the fallback below.
			queryIntervalInMinutes = aws.Int64Value(out.QueryIntervalInMinutes)
			cfg.Endpoint = aws.StringValue(out.Endpoint)
			cfg.Status = aws.StringValue(out.Status)
			updated = true

			newSchemaVersion := aws.StringValue(out.SchemaVersion)
			if cfg.SchemaVersion == newSchemaVersion {
				break
			}
			if cfg.IsTerminated() {
				break
			}

			schema, err := c.client.GetPublishingSchema(&csm.GetPublishingSchemaInput{
				SchemaVersion: out.SchemaVersion,
			})
			metametrics.MetricListener.CountSuccess(opPublishingSchemaKey, err == nil, apiCallTimestamp, endpoint)
			if err != nil {
				log.Println("E!", err)
				break
			}
			if err := validateSchema(schema); err != nil {
				log.Println("E!", err)
				break
			}

			// Record the new schema version only once the schema has been
			// fetched and validated. Setting it earlier would store the new
			// version together with the old definitions after a failed fetch,
			// and the version comparison above would then skip the schema
			// download on every later tick.
			cfg.SchemaVersion = newSchemaVersion
			cfg.Definitions.clear()
			cfg.Definitions.add(schema)
			cfg.Limits = Limits{
				MaxCompressedSampleSize:         int(aws.Int64Value(schema.ServiceLimits.CompressedEventSamplesSizeLimit)),
				MaxUncompressedSampleSize:       int(aws.Int64Value(schema.ServiceLimits.UncompressedSamplesLengthLimit)),
				MaxSEHBuckets:                   int(aws.Int64Value(schema.ServiceLimits.SehBucketLimit)),
				MaxFrequencyDistributionKeySize: int(aws.Int64Value(schema.ServiceLimits.FrequencyDistributionEntryKeySizeLimit)),
				MaxAggregationKeyValueSize:      int(aws.Int64Value(schema.ServiceLimits.SdkAggregationKeyEntryValueSizeLimit)),
				MaxFrequencyDistributionSize:    int(aws.Int64Value(schema.ServiceLimits.FrequencyMetricDistributionSizeLimit)),
				MaxRecords:                      int(aws.Int64Value(schema.ServiceLimits.SdkMonitoringRecordsLimit)),
				MaxPublishingMetricsPerCall:     int(aws.Int64Value(schema.ServiceLimits.PublishingMetricsLimit)),
			}
			log.Println("Config updated to", cfg)
		}

		if updated {
			c.agentConfigHandler.container.Store(cfg)
		}

		// A terminated config does not re-arm the timer, so from here on the
		// loop only waits for c.done.
		if cfg.IsTerminated() {
			continue
		}

		if queryIntervalInMinutes <= 1 {
			queryIntervalInMinutes = FallbackQueryInterval
		}

		// Jitter the next query uniformly over [T/2, 3T/2).
		timer := int64(time.Duration(queryIntervalInMinutes) * time.Minute)
		timerHalf := timer / 2
		next := time.Duration(rand.Int63n(2*timerHalf)) + time.Duration(timerHalf)
		t.Reset(next)
	}
}