func HandleRequest()

in sandbox/CWMetricStreamExporter/lambda/main.go [55:103]


func HandleRequest(ctx context.Context, evnt events.KinesisFirehoseEvent) (events.KinesisFirehoseResponse, error) {

	var response events.KinesisFirehoseResponse
	var timeSeries []*prompb.TimeSeries
	// These are the 4 value types from Cloudwatch, each of which map to a Prometheus Gauge
	values := []Values{Count, Max, Min, Sum}

	for _, record := range evnt.Records {

		splitRecord := strings.Split(string(record.Data), string('\n'))
		for _, x := range splitRecord {

			// The Records includes an empty new line at the last position which becomes "" after parsing. Skipping over the empty string.
			if x == "" {
				continue
			}
			var metricStreamData MetricStreamData
			json.Unmarshal([]byte(x), &metricStreamData)

			// For each metric, the labels + valuetype is the __name__ of the sample, and the corresponding single sample value is used to create the timeseries.
			for _, value := range values {
				var samples []prompb.Sample
				currentLabels := handleAddLabels(value, metricStreamData.MetricName, metricStreamData.Namespace, metricStreamData.Dimensions)
				currentSamples := handleAddSamples(value, metricStreamData.Value, metricStreamData.Timestamp)
				samples = append(samples, currentSamples)

				singleTimeSeries := &prompb.TimeSeries{
					Labels:  currentLabels,
					Samples: samples,
				}
				timeSeries = append(timeSeries, singleTimeSeries)
			}
		}

		// No transformation occurs, just send OK response back to Kinesis
		var transformedRecord events.KinesisFirehoseResponseRecord
		transformedRecord.RecordID = record.RecordID
		transformedRecord.Result = events.KinesisFirehoseTransformedStateOk
		transformedRecord.Data = []byte(string(record.Data))

		response.Records = append(response.Records, transformedRecord)
	}

	_, err := createWriteRequestAndSendToAPS(timeSeries)
	if err != nil {
		panic(err)
	}
	return response, nil
}