# lib/core/ingestion/es_sink.rb
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#

# frozen_string_literal: true

require 'app/config'
require 'utility/bulk_queue'
require 'utility/es_client'
require 'utility/logger'
require 'elasticsearch/api'

#
# This class is responsible for sending data to the data storage.
# While Elasticsearch is currently the only supported output, we want to
# keep the door open for other destinations in the future.
#
# This class should stay simple, and any change to it should be made with
# the possibility of introducing other sinks in mind.
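#
# A minimal usage sketch (names are illustrative: `search-my-index` and the
# pipeline name are hypothetical, and a configured Elasticsearch connection
# is assumed):
#
#   sink = Core::Ingestion::EsSink.new('search-my-index', 'my-ingest-pipeline')
#   sink.ingest({ 'id' => '1', 'title' => 'Hello' })
#   sink.delete('2')
#   sink.flush
#   sink.ingestion_stats # => { :indexed_document_count => 1, :deleted_document_count => 1, ... }
#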
module Core
  module Ingestion
    class EsSink
      def initialize(index_name, request_pipeline, bulk_queue = Utility::BulkQueue.new, max_allowed_document_size = Utility::Constants::DEFAULT_MAX_INGESTION_QUEUE_BYTES)
        @client = Utility::EsClient.new(App::Config[:elasticsearch])
        @index_name = index_name
        @request_pipeline = request_pipeline
        @operation_queue = bulk_queue
        @max_allowed_document_size = max_allowed_document_size
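
        # Operation counters are kept in two buckets: `@queued` tracks what is
        # currently sitting in the in-memory bulk queue, while `@completed`
        # tracks what has already been flushed to Elasticsearch. `flush` moves
        # the queued totals into the completed ones.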
        @queued = {
          :indexed_document_count => 0,
          :deleted_document_count => 0,
          :indexed_document_volume => 0
        }
        @completed = {
          :indexed_document_count => 0,
          :deleted_document_count => 0,
          :indexed_document_volume => 0
        }
      end
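
      # Queues a single document for indexing. Empty and oversized documents
      # are dropped with a warning; the queue is flushed first when the new
      # operation would not fit into it.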
      def ingest(document)
        if document.nil? || document.empty?
          Utility::Logger.warn('Connector attempted to ingest an empty document, skipping')
          return
        end

        id = document['id']
        serialized_document = serialize(document)
        document_size = serialized_document.bytesize

        if @max_allowed_document_size > 0 && document_size > @max_allowed_document_size
          Utility::Logger.warn("Connector attempted to ingest a document that is too large: id=#{id} [#{document_size}/#{@max_allowed_document_size} bytes], skipping the document.")
          return
        end

        index_op = serialize({ 'index' => { '_index' => @index_name, '_id' => id } })
        flush unless @operation_queue.will_fit?(index_op, serialized_document)

        @operation_queue.add(
          index_op,
          serialized_document
        )
        @queued[:indexed_document_count] += 1
        @queued[:indexed_document_volume] += document_size
      end
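
      # Queues multiple documents for indexing.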
      def ingest_multiple(documents)
        documents.each { |doc| ingest(doc) }
      end
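
      # Queues a delete operation for the document with the given id; a nil id
      # is ignored. As with `ingest`, the queue is flushed first if the
      # operation would not fit into it.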
      def delete(id)
        return if id.nil?

        delete_op = serialize({ 'delete' => { '_index' => @index_name, '_id' => id } })
        flush unless @operation_queue.will_fit?(delete_op)

        @operation_queue.add(delete_op)
        @queued[:deleted_document_count] += 1
      end
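
      # Queues delete operations for multiple document ids.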
      def delete_multiple(ids)
        ids.each { |id| delete(id) }
      end
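
      # Sends all queued operations to Elasticsearch as a single _bulk request
      # and promotes the queued counters into the completed ones.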
      def flush
        data = @operation_queue.pop_all
        return if data.empty?

        @client.bulk(:body => data, :pipeline => @request_pipeline)

        @completed[:indexed_document_count] += @queued[:indexed_document_count]
        @completed[:deleted_document_count] += @queued[:deleted_document_count]
        @completed[:indexed_document_volume] += @queued[:indexed_document_volume]

        @queued[:indexed_document_count] = 0
        @queued[:deleted_document_count] = 0
        @queued[:indexed_document_volume] = 0
      end
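
      # Returns a copy of the stats for operations that have actually been
      # flushed to Elasticsearch; documents still sitting in the queue are
      # not counted.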
      def ingestion_stats
        @completed.dup
      end

      private
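
      # Serializes a document (or a bulk operation header) using the
      # serializer shipped with the Elasticsearch client library.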
      def serialize(document)
        Elasticsearch::API.serializer.dump(document)
      end
    end
  end
end