in source/plugins/ruby/in_kube_nodes.rb [115:437]
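# enumerate: polls the Kubernetes API for the cluster's nodes in chunks, emits
# node inventory, perf and GPU metric records, and periodically reports
# processing latency telemetry to Application Insights.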
def enumerate
begin
nodeInventory = nil
currentTime = Time.now
batchTime = currentTime.utc.iso8601
nodeCount = 0
@nodesAPIE2ELatencyMs = 0
@nodeInventoryE2EProcessingLatencyMs = 0
nodeInventoryStartTime = (Time.now.to_f * 1000).to_i
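# In AAD MSI auth mode, output stream ids are resolved via
# @extensionUtils.getOutputStreamId; each tag is refreshed only when it does not
# already carry the extension output stream id prefix.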
if @extensionUtils.isAADMSIAuthMode()
$log.info("in_kube_nodes::enumerate: AAD AUTH MSI MODE")
if @kubeperfTag.nil? || !@kubeperfTag.start_with?(Constants::EXTENSION_OUTPUT_STREAM_ID_TAG_PREFIX)
@kubeperfTag = @extensionUtils.getOutputStreamId(Constants::PERF_DATA_TYPE)
end
if @insightsMetricsTag.nil? || !@insightsMetricsTag.start_with?(Constants::EXTENSION_OUTPUT_STREAM_ID_TAG_PREFIX)
@insightsMetricsTag = @extensionUtils.getOutputStreamId(Constants::INSIGHTS_METRICS_DATA_TYPE)
end
if @ContainerNodeInventoryTag.nil? || !@ContainerNodeInventoryTag.start_with?(Constants::EXTENSION_OUTPUT_STREAM_ID_TAG_PREFIX)
@ContainerNodeInventoryTag = @extensionUtils.getOutputStreamId(Constants::CONTAINER_NODE_INVENTORY_DATA_TYPE)
end
if @tag.nil? || !@tag.start_with?(Constants::EXTENSION_OUTPUT_STREAM_ID_TAG_PREFIX)
@tag = @extensionUtils.getOutputStreamId(Constants::KUBE_NODE_INVENTORY_DATA_TYPE)
end
$log.info("in_kube_nodes::enumerate: using perf tag -#{@kubeperfTag} @ #{Time.now.utc.iso8601}")
$log.info("in_kube_nodes::enumerate: using insightsmetrics tag -#{@insightsMetricsTag} @ #{Time.now.utc.iso8601}")
$log.info("in_kube_nodes::enumerate: using containernodeinventory tag -#{@ContainerNodeInventoryTag} @ #{Time.now.utc.iso8601}")
$log.info("in_kube_nodes::enumerate: using kubenodeinventory tag -#{@tag} @ #{Time.now.utc.iso8601}")
end
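# Fetch the first chunk of nodes (up to @NODES_CHUNK_SIZE) and record the API
# round-trip latency for the call.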
nodesAPIChunkStartTime = (Time.now.to_f * 1000).to_i
continuationToken = nil
$log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}")
resourceUri = KubernetesApiClient.getNodesResourceUri("nodes?limit=#{@NODES_CHUNK_SIZE}")
continuationToken, nodeInventory = @kubernetesApiClient.getResourcesAndContinuationToken(resourceUri)
$log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}")
nodesAPIChunkEndTime = (Time.now.to_f * 1000).to_i
@nodesAPIE2ELatencyMs = (nodesAPIChunkEndTime - nodesAPIChunkStartTime)
if (!nodeInventory.nil? && !nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].nil? && !nodeInventory["items"].empty?)
nodeCount += nodeInventory["items"].length
$log.info("in_kube_nodes::enumerate : number of node items :#{nodeInventory["items"].length} from Kube API @ #{Time.now.utc.iso8601}")
parse_and_emit_records(nodeInventory, batchTime)
else
$log.warn "in_kube_nodes::enumerate:Received empty nodeInventory"
end
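# Page through any remaining nodes using the continuation token returned by the
# list API, parsing and emitting each chunk as it arrives.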
while (!continuationToken.nil? && !continuationToken.empty?)
nodesAPIChunkStartTime = (Time.now.to_f * 1000).to_i
continuationToken, nodeInventory = @kubernetesApiClient.getResourcesAndContinuationToken(resourceUri + "&continue=#{continuationToken}")
nodesAPIChunkEndTime = (Time.now.to_f * 1000).to_i
@nodesAPIE2ELatencyMs = @nodesAPIE2ELatencyMs + (nodesAPIChunkEndTime - nodesAPIChunkStartTime)
if (!nodeInventory.nil? && !nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].nil? && !nodeInventory["items"].empty?)
nodeCount += nodeInventory["items"].length
$log.info("in_kube_nodes::enumerate : number of node items :#{nodeInventory["items"].length} from Kube API @ #{Time.now.utc.iso8601}")
parse_and_emit_records(nodeInventory, batchTime)
else
$log.warn "in_kube_nodes::enumerate:Received empty nodeInventory"
end
end
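# Record end-to-end processing latency and, once per telemetry flush interval,
# send the latency and node count metrics to Application Insights.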
@nodeInventoryE2EProcessingLatencyMs = ((Time.now.to_f * 1000).to_i - nodeInventoryStartTime)
timeDifference = (DateTime.now.to_time.to_i - @@nodeInventoryLatencyTelemetryTimeTracker).abs
timeDifferenceInMinutes = timeDifference / 60
if (timeDifferenceInMinutes >= @TELEMETRY_FLUSH_INTERVAL_IN_MINUTES)
@applicationInsightsUtility.sendMetricTelemetry("NodeInventoryE2EProcessingLatencyMs", @nodeInventoryE2EProcessingLatencyMs, {})
@applicationInsightsUtility.sendMetricTelemetry("NodesAPIE2ELatencyMs", @nodesAPIE2ELatencyMs, {})
@applicationInsightsUtility.sendMetricTelemetry("NodeCount", nodeCount, {})
@@nodeInventoryLatencyTelemetryTimeTracker = DateTime.now.to_time.to_i
end
nodeInventory = nil
rescue => errorStr
$log.warn "in_kube_nodes::enumerate:Failed in enumerate: #{errorStr}"
$log.debug_backtrace(errorStr.backtrace)
@applicationInsightsUtility.sendExceptionTelemetry(errorStr)
end
end
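# parse_and_emit_records: builds node inventory, container node inventory, node
# perf and GPU insights metrics records for each node item and emits them to
# their respective output streams (node inventory is also routed to the MDM
# tag), flushing in batches of @NODES_EMIT_STREAM_BATCH_SIZE.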
def parse_and_emit_records(nodeInventory, batchTime = Time.now.utc.iso8601)
begin
currentTime = Time.now
emitTime = Fluent::Engine.now
telemetrySent = false
eventStream = Fluent::MultiEventStream.new
containerNodeInventoryEventStream = Fluent::MultiEventStream.new
insightsMetricsEventStream = Fluent::MultiEventStream.new
kubePerfEventStream = Fluent::MultiEventStream.new
@@istestvar = @env["ISTEST"]
nodeInventory["items"].each do |item|
nodeInventoryRecord = getNodeInventoryRecord(item, batchTime)
eventStream.add(emitTime, nodeInventoryRecord) if nodeInventoryRecord
if @NODES_EMIT_STREAM_BATCH_SIZE > 0 && eventStream.count >= @NODES_EMIT_STREAM_BATCH_SIZE
$log.info("in_kube_node::parse_and_emit_records: number of node inventory records emitted #{eventStream.count} @ #{Time.now.utc.iso8601}")
router.emit_stream(@tag, eventStream) if eventStream
$log.info("in_kube_node::parse_and_emit_records: number of mdm node inventory records emitted #{eventStream.count} @ #{Time.now.utc.iso8601}")
router.emit_stream(@MDMKubeNodeInventoryTag, eventStream) if eventStream
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
$log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
end
eventStream = Fluent::MultiEventStream.new
end
containerNodeInventoryRecord = getContainerNodeInventoryRecord(item, batchTime)
containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryRecord) if containerNodeInventoryRecord
if @NODES_EMIT_STREAM_BATCH_SIZE > 0 && containerNodeInventoryEventStream.count >= @NODES_EMIT_STREAM_BATCH_SIZE
$log.info("in_kube_node::parse_and_emit_records: number of container node inventory records emitted #{containerNodeInventoryEventStream.count} @ #{Time.now.utc.iso8601}")
router.emit_stream(@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream
containerNodeInventoryEventStream = Fluent::MultiEventStream.new
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
$log.info("containerNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
end
end
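# Determine whether this is a Windows node from status.nodeInfo.operatingSystem;
# cpu and memory capacity for Windows nodes is cached below.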
is_windows_node = false
if !item["status"].nil? && !item["status"]["nodeInfo"].nil? && !item["status"]["nodeInfo"]["operatingSystem"].nil?
operatingSystem = item["status"]["nodeInfo"]["operatingSystem"]
if (operatingSystem.is_a?(String) && operatingSystem.casecmp("windows") == 0)
is_windows_node = true
end
end
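# Parse allocatable and capacity cpu/memory into node perf records; for Windows
# nodes, cache the cpu and memory capacity per host in @NodeCache.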
nodeMetricRecords = []
nodeMetricRecord = KubernetesApiClient.parseNodeLimitsFromNodeItem(item, "allocatable", "cpu", "cpuAllocatableNanoCores", batchTime)
if !nodeMetricRecord.nil? && !nodeMetricRecord.empty?
nodeMetricRecords.push(nodeMetricRecord)
end
nodeMetricRecord = KubernetesApiClient.parseNodeLimitsFromNodeItem(item, "allocatable", "memory", "memoryAllocatableBytes", batchTime)
if !nodeMetricRecord.nil? && !nodeMetricRecord.empty?
nodeMetricRecords.push(nodeMetricRecord)
end
nodeMetricRecord = KubernetesApiClient.parseNodeLimitsFromNodeItem(item, "capacity", "cpu", "cpuCapacityNanoCores", batchTime)
if !nodeMetricRecord.nil? && !nodeMetricRecord.empty?
nodeMetricRecords.push(nodeMetricRecord)
if is_windows_node
metricVal = JSON.parse(nodeMetricRecord["json_Collections"])[0]["Value"]
@NodeCache.cpu.set_capacity(nodeMetricRecord["Host"], metricVal)
end
end
nodeMetricRecord = KubernetesApiClient.parseNodeLimitsFromNodeItem(item, "capacity", "memory", "memoryCapacityBytes", batchTime)
if !nodeMetricRecord.nil? && !nodeMetricRecord.empty?
nodeMetricRecords.push(nodeMetricRecord)
if is_windows_node
metricVal = JSON.parse(nodeMetricRecord["json_Collections"])[0]["Value"]
@NodeCache.mem.set_capacity(nodeMetricRecord["Host"], metricVal)
end
end
nodeMetricRecords.each do |metricRecord|
kubePerfEventStream.add(emitTime, metricRecord) if metricRecord
end
if @NODES_EMIT_STREAM_BATCH_SIZE > 0 && kubePerfEventStream.count >= @NODES_EMIT_STREAM_BATCH_SIZE
$log.info("in_kube_nodes::parse_and_emit_records: number of node perf metric records emitted #{kubePerfEventStream.count} @ #{Time.now.utc.iso8601}")
router.emit_stream(@kubeperfTag, kubePerfEventStream) if kubePerfEventStream
kubePerfEventStream = Fluent::MultiEventStream.new
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
$log.info("kubeNodePerfEmitStreamSuccess @ #{Time.now.utc.iso8601}")
end
end
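# Parse GPU allocatable/capacity (nvidia.com/gpu and amd.com/gpu) into insights
# metrics records.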
nodeGPUInsightsMetricsRecords = []
insightsMetricsRecord = KubernetesApiClient.parseNodeLimitsAsInsightsMetrics(item, "allocatable", "nvidia.com/gpu", "nodeGpuAllocatable", batchTime)
if !insightsMetricsRecord.nil? && !insightsMetricsRecord.empty?
nodeGPUInsightsMetricsRecords.push(insightsMetricsRecord)
end
insightsMetricsRecord = KubernetesApiClient.parseNodeLimitsAsInsightsMetrics(item, "capacity", "nvidia.com/gpu", "nodeGpuCapacity", batchTime)
if !insightsMetricsRecord.nil? && !insightsMetricsRecord.empty?
nodeGPUInsightsMetricsRecords.push(insightsMetricsRecord)
end
insightsMetricsRecord = KubernetesApiClient.parseNodeLimitsAsInsightsMetrics(item, "allocatable", "amd.com/gpu", "nodeGpuAllocatable", batchTime)
if !insightsMetricsRecord.nil? && !insightsMetricsRecord.empty?
nodeGPUInsightsMetricsRecords.push(insightsMetricsRecord)
end
insightsMetricsRecord = KubernetesApiClient.parseNodeLimitsAsInsightsMetrics(item, "capacity", "amd.com/gpu", "nodeGpuCapacity", batchTime)
if !insightsMetricsRecord.nil? && !insightsMetricsRecord.empty?
nodeGPUInsightsMetricsRecords.push(insightsMetricsRecord)
end
nodeGPUInsightsMetricsRecords.each do |gpuMetricRecord|
insightsMetricsEventStream.add(emitTime, gpuMetricRecord) if gpuMetricRecord
end
if @NODES_EMIT_STREAM_BATCH_SIZE > 0 && insightsMetricsEventStream.count >= @NODES_EMIT_STREAM_BATCH_SIZE
$log.info("in_kube_nodes::parse_and_emit_records: number of GPU node perf metric records emitted #{insightsMetricsEventStream.count} @ #{Time.now.utc.iso8601}")
router.emit_stream(@insightsMetricsTag, insightsMetricsEventStream) if insightsMetricsEventStream
insightsMetricsEventStream = Fluent::MultiEventStream.new
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
$log.info("kubeNodeInsightsMetricsEmitStreamSuccess @ #{Time.now.utc.iso8601}")
end
end
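# Once per @TELEMETRY_FLUSH_INTERVAL_IN_MINUTES, send node-level telemetry:
# memory and core capacity, GPU counts, and (when the corresponding config map
# files are mounted) the kube events, prometheus scrape and OSM settings.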
timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs
timeDifferenceInMinutes = timeDifference / 60
if (timeDifferenceInMinutes >= @TELEMETRY_FLUSH_INTERVAL_IN_MINUTES)
begin
properties = getNodeTelemetryProps(item)
properties["KubernetesProviderID"] = nodeInventoryRecord["KubernetesProviderID"]
capacityInfo = item["status"]["capacity"]
ApplicationInsightsUtility.sendMetricTelemetry("NodeMemory", capacityInfo["memory"], properties)
begin
if (!capacityInfo["nvidia.com/gpu"].nil?) && (!capacityInfo["nvidia.com/gpu"].empty?)
properties["nvigpus"] = capacityInfo["nvidia.com/gpu"]
end
if (!capacityInfo["amd.com/gpu"].nil?) && (!capacityInfo["amd.com/gpu"].empty?)
properties["amdgpus"] = capacityInfo["amd.com/gpu"]
end
rescue => errorStr
$log.warn "Failed in getting GPU telemetry in_kube_nodes : #{errorStr}"
$log.debug_backtrace(errorStr.backtrace)
@applicationInsightsUtility.sendExceptionTelemetry(errorStr)
end
if (File.file?(@@configMapMountPath))
properties["collectAllKubeEvents"] = @@collectAllKubeEvents
end
if (File.file?(@@promConfigMountPath))
properties["rsPromInt"] = @@rsPromInterval
properties["rsPromFPC"] = @@rsPromFieldPassCount
properties["rsPromFDC"] = @@rsPromFieldDropCount
properties["rsPromServ"] = @@rsPromK8sServiceCount
properties["rsPromUrl"] = @@rsPromUrlCount
properties["rsPromMonPods"] = @@rsPromMonitorPods
properties["rsPromMonPodsNs"] = @@rsPromMonitorPodsNamespaceLength
properties["rsPromMonPodsLabelSelectorLength"] = @@rsPromMonitorPodsLabelSelectorLength
properties["rsPromMonPodsFieldSelectorLength"] = @@rsPromMonitorPodsFieldSelectorLength
end
if (File.file?(@@osmConfigMountPath))
properties["osmNamespaceCount"] = @@osmNamespaceCount
end
@applicationInsightsUtility.sendMetricTelemetry("NodeCoreCapacity", capacityInfo["cpu"], properties)
telemetrySent = true
rescue => errorStr
$log.warn "Failed in getting telemetry in_kube_nodes : #{errorStr}"
$log.debug_backtrace(errorStr.backtrace)
@applicationInsightsUtility.sendExceptionTelemetry(errorStr)
end
end
end
if telemetrySent
@@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i
end
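# Flush any records remaining in each stream (those below the emit batch
# threshold) after the item loop completes.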
if eventStream.count > 0
$log.info("in_kube_node::parse_and_emit_records: number of node inventory records emitted #{eventStream.count} @ #{Time.now.utc.iso8601}")
router.emit_stream(@tag, eventStream) if eventStream
$log.info("in_kube_node::parse_and_emit_records: number of mdm node inventory records emitted #{eventStream.count} @ #{Time.now.utc.iso8601}")
router.emit_stream(@MDMKubeNodeInventoryTag, eventStream) if eventStream
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
$log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
end
eventStream = nil
end
if containerNodeInventoryEventStream.count > 0
$log.info("in_kube_node::parse_and_emit_records: number of container node inventory records emitted #{containerNodeInventoryEventStream.count} @ #{Time.now.utc.iso8601}")
router.emit_stream(@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream
containerNodeInventoryEventStream = nil
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
$log.info("containerNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
end
end
if kubePerfEventStream.count > 0
$log.info("in_kube_nodes::parse_and_emit_records: number of node perf metric records emitted #{kubePerfEventStream.count} @ #{Time.now.utc.iso8601}")
router.emit_stream(@kubeperfTag, kubePerfEventStream) if kubePerfEventStream
kubePerfEventStream = nil
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
$log.info("kubeNodePerfInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
end
end
if insightsMetricsEventStream.count > 0
$log.info("in_kube_nodes::parse_and_emit_records: number of GPU node perf metric records emitted #{insightsMetricsEventStream.count} @ #{Time.now.utc.iso8601}")
router.emit_stream(@insightsMetricsTag, insightsMetricsEventStream) if insightsMetricsEventStream
insightsMetricsEventStream = nil
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0)
$log.info("kubeNodeInsightsMetricsEmitStreamSuccess @ #{Time.now.utc.iso8601}")
end
end
rescue => errorStr
$log.warn "Failed to retrieve node inventory: #{errorStr}"
$log.debug_backtrace(errorStr.backtrace)
@applicationInsightsUtility.sendExceptionTelemetry(errorStr)
end
$log.info "in_kube_nodes::parse_and_emit_records:End #{Time.now.utc.iso8601}"
end