def copy_data()

in aliyun/log/logclient_operator.py [0:0]


def copy_data(from_client, from_project, from_logstore, from_time, to_time=None,
              to_client=None, to_project=None, to_logstore=None,
              shard_list=None,
              batch_size=None, compress=None, new_topic=None, new_source=None):
    """
    copy data from one logstore to another one (could be the same or in different region), the time is log received time on server side.

    :param from_client: source log client; also used as the target client when ``to_client`` is None
    :param from_project: source project name
    :param from_logstore: source logstore name
    :param from_time: start of the copy window (server-side receive time)
    :param to_time: end of the copy window; defaults to "end" (copy until the latest data)
    :param to_client: target log client; defaults to ``from_client``
    :param to_project: target project name; defaults to ``from_project``
    :param to_logstore: target logstore name; defaults to ``from_logstore``
    :param shard_list: optional shard filter passed to ``_parse_shard_list``; None means all shards
    :param batch_size: per-request batch size forwarded to ``copy_worker``
    :param compress: compression flag forwarded to ``copy_worker``
    :param new_topic: if set, overwrite the topic of copied logs (forwarded to ``copy_worker``)
    :param new_source: if set, overwrite the source of copied logs (forwarded to ``copy_worker``)
    :return: LogResponse whose body is {"total_count": int, "shards": {shard_id: count}}
             (shards with a zero count are omitted)
    """
    to_client = to_client or from_client
    # increase the timeout to 2 min at least: copy requests move bulk data and
    # can legitimately take much longer than typical API calls
    from_client.timeout = max(from_client.timeout, 120)
    to_client.timeout = max(to_client.timeout, 120)

    to_project = to_project or from_project
    to_logstore = to_logstore or from_logstore
    to_time = to_time or "end"

    # oversubscribe workers 2x since copy_worker is I/O bound
    cpu_count = multiprocessing.cpu_count() * 2
    shards = from_client.list_shards(from_project, from_logstore).get_shards_info()
    current_shards = [str(shard['shardID']) for shard in shards]
    target_shards = _parse_shard_list(shard_list, current_shards)

    # guard: ProcessPoolExecutor(max_workers=0) raises ValueError, so return an
    # empty result directly when the shard filter matches nothing
    if not target_shards:
        return LogResponse({}, {"total_count": 0, "shards": {}})

    worker_size = min(cpu_count, len(target_shards))

    result = dict()
    total_count = 0
    with ProcessPoolExecutor(max_workers=worker_size) as pool:
        futures = [pool.submit(copy_worker, from_client, from_project, from_logstore, shard,
                               from_time, to_time,
                               to_client, to_project, to_logstore,
                               batch_size=batch_size, compress=compress,
                               new_topic=new_topic, new_source=new_source)
                   for shard in target_shards]

        for future in as_completed(futures):
            partition, count = future.result()
            total_count += count
            # only record shards that actually copied something
            if count:
                result[partition] = count

    return LogResponse({}, {"total_count": total_count, "shards": result})