func flushKubeMonAgentEventRecords()

in source/plugins/go/src/oms.go [582:827]


func flushKubeMonAgentEventRecords() {
	for ; true; <-KubeMonAgentConfigEventsSendTicker.C {
		if skipKubeMonEventsFlush != true {
			Log("In flushConfigErrorRecords\n")
			start := time.Now()
			var elapsed time.Duration
			var laKubeMonAgentEventsRecords []laKubeMonAgentEvents
			var msgPackEntries []MsgPackEntry
			telemetryDimensions := make(map[string]string)

			telemetryDimensions["ConfigErrorEventCount"] = strconv.Itoa(len(ConfigErrorEvent))
			telemetryDimensions["PromScrapeErrorEventCount"] = strconv.Itoa(len(PromScrapeErrorEvent))

			if (len(ConfigErrorEvent) > 0) || (len(PromScrapeErrorEvent) > 0) {
				EventHashUpdateMutex.Lock()
				Log("Locked EventHashUpdateMutex for reading hashes\n")
				for k, v := range ConfigErrorEvent {
					tagJson, err := json.Marshal(v)

					if err != nil {
						message := fmt.Sprintf("Error while Marshalling config error event tags: %s", err.Error())
						Log(message)
						SendException(message)
					} else {
						laKubeMonAgentEventsRecord := laKubeMonAgentEvents{
							Computer:       Computer,
							CollectionTime: start.Format(time.RFC3339),
							Category:       ConfigErrorEventCategory,
							Level:          KubeMonAgentEventError,
							ClusterId:      ResourceID,
							ClusterName:    ResourceName,
							Message:        k,
							Tags:           fmt.Sprintf("%s", tagJson),
						}
						laKubeMonAgentEventsRecords = append(laKubeMonAgentEventsRecords, laKubeMonAgentEventsRecord)
						var stringMap map[string]string
						jsonBytes, err := json.Marshal(&laKubeMonAgentEventsRecord)
						if err != nil {
							message := fmt.Sprintf("Error while Marshalling laKubeMonAgentEventsRecord to json bytes: %s", err.Error())
							Log(message)
							SendException(message)
						} else {
							if err := json.Unmarshal(jsonBytes, &stringMap); err != nil {
								message := fmt.Sprintf("Error while UnMarhalling json bytes to stringmap: %s", err.Error())
								Log(message)
								SendException(message)
							} else {
								msgPackEntry := MsgPackEntry{
									Record: stringMap,
								}
								msgPackEntries = append(msgPackEntries, msgPackEntry)
							}
						}
					}
				}

				for k, v := range PromScrapeErrorEvent {
					tagJson, err := json.Marshal(v)
					if err != nil {
						message := fmt.Sprintf("Error while Marshalling prom scrape error event tags: %s", err.Error())
						Log(message)
						SendException(message)
					} else {
						laKubeMonAgentEventsRecord := laKubeMonAgentEvents{
							Computer:       Computer,
							CollectionTime: start.Format(time.RFC3339),
							Category:       PromScrapingErrorEventCategory,
							Level:          KubeMonAgentEventWarning,
							ClusterId:      ResourceID,
							ClusterName:    ResourceName,
							Message:        k,
							Tags:           fmt.Sprintf("%s", tagJson),
						}
						laKubeMonAgentEventsRecords = append(laKubeMonAgentEventsRecords, laKubeMonAgentEventsRecord)
						var stringMap map[string]string
						jsonBytes, err := json.Marshal(&laKubeMonAgentEventsRecord)
						if err != nil {
							message := fmt.Sprintf("Error while Marshalling laKubeMonAgentEventsRecord to json bytes: %s", err.Error())
							Log(message)
							SendException(message)
						} else {
							if err := json.Unmarshal(jsonBytes, &stringMap); err != nil {
								message := fmt.Sprintf("Error while UnMarhalling json bytes to stringmap: %s", err.Error())
								Log(message)
								SendException(message)
							} else {
								msgPackEntry := MsgPackEntry{
									Record: stringMap,
								}
								msgPackEntries = append(msgPackEntries, msgPackEntry)
							}
						}
					}
				}

				//Clearing out the prometheus scrape hash so that it can be rebuilt with the errors in the next hour
				for k := range PromScrapeErrorEvent {
					delete(PromScrapeErrorEvent, k)
				}
				Log("PromScrapeErrorEvent cache cleared\n")
				EventHashUpdateMutex.Unlock()
				Log("Unlocked EventHashUpdateMutex for reading hashes\n")
			} else {
				//Sending a record in case there are no errors to be able to differentiate between no data vs no errors
				tagsValue := KubeMonAgentEventTags{}

				tagJson, err := json.Marshal(tagsValue)
				if err != nil {
					message := fmt.Sprintf("Error while Marshalling no error tags: %s", err.Error())
					Log(message)
					SendException(message)
				} else {
					laKubeMonAgentEventsRecord := laKubeMonAgentEvents{
						Computer:       Computer,
						CollectionTime: start.Format(time.RFC3339),
						Category:       NoErrorEventCategory,
						Level:          KubeMonAgentEventInfo,
						ClusterId:      ResourceID,
						ClusterName:    ResourceName,
						Message:        "No errors",
						Tags:           fmt.Sprintf("%s", tagJson),
					}
					laKubeMonAgentEventsRecords = append(laKubeMonAgentEventsRecords, laKubeMonAgentEventsRecord)
					var stringMap map[string]string
					jsonBytes, err := json.Marshal(&laKubeMonAgentEventsRecord)
					if err != nil {
						message := fmt.Sprintf("Error while Marshalling laKubeMonAgentEventsRecord to json bytes: %s", err.Error())
						Log(message)
						SendException(message)
					} else {
						if err := json.Unmarshal(jsonBytes, &stringMap); err != nil {
							message := fmt.Sprintf("Error while UnMarshalling json bytes to stringmap: %s", err.Error())
							Log(message)
							SendException(message)
						} else {
							msgPackEntry := MsgPackEntry{
								Record: stringMap,
							}
							msgPackEntries = append(msgPackEntries, msgPackEntry)
						}
					}
				}
			}
			if IsWindows == false && len(msgPackEntries) > 0 { //for linux, mdsd route
				if IsAADMSIAuthMode == true && strings.HasPrefix(MdsdKubeMonAgentEventsTagName, MdsdOutputStreamIdTagPrefix) == false {
					Log("Info::mdsd::obtaining output stream id for data type: %s", KubeMonAgentEventDataType)
					MdsdKubeMonAgentEventsTagName = extension.GetInstance(FLBLogger, ContainerType).GetOutputStreamId(KubeMonAgentEventDataType)
				}
				Log("Info::mdsd:: using mdsdsource name for KubeMonAgentEvents: %s", MdsdKubeMonAgentEventsTagName)
				msgpBytes := convertMsgPackEntriesToMsgpBytes(MdsdKubeMonAgentEventsTagName, msgPackEntries)
				if MdsdKubeMonMsgpUnixSocketClient == nil {
					Log("Error::mdsd::mdsd connection for KubeMonAgentEvents does not exist. re-connecting ...")
					CreateMDSDClient(KubeMonAgentEvents, ContainerType)
					if MdsdKubeMonMsgpUnixSocketClient == nil {
						Log("Error::mdsd::Unable to create mdsd client for KubeMonAgentEvents. Please check error log.")
						ContainerLogTelemetryMutex.Lock()
						defer ContainerLogTelemetryMutex.Unlock()
						KubeMonEventsMDSDClientCreateErrors += 1
					}
				}
				if MdsdKubeMonMsgpUnixSocketClient != nil {
					deadline := 10 * time.Second
					MdsdKubeMonMsgpUnixSocketClient.SetWriteDeadline(time.Now().Add(deadline)) //this is based of clock time, so cannot reuse
					bts, er := MdsdKubeMonMsgpUnixSocketClient.Write(msgpBytes)
					elapsed = time.Since(start)
					if er != nil {
						message := fmt.Sprintf("Error::mdsd::Failed to write to kubemonagent mdsd %d records after %s. Will retry ... error : %s", len(msgPackEntries), elapsed, er.Error())
						Log(message)
						if MdsdKubeMonMsgpUnixSocketClient != nil {
							MdsdKubeMonMsgpUnixSocketClient.Close()
							MdsdKubeMonMsgpUnixSocketClient = nil
						}
						SendException(message)
					} else {
						numRecords := len(msgPackEntries)
						Log("FlushKubeMonAgentEventRecords::Info::Successfully flushed %d records that was %d bytes in %s", numRecords, bts, elapsed)
						// Send telemetry to AppInsights resource
						SendEvent(KubeMonAgentEventsFlushedEvent, telemetryDimensions)
					}
				} else {
					Log("Error::mdsd::Unable to create mdsd client for KubeMonAgentEvents. Please check error log.")
				}
			} else if len(laKubeMonAgentEventsRecords) > 0 { //for windows, ODS direct
				kubeMonAgentEventEntry := KubeMonAgentEventBlob{
					DataType:  KubeMonAgentEventDataType,
					IPName:    IPName,
					DataItems: laKubeMonAgentEventsRecords}

				marshalled, err := json.Marshal(kubeMonAgentEventEntry)

				if err != nil {
					message := fmt.Sprintf("Error while marshalling kubemonagentevent entry: %s", err.Error())
					Log(message)
					SendException(message)
				} else {
					req, _ := http.NewRequest("POST", OMSEndpoint, bytes.NewBuffer(marshalled))
					req.Header.Set("Content-Type", "application/json")
					req.Header.Set("User-Agent", userAgent)
					reqId := uuid.New().String()
					req.Header.Set("X-Request-ID", reqId)
					//expensive to do string len for every request, so use a flag
					if ResourceCentric == true {
						req.Header.Set("x-ms-AzureResourceId", ResourceID)
					}

					if IsAADMSIAuthMode == true {
						IngestionAuthTokenUpdateMutex.Lock()
						ingestionAuthToken := ODSIngestionAuthToken
						IngestionAuthTokenUpdateMutex.Unlock()
						if ingestionAuthToken == "" {
							Log("Error::ODS Ingestion Auth Token is empty. Please check error log.")
						}
						req.Header.Set("Authorization", "Bearer "+ingestionAuthToken)
					}

					resp, err := HTTPClient.Do(req)
					elapsed = time.Since(start)

					if err != nil {
						message := fmt.Sprintf("Error when sending kubemonagentevent request %s \n", err.Error())
						Log(message)
						Log("Failed to flush %d records after %s", len(laKubeMonAgentEventsRecords), elapsed)
					} else if resp == nil || resp.StatusCode != 200 {
						if resp != nil {
							Log("flushKubeMonAgentEventRecords: RequestId %s Status %s Status Code %d", reqId, resp.Status, resp.StatusCode)
						}
						Log("Failed to flush %d records after %s", len(laKubeMonAgentEventsRecords), elapsed)
					} else {
						numRecords := len(laKubeMonAgentEventsRecords)
						Log("FlushKubeMonAgentEventRecords::Info::Successfully flushed %d records in %s", numRecords, elapsed)

						// Send telemetry to AppInsights resource
						SendEvent(KubeMonAgentEventsFlushedEvent, telemetryDimensions)

					}
					if resp != nil && resp.Body != nil {
						defer resp.Body.Close()
					}
				}
			}
		} else {
			// Setting this to false to allow for subsequent flushes after the first hour
			skipKubeMonEventsFlush = false
		}
	}
}