in source/plugins/go/src/oms.go [582:827]
func flushKubeMonAgentEventRecords() {
for ; true; <-KubeMonAgentConfigEventsSendTicker.C {
if skipKubeMonEventsFlush != true {
Log("In flushConfigErrorRecords\n")
start := time.Now()
var elapsed time.Duration
var laKubeMonAgentEventsRecords []laKubeMonAgentEvents
var msgPackEntries []MsgPackEntry
telemetryDimensions := make(map[string]string)
telemetryDimensions["ConfigErrorEventCount"] = strconv.Itoa(len(ConfigErrorEvent))
telemetryDimensions["PromScrapeErrorEventCount"] = strconv.Itoa(len(PromScrapeErrorEvent))
if (len(ConfigErrorEvent) > 0) || (len(PromScrapeErrorEvent) > 0) {
EventHashUpdateMutex.Lock()
Log("Locked EventHashUpdateMutex for reading hashes\n")
for k, v := range ConfigErrorEvent {
tagJson, err := json.Marshal(v)
if err != nil {
message := fmt.Sprintf("Error while Marshalling config error event tags: %s", err.Error())
Log(message)
SendException(message)
} else {
laKubeMonAgentEventsRecord := laKubeMonAgentEvents{
Computer: Computer,
CollectionTime: start.Format(time.RFC3339),
Category: ConfigErrorEventCategory,
Level: KubeMonAgentEventError,
ClusterId: ResourceID,
ClusterName: ResourceName,
Message: k,
Tags: fmt.Sprintf("%s", tagJson),
}
laKubeMonAgentEventsRecords = append(laKubeMonAgentEventsRecords, laKubeMonAgentEventsRecord)
var stringMap map[string]string
jsonBytes, err := json.Marshal(&laKubeMonAgentEventsRecord)
if err != nil {
message := fmt.Sprintf("Error while Marshalling laKubeMonAgentEventsRecord to json bytes: %s", err.Error())
Log(message)
SendException(message)
} else {
if err := json.Unmarshal(jsonBytes, &stringMap); err != nil {
message := fmt.Sprintf("Error while UnMarhalling json bytes to stringmap: %s", err.Error())
Log(message)
SendException(message)
} else {
msgPackEntry := MsgPackEntry{
Record: stringMap,
}
msgPackEntries = append(msgPackEntries, msgPackEntry)
}
}
}
}
for k, v := range PromScrapeErrorEvent {
tagJson, err := json.Marshal(v)
if err != nil {
message := fmt.Sprintf("Error while Marshalling prom scrape error event tags: %s", err.Error())
Log(message)
SendException(message)
} else {
laKubeMonAgentEventsRecord := laKubeMonAgentEvents{
Computer: Computer,
CollectionTime: start.Format(time.RFC3339),
Category: PromScrapingErrorEventCategory,
Level: KubeMonAgentEventWarning,
ClusterId: ResourceID,
ClusterName: ResourceName,
Message: k,
Tags: fmt.Sprintf("%s", tagJson),
}
laKubeMonAgentEventsRecords = append(laKubeMonAgentEventsRecords, laKubeMonAgentEventsRecord)
var stringMap map[string]string
jsonBytes, err := json.Marshal(&laKubeMonAgentEventsRecord)
if err != nil {
message := fmt.Sprintf("Error while Marshalling laKubeMonAgentEventsRecord to json bytes: %s", err.Error())
Log(message)
SendException(message)
} else {
if err := json.Unmarshal(jsonBytes, &stringMap); err != nil {
message := fmt.Sprintf("Error while UnMarhalling json bytes to stringmap: %s", err.Error())
Log(message)
SendException(message)
} else {
msgPackEntry := MsgPackEntry{
Record: stringMap,
}
msgPackEntries = append(msgPackEntries, msgPackEntry)
}
}
}
}
//Clearing out the prometheus scrape hash so that it can be rebuilt with the errors in the next hour
for k := range PromScrapeErrorEvent {
delete(PromScrapeErrorEvent, k)
}
Log("PromScrapeErrorEvent cache cleared\n")
EventHashUpdateMutex.Unlock()
Log("Unlocked EventHashUpdateMutex for reading hashes\n")
} else {
//Sending a record in case there are no errors to be able to differentiate between no data vs no errors
tagsValue := KubeMonAgentEventTags{}
tagJson, err := json.Marshal(tagsValue)
if err != nil {
message := fmt.Sprintf("Error while Marshalling no error tags: %s", err.Error())
Log(message)
SendException(message)
} else {
laKubeMonAgentEventsRecord := laKubeMonAgentEvents{
Computer: Computer,
CollectionTime: start.Format(time.RFC3339),
Category: NoErrorEventCategory,
Level: KubeMonAgentEventInfo,
ClusterId: ResourceID,
ClusterName: ResourceName,
Message: "No errors",
Tags: fmt.Sprintf("%s", tagJson),
}
laKubeMonAgentEventsRecords = append(laKubeMonAgentEventsRecords, laKubeMonAgentEventsRecord)
var stringMap map[string]string
jsonBytes, err := json.Marshal(&laKubeMonAgentEventsRecord)
if err != nil {
message := fmt.Sprintf("Error while Marshalling laKubeMonAgentEventsRecord to json bytes: %s", err.Error())
Log(message)
SendException(message)
} else {
if err := json.Unmarshal(jsonBytes, &stringMap); err != nil {
message := fmt.Sprintf("Error while UnMarshalling json bytes to stringmap: %s", err.Error())
Log(message)
SendException(message)
} else {
msgPackEntry := MsgPackEntry{
Record: stringMap,
}
msgPackEntries = append(msgPackEntries, msgPackEntry)
}
}
}
}
if IsWindows == false && len(msgPackEntries) > 0 { //for linux, mdsd route
if IsAADMSIAuthMode == true && strings.HasPrefix(MdsdKubeMonAgentEventsTagName, MdsdOutputStreamIdTagPrefix) == false {
Log("Info::mdsd::obtaining output stream id for data type: %s", KubeMonAgentEventDataType)
MdsdKubeMonAgentEventsTagName = extension.GetInstance(FLBLogger, ContainerType).GetOutputStreamId(KubeMonAgentEventDataType)
}
Log("Info::mdsd:: using mdsdsource name for KubeMonAgentEvents: %s", MdsdKubeMonAgentEventsTagName)
msgpBytes := convertMsgPackEntriesToMsgpBytes(MdsdKubeMonAgentEventsTagName, msgPackEntries)
if MdsdKubeMonMsgpUnixSocketClient == nil {
Log("Error::mdsd::mdsd connection for KubeMonAgentEvents does not exist. re-connecting ...")
CreateMDSDClient(KubeMonAgentEvents, ContainerType)
if MdsdKubeMonMsgpUnixSocketClient == nil {
Log("Error::mdsd::Unable to create mdsd client for KubeMonAgentEvents. Please check error log.")
ContainerLogTelemetryMutex.Lock()
defer ContainerLogTelemetryMutex.Unlock()
KubeMonEventsMDSDClientCreateErrors += 1
}
}
if MdsdKubeMonMsgpUnixSocketClient != nil {
deadline := 10 * time.Second
MdsdKubeMonMsgpUnixSocketClient.SetWriteDeadline(time.Now().Add(deadline)) //this is based of clock time, so cannot reuse
bts, er := MdsdKubeMonMsgpUnixSocketClient.Write(msgpBytes)
elapsed = time.Since(start)
if er != nil {
message := fmt.Sprintf("Error::mdsd::Failed to write to kubemonagent mdsd %d records after %s. Will retry ... error : %s", len(msgPackEntries), elapsed, er.Error())
Log(message)
if MdsdKubeMonMsgpUnixSocketClient != nil {
MdsdKubeMonMsgpUnixSocketClient.Close()
MdsdKubeMonMsgpUnixSocketClient = nil
}
SendException(message)
} else {
numRecords := len(msgPackEntries)
Log("FlushKubeMonAgentEventRecords::Info::Successfully flushed %d records that was %d bytes in %s", numRecords, bts, elapsed)
// Send telemetry to AppInsights resource
SendEvent(KubeMonAgentEventsFlushedEvent, telemetryDimensions)
}
} else {
Log("Error::mdsd::Unable to create mdsd client for KubeMonAgentEvents. Please check error log.")
}
} else if len(laKubeMonAgentEventsRecords) > 0 { //for windows, ODS direct
kubeMonAgentEventEntry := KubeMonAgentEventBlob{
DataType: KubeMonAgentEventDataType,
IPName: IPName,
DataItems: laKubeMonAgentEventsRecords}
marshalled, err := json.Marshal(kubeMonAgentEventEntry)
if err != nil {
message := fmt.Sprintf("Error while marshalling kubemonagentevent entry: %s", err.Error())
Log(message)
SendException(message)
} else {
req, _ := http.NewRequest("POST", OMSEndpoint, bytes.NewBuffer(marshalled))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", userAgent)
reqId := uuid.New().String()
req.Header.Set("X-Request-ID", reqId)
//expensive to do string len for every request, so use a flag
if ResourceCentric == true {
req.Header.Set("x-ms-AzureResourceId", ResourceID)
}
if IsAADMSIAuthMode == true {
IngestionAuthTokenUpdateMutex.Lock()
ingestionAuthToken := ODSIngestionAuthToken
IngestionAuthTokenUpdateMutex.Unlock()
if ingestionAuthToken == "" {
Log("Error::ODS Ingestion Auth Token is empty. Please check error log.")
}
req.Header.Set("Authorization", "Bearer "+ingestionAuthToken)
}
resp, err := HTTPClient.Do(req)
elapsed = time.Since(start)
if err != nil {
message := fmt.Sprintf("Error when sending kubemonagentevent request %s \n", err.Error())
Log(message)
Log("Failed to flush %d records after %s", len(laKubeMonAgentEventsRecords), elapsed)
} else if resp == nil || resp.StatusCode != 200 {
if resp != nil {
Log("flushKubeMonAgentEventRecords: RequestId %s Status %s Status Code %d", reqId, resp.Status, resp.StatusCode)
}
Log("Failed to flush %d records after %s", len(laKubeMonAgentEventsRecords), elapsed)
} else {
numRecords := len(laKubeMonAgentEventsRecords)
Log("FlushKubeMonAgentEventRecords::Info::Successfully flushed %d records in %s", numRecords, elapsed)
// Send telemetry to AppInsights resource
SendEvent(KubeMonAgentEventsFlushedEvent, telemetryDimensions)
}
if resp != nil && resp.Body != nil {
defer resp.Body.Close()
}
}
}
} else {
// Setting this to false to allow for subsequent flushes after the first hour
skipKubeMonEventsFlush = false
}
}
}