function transform()

in CWLogsToOpenSearch/index.js [53:112]


/**
 * Turns a decoded CloudWatch Logs subscription payload into the text body of
 * a bulk indexing request: one action line followed by one document line per
 * log event, each terminated by a newline.
 *
 * @param {Object} payload - Decoded subscription payload. The fields read
 *     here are `messageType`, `owner`, `logGroup`, `logStream` and
 *     `logEvents` (each event supplying `id`, `timestamp`, `message` and,
 *     optionally, `extractedFields`).
 * @returns {?string} The bulk request body, an empty string when there are no
 *     log events, or `null` when the payload is a `CONTROL_MESSAGE`.
 */
function transform(payload) {
    if (payload.messageType === 'CONTROL_MESSAGE') {
        return null;
    }

    // Log-stream markers, checked in this order; the first one contained in
    // the stream name wins. Application and slow logs are routed through an
    // ingest pipeline because they do not arrive as JSON; audit logs are only
    // tagged with their type.
    const logTypes = [
        { marker: '-es-application-logs', type: 'application-logs', pipeline: 'application-logs' },
        { marker: '-search-slow-logs', type: 'search-slow-logs', pipeline: 'search-slow-logs' },
        { marker: '-index-slow-logs', type: 'index-slow-logs', pipeline: 'index-slow-logs' },
        { marker: '-audit-logs', type: 'audit-logs', pipeline: null },
    ];

    // Left-pads a date component to two characters.
    const twoDigits = (value) => ('0' + value).slice(-2);

    // Builds the "action line + document line" pair for a single log event.
    const toBulkEntry = (logEvent) => {
        // The multiplication coerces a numeric string timestamp to a number.
        const eventDate = new Date(logEvent.timestamp * 1);

        // Daily index, named cwl-YYYY.MM.DD from the event's UTC date.
        const indexName = `cwl-${eventDate.getUTCFullYear()}.${twoDigits(eventDate.getUTCMonth() + 1)}.${twoDigits(eventDate.getUTCDate())}`;

        // buildSource lives elsewhere in this file; the object it returns is
        // enriched in place with the CloudWatch metadata below.
        const doc = buildSource(logEvent.message, logEvent.extractedFields);
        doc['@id'] = logEvent.id;
        doc['@timestamp'] = eventDate.toISOString();
        doc['@message'] = logEvent.message;
        doc['@owner'] = payload.owner;
        doc['@log_group'] = payload.logGroup;
        doc['@log_stream'] = payload.logStream;

        const action = {
            index: {
                _index: indexName,
                _type: '_doc',
                _id: logEvent.id,
            },
        };

        const matchedType = logTypes.find(({ marker }) => payload.logStream.includes(marker));
        if (matchedType) {
            if (matchedType.pipeline) {
                action.index.pipeline = matchedType.pipeline;
            }
            doc['@cw_log_type'] = matchedType.type;
        }

        // Domain name: audit documents carry audit_cluster_name in the form
        // accountId:domainName, so keep what follows the first colon. For all
        // other logs take the second-to-last segment of the log group, which
        // assumes the shape /aws/aes/domains/<cluster_name>/<log_group_name>.
        const clusterName = doc['audit_cluster_name'];
        if (clusterName) {
            doc['domain_name'] = clusterName.substring(clusterName.indexOf(':') + 1);
        } else {
            const segments = doc['@log_group'].split('/');
            doc['domain_name'] = segments[segments.length - 2];
        }

        // The trailing empty element yields the newline that ends the pair.
        return [JSON.stringify(action), JSON.stringify(doc), ''].join('\n');
    };

    return payload.logEvents.map(toBulkEntry).join('');
}