receiver/netflowreceiver/parser.go (220 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver" import ( "errors" "net/netip" "time" "github.com/netsampler/goflow2/v2/producer" protoproducer "github.com/netsampler/goflow2/v2/producer/proto" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" semconv "go.opentelemetry.io/collector/semconv/v1.27.0" ) var ( // https://en.wikipedia.org/wiki/EtherType etypeNames = map[uint32]string{ 0x806: "arp", 0x800: "ipv4", 0x86dd: "ipv6", } // https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml transportProtocolNames = map[uint32]string{ 0: "hopopt", 1: "icmp", 2: "igmp", 3: "ggp", 4: "ipv4", 5: "st", 6: "tcp", 7: "cbt", 8: "egp", 9: "igp", 10: "bbn-rcc-mon", 11: "nvp-ii", 12: "pup", 13: "argus", 14: "emcon", 15: "xnet", 16: "chaos", 17: "udp", 18: "mux", 19: "dcn-meas", 20: "hmp", 21: "prm", 22: "xns-idp", 23: "trunk-1", 24: "trunk-2", 25: "leaf-1", 26: "leaf-2", 27: "rdp", 28: "irtp", 29: "iso-tp4", 30: "netblt", 31: "mfe-nsp", 32: "merit-inp", 33: "dccp", 34: "3pc", 35: "idpr", 36: "xtp", 37: "ddp", 38: "idpr-cmtp", 39: "tp++", 40: "il", 41: "ipv6", 42: "sdrp", 43: "ipv6-route", 44: "ipv6-frag", 45: "idrp", 46: "rsvp", 47: "gre", 48: "dsr", 49: "bna", 50: "esp", 51: "ah", 52: "i-nlsp", 53: "swipe", 54: "narp", 55: "min-ipv4", 56: "tlsp", 57: "skip", 58: "ipv6-icmp", 59: "ipv6-nonxt", 60: "ipv6-opts", 61: "any-host-internal-protocol", 62: "cftp", 63: "any-local-network", 64: "sat-expak", 65: "kryptolan", 66: "rvd", 67: "ippc", 68: "any-distributed-file-system", 69: "sat-mon", 70: "visa", 71: "ipcv", 72: "cpnx", 73: "cphb", 74: "wsn", 75: "pvp", 76: "br-sat-mon", 77: "sun-nd", 78: "wb-mon", 79: "wb-expak", 80: "iso-ip", 81: "vmtp", 82: "secure-vmtp", 83: "vines", 84: "iptm", 85: "nsfnet-igp", 86: "dgp", 87: "tcf", 88: "eigrp", 89: "ospfigp", 90: "sprite-rpc", 91: "larp", 92: "mtp", 93: "ax.25", 94: "ipip", 95: "micp", 96: "scc-sp", 97: "etherip", 98: "encap", 99: "any-private-encryption-scheme", 100: "gmtp", 101: "ifmp", 102: "pnni", 103: "pim", 104: "aris", 105: "scps", 106: "qnx", 107: "a/n", 108: "ipcomp", 109: "snp", 110: "compaq-peer", 111: "ipx-in-ip", 112: "vrrp", 113: "pgm", 114: "any-0-hop-protocol", 115: "l2tp", 116: "ddx", 117: "iatp", 118: "stp", 119: "srp", 120: "uti", 121: "smp", 122: "sm", 123: "ptp", 124: "isis over ipv4", 125: "fire", 126: "crtp", 127: "crudp", 128: "sscopmce", 129: "iplt", 130: "sps", 131: "pipe", 132: "sctp", 133: "fc", 134: "rsvp-e2e-ignore", 135: "mobility header", 136: "udplite", 137: "mpls-in-ip", 138: "manet", 139: "hip", 140: "shim6", 141: "wesp", 142: "rohc", 143: "ethernet", 144: "aggfrag", 145: "nsh", } flowTypeNames = map[int32]string{ 0: "unknown", 1: "sflow_5", 2: "netflow_v5", 3: "netflow_v9", 4: "ipfix", } ) func getEtypeName(etype uint32) string { if name, ok := etypeNames[etype]; ok { return name } return "unknown" } func getTransportName(proto uint32) string { if name, ok := transportProtocolNames[proto]; ok { return name } return "unknown" } func getFlowTypeName(flowType int32) string { if name, ok := flowTypeNames[flowType]; ok { return name } return "unknown" } // addMessageAttributes parses the message attributes and adds them to the log record func addMessageAttributes(m producer.ProducerMessage, r *plog.LogRecord) error { // we know msg is ProtoProducerMessage because that is the parent producer pm, ok := m.(*protoproducer.ProtoProducerMessage) if !ok { return errors.New("this flow message is not ProtoProducerMessage, this is not expected") } // Parse IP addresses bytes to netip.Addr srcAddr, _ := netip.AddrFromSlice(pm.SrcAddr) dstAddr, _ := netip.AddrFromSlice(pm.DstAddr) samplerAddr, _ := netip.AddrFromSlice(pm.SamplerAddress) // Time the receiver received the message receivedTime := time.Unix(0, int64(pm.TimeReceivedNs)) startTime := time.Unix(0, int64(pm.TimeFlowStartNs)) r.SetObservedTimestamp(pcommon.NewTimestampFromTime(receivedTime)) r.SetTimestamp(pcommon.NewTimestampFromTime(startTime)) // Source and destination attributes r.Attributes().PutStr(semconv.AttributeSourceAddress, srcAddr.String()) r.Attributes().PutInt(semconv.AttributeSourcePort, int64(pm.SrcPort)) r.Attributes().PutStr(semconv.AttributeDestinationAddress, dstAddr.String()) r.Attributes().PutInt(semconv.AttributeDestinationPort, int64(pm.DstPort)) // Network attributes r.Attributes().PutStr(semconv.AttributeNetworkTransport, getTransportName(pm.Proto)) r.Attributes().PutStr(semconv.AttributeNetworkType, getEtypeName(pm.Etype)) // There is no semconv as of today for these r.Attributes().PutInt("flow.io.bytes", int64(pm.Bytes)) r.Attributes().PutInt("flow.io.packets", int64(pm.Packets)) r.Attributes().PutStr("flow.type", getFlowTypeName(int32(pm.Type))) r.Attributes().PutInt("flow.sequence_num", int64(pm.SequenceNum)) r.Attributes().PutInt("flow.time_received", int64(pm.TimeReceivedNs)) r.Attributes().PutInt("flow.start", int64(pm.TimeFlowStartNs)) r.Attributes().PutInt("flow.end", int64(pm.TimeFlowEndNs)) r.Attributes().PutInt("flow.sampling_rate", int64(pm.SamplingRate)) r.Attributes().PutStr("flow.sampler_address", samplerAddr.String()) return nil }