in azurelinuxagent/common/protocol/goal_state.py [0:0]
def __init__(self, xml_text, my_logger):
    """
    Save the certificates XML, decrypt its payload and split the result
    into per-certificate / per-key files under the agent's lib directory.

    The raw XML is always written to CERTS_FILE_NAME. If the document has
    no "Data" element, or its "Format" is present and is not
    "Pkcs7BlobWithPfxContents", nothing else is done. Otherwise the data
    is wrapped in a MIME envelope, decrypted into PEM_FILE_NAME and each
    PEM section is written out as "<thumbprint>.crt" / "<thumbprint>.prv".

    On return:
      * self.cert_list.certificates has one Cert per certificate found
      * self.summary has one {"thumbprint", "hasPrivateKey"} dict per
        certificate (debugging info)
      * self.warnings has one message per private key for which no
        matching certificate was found

    :param xml_text: text of the certificates XML document
    :param my_logger: logger used to report an unsupported format; only
        its warn() method is called here
    """
    self.cert_list = CertList()
    self.summary = []  # debugging info
    self.warnings = []

    lib_dir = conf.get_lib_dir()

    # Save the certificates
    local_file = os.path.join(lib_dir, CERTS_FILE_NAME)
    fileutil.write_file(local_file, xml_text)

    # Separate the certificates into individual files.
    xml_doc = parse_doc(xml_text)
    data = findtext(xml_doc, "Data")
    if data is None:
        return

    # if the certificates format is not Pkcs7BlobWithPfxContents do not parse it
    certificate_format = findtext(xml_doc, "Format")
    if certificate_format and certificate_format != "Pkcs7BlobWithPfxContents":
        message = "The Format is not Pkcs7BlobWithPfxContents. Format is {0}".format(certificate_format)
        my_logger.warn(message)
        add_event(op=WALAEventOperation.GoalState, message=message)
        return

    cryptutil = CryptUtil(conf.get_openssl_cmd())

    # Wrap the base64 payload in a MIME envelope before handing it to decrypt_p7m
    p7m_file = os.path.join(lib_dir, P7M_FILE_NAME)
    p7m = ("MIME-Version:1.0\n"  # pylint: disable=W1308
           "Content-Disposition: attachment; filename=\"{0}\"\n"
           "Content-Type: application/x-pkcs7-mime; name=\"{1}\"\n"
           "Content-Transfer-Encoding: base64\n"
           "\n"
           "{2}").format(p7m_file, p7m_file, data)
    fileutil.write_file(p7m_file, p7m)

    trans_prv_file = os.path.join(lib_dir, TRANSPORT_PRV_FILE_NAME)
    trans_cert_file = os.path.join(lib_dir, TRANSPORT_CERT_FILE_NAME)
    pem_file = os.path.join(lib_dir, PEM_FILE_NAME)

    # decrypt certificates
    cryptutil.decrypt_p7m(p7m_file, trans_prv_file, trans_cert_file, pem_file)

    # The parsing process use public key to match prv and crt.
    buf = []            # lines of the PEM section currently being read
    prvs = {}           # public key -> temporary private key file
    thumbprints = {}    # public key -> certificate thumbprint
    index = 0           # sequence number passed to _write_to_tmp_file
    v1_cert_list = []

    # Section terminators; compiled once since they are matched against every line
    end_of_key = re.compile(r'[-]+END.*KEY[-]+')
    end_of_certificate = re.compile(r'[-]+END.*CERTIFICATE[-]+')

    # Ensure pem_file exists before read the certs data since decrypt_p7m may clear the pem_file when decryption fails
    if os.path.exists(pem_file):
        with open(pem_file) as pem:
            for line in pem:
                buf.append(line)
                if end_of_key.match(line):
                    tmp_file = Certificates._write_to_tmp_file(index, 'prv', buf)
                    pub = cryptutil.get_pubkey_from_prv(tmp_file)
                    prvs[pub] = tmp_file
                    buf = []
                    index += 1
                elif end_of_certificate.match(line):
                    tmp_file = Certificates._write_to_tmp_file(index, 'crt', buf)
                    pub = cryptutil.get_pubkey_from_crt(tmp_file)
                    thumbprint = cryptutil.get_thumbprint_from_crt(tmp_file)
                    thumbprints[pub] = thumbprint
                    # Rename crt with thumbprint as the file name
                    crt = "{0}.crt".format(thumbprint)
                    v1_cert_list.append({
                        "name": None,
                        "thumbprint": thumbprint
                    })
                    os.rename(tmp_file, os.path.join(lib_dir, crt))
                    buf = []
                    index += 1

    # Rename prv key with thumbprint as the file name
    for pubkey, tmp_file in prvs.items():
        # Use .get(): a private key may have no matching certificate, and a
        # plain lookup would raise KeyError instead of recording the warning.
        thumbprint = thumbprints.get(pubkey)
        if thumbprint:
            prv = "{0}.prv".format(thumbprint)
            os.rename(tmp_file, os.path.join(lib_dir, prv))
        else:
            # Since private key has *no* matching certificate,
            # it will not be named correctly
            self.warnings.append("Found NO matching cert/thumbprint for private key!")

    for pubkey, thumbprint in thumbprints.items():
        has_private_key = pubkey in prvs
        self.summary.append({"thumbprint": thumbprint, "hasPrivateKey": has_private_key})

    for v1_cert in v1_cert_list:
        cert = Cert()
        set_properties("certs", cert, v1_cert)
        self.cert_list.certificates.append(cert)