lib/es/bulk_queue.rb (70 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
# frozen_string_literal: true
require 'elasticsearch/api'
require_dependency File.join(__dir__, '..', 'errors')
module ES
  # Buffers Elasticsearch BULK request lines (an operation line plus an
  # optional document payload) and refuses new items once either the
  # operation-count threshold or the byte-size threshold would be exceeded.
  #
  # Sizes are measured on the string produced by Elasticsearch::API.serializer.
  # Only the serialized items are summed; no separator bytes are added.
  # NOTE(review): a real bulk body is newline-delimited, so the tracked size
  # is slightly below the wire size — confirm the strict `<` in #will_fit?
  # is the intended safety margin.
  class BulkQueue
    # Maximum number of operations in BULK Elasticsearch operation that will ingest the data
    DEFAULT_OP_COUNT_THRESHOLD = 100
    # Maximum size of either whole BULK Elasticsearch operation or one document in it
    DEFAULT_SIZE_THRESHOLD = 1 * 1024 * 1024 # 1 megabyte

    # @param op_count_threshold [Integer, nil] maximum number of operations per
    #   batch; nil falls back to DEFAULT_OP_COUNT_THRESHOLD
    # @param size_threshold [Integer, nil] byte limit for the buffered lines
    #   (exclusive, see #will_fit?); nil falls back to DEFAULT_SIZE_THRESHOLD
    # @param system_logger [#debug, #error] receives the init message and
    #   overflow errors
    def initialize(op_count_threshold, size_threshold, system_logger)
      @op_count_threshold = (op_count_threshold || DEFAULT_OP_COUNT_THRESHOLD).freeze
      @size_threshold = (size_threshold || DEFAULT_SIZE_THRESHOLD).freeze
      @system_logger = system_logger
      @system_logger.debug(
        "Initialized BulkQueue with op_count_threshold #{@op_count_threshold} and size_threshold #{@size_threshold}."
      )
      @buffer = []
      @current_op_count = 0
      @current_buffer_size = 0
      # NOTE(review): accumulated but never read in this class — confirm
      # whether anything outside still depends on it.
      @current_data_size = 0
    end

    # Hands over everything buffered so far and empties the queue.
    #
    # @return [Array] operations and payloads interleaved in insertion order
    #   (a falsy payload is never stored)
    def pop_all
      result = @buffer
      reset
      result
    end

    # Appends an operation and, if given, its payload to the queue.
    #
    # Each item is serialized exactly once. The measured sizes are reused for
    # both the capacity check and the counters, instead of re-serializing in
    # #will_fit? and again afterwards.
    #
    # @param operation [Object] bulk action line, serialized via the ES serializer
    # @param payload [Object, nil] document body; skipped when nil/false
    # @raise [Errors::BulkQueueOverflowError] when the item would exceed the
    #   operation-count or byte-size threshold (an error is logged first)
    def add(operation, payload = nil)
      operation_size = bytesize(operation)
      payload_size = bytesize(payload)

      if op_count_full? || !size_fits?(operation_size + payload_size)
        log = <<~LOG.squish
          Operation failed to add to bulk queue. Current operation count is #{@current_op_count}.
          Operation payload was #{payload_size} bytes, current buffer size is #{@current_buffer_size} bytes.
        LOG
        @system_logger.error(log)
        raise Errors::BulkQueueOverflowError
      end

      @current_op_count += 1
      @current_buffer_size += operation_size + payload_size
      @current_data_size += payload_size
      @buffer << operation
      @buffer << payload if payload
    end

    # Whether one more operation (with optional payload) can be added without
    # crossing either threshold. The size limit is exclusive: the new total
    # must be strictly below the size threshold.
    #
    # @return [Boolean]
    def will_fit?(operation, payload = nil)
      # Skip serialization entirely when the operation count alone rules it out.
      return false if op_count_full?

      size_fits?(bytesize(operation) + bytesize(payload))
    end

    # @return [Hash{Symbol=>Integer}] current operation count and buffered byte size
    def current_stats
      {
        current_op_count: @current_op_count,
        current_buffer_size: @current_buffer_size
      }
    end

    # Serialized size of +item+ in bytes; 0 for nil/false.
    def bytesize(item)
      return 0 unless item

      serialize(item).bytesize
    end

    private

    # True when adding one more operation would exceed the operation-count threshold.
    def op_count_full?
      @current_op_count + 1 > @op_count_threshold
    end

    # True when +additional_bytes+ more keeps the buffer strictly below the size threshold.
    def size_fits?(additional_bytes)
      @current_buffer_size + additional_bytes < @size_threshold
    end

    def serialize(document)
      return '' unless document

      Elasticsearch::API.serializer.dump(document)
    end

    # Clears the buffer and every counter.
    def reset
      @current_op_count = 0
      @current_buffer_size = 0
      @current_data_size = 0
      @buffer = []
    end
  end
end