in x-pack/filebeat/module/gcp/firewall/config/pipeline.js [68:328]
function FirewallProcessor(keep_original_message, debug, internalNetworks) {
var builder = new PipelineBuilder("firewall", debug);
// The pub/sub input writes the Stackdriver LogEntry object into the message
// field. The message needs decoded as JSON.
builder.Add("decodeJson", new processor.DecodeJSONFields({
fields: ["message"],
target: "json"
}));
// Set @timestamp to the LogEntry's timestamp.
builder.Add("parseTimestamp", new processor.Timestamp({
field: "json.timestamp",
timezone: "UTC",
layouts: ["2006-01-02T15:04:05.999999999Z07:00"],
tests: ["2019-06-14T03:50:10.845445834Z"],
ignore_missing: true
}));
if (keep_original_message) {
builder.Add("saveOriginalMessage", new processor.Convert({
fields: [
{from: "message", to: "event.original"}
],
mode: "rename"
}));
}
builder.Add("dropPubSubFields", function(evt) {
evt.Delete("message");
evt.Delete("labels");
});
builder.Add("categorizeEvent", new processor.AddFields({
target: "event",
fields: {
kind: "event",
category: "network",
type: "connection",
action: "firewall-rule"
},
}));
builder.Add("saveMetadata", new processor.Convert({
fields: [
{from: "json.logName", to: "log.logger"},
{from: "json.resource.labels.subnetwork_name", to: "network.name"},
{from: "json.insertId", to: "event.id"}
],
ignore_missing: true
}));
// Firewall logs are structured so the LogEntry includes a jsonPayload field.
// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry
// The LogEntry's jsonPayload is moved to the json field. The jsonPayload
// contains the structured VPC flow log fields.
builder.Add("convertLogEntry", new processor.Convert({
fields: [
{from: "json.jsonPayload", to: "json"},
],
mode: "rename"
}));
builder.Add("addType", function(evt) {
var disp = evt.Get("json.disposition");
if (disp != null) {
evt.AppendTo("event.type", disp.toLowerCase());
}
});
builder.Add("addDirection", makeMapper({
from: "json.rule_details.direction",
to: "network.direction",
mappings: {
INGRESS: "inbound",
EGRESS: "outbound"
},
default: "unknown"
}));
builder.Add("conditionalRename", makeConditional({
condition: function(evt) {
return evt.Get("json.rule_details.direction");
},
EGRESS: processor.Convert({
fields: [
{from: "json.vpc", to: "json.src_vpc"},
{from: "json.instance", to: "json.src_instance"},
{from: "json.location", to: "json.src_location"},
{from: "json.remote_vpc", to: "json.dest_vpc"},
{from: "json.remote_instance", to: "json.dest_instance"},
{from: "json.remote_location", to: "json.dest_location"}
],
mode: "rename",
fail_on_error: false,
ignore_missing: true
}),
INGRESS: processor.Convert({
fields: [
{from: "json.vpc", to: "json.dest_vpc"},
{from: "json.instance", to: "json.dest_instance"},
{from: "json.location", to: "json.dest_location"},
{from: "json.remote_vpc", to: "json.src_vpc"},
{from: "json.remote_instance", to: "json.src_instance"},
{from: "json.remote_location", to: "json.src_location"}
],
mode: "rename",
fail_on_error: false,
ignore_missing: true
})
}));
// Set network.iana_number from connection.protocol, converting it to long
// and ignoring the failure if it's not numeric.
builder.Add("ianaNumber", new processor.Convert({
fields: [{
from: "json.connection.protocol",
to: "network.iana_number",
type: "long"
}],
fail_on_error: false
}));
// Set network.transport from iana_number. GCP Firewall only supports
// logging of tcp and udp connections, added icmp just in case as it's the
// other protocol supported by firewall rules.
builder.Add("transportFromIANA", makeMapper({
from: "network.iana_number",
to: "network.transport",
mappings: {
1: "icmp",
6: "tcp",
17: "udp"
}
}));
builder.Add("convertJsonPayload", new processor.Convert({
fields: [
{from: "json.connection.dest_ip", to: "destination.address"},
{from: "json.connection.dest_port", to: "destination.port", type: "long"},
{from: "json.connection.src_ip", to: "source.address"},
{from: "json.connection.src_port", to: "source.port", type: "long"},
{from: "json.src_instance.vm_name", to: "source.domain"},
{from: "json.dest_instance.vm_name", to: "destination.domain"},
{from: "json.dest_location.asn", to: "destination.as.number", type: "long"},
{from: "json.dest_location.continent", to: "destination.geo.continent_name"},
{from: "json.dest_location.country", to: "destination.geo.country_name"},
{from: "json.dest_location.region", to: "destination.geo.region_name"},
{from: "json.dest_location.city", to: "destination.geo.city_name"},
{from: "json.src_location.asn", to: "source.as.number", type: "long"},
{from: "json.src_location.continent", to: "source.geo.continent_name"},
{from: "json.src_location.country", to: "source.geo.country_name"},
{from: "json.src_location.region", to: "source.geo.region_name"},
{from: "json.src_location.city", to: "source.geo.city_name"},
{from: "json.dest_instance", to: "gcp.destination.instance"},
{from: "json.dest_vpc", to: "gcp.destination.vpc"},
{from: "json.src_instance", to: "gcp.source.instance"},
{from: "json.src_vpc", to: "gcp.source.vpc"},
{from: "json.rule_details.reference", to: "rule.name"},
{from: "json", to: "gcp.firewall"},
],
mode: "rename",
ignore_missing: true,
fail_on_error: false
}));
// Delete emtpy object's whose fields have been renamed leaving them childless.
builder.Add("dropEmptyObjects", function (evt) {
evt.Delete("gcp.firewall.connection");
evt.Delete("gcp.firewall.dest_location");
evt.Delete("gcp.firewall.disposition");
evt.Delete("gcp.firewall.src_location");
});
// Copy the source/destination.address to source/destination.ip if they are
// valid IP addresses.
builder.Add("copyAddressFields", new processor.Convert({
fields: [
{from: "source.address", to: "source.ip", type: "ip"},
{from: "destination.address", to: "destination.ip", type: "ip"}
],
fail_on_error: false
}));
builder.Add("setCloudMetadata", makeConditional({
condition: function (evt) {
return evt.Get("json.rule_details.direction");
},
EGRESS: new processor.Convert({
fields: [
{from: "gcp.source.instance.project_id", to: "cloud.project.id"},
{from: "gcp.source.instance.vm_name", to: "cloud.instance.name"},
{from: "gcp.source.instance.region", to: "cloud.region"},
{from: "gcp.source.instance.zone", to: "cloud.availability_zone"},
{from: "gcp.source.vpc.subnetwork_name", to: "network.name"}
],
ignore_missing: true
}),
INGRESS: new processor.Convert({
fields: [
{from: "gcp.destination.instance.project_id", to: "cloud.project.id"},
{from: "gcp.destination.instance.vm_name", to: "cloud.instance.name"},
{from: "gcp.destination.instance.region", to: "cloud.region"},
{from: "gcp.destination.instance.zone", to: "cloud.availability_zone"},
{from: "gcp.destination.vpc.subnetwork_name", to: "network.name"},
],
ignore_missing: true
})
}));
builder.Add("communityId", new processor.CommunityID({
fields: {
transport: "network.iana_number"
}
}));
builder.Add("setInternalDirection", function(event) {
var srcInstance = event.Get("gcp.source.instance");
var destInstance = event.Get("gcp.destination.instance");
if (srcInstance && destInstance) {
event.Put("network.direction", "internal");
}
});
builder.Add("setNetworkType", function(event) {
var ip = event.Get("source.ip");
if (!ip) {
return;
}
if (ip.indexOf(".") !== -1) {
event.Put("network.type", "ipv4");
} else {
event.Put("network.type", "ipv6");
}
});
builder.Add("setRelatedIP", function(event) {
event.AppendTo("related.ip", event.Get("source.ip"));
event.AppendTo("related.ip", event.Get("destination.ip"));
});
if (internalNetworks) {
builder.Add("addNetworkDirection", processor.AddNetworkDirection({
source: "source.ip",
destination: "destination.ip",
target: "network.direction",
internal_networks: internalNetworks,
}))
}
return {
process: builder.Build().Run
};
}