in sandbox/CWMetricStreamExporter/lambda/main.go [55:103]
func HandleRequest(ctx context.Context, evnt events.KinesisFirehoseEvent) (events.KinesisFirehoseResponse, error) {
var response events.KinesisFirehoseResponse
var timeSeries []*prompb.TimeSeries
// These are the 4 value types from Cloudwatch, each of which map to a Prometheus Gauge
values := []Values{Count, Max, Min, Sum}
for _, record := range evnt.Records {
splitRecord := strings.Split(string(record.Data), string('\n'))
for _, x := range splitRecord {
// The Records includes an empty new line at the last position which becomes "" after parsing. Skipping over the empty string.
if x == "" {
continue
}
var metricStreamData MetricStreamData
json.Unmarshal([]byte(x), &metricStreamData)
// For each metric, the labels + valuetype is the __name__ of the sample, and the corresponding single sample value is used to create the timeseries.
for _, value := range values {
var samples []prompb.Sample
currentLabels := handleAddLabels(value, metricStreamData.MetricName, metricStreamData.Namespace, metricStreamData.Dimensions)
currentSamples := handleAddSamples(value, metricStreamData.Value, metricStreamData.Timestamp)
samples = append(samples, currentSamples)
singleTimeSeries := &prompb.TimeSeries{
Labels: currentLabels,
Samples: samples,
}
timeSeries = append(timeSeries, singleTimeSeries)
}
}
// No transformation occurs, just send OK response back to Kinesis
var transformedRecord events.KinesisFirehoseResponseRecord
transformedRecord.RecordID = record.RecordID
transformedRecord.Result = events.KinesisFirehoseTransformedStateOk
transformedRecord.Data = []byte(string(record.Data))
response.Records = append(response.Records, transformedRecord)
}
_, err := createWriteRequestAndSendToAPS(timeSeries)
if err != nil {
panic(err)
}
return response, nil
}