in src/datamigration/scripts/teradata/agent_controller/controller.py [0:0]
def _setup_agent(self):
    """
    Sets up the directory and configuration files for a new transfer run.

    Decodes the base64-encoded JSON agent config found in
    ``self.data["params"]["agent_config"]``, creates ``self.transfer_run_dir``,
    writes a ``credentials`` file into it, and writes the agent config (with
    ``database-credentials-file-path`` pointing at that credentials file) to
    ``<self.transfer_id>.json`` in the same directory.

    The credentials file holds the username plus either a
    ``secret_resource_id`` line or a plain ``password`` line:
      * a non-empty ``secret_resource_id`` in the connection config is used
        as-is;
      * otherwise, a ``password`` starting with ``SECRET_PREFIX`` is turned
        into a ``projects/<_PROJECT_ID>/secrets/<password>/versions/latest``
        resource id;
      * otherwise the password itself is written.

    Returns:
        None

    Raises:
        KeyError: if a required key (``params``, ``agent_config``,
            ``teradata-config``, ``connection``, ``username``, or ``password``
            when no usable ``secret_resource_id`` is given) is missing.
    """
    encoded_config = self.data["params"]["agent_config"]
    agent_config = json.loads(base64.b64decode(encoded_config).decode("utf-8"))

    # creates transfer run directory
    _LOGGER.info(f"Creating directory: {self.transfer_run_dir}")
    os.makedirs(self.transfer_run_dir, exist_ok=True)

    # creates credential file for transfer run
    connection = agent_config["teradata-config"]["connection"]
    db_username = connection["username"]

    # A missing key and an empty/None value are handled the same way: fall
    # back to the password. Previously an empty "secret_resource_id" left
    # `password` unbound and raised UnboundLocalError further down.
    secret_resource_id = connection.get("secret_resource_id")
    if not secret_resource_id:
        password = connection["password"]
        if password.startswith(SECRET_PREFIX):
            # NOTE(review): the whole password value (including SECRET_PREFIX)
            # is used as the secret name - confirm this is intended.
            secret_resource_id = (
                f"projects/{_PROJECT_ID}/secrets/{password}/versions/latest"
            )

    if secret_resource_id:
        cred_file_content = (
            f"username={db_username}\nsecret_resource_id={secret_resource_id}"
        )
    else:
        cred_file_content = f"username={db_username}\npassword={password}"

    cred_file = os.path.join(self.transfer_run_dir, "credentials")
    _LOGGER.info(f"Creating credential file: {cred_file}")
    # NOTE(review): this file may contain a plaintext password and is created
    # with the process's default permissions - consider restricting it to the
    # owner if the agent runs as the same user.
    # The config was decoded as UTF-8, so write it back as UTF-8 instead of
    # depending on the locale's default encoding.
    with open(cred_file, "w", encoding="utf-8") as f:
        f.write(cred_file_content)

    # create agent config file for transfer run
    agent_config_file = os.path.join(
        self.transfer_run_dir, f"{self.transfer_id}.json"
    )
    agent_config["teradata-config"]["database-credentials-file-path"] = cred_file
    _LOGGER.info(f"Creating agent config file: {agent_config_file}")
    with open(agent_config_file, "w", encoding="utf-8") as f:
        json.dump(agent_config, f)