filter_stream

in source/plugins/ruby/filter_health_model_builder.rb [86:284]


        # Fluentd filter entry point that drives the cluster health model.
        #
        # Incoming streams are routed by tag prefix:
        # * "kubehealth.DaemonSet.Node"      - records are added to @buffer; nothing is emitted.
        # * "kubehealth.DaemonSet.Container" - records are accumulated in
        #   @container_cpu_memory_records for aggregation in the next ReplicaSet cycle;
        #   nothing is emitted.
        # * "kubehealth.ReplicaSet"          - runs one full health model cycle (see
        #   process_health_model_cycle), which emits its output via router.emit_stream.
        # * @rewrite_tag                     - the stream is passed through unchanged.
        # Any other tag raises, which is reported and logged by the rescue clause.
        #
        # @param tag [String] Fluentd tag of the incoming stream
        # @param es [Fluent::EventStream, nil] incoming (time, record) events
        # @return [Fluent::EventStream] +es+ for @rewrite_tag, otherwise an empty stream.
        #   On error an empty stream is returned as well, so the Fluentd pipeline never
        #   receives nil from this filter.
        def filter_stream(tag, es)
            if !@@cluster_health_model_enabled
                @log.info "Cluster Health Model disabled in filter_health_model_builder"
                return Fluent::MultiEventStream.new
            end
            begin
                if ExtensionUtils.isAADMSIAuthMode()
                    $log.info("filter_health_model_builder::filter_stream: AAD AUTH MSI MODE")
                    # In MSI mode the output tag must carry the extension output stream prefix;
                    # look it up unless the current tag already has that prefix.
                    if @rewrite_tag.nil? || !@rewrite_tag.start_with?(Constants::EXTENSION_OUTPUT_STREAM_ID_TAG_PREFIX)
                        @rewrite_tag = ExtensionUtils.getOutputStreamId(Constants::KUBE_HEALTH_DATA_TYPE)
                    end
                    $log.info("filter_health_model_builder::filter_stream: using tag -#{@rewrite_tag} @ #{Time.now.utc.iso8601}")
                end

                if tag.start_with?("kubehealth.DaemonSet.Node")
                    @buffer.add_to_buffer(health_records_from_stream(es)) if !es.nil?
                    return Fluent::MultiEventStream.new
                elsif tag.start_with?("kubehealth.DaemonSet.Container")
                    container_records = health_records_from_stream(es)
                    if @container_cpu_memory_records.nil?
                        @log.info "@container_cpu_memory_records was not initialized"
                        @container_cpu_memory_records = []
                    end
                    @container_cpu_memory_records.concat(container_records)
                    return Fluent::MultiEventStream.new
                elsif tag.start_with?("kubehealth.ReplicaSet")
                    process_health_model_cycle(es)
                    return Fluent::MultiEventStream.new
                elsif tag.start_with?(@rewrite_tag)
                    # Streams carrying the tag this filter emits under are passed through as-is.
                    es
                else
                    raise "Invalid tag #{tag} received"
                end
            rescue => e
                ApplicationInsightsUtility.sendExceptionTelemetry(e, {"FeatureArea" => "Health"})
                @log.warn "Message: #{e.message} Backtrace: #{e.backtrace}"
                # Fluentd feeds this return value to the next filter/output; never hand it nil.
                return Fluent::MultiEventStream.new
            end
        end

        # Collects the record part of every (time, record) event in +es+.
        #
        # @param es [#each, nil] event stream yielding (time, record) pairs
        # @return [Array] the records in stream order; [] when +es+ is nil
        def health_records_from_stream(es)
            records = []
            es.each { |_time, record| records.push(record) } if !es.nil?
            records
        end

        # Dedupes, aggregates and computes state for the container cpu/memory records
        # accumulated since the last cycle.
        #
        # @return [Array] the aggregator's records; [] when nothing was accumulated
        def aggregated_container_cpu_memory_records
            return [] if @container_cpu_memory_records.nil? || @container_cpu_memory_records.empty?

            aggregator = HealthContainerCpuMemoryAggregator.new(@resources, @provider)
            deduped_records = aggregator.dedupe_records(@container_cpu_memory_records)
            aggregator.aggregate(deduped_records)
            aggregator.compute_state
            aggregator.get_records
        end

        # Runs one health model cycle, triggered by a ReplicaSet batch.
        #
        # The records in +es+ and the aggregated container records are added to the buffered
        # records. The buffer is drained into HealthMonitorRecords, which are reduced and
        # folded into @state. Missing signals from @generator are added, and the monitor
        # hierarchy is built. Monitors whose state says they should not be sent are dropped
        # (the cluster monitor is always kept). The rest are emitted under @rewrite_tag.
        # The buffer, the container accumulator and the model builder are reset for the
        # next cycle.
        #
        # @param es [Fluent::EventStream, nil] the ReplicaSet events
        # @return [void]
        def process_health_model_cycle(es)
            @buffer.add_to_buffer(health_records_from_stream(es))
            @buffer.add_to_buffer(aggregated_container_cpu_memory_records)
            records_to_process = @buffer.get_buffer
            @buffer.reset_buffer
            @container_cpu_memory_records = []

            health_monitor_records = records_to_process.map do |record|
                monitor_id = record[HealthMonitorRecordFields::MONITOR_ID]
                details = record[HealthMonitorRecordFields::DETAILS]
                HealthMonitorRecord.new(
                    monitor_id,
                    record[HealthMonitorRecordFields::MONITOR_INSTANCE_ID],
                    record[HealthMonitorRecordFields::TIME_FIRST_OBSERVED],
                    details["state"],
                    @provider.get_labels(record),
                    @provider.get_config(monitor_id),
                    details
                )
            end
            @log.info "health_monitor_records.size #{health_monitor_records.size}"

            # Reduce the signals, then fold each one into @state and adopt the resulting state.
            reduced_records = @reducer.reduce_signals(health_monitor_records, @resources)
            reduced_records.each do |record|
                @state.update_state(record, @provider.get_config(record.monitor_id), false, @telemetry)
                record.state = @state.get_state(record.monitor_instance_id).new_state
            end
            @log.info "after deduping and removing gone objects reduced_records.size #{reduced_records.size}"

            reduced_records = @kube_api_down_handler.handle_kube_api_down(reduced_records)
            @log.info "after kube api down handler reduced_records.size #{reduced_records.size}"

            missing_signals = @generator.get_missing_signals(@@cluster_id, reduced_records, @resources, @provider)
            @log.info "after getting missing signals missing_signals.size #{missing_signals.size}"
            missing_signals.each do |signal|
                @state.update_state(signal, @provider.get_config(signal.monitor_id), false, @telemetry)
                @log.info "After Updating #{@state.get_state(signal.monitor_instance_id)} #{@state.get_state(signal.monitor_instance_id).new_state}"
                signal.state = @state.get_state(signal.monitor_instance_id).new_state
            end

            @generator.update_last_received_records(reduced_records)
            all_records = reduced_records.clone
            all_records.push(*missing_signals)
            @log.info "after Adding missing signals all_records.size #{all_records.size}"

            HealthMonitorHelpers.add_agentpool_node_label_if_not_present(all_records)

            @model_builder.process_records(all_records)
            all_monitors = @model_builder.finalize_model
            @log.info "after building health_model #{all_monitors.size}"

            # Update state for aggregate monitors, then drop every monitor whose state says it
            # should not be sent; the cluster monitor is always kept.
            all_monitors.delete_if do |monitor_instance_id, monitor|
                if monitor.is_aggregate_monitor
                    @state.update_state(monitor, @provider.get_config(monitor.monitor_id), true, @telemetry)
                end
                !@state.get_state(monitor_instance_id).should_send && monitor_instance_id != MonitorId::CLUSTER
            end
            @log.info "after optimizing health signals all_monitors.size #{all_monitors.size}"

            new_es = Fluent::MultiEventStream.new
            emit_time = Fluent::Engine.now
            all_monitors.each_value do |monitor|
                record = @provider.get_record(monitor, @state)
                if record[HealthMonitorRecordFields::MONITOR_ID] == MonitorId::CLUSTER
                    # DETAILS is a JSON string here; stamp the model definition version into it.
                    details_json = record[HealthMonitorRecordFields::DETAILS]
                    if !details_json.nil?
                        details = JSON.parse(details_json)
                        details[HealthMonitorRecordFields::HEALTH_MODEL_DEFINITION_VERSION] = "#{ENV['HEALTH_MODEL_DEFINITION_VERSION']}"
                        record[HealthMonitorRecordFields::DETAILS] = details.to_json
                    end
                    if all_monitors.size > 1
                        send_cluster_state_change_telemetry(record, all_monitors.size)
                    end
                end
                new_es.add(emit_time, record)
            end

            router.emit_stream(@rewrite_tag, new_es)

            # Start the next cycle with a fresh monitor set and model builder.
            @monitor_set = HealthModel::MonitorSet.new
            @model_builder = HealthModel::HealthModelBuilder.new(@hierarchy_builder, @state_finalizers, @monitor_set)

            @cluster_health_state.update_state(@state.to_h)
            @telemetry.send
        end

        # Sends a "HealthModel_ClusterStateChanged" event when the cluster record carries a
        # state transition that differs from the last one reported by this filter.
        #
        # @param record [Hash] the cluster monitor record produced by @provider.get_record
        # @param monitor_count [Integer] number of monitors being emitted this cycle
        # @return [void]
        def send_cluster_state_change_telemetry(record, monitor_count)
            old_state = record[HealthMonitorRecordFields::OLD_STATE]
            new_state = record[HealthMonitorRecordFields::NEW_STATE]
            return if old_state == new_state
            # Skip only an identical (old, new) pair. A transition that merely shares one endpoint
            # with the previous report is a genuine change and must still be sent.
            return if old_state == @cluster_old_state && new_state == @cluster_new_state

            ApplicationInsightsUtility.sendCustomEvent("HealthModel_ClusterStateChanged",{"old_state" => old_state , "new_state" => new_state, "monitor_count" => monitor_count})
            @log.info "sent telemetry for cluster state change from #{old_state} to #{new_state}"
            @cluster_old_state = old_state
            @cluster_new_state = new_state
        end

        private :health_records_from_stream, :aggregated_container_cpu_memory_records,
                :process_health_model_cycle, :send_cluster_state_change_telemetry