# do_sync!
#
# in lib/core/sync_job_runner.rb [76:180]


    # Runs a full sync for the claimed job: validates active filtering, pulls
    # documents from the connector into the sink, deletes documents that are no
    # longer present upstream, and records a terminal status (completed,
    # canceled, or error) on both the job and the connector documents.
    #
    # Returns early (without raising) when the job cannot be claimed.
    def do_sync!
      return unless claim_job!

      begin
        # Filtering rules must be valid before any document is ingested.
        if @connector_settings.any_filtering_feature_enabled?
          Utility::Logger.info("Checking active filtering for sync job #{@job_id} for connector #{@connector_id}.")
          validate_filtering(@job.filtering)
          Utility::Logger.debug("Active filtering for sync job #{@job_id} for connector #{@connector_id} is valid.")
        end

        @connector_instance = Connectors::REGISTRY.connector(@service_type, @connector_settings.configuration, job_description: @job)
        @connector_instance.do_health_check!

        @sync_status = nil
        @sync_error = nil
        @reporting_cycle_start = Time.now

        incoming_ids = []
        existing_ids = ElasticConnectorActions.fetch_document_ids(@index_name)

        Utility::Logger.debug("#{existing_ids.size} documents are present in index #{@index_name}.")

        post_processing_engine = @connector_settings.filtering_rule_feature_enabled? ? Core::Filtering::PostProcessEngine.new(@job.filtering) : nil

        yield_docs do |document|
          # Drop documents rejected by the post-processing filtering rules.
          next if post_processing_engine && !post_processing_engine.process(document).is_include?
          @sink.ingest(document)
          incoming_ids << document['id']
        end

        # Anything previously indexed but not seen in this sync is stale.
        ids_to_delete = existing_ids - incoming_ids.uniq

        Utility::Logger.info("Deleting #{ids_to_delete.size} documents from index #{@index_name}.")

        ids_to_delete.each do |id|
          @sink.delete(id)

          # Periodically verify the job is still runnable and report progress.
          periodically do
            check_job
            @job.update_metadata(@sink.ingestion_stats, @connector_instance.metadata)
          end
        end

        @sink.flush

        # One last check in case the job was canceled/removed while flushing.
        check_job

        # Reaching this point means the sync finished without interruption.
        @sync_status = Connectors::SyncStatus::COMPLETED
        @sync_error = nil
      rescue ConnectorNotFoundError, ConnectorJobNotFoundError, ConnectorJobNotRunningError => e
        Utility::Logger.error(e.message)
        @sync_status = Connectors::SyncStatus::ERROR
        @sync_error = e.message
      rescue ConnectorJobCanceledError => e
        Utility::Logger.error(e.message)
        @sync_status = Connectors::SyncStatus::CANCELED
        # Cancellation is a legitimate terminal state, not an error.
        @sync_error = nil
      rescue StandardError => e
        @sync_status = Connectors::SyncStatus::ERROR
        @sync_error = e.message
        Utility::ExceptionTracking.log_exception(e)
      ensure
        stats = @sink.ingestion_stats

        Utility::Logger.debug("Sync stats are: #{stats}")
        Utility::Logger.info("Upserted #{stats[:indexed_document_count]} documents into #{@index_name}.")
        # Fixed log wording: documents are deleted *from* the index, not "into" it
        # (matches the deletion log emitted earlier in this method).
        Utility::Logger.info("Deleted #{stats[:deleted_document_count]} documents from #{@index_name}.")

        # If the sync thread died without reaching a terminal status, record an error.
        @sync_status ||= Connectors::SyncStatus::ERROR
        @sync_error = 'Sync thread didn\'t finish execution. Check connector logs for more details.' if @sync_status == Connectors::SyncStatus::ERROR && @sync_error.nil?

        # Persist the terminal status on the job document (when it still exists).
        if reload_job!
          case @sync_status
          when Connectors::SyncStatus::COMPLETED
            @job.done!(stats, @connector_instance&.metadata)
          when Connectors::SyncStatus::CANCELED
            @job.cancel!(stats, @connector_instance&.metadata)
          when Connectors::SyncStatus::ERROR
            @job.error!(@sync_error, stats, @connector_instance&.metadata)
          else
            Utility::Logger.error("The job is supposed to be in one of the terminal statuses (#{Connectors::SyncStatus::TERMINAL_STATUSES.join(', ')}), but it's #{@sync_status}")
            @sync_status = Connectors::SyncStatus::ERROR
            @sync_error = 'The job is not ended as expected for unknown reason'
            @job.error!(@sync_error, stats, @connector_instance&.metadata)
          end
          # Refresh local job state after the terminal update above.
          reload_job!
        end

        # Mirror last-sync details onto the connector document (when it still exists).
        if reload_connector!
          @connector_settings.update_last_sync!(@job)
        end

        # Repaired truncated log line: the interpolated ternary string was cut off
        # mid-literal in the original, leaving the file syntactically broken.
        Utility::Logger.info("Completed the job (ID: #{@job_id}) with status: #{@sync_status}#{@sync_error ? " and error: #{@sync_error}" : ''}.")
      end
    end