in source/plugins/ruby/in_kube_podinventory.rb [103:247]
# Takes one pod-inventory snapshot from the Kubernetes API and emits it.
#
# Steps, in order:
#   1. Reset the per-run counters and telemetry accumulators.
#   2. In AAD MSI auth mode, resolve every output tag that is unset or does
#      not start with the extension output-stream prefix.
#   3. Fetch the services once and build the service records for this batch.
#   4. Fetch pods in chunks of @PODS_CHUNK_SIZE, following the continuation
#      token, and pass every non-empty chunk to parse_and_emit_records.
#   5. If at least 5 minutes have passed since @@podTelemetryTimeTracker,
#      send the heartbeat event and the count/latency metrics.
#
# podList - kept for interface compatibility only.
#           NOTE(review): this argument is never read; pods are always
#           fetched from the Kube API. Confirm whether callers/tests expect
#           a supplied list to be honored.
#
# Any StandardError raised along the way is logged and reported through
# ApplicationInsightsUtility.sendExceptionTelemetry; it is not re-raised.
def enumerate(podList = nil)
  # Per-run state. Only @serviceCount is incremented in this method; the other
  # counters are reset here and reported in the telemetry section below
  # (presumably they are updated by parse_and_emit_records - confirm there).
  @podCount = 0
  @containerCount = 0
  @serviceCount = 0
  @controllerSet = Set.new
  @winContainerCount = 0
  @winContainerInventoryTotalSizeBytes = 0
  @winContainerCountWithInventoryRecordSize64KBOrMore = 0
  @winContainerCountWithEnvVarSize64KBOrMore = 0
  @winContainerCountWithPortsSize64KBOrMore = 0
  @winContainerCountWithCommandSize64KBOrMore = 0
  @windowsNodeCount = 0
  @controllerData = {}

  # A single timestamp is shared by every record emitted in this run.
  batchTime = Time.now.utc.iso8601
  serviceRecords = []
  @podInventoryE2EProcessingLatencyMs = 0
  # Durations use the monotonic clock so wall-clock adjustments cannot skew
  # (or turn negative) the reported latencies.
  podInventoryStartTime = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)

  if ExtensionUtils.isAADMSIAuthMode()
    $log.info("in_kube_podinventory::enumerate: AAD AUTH MSI MODE")
    streamIdPrefix = Constants::EXTENSION_OUTPUT_STREAM_ID_TAG_PREFIX
    # A tag is (re)resolved only when it is unset or lacks the stream-id prefix.
    if @kubeperfTag.nil? || !@kubeperfTag.start_with?(streamIdPrefix)
      @kubeperfTag = ExtensionUtils.getOutputStreamId(Constants::PERF_DATA_TYPE)
    end
    if @kubeservicesTag.nil? || !@kubeservicesTag.start_with?(streamIdPrefix)
      @kubeservicesTag = ExtensionUtils.getOutputStreamId(Constants::KUBE_SERVICES_DATA_TYPE)
    end
    if @containerInventoryTag.nil? || !@containerInventoryTag.start_with?(streamIdPrefix)
      @containerInventoryTag = ExtensionUtils.getOutputStreamId(Constants::CONTAINER_INVENTORY_DATA_TYPE)
    end
    if @insightsMetricsTag.nil? || !@insightsMetricsTag.start_with?(streamIdPrefix)
      @insightsMetricsTag = ExtensionUtils.getOutputStreamId(Constants::INSIGHTS_METRICS_DATA_TYPE)
    end
    if @tag.nil? || !@tag.start_with?(streamIdPrefix)
      @tag = ExtensionUtils.getOutputStreamId(Constants::KUBE_POD_INVENTORY_DATA_TYPE)
    end
    $log.info("in_kube_podinventory::enumerate: using perf tag -#{@kubeperfTag} @ #{Time.now.utc.iso8601}")
    $log.info("in_kube_podinventory::enumerate: using kubeservices tag -#{@kubeservicesTag} @ #{Time.now.utc.iso8601}")
    $log.info("in_kube_podinventory::enumerate: using containerinventory tag -#{@containerInventoryTag} @ #{Time.now.utc.iso8601}")
    $log.info("in_kube_podinventory::enumerate: using insightsmetrics tag -#{@insightsMetricsTag} @ #{Time.now.utc.iso8601}")
    $log.info("in_kube_podinventory::enumerate: using kubepodinventory tag -#{@tag} @ #{Time.now.utc.iso8601}")
  end

  # Services are fetched once, before the pods, and the resulting records are
  # handed to parse_and_emit_records together with every pod chunk.
  $log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}")
  serviceInfo = KubernetesApiClient.getKubeResourceInfo("services")
  $log.info("in_kube_podinventory::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}")
  unless serviceInfo.nil?
    $log.info("in_kube_podinventory::enumerate:Start:Parsing services data using yajl @ #{Time.now.utc.iso8601}")
    serviceList = Yajl::Parser.parse(StringIO.new(serviceInfo.body))
    $log.info("in_kube_podinventory::enumerate:End:Parsing services data using yajl @ #{Time.now.utc.iso8601}")
    # References are dropped as soon as they are no longer needed
    # (presumably to let the large payloads be garbage collected early).
    serviceInfo = nil
    serviceRecords = KubernetesApiClient.getKubeServicesInventoryRecords(serviceList, batchTime)
    @serviceCount += serviceRecords.length
    serviceList = nil
  end

  # First pod chunk. @podsAPIE2ELatencyMs accumulates only the time spent in
  # the API calls, not the time spent parsing/emitting.
  @podsAPIE2ELatencyMs = 0
  podsAPIChunkStartTime = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
  $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}")
  continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}")
  $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}")
  @podsAPIE2ELatencyMs = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) - podsAPIChunkStartTime
  emit_pod_inventory_chunk(podInventory, serviceRecords, continuationToken, batchTime)

  # Remaining chunks: keep requesting while the API hands back a non-empty
  # continuation token.
  while !continuationToken.nil? && !continuationToken.empty?
    podsAPIChunkStartTime = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
    continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}")
    @podsAPIE2ELatencyMs += Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) - podsAPIChunkStartTime
    emit_pod_inventory_chunk(podInventory, serviceRecords, continuationToken, batchTime)
  end

  @podInventoryE2EProcessingLatencyMs = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) - podInventoryStartTime
  podInventory = nil
  serviceRecords = nil

  # Telemetry is sent at most once per 5 minutes. The tracker holds epoch
  # seconds, so this comparison stays on the wall clock.
  timeDifferenceInMinutes = (Time.now.to_i - @@podTelemetryTimeTracker).abs / 60
  if timeDifferenceInMinutes >= 5
    telemetryProperties = {}
    telemetryProperties["Computer"] = @@hostName
    telemetryProperties["PODS_CHUNK_SIZE"] = @PODS_CHUNK_SIZE
    telemetryProperties["PODS_EMIT_STREAM_BATCH_SIZE"] = @PODS_EMIT_STREAM_BATCH_SIZE
    ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties)
    ApplicationInsightsUtility.sendMetricTelemetry("PodCount", @podCount, {})
    ApplicationInsightsUtility.sendMetricTelemetry("ContainerCount", @containerCount, {})
    ApplicationInsightsUtility.sendMetricTelemetry("ServiceCount", @serviceCount, {})
    # The same hash keeps growing: every key added from here on is also
    # attached to each later event/metric sent with telemetryProperties.
    telemetryProperties["ControllerData"] = @controllerData.to_json
    ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", @controllerSet.length, telemetryProperties)
    if @winContainerCount > 0
      telemetryProperties["ClusterWideWindowsContainersCount"] = @winContainerCount
      telemetryProperties["WindowsNodeCount"] = @windowsNodeCount
      telemetryProperties["ClusterWideWindowsContainerInventoryTotalSizeKB"] = @winContainerInventoryTotalSizeBytes / 1024
      telemetryProperties["WindowsContainerCountWithInventoryRecordSize64KBorMore"] = @winContainerCountWithInventoryRecordSize64KBOrMore
      # The per-field counts are reported only when non-zero.
      if @winContainerCountWithEnvVarSize64KBOrMore > 0
        telemetryProperties["WinContainerCountWithEnvVarSize64KBOrMore"] = @winContainerCountWithEnvVarSize64KBOrMore
      end
      if @winContainerCountWithPortsSize64KBOrMore > 0
        telemetryProperties["WinContainerCountWithPortsSize64KBOrMore"] = @winContainerCountWithPortsSize64KBOrMore
      end
      if @winContainerCountWithCommandSize64KBOrMore > 0
        telemetryProperties["WinContainerCountWithCommandSize64KBOrMore"] = @winContainerCountWithCommandSize64KBOrMore
      end
      ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties)
    end
    ApplicationInsightsUtility.sendMetricTelemetry("PodInventoryE2EProcessingLatencyMs", @podInventoryE2EProcessingLatencyMs, telemetryProperties)
    ApplicationInsightsUtility.sendMetricTelemetry("PodsAPIE2ELatencyMs", @podsAPIE2ELatencyMs, telemetryProperties)
    @@podTelemetryTimeTracker = Time.now.to_i
  end
rescue => errorStr
  # Best effort by design: a failed run is logged and reported, never raised.
  $log.warn "in_kube_podinventory::enumerate:Failed in enumerate: #{errorStr}"
  $log.debug_backtrace(errorStr.backtrace)
  ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
end

# Helper for enumerate: handles a single pod chunk returned by the Kube API.
#
# A chunk whose "items" entry is present and non-empty is logged and passed to
# parse_and_emit_records; anything else (nil, empty, no "items", empty
# "items") only produces a warning.
#
# podInventory      - parsed pods response for this chunk (may be nil)
# serviceRecords    - service records built earlier in the same run
# continuationToken - token returned together with this chunk (may be nil)
# batchTime         - timestamp string shared by the whole run
def emit_pod_inventory_chunk(podInventory, serviceRecords, continuationToken, batchTime)
  hasItems = !podInventory.nil? && !podInventory.empty? && podInventory.key?("items") &&
             !podInventory["items"].nil? && !podInventory["items"].empty?
  unless hasItems
    $log.warn "in_kube_podinventory::enumerate:Received empty podInventory"
    return
  end
  $log.info("in_kube_podinventory::enumerate : number of pod items :#{podInventory["items"].length} from Kube API @ #{Time.now.utc.iso8601}")
  parse_and_emit_records(podInventory, serviceRecords, continuationToken, batchTime)
end