in aliyun/log/logclient_operator.py [0:0]
def copy_data(from_client, from_project, from_logstore, from_time, to_time=None,
              to_client=None, to_project=None, to_logstore=None,
              shard_list=None,
              batch_size=None, compress=None, new_topic=None, new_source=None):
    """Copy all logs from one logstore into another.

    Source and target may be the same logstore or live in different regions.
    The time range refers to the server-side receive time of each log.
    Shards are copied in parallel, one `copy_worker` per shard, using a
    process pool.

    Returns a LogResponse whose body holds the grand total of copied logs
    under "total_count" and a per-shard breakdown under "shards".
    """
    # Fall back to the source for any unspecified target settings.
    to_client = to_client or from_client
    # Bulk transfers need room to breathe: raise both timeouts to >= 2 min.
    from_client.timeout = max(from_client.timeout, 120)
    to_client.timeout = max(to_client.timeout, 120)
    to_project = to_project or from_project
    to_logstore = to_logstore or from_logstore
    to_time = to_time or "end"

    # Candidate parallelism: twice the CPU count, capped by the shard count.
    max_workers = multiprocessing.cpu_count() * 2
    shards_info = from_client.list_shards(from_project, from_logstore).get_shards_info()
    available_shards = [str(info['shardID']) for info in shards_info]
    selected_shards = _parse_shard_list(shard_list, available_shards)
    pool_size = min(max_workers, len(selected_shards))

    per_shard_counts = {}
    grand_total = 0
    with ProcessPoolExecutor(max_workers=pool_size) as executor:
        pending = []
        for shard_id in selected_shards:
            pending.append(executor.submit(
                copy_worker, from_client, from_project, from_logstore, shard_id,
                from_time, to_time,
                to_client, to_project, to_logstore,
                batch_size=batch_size, compress=compress,
                new_topic=new_topic, new_source=new_source))
        # Aggregate as workers finish; shards that copied nothing are
        # omitted from the per-shard breakdown.
        for done in as_completed(pending):
            shard_id, copied = done.result()
            grand_total += copied
            if copied:
                per_shard_counts[shard_id] = copied

    return LogResponse({}, {"total_count": grand_total, "shards": per_shard_counts})