ingest

in lib/core/ingestion/es_sink.rb [44:72]


      # Serializes a document and places it on the operation queue as an
      # index action targeting @index_name.
      #
      # A document is skipped (with a warning, returning nil) when it is nil or
      # empty, or when its serialized byte size exceeds
      # @max_allowed_document_size. A limit that is not greater than zero
      # disables the size check.
      #
      # When the queue reports that the new operation will not fit, the queue
      # is flushed before the operation is added. After queuing, the
      # :indexed_document_count and :indexed_document_volume counters in
      # @queued are updated.
      def ingest(document)
        nothing_to_ingest = document.nil? || document.empty?
        if nothing_to_ingest
          Utility::Logger.warn('Connector attempted to ingest an empty document, skipping')
          return
        end

        document_id = document['id']
        payload = serialize(document)
        payload_bytes = payload.bytesize
        size_limit = @max_allowed_document_size

        over_limit = size_limit > 0 && payload_bytes > size_limit
        if over_limit
          Utility::Logger.warn("Connector attempted to ingest too large document with id=#{document['id']} [#{payload_bytes}/#{size_limit}], skipping the document.")
          return
        end

        bulk_action = serialize({ 'index' => { '_index' => @index_name, '_id' => document_id } })

        # Make room first: the queue is asked before every add, and flushed
        # only when it says the pair would not fit.
        fits = @operation_queue.will_fit?(bulk_action, payload)
        flush unless fits

        # Instance variables are re-read here (not cached above) because flush
        # runs in between.
        @operation_queue.add(bulk_action, payload)

        @queued[:indexed_document_count] += 1
        @queued[:indexed_document_volume] += payload_bytes
      end