components/authentication/scripts/user_tool.py (140 lines of code) (raw):
"""
Copyright 2023 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
# pylint: disable = broad-exception-raised, broad-exception-caught
import os
import subprocess
import requests
import argparse
import getpass
import warnings
import secrets
from common.models import User
from google.cloud import secretmanager
PROJECT_ID = os.environ.get("PROJECT_ID")
assert PROJECT_ID, "PROJECT_ID is not set."
AUTH_API_PATH = "authentication/api/v1"
DEFAULT_BOT_USERNAME = "mira-bot@google.com"
LLM_BACKEND_ROBOT_USERNAME = "llm-backend-robot-username"
LLM_BACKEND_ROBOT_PASSWORD = "llm-backend-robot-password"
USER_DATA = {
"first_name": "test",
"last_name": "test",
"status": "active",
"user_type": "learner",
"user_groups": [],
"is_registered": True,
"failed_login_attempts_count": 0,
"access_api_docs": True,
"gaia_id": "fake-gaia-id",
}
warnings.filterwarnings("ignore", message="Unverified HTTPS request")
sm_secrets = secretmanager.SecretManagerServiceClient()
def execute_command(command):
output = subprocess.check_output(command, shell=True, text=True)
return output.strip()
def get_input(prompt):
input_value = None
while not input_value:
input_value = input(prompt)
return input_value
def add_secret(secret_name, secret_value) -> str:
parent = f"projects/{PROJECT_ID}/secrets/{secret_name}"
payload = secret_value.encode("utf-8")
version = sm_secrets.add_secret_version(
request={"parent": parent, "payload": {"data": payload}}
)
return str(version)
def get_secret_value(secret_name) -> str:
secret_value = sm_secrets.access_secret_version(
request={"name": f"projects/{PROJECT_ID}" +
f"/secrets/{secret_name}/versions/latest"}
).payload.data.decode("utf-8")
return secret_value.strip()
def create_bot_account() -> (str, str):
# Get llm-backend robot username
try:
username = get_secret_value(LLM_BACKEND_ROBOT_USERNAME)
except Exception:
username = DEFAULT_BOT_USERNAME
add_secret(LLM_BACKEND_ROBOT_USERNAME, username)
# Get llm-backend robot password
try:
password = get_secret_value(LLM_BACKEND_ROBOT_PASSWORD)
except Exception:
password = secrets.token_urlsafe(16)
add_secret(LLM_BACKEND_ROBOT_PASSWORD, password)
return username, password
def create_user(user_email, user_password, base_url=None) -> None:
# Create user in firebase
input_user = {**USER_DATA, "email": user_email}
user = User.from_dict(input_user)
user.user_id = ""
user.save()
req_body = {
"email": user_email,
"password": user_password
}
url = f"{base_url}/{AUTH_API_PATH}/sign-up/credentials"
sign_up_req = requests.post(url, json=req_body, verify=False, timeout=10)
sign_up_res = sign_up_req.json()
# If returns 200, the user was created successfully. Print the token then.
if sign_up_req.status_code == 200:
print(f"User '{user_email}' created successfully. ID Token:\n")
print(sign_up_res["data"]["idToken"])
print()
# If the user already exists, sign in the user and get the token.
elif (sign_up_req.status_code == 422 and
sign_up_res.get("message") == "EMAIL_EXISTS"):
print(f"User with {user_email} already exists. Trying log in")
login_user(user_email, user_password, base_url=base_url)
else:
print(f"Sign up error. Status: {sign_up_req.status_code}")
print(sign_up_res)
def login_user(user_email, user_password, base_url=None) -> None:
req_body = {
"email": user_email,
"password": user_password
}
url = f"{base_url}/{AUTH_API_PATH}/sign-in/credentials"
sign_in_req = requests.post(url, json=req_body, verify=False, timeout=10)
sign_in_res = sign_in_req.json()
if sign_in_res is None or sign_in_res["data"] is None:
print("User signed in fail", sign_in_req.text)
raise Exception("User sign-in failed")
print(f"Signed in with existing user '{user_email}'. ID Token:\n")
id_token = sign_in_res["data"]["idToken"]
print(id_token)
print()
def main():
parser = argparse.ArgumentParser()
help_str = "Main action: [create_user|get_token|create_bot_account]"
parser.add_argument("action", type=str, help=help_str)
parser.add_argument("--base-url", type=str, help="API base URL")
args = parser.parse_args()
if not args.base_url:
base_url = get_input("Provide API base URL (e.g. http://127.0.0.1/): ")
else:
base_url = args.base_url
print(f"API base URL: {base_url}")
assert base_url, "base_url is empty."
# get default user email
user_email = execute_command(
"gcloud config list account --format 'value(core.account)' | head -n 1")
if args.action == "create_user":
user_email = input(f"User email ({user_email}): ") or user_email
user = User.find_by_email(user_email)
if user is None:
# Create new user
user_password = getpass.getpass(
prompt="Password (At least 6 alphanumeric): ")
confirm_password = getpass.getpass(prompt="Confirm password: ")
assert user_password == confirm_password, "Passwords don't match."
create_user(user_email, user_password, base_url=base_url)
else:
user_password = getpass.getpass(prompt="Password: ")
login_user(user_email, user_password, base_url=base_url)
elif args.action == "get_token":
user_email = input(f"User email ({user_email}): ") or user_email
user_password = getpass.getpass(prompt="Password: ")
print()
login_user(user_email, user_password, base_url=base_url)
elif args.action == "create_bot_account":
username, password = create_bot_account()
# TODO: Create user only if not already present
create_user(username, password, base_url=base_url)
else:
print(f"Action {args.action} not supported. Available actions:")
available_actions = ["create_user", "get_token", "create_bot_account"]
for action in available_actions:
print(f" - {action}")
if __name__ == "__main__":
main()