in lib/core/ingestion/es_sink.rb [44:72]
# Queues one document for indexing, flushing the pending operations first
# when the new entry would not fit into the operation queue.
#
# The document is skipped (with a warning) when it is nil or empty, or when
# its serialized size in bytes exceeds @max_allowed_document_size. A limit
# of zero or less disables the size check.
#
# @param document [Hash, nil] the document to ingest; its 'id' entry is used
#   as the '_id' of the index operation (presumably a String-keyed Hash —
#   confirm against callers)
# @return [Integer, nil] nil when the document was skipped, otherwise the
#   updated value of @queued[:indexed_document_volume]
def ingest(document)
  nothing_to_ingest = document.nil? || document.empty?
  if nothing_to_ingest
    Utility::Logger.warn('Connector attempted to ingest an empty document, skipping')
    return
  end

  # Read the id before serializing, exactly as the index operation needs it.
  doc_id = document['id']
  payload = serialize(document)
  payload_bytes = payload.bytesize

  size_limit = @max_allowed_document_size
  over_limit = size_limit > 0 && payload_bytes > size_limit
  if over_limit
    Utility::Logger.warn("Connector attempted to ingest too large document with id=#{document['id']} [#{payload_bytes}/#{size_limit}], skipping the document.")
    return
  end

  action_line = serialize({ 'index' => { '_index' => @index_name, '_id' => doc_id } })

  # Make room before enqueueing: the queue is flushed only when the
  # action/payload pair would not fit alongside what is already pending.
  has_room = @operation_queue.will_fit?(action_line, payload)
  flush unless has_room

  @operation_queue.add(action_line, payload)

  @queued[:indexed_document_count] += 1
  @queued[:indexed_document_volume] += payload_bytes
end