in aliyun/log/logclient_operator.py [0:0]
def _transform_events_to_logstore(runner, events, to_client, to_project, to_logstore):
count = removed = processed = failed = 0
new_events = defaultdict(list)
default_time = time.time()
for src_event in events:
count += 1
new_event = runner(src_event)
if new_event is None:
removed += 1
continue
if not isinstance(new_event, (tuple, list)):
new_event = (new_event, )
for event in new_event:
if not isinstance(event, dict):
logger.error("transform_data: get unknown type of processed event: {0}".format(event))
continue
dt = int(event.get('__time__', default_time)) // 60 # group logs in same minute
topic = ''
source = ''
if "__topic__" in event:
topic = event['__topic__']
del event["__topic__"]
if "__source__" in event:
source = event['__source__']
del event["__source__"]
new_events[(dt, topic, source)].append(event)
for (dt, topic, source), contents in six.iteritems(new_events):
items = []
for content in contents:
st = content.get("__time__", default_time)
if "__time__" in content:
del content['__time__']
ct = list(six.iteritems(content))
item = LogItem(st, ct)
items.append(item)
req = PutLogsRequest(project=to_project, logstore=to_logstore, topic=topic, source=source, logitems=items)
res = put_logs_auto_div(to_client, req)
processed += len(items)
return count, removed, processed, failed