in ees_sharepoint/checkpointing.py [0:0]
def set_checkpoint(self, collection, current_time, index_type):
"""This method updates the existing checkpoint json file or creates
a new checkpoint json file in case it is not present
:param collection: collection name
:param current_time: current time"""
if os.path.exists(CHECKPOINT_PATH) and os.path.getsize(CHECKPOINT_PATH) > 0:
self.logger.debug(
f"""Setting the checkpoint contents: {current_time} \
for the collection {collection} \
to the checkpoint path:{CHECKPOINT_PATH}"""
)
with open(CHECKPOINT_PATH) as checkpoint_store:
try:
checkpoint_list = json.load(checkpoint_store)
checkpoint_list[collection] = current_time
except ValueError as exception:
self.logger.exception(
"Error while parsing the json file of the checkpoint store from path: %s. Error: %s"
% (CHECKPOINT_PATH, exception)
)
else:
if index_type == "incremental":
checkpoint_time = self.config.get_value("end_time")
else:
checkpoint_time = current_time
self.logger.debug(
"Setting the checkpoint contents: %s for the collection %s to the checkpoint path:%s"
% (checkpoint_time, collection, CHECKPOINT_PATH)
)
checkpoint_list = {collection: checkpoint_time}
with open(CHECKPOINT_PATH, "w") as checkpoint_store:
try:
json.dump(checkpoint_list, checkpoint_store, indent=4)
self.logger.info("Successfully saved the checkpoint")
except ValueError as exception:
self.logger.exception(
"Error while updating the existing checkpoint json file. Adding the new content directly instead of updating. Error: %s"
% exception
)