enumerate

in source/plugins/ruby/in_kube_events.rb [80:190]


    def enumerate
      begin
        eventList = nil
        currentTime = Time.now
        batchTime = currentTime.utc.iso8601
        eventQueryState = getEventQueryState
        newEventQueryState = []
        @eventsCount = 0

        if ExtensionUtils.isAADMSIAuthMode()
          $log.info("in_kube_events::enumerate: AAD AUTH MSI MODE")
          if @tag.nil? || !@tag.start_with?(Constants::EXTENSION_OUTPUT_STREAM_ID_TAG_PREFIX)
            @tag = ExtensionUtils.getOutputStreamId(Constants::KUBE_EVENTS_DATA_TYPE)
          end
          $log.info("in_kube_events::enumerate: using kubeevents tag -#{@tag} @ #{Time.now.utc.iso8601}")
        end
        
        continuationToken = nil
        $log.info("in_kube_events::enumerate : Getting events from Kube API @ #{Time.now.utc.iso8601}")
        if @collectAllKubeEvents
          continuationToken, eventList = KubernetesApiClient.getResourcesAndContinuationToken("events?limit=#{@EVENTS_CHUNK_SIZE}")
        else
          continuationToken, eventList = KubernetesApiClient.getResourcesAndContinuationToken("events?fieldSelector=type!=Normal&limit=#{@EVENTS_CHUNK_SIZE}")
        end
        $log.info("in_kube_events::enumerate : Done getting events from Kube API @ #{Time.now.utc.iso8601}")
        if (!eventList.nil? && !eventList.empty? && eventList.key?("items") && !eventList["items"].nil? && !eventList["items"].empty?)
          eventsCount = eventList["items"].length
          $log.info "in_kube_events::enumerate:Received number of events in eventList is #{eventsCount} @ #{Time.now.utc.iso8601}"
          newEventQueryState = parse_and_emit_records(eventList, eventQueryState, newEventQueryState, batchTime)
        else
          $log.warn "in_kube_events::enumerate:Received empty eventList"
        end

        
        while (!continuationToken.nil? && !continuationToken.empty?)
          continuationToken, eventList = KubernetesApiClient.getResourcesAndContinuationToken("events?fieldSelector=type!=Normal&limit=#{@EVENTS_CHUNK_SIZE}&continue=#{continuationToken}")
          if (!eventList.nil? && !eventList.empty? && eventList.key?("items") && !eventList["items"].nil? && !eventList["items"].empty?)
            eventsCount = eventList["items"].length
            $log.info "in_kube_events::enumerate:Received number of events in eventList is #{eventsCount} @ #{Time.now.utc.iso8601}"
            newEventQueryState = parse_and_emit_records(eventList, eventQueryState, newEventQueryState, batchTime)
          else
            $log.warn "in_kube_events::enumerate:Received empty eventList"
          end
        end

        
        eventList = nil
        writeEventQueryState(newEventQueryState)

        
        if (@eventsCount > 0)
          ApplicationInsightsUtility.sendMetricTelemetry("EventCount", @eventsCount, {})
        end
      rescue => errorStr
        $log.warn "in_kube_events::enumerate:Failed in enumerate: #{errorStr}"
        $log.debug_backtrace(errorStr.backtrace)
        ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
      end
    end 

    def parse_and_emit_records(events, eventQueryState, newEventQueryState, batchTime = Time.utc.iso8601)
      currentTime = Time.now
      emitTime = Fluent::Engine.now
      @@istestvar = ENV["ISTEST"]
      begin
        eventStream = Fluent::MultiEventStream.new
        events["items"].each do |items|
          record = {}
          
          record["CollectionTime"] = batchTime 
          eventId = items["metadata"]["uid"] + "/" + items["count"].to_s
          newEventQueryState.push(eventId)
          if !eventQueryState.empty? && eventQueryState.include?(eventId)
            next
          end

          nodeName = items["source"].key?("host") ? items["source"]["host"] : (OMS::Common.get_hostname)
          
          if KubernetesApiClient.isAROV3Cluster && !nodeName.nil? && !nodeName.empty? &&
             (nodeName.downcase.start_with?("infra-") || nodeName.downcase.start_with?("master-"))
            next
          end

          record["ObjectKind"] = items["involvedObject"]["kind"]
          record["Namespace"] = items["involvedObject"]["namespace"]
          record["Name"] = items["involvedObject"]["name"]
          record["Reason"] = items["reason"]
          record["Message"] = items["message"]
          record["KubeEventType"] = items["type"]
          record["TimeGenerated"] = items["metadata"]["creationTimestamp"]
          record["SourceComponent"] = items["source"]["component"]
          record["FirstSeen"] = items["firstTimestamp"]
          record["LastSeen"] = items["lastTimestamp"]
          record["Count"] = items["count"]
          record["Computer"] = nodeName
          record["ClusterName"] = KubernetesApiClient.getClusterName
          record["ClusterId"] = KubernetesApiClient.getClusterId
          eventStream.add(emitTime, record) if record
          @eventsCount += 1
        end
        router.emit_stream(@tag, eventStream) if eventStream
        if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
          $log.info("kubeEventsInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
        end
      rescue => errorStr
        $log.debug_backtrace(errorStr.backtrace)
        ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
      end
      return newEventQueryState
    end