it_serverless/conftest.py (165 lines of code) (raw):
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import contextlib
import json
import os
import subprocess
import time
import urllib.parse
from dataclasses import dataclass
import pytest
import requests
from elasticsearch import Elasticsearch
BASE_URL = os.environ["RALLY_IT_SERVERLESS_BASE_URL"]
API_KEY = os.environ["RALLY_IT_SERVERLESS_API_KEY"]
GET_CREDENTIALS_ENDPOINT = os.environ["RALLY_IT_SERVERLESS_GET_CREDENTIALS_ENDPOINT"]
PIPELINE_NAME = os.environ.get("BUILDKITE_PIPELINE_SLUG")
BUILD_NUMBER = os.environ.get("BUILDKITE_BUILD_NUMBER")
@dataclass
class ServerlessProjectConfig:
target_host: str
username: str
password: str
api_key: str
user_client_options_file: str = None
operator_client_options_file: str = None
def get_client_options_file(self, operator) -> str:
return self.operator_client_options_file if operator else self.user_client_options_file
@staticmethod
def _client_options(client_auth):
return {
"default": {
"verify_certs": False,
"use_ssl": True,
"timeout": 240,
**client_auth,
}
}
def prepare_client_options_files(self, tmpdir_factory):
tmp_path = tmpdir_factory.mktemp("client-options")
client_auth = {
"basic_auth_user": self.username,
"basic_auth_password": self.password,
}
self.operator_client_options_file = tmp_path / "operator.json"
with self.operator_client_options_file.open("w") as f:
json.dump(self._client_options(client_auth), fp=f)
client_auth = {"api_key": self.api_key}
self.user_client_options_file = tmp_path / "user.json"
with self.user_client_options_file.open("w") as f:
json.dump(self._client_options(client_auth), fp=f)
def serverless_api(method, endpoint, json=None):
resp = requests.request(
method,
BASE_URL + endpoint,
headers={
"Authorization": f"ApiKey {API_KEY}",
"Content-Type": "application/json",
},
json=json,
timeout=60,
)
resp.raise_for_status()
return resp.json()
@pytest.fixture(scope="module")
def project():
print("\nCreating project")
project_name = "rally-track-it"
if PIPELINE_NAME is not None and BUILD_NUMBER is not None:
project_name = f"{PIPELINE_NAME}-{BUILD_NUMBER}"
created_project = serverless_api(
"POST",
"/api/v1/serverless/projects/elasticsearch",
json={
"name": project_name,
"region_id": "aws-eu-west-1",
},
)
yield created_project
print("Deleting project")
serverless_api("DELETE", f"/api/v1/serverless/projects/elasticsearch/{created_project['id']}")
@pytest.fixture(scope="module")
def project_config(project, tmpdir_factory):
credentials = serverless_api(
"POST",
f"/api/v1/serverless/projects/elasticsearch/{project['id']}{GET_CREDENTIALS_ENDPOINT}",
)
es_endpoint = project["endpoints"]["elasticsearch"]
es_hostname = urllib.parse.urlparse(es_endpoint).hostname
rally_target_host = f"{es_hostname}:443"
print("Waiting for DNS propagation")
for _ in range(6):
time.sleep(30)
with contextlib.suppress(subprocess.CalledProcessError):
subprocess.run(["nslookup", es_hostname, "8.8.8.8"], check=True)
break
else:
raise ValueError("Timed out waiting for DNS propagation")
print("Waiting for Elasticsearch")
for _ in range(18):
try:
es = Elasticsearch(
f"https://{rally_target_host}",
basic_auth=(
credentials["username"],
credentials["password"],
),
request_timeout=10,
)
info = es.info()
print("GET /")
print(json.dumps(info.body, indent=2))
authenticate = es.perform_request(method="GET", path="/_security/_authenticate")
print("GET /_security/_authenticate")
print(json.dumps(authenticate.body, indent=2))
break
except Exception as e:
print(f"GET / Failed with {str(e)}")
time.sleep(10)
else:
raise ValueError("Timed out waiting for Elasticsearch")
# Create API key to test Rally with a public user privileges
print("Waiting for API key")
for _ in range(18):
try:
api_key = es.security.create_api_key(name="public-api-key")
break
except Exception as e:
print(f"API create failed with {str(e)}")
time.sleep(10)
else:
raise ValueError("Timed out waiting for API key")
# Confirm API key is working fine
print("Testing API key")
for _ in range(18):
try:
es = Elasticsearch(
f"https://{rally_target_host}",
api_key=api_key.body["encoded"],
request_timeout=10,
)
info = es.info()
break
except Exception as e:
print(f"API verification failed with {str(e)}")
time.sleep(10)
else:
raise ValueError("Timed out verifying API key")
project_config = ServerlessProjectConfig(
rally_target_host,
credentials["username"],
credentials["password"],
api_key.body["encoded"],
)
project_config.prepare_client_options_files(tmpdir_factory)
yield project_config
def pytest_addoption(parser):
parser.addoption("--operator", action="store_true", help="run as operator")
def pytest_generate_tests(metafunc):
if "operator" in metafunc.fixturenames:
operator = metafunc.config.getoption("operator")
label = "operator" if operator else "user"
metafunc.parametrize("operator", [operator], ids=[label])
def pytest_configure(config):
config.addinivalue_line("markers", "operator_only: mark test for operator only")
def pytest_collection_modifyitems(config, items):
skip = pytest.mark.skip()
for item in items:
if not config.getoption("operator") and "operator_only" in item.keywords:
item.add_marker(skip)