def _run()

in aliyun/log/es_migration/migration_task.py [0:0]


    def _run(self, shutdown_flag):
        rnd = 0
        while not op.exists(shutdown_flag):
            if self._es_scroll_id is None:
                # initial search
                resp = self._es_client.search(
                    index=self._es_index,
                    body=self._es_query,
                    scroll=self._es_scroll,
                    size=self._batch_size,
                    params=self._es_params,
                )
            else:
                try:
                    resp = self._es_client.scroll(
                        scroll_id=self._es_scroll_id,
                        scroll=self._es_scroll,
                    )
                except NotFoundError:
                    msg = "cache is expired, which is with duration {}".format(self._es_scroll)
                    self._logger.error(msg, exc_info=True)
                    raise Exception(msg)

            scroll_id, hits = resp.get('_scroll_id'), resp['hits']['hits']
            if len(hits) > 0:
                self._put_docs(hits)
                self._cnt += len(hits)
                self._last = hits[-1]

            # end of scroll
            if scroll_id is None or len(hits) <= 0:
                self._status = Checkpoint.finished
                self._logger.info('Migration finished')
                break

            self._es_scroll_id = scroll_id
            rnd += 1
            if rnd % 100 == 0 or rnd == 1:
                _id = self._last.get('_id')
                self._ckpt.update(count=self._cnt, _id=_id, scroll_id=scroll_id)
                self._cnt = 0
                self._logger.info('Migration progress', extra=self._ckpt.content)