in src/blobfuse-launcher/blobfuse-launcher.py [0:0]
def main():
args = parse_args()
log_args(logger, args)
# Wait for SKR sidecar to be available as the secrets sidecar will invoke it.
utilities.wait_for_services_readiness(
logger,
tracer,
[args.otel_collector_port, args.imds_port, args.skr_port, args.secrets_port],
)
logger.info(
f"Releasing key '{args.kid}' from Key vault '{args.akv_endpoint}' using MAA '{args.maa_endpoint}'"
)
encryption_key = utilities.unwrap_secret(
logger,
tracer,
args.secrets_port,
args.client_id,
args.tenant_id,
args.wrapped_dek_secret,
args.wrapped_dek_akv_endpoint,
args.kid,
args.akv_endpoint,
args.maa_endpoint,
)
# Create directories if they don't exist.
os.makedirs(args.mount_path, exist_ok=True)
os.makedirs("/tmp/blobfuse_tmp", exist_ok=True)
encryption_key_base64 = base64.standard_b64encode(encryption_key).decode()
os.environ["AZURE_STORAGE_AUTH_TYPE"] = "msi"
os.environ["MSI_ENDPOINT"] = (
f"http://localhost:{args.imds_port}/metadata/identity/{args.tenant_id}/{args.client_id}/oauth2/token"
)
logger.info(
f"Starting blobfuse mount at '{args.mount_path}',"
+ f"Read Only: '{args.read_only}',"
+ f"encryption mode: '{args.custom_encryption_mode}'"
)
if args.custom_encryption_mode == "CPK":
# Hash the byte array
sha256_hash = hashlib.sha256(encryption_key).digest()
encryption_key_sha256 = base64.b64encode(sha256_hash).decode("utf-8")
os.environ["AZURE_STORAGE_CPK_ENCRYPTION_KEY"] = encryption_key_base64
os.environ["AZURE_STORAGE_CPK_ENCRYPTION_KEY_SHA256"] = encryption_key_sha256
returncode = utilities.launch_blobfuse(
logger,
tracer,
args.mount_path,
args.read_only,
args.sub_directory,
args.use_adls,
True,
telemetry_path,
)
elif args.custom_encryption_mode == "None":
returncode = utilities.launch_blobfuse(
logger,
tracer,
args.mount_path,
args.read_only,
args.sub_directory,
args.use_adls,
False,
telemetry_path,
)
else:
os.environ["ENCRYPTION_KEY"] = encryption_key_base64
returncode = utilities.launch_blobfuse_encrypted(
logger, tracer, args.mount_path, args.read_only, telemetry_path
)
logger.info(f"Blobfuse process returncode: {returncode}")
# Create a marker file for other containers that are waiting for the mount point to be
# available.
if returncode == 0:
with open(
os.path.join(volumestatus_path, f"{access_name}.volume.ready"), "w"
) as f:
f.write(json.dumps({"mount_path": args.mount_path}))
f.close()
# TODO (HPrabh): Handle SIGTERM.
os.system("sleep infinity")
else:
trace.get_current_span().set_status(
status=trace.StatusCode.ERROR,
description=f"Blobfuse process returncode: {returncode}",
)
# Non zero return code from blobfuse. Record error.
with open(
os.path.join(volumestatus_path, f"{access_name}.volume.error"), "w"
) as f:
f.write(json.dumps({"error_code": returncode}))
f.close()
sys.exit(returncode)