parse_and_emit_records

in source/plugins/ruby/in_kube_podinventory.rb [248:440]


    # Builds Fluentd event streams from a pod inventory listing and emits them.
    #
    # For every entry in podInventory["items"] this collects:
    #   * pod inventory records (also fed to @inventoryToMdmConvertor),
    #   * container inventory records, only when the pod's node is one of the
    #     nodes returned by KubernetesApiClient.getWindowsNodesArray,
    #   * container cpu/memory request and limit perf records,
    #   * nvidia/amd GPU request and limit insights-metrics records.
    #
    # The pod inventory, perf and insights-metrics streams are flushed inside the
    # loop whenever they reach @PODS_EMIT_STREAM_BATCH_SIZE (if that is > 0); all
    # four streams are flushed once more after the loop if they hold records.
    # When continuationToken is nil, the MDM pod inventory records and the
    # service records are emitted as well.
    #
    # podInventory      - Hash with an "items" Array of pod objects.
    # serviceRecords    - Array of service record Hashes (nil entries are skipped).
    # continuationToken - nil triggers the MDM and service emits; any other value
    #                     skips them.
    # batchTime         - timestamp passed through to the record builders;
    #                     defaults to the current UTC time in ISO 8601 form.
    #
    # Any StandardError raised inside the processing block is logged, reported
    # through ApplicationInsightsUtility and swallowed.
    def parse_and_emit_records(podInventory, serviceRecords, continuationToken, batchTime = Time.now.utc.iso8601)
      emitTime = Fluent::Engine.now

      eventStream = Fluent::MultiEventStream.new
      containerInventoryStream = Fluent::MultiEventStream.new
      kubePerfEventStream = Fluent::MultiEventStream.new
      insightsMetricsEventStream = Fluent::MultiEventStream.new
      @@istestvar = ENV["ISTEST"]

      # Evaluate the ISTEST switch once; it gates the extra "...Success" log lines.
      isTestRun = !@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0
      logTestMarker = lambda do |marker|
        $log.info("#{marker} @ #{Time.now.utc.iso8601}") if isTestRun
      end

      # True when batching is enabled and the stream has reached the batch size.
      batchFull = lambda do |stream|
        @PODS_EMIT_STREAM_BATCH_SIZE > 0 && stream.count >= @PODS_EMIT_STREAM_BATCH_SIZE
      end

      # True when a field is present, non-empty and at or above the telemetry size threshold.
      oversizedField = lambda do |field|
        !field.nil? && !field.empty? && field.length >= Constants::MAX_RECORD_OR_FIELD_SIZE_FOR_TELEMETRY
      end

      begin
        winNodes = KubernetesApiClient.getWindowsNodesArray
        podInventory["items"].each do |item|
          # Pod inventory records
          podInventoryRecords = getPodInventoryRecords(item, serviceRecords, batchTime)
          @containerCount += podInventoryRecords.length
          podInventoryRecords.each do |record|
            next if record.nil?
            eventStream.add(emitTime, record)
            @inventoryToMdmConvertor.process_pod_inventory_record(record)
          end

          # Container inventory records, only for pods scheduled on a Windows node
          if winNodes.length > 0
            nodeName = item["spec"]["nodeName"]
            nodeName = "" if nodeName.nil?
            @windowsNodeCount = winNodes.length
            if !nodeName.empty? && winNodes.include?(nodeName)
              clusterCollectEnvironmentVar = ENV["AZMON_CLUSTER_COLLECT_ENV_VAR"]
              containerInventoryRecords = KubernetesContainerInventory.getContainerInventoryRecords(item, batchTime, clusterCollectEnvironmentVar, true)

              @winContainerCount += containerInventoryRecords.length
              containerInventoryRecords.each do |cirecord|
                next if cirecord.nil?
                containerInventoryStream.add(emitTime, cirecord)

                # Size telemetry: the record size is the length of its string form.
                ciRecordSize = cirecord.to_s.length
                @winContainerInventoryTotalSizeBytes += ciRecordSize
                if ciRecordSize >= Constants::MAX_RECORD_OR_FIELD_SIZE_FOR_TELEMETRY
                  @winContainerCountWithInventoryRecordSize64KBOrMore += 1
                end
                @winContainerCountWithEnvVarSize64KBOrMore += 1 if oversizedField.call(cirecord["EnvironmentVar"])
                @winContainerCountWithPortsSize64KBOrMore += 1 if oversizedField.call(cirecord["Ports"])
                @winContainerCountWithCommandSize64KBOrMore += 1 if oversizedField.call(cirecord["Command"])
              end
            end
          end

          if batchFull.call(eventStream)
            $log.info("in_kube_podinventory::parse_and_emit_records: number of pod inventory records emitted #{eventStream.count} @ #{Time.now.utc.iso8601}")
            logTestMarker.call("kubePodInventoryEmitStreamSuccess")
            router.emit_stream(@tag, eventStream)
            eventStream = Fluent::MultiEventStream.new
          end

          # Container cpu/memory requests and limits as perf records
          containerMetricDataItems = []
          containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(item, "requests", "cpu", "cpuRequestNanoCores", batchTime))
          containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(item, "requests", "memory", "memoryRequestBytes", batchTime))
          containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(item, "limits", "cpu", "cpuLimitNanoCores", batchTime))
          containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(item, "limits", "memory", "memoryLimitBytes", batchTime))

          containerMetricDataItems.each do |record|
            kubePerfEventStream.add(emitTime, record) if record
          end

          if batchFull.call(kubePerfEventStream)
            $log.info("in_kube_podinventory::parse_and_emit_records: number of container perf records emitted #{kubePerfEventStream.count} @ #{Time.now.utc.iso8601}")
            router.emit_stream(@kubeperfTag, kubePerfEventStream)
            logTestMarker.call("kubeContainerPerfEventEmitStreamSuccess")
            kubePerfEventStream = Fluent::MultiEventStream.new
          end

          # Container GPU requests and limits as insights metrics
          containerGPUInsightsMetricsDataItems = []
          containerGPUInsightsMetricsDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimitsAsInsightsMetrics(item, "requests", "nvidia.com/gpu", "containerGpuRequests", batchTime))
          containerGPUInsightsMetricsDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimitsAsInsightsMetrics(item, "limits", "nvidia.com/gpu", "containerGpuLimits", batchTime))
          containerGPUInsightsMetricsDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimitsAsInsightsMetrics(item, "requests", "amd.com/gpu", "containerGpuRequests", batchTime))
          containerGPUInsightsMetricsDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimitsAsInsightsMetrics(item, "limits", "amd.com/gpu", "containerGpuLimits", batchTime))
          containerGPUInsightsMetricsDataItems.each do |insightsMetricsRecord|
            insightsMetricsEventStream.add(emitTime, insightsMetricsRecord) if insightsMetricsRecord
          end

          if batchFull.call(insightsMetricsEventStream)
            $log.info("in_kube_podinventory::parse_and_emit_records: number of GPU insights metrics records emitted #{insightsMetricsEventStream.count} @ #{Time.now.utc.iso8601}")
            logTestMarker.call("kubePodInsightsMetricsEmitStreamSuccess")
            router.emit_stream(@insightsMetricsTag, insightsMetricsEventStream)
            insightsMetricsEventStream = Fluent::MultiEventStream.new
          end
        end

        # Flush whatever is left in each stream; the streams are not used again
        # below, so their references are released after emitting.
        if eventStream.count > 0
          $log.info("in_kube_podinventory::parse_and_emit_records: number of pod inventory records emitted #{eventStream.count} @ #{Time.now.utc.iso8601}")
          router.emit_stream(@tag, eventStream)
          logTestMarker.call("kubePodInventoryEmitStreamSuccess")
          eventStream = nil
        end

        if containerInventoryStream.count > 0
          $log.info("in_kube_podinventory::parse_and_emit_records: number of windows container inventory records emitted #{containerInventoryStream.count} @ #{Time.now.utc.iso8601}")
          router.emit_stream(@containerInventoryTag, containerInventoryStream)
          logTestMarker.call("kubeWindowsContainerInventoryEmitStreamSuccess")
          containerInventoryStream = nil
        end

        if kubePerfEventStream.count > 0
          $log.info("in_kube_podinventory::parse_and_emit_records: number of perf records emitted #{kubePerfEventStream.count} @ #{Time.now.utc.iso8601}")
          router.emit_stream(@kubeperfTag, kubePerfEventStream)
          kubePerfEventStream = nil
          logTestMarker.call("kubeContainerPerfEventEmitStreamSuccess")
        end

        if insightsMetricsEventStream.count > 0
          $log.info("in_kube_podinventory::parse_and_emit_records: number of insights metrics records emitted #{insightsMetricsEventStream.count} @ #{Time.now.utc.iso8601}")
          router.emit_stream(@insightsMetricsTag, insightsMetricsEventStream)
          logTestMarker.call("kubePodInsightsMetricsEmitStreamSuccess")
          insightsMetricsEventStream = nil
        end

        if continuationToken.nil?
          # MDM pod inventory records accumulated by @inventoryToMdmConvertor
          @log.info "Sending pod inventory mdm records to out_mdm"
          pod_inventory_mdm_records = @inventoryToMdmConvertor.get_pod_inventory_mdm_records(batchTime)
          @log.info "pod_inventory_mdm_records.size #{pod_inventory_mdm_records.size}"
          mdm_pod_inventory_es = Fluent::MultiEventStream.new
          # NOTE(review): these events are stamped with batchTime, while every other
          # stream in this method uses emitTime - confirm this is intentional.
          pod_inventory_mdm_records.each do |pod_inventory_mdm_record|
            mdm_pod_inventory_es.add(batchTime, pod_inventory_mdm_record) if pod_inventory_mdm_record
          end
          router.emit_stream(@@MDMKubePodInventoryTag, mdm_pod_inventory_es)

          # Service records, tagged with the cluster id and name
          kubeServicesEventStream = Fluent::MultiEventStream.new
          serviceRecords.each do |kubeServiceRecord|
            next if kubeServiceRecord.nil?

            kubeServiceRecord["ClusterId"] = KubernetesApiClient.getClusterId
            kubeServiceRecord["ClusterName"] = KubernetesApiClient.getClusterName
            kubeServicesEventStream.add(emitTime, kubeServiceRecord)
            if batchFull.call(kubeServicesEventStream)
              $log.info("in_kube_podinventory::parse_and_emit_records: number of service records emitted #{kubeServicesEventStream.count} @ #{Time.now.utc.iso8601}")
              router.emit_stream(@kubeservicesTag, kubeServicesEventStream)
              kubeServicesEventStream = Fluent::MultiEventStream.new
              logTestMarker.call("kubeServicesEventEmitStreamSuccess")
            end
          end

          if kubeServicesEventStream.count > 0
            $log.info("in_kube_podinventory::parse_and_emit_records : number of service records emitted #{kubeServicesEventStream.count} @ #{Time.now.utc.iso8601}")
            router.emit_stream(@kubeservicesTag, kubeServicesEventStream)
            logTestMarker.call("kubeServicesEventEmitStreamSuccess")
          end
          kubeServicesEventStream = nil
        end

        # Running total of pods seen, used elsewhere for telemetry.
        @podCount += podInventory["items"].length
      rescue => errorStr
        $log.warn "Failed in parse_and_emit_record pod inventory: #{errorStr}"
        $log.debug_backtrace(errorStr.backtrace)
        ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
      end
    end