func PostTelegrafMetricsToLA()

in source/plugins/go/src/oms.go [881:1068]


func PostTelegrafMetricsToLA(telegrafRecords []map[interface{}]interface{}) int {
	var laMetrics []*laTelegrafMetric

	if (telegrafRecords == nil) || !(len(telegrafRecords) > 0) {
		Log("PostTelegrafMetricsToLA::Error:no timeseries to derive")
		return output.FLB_OK
	}

	for _, record := range telegrafRecords {
		translatedMetrics, err := translateTelegrafMetrics(record)
		if err != nil {
			message := fmt.Sprintf("PostTelegrafMetricsToLA::Error:when translating telegraf metric to log analytics metric %q", err)
			Log(message)
			//SendException(message) //This will be too noisy
		}
		laMetrics = append(laMetrics, translatedMetrics...)
	}

	if (laMetrics == nil) || !(len(laMetrics) > 0) {
		Log("PostTelegrafMetricsToLA::Info:no metrics derived from timeseries data")
		return output.FLB_OK
	} else {
		message := fmt.Sprintf("PostTelegrafMetricsToLA::Info:derived %v metrics from %v timeseries", len(laMetrics), len(telegrafRecords))
		Log(message)
	}

	if IsWindows == false { //for linux, mdsd route
		var msgPackEntries []MsgPackEntry
		var i int
		start := time.Now()
		var elapsed time.Duration

		for i = 0; i < len(laMetrics); i++ {
			var interfaceMap map[string]interface{}
			stringMap := make(map[string]string)
			jsonBytes, err := json.Marshal(*laMetrics[i])
			if err != nil {
				message := fmt.Sprintf("PostTelegrafMetricsToLA::Error:when marshalling json %q", err)
				Log(message)
				SendException(message)
				return output.FLB_OK
			} else {
				if err := json.Unmarshal(jsonBytes, &interfaceMap); err != nil {
					message := fmt.Sprintf("Error while UnMarshalling json bytes to interfaceMap: %s", err.Error())
					Log(message)
					SendException(message)
					return output.FLB_OK
				} else {
					for key, value := range interfaceMap {
						strKey := fmt.Sprintf("%v", key)
						strValue := fmt.Sprintf("%v", value)
						stringMap[strKey] = strValue
					}
					msgPackEntry := MsgPackEntry{
						Record: stringMap,
					}
					msgPackEntries = append(msgPackEntries, msgPackEntry)
				}
			}
		}
		if len(msgPackEntries) > 0 {
			if IsAADMSIAuthMode == true && (strings.HasPrefix(MdsdInsightsMetricsTagName, MdsdOutputStreamIdTagPrefix) == false) {
				Log("Info::mdsd::obtaining output stream id for InsightsMetricsDataType since Log Analytics AAD MSI Auth Enabled")
				MdsdInsightsMetricsTagName = extension.GetInstance(FLBLogger, ContainerType).GetOutputStreamId(InsightsMetricsDataType)
			}
			msgpBytes := convertMsgPackEntriesToMsgpBytes(MdsdInsightsMetricsTagName, msgPackEntries)
			if MdsdInsightsMetricsMsgpUnixSocketClient == nil {
				Log("Error::mdsd::mdsd connection does not exist. re-connecting ...")
				CreateMDSDClient(InsightsMetrics, ContainerType)
				if MdsdInsightsMetricsMsgpUnixSocketClient == nil {
					Log("Error::mdsd::Unable to create mdsd client for insights metrics. Please check error log.")
					ContainerLogTelemetryMutex.Lock()
					defer ContainerLogTelemetryMutex.Unlock()
					InsightsMetricsMDSDClientCreateErrors += 1
					return output.FLB_RETRY
				}
			}

			deadline := 10 * time.Second
			MdsdInsightsMetricsMsgpUnixSocketClient.SetWriteDeadline(time.Now().Add(deadline)) //this is based of clock time, so cannot reuse
			bts, er := MdsdInsightsMetricsMsgpUnixSocketClient.Write(msgpBytes)

			elapsed = time.Since(start)

			if er != nil {
				Log("Error::mdsd::Failed to write to mdsd %d records after %s. Will retry ... error : %s", len(msgPackEntries), elapsed, er.Error())
				UpdateNumTelegrafMetricsSentTelemetry(0, 1, 0, 0)
				if MdsdInsightsMetricsMsgpUnixSocketClient != nil {
					MdsdInsightsMetricsMsgpUnixSocketClient.Close()
					MdsdInsightsMetricsMsgpUnixSocketClient = nil
				}

				ContainerLogTelemetryMutex.Lock()
				defer ContainerLogTelemetryMutex.Unlock()
				InsightsMetricsMDSDClientCreateErrors += 1
				return output.FLB_RETRY
			} else {
				numTelegrafMetricsRecords := len(msgPackEntries)
				UpdateNumTelegrafMetricsSentTelemetry(numTelegrafMetricsRecords, 0, 0, 0)
				Log("Success::mdsd::Successfully flushed %d telegraf metrics records that was %d bytes to mdsd in %s ", numTelegrafMetricsRecords, bts, elapsed)
			}
		}

	} else { // for windows, ODS direct

		var metrics []laTelegrafMetric
		var i int
		numWinMetricsWithTagsSize64KBorMore := 0

		for i = 0; i < len(laMetrics); i++ {
			metrics = append(metrics, *laMetrics[i])
			if len(*&laMetrics[i].Tags) >= (64 * 1024) {
				numWinMetricsWithTagsSize64KBorMore += 1
			}
		}

		laTelegrafMetrics := InsightsMetricsBlob{
			DataType:  InsightsMetricsDataType,
			IPName:    IPName,
			DataItems: metrics}

		jsonBytes, err := json.Marshal(laTelegrafMetrics)

		if err != nil {
			message := fmt.Sprintf("PostTelegrafMetricsToLA::Error:when marshalling json %q", err)
			Log(message)
			SendException(message)
			return output.FLB_OK
		}

		//Post metrics data to LA
		req, _ := http.NewRequest("POST", OMSEndpoint, bytes.NewBuffer(jsonBytes))

		//req.URL.Query().Add("api-version","2016-04-01")

		//set headers
		req.Header.Set("x-ms-date", time.Now().Format(time.RFC3339))
		req.Header.Set("User-Agent", userAgent)
		reqID := uuid.New().String()
		req.Header.Set("X-Request-ID", reqID)

		//expensive to do string len for every request, so use a flag
		if ResourceCentric == true {
			req.Header.Set("x-ms-AzureResourceId", ResourceID)
		}
		if IsAADMSIAuthMode == true {
			IngestionAuthTokenUpdateMutex.Lock()
			ingestionAuthToken := ODSIngestionAuthToken
			IngestionAuthTokenUpdateMutex.Unlock()
			if ingestionAuthToken == "" {
				message := "Error::ODS Ingestion Auth Token is empty. Please check error log."
				Log(message)
				return output.FLB_RETRY
			}
			// add authorization header to the req
			req.Header.Set("Authorization", "Bearer "+ingestionAuthToken)
		}

		start := time.Now()
		resp, err := HTTPClient.Do(req)
		elapsed := time.Since(start)

		if err != nil {
			message := fmt.Sprintf("PostTelegrafMetricsToLA::Error:(retriable) when sending %v metrics. duration:%v err:%q \n", len(laMetrics), elapsed, err.Error())
			Log(message)
			UpdateNumTelegrafMetricsSentTelemetry(0, 1, 0, 0)
			return output.FLB_RETRY
		}

		if resp == nil || resp.StatusCode != 200 {
			if resp != nil {
				Log("PostTelegrafMetricsToLA::Error:(retriable) RequestID %s Response Status %v Status Code %v", reqID, resp.Status, resp.StatusCode)
			}
			if resp != nil && resp.StatusCode == 429 {
				UpdateNumTelegrafMetricsSentTelemetry(0, 1, 1, 0)
			}
			return output.FLB_RETRY
		}

		defer resp.Body.Close()

		numMetrics := len(laMetrics)
		UpdateNumTelegrafMetricsSentTelemetry(numMetrics, 0, 0, numWinMetricsWithTagsSize64KBorMore)
		Log("PostTelegrafMetricsToLA::Info:Successfully flushed %v records in %v", numMetrics, elapsed)
	}

	return output.FLB_OK
}