enumerate

in source/plugins/ruby/in_kube_podinventory.rb [103:247]


    # Runs one pod-inventory collection pass.
    #
    # In order: resets the per-pass counters, refreshes the output stream tags
    # when ExtensionUtils reports AAD MSI auth mode, fetches the services list,
    # pages through the pods API @PODS_CHUNK_SIZE items at a time handing every
    # non-empty page to parse_and_emit_records, and finally sends telemetry if
    # at least five minutes have elapsed since @@podTelemetryTimeTracker.
    #
    # podList - Kept so existing callers keep working; it is never read because
    #           the pods are always fetched from the Kube API below.
    #
    # No return value is part of the contract. Any StandardError raised along
    # the way is logged and reported as exception telemetry, not re-raised.
    def enumerate(podList = nil)
      begin
        # Per-pass counters. Only @serviceCount and the two latency values are
        # updated in this method; the others are reset here and read back in
        # the telemetry section.
        # NOTE(review): presumably parse_and_emit_records fills them in - confirm.
        @podCount = 0
        @containerCount = 0
        @serviceCount = 0
        @controllerSet = Set.new []
        @winContainerCount = 0
        @winContainerInventoryTotalSizeBytes = 0
        @winContainerCountWithInventoryRecordSize64KBOrMore = 0
        @winContainerCountWithEnvVarSize64KBOrMore = 0
        @winContainerCountWithPortsSize64KBOrMore = 0
        @winContainerCountWithCommandSize64KBOrMore = 0
        @windowsNodeCount = 0
        @controllerData = {}
        # One timestamp shared by the service records and every pod page of this pass.
        batchTime = Time.now.utc.iso8601
        serviceRecords = []
        @podInventoryE2EProcessingLatencyMs = 0
        # Durations use the monotonic clock so a wall-clock adjustment during
        # the pass cannot produce negative or inflated latency metrics.
        podInventoryStartTime = pod_inventory_monotonic_ms
        if ExtensionUtils.isAADMSIAuthMode()
          $log.info("in_kube_podinventory::enumerate: AAD AUTH MSI MODE")
          @kubeperfTag = pod_inventory_output_stream_tag(@kubeperfTag, Constants::PERF_DATA_TYPE)
          @kubeservicesTag = pod_inventory_output_stream_tag(@kubeservicesTag, Constants::KUBE_SERVICES_DATA_TYPE)
          @containerInventoryTag = pod_inventory_output_stream_tag(@containerInventoryTag, Constants::CONTAINER_INVENTORY_DATA_TYPE)
          @insightsMetricsTag = pod_inventory_output_stream_tag(@insightsMetricsTag, Constants::INSIGHTS_METRICS_DATA_TYPE)
          @tag = pod_inventory_output_stream_tag(@tag, Constants::KUBE_POD_INVENTORY_DATA_TYPE)
          $log.info("in_kube_podinventory::enumerate: using perf tag -#{@kubeperfTag} @ #{Time.now.utc.iso8601}")
          $log.info("in_kube_podinventory::enumerate: using kubeservices tag -#{@kubeservicesTag} @ #{Time.now.utc.iso8601}")
          $log.info("in_kube_podinventory::enumerate: using containerinventory tag -#{@containerInventoryTag} @ #{Time.now.utc.iso8601}")
          $log.info("in_kube_podinventory::enumerate: using insightsmetrics tag -#{@insightsMetricsTag} @ #{Time.now.utc.iso8601}")
          $log.info("in_kube_podinventory::enumerate: using kubepodinventory tag -#{@tag} @ #{Time.now.utc.iso8601}")
        end

        # Services are fetched in a single call and passed along with every pod page.
        $log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}")
        serviceInfo = KubernetesApiClient.getKubeResourceInfo("services")
        $log.info("in_kube_podinventory::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}")

        if !serviceInfo.nil?
          $log.info("in_kube_podinventory::enumerate:Start:Parsing services data using yajl @ #{Time.now.utc.iso8601}")
          serviceList = Yajl::Parser.parse(StringIO.new(serviceInfo.body))
          $log.info("in_kube_podinventory::enumerate:End:Parsing services data using yajl @ #{Time.now.utc.iso8601}")
          # Drop the raw response as soon as it has been parsed.
          serviceInfo = nil
          serviceRecords = KubernetesApiClient.getKubeServicesInventoryRecords(serviceList, batchTime)
          @serviceCount += serviceRecords.length
          serviceList = nil
        end

        # First pods page; @podsAPIE2ELatencyMs accumulates the time spent in the pods API calls only.
        @podsAPIE2ELatencyMs = 0
        podsAPIChunkStartTime = pod_inventory_monotonic_ms
        $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}")
        continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}")
        $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}")
        @podsAPIE2ELatencyMs = pod_inventory_monotonic_ms - podsAPIChunkStartTime
        if pod_inventory_items?(podInventory)
          $log.info("in_kube_podinventory::enumerate : number of pod items :#{podInventory["items"].length}  from Kube API @ #{Time.now.utc.iso8601}")
          parse_and_emit_records(podInventory, serviceRecords, continuationToken, batchTime)
        else
          $log.warn "in_kube_podinventory::enumerate:Received empty podInventory"
        end

        # Remaining pages: keep requesting until no continuation token comes back.
        while (!continuationToken.nil? && !continuationToken.empty?)
          podsAPIChunkStartTime = pod_inventory_monotonic_ms
          continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}")
          @podsAPIE2ELatencyMs += pod_inventory_monotonic_ms - podsAPIChunkStartTime
          if pod_inventory_items?(podInventory)
            $log.info("in_kube_podinventory::enumerate : number of pod items :#{podInventory["items"].length} from Kube API @ #{Time.now.utc.iso8601}")
            parse_and_emit_records(podInventory, serviceRecords, continuationToken, batchTime)
          else
            $log.warn "in_kube_podinventory::enumerate:Received empty podInventory"
          end
        end

        @podInventoryE2EProcessingLatencyMs = pod_inventory_monotonic_ms - podInventoryStartTime

        # Release the last page and the service records before building telemetry.
        podInventory = nil
        serviceRecords = nil

        # @@podTelemetryTimeTracker holds epoch seconds, so this comparison stays on the wall clock.
        timeDifference = (Time.now.to_i - @@podTelemetryTimeTracker).abs
        timeDifferenceInMinutes = timeDifference / 60

        if timeDifferenceInMinutes >= 5
          # telemetryProperties is mutated between sends, so each later send
          # carries every key added before it; the order below is significant.
          telemetryProperties = {}
          telemetryProperties["Computer"] = @@hostName
          telemetryProperties["PODS_CHUNK_SIZE"] = @PODS_CHUNK_SIZE
          telemetryProperties["PODS_EMIT_STREAM_BATCH_SIZE"] = @PODS_EMIT_STREAM_BATCH_SIZE
          ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties)
          ApplicationInsightsUtility.sendMetricTelemetry("PodCount", @podCount, {})
          ApplicationInsightsUtility.sendMetricTelemetry("ContainerCount", @containerCount, {})
          ApplicationInsightsUtility.sendMetricTelemetry("ServiceCount", @serviceCount, {})
          telemetryProperties["ControllerData"] = @controllerData.to_json
          ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", @controllerSet.length, telemetryProperties)
          if @winContainerCount > 0
            telemetryProperties["ClusterWideWindowsContainersCount"] = @winContainerCount
            telemetryProperties["WindowsNodeCount"] = @windowsNodeCount
            telemetryProperties["ClusterWideWindowsContainerInventoryTotalSizeKB"] = @winContainerInventoryTotalSizeBytes / 1024
            telemetryProperties["WindowsContainerCountWithInventoryRecordSize64KBorMore"] = @winContainerCountWithInventoryRecordSize64KBOrMore
            # The per-field 64KB counters are only attached when non-zero.
            if @winContainerCountWithEnvVarSize64KBOrMore > 0
              telemetryProperties["WinContainerCountWithEnvVarSize64KBOrMore"] = @winContainerCountWithEnvVarSize64KBOrMore
            end
            if @winContainerCountWithPortsSize64KBOrMore > 0
              telemetryProperties["WinContainerCountWithPortsSize64KBOrMore"] = @winContainerCountWithPortsSize64KBOrMore
            end
            if @winContainerCountWithCommandSize64KBOrMore > 0
              telemetryProperties["WinContainerCountWithCommandSize64KBOrMore"] = @winContainerCountWithCommandSize64KBOrMore
            end
            ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties)
          end
          ApplicationInsightsUtility.sendMetricTelemetry("PodInventoryE2EProcessingLatencyMs", @podInventoryE2EProcessingLatencyMs, telemetryProperties)
          ApplicationInsightsUtility.sendMetricTelemetry("PodsAPIE2ELatencyMs", @podsAPIE2ELatencyMs, telemetryProperties)
          # Restart the five-minute telemetry window.
          @@podTelemetryTimeTracker = Time.now.to_i
        end
      rescue => errorStr
        $log.warn "in_kube_podinventory::enumerate:Failed in enumerate: #{errorStr}"
        $log.debug_backtrace(errorStr.backtrace)
        ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
      end
    end

    # Returns the monotonic clock reading in whole milliseconds, for measuring elapsed time.
    def pod_inventory_monotonic_ms
      Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
    end

    # Returns currentTag when it already starts with the extension output stream
    # prefix; otherwise asks ExtensionUtils for the stream id of dataType.
    def pod_inventory_output_stream_tag(currentTag, dataType)
      if currentTag.nil? || !currentTag.start_with?(Constants::EXTENSION_OUTPUT_STREAM_ID_TAG_PREFIX)
        ExtensionUtils.getOutputStreamId(dataType)
      else
        currentTag
      end
    end

    # True when a pods API page is present and carries a non-empty "items" list.
    def pod_inventory_items?(podInventory)
      return false if podInventory.nil? || podInventory.empty?
      return false unless podInventory.key?("items")

      items = podInventory["items"]
      !items.nil? && !items.empty?
    end

    private :pod_inventory_monotonic_ms, :pod_inventory_output_stream_tag, :pod_inventory_items?