in connectors/utils.py [0:0]
def convert_to_b64(source, target=None, overwrite=False):
"""Converts a `source` file to base64 using the system's `base64`
When `target` is not provided, done in-place.
If `overwrite` is `True` and `target` exists, overwrites it.
If `False` and it exists, raises an `IOError`
If the `base64` utility could not be found, falls back to pure Python
using base64io.
This function blocks -- if you want to avoid blocking the event
loop, call it through `loop.run_in_executor`
Returns the target file.
"""
inplace = target is None
temp_target = f"{source}.b64"
if not inplace and not overwrite and os.path.exists(target):
msg = f"{target} already exists."
raise IOError(msg)
if _BASE64 is not None:
if platform.system() == "Darwin":
version = int(platform.mac_ver()[0].split(".")[0])
# MacOS 13 has changed base64 util
if version >= 13:
cmd = f"{_BASE64} -i {source} -o {temp_target}"
else:
cmd = f"{_BASE64} {source} > {temp_target}"
else:
# In Linuces, avoid line wrapping
cmd = f"{_BASE64} -w 0 {source} > {temp_target}"
logger.debug(f"Calling {cmd}")
subprocess.check_call(cmd, shell=True) # noqa S602
else:
# Pure Python version
with open(source, "rb") as sf, open(temp_target, "wb") as tf:
with Base64IO(tf) as encoded_target:
for line in sf:
encoded_target.write(line)
# success, let's move the file to the right place
if inplace:
os.remove(source)
os.rename(temp_target, source)
else:
if os.path.exists(target):
os.remove(target)
os.rename(temp_target, target)
return source if inplace else target