in source/plugins/ruby/in_kube_events.rb [80:190]
def enumerate
begin
eventList = nil
currentTime = Time.now
batchTime = currentTime.utc.iso8601
eventQueryState = getEventQueryState
newEventQueryState = []
@eventsCount = 0
if ExtensionUtils.isAADMSIAuthMode()
$log.info("in_kube_events::enumerate: AAD AUTH MSI MODE")
if @tag.nil? || !@tag.start_with?(Constants::EXTENSION_OUTPUT_STREAM_ID_TAG_PREFIX)
@tag = ExtensionUtils.getOutputStreamId(Constants::KUBE_EVENTS_DATA_TYPE)
end
$log.info("in_kube_events::enumerate: using kubeevents tag -#{@tag} @ #{Time.now.utc.iso8601}")
end
continuationToken = nil
$log.info("in_kube_events::enumerate : Getting events from Kube API @ #{Time.now.utc.iso8601}")
if @collectAllKubeEvents
continuationToken, eventList = KubernetesApiClient.getResourcesAndContinuationToken("events?limit=#{@EVENTS_CHUNK_SIZE}")
else
continuationToken, eventList = KubernetesApiClient.getResourcesAndContinuationToken("events?fieldSelector=type!=Normal&limit=#{@EVENTS_CHUNK_SIZE}")
end
$log.info("in_kube_events::enumerate : Done getting events from Kube API @ #{Time.now.utc.iso8601}")
if (!eventList.nil? && !eventList.empty? && eventList.key?("items") && !eventList["items"].nil? && !eventList["items"].empty?)
eventsCount = eventList["items"].length
$log.info "in_kube_events::enumerate:Received number of events in eventList is #{eventsCount} @ #{Time.now.utc.iso8601}"
newEventQueryState = parse_and_emit_records(eventList, eventQueryState, newEventQueryState, batchTime)
else
$log.warn "in_kube_events::enumerate:Received empty eventList"
end
while (!continuationToken.nil? && !continuationToken.empty?)
continuationToken, eventList = KubernetesApiClient.getResourcesAndContinuationToken("events?fieldSelector=type!=Normal&limit=#{@EVENTS_CHUNK_SIZE}&continue=#{continuationToken}")
if (!eventList.nil? && !eventList.empty? && eventList.key?("items") && !eventList["items"].nil? && !eventList["items"].empty?)
eventsCount = eventList["items"].length
$log.info "in_kube_events::enumerate:Received number of events in eventList is #{eventsCount} @ #{Time.now.utc.iso8601}"
newEventQueryState = parse_and_emit_records(eventList, eventQueryState, newEventQueryState, batchTime)
else
$log.warn "in_kube_events::enumerate:Received empty eventList"
end
end
eventList = nil
writeEventQueryState(newEventQueryState)
if (@eventsCount > 0)
ApplicationInsightsUtility.sendMetricTelemetry("EventCount", @eventsCount, {})
end
rescue => errorStr
$log.warn "in_kube_events::enumerate:Failed in enumerate: #{errorStr}"
$log.debug_backtrace(errorStr.backtrace)
ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
end
end
def parse_and_emit_records(events, eventQueryState, newEventQueryState, batchTime = Time.utc.iso8601)
currentTime = Time.now
emitTime = Fluent::Engine.now
@@istestvar = ENV["ISTEST"]
begin
eventStream = Fluent::MultiEventStream.new
events["items"].each do |items|
record = {}
record["CollectionTime"] = batchTime
eventId = items["metadata"]["uid"] + "/" + items["count"].to_s
newEventQueryState.push(eventId)
if !eventQueryState.empty? && eventQueryState.include?(eventId)
next
end
nodeName = items["source"].key?("host") ? items["source"]["host"] : (OMS::Common.get_hostname)
if KubernetesApiClient.isAROV3Cluster && !nodeName.nil? && !nodeName.empty? &&
(nodeName.downcase.start_with?("infra-") || nodeName.downcase.start_with?("master-"))
next
end
record["ObjectKind"] = items["involvedObject"]["kind"]
record["Namespace"] = items["involvedObject"]["namespace"]
record["Name"] = items["involvedObject"]["name"]
record["Reason"] = items["reason"]
record["Message"] = items["message"]
record["KubeEventType"] = items["type"]
record["TimeGenerated"] = items["metadata"]["creationTimestamp"]
record["SourceComponent"] = items["source"]["component"]
record["FirstSeen"] = items["firstTimestamp"]
record["LastSeen"] = items["lastTimestamp"]
record["Count"] = items["count"]
record["Computer"] = nodeName
record["ClusterName"] = KubernetesApiClient.getClusterName
record["ClusterId"] = KubernetesApiClient.getClusterId
eventStream.add(emitTime, record) if record
@eventsCount += 1
end
router.emit_stream(@tag, eventStream) if eventStream
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
$log.info("kubeEventsInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
end
rescue => errorStr
$log.debug_backtrace(errorStr.backtrace)
ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
end
return newEventQueryState
end