lib/utility/error_monitor.rb (85 lines of code) (raw):

#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#

# frozen_string_literal: true

require 'time'
require 'utility/errors'
require 'utility/exception_tracking'

module Utility
  # Counts successes and failures reported by a caller and raises a
  # MonitoringError subclass as soon as one of the configured limits is
  # exceeded:
  #
  # * too many errors in a row (+max_consecutive_errors+),
  # * too many errors in total (+max_errors+),
  # * too high a share of errors among the last +window_size+ reported
  #   outcomes (+max_error_ratio+).
  #
  # The most recent errors are also kept in +error_queue+ (at most
  # +error_queue_size+ entries) for later inspection.
  class ErrorMonitor
    # Base class for every error raised by the monitor. Carries the error
    # that pushed the monitor over its limit in +tripped_by+.
    class MonitoringError < StandardError
      attr_accessor :tripped_by

      # @param message [String, nil] human readable description of the limit that was hit
      # @param tripped_by [Exception, nil] the error that caused the limit to be exceeded;
      #   when given, its class and message are appended to +message+
      def initialize(message = nil, tripped_by: nil)
        # Plain Ruby check so this class does not depend on ActiveSupport's
        # Object#present? being loaded.
        details = tripped_by.respond_to?(:message) ? " Tripped by - #{tripped_by.class}: #{tripped_by.message}" : ''
        super("#{message}#{details}")
        @tripped_by = tripped_by
      end
    end

    class MaxSuccessiveErrorsExceededError < MonitoringError; end
    class MaxErrorsExceededError < MonitoringError; end
    class MaxErrorsInWindowExceededError < MonitoringError; end

    attr_reader :total_error_count, :success_count, :consecutive_error_count, :error_queue

    # @param max_errors [Integer] total number of errors tolerated
    # @param max_consecutive_errors [Integer] number of back-to-back errors tolerated
    # @param max_error_ratio [Float] tolerated share of errors, used both for the
    #   sliding window and for the overall check in #finalize
    # @param window_size [Integer] number of most recent outcomes considered by the
    #   sliding window check; 0 disables the sliding window check
    # @param error_queue_size [Integer] maximum number of entries kept in +error_queue+
    def initialize(
      max_errors: 1000,
      max_consecutive_errors: 10,
      max_error_ratio: 0.15,
      window_size: 100,
      error_queue_size: 20
    )
      @max_errors = max_errors
      @max_consecutive_errors = max_consecutive_errors
      @max_error_ratio = max_error_ratio
      @window_size = window_size

      @total_error_count = 0
      @success_count = 0
      @consecutive_error_count = 0

      # Ring buffer of the last @window_size outcomes, true meaning "error".
      @window_errors = Array.new(window_size) { false }
      @window_index = 0
      @last_error = nil

      @error_queue_size = error_queue_size
      @error_queue = []
    end

    # Records one successful outcome and resets the consecutive error counter.
    def note_success
      @consecutive_error_count = 0
      @success_count += 1
      track_window_error(false)
    end

    # Records one failed outcome, logs it at debug level, stores it in
    # +error_queue+ and raises if any limit is now exceeded.
    #
    # @param error [Exception] the error that occurred
    # @param id [Object] identifier attached to the log line and to the queued
    #   entry; defaults to the current Unix timestamp
    # @raise [MaxSuccessiveErrorsExceededError] more than +max_consecutive_errors+ errors in a row
    # @raise [MaxErrorsExceededError] more than +max_errors+ errors in total
    # @raise [MaxErrorsInWindowExceededError] error share in the sliding window above +max_error_ratio+
    def note_error(error, id: Time.now.to_i)
      stack_trace = Utility::ExceptionTracking.generate_stack_trace(error)
      error_message = Utility::ExceptionTracking.generate_error_message(error, nil, nil)
      Utility::Logger.debug("Message id: #{id} - #{error_message}\n#{stack_trace}")

      @total_error_count += 1
      @consecutive_error_count += 1

      # NOTE(review): DocumentError is presumably provided by 'utility/errors' - confirm.
      @error_queue << DocumentError.new(error.class.name, error_message, stack_trace, id)
      # Evict the oldest entry in place so that references handed out through
      # the error_queue reader keep pointing at the live queue.
      @error_queue.shift if @error_queue.size > @error_queue_size

      track_window_error(true)
      @last_error = error

      raise_if_necessary
    end

    # Checks the overall error share across everything reported so far.
    # Does nothing when nothing was reported.
    #
    # @raise [MaxErrorsInWindowExceededError] when errors / (errors + successes)
    #   is greater than +max_error_ratio+
    def finalize
      total_documents = @total_error_count + @success_count
      return unless total_documents > 0
      return unless @total_error_count.to_f / total_documents > @max_error_ratio

      raise_with_last_cause(
        MaxErrorsInWindowExceededError.new(
          "There were #{@total_error_count} errors out of #{total_documents} total documents",
          tripped_by: @last_error
        )
      )
    end

    private

    # Raises the first limit violation found, checking consecutive errors,
    # then total errors, then the sliding window ratio. Returns nil when no
    # limit is exceeded.
    def raise_if_necessary
      error =
        if @consecutive_error_count > @max_consecutive_errors
          MaxSuccessiveErrorsExceededError.new(
            "Exceeded maximum consecutive errors - saw #{@consecutive_error_count} errors in a row.",
            tripped_by: @last_error
          )
        elsif @total_error_count > @max_errors
          MaxErrorsExceededError.new(
            "Exceeded maximum number of errors - saw #{@total_error_count} errors in total.",
            tripped_by: @last_error
          )
        elsif @window_size > 0 && num_errors_in_window / @window_size > @max_error_ratio
          MaxErrorsInWindowExceededError.new(
            "Exceeded maximum error ratio of #{@max_error_ratio}. Of the last #{@window_size} documents, #{num_errors_in_window} had errors",
            tripped_by: @last_error
          )
        end

      raise_with_last_cause(error) if error
    end

    # Number of errors currently recorded in the sliding window, as a Float so
    # that dividing it by the window size yields a ratio.
    def num_errors_in_window
      @window_errors.count(true).to_f
    end

    # Stores one outcome in the sliding window, overwriting the oldest one.
    #
    # @param is_error [Boolean] true when the outcome was an error
    def track_window_error(is_error)
      # A zero-sized window has no slots: skip instead of dividing by zero below.
      return unless @window_size > 0

      # We keep the errors array of the size @window_size this way, imagine @window_size = 5
      # Error array inits as falses:
      #   [ false, false, false, false, false ]
      # Third document raises an error:
      #   [ false, false, true, false, false ]
      #                   ^^^^
      #                   2 % 5 == 2
      # Fifth document raises an error:
      #   [ false, false, true, false, true ]
      #                                ^^^^
      #                                4 % 5 == 4
      # Sixth document raises an error:
      #   [ true, false, true, false, true ]
      #     ^^^^
      #     5 % 5 == 0
      #
      # Eighth document is successful:
      #   [ true, false, false, false, true ]
      #                  ^^^^^
      #                  7 % 5 == 2
      # And so on.
      @window_errors[@window_index] = is_error
      @window_index = (@window_index + 1) % @window_size
    end

    # Raises +error+ with the most recently noted error attached as its cause.
    #
    # @param error [MonitoringError] the error to raise
    def raise_with_last_cause(error)
      # Only a real exception can be a cause; anything else (including nil when
      # no error was noted yet) means "no cause".
      cause = @last_error.is_a?(Exception) ? @last_error : nil
      raise error, cause: cause
    end
  end
end