def dump_worker()

in aliyun/log/logclient_operator.py [0:0]


def dump_worker(client, project_name, logstore_name, from_time, to_time,
                shard_id, file_path,
                batch_size=None, compress=None, encodings=None, no_escape=None, query=None):
    """Pull the logs of one shard and append them to a file, one JSON document per line.

    :param client: object exposing ``pull_log(project, logstore, shard_id, from_time, to_time,
        batch_size=..., compress=..., query=...)``; its result is iterated batch by batch.
    :param project_name: passed through to ``client.pull_log``.
    :param logstore_name: passed through to ``client.pull_log``.
    :param from_time: passed through to ``client.pull_log``.
    :param to_time: passed through to ``client.pull_log``.
    :param shard_id: shard to pull; also used in error messages.
    :param file_path: target file; ``~`` is expanded. The file is opened in append mode,
        so existing content is kept.
    :param batch_size: passed through to ``client.pull_log``.
    :param compress: passed through to ``client.pull_log``.
    :param encodings: encodings tried in order when serializing; defaults to
        ``('utf8', 'latin1', 'gbk')`` when falsy.
    :param no_escape: when truthy, JSON is written with ``ensure_ascii=False``.
    :param query: passed through to ``client.pull_log``.
    :return: tuple ``(file_path, count)`` where ``file_path`` is the argument as given
        (not expanded) and ``count`` is the number of log records written.
    :raise Exception: any error raised while pulling, serializing or writing is logged
        and re-raised unchanged.
    """
    res = client.pull_log(project_name, logstore_name, shard_id, from_time, to_time, batch_size=batch_size,
                          compress=compress, query=query)
    encodings = encodings or ('utf8', 'latin1', 'gbk')
    ensure_ascii = not no_escape
    # Loop-invariant: expand "~" once instead of once per log record.
    target_path = os.path.expanduser(file_path)

    count = 0
    # Reported in the failure message; only advanced after a whole batch has been written.
    next_cursor = 'as from_time configured'
    try:
        for data in res:
            for log in data.get_flatten_logs_json(decode_bytes=True):
                # The file is intentionally re-opened in append mode for every record, so each
                # line is flushed and closed before the next record is processed.
                with open(target_path, "a+") as f:
                    count += 1
                    try:
                        if six.PY2:
                            last_ex = None
                            # Try each candidate encoding until one can serialize the record.
                            for encoding in encodings:
                                try:
                                    ret = json.dumps(log, encoding=encoding, ensure_ascii=ensure_ascii)
                                    if isinstance(ret, unicode):
                                        ret = ret.encode(encoding, errors="ignore")
                                    f.write(ret + "\n")
                                    break
                                except UnicodeDecodeError as ex:
                                    last_ex = ex
                            else:
                                # No encoding worked: surface the last decoding error.
                                raise last_ex
                        else:
                            line = json.dumps(log, cls=get_encoder_cls(encodings), ensure_ascii=ensure_ascii)
                            f.write(line + "\n")
                    except Exception:
                        logger.error("shard: {0} Fail to dump log: {1}".format(shard_id, b64e(repr(log))), exc_info=True)
                        # Bare raise keeps the original traceback (``raise ex`` drops it on Python 2).
                        raise
            next_cursor = data.next_cursor
    except Exception as ex:
        logger.error("dump log failed: task info {0} failed to copy data to target, next cursor: {1} detail: {2}".
                     format(
            (project_name, logstore_name, shard_id, from_time, to_time),
            next_cursor, ex), exc_info=True)
        raise

    return file_path, count