in aliyun/log/logclient_operator.py [0:0]
def dump_worker(client, project_name, logstore_name, from_time, to_time,
shard_id, file_path,
batch_size=None, compress=None, encodings=None, no_escape=None, query=None):
res = client.pull_log(project_name, logstore_name, shard_id, from_time, to_time, batch_size=batch_size,
compress=compress, query=query)
encodings = encodings or ('utf8', 'latin1', 'gbk')
ensure_ansi = not no_escape
count = 0
next_cursor = 'as from_time configured'
try:
for data in res:
for log in data.get_flatten_logs_json(decode_bytes=True):
with open(os.path.expanduser(file_path), "a+") as f:
count += 1
try:
if six.PY2:
last_ex = None
for encoding in encodings:
try:
ret = json.dumps(log, encoding=encoding, ensure_ascii=ensure_ansi)
if isinstance(ret, unicode):
ret = ret.encode(encoding, errors="ignore")
f.write(ret)
f.write("\n")
break
except UnicodeDecodeError as ex:
last_ex = ex
else:
raise last_ex
else:
f.write(json.dumps(log, cls=get_encoder_cls(encodings), ensure_ascii=ensure_ansi))
f.write("\n")
except Exception as ex:
logger.error("shard: {0} Fail to dump log: {1}".format(shard_id, b64e(repr(log))), exc_info=True)
raise ex
next_cursor = data.next_cursor
except Exception as ex:
logger.error("dump log failed: task info {0} failed to copy data to target, next cursor: {1} detail: {2}".
format(
(project_name, logstore_name, shard_id, from_time, to_time),
next_cursor, ex), exc_info=True)
raise
return file_path, count