in source/plugins/ruby/in_containerinventory.rb [53:140]
# Builds the container inventory for the current pass and emits it as a single
# event stream on @tag.
#
# What the method does, in order:
#   1. When ExtensionUtils.isAADMSIAuthMode() is true, makes sure @tag starts with
#      Constants::EXTENSION_OUTPUT_STREAM_ID_TAG_PREFIX, fetching an output stream
#      id for Constants::CONTAINER_INVENTORY_DATA_TYPE if it does not.
#   2. Fetches pods via CAdvisorMetricsAPIClient.getPodsFromCAdvisor, parses the
#      JSON body and converts each entry of "items" into inventory records with
#      KubernetesContainerInventory.getContainerInventoryRecords.
#   3. Persists every record through ContainerInventoryState.writeContainerState
#      and collects its "InstanceID".
#   4. For every id returned by ContainerInventoryState.getDeletedContainers
#      (presumably containers persisted earlier but absent from this pass - confirm
#      against ContainerInventoryState), re-reads the stored record, marks it
#      "State" => "Deleted" and appends it to the batch.
#   5. Emits the batch with router.emit_stream and, when at least five minutes have
#      passed since @@telemetryTimeTracker, sends telemetry with the host name and
#      container count.
#
# Environment variables read: CONTAINER_RUNTIME (only logged),
# AZMON_CLUSTER_COLLECT_ENV_VAR (forwarded to the record builder), ISTEST.
#
# Errors: any StandardError raised inside the begin section is logged, reported
# through ApplicationInsightsUtility.sendExceptionTelemetry and not re-raised.
def enumerate
  batchTime = Time.now.utc.iso8601
  emitTime = Fluent::Engine.now
  containerInventory = []
  eventStream = Fluent::MultiEventStream.new
  hostName = ""
  $log.info("in_container_inventory::enumerate : Begin processing @ #{Time.now.utc.iso8601}")
  if ExtensionUtils.isAADMSIAuthMode()
    $log.info("in_container_inventory::enumerate: AAD AUTH MSI MODE")
    if @tag.nil? || !@tag.start_with?(Constants::EXTENSION_OUTPUT_STREAM_ID_TAG_PREFIX)
      @tag = ExtensionUtils.getOutputStreamId(Constants::CONTAINER_INVENTORY_DATA_TYPE)
    end
    $log.info("in_container_inventory::enumerate: using tag -#{@tag} @ #{Time.now.utc.iso8601}")
  end
  begin
    containerRuntimeEnv = ENV["CONTAINER_RUNTIME"]
    $log.info("in_container_inventory::enumerate : container runtime : #{containerRuntimeEnv}")
    clusterCollectEnvironmentVar = ENV["AZMON_CLUSTER_COLLECT_ENV_VAR"]
    $log.info("in_container_inventory::enumerate : using cadvisor apis")
    containerIds = []
    response = CAdvisorMetricsAPIClient.getPodsFromCAdvisor(winNode: nil)
    if !response.nil? && !response.body.nil?
      podList = JSON.parse(response.body)
      if !podList.nil? && !podList.empty? && podList.key?("items") && !podList["items"].nil? && !podList["items"].empty?
        podList["items"].each do |item|
          containerInventoryRecords = KubernetesContainerInventory.getContainerInventoryRecords(item, batchTime, clusterCollectEnvironmentVar)
          containerInventoryRecords.each do |containerRecord|
            ContainerInventoryState.writeContainerState(containerRecord)

            # The first non-empty "Computer" value becomes the host name used in telemetry.
            # Nil is tolerated so that one record without the field cannot abort the whole
            # batch through the rescue below.
            computer = containerRecord["Computer"]
            if hostName.empty? && !computer.nil? && !computer.empty?
              hostName = computer
            end

            # Capture the addon-token-adapter image tag once, and only in AAD MSI mode.
            if @addonTokenAdapterImageTag.empty? && ExtensionUtils.isAADMSIAuthMode()
              elementName = containerRecord["ElementName"]
              if !elementName.nil? && !elementName.empty? &&
                 elementName.include?("_kube-system_") &&
                 elementName.include?("addon-token-adapter_omsagent")
                imageTag = containerRecord["ImageTag"]
                if !imageTag.nil? && !imageTag.empty?
                  @addonTokenAdapterImageTag = imageTag
                end
              end
            end

            containerIds.push containerRecord["InstanceID"]
            containerInventory.push containerRecord
          end
        end
      end
    end

    # Re-emit the stored record of each deleted container with its state overridden.
    deletedContainers = ContainerInventoryState.getDeletedContainers(containerIds)
    if !deletedContainers.nil? && !deletedContainers.empty?
      deletedContainers.each do |deletedContainer|
        container = ContainerInventoryState.readContainerState(deletedContainer)
        next if container.nil?

        container["State"] = "Deleted"
        KubernetesContainerInventory.deleteCGroupCacheEntryForDeletedContainer(container["InstanceID"])
        containerInventory.push container
      end
    end

    containerInventory.each do |record|
      eventStream.add(emitTime, record) if record
    end
    # eventStream is assigned once above and never reassigned, so it is always present here.
    router.emit_stream(@tag, eventStream)

    @@istestvar = ENV["ISTEST"]
    if !@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0
      $log.info("containerInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
    end
    $log.info("in_container_inventory::enumerate : Processing complete - emitted stream @ #{Time.now.utc.iso8601}")

    # Telemetry is throttled: sent only when five or more whole minutes have elapsed
    # since the epoch-seconds timestamp kept in @@telemetryTimeTracker.
    timeDifference = (Time.now.to_i - @@telemetryTimeTracker).abs
    timeDifferenceInMinutes = timeDifference / 60
    if timeDifferenceInMinutes >= 5
      @@telemetryTimeTracker = Time.now.to_i
      telemetryProperties = {}
      telemetryProperties["Computer"] = hostName
      telemetryProperties["ContainerCount"] = containerInventory.length
      if !@addonTokenAdapterImageTag.empty?
        telemetryProperties["addonTokenAdapterImageTag"] = @addonTokenAdapterImageTag
      end
      ApplicationInsightsUtility.sendTelemetry(@@PluginName, telemetryProperties)
    end
  rescue => errorStr
    $log.warn("Exception in enumerate container inventory: #{errorStr}")
    $log.debug_backtrace(errorStr.backtrace)
    ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
  end
end