in aliyun/log/es_migration/migration_task.py [0:0]
def _run(self, shutdown_flag):
rnd = 0
while not op.exists(shutdown_flag):
if self._es_scroll_id is None:
# initial search
resp = self._es_client.search(
index=self._es_index,
body=self._es_query,
scroll=self._es_scroll,
size=self._batch_size,
params=self._es_params,
)
else:
try:
resp = self._es_client.scroll(
scroll_id=self._es_scroll_id,
scroll=self._es_scroll,
)
except NotFoundError:
msg = "cache is expired, which is with duration {}".format(self._es_scroll)
self._logger.error(msg, exc_info=True)
raise Exception(msg)
scroll_id, hits = resp.get('_scroll_id'), resp['hits']['hits']
if len(hits) > 0:
self._put_docs(hits)
self._cnt += len(hits)
self._last = hits[-1]
# end of scroll
if scroll_id is None or len(hits) <= 0:
self._status = Checkpoint.finished
self._logger.info('Migration finished')
break
self._es_scroll_id = scroll_id
rnd += 1
if rnd % 100 == 0 or rnd == 1:
_id = self._last.get('_id')
self._ckpt.update(count=self._cnt, _id=_id, scroll_id=scroll_id)
self._cnt = 0
self._logger.info('Migration progress', extra=self._ckpt.content)