def _transform_events_to_logstore()

in aliyun/log/logclient_operator.py [0:0]


def _transform_events_to_logstore(runner, events, to_client, to_project, to_logstore):
    count = removed = processed = failed = 0
    new_events = defaultdict(list)

    default_time = time.time()
    for src_event in events:
        count += 1
        new_event = runner(src_event)

        if new_event is None:
            removed += 1
            continue

        if not isinstance(new_event, (tuple, list)):
            new_event = (new_event, )

        for event in new_event:
            if not isinstance(event, dict):
                logger.error("transform_data: get unknown type of processed event: {0}".format(event))
                continue

            dt = int(event.get('__time__', default_time)) // 60  # group logs in same minute
            topic = ''
            source = ''
            if "__topic__" in event:
                topic = event['__topic__']
                del event["__topic__"]
            if "__source__" in event:
                source = event['__source__']
                del event["__source__"]

            new_events[(dt, topic, source)].append(event)

    for (dt, topic, source), contents in six.iteritems(new_events):

        items = []

        for content in contents:
            st = content.get("__time__", default_time)
            if "__time__" in content:
                del content['__time__']

            ct = list(six.iteritems(content))
            item = LogItem(st, ct)
            items.append(item)

        req = PutLogsRequest(project=to_project, logstore=to_logstore, topic=topic, source=source, logitems=items)
        res = put_logs_auto_div(to_client, req)
        processed += len(items)

    return count, removed, processed, failed