in aliyun/log/es_migration/migration_manager.py [0:0]
def _handle_cache(self, tasks):
    """Merge ``tasks`` with the cached task list and persist the result.

    The cache is the JSON file ``tasks.json`` located under
    ``self._config.cache_path``. A task is identified by its
    ``(es_index, es_shard)`` pair. Cached tasks are kept untouched; every
    task in ``tasks`` whose pair is not in the cache is given the next
    sequential ``id`` (continuing from the number of cached tasks) and a
    ``logstore`` resolved through ``IndexLogstoreMappings``, then appended
    after the cached ones. The merged list is written back to the cache
    file.

    A cache file that is not valid JSON, or whose top-level value is not a
    list, is logged as an error and treated as empty.

    Args:
        tasks: List of task dicts, each holding ``es_index`` and
            ``es_shard`` keys. Dicts that are not yet cached are modified
            in place (``id`` and ``logstore`` are set on them).

    Returns:
        The cached tasks followed by the newly registered ones.
    """
    file_tasks = op.join(self._config.cache_path, 'tasks.json')
    if op.exists(file_tasks):
        with open(file_tasks) as f:
            cont = f.read()
    else:
        cont = '[]'

    try:
        old_tasks = json.loads(cont)
    except json.JSONDecodeError:
        self._logger.error('Invalid task cache', extra={'cache': cont})
        old_tasks = []
    else:
        # Valid JSON is not necessarily a task list (e.g. ``null`` or an
        # object); anything else would break the merge below.
        if not isinstance(old_tasks, list):
            self._logger.error('Invalid task cache', extra={'cache': cont})
            old_tasks = []

    # Only membership is needed, so a set of identifying pairs suffices.
    cached_keys = {
        (task['es_index'], task['es_shard'])
        for task in old_tasks
    }
    _mappings = IndexLogstoreMappings(
        [task['es_index'] for task in tasks],
        self._config.get('logstore_index_mappings'),
    )

    # New ids continue after the cached tasks.
    next_id = len(old_tasks)
    new_tasks = []
    for task in tasks:
        if (task['es_index'], task['es_shard']) in cached_keys:
            continue
        task['id'] = next_id
        task['logstore'] = _mappings.get_logstore(task['es_index'])
        new_tasks.append(task)
        next_id += 1

    tasks = old_tasks + new_tasks
    with open(file_tasks, 'w') as f:
        f.write(json.dumps(tasks, indent=2))
    if self._config.get('auto_creation'):
        self._setup_aliyun_log(_mappings)
    return tasks