in source/plugins/ruby/filter_cadvisor2mdm.rb [294:425]
# Turns one InsightsMetrics record into MDM PV-utilization records.
#
# A record is only considered when its "Name" equals Constants::PV_USED_BYTES
# and the lower-cased name is a key of @metrics_to_collect_hash. The used
# percentage is computed from record["Value"] and the capacity stored under
# Constants::INSIGHTSMETRICS_TAGS_PV_CAPACITY_BYTES in record["Tags"].
#
# @param record [Hash] reads "Name", "Value", "Tags", "Computer" and "CollectionTime"
# @return [Array] whatever MdmMetricsGenerator.getPVResourceUtilMetricRecords
#   returns when the percentage is at or above the configured threshold;
#   an empty array in every other case, including when an error is rescued
def filterPVInsightsMetrics(record)
  metricName = record["Name"]
  unless metricName == Constants::PV_USED_BYTES && @metrics_to_collect_hash.key?(metricName.downcase)
    return []
  end

  usage = record["Value"]
  capacity = record["Tags"][Constants::INSIGHTSMETRICS_TAGS_PV_CAPACITY_BYTES]

  # A missing or zero capacity makes the percentage undefined. Bail out here
  # instead of letting a nil comparison / division raise and be reported as
  # exception telemetry for what is an expected data condition.
  if capacity.nil? || capacity == 0
    @log.info "Skipping metric: #{metricName} because PV capacity is missing or zero: #{capacity.inspect}"
    flushMetricTelemetry
    return []
  end

  percentage_metric_value = (usage * 100.0) / capacity
  @log.info "percentage_metric_value for metric: #{metricName} percentage: #{percentage_metric_value}"
  @log.info "@@metric_threshold_hash for #{metricName}: #{@@metric_threshold_hash[metricName]}"

  thresholdPercentage = @@metric_threshold_hash[metricName]
  flushMetricTelemetry
  return [] unless percentage_metric_value >= thresholdPercentage

  setThresholdExceededTelemetry(metricName)
  return MdmMetricsGenerator.getPVResourceUtilMetricRecords(record["CollectionTime"],
                                                            metricName,
                                                            record["Computer"],
                                                            percentage_metric_value,
                                                            record["Tags"],
                                                            thresholdPercentage)
rescue Exception => e
  # Kept as a catch-all so a single bad record never escapes this method.
  @log.info "Error processing cadvisor insights metrics record Exception: #{e.class} Message: #{e.message}"
  ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
  return []
end
# Makes sure the node capacity (and, for daemonset, allocatable) values cached
# in @cpu_capacity / @memory_capacity / @cpu_allocatable / @memory_allocatable
# are populated, fetching them when any required value is still 0.0.
#
# The source depends on ENV["CONTROLLER_TYPE"] (compared case-insensitively):
#   * "replicaset" - capacity is read from the node object returned by the
#     kube API via KubernetesApiClient.
#   * "daemonset"  - capacity and allocatable are read via KubeletUtils.
# Any other value leaves the cached values untouched.
#
# Raises NoMethodError when CONTROLLER_TYPE is not set (unchanged behavior).
def ensure_cpu_memory_capacity_and_allocatable_set
  @@controller_type = ENV["CONTROLLER_TYPE"]
  controller_type = @@controller_type.downcase
  capacity_set = @cpu_capacity != 0.0 && @memory_capacity != 0.0

  if capacity_set && controller_type == "replicaset"
    @log.info "CPU And Memory Capacity are already set and their values are as follows @cpu_capacity : #{@cpu_capacity}, @memory_capacity: #{@memory_capacity}"
    return
  end

  if controller_type == "daemonset" && capacity_set && @cpu_allocatable != 0.0 && @memory_allocatable != 0.0
    @log.info "CPU And Memory Capacity are already set and their values are as follows @cpu_capacity : #{@cpu_capacity}, @memory_capacity: #{@memory_capacity}"
    @log.info "CPU And Memory Allocatable are already set and their values are as follows @cpu_allocatable : #{@cpu_allocatable}, @memory_allocatable: #{@memory_allocatable}"
    return
  end

  if controller_type == "replicaset"
    @log.info "ensure_cpu_memory_capacity_set @cpu_capacity #{@cpu_capacity} @memory_capacity #{@memory_capacity}"
    nodeInventory = nil
    begin
      resourceUri = KubernetesApiClient.getNodesResourceUri("nodes?fieldSelector=metadata.name%3D#{@@hostName}")
      nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo(resourceUri).body)
    rescue Exception => e
      @log.info "Error when getting nodeInventory from kube API. Exception: #{e.class} Message: #{e.message} "
      ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
    end
    return if nodeInventory.nil?

    cpu_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores")
    if cpu_capacity_json.nil?
      @log.info "Error getting cpu_capacity"
    else
      metricVal = JSON.parse(cpu_capacity_json[0]["json_Collections"])[0]["Value"]
      # Only overwrite the cached value when a value was actually parsed.
      unless metricVal.nil?
        @cpu_capacity = metricVal
        @log.info "CPU Limit #{@cpu_capacity}"
      end
    end

    memory_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes")
    if memory_capacity_json.nil?
      @log.info "Error getting memory_capacity"
    else
      # Must read the memory payload; reading cpu_capacity_json here would
      # store the CPU value as memory capacity (or fail when it is nil).
      metricVal = JSON.parse(memory_capacity_json[0]["json_Collections"])[0]["Value"]
      unless metricVal.nil?
        @memory_capacity = metricVal
        @log.info "Memory Limit #{@memory_capacity}"
      end
    end
  elsif controller_type == "daemonset"
    capacity_from_kubelet = KubeletUtils.get_node_capacity
    if !capacity_from_kubelet.nil? && capacity_from_kubelet.length > 1
      @cpu_capacity = capacity_from_kubelet[0]
      @memory_capacity = capacity_from_kubelet[1]
    else
      @log.error "Error getting capacity_from_kubelet: cpu_capacity and memory_capacity"
    end

    # Allocatable is requested with whatever capacity values are cached at
    # this point, even if the capacity lookup above failed.
    allocatable_from_kubelet = KubeletUtils.get_node_allocatable(@cpu_capacity, @memory_capacity)
    if !allocatable_from_kubelet.nil? && allocatable_from_kubelet.length > 1
      @cpu_allocatable = allocatable_from_kubelet[0]
      @memory_allocatable = allocatable_from_kubelet[1]
    else
      @log.error "Error getting allocatable_from_kubelet: cpu_allocatable and memory_allocatable"
    end
  end
end
# Runs every event of the incoming stream through #filter and collects the
# non-nil results into a fresh Fluent::MultiEventStream.
#
# Before iterating, node capacity values are ensured and, when
# @process_incoming_stream is truthy, the container limit hashes are refreshed
# from KubeletUtils.get_all_container_limits.
#
# Any StandardError raised along the way is logged and ends processing; the
# events collected up to that point are still returned.
#
# @param tag [String] tag passed through to #filter
# @param es  [#each] event stream yielding (time, record) pairs
# @return [Fluent::MultiEventStream] stream holding the filtered records
def filter_stream(tag, es)
  output_stream = Fluent::MultiEventStream.new

  begin
    ensure_cpu_memory_capacity_and_allocatable_set

    if @process_incoming_stream
      container_limits = KubeletUtils.get_all_container_limits
      @containerCpuLimitHash, @containerMemoryLimitHash, @containerResourceDimensionHash = container_limits
    end

    es.each do |time, record|
      results = filter(tag, time, record)
      next unless results

      results.each do |result|
        next unless result

        output_stream.add(time, result)
      end
    end
  rescue => e
    @log.info "Error in filter_stream #{e.message}"
  end

  output_stream
end
end
end