lib/es/bulk_queue.rb (70 lines of code) (raw):

#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#

# frozen_string_literal: true

require 'elasticsearch/api'
require_dependency File.join(__dir__, '..', 'errors')

module ES
  # In-memory buffer of operations destined for a single Elasticsearch BULK
  # request.
  #
  # The queue accepts operations (each with an optional payload) until either
  # the operation count or the accumulated serialized size would cross its
  # configured threshold. Sizes are measured on the output of
  # +Elasticsearch::API.serializer+.
  class BulkQueue
    # Maximum number of operations in BULK Elasticsearch operation that will ingest the data
    DEFAULT_OP_COUNT_THRESHOLD = 100

    # Maximum size of either whole BULK Elasticsearch operation or one document in it
    DEFAULT_SIZE_THRESHOLD = 1 * 1024 * 1024 # 1 megabyte

    # @param op_count_threshold [Integer, nil] operation limit; falls back to
    #   DEFAULT_OP_COUNT_THRESHOLD when nil
    # @param size_threshold [Integer, nil] size limit in bytes; falls back to
    #   DEFAULT_SIZE_THRESHOLD when nil
    # @param system_logger [#debug, #error] logger that receives diagnostics
    def initialize(op_count_threshold, size_threshold, system_logger)
      @op_count_threshold = (op_count_threshold || DEFAULT_OP_COUNT_THRESHOLD).freeze
      @size_threshold = (size_threshold || DEFAULT_SIZE_THRESHOLD).freeze
      @system_logger = system_logger

      @system_logger.debug(
        "Initialized BulkQueue with op_count_threshold #{@op_count_threshold} and size_threshold #{@size_threshold}."
      )

      # Start from an empty buffer with every counter at zero.
      reset
    end

    # Hands over everything buffered so far and empties the queue.
    #
    # @return [Array] the buffered operations and payloads, in insertion order
    def pop_all
      # `reset` assigns a brand new array to @buffer, so the array that `tap`
      # returns here is the previous, still populated one.
      @buffer.tap { reset }
    end

    # Appends an operation, and its payload when one is given, to the queue.
    #
    # @param operation [Object] serializable operation entry
    # @param payload [Object, nil] serializable document for the operation
    # @raise [Errors::BulkQueueOverflowError] when #will_fit? rejects the pair;
    #   an error is logged before raising
    def add(operation, payload = nil)
      raise_overflow_for(payload) unless will_fit?(operation, payload)

      operation_size = bytesize(operation)
      payload_size = bytesize(payload)

      @current_op_count += 1
      @current_buffer_size += operation_size + payload_size
      # Payload bytes are additionally tracked on their own.
      @current_data_size += payload_size

      @buffer.push(operation)
      @buffer.push(payload) if payload
    end

    # Tells whether the operation (plus optional payload) can still be queued.
    #
    # @param operation [Object] serializable operation entry
    # @param payload [Object, nil] serializable document for the operation
    # @return [Boolean] false when one more operation would exceed the
    #   operation count threshold; otherwise true only while the resulting
    #   buffer size stays strictly below the size threshold
    def will_fit?(operation, payload = nil)
      next_op_count = @current_op_count + 1
      return false if next_op_count > @op_count_threshold

      projected_size = @current_buffer_size + bytesize(operation) + bytesize(payload)
      projected_size < @size_threshold
    end

    # @return [Hash{Symbol => Integer}] current operation count and buffer size
    def current_stats
      {
        current_op_count: @current_op_count,
        current_buffer_size: @current_buffer_size
      }
    end

    # Size in bytes of the item once serialized.
    #
    # @param item [Object, nil]
    # @return [Integer] 0 when the item is nil or false
    def bytesize(item)
      item ? serialize(item).bytesize : 0
    end

    private

    # Logs the state of the queue and signals that the operation was rejected.
    def raise_overflow_for(payload)
      # `squish` collapses the heredoc into a single-line message.
      message = <<~LOG.squish
        Operation failed to add to bulk queue.
        Current operation count is #{@current_op_count}.
        Operation payload was #{bytesize(payload)} bytes, current buffer size is #{@current_buffer_size} bytes.
      LOG
      @system_logger.error(message)

      raise Errors::BulkQueueOverflowError
    end

    # Dumps the document with the Elasticsearch API serializer; a missing
    # document yields an empty string.
    def serialize(document)
      document ? Elasticsearch::API.serializer.dump(document) : ''
    end

    # Empties the buffer and zeroes all counters.
    def reset
      @buffer = []
      @current_op_count = 0
      @current_buffer_size = 0
      @current_data_size = 0
    end
  end
end