in source/plugins/ruby/filter_health_model_builder.rb [86:284]
# Fluentd filter entry point for the cluster health model.
#
# Records arrive on several "kubehealth.*" tags:
# * "kubehealth.DaemonSet.Node"      - records are added to @buffer.
# * "kubehealth.DaemonSet.Container" - records are accumulated in
#   @container_cpu_memory_records.
# * "kubehealth.ReplicaSet"          - records are buffered too, then the whole
#   buffer (plus aggregated container records) is turned into health monitor
#   records, reduced, state-updated, completed with missing signals, built into
#   the health model and emitted via router.emit_stream under @rewrite_tag.
# * tags starting with @rewrite_tag  - passed through unchanged.
# Any other tag is treated as an error.
#
# @param tag [String] Fluentd tag of the incoming event stream
# @param es [Fluent::EventStream, nil] incoming (time, record) events
# @return [Fluent::EventStream] the incoming stream for @rewrite_tag tags,
#   otherwise an empty stream. On any error, exception telemetry is sent, a
#   warning is logged and an empty stream is returned (Fluentd expects an
#   event stream from filter_stream, never nil).
def filter_stream(tag, es)
  unless @@cluster_health_model_enabled
    @log.info "Cluster Health Model disabled in filter_health_model_builder"
    return Fluent::MultiEventStream.new
  end

  begin
    if ExtensionUtils.isAADMSIAuthMode()
      $log.info("filter_health_model_builder::filter_stream: AAD AUTH MSI MODE")
      # In MSI mode the emit tag must be the extension output stream id.
      if @rewrite_tag.nil? || !@rewrite_tag.start_with?(Constants::EXTENSION_OUTPUT_STREAM_ID_TAG_PREFIX)
        @rewrite_tag = ExtensionUtils.getOutputStreamId(Constants::KUBE_HEALTH_DATA_TYPE)
      end
      $log.info("filter_health_model_builder::filter_stream: using tag -#{@rewrite_tag} @ #{Time.now.utc.iso8601}")
    end

    if tag.start_with?("kubehealth.DaemonSet.Node")
      # Only buffered here; processed together with the next ReplicaSet batch.
      @buffer.add_to_buffer(health_records_from_event_stream(es)) unless es.nil?
      return Fluent::MultiEventStream.new
    elsif tag.start_with?("kubehealth.DaemonSet.Container")
      container_records = health_records_from_event_stream(es)
      if @container_cpu_memory_records.nil?
        @log.info "@container_cpu_memory_records was not initialized"
        @container_cpu_memory_records = []
      end
      # concat (rather than push(*records)) avoids splatting a large batch into arguments.
      @container_cpu_memory_records.concat(container_records)
      return Fluent::MultiEventStream.new
    elsif tag.start_with?("kubehealth.ReplicaSet")
      @buffer.add_to_buffer(health_records_from_event_stream(es))

      aggregated_container_records = []
      if @container_cpu_memory_records && !@container_cpu_memory_records.empty?
        aggregator = HealthContainerCpuMemoryAggregator.new(@resources, @provider)
        deduped_records = aggregator.dedupe_records(@container_cpu_memory_records)
        aggregator.aggregate(deduped_records)
        aggregator.compute_state
        aggregated_container_records = aggregator.get_records
      end
      @buffer.add_to_buffer(aggregated_container_records)

      records_to_process = @buffer.get_buffer
      @buffer.reset_buffer
      @container_cpu_memory_records = []

      health_monitor_records = records_to_process.map do |record|
        monitor_id = record[HealthMonitorRecordFields::MONITOR_ID]
        details = record[HealthMonitorRecordFields::DETAILS]
        HealthMonitorRecord.new(
          monitor_id,
          record[HealthMonitorRecordFields::MONITOR_INSTANCE_ID],
          record[HealthMonitorRecordFields::TIME_FIRST_OBSERVED],
          details["state"],
          @provider.get_labels(record),
          @provider.get_config(monitor_id),
          details
        )
      end
      @log.info "health_monitor_records.size #{health_monitor_records.size}"

      reduced_records = @reducer.reduce_signals(health_monitor_records, @resources)
      reduced_records.each do |record|
        @state.update_state(record, @provider.get_config(record.monitor_id), false, @telemetry)
        record.state = @state.get_state(record.monitor_instance_id).new_state
      end
      @log.info "after deduping and removing gone objects reduced_records.size #{reduced_records.size}"

      reduced_records = @kube_api_down_handler.handle_kube_api_down(reduced_records)
      # Log the handler's result (previously logged the stale pre-reduction list size).
      @log.info "after kube api down handler reduced_records.size #{reduced_records.size}"

      missing_signals = @generator.get_missing_signals(@@cluster_id, reduced_records, @resources, @provider)
      @log.info "after getting missing signals missing_signals.size #{missing_signals.size}"
      missing_signals.each do |signal|
        @state.update_state(signal, @provider.get_config(signal.monitor_id), false, @telemetry)
        signal_state = @state.get_state(signal.monitor_instance_id)
        @log.info "After Updating #{signal_state} #{signal_state.new_state}"
        signal.state = signal_state.new_state
      end
      @generator.update_last_received_records(reduced_records)

      all_records = reduced_records.clone
      all_records.push(*missing_signals)
      @log.info "after Adding missing signals all_records.size #{all_records.size}"
      HealthMonitorHelpers.add_agentpool_node_label_if_not_present(all_records)

      @model_builder.process_records(all_records)
      all_monitors = @model_builder.finalize_model
      @log.info "after building health_model #{all_monitors.size}"

      # Update state for aggregate monitors, then drop every monitor whose
      # state says it should not be sent; the cluster monitor is always kept.
      # delete_if avoids deleting from the hash inside a plain `each`.
      all_monitors.delete_if do |monitor_instance_id, monitor|
        if monitor.is_aggregate_monitor
          @state.update_state(monitor, @provider.get_config(monitor.monitor_id), true, @telemetry)
        end
        !@state.get_state(monitor_instance_id).should_send && monitor_instance_id != MonitorId::CLUSTER
      end
      @log.info "after optimizing health signals all_monitors.size #{all_monitors.size}"

      new_es = Fluent::MultiEventStream.new
      emit_time = Fluent::Engine.now
      all_monitors.each_value do |monitor|
        # @state is the health state object updated above (was an undefined bare `state`).
        record = @provider.get_record(monitor, @state)
        if record[HealthMonitorRecordFields::MONITOR_ID] == MonitorId::CLUSTER
          unless record[HealthMonitorRecordFields::DETAILS].nil?
            details = JSON.parse(record[HealthMonitorRecordFields::DETAILS])
            details[HealthMonitorRecordFields::HEALTH_MODEL_DEFINITION_VERSION] = ENV['HEALTH_MODEL_DEFINITION_VERSION'].to_s
            record[HealthMonitorRecordFields::DETAILS] = details.to_json
          end
          if all_monitors.size > 1
            old_state = record[HealthMonitorRecordFields::OLD_STATE]
            new_state = record[HealthMonitorRecordFields::NEW_STATE]
            # Report only when both ends differ from the last reported transition.
            if old_state != new_state && @cluster_old_state != old_state && @cluster_new_state != new_state
              ApplicationInsightsUtility.sendCustomEvent("HealthModel_ClusterStateChanged", {"old_state" => old_state, "new_state" => new_state, "monitor_count" => all_monitors.size})
              @log.info "sent telemetry for cluster state change from #{old_state} to #{new_state}"
              @cluster_old_state = old_state
              @cluster_new_state = new_state
            end
          end
        end
        new_es.add(emit_time, record)
      end
      router.emit_stream(@rewrite_tag, new_es)

      # Start the next cycle with a fresh monitor set and model builder.
      @monitor_set = HealthModel::MonitorSet.new
      @model_builder = HealthModel::HealthModelBuilder.new(@hierarchy_builder, @state_finalizers, @monitor_set)
      @cluster_health_state.update_state(@state.to_h)
      @telemetry.send
      return Fluent::MultiEventStream.new
    elsif tag.start_with?(@rewrite_tag)
      # Records already emitted under the rewrite tag pass through unchanged.
      es
    else
      raise "Invalid tag #{tag} received"
    end
  rescue => e
    ApplicationInsightsUtility.sendExceptionTelemetry(e, {"FeatureArea" => "Health"})
    @log.warn "Message: #{e.message} Backtrace: #{e.backtrace}"
    # Never return nil: Fluentd hands this value to the next filter/output.
    Fluent::MultiEventStream.new
  end
end

# Returns the record payloads carried by a Fluentd event stream, dropping the
# event times. A nil stream yields an empty array.
def health_records_from_event_stream(es)
  records = []
  es.each { |_time, record| records.push(record) } unless es.nil?
  records
end
private :health_records_from_event_stream