function VPCFlow()

in x-pack/filebeat/module/gcp/vpcflow/config/pipeline.js [5:256]


function VPCFlow(keep_original_message, internalNetworks) {
    var processor = require("processor");

    // The pub/sub input writes the Stackdriver LogEntry object into the message
    // field. The message needs decoded as JSON.
    var decodeJson = new processor.DecodeJSONFields({
        fields: ["message"],
        target: "json",
    });

    // Set @timetamp the LogEntry's timestamp.
    var parseTimestamp = new processor.Timestamp({
        field: "json.timestamp",
        timezone: "UTC",
        layouts: ["2006-01-02T15:04:05.999999999Z07:00"],
        tests: ["2019-06-14T03:50:10.845445834Z"],
        ignore_missing: true,
    });

    var saveOriginalMessage = function(evt) {};
    if (keep_original_message) {
        saveOriginalMessage = new processor.Convert({
            fields: [
                {from: "message", to: "event.original"}
            ],
            mode: "rename"
        });
    }

    var dropPubSubFields = function(evt) {
        evt.Delete("message");
        evt.Delete("labels");
    };

    var categorizeEvent = new processor.AddFields({
        target: "event",
        fields: {
            kind: "event",
            category: "network",
            type: "connection",
        },
    });


    var saveMetadata = new processor.Convert({
        fields: [
            {from: "json.logName", to: "log.logger"},
            {from: "json.insertId", to: "event.id"},
        ],
        ignore_missing: true
    });

    // Use the LogEntry object's timestamp. VPC flow logs are structured so the
    // LogEntry includes a jsonPayload field.
    // https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry
    var convertLogEntry = new processor.Convert({
        fields: [
            {from: "json.jsonPayload", to: "json"},
        ],
        mode: "rename",
    });

    // The LogEntry's jsonPayload is moved to the json field. The jsonPayload
    // contains the structured VPC flow log fields.
    // https://cloud.google.com/vpc/docs/using-flow-logs#record_format
    var convertJsonPayload = new processor.Convert({
        fields: [
            {from: "json.connection.dest_ip", to: "destination.address"},
            {from: "json.connection.dest_port", to: "destination.port", type: "long"},
            {from: "json.connection.protocol", to: "network.iana_number", type: "string"},
            {from: "json.connection.src_ip", to: "source.address"},
            {from: "json.connection.src_port", to: "source.port", type: "long"},

            {from: "json.src_instance.vm_name", to: "source.domain"},
            {from: "json.dest_instance.vm_name", to: "destination.domain"},

            {from: "json.bytes_sent", to: "source.bytes", type: "long"},
            {from: "json.packets_sent", to: "source.packets", type: "long"},

            {from: "json.start_time", to: "event.start"},
            {from: "json.end_time", to: "event.end"},

            {from: "json.dest_location.asn", to: "destination.as.number", type: "long"},
            {from: "json.dest_location.continent", to: "destination.geo.continent_name"},
            {from: "json.dest_location.country", to: "destination.geo.country_name"},
            {from: "json.dest_location.region", to: "destination.geo.region_name"},
            {from: "json.dest_location.city", to: "destination.geo.city_name"},

            {from: "json.src_location.asn", to: "source.as.number", type: "long"},
            {from: "json.src_location.continent", to: "source.geo.continent_name"},
            {from: "json.src_location.country", to: "source.geo.country_name"},
            {from: "json.src_location.region", to: "source.geo.region_name"},
            {from: "json.src_location.city", to: "source.geo.city_name"},

            {from: "json.dest_instance", to: "gcp.destination.instance"},
            {from: "json.dest_vpc", to: "gcp.destination.vpc"},
            {from: "json.src_instance", to: "gcp.source.instance"},
            {from: "json.src_vpc", to: "gcp.source.vpc"},

            {from: "json.rtt_msec", to: "json.rtt.ms", type: "long"},
            {from: "json", to: "gcp.vpcflow"},
        ],
        mode: "rename",
        ignore_missing: true,
    });

    // Delete emtpy object's whose fields have been renamed leaving them childless.
    var dropEmptyObjects = function (evt) {
        evt.Delete("gcp.vpcflow.connection");
        evt.Delete("gcp.vpcflow.dest_location");
        evt.Delete("gcp.vpcflow.src_location");
    };

    // Copy the source/destination.address to source/destination.ip if they are
    // valid IP addresses.
    var copyAddressFields = new processor.Convert({
        fields: [
            {from: "source.address", to: "source.ip", type: "ip"},
            {from: "destination.address", to: "destination.ip", type: "ip"},
        ],
        fail_on_error: false,
    });

    var setCloudFromDestInstance = new processor.Convert({
        fields: [
            {from: "gcp.destination.instance.project_id", to: "cloud.project.id"},
            {from: "gcp.destination.instance.vm_name", to: "cloud.instance.name"},
            {from: "gcp.destination.instance.region", to: "cloud.region"},
            {from: "gcp.destination.instance.zone", to: "cloud.availability_zone"},
            {from: "gcp.destination.vpc.subnetwork_name", to: "network.name"},
        ],
        ignore_missing: true,
    });

    var setCloudFromSrcInstance = new processor.Convert({
        fields: [
            {from: "gcp.source.instance.project_id", to: "cloud.project.id"},
            {from: "gcp.source.instance.vm_name", to: "cloud.instance.name"},
            {from: "gcp.source.instance.region", to: "cloud.region"},
            {from: "gcp.source.instance.zone", to: "cloud.availability_zone"},
            {from: "gcp.source.vpc.subnetwork_name", to: "network.name"},
        ],
        ignore_missing: true,
    });

    // Set the cloud metadata fields based on the instance that reported the
    // event.
    var setCloudMetadata = function(evt) {
        var reporter = evt.Get("gcp.vpcflow.reporter");

        if (reporter === "DEST") {
            setCloudFromDestInstance.Run(evt);
        } else if (reporter === "SRC") {
            setCloudFromSrcInstance.Run(evt);
        }
    };

    var communityId = new processor.CommunityID({
        fields: {
            transport: "network.iana_number",
        }
    });

    // VPC flows are unidirectional so we only have to worry about copy the
    // source.bytes/packets over to network.bytes/packets.
    var setNetworkBytesPackets = new processor.Convert({
        fields: [
            {from: "source.bytes", to: "network.bytes"},
            {from: "source.packets", to: "network.packets"},
        ],
        ignore_missing: true,
    });

    // VPC flow logs are reported for TCP and UDP traffic only so handle these
    // protocols' IANA numbers.
    var setNetworkTransport = function(event) {
        var ianaNumber = event.Get("network.iana_number");
        switch (ianaNumber) {
            case "6":
                event.Put("network.transport", "tcp");
                break;
            case "17":
                event.Put("network.transport", "udp");
                break;
        }
    };

    var setNetworkDirection = function(event) {
        var srcInstance = event.Get("gcp.source.instance");
        var destInstance = event.Get("gcp.destination.instance");
        var direction = "unknown";

        if (srcInstance && destInstance) {
            direction = "internal";
        } else if (srcInstance) {
            direction = "outbound";
        } else if (destInstance) {
            direction = "inbound";
        }
        event.Put("network.direction", direction);
    };

    var setNetworkType = function(event) {
        var ip = event.Get("source.ip");
        if (!ip) {
            return;
        }

        if (ip.indexOf(".") !== -1) {
            event.Put("network.type", "ipv4");
        } else {
            event.Put("network.type", "ipv6");
        }
    };

    var setRelatedIP = function(event) {
        event.AppendTo("related.ip", event.Get("source.ip"));
        event.AppendTo("related.ip", event.Get("destination.ip"));
    };

    var pipeline = new processor.Chain()
        .Add(decodeJson)
        .Add(parseTimestamp)
        .Add(saveOriginalMessage)
        .Add(dropPubSubFields)
        .Add(categorizeEvent)
        .Add(saveMetadata)
        .Add(convertLogEntry)
        .Add(convertJsonPayload)
        .Add(dropEmptyObjects)
        .Add(copyAddressFields)
        .Add(setCloudMetadata)
        .Add(communityId)
        .Add(setNetworkBytesPackets)
        .Add(setNetworkTransport)
        .Add(setNetworkDirection)
        .Add(setNetworkType)
        .Add(setRelatedIP);

    if (internalNetworks) {
        pipeline = pipeline.AddNetworkDirection({
            source: "source.ip",
            destination: "destination.ip",
            target: "network.direction",
            internal_networks: internalNetworks,
        })
    }

    return {
        process: pipeline.Build().Run,
    };
}