in ees_network_drive/checkpointing.py [0:0]
def set_checkpoint(self, current_time, index_type, obj_type):
""" This method updates the existing checkpoint json file or creates
a new checkpoint json file in case it is not present
:param current_time: current time
:index_type: indexing type from "incremental" or "full_sync"
:param obj_type: object type to set the checkpoint
"""
try:
with open(CHECKPOINT_PATH, encoding="UTF-8") as checkpoint_store:
checkpoint_list = json.load(checkpoint_store)
if checkpoint_list.get(obj_type):
self.logger.debug(
f"Setting the checkpoint contents: {current_time} for the {obj_type} \
to the checkpoint path: {CHECKPOINT_PATH}"
)
checkpoint_list[obj_type] = current_time
else:
self.logger.debug(
f"Setting the checkpoint contents: {self.config.get_value('end_time')} for the {obj_type} \
to the checkpoint path: {CHECKPOINT_PATH}"
)
checkpoint_list[obj_type] = self.config.get_value('end_time')
except Exception as exception:
if isinstance(exception, FileNotFoundError):
self.logger.debug(
f"Checkpoint file not found on path: {CHECKPOINT_PATH}. Generating the checkpoint file"
)
else:
self.logger.exception(
f"Error while fetching the json file of the checkpoint store from path: {CHECKPOINT_PATH}. \
Error: {exception}"
)
if index_type == "incremental":
checkpoint_time = self.config.get_value('end_time')
else:
checkpoint_time = current_time
self.logger.debug(
f"Setting the checkpoint contents: {checkpoint_time} for the {obj_type} \
to the checkpoint path: {CHECKPOINT_PATH}"
)
checkpoint_list = {obj_type: checkpoint_time}
with open(CHECKPOINT_PATH, "w", encoding="UTF-8") as checkpoint_store:
try:
json.dump(checkpoint_list, checkpoint_store, indent=4)
self.logger.info("Successfully saved the checkpoint")
except ValueError as exception:
self.logger.exception(
f"Error while updating the existing checkpoint json file. \
Adding the new content directly instead of updating. Error: {exception}"
)