in generate/util.py [0:0]
def ssm_repository(repo_name, ssm_url):
    """Return the HTTPS git URL of a Secure Source Manager repository.

    Pages through the repositories of the instance described by ``ssm_url``
    looking for one named ``repo_name``. If it is found, its HTTPS git URL is
    returned; otherwise a create request is issued and the URL reported in the
    create response is returned.

    Args:
        repo_name: Short repository id (the last path segment of the full
            resource name).
        ssm_url: Instance URL; it is passed to ``ssm_url_extract`` to obtain
            the instance id, project number and location.

    Returns:
        The ``uris.gitHttps`` value of the existing or newly created
        repository, or ``""`` when the create response does not carry that
        field.

    Raises:
        requests.HTTPError: If the list or create request returns an error
            status.
    """
    instance_id, project_number, location = ssm_url_extract(ssm_url)
    access_token = get_access_token()
    repo_uri = f"projects/{project_number}/locations/{location}/repositories"
    # No trailing slash: the collection path is appended with "/" below, and a
    # trailing slash here would produce ".../v1//projects/...".
    base_url = (
        f"https://{instance_id}-{project_number}-api.{location}.sourcemanager.dev/v1"
    )
    repo_id = f"{repo_uri}/{repo_name}"
    url = f"{base_url}/{repo_uri}"
    headers = {"Authorization": f"Bearer {access_token}"}

    # list repositories, following nextPageToken until the listing is exhausted
    params = {}
    while True:
        response = requests.get(url, params=params, headers=headers, timeout=120)
        _ssm_check_response(response, "list repositories failed")
        rep = response.json()
        for repo in rep.get("repositories", []):
            if repo.get("name") == repo_id:
                git_url = repo["uris"]["gitHttps"]
                print(f"git repo present, skipping creation {git_url}")
                return git_url
        next_page = rep.get("nextPageToken")
        if not next_page:
            break
        params = {"page_token": next_page}

    # create repository
    response = requests.post(
        url, headers=headers, params={"repository_id": repo_name}, data={}, timeout=120
    )
    _ssm_check_response(response, "create repository failed")
    rep = response.json()
    try:
        created_repo = rep["response"]["uris"]["gitHttps"]
    except (KeyError, TypeError):
        # The request was accepted but the body has no repository URI.
        # NOTE(review): this may be an operation that has not completed yet
        # rather than a failure - confirm against the API's operation schema.
        print("create repository failed", rep)
        return ""
    print(f"ssm git repo created {created_repo}")
    return created_repo


def _ssm_check_response(response, message):
    """Print ``message`` plus the error body and raise if ``response`` failed."""
    if response.ok:
        return
    # Error bodies are not guaranteed to be JSON; fall back to the raw text so
    # a decode error never hides the underlying HTTP error.
    try:
        detail = response.json()
    except ValueError:
        detail = response.text
    print(message, detail)
    response.raise_for_status()