enumerate

in source/plugins/ruby/in_kubestate_deployments.rb [77:211]


    # Lists deployments from the Kube API in chunks of @DEPLOYMENTS_CHUNK_SIZE,
    # following the continuation token until it is nil or empty, and hands
    # every non-empty chunk to parse_and_emit_records.
    #
    # Also keeps the largest per-run deployment total in @@deploymentsCount and
    # sends it as the "MaxDeploymentCount" metric once
    # Constants::KUBE_STATE_TELEMETRY_FLUSH_INTERVAL_IN_MINUTES have elapsed
    # since @@telemetryLastSentTime.
    #
    # Any StandardError is logged and reported as exception telemetry instead
    # of being raised to the caller.
    def enumerate
      # One timestamp is shared by every chunk emitted during this run.
      batchTime = Time.now.utc.iso8601

      # parse_and_emit_records adds to this counter for each chunk.
      @deploymentsRunningTotal = 0

      if ExtensionUtils.isAADMSIAuthMode()
        $log.info("in_kubestate_deployments::enumerate: AAD AUTH MSI MODE")
        tagIsStreamId = !@tag.nil? && @tag.start_with?(Constants::EXTENSION_OUTPUT_STREAM_ID_TAG_PREFIX)
        @tag = ExtensionUtils.getOutputStreamId(Constants::INSIGHTS_METRICS_DATA_TYPE) unless tagIsStreamId
      end

      # Shared handling for the first chunk and every continuation chunk.
      emitChunk = lambda do |chunk|
        chunkIsEmpty = chunk.nil? || chunk.empty? || !chunk.key?("items") || chunk["items"].nil? || chunk["items"].empty?
        if chunkIsEmpty
          $log.warn "in_kubestate_deployments::enumerate:Received empty deploymentList"
        else
          $log.info("in_kubestate_deployments::enumerate : number of deployment items :#{chunk["items"].length} from Kube API @ #{Time.now.utc.iso8601}")
          parse_and_emit_records(chunk, batchTime)
        end
      end

      # First chunk: no continuation token yet.
      $log.info("in_kubestate_deployments::enumerate : Getting deployments from Kube API @ #{Time.now.utc.iso8601}")
      nextToken, currentChunk = KubernetesApiClient.getResourcesAndContinuationToken("deployments?limit=#{@DEPLOYMENTS_CHUNK_SIZE}", api_group: @DEPLOYMENTS_API_GROUP)
      $log.info("in_kubestate_deployments::enumerate : Done getting deployments from Kube API @ #{Time.now.utc.iso8601}")
      emitChunk.call(currentChunk)

      # Remaining chunks: keep asking while the API hands back a token.
      until nextToken.nil? || nextToken.empty?
        nextToken, currentChunk = KubernetesApiClient.getResourcesAndContinuationToken("deployments?limit=#{@DEPLOYMENTS_CHUNK_SIZE}&continue=#{nextToken}", api_group: @DEPLOYMENTS_API_GROUP)
        emitChunk.call(currentChunk)
      end

      # Drop the reference to the last chunk before the telemetry work below.
      currentChunk = nil

      $log.info("successfully emitted a total of #{@deploymentsRunningTotal} kube_state_deployment metrics")

      # Remember the highest total seen since the last telemetry flush.
      @@deploymentsCount = @deploymentsRunningTotal if @deploymentsRunningTotal > @@deploymentsCount

      minutesSinceFlush = (DateTime.now.to_time.to_i - @@telemetryLastSentTime).abs / 60
      if minutesSinceFlush >= Constants::KUBE_STATE_TELEMETRY_FLUSH_INTERVAL_IN_MINUTES
        $log.info "sending deployemt telemetry..."
        ApplicationInsightsUtility.sendMetricTelemetry("MaxDeploymentCount", @@deploymentsCount, {})

        # Start a fresh window for the next flush.
        @@deploymentsCount = 0
        @@telemetryLastSentTime = DateTime.now.to_time.to_i
      end
    rescue => errorStr
      $log.warn "in_kubestate_deployments::enumerate:Failed in enumerate: #{errorStr}"
      ApplicationInsightsUtility.sendExceptionTelemetry("in_kubestate_deployments::enumerate:Failed in enumerate: #{errorStr}")
    end

    # Builds one InsightsMetrics record per deployment in +deployments+ and
    # emits them as a single event stream on @tag through the Fluentd router.
    #
    # deployments - Hash with an "items" array of deployment objects (hashes
    #               with "metadata", "spec" and "status" sub-hashes).
    # batchTime   - String stored as each record's "CollectionTime"; defaults
    #               to the current UTC time in ISO 8601 format.
    #
    # Adds the number of records built to @deploymentsRunningTotal. Any
    # StandardError raised while building or emitting is logged and reported
    # as exception telemetry rather than propagated to the caller.
    def parse_and_emit_records(deployments, batchTime = Time.now.utc.iso8601)
      metricItems = []
      insightsMetricsEventStream = Fluent::MultiEventStream.new
      begin
        deployments["items"].each do |deployment|
          next if deployment.nil?

          # A deployment may lack one of these sub-objects; treat a missing one
          # as empty so the defaults below apply and the rest of the page is
          # still emitted instead of the whole batch failing.
          metadata = deployment["metadata"] || {}
          spec = deployment["spec"] || {}
          status = deployment["status"] || {}
          strategy = spec["strategy"] || {}

          metricTags = {
            Constants::INSIGHTSMETRICS_TAGS_CLUSTERID => @ClusterId,
            Constants::INSIGHTSMETRICS_TAGS_CLUSTERNAME => @ClusterName,
            Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_DEPLOYMENT_NAME => metadata["name"],
            Constants::INSIGHTSMETRICS_TAGS_K8SNAMESPACE => metadata["namespace"],
            # Fallbacks used when the API object omits the field.
            Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_DEPLOYMENT_STRATEGY => strategy["type"] || "RollingUpdate",
            Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_CREATIONTIME => metadata["creationTimestamp"] || "",
            Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_DEPLOYMENT_SPEC_REPLICAS => spec["replicas"] || 1,
            Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_DEPLOYMENT_STATUS_REPLICAS_UPDATED => status["updatedReplicas"] || 0,
            Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_DEPLOYMENT_STATUS_REPLICAS_AVAILABLE => status["availableReplicas"] || 0,
          }

          metricItems.push(
            "CollectionTime" => batchTime,
            "Computer" => @NodeName,
            "Name" => Constants::INSIGHTSMETRICS_METRIC_NAME_KUBE_STATE_DEPLOYMENT_STATE,
            # The metric value is the number of ready replicas (0 if unreported).
            "Value" => status["readyReplicas"] || 0,
            "Origin" => Constants::INSIGHTSMETRICS_TAGS_ORIGIN,
            "Namespace" => Constants::INSIGHTSMETRICS_TAGS_KUBESTATE_NAMESPACE,
            "Tags" => metricTags,
          )
        end

        # All records of this page share one Fluentd event time.
        time = Fluent::Engine.now
        metricItems.each do |insightsMetricsRecord|
          insightsMetricsEventStream.add(time, insightsMetricsRecord)
        end

        router.emit_stream(@tag, insightsMetricsEventStream)
        $log.info("successfully emitted #{metricItems.length} kube_state_deployment metrics")

        @deploymentsRunningTotal = @deploymentsRunningTotal + metricItems.length
        # Extra success marker logged only when @@istestvar is "true" (case-insensitive).
        if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && insightsMetricsEventStream.count > 0)
          $log.info("kubestatedeploymentsInsightsMetricsEmitStreamSuccess @ #{Time.now.utc.iso8601}")
        end
      rescue => error
        $log.warn("in_kubestate_deployments::parse_and_emit_records failed: #{error} ")
        ApplicationInsightsUtility.sendExceptionTelemetry("in_kubestate_deployments::parse_and_emit_records failed: #{error}")
      end
    end