bigquery-datatransfer/snippets/conftest.py (98 lines of code) (raw):

# Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import datetime import os import random import uuid from google.api_core import client_options import google.api_core.exceptions import google.auth from google.cloud import bigquery, bigquery_datatransfer, pubsub_v1 import pytest RESOURCE_PREFIX = "python_bigquery_datatransfer_samples_snippets" RESOURCE_DATE_FORMAT = "%Y%m%d%H%M%S" RESOURCE_DATE_LENGTH = 4 + 2 + 2 + 2 + 2 + 2 def resource_prefix() -> str: timestamp = datetime.datetime.utcnow().strftime(RESOURCE_DATE_FORMAT) random_string = hex(random.randrange(1000000))[2:] return f"{RESOURCE_PREFIX}_{timestamp}_{random_string}" def resource_name_to_date(resource_name: str): start_date = len(RESOURCE_PREFIX) + 1 date_string = resource_name[start_date : start_date + RESOURCE_DATE_LENGTH] parsed_date = datetime.datetime.strptime(date_string, RESOURCE_DATE_FORMAT) return parsed_date @pytest.fixture(scope="session", autouse=True) def cleanup_pubsub_topics(pubsub_client: pubsub_v1.PublisherClient, project_id): yesterday = datetime.datetime.utcnow() - datetime.timedelta(days=1) for topic in pubsub_client.list_topics(project=f"projects/{project_id}"): topic_id = topic.name.split("/")[-1] if ( topic_id.startswith(RESOURCE_PREFIX) and resource_name_to_date(topic_id) < yesterday ): pubsub_client.delete_topic(topic=topic.name) def temp_suffix(): now = datetime.datetime.now() return f"{now.strftime('%Y%m%d%H%M%S')}_{uuid.uuid4().hex[:8]}" @pytest.fixture(scope="session") def bigquery_client(default_credentials): credentials, project_id = default_credentials return bigquery.Client(credentials=credentials, project=project_id) @pytest.fixture(scope="session") def pubsub_client(default_credentials): credentials, _ = default_credentials return pubsub_v1.PublisherClient(credentials=credentials) @pytest.fixture(scope="session") def pubsub_topic(pubsub_client: pubsub_v1.PublisherClient, project_id): topic_id = resource_prefix() topic_path = pubsub_v1.PublisherClient.topic_path(project_id, topic_id) pubsub_client.create_topic(name=topic_path) yield topic_path pubsub_client.delete_topic(topic=topic_path) @pytest.fixture(scope="session") def dataset_id(bigquery_client, project_id): dataset_id = f"bqdts_{temp_suffix()}" bigquery_client.create_dataset(f"{project_id}.{dataset_id}") yield dataset_id bigquery_client.delete_dataset(dataset_id, delete_contents=True) @pytest.fixture(scope="session") def default_credentials(): return google.auth.default(["https://www.googleapis.com/auth/cloud-platform"]) @pytest.fixture(scope="session") def project_id(): return os.environ["GOOGLE_CLOUD_PROJECT"] @pytest.fixture(scope="session") def service_account_name(default_credentials): credentials, _ = default_credentials # The service_account_email attribute is not available when running with # user account credentials, but should be available when running from our # continuous integration tests. return getattr(credentials, "service_account_email", None) @pytest.fixture(scope="session") def transfer_client(default_credentials, project_id): credentials, _ = default_credentials options = client_options.ClientOptions(quota_project_id=project_id) transfer_client = bigquery_datatransfer.DataTransferServiceClient( credentials=credentials, client_options=options ) # Ensure quota is always attributed to the correct project. bigquery_datatransfer.DataTransferServiceClient = lambda: transfer_client return transfer_client @pytest.fixture(scope="session") def transfer_config_name(transfer_client, project_id, dataset_id, service_account_name): from . import manage_transfer_configs, scheduled_query # Use the transfer_client fixture so we know quota is attributed to the # correct project. assert transfer_client is not None # To conserve limited BQ-DTS quota, this fixture creates only one transfer # config for a whole session and is used to test the scheduled_query.py and # the delete operation in manage_transfer_configs.py. transfer_config = scheduled_query.create_scheduled_query( { "project_id": project_id, "dataset_id": dataset_id, "service_account_name": service_account_name, } ) yield transfer_config.name manage_transfer_configs.delete_config( {"transfer_config_name": transfer_config.name} ) @pytest.fixture def to_delete_configs(transfer_client): to_delete = [] yield to_delete for config_name in to_delete: try: transfer_client.delete_transfer_config(name=config_name) except google.api_core.exceptions.GoogleAPICallError: pass