def start_producer()

in ees_sharepoint/incremental_sync_command.py [0:0]


    def start_producer(self, queue):
        """This method starts async calls for the producer which is responsible for fetching documents from the
        SharePoint and pushing them in the shared queue
        :param queue: Shared queue to fetch the stored documents
        """
        self.logger.debug("Starting the incremental indexing..")
        current_time = (datetime.utcnow()).strftime("%Y-%m-%dT%H:%M:%SZ")

        thread_count = self.config.get_value("sharepoint_sync_thread_count")

        checkpoint = Checkpoint(self.config, self.logger)
        try:
            for collection in self.config.get_value("sharepoint.site_collections"):
                start_time, end_time = checkpoint.get_checkpoint(collection, current_time)
                sync_sharepoint = SyncSharepoint(
                    self.config,
                    self.logger,
                    self.workplace_search_custom_client,
                    self.sharepoint_client,
                    start_time,
                    end_time,
                    queue,
                )
                datelist = split_date_range_into_chunks(
                    start_time,
                    end_time,
                    thread_count,
                )
                storage_with_collection = self.local_storage.get_storage_with_collection(collection)
                self.logger.info(
                    "Starting to index all the objects configured in the object field: %s"
                    % (str(self.config.get_value("objects")))
                )

                ids = storage_with_collection["global_keys"][collection]
                storage_with_collection["global_keys"][collection] = sync_sharepoint.fetch_records_from_sharepoint(self.producer, datelist, thread_count, ids, collection)

                queue.put_checkpoint(collection, end_time, "incremental")

            enterprise_thread_count = self.config.get_value("enterprise_search_sync_thread_count")
            for _ in range(enterprise_thread_count):
                queue.end_signal()
        except Exception as exception:
            self.logger.exception(f"Error while fetching the objects . Error {exception}")
            raise exception
        self.local_storage.update_storage(storage_with_collection)