in source/plugins/go/src/oms.go [1080:1470]
// PostDataHelper converts a batch of tailed container log records into the
// destination-specific payload and sends it synchronously.
//
// The destination is selected from package-level configuration, checked in
// this order:
//   - ContainerLogsRouteV2: a msgpack stream written to the mdsd unix socket
//     client (MdsdMsgpUnixSocketClient).
//   - ContainerLogsRouteADX: JSON documents streamed to ADXIngestor.
//   - otherwise: a JSON blob POSTed to OMSEndpoint, using the ContainerLogV2
//     schema when ContainerLogSchemaV2 is set and the ContainerLog (v1)
//     schema otherwise.
//
// stdout/stderr records with an empty container ID, or whose namespace is in
// StdoutIgnoreNsSet/StderrIgnoreNsSet respectively, are skipped.
//
// Returns output.FLB_RETRY when the send (or client creation) fails so that
// fluent-bit retries the batch. Returns output.FLB_OK on success, when there
// is nothing to send, or when the ODS payload cannot be marshalled (that
// batch is dropped). Flush telemetry (count, size, time taken, max latency)
// is only updated after a successful send.
func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {
	start := time.Now()
	// Identical for every record in the batch, so format it once.
	timeOfCommand := start.Format(time.RFC3339)

	var (
		dataItemsLAv1       []DataItemLAv1
		dataItemsLAv2       []DataItemLAv2
		dataItemsADX        []DataItemADX
		msgPackEntries      []MsgPackEntry
		elapsed             time.Duration
		maxLatency          float64
		maxLatencyContainer string
		// Total length of the log lines in this batch. Accumulated locally and
		// only published (under ContainerLogTelemetryMutex) once the batch has
		// been flushed, so it stays consistent with FlushedRecordsCount.
		batchLogSize float64
	)

	// Copy the container metadata maps under DataUpdateMutex so the lock is
	// not held while the records are processed.
	imageIDMap := make(map[string]string)
	nameIDMap := make(map[string]string)
	DataUpdateMutex.Lock()
	for k, v := range ImageIDMap {
		imageIDMap[k] = v
	}
	for k, v := range NameIDMap {
		nameIDMap[k] = v
	}
	DataUpdateMutex.Unlock()

	for _, record := range tailPluginRecords {
		containerID, k8sNamespace, k8sPodName, containerName := GetContainerIDK8sNamespacePodNameFromFileName(ToString(record["filepath"]))
		logEntrySource := ToString(record["stream"])
		if strings.EqualFold(logEntrySource, "stdout") {
			if containerID == "" || containsKey(StdoutIgnoreNsSet, k8sNamespace) {
				continue
			}
		} else if strings.EqualFold(logEntrySource, "stderr") {
			if containerID == "" || containsKey(StderrIgnoreNsSet, k8sNamespace) {
				continue
			}
		}

		logEntry := ToString(record["log"])
		logEntryTimeStamp := ToString(record["time"])
		// The log line is stored under "LogEntry" (v1) or "LogMessage" (v2/ADX);
		// measure it directly so the size is correct for every schema.
		batchLogSize += float64(len(logEntry))

		// id & name label the container in the latency telemetry, for every
		// route and schema.
		id := containerID
		name := ""

		stringMap := make(map[string]string)
		//ADX Schema & LAv2 schema are almost the same (except resourceId)
		if ContainerLogSchemaV2 || ContainerLogsRouteADX {
			stringMap["Computer"] = Computer
			stringMap["ContainerId"] = containerID
			stringMap["ContainerName"] = containerName
			stringMap["PodName"] = k8sPodName
			stringMap["PodNamespace"] = k8sNamespace
			stringMap["LogMessage"] = logEntry
			stringMap["LogSource"] = logEntrySource
			stringMap["TimeGenerated"] = logEntryTimeStamp
			name = containerName
		} else {
			stringMap["LogEntry"] = logEntry
			stringMap["LogEntrySource"] = logEntrySource
			stringMap["LogEntryTimeStamp"] = logEntryTimeStamp
			stringMap["SourceSystem"] = "Containers"
			stringMap["Id"] = containerID
			if val, ok := imageIDMap[containerID]; ok {
				stringMap["Image"] = val
			}
			if val, ok := nameIDMap[containerID]; ok {
				stringMap["Name"] = val
				name = val
			}
			stringMap["TimeOfCommand"] = timeOfCommand
			stringMap["Computer"] = Computer
		}

		switch {
		case ContainerLogsRouteV2:
			// Time is intentionally left unset: mdsd uses it for its
			// buffer/expiry calculations, so a single batch timestamp is
			// written when the msgpack stream is built, just before flushing.
			msgPackEntries = append(msgPackEntries, MsgPackEntry{Record: stringMap})
		case ContainerLogsRouteADX:
			if ResourceCentric {
				stringMap["AzureResourceId"] = ResourceID
			} else {
				stringMap["AzureResourceId"] = ""
			}
			//ADX
			dataItemsADX = append(dataItemsADX, DataItemADX{
				TimeGenerated:   stringMap["TimeGenerated"],
				Computer:        stringMap["Computer"],
				ContainerId:     stringMap["ContainerId"],
				ContainerName:   stringMap["ContainerName"],
				PodName:         stringMap["PodName"],
				PodNamespace:    stringMap["PodNamespace"],
				LogMessage:      stringMap["LogMessage"],
				LogSource:       stringMap["LogSource"],
				AzureResourceId: stringMap["AzureResourceId"],
			})
		case ContainerLogSchemaV2:
			//ODS-v2 schema
			dataItemsLAv2 = append(dataItemsLAv2, DataItemLAv2{
				TimeGenerated: stringMap["TimeGenerated"],
				Computer:      stringMap["Computer"],
				ContainerId:   stringMap["ContainerId"],
				ContainerName: stringMap["ContainerName"],
				PodName:       stringMap["PodName"],
				PodNamespace:  stringMap["PodNamespace"],
				LogMessage:    stringMap["LogMessage"],
				LogSource:     stringMap["LogSource"],
			})
		default:
			//ODS-v1 schema
			dataItemsLAv1 = append(dataItemsLAv1, DataItemLAv1{
				ID:                    stringMap["Id"],
				LogEntry:              stringMap["LogEntry"],
				LogEntrySource:        stringMap["LogEntrySource"],
				LogEntryTimeStamp:     stringMap["LogEntryTimeStamp"],
				LogEntryTimeOfCommand: stringMap["TimeOfCommand"],
				SourceSystem:          stringMap["SourceSystem"],
				Computer:              stringMap["Computer"],
				Image:                 stringMap["Image"],
				Name:                  stringMap["Name"],
			})
		}

		// Latency telemetry: time between the record's own timestamp and the
		// start of this flush.
		if logEntryTimeStamp == "" {
			ContainerLogTelemetryMutex.Lock()
			ContainerLogRecordCountWithEmptyTimeStamp++
			ContainerLogTelemetryMutex.Unlock()
			continue
		}
		loggedTime, e := time.Parse(time.RFC3339, logEntryTimeStamp)
		if e != nil {
			message := fmt.Sprintf("Error while converting logEntryTimeStamp for telemetry purposes: %s", e.Error())
			Log(message)
			SendException(message)
			continue
		}
		ltncy := float64(start.Sub(loggedTime) / time.Millisecond)
		if ltncy >= maxLatency {
			maxLatency = ltncy
			maxLatencyContainer = name + "=" + id
		}
	}

	numContainerLogRecords := 0
	switch {
	case ContainerLogsRouteV2 && len(msgPackEntries) > 0:
		//flush to mdsd
		if IsAADMSIAuthMode && !strings.HasPrefix(MdsdContainerLogTagName, MdsdOutputStreamIdTagPrefix) {
			Log("Info::mdsd::obtaining output stream id")
			if ContainerLogSchemaV2 {
				MdsdContainerLogTagName = extension.GetInstance(FLBLogger, ContainerType).GetOutputStreamId(ContainerLogV2DataType)
			} else {
				MdsdContainerLogTagName = extension.GetInstance(FLBLogger, ContainerType).GetOutputStreamId(ContainerLogDataType)
			}
			Log("Info::mdsd:: using mdsdsource name: %s", MdsdContainerLogTagName)
		}
		fluentForward := MsgPackForward{
			Tag:     MdsdContainerLogTagName,
			Entries: msgPackEntries,
		}
		// Pre-size the buffer: outer array header + tag + entries array
		// header, then per entry a 2-element array header + int64 time + map.
		msgpSize := 1 + msgp.StringPrefixSize + len(fluentForward.Tag) + msgp.ArrayHeaderSize
		for i := range fluentForward.Entries {
			msgpSize += 1 + msgp.Int64Size + msgp.GuessSize(fluentForward.Entries[i].Record)
		}
		msgpBytes := msgp.Require(nil, msgpSize)
		// Layout: [tag, [[time, record], ...]]; 0x92 is a msgpack fixarray
		// header of length 2.
		msgpBytes = append(msgpBytes, 0x92)
		msgpBytes = msgp.AppendString(msgpBytes, fluentForward.Tag)
		msgpBytes = msgp.AppendArrayHeader(msgpBytes, uint32(len(fluentForward.Entries)))
		batchTime := time.Now().Unix()
		for i := range fluentForward.Entries {
			msgpBytes = append(msgpBytes, 0x92)
			msgpBytes = msgp.AppendInt64(msgpBytes, batchTime)
			msgpBytes = msgp.AppendMapStrStr(msgpBytes, fluentForward.Entries[i].Record)
		}

		if MdsdMsgpUnixSocketClient == nil {
			Log("Error::mdsd::mdsd connection does not exist. re-connecting ...")
			CreateMDSDClient(ContainerLogV2, ContainerType)
			if MdsdMsgpUnixSocketClient == nil {
				Log("Error::mdsd::Unable to create mdsd client. Please check error log.")
				ContainerLogTelemetryMutex.Lock()
				defer ContainerLogTelemetryMutex.Unlock()
				ContainerLogsMDSDClientCreateErrors++
				return output.FLB_RETRY
			}
		}
		deadline := 10 * time.Second
		MdsdMsgpUnixSocketClient.SetWriteDeadline(time.Now().Add(deadline)) //this is based of clock time, so cannot reuse
		bts, er := MdsdMsgpUnixSocketClient.Write(msgpBytes)
		elapsed = time.Since(start)
		if er != nil {
			Log("Error::mdsd::Failed to write to mdsd %d records after %s. Will retry ... error : %s", len(msgPackEntries), elapsed, er.Error())
			// Drop the broken connection so the next flush reconnects.
			MdsdMsgpUnixSocketClient.Close()
			MdsdMsgpUnixSocketClient = nil
			ContainerLogTelemetryMutex.Lock()
			defer ContainerLogTelemetryMutex.Unlock()
			ContainerLogsSendErrorsToMDSDFromFluent++
			return output.FLB_RETRY
		}
		numContainerLogRecords = len(msgPackEntries)
		Log("Success::mdsd::Successfully flushed %d container log records that was %d bytes to mdsd in %s ", numContainerLogRecords, bts, elapsed)

	case ContainerLogsRouteADX && len(dataItemsADX) > 0:
		// Route to ADX
		if ADXIngestor == nil {
			Log("Error::ADX::ADXIngestor does not exist. re-creating ...")
			CreateADXClient()
			if ADXIngestor == nil {
				Log("Error::ADX::Unable to create ADX client. Please check error log.")
				ContainerLogTelemetryMutex.Lock()
				defer ContainerLogTelemetryMutex.Unlock()
				ContainerLogsADXClientCreateErrors++
				return output.FLB_RETRY
			}
		}
		// The pipe is only created once there is a client to read it. Closing
		// the reader on return unblocks the writer goroutine if ingestion
		// stops reading early.
		r, w := io.Pipe()
		defer r.Close()
		go func() {
			defer w.Close()
			enc := json.NewEncoder(w)
			for _, data := range dataItemsADX {
				if encError := enc.Encode(data); encError != nil {
					if encError == io.ErrClosedPipe {
						// The reader is gone; every further write would fail the same way.
						return
					}
					message := fmt.Sprintf("Error::ADX Encoding data for ADX %s", encError)
					Log(message)
					//SendException(message) //use for testing/debugging only as this can generate a lot of exceptions
					//continue and move on, so one poisoned message does not impact the whole batch
				}
			}
		}()
		// Setup a maximum time for completion to be 30 Seconds.
		ctx, cancel := context.WithTimeout(ParentContext, 30*time.Second)
		defer cancel()
		//ADXFlushMutex.Lock()
		//defer ADXFlushMutex.Unlock()
		//MultiJSON support is not there yet
		if _, ingestionErr := ADXIngestor.FromReader(ctx, r, ingest.IngestionMappingRef("ContainerLogV2Mapping", ingest.JSON), ingest.FileFormat(ingest.JSON)); ingestionErr != nil {
			Log("Error when streaming to ADX Ingestion: %s", ingestionErr.Error())
			//ADXIngestor = nil //not required as per ADX team. Will keep it to indicate that we tried this approach
			ContainerLogTelemetryMutex.Lock()
			defer ContainerLogTelemetryMutex.Unlock()
			ContainerLogsSendErrorsToADXFromFluent++
			return output.FLB_RETRY
		}
		elapsed = time.Since(start)
		numContainerLogRecords = len(dataItemsADX)
		Log("Success::ADX::Successfully wrote %d container log records to ADX in %s", numContainerLogRecords, elapsed)

	case (ContainerLogSchemaV2 && len(dataItemsLAv2) > 0) || len(dataItemsLAv1) > 0:
		//ODS
		var (
			payload       interface{}
			recordType    string
			loglinesCount int
		)
		if ContainerLogSchemaV2 && len(dataItemsLAv2) > 0 {
			//schema v2
			payload = ContainerLogBlobLAv2{
				DataType:  ContainerLogV2DataType,
				IPName:    IPName,
				DataItems: dataItemsLAv2}
			loglinesCount = len(dataItemsLAv2)
			recordType = "ContainerLogV2"
		} else {
			//schema v1 (the case condition guarantees dataItemsLAv1 is non-empty here)
			payload = ContainerLogBlobLAv1{
				DataType:  ContainerLogDataType,
				IPName:    IPName,
				DataItems: dataItemsLAv1}
			loglinesCount = len(dataItemsLAv1)
			recordType = "ContainerLog"
		}

		marshalled, err := json.Marshal(payload)
		if err != nil {
			message := fmt.Sprintf("Error while Marshalling log Entry: %s", err.Error())
			Log(message)
			SendException(message)
			// Retrying would fail identically, so the batch is dropped.
			return output.FLB_OK
		}
		req, err := http.NewRequest("POST", OMSEndpoint, bytes.NewBuffer(marshalled))
		if err != nil {
			message := fmt.Sprintf("Error while creating ODS request: %s", err.Error())
			Log(message)
			SendException(message)
			return output.FLB_RETRY
		}
		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("User-Agent", userAgent)
		reqId := uuid.New().String()
		req.Header.Set("X-Request-ID", reqId)
		//expensive to do string len for every request, so use a flag
		if ResourceCentric {
			req.Header.Set("x-ms-AzureResourceId", ResourceID)
		}
		if IsAADMSIAuthMode {
			IngestionAuthTokenUpdateMutex.Lock()
			ingestionAuthToken := ODSIngestionAuthToken
			IngestionAuthTokenUpdateMutex.Unlock()
			if ingestionAuthToken == "" {
				Log("Error::ODS Ingestion Auth Token is empty. Please check error log.")
				return output.FLB_RETRY
			}
			// add authorization header to the req
			req.Header.Set("Authorization", "Bearer "+ingestionAuthToken)
		}

		resp, err := HTTPClient.Do(req)
		elapsed = time.Since(start)
		if err != nil {
			message := fmt.Sprintf("Error when sending request %s \n", err.Error())
			Log(message)
			// Commenting this out for now. TODO - Add better telemetry for ods errors using aggregation
			//SendException(message)
			Log("Failed to flush %d records after %s", loglinesCount, elapsed)
			return output.FLB_RETRY
		}
		// Close the body on every path, including non-200 responses, so the
		// connection is released.
		defer resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			Log("RequestId %s Status %s Status Code %d", reqId, resp.Status, resp.StatusCode)
			return output.FLB_RETRY
		}
		numContainerLogRecords = loglinesCount
		Log("PostDataHelper::Info::Successfully flushed %d %s records to ODS in %s", numContainerLogRecords, recordType, elapsed)
	}

	if numContainerLogRecords > 0 {
		ContainerLogTelemetryMutex.Lock()
		defer ContainerLogTelemetryMutex.Unlock()
		FlushedRecordsCount += float64(numContainerLogRecords)
		FlushedRecordsSize += batchLogSize
		FlushedRecordsTimeTaken += float64(elapsed / time.Millisecond)
		if maxLatency >= AgentLogProcessingMaxLatencyMs {
			AgentLogProcessingMaxLatencyMs = maxLatency
			AgentLogProcessingMaxLatencyMsContainer = maxLatencyContainer
		}
	}
	return output.FLB_OK
}