# aliyun/log/logclient_operator.py
def transform_data(from_client, from_project, from_logstore, from_time,
to_time=None,
to_client=None, to_project=None, to_logstore=None,
shard_list=None,
config=None,
batch_size=None, compress=None,
cg_name=None, c_name=None,
cg_heartbeat_interval=None, cg_data_fetch_interval=None, cg_in_order=None,
cg_worker_pool_size=None
):
"""
    transform data from one logstore to another one (which could be the same logstore, or one in a different region). The time range refers to the time logs were received on the server side.
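
    With no `config`, the call falls back to `copy_data`. When `config` is
    given and `cg_name` is empty, every selected shard is processed once in
    a parallel batch run; when `cg_name` is set, the work is coordinated by
    a consumer group instead, so multiple workers can share and resume it.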
"""
if not config:
logger.info("transform_data: config is not configured, use copy data by default.")
return copy_data(from_client, from_project, from_logstore, from_time, to_time=to_time,
to_client=to_client, to_project=to_project, to_logstore=to_logstore,
shard_list=shard_list,
batch_size=batch_size, compress=compress)
to_client = to_client or from_client
    # make sure both clients use a timeout of at least 2 minutes
from_client.timeout = max(from_client.timeout, 120)
to_client.timeout = max(to_client.timeout, 120)
to_project = to_project or from_project
to_logstore = to_logstore or from_logstore
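    # dispatch: without a consumer group name run a one-shot batch job,
    # otherwise run continuously under a consumer group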
if not cg_name:
# batch mode
to_time = to_time or "end"
cpu_count = multiprocessing.cpu_count() * 2
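        # discover the shards currently in the source logstore; shard_list
        # (e.g. "0-2,4", as understood by _parse_shard_list) narrows the job
        # to a subset of them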
shards = from_client.list_shards(from_project, from_logstore).get_shards_info()
current_shards = [str(shard['shardID']) for shard in shards]
target_shards = _parse_shard_list(shard_list, current_shards)
worker_size = min(cpu_count, len(target_shards))
result = dict()
total_count = 0
total_removed = 0
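        # fan out one transform_worker per shard in a separate process and
        # merge the per-shard statistics as the futures complete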
with ProcessPoolExecutor(max_workers=worker_size) as pool:
futures = [pool.submit(transform_worker, from_client, from_project, from_logstore, shard,
from_time, to_time, config,
to_client, to_project, to_logstore,
batch_size=batch_size, compress=compress)
for shard in target_shards]
for future in as_completed(futures):
if future.exception():
                    logger.error("got error when transforming data: {0}".format(future.exception()))
else:
partition, count, removed, processed, failed = future.result()
total_count += count
total_removed += removed
                    if count:
                        result[partition] = {"total_count": count, "transformed": processed,
                                             "removed": removed, "failed": failed}
return LogResponse({}, {"total_count": total_count, "shards": result})
else:
# consumer group mode
c_name = c_name or "transform_data_{0}".format(multiprocessing.current_process().pid)
cg_heartbeat_interval = cg_heartbeat_interval or 20
cg_data_fetch_interval = cg_data_fetch_interval or 2
cg_in_order = False if cg_in_order is None else cg_in_order
cg_worker_pool_size = cg_worker_pool_size or 3
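        # build the consumer group option; cursor_start_time/cursor_end_time bound
        # the job by server-side receive time, matching the batch mode semantics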
option = LogHubConfig(from_client._endpoint, from_client._accessKeyId, from_client._accessKey,
from_project, from_logstore, cg_name,
c_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
cursor_start_time=from_time,
cursor_end_time=to_time,
heartbeat_interval=cg_heartbeat_interval, data_fetch_interval=cg_data_fetch_interval,
in_order=cg_in_order,
worker_pool_size=cg_worker_pool_size)
TransformDataConsumer.set_transform_options(config, to_client, to_project, to_logstore)
result = {"total_count": 0, "shards": {}}
        lock = RLock()

        def status_updater(shard_id, count=0, removed=0, processed=0, failed=0):
            logger.info("status update is called, shard: {0}, count: {1}, removed: {2}, "
                        "processed: {3}, failed: {4}".format(shard_id, count, removed, processed, failed))
            with lock:
                result["total_count"] += count
                if shard_id in result["shards"]:
                    data = result["shards"][shard_id]
                    result["shards"][shard_id] = {"total_count": data["total_count"] + count,
                                                  "transformed": data["transformed"] + processed,
                                                  "removed": data["removed"] + removed,
                                                  "failed": data["failed"] + failed}
                else:
                    result["shards"][shard_id] = {"total_count": count, "transformed": processed,
                                                  "removed": removed, "failed": failed}

        worker = ConsumerWorker(TransformDataConsumer, consumer_option=option, args=(status_updater,))
worker.start()
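        # poll the worker until it stops on its own (e.g. cursor_end_time reached),
        # then shut it down; Ctrl-C triggers a graceful shutdown with a bounded join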
try:
while worker.is_alive():
worker.join(timeout=60)
logger.info("transform_data: worker exit unexpected, try to shutdown it")
worker.shutdown()
except KeyboardInterrupt:
logger.info("transform_data: *** try to exit **** ")
print("try to stop transforming data.")
worker.shutdown()
worker.join(timeout=120)
return LogResponse({}, result)
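

# Usage sketch (illustrative only: the endpoint, key and name literals below are
# placeholders, and `my_config` stands for a transform config built elsewhere,
# e.g. with the SDK's ETL helpers):
#
#   client = LogClient("cn-hangzhou.log.aliyuncs.com", access_key_id, access_key)
#
#   # batch mode: transform logs received in the given window, shard by shard
#   transform_data(client, "my-project", "src-logstore",
#                  from_time="2018-1-1 10:10:10+8:00", to_time="end",
#                  to_logstore="dst-logstore", config=my_config)
#
#   # consumer group mode: run under a consumer group until stopped (Ctrl-C)
#   transform_data(client, "my-project", "src-logstore",
#                  from_time="2018-1-1 10:10:10+8:00", config=my_config,
#                  cg_name="transform-cg", cg_worker_pool_size=4)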