azext_edge/edge/util/x509.py (66 lines of code) (raw):
# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------
"""
x509: certificate utilities.
"""
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.x509.oid import NameOID
from knack.log import get_logger
# aka prime256v1
DEFAULT_EC_ALGO = ec.SECP256R1()
DEFAULT_VALID_DAYS = 365
logger = get_logger(__name__)
def generate_self_signed_cert(valid_days: int = DEFAULT_VALID_DAYS) -> Tuple[bytes, bytes]:
if not valid_days or valid_days < 0:
valid_days = DEFAULT_VALID_DAYS
# Not using DEFAULT_EC_ALGO due CodeQL issue parsing private key algo
key = ec.generate_private_key(curve=ec.SECP256R1(), backend=default_backend())
key_bytes = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
subject = issuer = x509.Name(
[
x509.NameAttribute(NameOID.COMMON_NAME, "Azure IoT Operations Quickstart Root CA - Not for Production"),
]
)
public_key = key.public_key()
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(public_key)
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.now(timezone.utc))
.not_valid_after(datetime.now(timezone.utc) + timedelta(days=valid_days))
.add_extension(
x509.BasicConstraints(ca=True, path_length=None),
critical=True,
)
.add_extension(
x509.KeyUsage(
key_cert_sign=True,
digital_signature=False,
crl_sign=False,
content_commitment=False,
data_encipherment=False,
decipher_only=False,
encipher_only=False,
key_agreement=False,
key_encipherment=False,
),
critical=False,
)
.add_extension(
x509.SubjectKeyIdentifier.from_public_key(public_key),
critical=False,
)
.sign(key, hashes.SHA256())
)
return (cert.public_bytes(serialization.Encoding.PEM), key_bytes)
def decode_der_certificate(der_data: bytes) -> Optional[x509.Certificate]:
# Decodes a DER-encoded X.509 certificate.
try:
cert = x509.load_der_x509_certificate(der_data, default_backend())
return cert
except Exception as e:
logger.debug(f"Error decoding DER certificate: {e}")
return