in collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py [0:0]
def __init__(self, history_path: str, dataset_id: str, signature_fun=None):
    """
    Set up a file-backed ingestion history for one dataset.

    Two files inside ``history_path`` are used, both named after
    ``dataset_id``: ``<dataset_id>.csv`` is opened in append mode and kept
    open on the instance, and ``<dataset_id>.ts`` is read when it exists.

    :param history_path: directory containing the history files; it is
           created (including parents) when missing
    :param dataset_id: dataset identifier, used as the base name of the
           ``.csv`` and ``.ts`` files
    :param signature_fun: function which creates the signature of the cache,
           a file path string as argument and returns a string (md5sum, time stamp)
    """
    csv_path = os.path.join(history_path, f'{dataset_id}.csv')
    ts_path = os.path.join(history_path, f'{dataset_id}.ts')

    self._signature_fun = signature_fun
    self._dataset_id = dataset_id
    self._history_file_path = csv_path
    self._history_dict = {}
    # Loading happens before the directory is created and before the CSV is
    # opened for appending; presumably this fills _history_dict from the
    # existing CSV (helper is defined elsewhere in the class).
    self._load_history_dict()

    Path(history_path).mkdir(parents=True, exist_ok=True)
    # buffering=1 requests line buffering for the text-mode append handle.
    self._history_file = open(f"{self._history_file_path}", mode='a', buffering=1)

    self._latest_ingested_file_update_file_path = ts_path
    # When no .ts file exists, _latest_ingested_file_update is not assigned here.
    if os.path.exists(ts_path):
        with open(ts_path, 'r') as ts_file:
            first_line = ts_file.readline()
            self._latest_ingested_file_update = float(first_line)