in source/plugins/go/src/oms.go [881:1068]
func PostTelegrafMetricsToLA(telegrafRecords []map[interface{}]interface{}) int {
var laMetrics []*laTelegrafMetric
if (telegrafRecords == nil) || !(len(telegrafRecords) > 0) {
Log("PostTelegrafMetricsToLA::Error:no timeseries to derive")
return output.FLB_OK
}
for _, record := range telegrafRecords {
translatedMetrics, err := translateTelegrafMetrics(record)
if err != nil {
message := fmt.Sprintf("PostTelegrafMetricsToLA::Error:when translating telegraf metric to log analytics metric %q", err)
Log(message)
//SendException(message) //This will be too noisy
}
laMetrics = append(laMetrics, translatedMetrics...)
}
if (laMetrics == nil) || !(len(laMetrics) > 0) {
Log("PostTelegrafMetricsToLA::Info:no metrics derived from timeseries data")
return output.FLB_OK
} else {
message := fmt.Sprintf("PostTelegrafMetricsToLA::Info:derived %v metrics from %v timeseries", len(laMetrics), len(telegrafRecords))
Log(message)
}
if IsWindows == false { //for linux, mdsd route
var msgPackEntries []MsgPackEntry
var i int
start := time.Now()
var elapsed time.Duration
for i = 0; i < len(laMetrics); i++ {
var interfaceMap map[string]interface{}
stringMap := make(map[string]string)
jsonBytes, err := json.Marshal(*laMetrics[i])
if err != nil {
message := fmt.Sprintf("PostTelegrafMetricsToLA::Error:when marshalling json %q", err)
Log(message)
SendException(message)
return output.FLB_OK
} else {
if err := json.Unmarshal(jsonBytes, &interfaceMap); err != nil {
message := fmt.Sprintf("Error while UnMarshalling json bytes to interfaceMap: %s", err.Error())
Log(message)
SendException(message)
return output.FLB_OK
} else {
for key, value := range interfaceMap {
strKey := fmt.Sprintf("%v", key)
strValue := fmt.Sprintf("%v", value)
stringMap[strKey] = strValue
}
msgPackEntry := MsgPackEntry{
Record: stringMap,
}
msgPackEntries = append(msgPackEntries, msgPackEntry)
}
}
}
if len(msgPackEntries) > 0 {
if IsAADMSIAuthMode == true && (strings.HasPrefix(MdsdInsightsMetricsTagName, MdsdOutputStreamIdTagPrefix) == false) {
Log("Info::mdsd::obtaining output stream id for InsightsMetricsDataType since Log Analytics AAD MSI Auth Enabled")
MdsdInsightsMetricsTagName = extension.GetInstance(FLBLogger, ContainerType).GetOutputStreamId(InsightsMetricsDataType)
}
msgpBytes := convertMsgPackEntriesToMsgpBytes(MdsdInsightsMetricsTagName, msgPackEntries)
if MdsdInsightsMetricsMsgpUnixSocketClient == nil {
Log("Error::mdsd::mdsd connection does not exist. re-connecting ...")
CreateMDSDClient(InsightsMetrics, ContainerType)
if MdsdInsightsMetricsMsgpUnixSocketClient == nil {
Log("Error::mdsd::Unable to create mdsd client for insights metrics. Please check error log.")
ContainerLogTelemetryMutex.Lock()
defer ContainerLogTelemetryMutex.Unlock()
InsightsMetricsMDSDClientCreateErrors += 1
return output.FLB_RETRY
}
}
deadline := 10 * time.Second
MdsdInsightsMetricsMsgpUnixSocketClient.SetWriteDeadline(time.Now().Add(deadline)) //this is based of clock time, so cannot reuse
bts, er := MdsdInsightsMetricsMsgpUnixSocketClient.Write(msgpBytes)
elapsed = time.Since(start)
if er != nil {
Log("Error::mdsd::Failed to write to mdsd %d records after %s. Will retry ... error : %s", len(msgPackEntries), elapsed, er.Error())
UpdateNumTelegrafMetricsSentTelemetry(0, 1, 0, 0)
if MdsdInsightsMetricsMsgpUnixSocketClient != nil {
MdsdInsightsMetricsMsgpUnixSocketClient.Close()
MdsdInsightsMetricsMsgpUnixSocketClient = nil
}
ContainerLogTelemetryMutex.Lock()
defer ContainerLogTelemetryMutex.Unlock()
InsightsMetricsMDSDClientCreateErrors += 1
return output.FLB_RETRY
} else {
numTelegrafMetricsRecords := len(msgPackEntries)
UpdateNumTelegrafMetricsSentTelemetry(numTelegrafMetricsRecords, 0, 0, 0)
Log("Success::mdsd::Successfully flushed %d telegraf metrics records that was %d bytes to mdsd in %s ", numTelegrafMetricsRecords, bts, elapsed)
}
}
} else { // for windows, ODS direct
var metrics []laTelegrafMetric
var i int
numWinMetricsWithTagsSize64KBorMore := 0
for i = 0; i < len(laMetrics); i++ {
metrics = append(metrics, *laMetrics[i])
if len(*&laMetrics[i].Tags) >= (64 * 1024) {
numWinMetricsWithTagsSize64KBorMore += 1
}
}
laTelegrafMetrics := InsightsMetricsBlob{
DataType: InsightsMetricsDataType,
IPName: IPName,
DataItems: metrics}
jsonBytes, err := json.Marshal(laTelegrafMetrics)
if err != nil {
message := fmt.Sprintf("PostTelegrafMetricsToLA::Error:when marshalling json %q", err)
Log(message)
SendException(message)
return output.FLB_OK
}
//Post metrics data to LA
req, _ := http.NewRequest("POST", OMSEndpoint, bytes.NewBuffer(jsonBytes))
//req.URL.Query().Add("api-version","2016-04-01")
//set headers
req.Header.Set("x-ms-date", time.Now().Format(time.RFC3339))
req.Header.Set("User-Agent", userAgent)
reqID := uuid.New().String()
req.Header.Set("X-Request-ID", reqID)
//expensive to do string len for every request, so use a flag
if ResourceCentric == true {
req.Header.Set("x-ms-AzureResourceId", ResourceID)
}
if IsAADMSIAuthMode == true {
IngestionAuthTokenUpdateMutex.Lock()
ingestionAuthToken := ODSIngestionAuthToken
IngestionAuthTokenUpdateMutex.Unlock()
if ingestionAuthToken == "" {
message := "Error::ODS Ingestion Auth Token is empty. Please check error log."
Log(message)
return output.FLB_RETRY
}
// add authorization header to the req
req.Header.Set("Authorization", "Bearer "+ingestionAuthToken)
}
start := time.Now()
resp, err := HTTPClient.Do(req)
elapsed := time.Since(start)
if err != nil {
message := fmt.Sprintf("PostTelegrafMetricsToLA::Error:(retriable) when sending %v metrics. duration:%v err:%q \n", len(laMetrics), elapsed, err.Error())
Log(message)
UpdateNumTelegrafMetricsSentTelemetry(0, 1, 0, 0)
return output.FLB_RETRY
}
if resp == nil || resp.StatusCode != 200 {
if resp != nil {
Log("PostTelegrafMetricsToLA::Error:(retriable) RequestID %s Response Status %v Status Code %v", reqID, resp.Status, resp.StatusCode)
}
if resp != nil && resp.StatusCode == 429 {
UpdateNumTelegrafMetricsSentTelemetry(0, 1, 1, 0)
}
return output.FLB_RETRY
}
defer resp.Body.Close()
numMetrics := len(laMetrics)
UpdateNumTelegrafMetricsSentTelemetry(numMetrics, 0, 0, numWinMetricsWithTagsSize64KBorMore)
Log("PostTelegrafMetricsToLA::Info:Successfully flushed %v records in %v", numMetrics, elapsed)
}
return output.FLB_OK
}