tools/tcpdump_encap_helper/tcpdump_encap_helper.py (196 lines of code) (raw):

#!/usr/bin/env python3

# Copyright (C) 2018-present, Facebook, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

import argparse
import ipaddress
import struct


CHUNK_SIZE = 10
EMPTY_FILTER = 2

# sizes (in bytes) of the headers which could precede the internal packet
V4_HDR_SIZE = 20
V6_HDR_SIZE = 40
UDP_HDR_SIZE = 8

# offsets (in bytes) of the fields inside of ipv4 header
V4_SRC_OFFSET = 12
V4_DST_OFFSET = 16
V4_PROTO_OFFSET = 9

# offsets (in bytes) of the ports, relative to the end of ip header
SPORT_OFFSET = 0
DPORT_OFFSET = 2

# offsets (in bytes) of the fields inside of ipv6 header
V6_SRC_OFFSET = 8
V6_DST_OFFSET = 24
V6_PROTO_OFFSET = 6

# sizes (in bytes) of the values which are being compared
V4_ADDR_SIZE = 4
V6_PART_SIZE = 4
PROTO_SIZE = 1
PORT_SIZE = 2


def ipv4_in_hex(addr):
    """
    @param string addr ipv4 address
    @return string hex representation of this address
    @raises ValueError if addr is not a valid ipv4 address
    """
    # IPv4Address (instead of generic ip_address) rejects ipv6 literals,
    # which otherwise would be silently converted into a bogus constant
    return "0x{:08X}".format(int(ipaddress.IPv4Address(addr)))


def ip6_exploded(addr):
    """
    @param string addr ipv6 address
    @return list<string> ipv6 address split into 4 groups by 32bit each
    @raises ValueError if addr is not a valid ipv6 address
    """
    # IPv6Address guarantees that packed form is exactly 16 bytes long, so
    # the unpack below can't fail on an ipv4 literal with an opaque error
    packed = ipaddress.IPv6Address(addr).packed
    # 04 is a minimum width only; bigger values are printed in full
    return ["0x{:04X}".format(x) for x in struct.unpack("!IIII", packed)]


def modify_filter(tcpdump_filters, value, offset, size, v6=False):
    """
    @param list<string> tcpdump_filters list of currently generated filters
    @param string value to match
    @param int offset offset of this value in the packet
    @param int size of the value in bytes
    @param bool v6 flag if the filter is related to ipv6 packet
    @return modified tcpdump_filters (the same list object, with a new
        expression appended)
    """
    packet_type = "ip6" if v6 else "ip"
    tcpdump_filters.append(
        "({}[{}:{}] == {} )".format(packet_type, offset, size, value)
    )
    return tcpdump_filters


def _append_proto_and_ports(tcpdump_filters, args, proto_offset, l4_offset, v6):
    """
    append matches for protocol, src port and dst port of internal packet

    @param list<string> tcpdump_filters list of currently generated filters
    @param object args cli arguments (proto, sport and dport are used)
    @param int proto_offset offset of the protocol byte in the packet
    @param int l4_offset offset of the first byte after internal ip header
    @param bool v6 flag if the outer packet is ipv6
    @return modified tcpdump_filters
    """
    # explicit comparison with None: 0 is a valid protocol number/port and
    # must not be treated as "option was not specified"
    if args.proto is not None:
        modify_filter(tcpdump_filters, args.proto, proto_offset, PROTO_SIZE, v6)
    if args.sport is not None:
        modify_filter(
            tcpdump_filters, args.sport, l4_offset + SPORT_OFFSET, PORT_SIZE, v6
        )
    if args.dport is not None:
        modify_filter(
            tcpdump_filters, args.dport, l4_offset + DPORT_OFFSET, PORT_SIZE, v6
        )
    return tcpdump_filters


def create_tcpdump_line_v6(args, offset):
    """
    print a filter which matches fields of internal ipv6 packet. if no
    field was requested the printed expression is empty: "()"

    @param args cli arguments
    @param int offset of internal header compare to external one
    @return None

    IPv6 Header Format
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    |Version| Traffic Class |           Flow Label                  |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    |         Payload Length        |  Next Header  |   Hop Limit   |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    |                                                               |
    +                                                               +
    |                                                               |
    +                         Source Address                        +
    |                                                               |
    +                                                               +
    |                                                               |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    |                                                               |
    +                                                               +
    |                                                               |
    +                      Destination Address                      +
    |                                                               |
    +                                                               +
    |                                                               |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    """
    tcpdump_filters = []
    # each 128bit address is matched as 4 consecutive 32bit chunks
    for addr, addr_offset in ((args.src, V6_SRC_OFFSET), (args.dst, V6_DST_OFFSET)):
        if not addr:
            continue
        for idx, v6_chunk in enumerate(ip6_exploded(addr)):
            modify_filter(
                tcpdump_filters,
                v6_chunk,
                offset + addr_offset + idx * V6_PART_SIZE,
                V6_PART_SIZE,
                True,
            )
    _append_proto_and_ports(
        tcpdump_filters,
        args,
        offset + V6_PROTO_OFFSET,
        offset + V6_HDR_SIZE,
        True,
    )
    print('"(' + " and ".join(tcpdump_filters) + ')"')


def create_tcpdump_line_v4(args, offset, encap_v6=False):
    """
    print a filter which matches fields of internal ipv4 packet. if no
    field was requested the printed expression is empty: "()"

    @param object args cli arguments
    @param int offset of internal header compare to external one
    @param bool encap_v6 flag to indicate that outer packet is ipv6
    @return None

    IP header format:
     0                   1                   2                   3
     0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    |Version|  IHL  |Type of Service|          Total Length         |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    |         Identification        |Flags|      Fragment Offset    |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    |  Time to Live |    Protocol   |         Header Checksum       |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    |                       Source Address                          |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    |                    Destination Address                        |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    |                    Options                    |    Padding    |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    """
    tcpdump_filters = []
    if args.src:
        modify_filter(
            tcpdump_filters,
            ipv4_in_hex(args.src),
            offset + V4_SRC_OFFSET,
            V4_ADDR_SIZE,
            encap_v6,
        )
    if args.dst:
        modify_filter(
            tcpdump_filters,
            ipv4_in_hex(args.dst),
            offset + V4_DST_OFFSET,
            V4_ADDR_SIZE,
            encap_v6,
        )
    # ports are located with a fixed V4_HDR_SIZE, i.e. internal ipv4 header
    # is expected to be without options
    _append_proto_and_ports(
        tcpdump_filters,
        args,
        offset + V4_PROTO_OFFSET,
        offset + V4_HDR_SIZE,
        encap_v6,
    )
    print('"(' + " and ".join(tcpdump_filters) + ')"')


def parse_args(argv=None):
    """
    @param list<string> argv arguments to parse; None (default) means
        sys.argv[1:]
    @return argparse.Namespace with mode, gue, src, dst, proto, sport, dport
    """
    parser = argparse.ArgumentParser(
        description=(
            "this is a tool which helps to create a filter to match "
            "fields from internal header of IPIP/GUE packet"
        )
    )
    parser.add_argument(
        "-m",
        "--mode",
        choices=["4", "6", "46"],
        # without a mode there is nothing to generate; fail loudly instead
        # of silently printing nothing
        required=True,
        help=(
            "mode of the filter. possible values: 4 (for ipip) "
            "6 (for ip6ip6), 46 (for ip4ip6)"
        ),
    )
    parser.add_argument(
        "-g",
        "--gue",
        action="store_true",
        help=("run helper in GUE mode (instead of default IPIP)"),
    )
    parser.add_argument(
        "-s",
        "--src",
        default=None,
        help="src ip address of internal packet. could be ipv4 or ipv6",
    )
    parser.add_argument(
        "-d",
        "--dst",
        default=None,
        help="dst ip address of internal packet. could be ipv4 or ipv6",
    )
    parser.add_argument(
        "-p",
        "--proto",
        default=None,
        type=int,
        help=(
            "protocol of internal packet. must be a number. "
            "e.g. 6 for tcp or 17 for udp"
        ),
    )
    parser.add_argument(
        "--sport",
        default=None,
        type=int,
        help="src port of internal packet (e.g. if it's udp or tcp)",
    )
    parser.add_argument(
        "--dport",
        default=None,
        type=int,
        help="dst port of internal packet (e.g. if it's udp or tcp)",
    )
    return parser.parse_args(argv)


def main(argv=None):
    """
    @param list<string> argv arguments to parse; None (default) means
        sys.argv[1:]
    @return None
    """
    args = parse_args(argv)
    # in GUE mode internal packet is additionally preceded by udp header
    offset = UDP_HDR_SIZE if args.gue else 0
    if args.mode == "4":
        create_tcpdump_line_v4(args, offset + V4_HDR_SIZE)
    elif args.mode == "6":
        create_tcpdump_line_v6(args, offset + V6_HDR_SIZE)
    elif args.mode == "46":
        # internal ipv4 packet inside of external ipv6 one
        create_tcpdump_line_v4(args, offset + V6_HDR_SIZE, True)


if __name__ == "__main__":
    main()