enumerate

in source/plugins/ruby/in_kube_nodes.rb [115:437]


    # Reads the complete node list from the Kubernetes API, one page at a time.
    # The page size is @NODES_CHUNK_SIZE, and paging follows the continuation
    # token returned by @kubernetesApiClient. Each non-empty page is passed to
    # parse_and_emit_records with one batch timestamp shared by every page of
    # this run.
    #
    # In AAD MSI auth mode, an output tag is replaced with the id from
    # @extensionUtils.getOutputStreamId when the tag is unset or does not start
    # with Constants::EXTENSION_OUTPUT_STREAM_ID_TAG_PREFIX.
    #
    # Side effects:
    #   @nodesAPIE2ELatencyMs                - summed API-call time over all pages (ms)
    #   @nodeInventoryE2EProcessingLatencyMs - wall time of the whole run (ms)
    # Both values and the node count are sent as metric telemetry once
    # @TELEMETRY_FLUSH_INTERVAL_IN_MINUTES have passed since the last send.
    # Any StandardError is logged and reported instead of being raised.
    def enumerate
      begin
        batchTime = Time.now.utc.iso8601
        nodeCount = 0
        @nodesAPIE2ELatencyMs = 0
        @nodeInventoryE2EProcessingLatencyMs = 0
        runStartMs = (Time.now.to_f * 1000).to_i

        if @extensionUtils.isAADMSIAuthMode()
          $log.info("in_kube_nodes::enumerate: AAD AUTH MSI MODE")
          streamPrefix = Constants::EXTENSION_OUTPUT_STREAM_ID_TAG_PREFIX
          @kubeperfTag = @extensionUtils.getOutputStreamId(Constants::PERF_DATA_TYPE) if @kubeperfTag.nil? || !@kubeperfTag.start_with?(streamPrefix)
          @insightsMetricsTag = @extensionUtils.getOutputStreamId(Constants::INSIGHTS_METRICS_DATA_TYPE) if @insightsMetricsTag.nil? || !@insightsMetricsTag.start_with?(streamPrefix)
          @ContainerNodeInventoryTag = @extensionUtils.getOutputStreamId(Constants::CONTAINER_NODE_INVENTORY_DATA_TYPE) if @ContainerNodeInventoryTag.nil? || !@ContainerNodeInventoryTag.start_with?(streamPrefix)
          @tag = @extensionUtils.getOutputStreamId(Constants::KUBE_NODE_INVENTORY_DATA_TYPE) if @tag.nil? || !@tag.start_with?(streamPrefix)
          $log.info("in_kube_nodes::enumerate: using perf tag -#{@kubeperfTag} @ #{Time.now.utc.iso8601}")
          $log.info("in_kube_nodes::enumerate: using insightsmetrics tag -#{@insightsMetricsTag} @ #{Time.now.utc.iso8601}")
          $log.info("in_kube_nodes::enumerate: using containernodeinventory tag -#{@ContainerNodeInventoryTag} @ #{Time.now.utc.iso8601}")
          $log.info("in_kube_nodes::enumerate: using kubenodeinventory tag -#{@tag} @ #{Time.now.utc.iso8601}")
        end

        # Handles a single API page and returns the number of node items it
        # held. Returns 0, after logging a warning, when the page is empty.
        emitPage = lambda do |page|
          if !page.nil? && !page.empty? && page.key?("items") && !page["items"].nil? && !page["items"].empty?
            itemCount = page["items"].length
            $log.info("in_kube_nodes::enumerate : number of node items :#{itemCount} from Kube API @ #{Time.now.utc.iso8601}")
            parse_and_emit_records(page, batchTime)
            itemCount
          else
            $log.warn "in_kube_nodes::enumerate:Received empty nodeInventory"
            0
          end
        end

        # First page. Only this call is bracketed by the Getting/Done log lines.
        pageStartMs = (Time.now.to_f * 1000).to_i
        $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}")
        resourceUri = KubernetesApiClient.getNodesResourceUri("nodes?limit=#{@NODES_CHUNK_SIZE}")
        continuationToken, nodeInventory = @kubernetesApiClient.getResourcesAndContinuationToken(resourceUri)
        $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}")
        @nodesAPIE2ELatencyMs += (Time.now.to_f * 1000).to_i - pageStartMs
        nodeCount += emitPage.call(nodeInventory)

        # Remaining pages: keep going while the API hands back a non-empty token.
        until continuationToken.nil? || continuationToken.empty?
          pageStartMs = (Time.now.to_f * 1000).to_i
          continuationToken, nodeInventory = @kubernetesApiClient.getResourcesAndContinuationToken(resourceUri + "&continue=#{continuationToken}")
          @nodesAPIE2ELatencyMs += (Time.now.to_f * 1000).to_i - pageStartMs
          nodeCount += emitPage.call(nodeInventory)
        end

        @nodeInventoryE2EProcessingLatencyMs = (Time.now.to_f * 1000).to_i - runStartMs
        minutesSinceLastSend = (DateTime.now.to_time.to_i - @@nodeInventoryLatencyTelemetryTimeTracker).abs / 60
        if minutesSinceLastSend >= @TELEMETRY_FLUSH_INTERVAL_IN_MINUTES
          # Hash insertion order fixes the order in which the metrics are sent.
          {
            "NodeInventoryE2EProcessingLatencyMs" => @nodeInventoryE2EProcessingLatencyMs,
            "NodesAPIE2ELatencyMs" => @nodesAPIE2ELatencyMs,
            "NodeCount" => nodeCount,
          }.each do |metricName, metricValue|
            @applicationInsightsUtility.sendMetricTelemetry(metricName, metricValue, {})
          end
          @@nodeInventoryLatencyTelemetryTimeTracker = DateTime.now.to_time.to_i
        end
      rescue => errorStr
        $log.warn "in_kube_nodes::enumerate:Failed in enumerate: #{errorStr}"
        $log.debug_backtrace(errorStr.backtrace)
        @applicationInsightsUtility.sendExceptionTelemetry(errorStr)
      end
    end

    # Converts one page of node objects returned by the nodes API into records
    # and emits them on these fluentd streams:
    #   @tag and @MDMKubeNodeInventoryTag - node inventory records (getNodeInventoryRecord);
    #                                       the same stream object is emitted to both tags
    #   @ContainerNodeInventoryTag        - container node inventory records (getContainerNodeInventoryRecord)
    #   @kubeperfTag                      - cpu/memory allocatable and capacity perf records
    #   @insightsMetricsTag               - nvidia.com/gpu and amd.com/gpu allocatable/capacity records
    # When @NODES_EMIT_STREAM_BATCH_SIZE > 0, a stream is emitted and replaced as
    # soon as it reaches that size. Anything left over is emitted after all items
    # have been processed. For Windows nodes, cpu/memory capacity is also cached in
    # @NodeCache, keyed by the record's "Host". Per-node telemetry is sent when
    # @TELEMETRY_FLUSH_INTERVAL_IN_MINUTES have passed since @@nodeTelemetryTimeTracker.
    #
    # @param nodeInventory [Hash] parsed API response; its "items" array is iterated
    #   (assumed non-empty; callers in this file check that before calling)
    # @param batchTime [String] ISO-8601 timestamp stamped on every record; defaults
    #   to the current UTC time. (The old default called Time.utc with no arguments,
    #   which always raised ArgumentError.)
    # StandardErrors raised while processing are logged and reported through
    # @applicationInsightsUtility.sendExceptionTelemetry instead of being propagated.
    def parse_and_emit_records(nodeInventory, batchTime = Time.now.utc.iso8601)
      begin
        emitTime = Fluent::Engine.now
        telemetrySent = false
        eventStream = Fluent::MultiEventStream.new
        containerNodeInventoryEventStream = Fluent::MultiEventStream.new
        insightsMetricsEventStream = Fluent::MultiEventStream.new
        kubePerfEventStream = Fluent::MultiEventStream.new
        @@istestvar = @env["ISTEST"]

        nodeInventory["items"].each do |item|
          # Node inventory: emitted to both the inventory tag and the MDM tag.
          nodeInventoryRecord = getNodeInventoryRecord(item, batchTime)
          eventStream.add(emitTime, nodeInventoryRecord) if nodeInventoryRecord
          if @NODES_EMIT_STREAM_BATCH_SIZE > 0 && eventStream.count >= @NODES_EMIT_STREAM_BATCH_SIZE
            $log.info("in_kube_node::parse_and_emit_records: number of node inventory records emitted #{eventStream.count} @ #{Time.now.utc.iso8601}")
            router.emit_stream(@tag, eventStream) if eventStream
            $log.info("in_kube_node::parse_and_emit_records: number of mdm node inventory records emitted #{eventStream.count} @ #{Time.now.utc.iso8601}")
            router.emit_stream(@MDMKubeNodeInventoryTag, eventStream) if eventStream
            if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
              $log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
            end
            eventStream = Fluent::MultiEventStream.new
          end

          # Container node inventory.
          containerNodeInventoryRecord = getContainerNodeInventoryRecord(item, batchTime)
          containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryRecord) if containerNodeInventoryRecord

          if @NODES_EMIT_STREAM_BATCH_SIZE > 0 && containerNodeInventoryEventStream.count >= @NODES_EMIT_STREAM_BATCH_SIZE
            $log.info("in_kube_node::parse_and_emit_records: number of container node inventory records emitted #{containerNodeInventoryEventStream.count} @ #{Time.now.utc.iso8601}")
            router.emit_stream(@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream
            containerNodeInventoryEventStream = Fluent::MultiEventStream.new
            if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
              $log.info("containerNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
            end
          end

          # A Windows node's capacity values are cached below.
          is_windows_node = false
          if !item["status"].nil? && !item["status"]["nodeInfo"].nil? && !item["status"]["nodeInfo"]["operatingSystem"].nil?
            operatingSystem = item["status"]["nodeInfo"]["operatingSystem"]
            if (operatingSystem.is_a?(String) && operatingSystem.casecmp("windows") == 0)
              is_windows_node = true
            end
          end

          # cpu/memory allocatable and capacity perf records.
          nodeMetricRecords = []
          nodeMetricRecord = KubernetesApiClient.parseNodeLimitsFromNodeItem(item, "allocatable", "cpu", "cpuAllocatableNanoCores", batchTime)
          if !nodeMetricRecord.nil? && !nodeMetricRecord.empty?
            nodeMetricRecords.push(nodeMetricRecord)
          end
          nodeMetricRecord = KubernetesApiClient.parseNodeLimitsFromNodeItem(item, "allocatable", "memory", "memoryAllocatableBytes", batchTime)
          if !nodeMetricRecord.nil? && !nodeMetricRecord.empty?
            nodeMetricRecords.push(nodeMetricRecord)
          end
          nodeMetricRecord = KubernetesApiClient.parseNodeLimitsFromNodeItem(item, "capacity", "cpu", "cpuCapacityNanoCores", batchTime)
          if !nodeMetricRecord.nil? && !nodeMetricRecord.empty?
            nodeMetricRecords.push(nodeMetricRecord)
            # Windows nodes: cache cpu capacity, read from the first json_Collections entry.
            if is_windows_node
              metricVal = JSON.parse(nodeMetricRecord["json_Collections"])[0]["Value"]
              @NodeCache.cpu.set_capacity(nodeMetricRecord["Host"], metricVal)
            end
          end
          nodeMetricRecord = KubernetesApiClient.parseNodeLimitsFromNodeItem(item, "capacity", "memory", "memoryCapacityBytes", batchTime)
          if !nodeMetricRecord.nil? && !nodeMetricRecord.empty?
            nodeMetricRecords.push(nodeMetricRecord)
            # Windows nodes: cache memory capacity, read from the first json_Collections entry.
            if is_windows_node
              metricVal = JSON.parse(nodeMetricRecord["json_Collections"])[0]["Value"]
              @NodeCache.mem.set_capacity(nodeMetricRecord["Host"], metricVal)
            end
          end
          nodeMetricRecords.each do |metricRecord|
            kubePerfEventStream.add(emitTime, metricRecord) if metricRecord
          end
          if @NODES_EMIT_STREAM_BATCH_SIZE > 0 && kubePerfEventStream.count >= @NODES_EMIT_STREAM_BATCH_SIZE
            $log.info("in_kube_nodes::parse_and_emit_records: number of node perf metric records emitted #{kubePerfEventStream.count} @ #{Time.now.utc.iso8601}")
            router.emit_stream(@kubeperfTag, kubePerfEventStream) if kubePerfEventStream
            kubePerfEventStream = Fluent::MultiEventStream.new
            if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
              $log.info("kubeNodePerfEmitStreamSuccess @ #{Time.now.utc.iso8601}")
            end
          end

          # GPU allocatable/capacity records (NVIDIA and AMD) as insights metrics.
          nodeGPUInsightsMetricsRecords = []
          insightsMetricsRecord = KubernetesApiClient.parseNodeLimitsAsInsightsMetrics(item, "allocatable", "nvidia.com/gpu", "nodeGpuAllocatable", batchTime)
          if !insightsMetricsRecord.nil? && !insightsMetricsRecord.empty?
            nodeGPUInsightsMetricsRecords.push(insightsMetricsRecord)
          end
          insightsMetricsRecord = KubernetesApiClient.parseNodeLimitsAsInsightsMetrics(item, "capacity", "nvidia.com/gpu", "nodeGpuCapacity", batchTime)
          if !insightsMetricsRecord.nil? && !insightsMetricsRecord.empty?
            nodeGPUInsightsMetricsRecords.push(insightsMetricsRecord)
          end
          insightsMetricsRecord = KubernetesApiClient.parseNodeLimitsAsInsightsMetrics(item, "allocatable", "amd.com/gpu", "nodeGpuAllocatable", batchTime)
          if !insightsMetricsRecord.nil? && !insightsMetricsRecord.empty?
            nodeGPUInsightsMetricsRecords.push(insightsMetricsRecord)
          end
          insightsMetricsRecord = KubernetesApiClient.parseNodeLimitsAsInsightsMetrics(item, "capacity", "amd.com/gpu", "nodeGpuCapacity", batchTime)
          if !insightsMetricsRecord.nil? && !insightsMetricsRecord.empty?
            nodeGPUInsightsMetricsRecords.push(insightsMetricsRecord)
          end
          nodeGPUInsightsMetricsRecords.each do |insightsMetricsRecord|
            insightsMetricsEventStream.add(emitTime, insightsMetricsRecord) if insightsMetricsRecord
          end
          if @NODES_EMIT_STREAM_BATCH_SIZE > 0 && insightsMetricsEventStream.count >= @NODES_EMIT_STREAM_BATCH_SIZE
            $log.info("in_kube_nodes::parse_and_emit_records: number of GPU node perf metric records emitted #{insightsMetricsEventStream.count} @ #{Time.now.utc.iso8601}")
            router.emit_stream(@insightsMetricsTag, insightsMetricsEventStream) if insightsMetricsEventStream
            insightsMetricsEventStream = Fluent::MultiEventStream.new
            if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
              $log.info("kubeNodeInsightsMetricsEmitStreamSuccess @ #{Time.now.utc.iso8601}")
            end
          end

          # Per-node telemetry. The tracker is advanced only after the loop, so
          # once the interval has elapsed every node in this page reports.
          timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs
          timeDifferenceInMinutes = timeDifference / 60
          if (timeDifferenceInMinutes >= @TELEMETRY_FLUSH_INTERVAL_IN_MINUTES)
            begin
              properties = getNodeTelemetryProps(item)
              properties["KubernetesProviderID"] = nodeInventoryRecord["KubernetesProviderID"]
              capacityInfo = item["status"]["capacity"]

              # NOTE(review): NodeMemory and the GPU exception below go through
              # ApplicationInsightsUtility directly, not the injected
              # @applicationInsightsUtility. This is kept as-is so the calls seen
              # by an injected utility do not change.
              ApplicationInsightsUtility.sendMetricTelemetry("NodeMemory", capacityInfo["memory"], properties)
              begin
                if (!capacityInfo["nvidia.com/gpu"].nil?) && (!capacityInfo["nvidia.com/gpu"].empty?)
                  properties["nvigpus"] = capacityInfo["nvidia.com/gpu"]
                end

                if (!capacityInfo["amd.com/gpu"].nil?) && (!capacityInfo["amd.com/gpu"].empty?)
                  properties["amdgpus"] = capacityInfo["amd.com/gpu"]
                end
              rescue => errorStr
                $log.warn "Failed in getting GPU telemetry in_kube_nodes : #{errorStr}"
                $log.debug_backtrace(errorStr.backtrace)
                ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
              end

              # Data-collection config settings (when the configmap file is mounted).
              if (File.file?(@@configMapMountPath))
                properties["collectAllKubeEvents"] = @@collectAllKubeEvents
              end

              # Prometheus scrape settings (when the prometheus config file is mounted).
              if (File.file?(@@promConfigMountPath))
                properties["rsPromInt"] = @@rsPromInterval
                properties["rsPromFPC"] = @@rsPromFieldPassCount
                properties["rsPromFDC"] = @@rsPromFieldDropCount
                properties["rsPromServ"] = @@rsPromK8sServiceCount
                properties["rsPromUrl"] = @@rsPromUrlCount
                properties["rsPromMonPods"] = @@rsPromMonitorPods
                properties["rsPromMonPodsNs"] = @@rsPromMonitorPodsNamespaceLength
                properties["rsPromMonPodsLabelSelectorLength"] = @@rsPromMonitorPodsLabelSelectorLength
                properties["rsPromMonPodsFieldSelectorLength"] = @@rsPromMonitorPodsFieldSelectorLength
              end

              # OSM settings (when the osm config file is mounted).
              if (File.file?(@@osmConfigMountPath))
                properties["osmNamespaceCount"] = @@osmNamespaceCount
              end

              # Sent exactly once per node. This section used to be duplicated,
              # which reported NodeCoreCapacity twice for every node.
              @applicationInsightsUtility.sendMetricTelemetry("NodeCoreCapacity", capacityInfo["cpu"], properties)
              telemetrySent = true
            rescue => errorStr
              $log.warn "Failed in getting telemetry in_kube_nodes : #{errorStr}"
              $log.debug_backtrace(errorStr.backtrace)
              @applicationInsightsUtility.sendExceptionTelemetry(errorStr)
            end
          end
        end
        if telemetrySent == true
          @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i
        end

        # Emit whatever is left in each stream after the per-item batch flushes.
        if eventStream.count > 0
          $log.info("in_kube_node::parse_and_emit_records: number of node inventory records emitted #{eventStream.count} @ #{Time.now.utc.iso8601}")
          router.emit_stream(@tag, eventStream) if eventStream
          $log.info("in_kube_node::parse_and_emit_records: number of mdm node inventory records emitted #{eventStream.count} @ #{Time.now.utc.iso8601}")
          router.emit_stream(@MDMKubeNodeInventoryTag, eventStream) if eventStream
          if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
            $log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
          end
          eventStream = nil
        end
        if containerNodeInventoryEventStream.count > 0
          $log.info("in_kube_node::parse_and_emit_records: number of container node inventory records emitted #{containerNodeInventoryEventStream.count} @ #{Time.now.utc.iso8601}")
          router.emit_stream(@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream
          containerNodeInventoryEventStream = nil
          if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
            $log.info("containerNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
          end
        end

        if kubePerfEventStream.count > 0
          $log.info("in_kube_nodes::parse_and_emit_records: number of node perf metric records emitted #{kubePerfEventStream.count} @ #{Time.now.utc.iso8601}")
          router.emit_stream(@kubeperfTag, kubePerfEventStream) if kubePerfEventStream
          kubePerfEventStream = nil
          if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
            $log.info("kubeNodePerfInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
          end
        end
        if insightsMetricsEventStream.count > 0
          $log.info("in_kube_nodes::parse_and_emit_records: number of GPU node perf metric records emitted #{insightsMetricsEventStream.count} @ #{Time.now.utc.iso8601}")
          router.emit_stream(@insightsMetricsTag, insightsMetricsEventStream) if insightsMetricsEventStream
          insightsMetricsEventStream = nil
          if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
            $log.info("kubeNodeInsightsMetricsEmitStreamSuccess @ #{Time.now.utc.iso8601}")
          end
        end
      rescue => errorStr
        $log.warn "Failed to retrieve node inventory: #{errorStr}"
        $log.debug_backtrace(errorStr.backtrace)
        @applicationInsightsUtility.sendExceptionTelemetry(errorStr)
      end
      $log.info "in_kube_nodes::parse_and_emit_records:End #{Time.now.utc.iso8601}"
    end