in source/plugins/ruby/in_kube_podinventory.rb [248:440]
def parse_and_emit_records(podInventory, serviceRecords, continuationToken, batchTime = Time.utc.iso8601)
currentTime = Time.now
emitTime = Fluent::Engine.now
eventStream = Fluent::MultiEventStream.new
containerInventoryStream = Fluent::MultiEventStream.new
kubePerfEventStream = Fluent::MultiEventStream.new
insightsMetricsEventStream = Fluent::MultiEventStream.new
@@istestvar = ENV["ISTEST"]
begin
winNodes = KubernetesApiClient.getWindowsNodesArray
podInventory["items"].each do |item|
podInventoryRecords = getPodInventoryRecords(item, serviceRecords, batchTime)
@containerCount += podInventoryRecords.length
podInventoryRecords.each do |record|
if !record.nil?
eventStream.add(emitTime, record) if record
@inventoryToMdmConvertor.process_pod_inventory_record(record)
end
end
if winNodes.length > 0
nodeName = ""
if !item["spec"]["nodeName"].nil?
nodeName = item["spec"]["nodeName"]
end
@windowsNodeCount = winNodes.length
if (!nodeName.empty? && (winNodes.include? nodeName))
clusterCollectEnvironmentVar = ENV["AZMON_CLUSTER_COLLECT_ENV_VAR"]
containerInventoryRecords = KubernetesContainerInventory.getContainerInventoryRecords(item, batchTime, clusterCollectEnvironmentVar, true)
@winContainerCount += containerInventoryRecords.length
containerInventoryRecords.each do |cirecord|
if !cirecord.nil?
containerInventoryStream.add(emitTime, cirecord) if cirecord
ciRecordSize = cirecord.to_s.length
@winContainerInventoryTotalSizeBytes += ciRecordSize
if ciRecordSize >= Constants::MAX_RECORD_OR_FIELD_SIZE_FOR_TELEMETRY
@winContainerCountWithInventoryRecordSize64KBOrMore += 1
end
if !cirecord["EnvironmentVar"].nil? && !cirecord["EnvironmentVar"].empty? && cirecord["EnvironmentVar"].length >= Constants::MAX_RECORD_OR_FIELD_SIZE_FOR_TELEMETRY
@winContainerCountWithEnvVarSize64KBOrMore += 1
end
if !cirecord["Ports"].nil? && !cirecord["Ports"].empty? && cirecord["Ports"].length >= Constants::MAX_RECORD_OR_FIELD_SIZE_FOR_TELEMETRY
@winContainerCountWithPortsSize64KBOrMore += 1
end
if !cirecord["Command"].nil? && !cirecord["Command"].empty? && cirecord["Command"].length >= Constants::MAX_RECORD_OR_FIELD_SIZE_FOR_TELEMETRY
@winContainerCountWithCommandSize64KBOrMore += 1
end
end
end
end
end
if @PODS_EMIT_STREAM_BATCH_SIZE > 0 && eventStream.count >= @PODS_EMIT_STREAM_BATCH_SIZE
$log.info("in_kube_podinventory::parse_and_emit_records: number of pod inventory records emitted #{eventStream.count} @ #{Time.now.utc.iso8601}")
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
$log.info("kubePodInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
end
router.emit_stream(@tag, eventStream) if eventStream
eventStream = Fluent::MultiEventStream.new
end
containerMetricDataItems = []
containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(item, "requests", "cpu", "cpuRequestNanoCores", batchTime))
containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(item, "requests", "memory", "memoryRequestBytes", batchTime))
containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(item, "limits", "cpu", "cpuLimitNanoCores", batchTime))
containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(item, "limits", "memory", "memoryLimitBytes", batchTime))
containerMetricDataItems.each do |record|
kubePerfEventStream.add(emitTime, record) if record
end
if @PODS_EMIT_STREAM_BATCH_SIZE > 0 && kubePerfEventStream.count >= @PODS_EMIT_STREAM_BATCH_SIZE
$log.info("in_kube_podinventory::parse_and_emit_records: number of container perf records emitted #{kubePerfEventStream.count} @ #{Time.now.utc.iso8601}")
router.emit_stream(@kubeperfTag, kubePerfEventStream) if kubePerfEventStream
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
$log.info("kubeContainerPerfEventEmitStreamSuccess @ #{Time.now.utc.iso8601}")
end
kubePerfEventStream = Fluent::MultiEventStream.new
end
containerGPUInsightsMetricsDataItems = []
containerGPUInsightsMetricsDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimitsAsInsightsMetrics(item, "requests", "nvidia.com/gpu", "containerGpuRequests", batchTime))
containerGPUInsightsMetricsDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimitsAsInsightsMetrics(item, "limits", "nvidia.com/gpu", "containerGpuLimits", batchTime))
containerGPUInsightsMetricsDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimitsAsInsightsMetrics(item, "requests", "amd.com/gpu", "containerGpuRequests", batchTime))
containerGPUInsightsMetricsDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimitsAsInsightsMetrics(item, "limits", "amd.com/gpu", "containerGpuLimits", batchTime))
containerGPUInsightsMetricsDataItems.each do |insightsMetricsRecord|
insightsMetricsEventStream.add(emitTime, insightsMetricsRecord) if insightsMetricsRecord
end
if @PODS_EMIT_STREAM_BATCH_SIZE > 0 && insightsMetricsEventStream.count >= @PODS_EMIT_STREAM_BATCH_SIZE
$log.info("in_kube_podinventory::parse_and_emit_records: number of GPU insights metrics records emitted #{insightsMetricsEventStream.count} @ #{Time.now.utc.iso8601}")
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
$log.info("kubePodInsightsMetricsEmitStreamSuccess @ #{Time.now.utc.iso8601}")
end
router.emit_stream(@insightsMetricsTag, insightsMetricsEventStream) if insightsMetricsEventStream
insightsMetricsEventStream = Fluent::MultiEventStream.new
end
end
if eventStream.count > 0
$log.info("in_kube_podinventory::parse_and_emit_records: number of pod inventory records emitted #{eventStream.count} @ #{Time.now.utc.iso8601}")
router.emit_stream(@tag, eventStream) if eventStream
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
$log.info("kubePodInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
end
eventStream = nil
end
if containerInventoryStream.count > 0
$log.info("in_kube_podinventory::parse_and_emit_records: number of windows container inventory records emitted #{containerInventoryStream.count} @ #{Time.now.utc.iso8601}")
router.emit_stream(@containerInventoryTag, containerInventoryStream) if containerInventoryStream
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
$log.info("kubeWindowsContainerInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
end
containerInventoryStream = nil
end
if kubePerfEventStream.count > 0
$log.info("in_kube_podinventory::parse_and_emit_records: number of perf records emitted #{kubePerfEventStream.count} @ #{Time.now.utc.iso8601}")
router.emit_stream(@kubeperfTag, kubePerfEventStream) if kubePerfEventStream
kubePerfEventStream = nil
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
$log.info("kubeContainerPerfEventEmitStreamSuccess @ #{Time.now.utc.iso8601}")
end
end
if insightsMetricsEventStream.count > 0
$log.info("in_kube_podinventory::parse_and_emit_records: number of insights metrics records emitted #{insightsMetricsEventStream.count} @ #{Time.now.utc.iso8601}")
router.emit_stream(@insightsMetricsTag, insightsMetricsEventStream) if insightsMetricsEventStream
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
$log.info("kubePodInsightsMetricsEmitStreamSuccess @ #{Time.now.utc.iso8601}")
end
insightsMetricsEventStream = nil
end
if continuationToken.nil?
@log.info "Sending pod inventory mdm records to out_mdm"
pod_inventory_mdm_records = @inventoryToMdmConvertor.get_pod_inventory_mdm_records(batchTime)
@log.info "pod_inventory_mdm_records.size #{pod_inventory_mdm_records.size}"
mdm_pod_inventory_es = Fluent::MultiEventStream.new
pod_inventory_mdm_records.each { |pod_inventory_mdm_record|
mdm_pod_inventory_es.add(batchTime, pod_inventory_mdm_record) if pod_inventory_mdm_record
} if pod_inventory_mdm_records
router.emit_stream(@@MDMKubePodInventoryTag, mdm_pod_inventory_es) if mdm_pod_inventory_es
end
if continuationToken.nil?
kubeServicesEventStream = Fluent::MultiEventStream.new
serviceRecords.each do |kubeServiceRecord|
if !kubeServiceRecord.nil?
kubeServiceRecord["ClusterId"] = KubernetesApiClient.getClusterId
kubeServiceRecord["ClusterName"] = KubernetesApiClient.getClusterName
kubeServicesEventStream.add(emitTime, kubeServiceRecord) if kubeServiceRecord
if @PODS_EMIT_STREAM_BATCH_SIZE > 0 && kubeServicesEventStream.count >= @PODS_EMIT_STREAM_BATCH_SIZE
$log.info("in_kube_podinventory::parse_and_emit_records: number of service records emitted #{kubeServicesEventStream.count} @ #{Time.now.utc.iso8601}")
router.emit_stream(@kubeservicesTag, kubeServicesEventStream) if kubeServicesEventStream
kubeServicesEventStream = Fluent::MultiEventStream.new
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
$log.info("kubeServicesEventEmitStreamSuccess @ #{Time.now.utc.iso8601}")
end
end
end
end
if kubeServicesEventStream.count > 0
$log.info("in_kube_podinventory::parse_and_emit_records : number of service records emitted #{kubeServicesEventStream.count} @ #{Time.now.utc.iso8601}")
router.emit_stream(@kubeservicesTag, kubeServicesEventStream) if kubeServicesEventStream
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
$log.info("kubeServicesEventEmitStreamSuccess @ #{Time.now.utc.iso8601}")
end
end
kubeServicesEventStream = nil
end
@podCount += podInventory["items"].length
rescue => errorStr
$log.warn "Failed in parse_and_emit_record pod inventory: #{errorStr}"
$log.debug_backtrace(errorStr.backtrace)
ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
end
end