in webserver_pkg/webserver_pkg/ssh_api.py [0:0]
def ssh_reset():
"""API called to execute commands to reset the ssh password with the new one.
Returns:
dict: Execution status if the API call was successful and with error
reason if failed.
"""
webserver_node = webserver_publisher_node.get_webserver_node()
webserver_node.get_logger().info("User requesting for resetting the ssh credentials")
try:
old_ssh_password = request.json["oldPassword"]
new_ssh_password = request.json["newPassword"]
# Check if old password is correct
if not pam.pam().authenticate(DEFAULT_USER, old_ssh_password):
return jsonify(success=False,
reason="The password is incorrect. "
"Provide your current password.")
# Check if the default password is updated
passwd_status_cmd = f"/usr/bin/passwd --status {DEFAULT_USER}"
passwd_status_output = utility.execute(passwd_status_cmd, shlex_split=True)[1]
# Execute passwd command based on the status of the password obtained above
if passwd_status_output.split()[1] == "P":
webserver_node.get_logger().info("Usable Password present")
if (utility.execute(f"/usr/bin/sudo -u {DEFAULT_USER} /usr/bin/passwd",
input_str=(f"{old_ssh_password}"
f"\n{new_ssh_password}"
f"\n{new_ssh_password}"),
shlex_split=True)[1]
.find("successfully")) == -1:
return jsonify(success=False, reason="The password update was unsuccessful.")
else:
return jsonify(success=True)
elif passwd_status_output.split()[1] == "NP":
webserver_node.get_logger().info("No Password present")
if (utility.execute(f"/usr/bin/passwd {DEFAULT_USER}",
input_str=(f"{new_ssh_password}\n"
f"{new_ssh_password}"),
shlex_split=True)[1]
.find("successfully")) > -1:
return jsonify(success=True)
else:
return jsonify(success=False, reason="Unable to change the SSH password")
except Exception:
return jsonify(success=False, reason="Error")