lib/utility/bulk_queue.rb (58 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#
require 'json'
require 'utility/constants'
module Utility
class BulkQueue
class QueueOverflowError < StandardError; end
# 500 items or 5MB
def initialize(operation_count_threshold = Utility::Constants::DEFAULT_MAX_INGESTION_QUEUE_SIZE, size_threshold = Utility::Constants::DEFAULT_MAX_INGESTION_QUEUE_BYTES)
@operation_count_threshold = operation_count_threshold.freeze
@size_threshold = size_threshold.freeze
@buffer = ''
@current_operation_count = 0
@current_buffer_size = 0
@current_data_size = 0
end
def pop_all
result = @buffer
reset
result
end
def add(operation, payload = nil)
raise QueueOverflowError unless will_fit?(operation, payload)
operation_size = get_size(operation)
payload_size = get_size(payload)
@current_operation_count += 1
@current_buffer_size += operation_size
@current_buffer_size += payload_size
@current_data_size += payload_size
@buffer << operation
@buffer << "\n"
if payload
@buffer << payload
@buffer << "\n"
end
end
def will_fit?(operation, payload = nil)
return false if @current_operation_count + 1 > @operation_count_threshold
operation_size = get_size(operation)
payload_size = get_size(payload)
@current_buffer_size + operation_size + payload_size < @size_threshold
end
def current_stats
{
:current_operation_count => @current_operation_count,
:current_buffer_size => @current_buffer_size
}
end
private
def get_size(str)
return 0 unless str
str.bytesize
end
def reset
@current_operation_count = 0
@current_buffer_size = 0
@current_data_size = 0
@buffer = ''
end
end
end