scripts/metric_reporter/gcs_client.py (47 lines of code) (raw):

# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. """GCSClient and related objects""" from google.cloud.storage import Client from pydantic import BaseModel from scripts.metric_reporter.error import BaseError class GCSClientError(BaseError): """Custom error raised for errors encountered by the GCSClient.""" pass class GCSArtifacts(BaseModel): """Represents grouped GCS file names for coverage and JUnit artifacts.""" coverage_artifact_files: list[str] = [] junit_artifact_files: list[str] = [] class GCSClient: """Client to interact with GCS.""" def __init__( self, gcs_client: Client, test_result_bucket: str, coverage_artifact_dir: str, junit_artifact_dir: str, ) -> None: """Initialize the GCSClient. Args: gcs_client (Client): Client to interact with GCS. test_result_bucket (str): Name of the test result bucket. coverage_artifact_dir (str): Name of the coverage artifact subdirectory. junit_artifact_dir (str): Name of the JUnit artifact subdirectory. """ self._client: Client = gcs_client self._bucket = self._client.bucket(test_result_bucket) self.coverage_artifact_dir: str = coverage_artifact_dir self.junit_artifact_dir: str = junit_artifact_dir def get_artifacts(self) -> list[GCSArtifacts]: """Get artifact files from GCS, grouped by repository. Returns: List[GCSArtifactPaths]: A list of grouped GCS paths per repository. Raises: GCSClientError: If the structure of the test_result_bucket is unexpected """ # Here we assume that the structure of the test_result_bucket is as follows: # test_result_bucket/ # ├── repository/ # ├── coverage_artifact_dir/ # ├── coverage-1.json # ├── coverage-2.json # ├── junit_artifact_dir/ # ├── junit-1.xml # ├── junit-2.xml artifact_map = {} # Using a regular dictionary for blob in self._client.list_blobs(self._bucket): parts = blob.name.rstrip("/").split("/") if len(parts) != 3: continue repository, artifact_dir, artifact_file = parts if repository not in artifact_map: artifact_map[repository] = GCSArtifacts() if artifact_dir == self.coverage_artifact_dir: artifact_map[repository].coverage_artifact_files.append(artifact_file) elif artifact_dir == self.junit_artifact_dir: artifact_map[repository].junit_artifact_files.append(artifact_file) else: raise GCSClientError( f"The artifact directory name {artifact_dir} from {blob.name} is unsupported." ) return list(artifact_map.values()) def get_coverage_artifact_content(self, repository: str, file_name: str) -> str: """Get the content of a coverage artifact. Args: repository (str): The repository name. file_name (str): The file name. Returns: str: Coverage JSON file content. """ return self._get_artifact_content(repository, self.coverage_artifact_dir, file_name) def get_junit_artifact_content(self, repository: str, file_name: str) -> str: """Get the content of a JUnit artifact. Args: repository (str): The repository name. file_name (str): The file name. Returns: str: JUnit XML file content. """ return self._get_artifact_content(repository, self.junit_artifact_dir, file_name) def _get_artifact_content(self, repository: str, artifact_dir: str, file_name: str) -> str: blob_name = f"{repository}/{artifact_dir}/{file_name}" blob = self._bucket.blob(blob_name) content: str = blob.download_as_text() return content