enumerate

in source/plugins/ruby/in_containerinventory.rb [53:140]


    def enumerate
      currentTime = Time.now
      batchTime = currentTime.utc.iso8601
      emitTime = Fluent::Engine.now
      containerInventory = Array.new
      eventStream = Fluent::MultiEventStream.new
      hostName = ""
      $log.info("in_container_inventory::enumerate : Begin processing @ #{Time.now.utc.iso8601}")
      if ExtensionUtils.isAADMSIAuthMode()
        $log.info("in_container_inventory::enumerate: AAD AUTH MSI MODE")
        if @tag.nil? || !@tag.start_with?(Constants::EXTENSION_OUTPUT_STREAM_ID_TAG_PREFIX)
          @tag = ExtensionUtils.getOutputStreamId(Constants::CONTAINER_INVENTORY_DATA_TYPE)
        end
        $log.info("in_container_inventory::enumerate: using tag -#{@tag} @ #{Time.now.utc.iso8601}")
      end
      begin
        containerRuntimeEnv = ENV["CONTAINER_RUNTIME"]
        $log.info("in_container_inventory::enumerate : container runtime : #{containerRuntimeEnv}")
        clusterCollectEnvironmentVar = ENV["AZMON_CLUSTER_COLLECT_ENV_VAR"]
        $log.info("in_container_inventory::enumerate : using cadvisor apis")
        containerIds = Array.new
        response = CAdvisorMetricsAPIClient.getPodsFromCAdvisor(winNode: nil)
        if !response.nil? && !response.body.nil?
            podList = JSON.parse(response.body)
            if !podList.nil? && !podList.empty? && podList.key?("items") && !podList["items"].nil? && !podList["items"].empty?
              podList["items"].each do |item|
                containerInventoryRecords = KubernetesContainerInventory.getContainerInventoryRecords(item, batchTime, clusterCollectEnvironmentVar)
                containerInventoryRecords.each do |containerRecord|
                  ContainerInventoryState.writeContainerState(containerRecord)
                  if hostName.empty? && !containerRecord["Computer"].empty?
                    hostName = containerRecord["Computer"]
                  end
                  if @addonTokenAdapterImageTag.empty? && ExtensionUtils.isAADMSIAuthMode()
                     if !containerRecord["ElementName"].nil? && !containerRecord["ElementName"].empty? &&
                      containerRecord["ElementName"].include?("_kube-system_") &&
                      containerRecord["ElementName"].include?("addon-token-adapter_omsagent")
                      if !containerRecord["ImageTag"].nil? && !containerRecord["ImageTag"].empty?
                        @addonTokenAdapterImageTag = containerRecord["ImageTag"]
                      end
                     end
                  end
                  containerIds.push containerRecord["InstanceID"]
                  containerInventory.push containerRecord
                end
              end
            end
        end
        
        deletedContainers = ContainerInventoryState.getDeletedContainers(containerIds)
        if !deletedContainers.nil? && !deletedContainers.empty?
          deletedContainers.each do |deletedContainer|
            container = ContainerInventoryState.readContainerState(deletedContainer)
            if !container.nil?
              container.each { |k, v| container[k] = v }
              container["State"] = "Deleted"
              KubernetesContainerInventory.deleteCGroupCacheEntryForDeletedContainer(container["InstanceID"])
              containerInventory.push container
             end
           end
        end
        containerInventory.each do |record|
          eventStream.add(emitTime, record) if record
        end
        router.emit_stream(@tag, eventStream) if eventStream
        @@istestvar = ENV["ISTEST"]
        if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0)
          $log.info("containerInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
        end
        $log.info("in_container_inventory::enumerate : Processing complete - emitted stream @ #{Time.now.utc.iso8601}")
        timeDifference = (DateTime.now.to_time.to_i - @@telemetryTimeTracker).abs
        timeDifferenceInMinutes = timeDifference / 60
        if (timeDifferenceInMinutes >= 5)
          @@telemetryTimeTracker = DateTime.now.to_time.to_i
          telemetryProperties = {}
          telemetryProperties["Computer"] = hostName
          telemetryProperties["ContainerCount"] = containerInventory.length
          if !@addonTokenAdapterImageTag.empty?
            telemetryProperties["addonTokenAdapterImageTag"] = @addonTokenAdapterImageTag
          end
          ApplicationInsightsUtility.sendTelemetry(@@PluginName, telemetryProperties)
        end
      rescue => errorStr
        $log.warn("Exception in enumerate container inventory: #{errorStr}")
        $log.debug_backtrace(errorStr.backtrace)
        ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
      end
    end