in cosmos-db-migration-utility/src/migrator-app/migrators/CollectionMigrator.py [0:0]
def watch(self, token_data, notify_callback):
# token_data from database may have resume_token
# try resume from that. if not use big_bang_token
# if not use resume_token = None
big_bang_token = {'_data': b'[{"token":"\\"0\\"","range":{"min":"","max":"FF"}}]'}
resume_token = big_bang_token
if token_data is not None and "resume_token" in token_data:
resume_token = bson.json_util.loads(token_data['resume_token'])
if type(resume_token) == str:
resume_token = bson.json_util.loads(token_data['resume_token'])
logger.info("Resuming watch on cluster_name: %s, namespace: %s using resume_token: %s",
self.__cluster_name, self.__namespace, dumps(resume_token))
# from the given token/earliest available token/None
retry_count = 0
while retry_count < 3:
try:
self.__watch(resume_token, notify_callback)
break
except pymongo.errors.OperationFailure as of:
if "Change feed token format is invalid" in of.details["errmsg"] or "Bad resume token: _data of missing" in of.details["errmsg"]:
logger.info("Detected invalid change feed token. Retrying with no resume_token. Error Message: %s", of.details["errmsg"])
elif "operation was interrupted" in of.details["errmsg"]:
logger.info("Detected CTRL-C. Closing watch on cluster: %s. namespace: %s.", self.__cluster_name, self.__namespace)
break
else:
logger.info("Watch operation failed on cluster: %s. namespace: %s.", self.__cluster_name, self.__namespace, exc_info=True)
retry_count = retry_count + 1
except Exception as ex:
logger.info("Unexpected exception while watching on namespace")
logger.exception(ex, exc_info=True)
finally:
if resume_token == big_bang_token:
resume_token = None
elif resume_token is None:
break
else:
resume_token = big_bang_token