in source/plugins/ruby/in_kubestate_deployments.rb [77:211]
def enumerate
begin
deploymentList = nil
currentTime = Time.now
batchTime = currentTime.utc.iso8601
@deploymentsRunningTotal = 0
if ExtensionUtils.isAADMSIAuthMode()
$log.info("in_kubestate_deployments::enumerate: AAD AUTH MSI MODE")
if @tag.nil? || !@tag.start_with?(Constants::EXTENSION_OUTPUT_STREAM_ID_TAG_PREFIX)
@tag = ExtensionUtils.getOutputStreamId(Constants::INSIGHTS_METRICS_DATA_TYPE)
end
end
continuationToken = nil
$log.info("in_kubestate_deployments::enumerate : Getting deployments from Kube API @ #{Time.now.utc.iso8601}")
continuationToken, deploymentList = KubernetesApiClient.getResourcesAndContinuationToken("deployments?limit=#{@DEPLOYMENTS_CHUNK_SIZE}", api_group: @DEPLOYMENTS_API_GROUP)
$log.info("in_kubestate_deployments::enumerate : Done getting deployments from Kube API @ #{Time.now.utc.iso8601}")
if (!deploymentList.nil? && !deploymentList.empty? && deploymentList.key?("items") && !deploymentList["items"].nil? && !deploymentList["items"].empty?)
$log.info("in_kubestate_deployments::enumerate : number of deployment items :#{deploymentList["items"].length} from Kube API @ #{Time.now.utc.iso8601}")
parse_and_emit_records(deploymentList, batchTime)
else
$log.warn "in_kubestate_deployments::enumerate:Received empty deploymentList"
end
while (!continuationToken.nil? && !continuationToken.empty?)
continuationToken, deploymentList = KubernetesApiClient.getResourcesAndContinuationToken("deployments?limit=#{@DEPLOYMENTS_CHUNK_SIZE}&continue=#{continuationToken}", api_group: @DEPLOYMENTS_API_GROUP)
if (!deploymentList.nil? && !deploymentList.empty? && deploymentList.key?("items") && !deploymentList["items"].nil? && !deploymentList["items"].empty?)
$log.info("in_kubestate_deployments::enumerate : number of deployment items :#{deploymentList["items"].length} from Kube API @ #{Time.now.utc.iso8601}")
parse_and_emit_records(deploymentList, batchTime)
else
$log.warn "in_kubestate_deployments::enumerate:Received empty deploymentList"
end
end
deploymentList = nil
$log.info("successfully emitted a total of #{@deploymentsRunningTotal} kube_state_deployment metrics")
if (@deploymentsRunningTotal > @@deploymentsCount)
@@deploymentsCount = @deploymentsRunningTotal
end
if (((DateTime.now.to_time.to_i - @@telemetryLastSentTime).abs) / 60) >= Constants::KUBE_STATE_TELEMETRY_FLUSH_INTERVAL_IN_MINUTES
$log.info "sending deployemt telemetry..."
ApplicationInsightsUtility.sendMetricTelemetry("MaxDeploymentCount", @@deploymentsCount, {})
@@deploymentsCount = 0
@@telemetryLastSentTime = DateTime.now.to_time.to_i
end
rescue => errorStr
$log.warn "in_kubestate_deployments::enumerate:Failed in enumerate: #{errorStr}"
ApplicationInsightsUtility.sendExceptionTelemetry("in_kubestate_deployments::enumerate:Failed in enumerate: #{errorStr}")
end
end
def parse_and_emit_records(deployments, batchTime = Time.utc.iso8601)
metricItems = []
insightsMetricsEventStream = Fluent::MultiEventStream.new
begin
metricInfo = deployments
metricInfo["items"].each do |deployment|
deploymentName = deployment["metadata"]["name"]
deploymentNameSpace = deployment["metadata"]["namespace"]
deploymentCreatedTime = ""
if !deployment["metadata"]["creationTimestamp"].nil?
deploymentCreatedTime = deployment["metadata"]["creationTimestamp"]
end
deploymentStrategy = "RollingUpdate"
if !deployment["spec"]["strategy"].nil? && !deployment["spec"]["strategy"]["type"].nil?
deploymentStrategy = deployment["spec"]["strategy"]["type"]
end
deploymentSpecReplicas = 1
if !deployment["spec"]["replicas"].nil?
deploymentSpecReplicas = deployment["spec"]["replicas"]
end
deploymentStatusReadyReplicas = 0
if !deployment["status"]["readyReplicas"].nil?
deploymentStatusReadyReplicas = deployment["status"]["readyReplicas"]
end
deploymentStatusUpToDateReplicas = 0
if !deployment["status"]["updatedReplicas"].nil?
deploymentStatusUpToDateReplicas = deployment["status"]["updatedReplicas"]
end
deploymentStatusAvailableReplicas = 0
if !deployment["status"]["availableReplicas"].nil?
deploymentStatusAvailableReplicas = deployment["status"]["availableReplicas"]
end
metricItem = {}
metricItem["CollectionTime"] = batchTime
metricItem["Computer"] = @NodeName
metricItem["Name"] = Constants::INSIGHTSMETRICS_METRIC_NAME_KUBE_STATE_DEPLOYMENT_STATE
metricItem["Value"] = deploymentStatusReadyReplicas
metricItem["Origin"] = Constants::INSIGHTSMETRICS_TAGS_ORIGIN
metricItem["Namespace"] = Constants::INSIGHTSMETRICS_TAGS_KUBESTATE_NAMESPACE
metricTags = {}
metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERID] = @ClusterId
metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERNAME] = @ClusterName
metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_DEPLOYMENT_NAME] = deploymentName
metricTags[Constants::INSIGHTSMETRICS_TAGS_K8SNAMESPACE] = deploymentNameSpace
metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_DEPLOYMENT_STRATEGY] = deploymentStrategy
metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_CREATIONTIME] = deploymentCreatedTime
metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_DEPLOYMENT_SPEC_REPLICAS] = deploymentSpecReplicas
metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_DEPLOYMENT_STATUS_REPLICAS_UPDATED] = deploymentStatusUpToDateReplicas
metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_DEPLOYMENT_STATUS_REPLICAS_AVAILABLE] = deploymentStatusAvailableReplicas
metricItem["Tags"] = metricTags
metricItems.push(metricItem)
end
time = Fluent::Engine.now
metricItems.each do |insightsMetricsRecord|
insightsMetricsEventStream.add(time, insightsMetricsRecord) if insightsMetricsRecord
end
router.emit_stream(@tag, insightsMetricsEventStream) if insightsMetricsEventStream
$log.info("successfully emitted #{metricItems.length()} kube_state_deployment metrics")
@deploymentsRunningTotal = @deploymentsRunningTotal + metricItems.length()
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && insightsMetricsEventStream.count > 0)
$log.info("kubestatedeploymentsInsightsMetricsEmitStreamSuccess @ #{Time.now.utc.iso8601}")
end
rescue => error
$log.warn("in_kubestate_deployments::parse_and_emit_records failed: #{error} ")
ApplicationInsightsUtility.sendExceptionTelemetry("in_kubestate_deployments::parse_and_emit_records failed: #{error}")
end
end