function Audit()

in x-pack/filebeat/module/gcp/audit/config/pipeline.js [5:327]


function Audit(keep_original_message) {
    var processor = require("processor");

    // The pub/sub input writes the Stackdriver LogEntry object into the message
    // field. The message needs decoded as JSON.
    var decodeJson = new processor.DecodeJSONFields({
        fields: ["message"],
        target: "json",
    });

    // Set @timetamp the LogEntry's timestamp.
    var parseTimestamp = new processor.Timestamp({
        field: "json.timestamp",
        timezone: "UTC",
        layouts: ["2006-01-02T15:04:05.999999999Z07:00"],
        tests: ["2019-06-14T03:50:10.845445834Z"],
        ignore_missing: true,
    });

    var saveOriginalMessage = function(evt) {};
    if (keep_original_message) {
        saveOriginalMessage = new processor.Convert({
            fields: [
                {from: "message", to: "event.original"}
            ],
            mode: "rename"
        });
    }

    var dropPubSubFields = function(evt) {
        evt.Delete("message");
    };

    var saveMetadata = new processor.Convert({
        fields: [
            {from: "json.logName", to: "log.logger"},
            {from: "json.insertId", to: "event.id"},
        ],
        ignore_missing: true
    });

    // Use the monitored resource type's labels to set the cloud metadata.
    // The labels can vary based on the resource.type.
    // https://cloud.google.com/logging/docs/reference/v2/rest/v2/MonitoredResource
    var setCloudMetadata = new processor.Convert({
        fields: [
            {
                from: "json.resource.labels.project_id",
                to: "cloud.project.id",
                type: "string"
            },
            {
                from: "json.resource.labels.instance_id",
                to: "cloud.instance.id",
                type: "string"
            }
        ],
        ignore_missing: true,
        fail_on_error: false,
    });

    var setOrchestratorMetadata = function(evt) {
          if (evt.Get("json.resource.type") === "k8s_cluster") {
            evt.Put("orchestrator.type", "kubernetes");
            var convert_processor = new processor.Convert({
                fields: [
                    {
                        from: "json.resource.labels.cluster_name",
                        to: "orchestrator.cluster.name",
                        type: "string"
                    },
                    {
                        from: "json.protoPayload.resourceName",
                        to: "orchestrator.resource.type_temp",
                        type: "string"
                    }
                ],
                ignore_missing: true,
                fail_on_error: false,
            }).Run(evt);
        }
    };

    // The log includes a protoPayload field.
    // https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry
    var convertLogEntry = new processor.Convert({
        fields: [
            {from: "json.protoPayload", to: "json"},
        ],
        mode: "rename",
    });

    // The LogEntry's protoPayload is moved to the json field. The protoPayload
    // contains the structured audit log fields.
    // https://cloud.google.com/logging/docs/reference/audit/auditlog/rest/Shared.Types/AuditLog
    var convertProtoPayload = new processor.Convert({
        fields: [
            {
                from: "json.@type",
                to: "gcp.audit.type",
                type: "string"
            },
            {
                from: "json.authenticationInfo.principalEmail",
                to: "gcp.audit.authentication_info.principal_email",
                type: "string"
            },
            {
                from: "json.authenticationInfo.authoritySelector",
                to: "gcp.audit.authentication_info.authority_selector",
                type: "string"
            },
            {
                from: "json.authorizationInfo",
                to: "gcp.audit.authorization_info"
                // Type is an array of objects.
            },
            {
                from: "json.methodName",
                to: "gcp.audit.method_name",
                type: "string",
            },
            {
                from: "json.numResponseItems",
                to: "gcp.audit.num_response_items",
                type: "long"
            },
            {
                from: "json.request.@type",
                to: "gcp.audit.request.proto_name",
                type: "string"
            },
            // The values in the request object will depend on the proto type.
            // So be very careful about making any assumptions about data shape.
            {
                from: "json.request.filter",
                to: "gcp.audit.request.filter",
                type: "string"
            },
            {
                from: "json.request.name",
                to: "gcp.audit.request.name",
                type: "string"
            },
            {
                from: "json.request.resourceName",
                to: "gcp.audit.request.resource_name",
                type: "string"
            },
            {
                from: "json.requestMetadata.callerIp",
                to: "gcp.audit.request_metadata.caller_ip",
                type: "ip"
            },
            {
                from: "json.requestMetadata.callerSuppliedUserAgent",
                to: "gcp.audit.request_metadata.caller_supplied_user_agent",
                type: "string",
            },
            {
                from: "json.response.@type",
                to: "gcp.audit.response.proto_name",
                type: "string"
            },
            // The values in the response object will depend on the proto type.
            // So be very careful about making any assumptions about data shape.
            {
                from: "json.response.status",
                to: "gcp.audit.response.status",
                type: "string"
            },
            {
                from: "json.response.details.group",
                to: "gcp.audit.response.details.group",
                type: "string"
            },
            {
                from: "json.response.details.kind",
                to: "gcp.audit.response.details.kind",
                type: "string"
            },
            {
                from: "json.response.details.name",
                to: "gcp.audit.response.details.name",
                type: "string"
            },
            {
                from: "json.response.details.uid",
                to: "gcp.audit.response.details.uid",
                type: "string",
            },
            {
                from: "json.resourceName",
                to: "gcp.audit.resource_name",
                type: "string",
            },
            {
                from: "json.resourceLocation.currentLocations",
                to: "gcp.audit.resource_location.current_locations"
                // Type is a string array.
            },
            {
                from: "json.serviceName",
                to: "gcp.audit.service_name",
                type: "string",
            },
            {
                from: "json.status.code",
                to: "gcp.audit.status.code",
                type: "integer",
            },
            {
                from: "json.status.message",
                to: "gcp.audit.status.message",
                type: "string"
            },
        ],
        mode: "rename",
        ignore_missing: true,
        fail_on_error: false,
    });

    // Copy some fields
    var copyFields = new processor.Convert({
        fields: [
            {
                from: "gcp.audit.request_metadata.caller_ip",
                to: "source.ip",
                type: "ip"
            },
            {
                from: "gcp.audit.authentication_info.principal_email",
                to: "user.email",
                type: "string"
            },
            {
                from: "gcp.audit.service_name",
                to: "service.name",
                type: "string"
            },
            {
                from: "gcp.audit.request_metadata.caller_supplied_user_agent",
                to: "user_agent.original",
                type: "string"
            },
            {
                from: "gcp.audit.method_name",
                to: "event.action",
                type: "string"
            },
        ],
        ignore_missing: true,
        fail_on_error: false,
    });

    // Drop extra fields
    var dropExtraFields = function(evt) {
        evt.Delete("json");
    };

    // Rename nested fields.
    var renameNestedFields = function(evt) {
        var arr = evt.Get("gcp.audit.authorization_info");
        if (Array.isArray(arr)) {
            for (var i = 0; i < arr.length; i++) {
                if (arr[i].resourceAttributes) {
                    // Convert to snake_case.
                    arr[i].resource_attributes = arr[i].resourceAttributes;
                    delete arr[i].resourceAttributes;
                }
            }
        }
    };

    // Set ECS categorization fields.
    var setECSCategorization = function(evt) {
        evt.Put("event.kind", "event");

        // google.rpc.Code value for OK is 0.
        if (evt.Get("gcp.audit.status.code") === 0) {
            evt.Put("event.outcome", "success");
            return;
        }

        // Try to use authorization_info.granted when there was no status code.
        if (evt.Get("gcp.audit.status.code") == null) {
            var authorization_info = evt.Get("gcp.audit.authorization_info");
            if (Array.isArray(authorization_info) && authorization_info.length === 1) {
                if (authorization_info[0].granted === true) {
                    evt.Put("event.outcome", "success");
                } else if (authorization_info[0].granted === false) {
                    evt.Put("event.outcome", "failure");
                }
                return
            }

            evt.Put("event.outcome", "unknown");
            return;
        }

        evt.Put("event.outcome", "failure");
    };

    var pipeline = new processor.Chain()
        .Add(decodeJson)
        .Add(parseTimestamp)
        .Add(saveOriginalMessage)
        .Add(dropPubSubFields)
        .Add(saveMetadata)
        .Add(setCloudMetadata)
        .Add(setOrchestratorMetadata)
        .Add(convertLogEntry)
        .Add(convertProtoPayload)
        .Add(copyFields)
        .Add(dropExtraFields)
        .Add(renameNestedFields)
        .Add(setECSCategorization)
        .Build();

    return {
        process: pipeline.Run,
    };
}