def validate_route_nexthops()

in openr/py/openr/cli/utils/utils.py [0:0]


def validate_route_nexthops(routes, interfaces, sources, quiet: bool = False):
    """
    Validate the nexthops of fib routes against lm interfaces

    Every nexthop of every route is checked as follows:

    - a v6 link-local nexthop address must carry an interface name
    - a nexthop that names an interface must reference an entry of
      ``interfaces`` whose ``info.isUp`` is true
    - a v4 nexthop must share a /31 subnet with at least one v4 address of
      its interface; interfaces reporting no v4 address are not checked

    Unless ``quiet`` is set, a PASS/FAIL banner is printed and, on failure,
    the offending routes are printed grouped by error type.

    :param routes: list network_types.UnicastRoute (structured routes)
    :param interfaces: dict<interface-name, InterfaceDetail>
    :param sources: not used by this function
    :param quiet: when True nothing is printed; the error captions are
        returned to the caller instead

    :return: tuple (is_valid, error_msg). ``error_msg`` is a list of
        "Error: <error type>" captions and is only populated when
        validation fails and ``quiet`` is True; otherwise it is empty
    """

    # define error types
    MISSING_NEXTHOP = "Nexthop does not exist"
    INVALID_SUBNET = "Nexthop address is not in the same subnet as interface"
    # NOTE(review): this is reported for a link-local address that has no
    # interface name, so the wording is misleading. Left unchanged because
    # callers may match on the text.
    INVALID_LINK_LOCAL = "Nexthop address is not link local"

    # record invalid routes in dict<error, list<route_db>>
    invalid_routes = defaultdict(list)

    for route in routes:
        # record invalid nexthops in dict<error, list<nexthops>>
        invalid_nexthop = defaultdict(list)
        for nextHop in route.nextHops:
            nh = nextHop.address

            # if nexthop addr is v6 link-local, then ifName must be specified
            if (
                ipnetwork.ip_version(nh.addr) == 6
                and ipnetwork.is_link_local(nh.addr)
                and not nh.ifName
            ):
                invalid_nexthop[INVALID_LINK_LOCAL].append(nextHop)

            # next-hop can be empty for other types. Skip if it is the case
            if nh.ifName is None:
                continue

            if nh.ifName not in interfaces or not interfaces[nh.ifName].info.isUp:
                invalid_nexthop[MISSING_NEXTHOP].append(nextHop)
                continue

            # only v4 nexthops are checked against the interface subnets
            if ipnetwork.ip_version(nh.addr) != 4:
                continue

            # networks may be None; treat it as "no addresses" to maintain
            # backward compatibility
            networks = interfaces[nh.ifName].info.networks or []
            v4_addrs = [
                prefix.prefixAddress.addr
                for prefix in networks
                if ipnetwork.ip_version(prefix.prefixAddress.addr) == 4
            ]

            # An interface may carry several v4 addresses and the nexthop can
            # only sit in one of those subnets. Flag it (once) only when it
            # matches none of them, instead of once per non-matching address.
            if v4_addrs and not any(
                ipnetwork.is_same_subnet(nh.addr, addr, "31") for addr in v4_addrs
            ):
                invalid_nexthop[INVALID_SUBNET].append(nextHop)

        # build routes per error type
        for err, nexthops in invalid_nexthop.items():
            invalid_routes[err].append(
                network_types.UnicastRoute(dest=route.dest, nextHops=nexthops)
            )

    is_valid = not invalid_routes

    # print the PASS/FAIL banner followed by the summary line
    if not quiet:
        status = "PASS" if is_valid else "FAIL"
        if is_color_output_supported():
            click.echo(
                click.style(status, bg="green" if is_valid else "red", fg="black")
            )
        else:
            click.echo(status)
        if is_valid:
            print("Route validation successful")
        else:
            print("Route validation failed")

    # Output report per error type (no-op when everything is valid)
    error_msg = []
    for err, route_db in invalid_routes.items():
        caption = "Error: {}".format(err)
        if quiet:
            error_msg.append(caption)
        else:
            print_unicast_routes(caption, route_db)

    return is_valid, error_msg