flush

in lib/crawler/output_sink/elasticsearch.rb [158:183]


      def flush 
        body = operation_queue.pop_all
        if body.empty?
          system_logger.debug('Queue was empty when attempting to flush.')
          return
        end

        
        indexing_docs_count = body.size / 2
        system_logger.info("Sending bulk request with #{indexing_docs_count} items and resetting queue...")

        begin
          client.bulk(
            body:, **(pipeline_enabled? ? { pipeline: } : {})
          ) 
          system_logger.info("Successfully indexed #{indexing_docs_count} docs.")
          reset_ingestion_stats(true)
        rescue ES::Client::IndexingFailedError => e
          system_logger.warn("Bulk index failed: #{e}")
          reset_ingestion_stats(false)
        rescue StandardError => e
          system_logger.warn("Bulk index failed for unexpected reason: #{e}")
          reset_ingestion_stats(false)
        end
      end