def watch()

in cosmos-db-migration-utility/src/migrator-app/migrators/CollectionMigrator.py [0:0]


    def watch(self, token_data, notify_callback):
        """Run ``self.__watch``, falling back to older tokens when it fails.

        The starting token is chosen in this order:

        1. ``token_data["resume_token"]`` when present (decoded with
           ``bson.json_util.loads``; a double-encoded value is decoded twice),
        2. otherwise the hard-coded "big bang" token (the original comments
           describe it as the earliest available token).

        After every failed attempt the token is downgraded one step
        (stored token -> big bang token -> ``None``). Once an attempt with
        ``None`` has failed, the method gives up. At most three
        ``OperationFailure`` retries are performed.

        Args:
            token_data: Mapping that may contain a ``"resume_token"`` entry
                holding a JSON-encoded token, or ``None``.
            notify_callback: Passed through unchanged to ``self.__watch``.
        """
        big_bang_token = {'_data': b'[{"token":"\\"0\\"","range":{"min":"","max":"FF"}}]'}
        resume_token = big_bang_token
        if token_data is not None and "resume_token" in token_data:
            resume_token = bson.json_util.loads(token_data['resume_token'])
            if isinstance(resume_token, str):
                # The stored value was JSON-encoded twice: decode the string
                # we just obtained rather than re-parsing the original input,
                # which would only yield the same string again.
                resume_token = bson.json_util.loads(resume_token)
            logger.info("Resuming watch on cluster_name: %s, namespace: %s using resume_token: %s",
                self.__cluster_name, self.__namespace, dumps(resume_token))

        retry_count = 0
        while retry_count < 3:
            try:
                self.__watch(resume_token, notify_callback)
                break
            except pymongo.errors.OperationFailure as of:
                # ``details`` may be None or lack "errmsg"; fall back to the
                # exception text so the handler itself cannot raise.
                errmsg = (of.details or {}).get("errmsg") or str(of)
                if "Change feed token format is invalid" in errmsg or "Bad resume token: _data of missing" in errmsg:
                    logger.info("Detected invalid change feed token. Retrying with no resume_token. Error Message: %s", errmsg)
                elif "operation was interrupted" in errmsg:
                    logger.info("Detected CTRL-C. Closing watch on cluster: %s. namespace: %s.", self.__cluster_name, self.__namespace)
                    break
                else:
                    logger.info("Watch operation failed on cluster: %s. namespace: %s.", self.__cluster_name, self.__namespace, exc_info=True)
                retry_count += 1
            except Exception:
                # Does not count against retry_count; the token downgrade
                # below still guarantees the loop terminates.
                logger.exception("Unexpected exception while watching on namespace")

            # Only reached after a handled failure. This deliberately lives
            # outside a ``finally`` clause: a ``break`` inside ``finally``
            # would swallow in-flight exceptions such as KeyboardInterrupt.
            if resume_token is None:
                # Even the token-less attempt failed; nothing left to try.
                break
            if resume_token == big_bang_token:
                resume_token = None
            else:
                resume_token = big_bang_token