def _handle_cache()

in aliyun/log/es_migration/migration_manager.py


    def _handle_cache(self, tasks):
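        """Merge incoming migration tasks with the tasks cached in tasks.json.

        Tasks whose (es_index, es_shard) pair is already cached are skipped;
        new ones get a sequential id and a target logstore, and the combined
        list is written back to the cache file before being returned.
        """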
        file_tasks = op.join(self._config.cache_path, 'tasks.json')
        if op.exists(file_tasks):
            with open(file_tasks) as f:
                cont = f.read()
        else:
            cont = '[]'

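        # Parse the cached JSON; a corrupt cache is logged and treated as empty.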
        try:
            old_tasks = json.loads(cont)
        except json.JSONDecodeError:
            self._logger.error('Invalid task cache', extra={'cache': cont})
            old_tasks = []

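        # Key cached tasks by (es_index, es_shard); only membership is checked below.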
        task_map = {
            (task['es_index'], task['es_shard']): task['id']
            for task in old_tasks
        }
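        # Resolve the target logstore for each ES index in the incoming tasks.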
        _mappings = IndexLogstoreMappings(
            [task['es_index'] for task in tasks],
            self._config.get('logstore_index_mappings'),
        )
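        # New (index, shard) pairs get ids continuing from the cached count.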
        cnt, new_tasks = len(old_tasks), []
        for task in tasks:
            _task = (task['es_index'], task['es_shard'])
            if _task not in task_map:
                task['id'] = cnt
                task['logstore'] = _mappings.get_logstore(task['es_index'])
                new_tasks.append(task)
                cnt += 1
        tasks = old_tasks + new_tasks

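        # Persist the merged task list back to the cache file.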
        with open(file_tasks, 'w') as f:
            f.write(json.dumps(tasks, indent=2))

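        # When auto_creation is set, provision the mapped Aliyun Log targets via _setup_aliyun_log.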
        if self._config.get('auto_creation'):
            self._setup_aliyun_log(_mappings)
        return tasks
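
For illustration, below is a minimal, self-contained sketch of the merge-and-dedup pattern used above. The variable names (cached_tasks, incoming_tasks) and the sample index names are hypothetical, and logstore resolution and file I/O are omitted; it only shows how already-cached (es_index, es_shard) pairs are skipped while new ones receive sequential ids.

    import json

    cached_tasks = [
        {'es_index': 'logs-2021', 'es_shard': 0, 'id': 0, 'logstore': 'logs_2021'},
    ]
    incoming_tasks = [
        {'es_index': 'logs-2021', 'es_shard': 0},   # already cached -> skipped
        {'es_index': 'logs-2021', 'es_shard': 1},   # new -> assigned id 1
    ]

    seen = {(t['es_index'], t['es_shard']) for t in cached_tasks}
    next_id = len(cached_tasks)
    for task in incoming_tasks:
        if (task['es_index'], task['es_shard']) not in seen:
            task['id'] = next_id
            next_id += 1
            cached_tasks.append(task)

    # The merged list is what _handle_cache would write back to tasks.json.
    print(json.dumps(cached_tasks, indent=2))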