workflows/cloud-client/conftest.py (61 lines of code) (raw):
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
from typing import Iterator
import uuid

from google.cloud import workflows_v1
import pytest
# Google Cloud project the tests run against; None when the environment
# variable is unset (the `client` fixture asserts that it is set).
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")

# Region used to build every workflow and parent resource path in this file.
LOCATION = "us-central1"

# Prefix of the per-test workflow ID; a UUID suffix is appended to it.
WORKFLOW_ID_BASE = "myFirstWorkflow"
def workflow_exists(client: workflows_v1.WorkflowsClient, workflow_id: str) -> bool:
    """Report whether `workflow_id` can be fetched from PROJECT_ID/LOCATION.

    Args:
        client: Workflows API client used to perform the lookup.
        workflow_id: Short ID of the workflow (not the full resource name).

    Returns:
        True when the lookup succeeds. False when building the resource name
        or fetching the workflow raises any exception; the exception is
        printed rather than propagated.
    """
    try:
        # Both the path construction and the lookup stay inside the `try`
        # so that a failure in either one is reported as "doesn't exist".
        client.get_workflow(
            request={
                "name": client.workflow_path(PROJECT_ID, LOCATION, workflow_id)
            }
        )
    except Exception as exc:
        print(f"Workflow doesn't exist: {exc}")
        return False
    return True
@pytest.fixture(scope="module")
def client() -> workflows_v1.WorkflowsClient:
    """Provide a Workflows API client, created once per test module.

    Returns:
        A new `workflows_v1.WorkflowsClient` instance.

    Raises:
        AssertionError: If the GOOGLE_CLOUD_PROJECT environment variable is
            unset or empty, since every resource path needs a project ID.
    """
    # Fail fast with a clear message instead of letting later API calls
    # fail on a resource path built from a missing project ID.
    assert PROJECT_ID, "'GOOGLE_CLOUD_PROJECT' environment variable not set."
    return workflows_v1.WorkflowsClient()
@pytest.fixture(scope="module")
def project_id() -> str:
    """Expose the module-level PROJECT_ID value to tests as a fixture."""
    return PROJECT_ID
@pytest.fixture(scope="module")
def location() -> str:
    """Expose the module-level LOCATION value to tests as a fixture."""
    return LOCATION
@pytest.fixture(scope="function")
def workflow_id(client: workflows_v1.WorkflowsClient) -> Iterator[str]:
    """Create a uniquely named workflow for one test and delete it afterwards.

    The workflow source is read from `myFirstWorkflow.workflows.yaml`
    (resolved against the current working directory).

    Args:
        client: Workflows API client used for all calls.

    Yields:
        The short ID of the newly created workflow.

    Raises:
        TimeoutError: If the workflow still cannot be fetched after the
            polling delay has grown past its cap.
    """
    workflow_id_str = f"{WORKFLOW_ID_BASE}_{uuid.uuid4()}"

    # Read the definition with a context manager so the file handle is
    # closed deterministically instead of being leaked.
    with open("myFirstWorkflow.workflows.yaml", encoding="utf-8") as source_file:
        workflow_source = source_file.read()

    # Request creation of the workflow.
    client.create_workflow(
        request={
            "parent": client.common_location_path(PROJECT_ID, LOCATION),
            "workflow_id": workflow_id_str,
            "workflow": {
                "name": workflow_id_str,
                "source_contents": workflow_source,
            },
        }
    )

    # Poll until the workflow can be fetched, doubling the delay each round.
    # `workflow_exists` reports every error as "doesn't exist", so the loop
    # must be bounded or a persistent failure would hang the test run.
    backoff_delay = 1  # Start wait with delay of 1 second.
    max_backoff_delay = 64  # Caps the total wait at roughly two minutes.
    while not workflow_exists(client, workflow_id_str):
        if backoff_delay > max_backoff_delay:
            raise TimeoutError(
                f"Workflow {workflow_id_str} was not available after polling."
            )
        print("- Waiting for the Workflow to be created...")
        time.sleep(backoff_delay)
        backoff_delay *= 2

    yield workflow_id_str

    # Teardown: delete the workflow created for this test.
    client.delete_workflow(
        request={
            "name": client.workflow_path(PROJECT_ID, LOCATION, workflow_id_str),
        }
    )