in VMEncryption/main/DiskUtil.py [0:0]
def get_crypt_items(self):
    """Collect the crypt items describing the encrypted volumes.

    Items are read from the azure_crypt_mount config file when
    ``self.should_use_azure_crypt_mount()`` is true.  Otherwise they are
    read from /etc/crypttab, and each item's mount point is filled in from
    the /etc/fstab entries whose device path contains the item's mapper
    name.

    If the encryption status reports the OS volume as "Encrypted" but no
    item for the root filesystem was found in either source, an item for it
    is synthesized: the backing device is taken from ``cryptsetup status``
    and, if that fails, from ``dmsetup table --target crypt``.

    Returns:
        list: the parsed crypt items in file order, followed by the
        synthesized rootfs item when one had to be created.

    Raises:
        Exception: if the rootfs item has to be synthesized and its backing
            block device cannot be determined.
        StopIteration: if the rootfs item has to be synthesized and
            ``self.get_mount_items()`` has no entry whose "dest" is "/".
    """
    crypt_items = []
    rootfs_crypt_item_found = False

    if self.should_use_azure_crypt_mount():
        with open(self.encryption_environment.azure_crypt_mount_config_path, 'r') as f:
            for line in f:
                if not line.strip():
                    continue

                crypt_item = self.parse_azure_crypt_mount_line(line)

                if crypt_item.mount_point == "/" or crypt_item.mapper_name == CommonVariables.osmapper_name:
                    rootfs_crypt_item_found = True

                crypt_items.append(crypt_item)
    else:
        self.logger.log("Using crypttab instead of azure_crypt_mount file.")
        crypttab_path = "/etc/crypttab"

        # crypttab carries no mount points, so gather (device, mount point)
        # pairs from fstab first and join them in below.
        fstab_items = []
        with open("/etc/fstab", "r") as f:
            for line in f:
                fstab_device, fstab_mount_point = self.parse_fstab_line(line)
                if fstab_device is not None:
                    fstab_items.append((fstab_device, fstab_mount_point))

        if not os.path.exists(crypttab_path):
            self.logger.log("{0} does not exist".format(crypttab_path))
        else:
            with open(crypttab_path, 'r') as f:
                for line in f:
                    if not line.strip():
                        continue

                    crypt_item = self.parse_crypttab_line(line)
                    if crypt_item is None:
                        continue

                    if crypt_item.mapper_name == CommonVariables.osmapper_name:
                        rootfs_crypt_item_found = True

                    # NOTE(review): this is a substring test, so a mapper
                    # name that is contained in another device path also
                    # matches, and the last matching fstab entry wins --
                    # confirm this is intended.
                    for device_path, mount_path in fstab_items:
                        if crypt_item.mapper_name in device_path:
                            crypt_item.mount_point = mount_path

                    crypt_items.append(crypt_item)

    encryption_status = json.loads(self.get_encryption_status())

    if encryption_status["os"] == "Encrypted" and not rootfs_crypt_item_found:
        # The OS volume is encrypted but not listed in the config that was
        # read above, so build its crypt item from the live system state.
        crypt_item = CryptItem()
        crypt_item.mapper_name = CommonVariables.osmapper_name

        proc_comm = ProcessCommunicator()
        grep_result = self.command_executor.ExecuteInBash("cryptsetup status {0} | grep device:".format(crypt_item.mapper_name), communicator=proc_comm)

        if grep_result == 0:
            # The grep output is "device: <path>"; keep the second token.
            crypt_item.dev_path = proc_comm.stdout.strip().split()[1]
        else:
            proc_comm = ProcessCommunicator()
            self.command_executor.Execute("dmsetup table --target crypt", communicator=proc_comm)

            for line in proc_comm.stdout.splitlines():
                if crypt_item.mapper_name not in line:
                    continue

                # next() over a generator instead of filter(...)[0]: on
                # Python 3 filter() returns an iterator and cannot be
                # indexed.  A missing match leaves dev_path unset so the
                # explicit error below is raised instead of an IndexError.
                majmin = next((p for p in line.split() if re.match(r'\d+:\d+', p)), None)
                if majmin is None:
                    continue

                src_device = next((d for d in self.get_device_items(None) if d.majmin == majmin), None)
                if src_device is None:
                    continue

                crypt_item.dev_path = '/dev/' + src_device.name
                break

        rootfs_dev = next((m for m in self.get_mount_items() if m["dest"] == "/"))
        crypt_item.file_system = rootfs_dev["fs"]

        if not crypt_item.dev_path:
            raise Exception("Could not locate block device for rootfs")

        # Use the header file under /boot when it exists; otherwise point
        # the header path at the device itself.
        crypt_item.luks_header_path = "/boot/luks/osluksheader"
        if not os.path.exists(crypt_item.luks_header_path):
            crypt_item.luks_header_path = crypt_item.dev_path

        crypt_item.mount_point = "/"
        crypt_item.uses_cleartext_key = False
        crypt_item.current_luks_slot = -1

        crypt_items.append(crypt_item)

    return crypt_items