def transform_data()

in aliyun/log/logclient_operator.py [0:0]


def transform_data(from_client, from_project, from_logstore, from_time,
                   to_time=None,
                   to_client=None, to_project=None, to_logstore=None,
                   shard_list=None,
                   config=None,
                   batch_size=None, compress=None,
                   cg_name=None, c_name=None,
                   cg_heartbeat_interval=None, cg_data_fetch_interval=None, cg_in_order=None,
                   cg_worker_pool_size=None
                   ):
    """
    transform data from one logstore to another one (could be the same or in different region), the time is log received time on server side.

    Two execution modes:

    * batch mode (``cg_name`` is None): scans the selected shards in a
      process pool between ``from_time`` and ``to_time`` and returns.
    * consumer-group mode (``cg_name`` given): runs a ``ConsumerWorker``
      until it exits or the user presses Ctrl-C.

    :param from_client: source log client
    :param from_project: source project name
    :param from_logstore: source logstore name
    :param from_time: begin time (server-side receive time)
    :param to_time: end time; defaults to "end" in batch mode
    :param to_client: target client, defaults to ``from_client``
    :param to_project: target project, defaults to ``from_project``
    :param to_logstore: target logstore, defaults to ``from_logstore``
    :param shard_list: shard filter expression (e.g. "0-2,5"); defaults to all shards
    :param config: transform config; when falsy the data is copied unmodified
    :param batch_size: batch size for each fetch
    :param compress: whether to compress the payload when sending
    :param cg_name: consumer group name; switches to consumer-group mode
    :param c_name: consumer name; defaults to a pid-based name
    :param cg_heartbeat_interval: consumer group heartbeat interval, default 20
    :param cg_data_fetch_interval: consumer group fetch interval, default 2
    :param cg_in_order: consume shards in order, default False
    :param cg_worker_pool_size: consumer worker pool size, default 3
    :return: LogResponse whose body contains "total_count" and per-shard stats
    """
    # No transform config: a straight copy produces the same result, cheaper.
    if not config:
        logger.info("transform_data: config is not configured, use copy data by default.")
        return copy_data(from_client, from_project, from_logstore, from_time, to_time=to_time,
                         to_client=to_client, to_project=to_project, to_logstore=to_logstore,
                         shard_list=shard_list,
                         batch_size=batch_size, compress=compress)

    to_client = to_client or from_client

    # increase the timeout to 2 min at least
    from_client.timeout = max(from_client.timeout, 120)
    to_client.timeout = max(to_client.timeout, 120)
    to_project = to_project or from_project
    to_logstore = to_logstore or from_logstore

    if not cg_name:
        return _transform_data_batch(from_client, from_project, from_logstore, from_time, to_time,
                                     config, to_client, to_project, to_logstore,
                                     shard_list, batch_size, compress)

    return _transform_data_consumer_group(from_client, from_project, from_logstore, from_time, to_time,
                                          config, to_client, to_project, to_logstore,
                                          cg_name, c_name,
                                          cg_heartbeat_interval, cg_data_fetch_interval,
                                          cg_in_order, cg_worker_pool_size)


def _transform_data_batch(from_client, from_project, from_logstore, from_time, to_time,
                          config, to_client, to_project, to_logstore,
                          shard_list, batch_size, compress):
    """Batch mode: transform each selected shard in a separate worker process."""
    to_time = to_time or "end"
    cpu_count = multiprocessing.cpu_count() * 2
    shards = from_client.list_shards(from_project, from_logstore).get_shards_info()
    current_shards = [str(shard['shardID']) for shard in shards]
    target_shards = _parse_shard_list(shard_list, current_shards)

    # Guard: ProcessPoolExecutor raises ValueError when max_workers <= 0,
    # which would happen if the shard filter matches nothing.
    if not target_shards:
        return LogResponse({}, {"total_count": 0, "shards": {}})

    worker_size = min(cpu_count, len(target_shards))

    result = dict()
    total_count = 0
    with ProcessPoolExecutor(max_workers=worker_size) as pool:
        futures = [pool.submit(transform_worker, from_client, from_project, from_logstore, shard,
                               from_time, to_time, config,
                               to_client, to_project, to_logstore,
                               batch_size=batch_size, compress=compress)
                   for shard in target_shards]

        for future in as_completed(futures):
            # Best effort: a failed shard is logged but does not abort the others.
            if future.exception():
                logger.error("get error when transforming data: {0}".format(future.exception()))
            else:
                partition, count, removed, processed, failed = future.result()
                total_count += count
                if count:
                    result[partition] = {"total_count": count, "transformed": processed,
                                         "removed": removed, "failed": failed}

    return LogResponse({}, {"total_count": total_count, "shards": result})


def _transform_data_consumer_group(from_client, from_project, from_logstore, from_time, to_time,
                                   config, to_client, to_project, to_logstore,
                                   cg_name, c_name,
                                   cg_heartbeat_interval, cg_data_fetch_interval,
                                   cg_in_order, cg_worker_pool_size):
    """Consumer-group mode: keep transforming via a ConsumerWorker until stopped."""
    c_name = c_name or "transform_data_{0}".format(multiprocessing.current_process().pid)
    cg_heartbeat_interval = cg_heartbeat_interval or 20
    cg_data_fetch_interval = cg_data_fetch_interval or 2
    cg_in_order = False if cg_in_order is None else cg_in_order
    cg_worker_pool_size = cg_worker_pool_size or 3

    option = LogHubConfig(from_client._endpoint, from_client._accessKeyId, from_client._accessKey,
                          from_project, from_logstore, cg_name,
                          c_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=from_time,
                          cursor_end_time=to_time,
                          heartbeat_interval=cg_heartbeat_interval, data_fetch_interval=cg_data_fetch_interval,
                          in_order=cg_in_order,
                          worker_pool_size=cg_worker_pool_size)

    TransformDataConsumer.set_transform_options(config, to_client, to_project, to_logstore)

    result = {"total_count": 0, "shards": {}}
    lock = RLock()

    def status_updator(shard_id, count=0, removed=0, processed=0, failed=0):
        # Callback invoked from the consumer's worker threads; aggregate under the lock.
        logger.info("status update is called, shard: {0}, count: {1}, removed: {2}, processed: {3}, failed: {4}".format(shard_id, count, removed, processed, failed))

        with lock:
            result["total_count"] += count
            if shard_id in result["shards"]:
                data = result["shards"][shard_id]
                result["shards"][shard_id] = {"total_count": data["total_count"] + count,
                                              "transformed": data["transformed"] + processed,
                                              "removed": data["removed"] + removed,
                                              "failed": data["failed"] + failed}
            else:
                result["shards"][shard_id] = {"total_count": count, "transformed": processed,
                                              "removed": removed, "failed": failed}

    worker = ConsumerWorker(TransformDataConsumer, consumer_option=option, args=(status_updator, ))
    worker.start()

    try:
        # Wake up every minute so KeyboardInterrupt can be delivered promptly.
        while worker.is_alive():
            worker.join(timeout=60)
        logger.info("transform_data: worker exit unexpected, try to shutdown it")
        worker.shutdown()
    except KeyboardInterrupt:
        logger.info("transform_data: *** try to exit **** ")
        print("try to stop transforming data.")
        worker.shutdown()
        worker.join(timeout=120)

    return LogResponse({}, result)