func PostDataHelper()

in source/plugins/go/src/oms.go [1080:1470]


// PostDataHelper converts a batch of fluent-bit tail-plugin records into the
// configured container-log schema and flushes them to a single destination,
// chosen in this order of precedence:
//   - mdsd over the msgpack unix socket when ContainerLogsRouteV2 is set;
//   - Azure Data Explorer when ContainerLogsRouteADX is set;
//   - otherwise the ODS HTTP endpoint, using the ContainerLogV2 schema when
//     ContainerLogSchemaV2 is set and the ContainerLog (v1) schema otherwise.
//
// stdout/stderr records are skipped when no container id can be derived from
// their file path, or when their namespace is in StdoutIgnoreNsSet /
// StderrIgnoreNsSet respectively.
//
// It returns output.FLB_RETRY when a destination client cannot be (re)created,
// the ODS auth token is missing, the request cannot be built, or the send
// fails; otherwise output.FLB_OK. A batch whose ODS payload cannot be
// JSON-marshalled is dropped and reported with FLB_OK. Flush telemetry
// (record count, size, time taken, max latency) is published under
// ContainerLogTelemetryMutex only after a successful flush.
func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {
	start := time.Now()
	var dataItemsLAv1 []DataItemLAv1
	var dataItemsLAv2 []DataItemLAv2
	var dataItemsADX []DataItemADX
	var msgPackEntries []MsgPackEntry
	var elapsed time.Duration

	// Telemetry accumulated locally and published once the batch is flushed.
	var maxLatency float64
	var maxLatencyContainer string
	var batchLogSize float64

	// Snapshot the shared container metadata so the lock is not held while
	// the records are processed.
	DataUpdateMutex.Lock()
	imageIDMap := make(map[string]string, len(ImageIDMap))
	for k, v := range ImageIDMap {
		imageIDMap[k] = v
	}
	nameIDMap := make(map[string]string, len(NameIDMap))
	for k, v := range NameIDMap {
		nameIDMap[k] = v
	}
	DataUpdateMutex.Unlock()

	for _, record := range tailPluginRecords {
		containerID, k8sNamespace, k8sPodName, containerName := GetContainerIDK8sNamespacePodNameFromFileName(ToString(record["filepath"]))
		logEntrySource := ToString(record["stream"])

		if strings.EqualFold(logEntrySource, "stdout") {
			if containerID == "" || containsKey(StdoutIgnoreNsSet, k8sNamespace) {
				continue
			}
		} else if strings.EqualFold(logEntrySource, "stderr") {
			if containerID == "" || containsKey(StderrIgnoreNsSet, k8sNamespace) {
				continue
			}
		}

		logEntry := ToString(record["log"])
		logEntryTimeStamp := ToString(record["time"])
		stringMap := make(map[string]string)

		// name & id identify the container in the max-latency telemetry; they
		// are derived here so every route (mdsd, ADX, ODS) reports them.
		var name, id string

		if ContainerLogSchemaV2 || ContainerLogsRouteADX {
			// ADX schema & LA v2 schema are the same except for AzureResourceId.
			stringMap["Computer"] = Computer
			stringMap["ContainerId"] = containerID
			stringMap["ContainerName"] = containerName
			stringMap["PodName"] = k8sPodName
			stringMap["PodNamespace"] = k8sNamespace
			stringMap["LogMessage"] = logEntry
			stringMap["LogSource"] = logEntrySource
			stringMap["TimeGenerated"] = logEntryTimeStamp
			name = containerName
			id = containerID
		} else {
			stringMap["LogEntry"] = logEntry
			stringMap["LogEntrySource"] = logEntrySource
			stringMap["LogEntryTimeStamp"] = logEntryTimeStamp
			stringMap["SourceSystem"] = "Containers"
			stringMap["Id"] = containerID

			if val, ok := imageIDMap[containerID]; ok {
				stringMap["Image"] = val
			}

			if val, ok := nameIDMap[containerID]; ok {
				stringMap["Name"] = val
				name = val
			}
			id = containerID

			stringMap["TimeOfCommand"] = start.Format(time.RFC3339)
			stringMap["Computer"] = Computer
		}

		// Measure the raw log line, which is stored under a schema-specific key
		// ("LogEntry" for v1, "LogMessage" for v2/ADX).
		batchLogSize += float64(len(logEntry))

		switch {
		case ContainerLogsRouteV2:
			msgPackEntries = append(msgPackEntries, MsgPackEntry{
				// Time is deliberately not set here: mdsd uses it for its
				// buffer/expiry calculations, so a single batch time is written
				// just before flushing (see batchTime below).
				Record: stringMap,
			})
		case ContainerLogsRouteADX:
			if ResourceCentric {
				stringMap["AzureResourceId"] = ResourceID
			} else {
				stringMap["AzureResourceId"] = ""
			}
			dataItemsADX = append(dataItemsADX, DataItemADX{
				TimeGenerated:   stringMap["TimeGenerated"],
				Computer:        stringMap["Computer"],
				ContainerId:     stringMap["ContainerId"],
				ContainerName:   stringMap["ContainerName"],
				PodName:         stringMap["PodName"],
				PodNamespace:    stringMap["PodNamespace"],
				LogMessage:      stringMap["LogMessage"],
				LogSource:       stringMap["LogSource"],
				AzureResourceId: stringMap["AzureResourceId"],
			})
		case ContainerLogSchemaV2:
			// ODS-v2 schema
			dataItemsLAv2 = append(dataItemsLAv2, DataItemLAv2{
				TimeGenerated: stringMap["TimeGenerated"],
				Computer:      stringMap["Computer"],
				ContainerId:   stringMap["ContainerId"],
				ContainerName: stringMap["ContainerName"],
				PodName:       stringMap["PodName"],
				PodNamespace:  stringMap["PodNamespace"],
				LogMessage:    stringMap["LogMessage"],
				LogSource:     stringMap["LogSource"],
			})
		default:
			// ODS-v1 schema
			dataItemsLAv1 = append(dataItemsLAv1, DataItemLAv1{
				ID:                    stringMap["Id"],
				LogEntry:              stringMap["LogEntry"],
				LogEntrySource:        stringMap["LogEntrySource"],
				LogEntryTimeStamp:     stringMap["LogEntryTimeStamp"],
				LogEntryTimeOfCommand: stringMap["TimeOfCommand"],
				SourceSystem:          stringMap["SourceSystem"],
				Computer:              stringMap["Computer"],
				Image:                 stringMap["Image"],
				Name:                  stringMap["Name"],
			})
		}

		if logEntryTimeStamp == "" {
			ContainerLogTelemetryMutex.Lock()
			ContainerLogRecordCountWithEmptyTimeStamp += 1
			ContainerLogTelemetryMutex.Unlock()
			continue
		}

		loggedTime, e := time.Parse(time.RFC3339, logEntryTimeStamp)
		if e != nil {
			message := fmt.Sprintf("Error while converting logEntryTimeStamp for telemetry purposes: %s", e.Error())
			Log(message)
			SendException(message)
			continue
		}
		latencyMs := float64(start.Sub(loggedTime) / time.Millisecond)
		if latencyMs >= maxLatency {
			maxLatency = latencyMs
			maxLatencyContainer = name + "=" + id
		}
	}

	numContainerLogRecords := 0

	switch {
	case len(msgPackEntries) > 0 && ContainerLogsRouteV2:
		// Flush to mdsd.
		if IsAADMSIAuthMode && !strings.HasPrefix(MdsdContainerLogTagName, MdsdOutputStreamIdTagPrefix) {
			Log("Info::mdsd::obtaining output stream id")
			if ContainerLogSchemaV2 {
				MdsdContainerLogTagName = extension.GetInstance(FLBLogger, ContainerType).GetOutputStreamId(ContainerLogV2DataType)
			} else {
				MdsdContainerLogTagName = extension.GetInstance(FLBLogger, ContainerType).GetOutputStreamId(ContainerLogDataType)
			}
			Log("Info::mdsd:: using mdsdsource name: %s", MdsdContainerLogTagName)
		}

		fluentForward := MsgPackForward{
			Tag:     MdsdContainerLogTagName,
			Entries: msgPackEntries,
		}

		// Pre-compute the msgp message size so the buffer is allocated once.
		msgpSize := 1 + msgp.StringPrefixSize + len(fluentForward.Tag) + msgp.ArrayHeaderSize
		for i := range fluentForward.Entries {
			msgpSize += 1 + msgp.Int64Size + msgp.GuessSize(fluentForward.Entries[i].Record)
		}
		msgpBytes := msgp.Require(nil, msgpSize)

		// Forward-mode message: [tag, [[time, record], ...]].
		msgpBytes = append(msgpBytes, 0x92)
		msgpBytes = msgp.AppendString(msgpBytes, fluentForward.Tag)
		msgpBytes = msgp.AppendArrayHeader(msgpBytes, uint32(len(fluentForward.Entries)))
		batchTime := time.Now().Unix()
		for i := range fluentForward.Entries {
			msgpBytes = append(msgpBytes, 0x92)
			msgpBytes = msgp.AppendInt64(msgpBytes, batchTime)
			msgpBytes = msgp.AppendMapStrStr(msgpBytes, fluentForward.Entries[i].Record)
		}

		if MdsdMsgpUnixSocketClient == nil {
			Log("Error::mdsd::mdsd connection does not exist. re-connecting ...")
			CreateMDSDClient(ContainerLogV2, ContainerType)
			if MdsdMsgpUnixSocketClient == nil {
				Log("Error::mdsd::Unable to create mdsd client. Please check error log.")

				ContainerLogTelemetryMutex.Lock()
				defer ContainerLogTelemetryMutex.Unlock()
				ContainerLogsMDSDClientCreateErrors += 1

				return output.FLB_RETRY
			}
		}

		deadline := 10 * time.Second
		// The deadline is absolute wall-clock time, so it is recomputed per write.
		MdsdMsgpUnixSocketClient.SetWriteDeadline(time.Now().Add(deadline))

		bts, er := MdsdMsgpUnixSocketClient.Write(msgpBytes)
		elapsed = time.Since(start)

		if er != nil {
			Log("Error::mdsd::Failed to write to mdsd %d records after %s. Will retry ... error : %s", len(msgPackEntries), elapsed, er.Error())
			if MdsdMsgpUnixSocketClient != nil {
				MdsdMsgpUnixSocketClient.Close()
				MdsdMsgpUnixSocketClient = nil
			}

			ContainerLogTelemetryMutex.Lock()
			defer ContainerLogTelemetryMutex.Unlock()
			ContainerLogsSendErrorsToMDSDFromFluent += 1

			return output.FLB_RETRY
		}
		numContainerLogRecords = len(msgPackEntries)
		Log("Success::mdsd::Successfully flushed %d container log records that was %d bytes to mdsd in %s ", numContainerLogRecords, bts, elapsed)

	case ContainerLogsRouteADX && len(dataItemsADX) > 0:
		// Route to ADX. Make sure a client exists before starting the encoder
		// goroutine, so a retry does not spin up work that is thrown away.
		if ADXIngestor == nil {
			Log("Error::ADX::ADXIngestor does not exist. re-creating ...")
			CreateADXClient()
			if ADXIngestor == nil {
				Log("Error::ADX::Unable to create ADX client. Please check error log.")

				ContainerLogTelemetryMutex.Lock()
				defer ContainerLogTelemetryMutex.Unlock()
				ContainerLogsADXClientCreateErrors += 1

				return output.FLB_RETRY
			}
		}

		// Stream the records as JSON values (one Encode call per record) through
		// a pipe. Closing the reader on return unblocks the writer goroutine if
		// ingestion stops reading early.
		r, w := io.Pipe()
		defer r.Close()
		enc := json.NewEncoder(w)
		go func() {
			defer w.Close()
			for _, data := range dataItemsADX {
				if encError := enc.Encode(data); encError != nil {
					message := fmt.Sprintf("Error::ADX Encoding data for ADX %s", encError)
					Log(message)
					//SendException(message) //use for testing/debugging only as this can generate a lot of exceptions
					// Continue so one poisoned message does not impact the whole batch.
				}
			}
		}()

		// Setup a maximum time for completion to be 30 Seconds.
		ctx, cancel := context.WithTimeout(ParentContext, 30*time.Second)
		defer cancel()

		//ADXFlushMutex.Lock()
		//defer ADXFlushMutex.Unlock()
		//MultiJSON support is not there yet
		if _, ingestionErr := ADXIngestor.FromReader(ctx, r, ingest.IngestionMappingRef("ContainerLogV2Mapping", ingest.JSON), ingest.FileFormat(ingest.JSON)); ingestionErr != nil {
			Log("Error when streaming to ADX Ingestion: %s", ingestionErr.Error())
			//ADXIngestor = nil  //not required as per ADX team. Will keep it to indicate that we tried this approach

			ContainerLogTelemetryMutex.Lock()
			defer ContainerLogTelemetryMutex.Unlock()
			ContainerLogsSendErrorsToADXFromFluent += 1

			return output.FLB_RETRY
		}

		elapsed = time.Since(start)
		numContainerLogRecords = len(dataItemsADX)
		Log("Success::ADX::Successfully wrote %d container log records to ADX in %s", numContainerLogRecords, elapsed)

	case (ContainerLogSchemaV2 && len(dataItemsLAv2) > 0) || len(dataItemsLAv1) > 0:
		// Route to ODS.
		var logEntry interface{}
		recordType := ""
		loglinesCount := 0
		if ContainerLogSchemaV2 && len(dataItemsLAv2) > 0 {
			logEntry = ContainerLogBlobLAv2{
				DataType:  ContainerLogV2DataType,
				IPName:    IPName,
				DataItems: dataItemsLAv2}
			loglinesCount = len(dataItemsLAv2)
			recordType = "ContainerLogV2"
		} else if len(dataItemsLAv1) > 0 {
			logEntry = ContainerLogBlobLAv1{
				DataType:  ContainerLogDataType,
				IPName:    IPName,
				DataItems: dataItemsLAv1}
			loglinesCount = len(dataItemsLAv1)
			recordType = "ContainerLog"
		}

		marshalled, err := json.Marshal(logEntry)
		if err != nil {
			message := fmt.Sprintf("Error while Marshalling log Entry: %s", err.Error())
			Log(message)
			SendException(message)
			// Retrying cannot fix a payload that does not marshal; drop the batch.
			return output.FLB_OK
		}

		req, err := http.NewRequest("POST", OMSEndpoint, bytes.NewBuffer(marshalled))
		if err != nil {
			// A nil request would otherwise panic on the header writes below.
			message := fmt.Sprintf("Error::ODS::Failed to create request: %s", err.Error())
			Log(message)
			SendException(message)
			return output.FLB_RETRY
		}
		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("User-Agent", userAgent)
		reqId := uuid.New().String()
		req.Header.Set("X-Request-ID", reqId)
		// Expensive to do string len for every request, so use a flag.
		if ResourceCentric {
			req.Header.Set("x-ms-AzureResourceId", ResourceID)
		}

		if IsAADMSIAuthMode {
			IngestionAuthTokenUpdateMutex.Lock()
			ingestionAuthToken := ODSIngestionAuthToken
			IngestionAuthTokenUpdateMutex.Unlock()
			if ingestionAuthToken == "" {
				Log("Error::ODS Ingestion Auth Token is empty. Please check error log.")
				return output.FLB_RETRY
			}
			req.Header.Set("Authorization", "Bearer "+ingestionAuthToken)
		}

		resp, err := HTTPClient.Do(req)
		elapsed = time.Since(start)

		if err != nil {
			message := fmt.Sprintf("Error when sending request %s \n", err.Error())
			Log(message)
			// Commenting this out for now. TODO - Add better telemetry for ods errors using aggregation
			//SendException(message)

			Log("Failed to flush %d records after %s", loglinesCount, elapsed)

			return output.FLB_RETRY
		}
		// Close the body on every path (including non-200) so the connection
		// is released back to the transport.
		defer resp.Body.Close()

		if resp.StatusCode != 200 {
			Log("RequestId %s Status %s Status Code %d", reqId, resp.Status, resp.StatusCode)
			return output.FLB_RETRY
		}

		numContainerLogRecords = loglinesCount
		Log("PostDataHelper::Info::Successfully flushed %d %s records to ODS in %s", numContainerLogRecords, recordType, elapsed)
	}

	ContainerLogTelemetryMutex.Lock()
	defer ContainerLogTelemetryMutex.Unlock()

	if numContainerLogRecords > 0 {
		FlushedRecordsCount += float64(numContainerLogRecords)
		FlushedRecordsSize += batchLogSize
		FlushedRecordsTimeTaken += float64(elapsed / time.Millisecond)

		if maxLatency >= AgentLogProcessingMaxLatencyMs {
			AgentLogProcessingMaxLatencyMs = maxLatency
			AgentLogProcessingMaxLatencyMsContainer = maxLatencyContainer
		}
	}

	return output.FLB_OK
}