in src/key_server_common.py [0:0]
def fill_request(self):
"""
Fill the XML document with data about the requested keys.
"""
content_id = self.get_content_id()
# self.use_playready_content_key = False
system_ids = {}
# check whether to perform CPIX 2.0 document encryption
encrypted_response_recipients = self.root.findall("./{urn:dashif:org:cpix}DeliveryDataList/{urn:dashif:org:cpix}DeliveryData")
if encrypted_response_recipients:
print("ENCRYPTED-RESPONSE")
# generate a random document key and HMAC key
self.document_key = secrets.token_bytes(DOCUMENT_KEY_SIZE)
self.hmac_key = secrets.token_bytes(HMAC_KEY_SIZE)
backend = default_backend()
for delivery_data in encrypted_response_recipients:
delivery_key = delivery_data.find("./{urn:dashif:org:cpix}DeliveryKey")
x509data = delivery_key.find("./{http://www.w3.org/2000/09/xmldsig#}X509Data")
x509cert = x509data.find("./{http://www.w3.org/2000/09/xmldsig#}X509Certificate")
cert = x509.load_der_x509_certificate(base64.b64decode(x509cert.text), backend)
public_key = cert.public_key()
self.public_key = x509cert.text
asym_padder = asym_padding.OAEP(mgf=asym_padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), label=None)
# encrypt the document and HMAC keys using the x509 public key
encoded_document_key = public_key.encrypt(self.document_key, asym_padder)
encoded_hmac_key = public_key.encrypt(self.hmac_key, asym_padder)
# insert document key
document_key_leaf = element_tree.SubElement(delivery_data, "{urn:dashif:org:cpix}DocumentKey")
document_key_leaf.set("Algorithm", "http://www.w3.org/2001/04/xmlenc#aes256-cbc")
data_leaf = element_tree.SubElement(document_key_leaf, "{urn:dashif:org:cpix}Data")
secret_leaf = element_tree.SubElement(data_leaf, "{urn:ietf:params:xml:ns:keyprov:pskc}Secret")
self.insert_encrypted_value(secret_leaf, "http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p", base64.b64encode(encoded_document_key).decode('utf-8'))
# insert HMAC key
mac_method = element_tree.SubElement(delivery_data, "{urn:dashif:org:cpix}MACMethod")
mac_method.set("Algorithm", "http://www.w3.org/2001/04/xmldsig-more#hmac-sha512")
mac_method_key = element_tree.SubElement(mac_method, "{urn:dashif:org:cpix}Key")
self.insert_encrypted_value(mac_method_key, "http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p", base64.b64encode(encoded_hmac_key).decode('utf-8'))
else:
print("CLEAR-RESPONSE")
for drm_system in self.root.findall("./{urn:dashif:org:cpix}DRMSystemList/{urn:dashif:org:cpix}DRMSystem"):
kid = drm_system.get("kid")
system_id = drm_system.get("systemId")
system_ids[system_id] = kid
print("SYSTEM-ID {}".format(system_id.lower()))
self.fixup_document(drm_system, system_id, content_id, kid)
for content_key in self.root.findall("./{urn:dashif:org:cpix}ContentKeyList/{urn:dashif:org:cpix}ContentKey"):
kid = content_key.get("kid")
init_vector = content_key.get("explicitIV")
data = element_tree.SubElement(content_key, "{urn:dashif:org:cpix}Data")
secret = element_tree.SubElement(data, "{urn:ietf:params:xml:ns:keyprov:pskc}Secret")
# HLS SAMPLE AES Only
if init_vector is None and system_ids.get(HLS_SAMPLE_AES_SYSTEM_ID, False) == kid:
content_key.set('explicitIV', base64.b64encode(self.generator.key(content_id, kid)).decode('utf-8'))
# generate the key
key_bytes = self.generator.key(content_id, kid)
# store to the key in the cache
self.cache.store(content_id, kid, key_bytes)
# log
print("NEW-KEY {} {}".format(content_id, kid))
# update the encrypted response
if encrypted_response_recipients:
# store the key encrypted
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(key_bytes) + padder.finalize()
random_iv = secrets.token_bytes(RANDOM_IV_SIZE)
cipher = Cipher(algorithms.AES(self.document_key), modes.CBC(random_iv), backend=backend)
encryptor = cipher.encryptor()
encrypted_data = encryptor.update(padded_data) + encryptor.finalize()
cipher_data = random_iv + encrypted_data
encrypted_string = base64.b64encode(cipher_data).decode('utf-8')
self.insert_encrypted_value(secret, "http://www.w3.org/2001/04/xmlenc#aes256-cbc", encrypted_string)
else:
plain_value = element_tree.SubElement(secret, "{urn:ietf:params:xml:ns:keyprov:pskc}PlainValue")
# PLAYREADY ONLY
if self.use_playready_content_key:
plain_value.text = PLAYREADY_CONTENT_KEY
else:
plain_value.text = base64.b64encode(key_bytes).decode('utf-8')