in lib/crawler/output_sink/elasticsearch.rb [158:183]
# Takes everything currently held by the operation queue and submits it to the
# client as a single bulk request, then records the outcome through
# +reset_ingestion_stats+ (true on success, false on failure).
#
# Nothing is sent when the queue yields an empty payload. Any StandardError
# raised while sending is logged as a warning instead of being re-raised.
#
# @return [Object, nil] nil when the queue was empty; otherwise whatever
#   +reset_ingestion_stats+ returns
def flush
  payload = operation_queue.pop_all
  if payload.empty?
    system_logger.debug('Queue was empty when attempting to flush.')
    return
  end

  # The payload length is halved to obtain the document count -- presumably each
  # document occupies two entries (action + source); confirm against the queue.
  doc_total = payload.size / 2
  system_logger.info("Sending bulk request with #{doc_total} items and resetting queue...")

  begin
    es = client
    bulk_params = { body: payload }
    # The pipeline is only looked up and forwarded when it is enabled.
    bulk_params[:pipeline] = pipeline if pipeline_enabled?
    es.bulk(**bulk_params)

    system_logger.info("Successfully indexed #{doc_total} docs.")
    reset_ingestion_stats(true)
  rescue ES::Client::IndexingFailedError => e
    system_logger.warn("Bulk index failed: #{e}")
    reset_ingestion_stats(false)
  rescue StandardError => e
    system_logger.warn("Bulk index failed for unexpected reason: #{e}")
    reset_ingestion_stats(false)
  end
end