def srcNatAclTranslationDisableTest()

in ptf/sainat.py [0:0]


    def srcNatAclTranslationDisableTest(self):
        '''
        Verifies if translation doesn't occur when source NAT entry exists
        but ACL is configured to disable NAT translation.
        Test is performed for different RIFs (regular L3 port, L3 LAG and SVI)
        and different nexthops (regular L3 port, L3 LAG and SVI)
        with two kinds of packets - TCP and UDP.
        '''
        print("\nsrcNatAclTranslationDisableTest()")

        src_ip = "20.20.20.1"
        nat_src_ip = "150.10.10.10"

        # The entry descriptor is built before the try block so that `snat`
        # is always bound when the finally clause removes the entry; an error
        # raised while building it then propagates unmasked.
        nat_data = sai_thrift_nat_entry_data_t(
            key=sai_thrift_nat_entry_key_t(
                src_ip=src_ip),
            mask=sai_thrift_nat_entry_mask_t(
                src_ip='255.255.255.255'))
        snat = sai_thrift_nat_entry_t(vr_id=self.default_vrf,
                                      data=nat_data,
                                      nat_type=SAI_NAT_TYPE_SOURCE_NAT)

        def build_packets(make_pkt, nbor_ip, nbor_mac, **extra):
            '''
            Builds the packet triple used for a single protocol and a single
            egress path.

            Args:
                make_pkt (function): packet factory (simple_tcp_packet or
                                     simple_udp_packet)
                nbor_ip (str): IP address of the egress neighbor
                nbor_mac (str): MAC address of the egress neighbor
                extra (dict): additional keyword arguments passed to every
                              make_pkt call

            Return:
                tuple: (packet to send, expected packet with translated
                        source IP, expected packet with original source IP)
            '''
            sent = make_pkt(eth_dst=ROUTER_MAC,
                            ip_src=src_ip,
                            ip_dst=nbor_ip,
                            ip_ttl=64,
                            pktlen=100,
                            **extra)
            translated = make_pkt(eth_dst=nbor_mac,
                                  eth_src=ROUTER_MAC,
                                  ip_src=nat_src_ip,
                                  ip_dst=nbor_ip,
                                  ip_ttl=63,
                                  pktlen=100,
                                  **extra)
            untranslated = make_pkt(eth_dst=nbor_mac,
                                    eth_src=ROUTER_MAC,
                                    ip_src=src_ip,
                                    ip_dst=nbor_ip,
                                    ip_ttl=63,
                                    pktlen=100,
                                    **extra)
            return sent, translated, untranslated

        def verify_translation(src_rif, src_port_dev):
            '''
            Additional helper function for translation verification.
            Verifies if translation doesn't occur when ACL is configured
            to disable it.

            For every egress path (L3 port, L3 LAG, SVI) TCP and UDP packets
            are first sent without the ACL bound and are expected to leave
            with the translated source IP; then the ingress ACL table is
            bound to the source RIF and the same packets are expected to
            leave with the original source IP.

            Args:
                src_rif (oid): object ID of source RIF
                src_port_dev (int): source device port number
            '''
            # Baseline of the ACL counter; it is bumped locally after every
            # packet sent with the ACL bound.
            # NOTE(review): presumably _verifyAclCounter checks the counter
            # against this baseline - confirm in the helper.
            acl_counter = sai_thrift_get_acl_counter_attribute(
                self.client, self.ingr_acl_counter, packets=True)['packets']

            # (label, neighbor IP, neighbor MAC, verify function, egress
            # port(s)) - each egress type uses a different ptf verifier
            egress_paths = (
                ("L3 port", self.port_nbor_ip, self.port_nbor_mac,
                 verify_packet, self.egr_port_dev),
                ("L3 LAG", self.lag_nbor_ip, self.lag_nbor_mac,
                 verify_packet_any_port, self.egr_lag_dev),
                ("SVI", self.svi_nbor_ip, self.svi_nbor_mac,
                 verify_packets, self.egr_svi_dev),
            )

            for label, nbor_ip, nbor_mac, verify_fn, egr_dev in egress_paths:
                print("   -> Egress %s" % label)

                # fresh packets for every egress path instead of mutating
                # packets that have already been sent
                flows = (
                    ("TCP", build_packets(simple_tcp_packet,
                                          nbor_ip,
                                          nbor_mac,
                                          with_tcp_chksum=True)),
                    ("UDP", build_packets(simple_udp_packet,
                                          nbor_ip,
                                          nbor_mac)),
                )

                for proto, (pkt, nat_pkt, _unused) in flows:
                    print("Sending %s packet with NAT enabled" % proto)
                    send_packet(self, src_port_dev, pkt)
                    verify_fn(self, nat_pkt, egr_dev)
                    self.assertTrue(self._verifyNatHit(snat))
                    print("\tOK")

                print("Disabling NAT on src RIF")
                sai_thrift_set_router_interface_attribute(
                    self.client, src_rif, ingress_acl=self.ingr_acl_table)

                for proto, (pkt, _unused, no_nat_pkt) in flows:
                    print("Sending %s packet with NAT disabled by ACL"
                          % proto)
                    send_packet(self, src_port_dev, pkt)
                    verify_fn(self, no_nat_pkt, egr_dev)
                    self.assertTrue(
                        self._verifyAclCounter(self.ingr_acl_counter,
                                               acl_counter))
                    acl_counter += 1
                    print("\tOK")

                print("Enabling NAT on src RIF")
                sai_thrift_set_router_interface_attribute(
                    self.client, src_rif, ingress_acl=0)

        try:
            sai_thrift_create_nat_entry(self.client,
                                        snat,
                                        src_ip=nat_src_ip,
                                        nat_type=SAI_NAT_TYPE_SOURCE_NAT,
                                        enable_packet_count=True)

            # ingress RIFs are put into NAT zone 0, egress RIFs into zone 1
            rif_pairs = (
                (self.ingr_port_rif, self.egr_port_rif),
                (self.ingr_lag_rif, self.egr_lag_rif),
                (self.ingr_svi_rif, self.egr_svi_rif),
            )
            for ingr_rif, egr_rif in rif_pairs:
                sai_thrift_set_router_interface_attribute(
                    self.client, ingr_rif, nat_zone_id=0)
                sai_thrift_set_router_interface_attribute(
                    self.client, egr_rif, nat_zone_id=1)

            print("\n***Ingress L3 port***")
            verify_translation(self.ingr_port_rif, self.ingr_port_dev)

            print("\n***Ingress L3 LAG***")
            for lag_port in self.ingr_lag_dev:
                verify_translation(self.ingr_lag_rif, lag_port)

            print("\n***Ingress SVI***")
            for svi_port in self.ingr_svi_dev:
                verify_translation(self.ingr_svi_rif, svi_port)

        finally:
            # a failed check may leave the ACL bound to an ingress RIF,
            # so it is unbound from all of them unconditionally
            for rif in (self.ingr_svi_rif,
                        self.ingr_lag_rif,
                        self.ingr_port_rif):
                sai_thrift_set_router_interface_attribute(
                    self.client, rif, ingress_acl=0)

            # put every RIF back into NAT zone 0
            for rif in (self.ingr_svi_rif, self.egr_svi_rif,
                        self.ingr_lag_rif, self.egr_lag_rif,
                        self.ingr_port_rif, self.egr_port_rif):
                sai_thrift_set_router_interface_attribute(
                    self.client, rif, nat_zone_id=0)

            sai_thrift_remove_nat_entry(self.client, snat)