# privateca/snippets/conftest.py (119 lines of code) (raw):
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from multiprocessing import Pool
import os
import random
import uuid
import backoff
from google.api_core.exceptions import FailedPrecondition
from google.api_core.exceptions import ServiceUnavailable
import google.auth
from google.cloud.security import privateca_v1
import pytest
from create_ca_pool import create_ca_pool
from create_certificate_authority import create_certificate_authority
from create_certificate_template import create_certificate_template
from delete_certificate_authority import delete_certificate_authority
from delete_certificate_template import delete_certificate_template
from disable_certificate_authority import disable_certificate_authority
from enable_certificate_authority import enable_certificate_authority
# Project ID: the second element of the tuple returned by google.auth.default(),
# resolved once at import time.
PROJECT = google.auth.default()[1]
# Every location scanned by delete_stale_resources() when listing CA pools.
LOCATIONS = (
    "us-central1",
    "europe-north1",
    "europe-central2",
    "europe-west2",
    "us-east4",
    "europe-west1",
)
# One location is picked at random per import; all fixtures below use it.
LOCATION = random.choice(LOCATIONS)
# Placeholder subject values passed to create_certificate_authority().
COMMON_NAME = "COMMON_NAME"
ORGANIZATION = "ORGANIZATION"
# Passed as the last argument to create_certificate_authority().
# NOTE(review): presumably the CA lifetime in seconds - confirm against that sample.
CA_DURATION = 1000000
def delete_cas_from_pool(ca_pool_name: str) -> None:
    """Disable (if needed) and delete every certificate authority in a CA pool.

    Each CA that is listed as ENABLED is disabled first, and the function
    waits for that long-running operation to finish before attempting the
    delete. CAs whose current state is already DELETED are skipped.

    A ``FailedPrecondition`` raised while deleting a CA is printed and the
    loop moves on to the next CA; any other exception (including a timeout
    while waiting on an operation) propagates to the caller.

    Args:
        ca_pool_name: Resource name of the CA pool, used as ``parent`` when
            listing its certificate authorities.
    """
    client = privateca_v1.CertificateAuthorityServiceClient()
    for ca in client.list_certificate_authorities(parent=ca_pool_name):
        # Check if the CA is enabled.
        if ca.state == privateca_v1.CertificateAuthority.State.ENABLED:
            request = privateca_v1.DisableCertificateAuthorityRequest(name=ca.name)
            # Disabling is a long-running operation: block until it completes,
            # otherwise the delete below can race against a CA that is still
            # enabled and fail with FailedPrecondition.
            client.disable_certificate_authority(request=request).result(
                timeout=300
            )

        # Delete CA. The state is re-read because the listed value may be
        # stale after the disable call above.
        ca_state = client.get_certificate_authority(name=ca.name).state
        try:
            if ca_state != privateca_v1.CertificateAuthority.State.DELETED:
                delete_ca_request = privateca_v1.DeleteCertificateAuthorityRequest()
                delete_ca_request.name = ca.name
                # Delete even if the CA still has active certificates, and
                # skip the grace period instead of scheduling the deletion.
                delete_ca_request.ignore_active_certificates = True
                delete_ca_request.skip_grace_period = True
                client.delete_certificate_authority(request=delete_ca_request).result(
                    timeout=300
                )
                print(f" * {ca.name} Deleted!")
        except FailedPrecondition as err:
            # Best-effort cleanup: report the failure and keep going.
            print(err)
def delete_one_pool(ca_pool_name: str) -> None:
    """Delete a single CA pool and wait for the operation to complete.

    A ``FailedPrecondition`` error is reported on stdout and swallowed; any
    other exception propagates to the caller.

    Args:
        ca_pool_name: Resource name of the CA pool to delete.
    """
    client = privateca_v1.CertificateAuthorityServiceClient()
    try:
        pool_request = privateca_v1.DeleteCaPoolRequest(name=ca_pool_name)
        print(f"Deleting {ca_pool_name}")
        operation = client.delete_ca_pool(request=pool_request)
        # Block until the long-running delete finishes (at most 300 seconds).
        operation.result(timeout=300)
    except FailedPrecondition:
        print(f"Precondition failed for {ca_pool_name} :(")
@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=3)
def delete_stale_resources() -> None:
    """Delete every CA pool (and the CAs inside it) found in all LOCATIONS.

    The pools of PROJECT are listed location by location, then cleaned up in
    two parallel phases: first the CAs of every pool are deleted, then the
    pools themselves. The whole function is retried with exponential backoff
    (up to 3 tries) when ``ServiceUnavailable`` is raised.
    """
    client = privateca_v1.CertificateAuthorityServiceClient()
    pool_names = []
    for location in LOCATIONS:
        location_path = client.common_location_path(PROJECT, location)
        request = privateca_v1.ListCaPoolsRequest(parent=location_path)
        pool_names.extend(
            ca_pool.name for ca_pool in client.list_ca_pools(request=request)
        )

    # os.cpu_count() returns None when the CPU count cannot be determined;
    # fall back to 1 so the subtraction cannot raise TypeError. At least two
    # workers are always used.
    worker_count = max(2, (os.cpu_count() or 1) - 2)

    # Phase 1: empty the pools. This finishes before any pool is deleted.
    with Pool(worker_count) as p:
        print(f"Going to clean up CAs from {len(pool_names)} pools.")
        p.map(delete_cas_from_pool, pool_names)

    # Phase 2: delete the pools themselves.
    with Pool(worker_count) as p:
        print(f"Going to delete {len(pool_names)} pools.")
        p.map(delete_one_pool, pool_names)
def generate_name() -> str:
    """Return a random resource name: ``test-`` followed by 10 hex characters."""
    suffix = uuid.uuid4().hex[:10]
    return f"test-{suffix}"
@pytest.fixture
def ca_pool(ca_pool_autodelete_name):
    """Create a CA pool in PROJECT/LOCATION and yield its name.

    The name comes from the ``ca_pool_autodelete_name`` fixture, whose
    teardown deletes the pool and its CAs; this fixture has no teardown of
    its own.
    """
    create_ca_pool(PROJECT, LOCATION, ca_pool_autodelete_name)
    yield ca_pool_autodelete_name
@pytest.fixture
def certificate_authority(ca_pool):
    """Create a certificate authority in ``ca_pool`` and yield ``(ca_pool, ca_name)``."""
    ca_name = generate_name()
    create_certificate_authority(
        PROJECT,
        LOCATION,
        ca_pool,
        ca_name,
        COMMON_NAME,
        ORGANIZATION,
        CA_DURATION,
    )
    # No explicit teardown here: the CA pool cleanup removes the certificate
    # authority together with the pool.
    yield ca_pool, ca_name
@pytest.fixture
def deleted_certificate_authority(ca_pool):
    """Yield ``(ca_pool, ca_name)`` for a CA that was created and then deleted.

    The CA is created, enabled, disabled and finally deleted before the
    test body runs.
    """
    ca_name = generate_name()
    create_certificate_authority(
        PROJECT,
        LOCATION,
        ca_pool,
        ca_name,
        COMMON_NAME,
        ORGANIZATION,
        CA_DURATION,
    )
    # Walk the CA through the rest of its lifecycle, in order.
    lifecycle_steps = (
        enable_certificate_authority,
        disable_certificate_authority,
        delete_certificate_authority,
    )
    for step in lifecycle_steps:
        step(PROJECT, LOCATION, ca_pool, ca_name)
    yield ca_pool, ca_name
@pytest.fixture
def certificate_template():
    """Create a certificate template, yield its name, and delete it afterwards."""
    template_name = generate_name()
    create_certificate_template(PROJECT, LOCATION, template_name)
    yield template_name
    # Teardown: remove the template created above.
    delete_certificate_template(PROJECT, LOCATION, template_name)
def _delete_ca_pool_with_cas(pool_id: str) -> None:
    """Delete the CA pool ``pool_id`` in PROJECT/LOCATION together with its CAs."""
    ca_client = privateca_v1.CertificateAuthorityServiceClient()
    ca_pool_path = ca_client.ca_pool_path(PROJECT, LOCATION, pool_id)
    # Remove the pool's CAs first, then the pool itself.
    delete_cas_from_pool(ca_pool_path)
    delete_one_pool(ca_pool_path)


@pytest.fixture
def ca_pool_autodelete_name():
    """Yield a fresh CA pool name; on teardown delete that pool and its CAs.

    The fixture only generates the name - creating the pool is left to the
    test or to another fixture.
    """
    name = generate_name()
    yield name
    _delete_ca_pool_with_cas(name)


@pytest.fixture
def ca_pool_autodelete_name2():
    """Second, independent auto-deleted CA pool name.

    Behaves exactly like ``ca_pool_autodelete_name``; it exists so a single
    test can request two different pool names.
    """
    name = generate_name()
    yield name
    _delete_ca_pool_with_cas(name)