def _validate_rrsig()

in functions/source/populate_NLB_TG_with_ALB/dns/dnssec.py [0:0]


def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
    """Validate an RRset against a single signature rdata

    The owner name of the rrsig is assumed to be the same as the owner name
    of the rrset.

    Every candidate key found for the signature is tried in turn; the
    function returns normally as soon as one of them verifies the signature.

    @param rrset: The RRset to validate
    @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
    tuple
    @param rrsig: The signature rdata
    @type rrsig: dns.rdata.Rdata (an RRSIG rdata)
    @param keys: The key dictionary.
    @type keys: a dictionary keyed by dns.name.Name with node or rdataset
    values
    @param origin: The origin to use for relative names.  A text string is
    converted to an absolute name.
    @type origin: dns.name.Name, string, or None
    @param now: The time to use when validating the signatures.  The default
    is the current time.
    @type now: int
    @raises ValidationFailure: if no usable key is known, the signature is
    outside its validity period, the algorithm or key is not usable, or no
    candidate key verifies the signature.
    """

    if isinstance(origin, string_types):
        origin = dns.name.from_text(origin, dns.name.root)

    # Guard against a lookup that yields None (presumably "no key known for
    # this signer" -- the helper is defined elsewhere); iterating over None
    # would surface as a TypeError rather than a ValidationFailure.
    candidate_keys = _find_candidate_keys(keys, rrsig)
    if candidate_keys is None:
        raise ValidationFailure('unknown key')

    # For convenience, allow the rrset to be specified as a (name,
    # rdataset) tuple as well as a proper rrset
    if isinstance(rrset, tuple):
        owner_name = rrset[0]
        rdataset = rrset[1]
    else:
        owner_name = rrset.name
        rdataset = rrset

    if now is None:
        now = time.time()

    for candidate_key in candidate_keys:
        if not candidate_key:
            raise ValidationFailure('unknown key')

        if rrsig.expiration < now:
            raise ValidationFailure('expired')
        if rrsig.inception > now:
            raise ValidationFailure('not yet valid')

        hasher = _make_hash(rrsig.algorithm)

        if _is_rsa(rrsig.algorithm):
            # Key layout: exponent length (one octet, or a zero octet
            # followed by a two-octet length), exponent, then modulus.
            keyptr = candidate_key.key
            (bytes_,) = struct.unpack('!B', keyptr[0:1])
            keyptr = keyptr[1:]
            if bytes_ == 0:
                (bytes_,) = struct.unpack('!H', keyptr[0:2])
                keyptr = keyptr[2:]
            rsa_e = keyptr[0:bytes_]
            rsa_n = keyptr[bytes_:]
            keylen = len(rsa_n) * 8
            pubkey = Crypto.PublicKey.RSA.construct(
                (Crypto.Util.number.bytes_to_long(rsa_n),
                 Crypto.Util.number.bytes_to_long(rsa_e)))
            sig = (Crypto.Util.number.bytes_to_long(rrsig.signature),)
        elif _is_dsa(rrsig.algorithm):
            # Key layout: T (one octet), Q (20 octets), then P, G and Y,
            # each 64 + T * 8 octets long.
            keyptr = candidate_key.key
            (t,) = struct.unpack('!B', keyptr[0:1])
            keyptr = keyptr[1:]
            octets = 64 + t * 8
            dsa_q = keyptr[0:20]
            keyptr = keyptr[20:]
            dsa_p = keyptr[0:octets]
            keyptr = keyptr[octets:]
            dsa_g = keyptr[0:octets]
            keyptr = keyptr[octets:]
            dsa_y = keyptr[0:octets]
            pubkey = Crypto.PublicKey.DSA.construct(
                (Crypto.Util.number.bytes_to_long(dsa_y),
                 Crypto.Util.number.bytes_to_long(dsa_g),
                 Crypto.Util.number.bytes_to_long(dsa_p),
                 Crypto.Util.number.bytes_to_long(dsa_q)))
            # The first signature octet is skipped; R and S follow as two
            # 20-octet values.
            (dsa_r, dsa_s) = struct.unpack('!20s20s', rrsig.signature[1:])
            sig = (Crypto.Util.number.bytes_to_long(dsa_r),
                   Crypto.Util.number.bytes_to_long(dsa_s))
        elif _is_ecdsa(rrsig.algorithm):
            if rrsig.algorithm == ECDSAP256SHA256:
                curve = ecdsa.curves.NIST256p
                key_len = 32
            elif rrsig.algorithm == ECDSAP384SHA384:
                curve = ecdsa.curves.NIST384p
                key_len = 48
            else:
                # shouldn't happen
                raise ValidationFailure('unknown ECDSA curve')
            # The key is the X coordinate followed by the Y coordinate,
            # key_len octets each.
            keyptr = candidate_key.key
            x = Crypto.Util.number.bytes_to_long(keyptr[0:key_len])
            y = Crypto.Util.number.bytes_to_long(keyptr[key_len:key_len * 2])
            # The key material is untrusted input, so reject an off-curve
            # point explicitly; an assert would be stripped under -O.
            if not ecdsa.ecdsa.point_is_valid(curve.generator, x, y):
                raise ValidationFailure('invalid ECDSA key')
            point = ecdsa.ellipticcurve.Point(curve.curve, x, y, curve.order)
            verifying_key = ecdsa.keys.VerifyingKey.from_public_point(point,
                                                                      curve)
            pubkey = ECKeyWrapper(verifying_key, key_len)
            r = rrsig.signature[:key_len]
            s = rrsig.signature[key_len:]
            sig = ecdsa.ecdsa.Signature(Crypto.Util.number.bytes_to_long(r),
                                        Crypto.Util.number.bytes_to_long(s))
        else:
            raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm)

        # Hash the first 18 octets of the signature rdata followed by the
        # signer name in digestable form.
        hasher.update(_to_rdata(rrsig, origin)[:18])
        hasher.update(rrsig.signer.to_digestable(origin))

        # If the signature covers fewer labels than the owner name has, the
        # signed name is the wildcard '*' prepended to the covered suffix.
        # Start from the untouched owner name for every candidate key.
        rrname = owner_name
        if rrsig.labels < len(rrname) - 1:
            suffix = rrname.split(rrsig.labels + 1)[1]
            rrname = dns.name.from_text('*', suffix)
        rrnamebuf = rrname.to_digestable(origin)
        rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
                              rrsig.original_ttl)
        # Each record contributes: name, type/class/original TTL, rdata
        # length, rdata -- with the records taken in sorted order.
        for rr in sorted(rdataset):
            hasher.update(rrnamebuf)
            hasher.update(rrfixed)
            rrdata = rr.to_digestable(origin)
            rrlen = struct.pack('!H', len(rrdata))
            hasher.update(rrlen)
            hasher.update(rrdata)

        digest = hasher.digest()

        if _is_rsa(rrsig.algorithm):
            # PKCS1 algorithm identifier goop
            digest = _make_algorithm_id(rrsig.algorithm) + digest
            # Pad to the modulus size: 0x00 0x01, 0xFF filler, 0x00, digest.
            padlen = keylen // 8 - len(digest) - 3
            digest = struct.pack('!%dB' % (2 + padlen + 1),
                                 *([0, 1] + [0xFF] * padlen + [0])) + digest
        elif _is_dsa(rrsig.algorithm) or _is_ecdsa(rrsig.algorithm):
            pass
        else:
            # Raise here for code clarity; this won't actually ever happen
            # since if the algorithm is really unknown we'd already have
            # raised an exception above
            raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm)

        if pubkey.verify(digest, sig):
            return
    raise ValidationFailure('verify failure')