tools/azure-rest-api-specs-examples-automation/automation/main.py (325 lines of code) (raw):

import os
import shutil
from os import path
import sys
import subprocess
import tempfile
import time
from datetime import datetime, timedelta, timezone
import json
import argparse
import logging
import itertools
import re
from typing import List

# NOTE: the star import is kept last among the "name providers" so that, if models
# re-exports datetime/re/List, it still takes precedence exactly as before.
from models import *
from github import GitHubRepository
from csv_database import CsvDatabase


# GitHub token, set from the command line in main()
github_token: str
# root of the automation project, resolved in main() as the parent of the script folder
root_path: str = "."

# local database of processed releases, created in process()
csv_database: CsvDatabase

# wall-clock start of the run (time.time()), set in main()
start_time_secs: float
# stop picking up new SDKs/releases once the run is older than this
timeout_secs: float = 45 * 60  # 45 minutes

clean_tmp_dir: bool = True
tmp_folder: str = "tmp"
tmp_spec_folder: str = "spec"
tmp_example_folder: str = "example"
tmp_sdk_folder: str = "sdk"


def _run_logged(cmd: List[str], cwd: str) -> None:
    """Log the command line, then run it in ``cwd``; raises CalledProcessError on non-zero exit."""
    logging.info("Command line: " + " ".join(cmd))
    subprocess.check_call(cmd, cwd=cwd)


def _capture_logged(cmd: List[str], cwd: str) -> bytes:
    """Log the command line, then run it in ``cwd`` and return its raw stdout."""
    logging.info("Command line: " + " ".join(cmd))
    return subprocess.check_output(cmd, cwd=cwd)


def load_configuration(command_line: CommandLineConfiguration) -> Configuration:
    """Build the run configuration from ``automation/configuration.json`` and the command line.

    The JSON file (read relative to ``root_path``) supplies the examples repository and the
    per-SDK settings; the command line supplies build id, flags and the release window, which
    spans from ``release_in_days`` days ago until now (UTC).
    """
    with open(path.join(root_path, "automation/configuration.json"), "r", encoding="utf-8") as f_in:
        config = json.load(f_in)

    now = datetime.now(timezone.utc)
    operation_configuration = OperationConfiguration(
        config["sdkExample"]["repository"],
        command_line.build_id,
        command_line.skip_processed,
        command_line.persist_data,
        now - timedelta(days=command_line.release_in_days),
        now,
    )

    sdk_configurations = []
    for sdk_config in config["sdkConfigurations"]:
        script = Script(sdk_config["script"]["run"])
        release_tag = ReleaseTagConfiguration(
            sdk_config["releaseTag"]["regexMatch"],
            sdk_config["releaseTag"]["packageRegexGroup"],
            sdk_config["releaseTag"]["versionRegexGroup"],
        )
        # "ignoredPackages" is optional in the configuration file
        ignored_packages = sdk_config.get("ignoredPackages", [])
        sdk_configurations.append(
            SdkConfiguration(
                sdk_config["name"],
                sdk_config["language"],
                sdk_config["repository"],
                release_tag,
                script,
                ignored_packages,
            )
        )

    return Configuration(operation_configuration, sdk_configurations)


def merge_pull_requests(operation: OperationConfiguration):
    """Merge pull requests titled "[Automation]..." that carry the "auto-merge" label."""
    logging.info("Merge pull requests")

    repo = GitHubRepository(operation.repository_owner, operation.repository_name, github_token)
    for pull_request in repo.list_pull_requests():
        if not pull_request["title"].startswith("[Automation]"):
            continue
        labels = pull_request["labels"] if "labels" in pull_request else []
        if any(label["name"] == "auto-merge" for label in labels):
            repo.merge_pull_request(pull_request)
            # wait a few seconds to avoid 409
            time.sleep(5)


def process_release(operation: OperationConfiguration, sdk: SdkConfiguration, release: Release, report: Report):
    """Collect examples for one SDK release and open a pull request with them.

    Steps: clone the examples repository and the SDK repository (at the release tag) into a
    fresh work directory, write ``input.json``, run the SDK's worker script, and, if the
    worker changed the examples repository, commit to a new branch, push it, create a pull
    request labelled "auto-merge", and optionally record the release in the database.

    The outcome is written to ``report.statuses[release.tag]``; failures are also appended to
    ``report.aggregated_error.errors`` instead of being raised. The work directory is removed
    at the end when ``clean_tmp_dir`` is set.
    """
    logging.info(f"Processing release: {release.tag}")

    tmp_root_path = path.join(root_path, tmp_folder)
    os.makedirs(tmp_root_path, exist_ok=True)
    tmp_path = tempfile.mkdtemp(prefix="tmp", dir=tmp_root_path)
    logging.info(f"Work directory: {tmp_path}")
    try:
        example_repo_path = path.join(tmp_path, tmp_example_folder)
        sdk_repo_path = path.join(tmp_path, tmp_sdk_folder)
        # the spec repository is shared by all releases, hence it lives under the tmp root
        spec_repo_path = path.join(tmp_root_path, tmp_spec_folder)

        # checkout azure-rest-api-specs-examples repo
        cmd = ["git", "clone", "--quiet", "--depth", "1", operation.sdk_examples_repository, example_repo_path]
        logging.info(f"Checking out repository: {operation.sdk_examples_repository}")
        _run_logged(cmd, tmp_path)

        # checkout sdk repo
        cmd = [
            "git",
            "clone",
            "-c",
            "advice.detachedHead=false",
            "--quiet",
            "--depth",
            "1",
            "--branch",
            release.tag,
            sdk.repository,
            sdk_repo_path,
        ]
        logging.info(f"Checking out repository: {sdk.repository}")
        _run_logged(cmd, tmp_path)

        # prepare input.json
        input_json_path = path.join(tmp_path, "input.json")
        output_json_path = path.join(tmp_path, "output.json")
        with open(input_json_path, "w", encoding="utf-8") as f_out:
            input_json = {
                "specsPath": spec_repo_path,
                "sdkExamplesPath": example_repo_path,
                "sdkPath": sdk_repo_path,
                "tempPath": tmp_path,
                "release": {"tag": release.tag, "package": release.package, "version": release.version},
            }
            logging.info(f"Input JSON for worker: {input_json}")
            json.dump(input_json, f_out, indent=2)

        # run script
        logging.info(f"Running worker: {sdk.script.run}")
        start = time.perf_counter()
        subprocess.check_call([sdk.script.run, input_json_path, output_json_path], cwd=root_path)
        end = time.perf_counter()
        logging.info(f"Worker ran: {str(timedelta(seconds=end-start))}")

        # parse output.json; when the worker wrote none, treat the run as succeeded with no files
        release_name = release.tag
        succeeded = True
        files = []
        if path.isfile(output_json_path):
            with open(output_json_path, "r", encoding="utf-8") as f_in:
                worker_output = json.load(f_in)
                logging.info(f"Output JSON from worker: {worker_output}")
                release_name = worker_output["name"]
                succeeded = "succeeded" == worker_output["status"]
                files = worker_output["files"]

        if not succeeded:
            report.statuses[release.tag] = "failed at worker"
            report.aggregated_error.errors.append(RuntimeError(f"Worker failed for release tag: {release.tag}"))
            return

        # commit and create pull request
        # check for new examples
        status_output = _capture_logged(["git", "status", "--porcelain"], example_repo_path)
        if not status_output:
            logging.info(f"No change to repository: {example_repo_path}")
            report.statuses[release.tag] = "succeeded, no change"
            return

        output_str = str(status_output, "utf-8")
        logging.info(f"git status:\n{output_str}")

        # git add
        _run_logged(["git", "add", "--all"], example_repo_path)

        # find added/modified files
        status_output = _capture_logged(["git", "status", "--porcelain"], example_repo_path)
        output_str = str(status_output, "utf-8")
        # porcelain lines are "XY <path>": slice off the 2 status columns and the separator.
        # Do not strip first; a blank first column would otherwise shift the slice into the path.
        changed_files = [line[3:] for line in output_str.splitlines()]

        # git checkout new branch
        branch = f"automation-examples_{sdk.name}_{release.tag}_{operation.build_id}"
        _run_logged(["git", "checkout", "-b", branch], example_repo_path)

        # git commit
        title = f"[Automation] Collect examples from {sdk.name}#{release.tag}"
        logging.info(f"git commit: {title}")
        cmd = ["git", "-c", "user.name=azure-sdk", "-c", "user.email=azuresdk@microsoft.com", "commit", "-m", title]
        _run_logged(cmd, example_repo_path)

        # git push
        remote_uri = "https://" + github_token + "@" + operation.sdk_examples_repository[len("https://") :]
        cmd = ["git", "push", remote_uri, branch]
        # do not log this command line as it contains the token
        try:
            subprocess.check_call(cmd, cwd=example_repo_path)
        except subprocess.CalledProcessError as e:
            # CalledProcessError carries the full command (token included) in its message and
            # repr; re-raise with the remote redacted so neither the log below nor the final
            # aggregated error leaks the token
            raise subprocess.CalledProcessError(e.returncode, ["git", "push", "<redacted>", branch]) from None

        try:
            # create github pull request
            head = f"{operation.repository_owner}:{branch}"
            repo = GitHubRepository(operation.repository_owner, operation.repository_name, github_token)
            pull_number = repo.create_pull_request(title, head, "main")
            repo.add_label(pull_number, ["auto-merge"])
        except Exception as e:
            logging.error(f"Error: {e}")
            report.statuses[release.tag] = "failed to create pull request"
            report.aggregated_error.errors.append(e)
            return

        try:
            if operation.persist_data:
                # commit changes to database
                commit_database(release_name, sdk.language, release, files)
        except Exception as e:
            logging.error(f"Error: {e}")
            report.statuses[release.tag] = "failed to update database"
            report.aggregated_error.errors.append(e)
            return

        report.statuses[release.tag] = f"succeeded, {len(changed_files)} files changed, pull number {pull_number}"

    except subprocess.CalledProcessError as e:
        logging.error(f"Call error: {e}")
        report.statuses[release.tag] = "failed to invoke git"
        report.aggregated_error.errors.append(e)
    finally:
        if clean_tmp_dir:
            shutil.rmtree(tmp_path, ignore_errors=True)


def query_releases_in_database(language: str) -> List[Release]:
    """Return the releases recorded in the local database for ``language``."""
    return csv_database.query_releases(language)


def commit_database(release_name: str, language: str, release: Release, changed_files: List[str]):
    """Record a processed release in the local database and commit it.

    Files ending in ".json" (metadata) are excluded; nothing is written when no other file
    remains, or when the database reports that the new release was not added.
    """
    # exclude metadata JSON
    example_files = [file for file in changed_files if not file.endswith(".json")]
    if not example_files:
        return

    database_succeeded = csv_database.new_release(
        release_name, language, release.tag, release.package, release.version, release.date, example_files
    )
    if database_succeeded:
        csv_database.dump()
        csv_database.commit(release_name)


def process_sdk(operation: OperationConfiguration, sdk: SdkConfiguration, report: Report):
    """Find the releases of one SDK inside the configured date window and process them.

    Releases are listed from GitHub page by page, filtered by publish date and by the SDK's
    release tag pattern, then handled newest first. A release is skipped when its tag is
    already in the database (only if ``skip_processed``), when a newer release of the same
    package was already handled in this run, or when its package is ignored. Processing stops
    once the global timeout has elapsed.
    """
    if time.time() > start_time_secs + timeout_secs:
        logging.warning(f"Timeout, skip sdk: {sdk.name}")
        return

    logging.info(f"Processing sdk: {sdk.name}")
    count = 0
    max_count = 5000
    releases: List[Release] = []
    repo = GitHubRepository(sdk.repository_owner, sdk.repository_name, github_token)
    # since there is no ordering from GitHub, just get all releases (exclude draft=True), and hope paging is correct
    for page in itertools.count(start=1):
        try:
            releases_response_json = repo.list_releases(100, page)
            if len(releases_response_json) == 0:
                # no more result, we are done
                break
            count += len(releases_response_json)
            for release_json in releases_response_json:
                if release_json["draft"]:
                    continue
                # replace the "Z" suffix with an explicit offset before parsing with fromisoformat
                published_at = datetime.fromisoformat(release_json["published_at"].replace("Z", "+00:00"))
                if not operation.date_start < published_at < operation.date_end:
                    continue
                release_tag = release_json["tag_name"]
                if re.match(sdk.release_tag.regex_match, release_tag):
                    package = re.match(sdk.release_tag.package_regex_group, release_tag).group(1)
                    version = re.match(sdk.release_tag.version_regex_group, release_tag).group(1)
                    found_release = Release(release_tag, package, version, published_at)
                    releases.append(found_release)
                    logging.info(f"Found release tag: {found_release.tag}")

            if count > max_count:
                # typically we only need releases from recent 10 days, abort before hit GitHub rate limit
                break
        except Exception as e:
            # any failure while listing ends the paging; releases found so far are still processed
            report.aggregated_error.errors.append(e)
            break
    logging.info(f"Count of all releases: {count}")

    # newest first, so that only the latest release of each package gets processed
    releases.sort(key=lambda r: r.date, reverse=True)
    for release in releases:
        logging.info(f"Candidate release tag: {release.tag}, on {release.date.date()}")

    processed_release_tags = set()
    if operation.skip_processed:
        processed_releases = query_releases_in_database(sdk.language)
        processed_release_tags.update(r.tag for r in processed_releases)

    processed_release_packages = set()
    for release in releases:
        if time.time() > start_time_secs + timeout_secs:
            logging.warning("Timeout, skip remaining packages")
            break

        if release.tag in processed_release_tags:
            logging.info(f"Skip processed tag: {release.tag}")
            processed_release_packages.add(release.package)
        elif release.package in processed_release_packages:
            logging.info(f"Skip processed package: {release.tag}")
        elif release.package in sdk.ignored_packages:
            logging.info(f"Skip ignored package: {release.tag}")
        else:
            process_release(operation, sdk, release, report)
            processed_release_packages.add(release.package)


def process(command_line: CommandLineConfiguration, report: Report):
    """Run the whole automation: optional PR merge, spec checkout, database load, SDK processing.

    When ``command_line.language`` is set, only SDKs of that language are processed. The
    database is pushed at the end when ``command_line.persist_data`` is set.
    """
    global csv_database

    configuration = load_configuration(command_line)

    if command_line.merge_pr:
        merge_pull_requests(configuration.operation)

    # checkout azure-rest-api-specs repo
    tmp_root_path = path.join(root_path, tmp_folder)
    os.makedirs(tmp_root_path, exist_ok=True)
    spec_repo_path = path.join(tmp_root_path, tmp_spec_folder)
    spec_repo = "https://github.com/Azure/azure-rest-api-specs"
    cmd = ["git", "clone", "--quiet", "--depth", "1", spec_repo, spec_repo_path]
    logging.info(f"Checking out repository: {spec_repo}")
    _run_logged(cmd, tmp_root_path)

    # checkout and load database
    csv_database = CsvDatabase(tmp_root_path)
    csv_database.checkout()
    csv_database.load()

    for sdk_configuration in configuration.sdks:
        if not command_line.language or command_line.language == sdk_configuration.language:
            process_sdk(configuration.operation, sdk_configuration, report)

    if command_line.persist_data:
        csv_database.push(github_token)


def main():
    """Parse the command line, run the automation and report the per-release statuses.

    Raises:
        RuntimeError: when any error was collected during processing; it wraps the list of
            collected errors.
    """
    global root_path
    global github_token
    global start_time_secs

    logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %X")

    start_time_secs = time.time()

    script_path = path.abspath(path.dirname(sys.argv[0]))
    root_path = path.abspath(path.join(script_path, ".."))

    parser = argparse.ArgumentParser(description="")
    parser.add_argument("--build-id", type=str, required=True, help="Build ID")
    parser.add_argument("--github-token", type=str, required=True, help="GitHub token")
    parser.add_argument(
        "--release-in-days", type=int, required=False, default=3, help="Process SDK released within given days"
    )
    parser.add_argument(
        "--language",
        type=str,
        required=False,
        help='Process SDK for specific language. Currently supports "java" and "go".',
    )
    # boolean options are passed as the strings "true"/"false" (case-insensitive)
    parser.add_argument(
        "--persist-data",
        type=str,
        required=False,
        default="false",
        help="Persist data about release and files to database",
    )
    parser.add_argument(
        "--skip-processed",
        type=str,
        required=False,
        default="false",
        help="Skip SDK releases that already been processed",
    )
    parser.add_argument(
        "--merge-pull-request",
        type=str,
        required=False,
        default="false",
        help="Merge GitHub pull request before new processing",
    )
    args = parser.parse_args()

    github_token = args.github_token

    command_line_configuration = CommandLineConfiguration(
        args.build_id,
        args.release_in_days,
        args.language,
        args.persist_data.lower() == "true",
        args.skip_processed.lower() == "true",
        args.merge_pull_request.lower() == "true",
    )

    report = Report({}, AggregatedError([]))
    process(command_line_configuration, report)

    if report.statuses:
        status_lines = ["Statuses:"]
        status_lines.extend(f"{tag}: {status}" for tag, status in report.statuses.items())
        logging.info("\n".join(status_lines))

    if report.aggregated_error.errors:
        raise RuntimeError(report.aggregated_error.errors)


if __name__ == "__main__":
    main()