in ees_network_drive/checkpointing.py [0:0]
def get_checkpoint(self, current_time, obj_type):
"""This method fetches the checkpoint from the checkpoint file in
the local storage. If the file does not exist, it takes the
checkpoint details from the configuration file.
:param current_time: current time
:param obj_type: drive for which checkpoint is fetched
"""
self.logger.info(
f"Fetching the checkpoint details from the checkpoint file: {CHECKPOINT_PATH}"
)
start_time = self.config.get_value("start_time")
end_time = self.config.get_value("end_time")
if os.path.exists(CHECKPOINT_PATH) and os.path.getsize(CHECKPOINT_PATH) > 0:
self.logger.debug(
"Checkpoint file exists and has contents, hence considering the checkpoint time \
instead of start_time and end_time"
)
with open(CHECKPOINT_PATH, encoding="UTF-8") as checkpoint_store:
try:
checkpoint_list = json.load(checkpoint_store)
if not checkpoint_list.get(obj_type):
self.logger.debug(
f"The checkpoint file is present but it does not contain the start_time for {obj_type}, \
hence considering the start_time and end_time from the configuration file instead of the \
last successful fetch time"
)
else:
try:
start_time = \
coerce_rfc_3339_date(checkpoint_list.get(obj_type)).strftime(RFC_3339_DATETIME_FORMAT)
end_time = current_time
except ValueError as exception:
raise IncorrectFormatError(obj_type, checkpoint_list.get(obj_type), exception)
except ValueError as exception:
self.logger.exception(
f"Error while parsing the json file of the checkpoint store from path: {CHECKPOINT_PATH}. \
Error: {exception}"
)
self.logger.info(
"Considering the start_time and end_time from the configuration file"
)
else:
self.logger.debug(
f"Checkpoint file does not exist at {CHECKPOINT_PATH}, considering \
the start_time and end_time from the configuration file"
)
self.logger.debug(
f"Contents of the start_time: {start_time} and end_time: {end_time} for {obj_type}",
)
return start_time, end_time