filterPVInsightsMetrics

in source/plugins/ruby/filter_cadvisor2mdm.rb [294:425]


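   # Processes a persistent volume used-bytes insights metric record: computes the
   # utilization percentage from the capacity tag and, when it meets or exceeds the
   # configured threshold, returns MDM PV resource utilization metric records
   # (an empty array otherwise).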
   def filterPVInsightsMetrics(record)
      begin
        mdmMetrics = []
        if record["Name"] == Constants::PV_USED_BYTES && @metrics_to_collect_hash.key?(record["Name"].downcase)
          metricName = record["Name"]
          usage = record["Value"]
          capacity = record["Tags"][Constants::INSIGHTSMETRICS_TAGS_PV_CAPACITY_BYTES]
          # Skip records without a usable capacity value; the utilization percentage
          # (and the threshold comparison below) would otherwise be nil.
          if capacity.nil? || capacity == 0
            return []
          end
          percentage_metric_value = (usage * 100.0) / capacity
          @log.info "percentage_metric_value for metric: #{metricName} percentage: #{percentage_metric_value}"
          @log.info "@@metric_threshold_hash for #{metricName}: #{@@metric_threshold_hash[metricName]}"

          computer = record["Computer"]
          resourceDimensions = record["Tags"]
          thresholdPercentage = @@metric_threshold_hash[metricName]

          flushMetricTelemetry
          if percentage_metric_value >= thresholdPercentage
            setThresholdExceededTelemetry(metricName)
            return MdmMetricsGenerator.getPVResourceUtilMetricRecords(record["CollectionTime"],
                                                                      metricName,
                                                                      computer,
                                                                      percentage_metric_value,
                                                                      resourceDimensions,
                                                                      thresholdPercentage)
          else
            return []
          end
        end
        return []
      rescue Exception => e
        @log.info "Error processing cadvisor insights metrics record Exception: #{e.class} Message: #{e.message}"
        ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
        return [] 
      end
    end

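    # Populates node CPU/memory capacity (and, for the daemonset, allocatable) values
    # if they are not already set: the replicaset path reads them from the Kubernetes API,
    # the daemonset path reads them through KubeletUtils.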
    def ensure_cpu_memory_capacity_and_allocatable_set
      @@controller_type = ENV["CONTROLLER_TYPE"]

      if @cpu_capacity != 0.0 && @memory_capacity != 0.0 && @@controller_type.downcase == "replicaset"
        @log.info "CPU And Memory Capacity are already set and their values are as follows @cpu_capacity : #{@cpu_capacity}, @memory_capacity: #{@memory_capacity}"
        return
      end

      if @@controller_type.downcase == "daemonset" && @cpu_capacity != 0.0 && @memory_capacity != 0.0 && @cpu_allocatable != 0.0 && @memory_allocatable != 0.0
        @log.info "CPU And Memory Capacity are already set and their values are as follows @cpu_capacity : #{@cpu_capacity}, @memory_capacity: #{@memory_capacity}"
        @log.info "CPU And Memory Allocatable are already set and their values are as follows @cpu_allocatable : #{@cpu_allocatable}, @memory_allocatable: #{@memory_allocatable}"
        return
      end

      if @@controller_type.downcase == "replicaset"
        @log.info "ensure_cpu_memory_capacity_set @cpu_capacity #{@cpu_capacity} @memory_capacity #{@memory_capacity}"

        begin
          resourceUri = KubernetesApiClient.getNodesResourceUri("nodes?fieldSelector=metadata.name%3D#{@@hostName}")
          nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo(resourceUri).body)
        rescue Exception => e
          @log.info "Error when getting nodeInventory from kube API. Exception: #{e.class} Message: #{e.message} "
          ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
        end
        if !nodeInventory.nil?
          cpu_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores")
          if !cpu_capacity_json.nil?
            metricVal = JSON.parse(cpu_capacity_json[0]["json_Collections"])[0]["Value"]
            if !metricVal.to_s.empty?
              @cpu_capacity = metricVal
              @log.info "CPU Capacity #{@cpu_capacity}"
            end
          else
            @log.info "Error getting cpu_capacity"
          end
          memory_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes")
          if !memory_capacity_json.nil?
            # Parse the memory capacity record (not the CPU record) for the memory value
            metricVal = JSON.parse(memory_capacity_json[0]["json_Collections"])[0]["Value"]
            if !metricVal.to_s.empty?
              @memory_capacity = metricVal
              @log.info "Memory Capacity #{@memory_capacity}"
            end
          else
            @log.info "Error getting memory_capacity"
          end
        end
      elsif @@controller_type.downcase == "daemonset"
        # Node CPU and memory capacity via KubeletUtils
        capacity_from_kubelet = KubeletUtils.get_node_capacity
        if !capacity_from_kubelet.nil? && capacity_from_kubelet.length > 1
          @cpu_capacity = capacity_from_kubelet[0]
          @memory_capacity = capacity_from_kubelet[1]
        else
          @log.error "Error getting capacity_from_kubelet: cpu_capacity and memory_capacity"
        end

        # Allocatable values from KubeletUtils (takes the capacity values above as inputs)
        allocatable_from_kubelet = KubeletUtils.get_node_allocatable(@cpu_capacity, @memory_capacity)
        if !allocatable_from_kubelet.nil? && allocatable_from_kubelet.length > 1
          @cpu_allocatable = allocatable_from_kubelet[0]
          @memory_allocatable = allocatable_from_kubelet[1]
        else
          @log.error "Error getting allocatable_from_kubelet: cpu_allocatable and memory_allocatable"
        end
      end
    end

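    # Fluentd filter_stream override: ensures node capacity/allocatable values are set,
    # refreshes the container limit caches, and runs each incoming event through filter,
    # collecting the results into a new event stream.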
    def filter_stream(tag, es)
      new_es = Fluent::MultiEventStream.new
      begin
        ensure_cpu_memory_capacity_and_allocatable_set
        
        # Refresh per-container CPU/memory limit and resource dimension caches before filtering this batch
        if @process_incoming_stream
          @containerCpuLimitHash, @containerMemoryLimitHash, @containerResourceDimensionHash = KubeletUtils.get_all_container_limits
        end

        es.each { |time, record|
          filtered_records = filter(tag, time, record)
          filtered_records.each { |filtered_record|
            new_es.add(time, filtered_record) if filtered_record
          } if filtered_records
        }
      rescue => e
        @log.info "Error in filter_stream #{e.message}"
      end
      new_es
    end
  end
end
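
For reference, here is a minimal standalone sketch of the utilization check that filterPVInsightsMetrics performs. The record shape, the "pvUsedBytes" name, the "pvCapacityBytes" tag key, and the 60.0 threshold are illustrative assumptions; in the plugin the metric name and capacity tag come from Constants::PV_USED_BYTES and Constants::INSIGHTSMETRICS_TAGS_PV_CAPACITY_BYTES, the threshold from @@metric_threshold_hash, and the emitted records are built by MdmMetricsGenerator.getPVResourceUtilMetricRecords.

    # Illustrative only: mirrors the percentage/threshold logic with hardcoded sample values.
    sample_record = {
      "Name" => "pvUsedBytes",                            # hypothetical metric name
      "Value" => 7_516_192_768,                           # PV used bytes (7 GiB)
      "Tags" => { "pvCapacityBytes" => 10_737_418_240 },  # hypothetical capacity tag key (10 GiB)
    }
    threshold_percentage = 60.0                           # assumed; the real value comes from @@metric_threshold_hash

    usage = sample_record["Value"]
    capacity = sample_record["Tags"]["pvCapacityBytes"]
    unless capacity.nil? || capacity == 0
      percentage = (usage * 100.0) / capacity             # => 70.0 for these sample numbers
      puts "PV utilization: #{percentage.round(2)}% (threshold #{threshold_percentage}%)"
      puts(percentage >= threshold_percentage ? "would emit an MDM PV utilization record" : "would be filtered out")
    end

The replicaset branch of ensure_cpu_memory_capacity_and_allocatable_set assumes that KubernetesApiClient.parseNodeLimits returns records whose "json_Collections" field is a JSON-encoded array of objects carrying a "Value". A small sketch of that extraction against an invented payload (field names other than "json_Collections" and "Value" are hypothetical):

    require "json"

    # Hypothetical element shaped like one entry of the parseNodeLimits result
    cpu_capacity_json = [
      { "json_Collections" => JSON.generate([{ "CounterName" => "cpuCapacityNanoCores", "Value" => 4_000_000_000 }]) },
    ]

    metric_val = JSON.parse(cpu_capacity_json[0]["json_Collections"])[0]["Value"]
    puts "cpuCapacityNanoCores: #{metric_val}" unless metric_val.to_s.empty?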