scripts/testing/cli.py (472 lines of code) (raw):

# # Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. # import json import os import re import subprocess # noqa S404 import tempfile import time from contextlib import contextmanager import click import yaml __all__ = ["main"] BASE_DIR = os.path.dirname(os.path.abspath(__file__)) VM_STARTUP_SCRIPT_PATH = f"startup-script={BASE_DIR}/startup_scipt.sh" VM_INIT_ATTEMPTS = 30 SLEEP_TIMEOUT = 6 DOCKER_COMPOSE_FILE = f"{BASE_DIR}/docker-compose.yml" PULL_CONNECTORS_SCRIPT = f"{BASE_DIR}/pull-connectors.sh" CLI_CONFIG_FILE = "cli-config.yml" CONNECTOR_SERVICE_CONFIG_FILE = "/var/app/config.yml" VAULT_SECRETS_PREFIX = "secret/ent-search-team/" ES_DEFAULT_HOST = "http://localhost:9200" ES_DEFAULT_USERNAME = "elastic" ES_DEFAULT_PASSWORD = "changeme" # noqa: S105 SOURCE_MACHINE_IMAGE = "elastic-connectors-testing-base-image" IMAGE_FAMILY = "ubuntu-2204-lts" # VMs metadata DIVISION = "engineering" ORG = "entsearch" TEAM = "ingestion" PROJECT = "connectors-testing" @click.group() @click.pass_context def cli(ctx): pass @click.command( name="run-test", help="Spin up a VM, Elasticsearch and connectors services and run the tests", ) @click.argument("name") @click.option( "--vm-type", default="e2-highcpu-2", help="Virtual machine type. See more in https://cloud.google.com/compute/docs/general-purpose-machines", ) @click.option( "--vm-zone", default="europe-west1-b", help="Virtual machine zone. See more in https://cloud.google.com/compute/docs/regions-zones", ) @click.option( "--connectors-ref", default="main", help="A commit hash or a branch name of connectors repository", ) @click.option( "--es-version", help="Elasticsearch version. If defined, the script will use a docker-compose file to start Elasticsearch", ) @click.option("--es-host", help="Elasticsearch host", default=ES_DEFAULT_HOST) @click.option( "--es-username", help="Elasticsearch username", default=ES_DEFAULT_USERNAME ) @click.option( "--es-password", help="Elasticsearch password", default=ES_DEFAULT_PASSWORD ) @click.option("--test-case", help="Test case file", type=click.Path(exists=True)) @click.option("--delete", is_flag=True, help="Deletes the VM once the tests passed") @click.pass_context def create_test_environment( ctx, name, vm_type, vm_zone, es_version, connectors_ref, es_host, es_username, es_password, test_case, delete, ): """ Creates a new VM and runs the tests """ # TODO: fail fast. Check all the configs/yaml files before creating a VM create_vm(name, vm_type, vm_zone) setup_stack(name, vm_zone, es_version, connectors_ref, es_host) run_scenarios(name, es_host, es_username, es_password, vm_zone, test_case) if delete is True: ctx.invoke(delete_test_environment, name=name, vm_zone=vm_zone) else: print_help(name, vm_zone) cli.add_command(create_test_environment) @click.command(name="delete", help="Deletes a VM") @click.argument("name") @click.option("--vm-zone", default="europe-west1-b") def delete_test_environment(name, vm_zone): """ Deletes the VM """ cmd = [ "gcloud", "compute", "instances", "delete", name, "--quiet", "--zone", vm_zone, ] click.echo("Deleting the VM") run_gcloud_cmd(cmd) click.echo("The VM has been deleted") cli.add_command(delete_test_environment) def print_help(name, vm_zone): """ Prints a list of commands that can be used to interact with the setup """ logs_cmd = " ".join( [ "gcloud", "compute", "ssh", name, "--zone", vm_zone, "--command", '"tail -f ~/service.log"', ] ) connectors_cmd = " ".join( [ "gcloud", "compute", "ssh", name, "--zone", vm_zone, "--command", f'"/var/app/bin/connectors -c ~/{CLI_CONFIG_FILE} connector list"', ] ) sync_jobs_cmd = " ".join( [ "gcloud", "compute", "ssh", name, "--zone", vm_zone, "--command", f'"/var/app/bin/connectors -c ~/{CLI_CONFIG_FILE} job list CONNECTOR_ID"', ] ) click.echo("You can use the following commands to interact with the setup:") click.echo("Access logs: " + click.style(logs_cmd, fg="green")) click.echo("List of connectors: " + click.style(connectors_cmd, fg="green")) click.echo("List of sync jobs: " + click.style(sync_jobs_cmd, fg="green")) def create_vm(name, vm_type, vm_zone): """ Creates a new VM and waits until the startup script finishes its work """ with click.progressbar(label="Creating a new VM...", length=100) as steps: cmd = [ "gcloud", "compute", "instances", "create", name, "--async", "--source-machine-image", SOURCE_MACHINE_IMAGE, "--image-family", IMAGE_FAMILY, "--machine-type", vm_type, "--zone", vm_zone, f"--metadata-from-file={VM_STARTUP_SCRIPT_PATH}", ] result = run_gcloud_cmd(cmd) steps.update(25) attempts = 0 while True: if attempts >= VM_INIT_ATTEMPTS: click.echo("Timeout!") raise click.Abort() attempts = attempts + 1 # Tries to ssh to the VM and check if the startup script finished its work cmd = [ "gcloud", "compute", "ssh", name, "--zone", vm_zone, "--ssh-flag=", "-q", "--command", "sudo ls /var/log/startup-is-finished ", ] result = subprocess.run( # noqa: S603 cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT ) stdout = result.stdout.decode("utf-8") # indicates that the VM is booting if re.search("Connection refused", stdout) or re.search( "SSH connectivity issues", stdout ): steps.update(25) # indicates that the VM is ready but the startup script elif result.returncode == 1 or result.returncode == 2: pass elif result.returncode == 0: steps.update(25) break else: click.echo(stdout) raise click.Abort() time.sleep(SLEEP_TIMEOUT) # update the VM metadata cmd = [ "gcloud", "compute", "instances", "add-metadata", name, "--zone", vm_zone, "--metadata", f"division={DIVISION},org={ORG},team={TEAM},project={PROJECT}", ] run_gcloud_cmd(cmd) steps.update(25) def render_connector_configuration(file_path): """ Reads the connector configuration file and replaces all the values that start with `vault:` with the values from Vault """ configuration = {} with open(os.path.join(BASE_DIR, file_path), "r") as f: configuration = json.loads(f.read()) for key, item in configuration.items(): if type(item) is str and item.startswith("vault:"): configuration[key] = read_from_vault(item) return configuration def setup_stack(name, vm_zone, es_version, connectors_ref, es_host): with click.progressbar(label="Setting up the stack...", length=100) as steps: # Upload pull-connectors file cmd = [ "gcloud", "compute", "scp", PULL_CONNECTORS_SCRIPT, f"{name}:~/", "--zone", vm_zone, ] run_gcloud_cmd(cmd) steps.update(1) # pull connectors repo cmd = [ "gcloud", "compute", "ssh", name, "--zone", vm_zone, "--command", f"sudo ./{os.path.basename(PULL_CONNECTORS_SCRIPT)} {connectors_ref}", ] run_gcloud_cmd(cmd) steps.update(9) # check if it's a cloud/serverless deployment if es_host != ES_DEFAULT_HOST: steps.update(90) return # upload Elasticsearch docker compose file cmd = [ "gcloud", "compute", "scp", DOCKER_COMPOSE_FILE, f"{name}:~/", "--zone", vm_zone, ] run_gcloud_cmd(cmd) steps.update(1) # run docker compose file cmd = [ "gcloud", "compute", "ssh", name, "--zone", vm_zone, "--command", f"sudo ES_VERSION={es_version} docker compose -f ~/{os.path.basename(DOCKER_COMPOSE_FILE)} up -d", ] run_gcloud_cmd(cmd) steps.update(9) # wait when Elasticsearch starts for _i in range(1, 16): time.sleep(SLEEP_TIMEOUT) steps.update(4) steps.update(20) # TODO: Use the same config file for the CLI and the connector service @contextmanager def cli_config(es_host, es_username, es_password): """ Creates a temporary file with the CLI configuration and deletes it after the context is closed """ try: config = { "elasticsearch": { "host": es_host, "username": es_username, "password": es_password, } } file_name = None with tempfile.NamedTemporaryFile(delete=False, mode="w") as fp: file_name = fp.name fp.write(yaml.dump(config)) fp.close() yield file_name finally: if os.path.isfile(file_name): os.remove(file_name) @contextmanager def connector_service_config(es_host, es_username, es_password): """ Creates a temporary file with the connector service configuration and deletes it after the context is closed """ try: config = { "elasticsearch.host": es_host, "elasticsearch.username": es_username, "elasticsearch.password": es_password, } file_name = None with tempfile.NamedTemporaryFile(delete=False, mode="w") as fp: file_name = fp.name fp.write(yaml.dump(config)) fp.close() yield file_name finally: if os.path.isfile(file_name): os.remove(file_name) def run_scenarios(name, es_host, es_username, es_password, vm_zone, test_case): """ Runs the scenarios from the test case file """ scenarios = yaml.safe_load(open(test_case)) with cli_config(es_host, es_username, es_password) as cli_config_file: cmd = [ "gcloud", "compute", "scp", cli_config_file, f"{name}:~/{CLI_CONFIG_FILE}", "--zone", vm_zone, ] run_gcloud_cmd(cmd) cmd = [ "gcloud", "compute", "ssh", name, "--zone", vm_zone, "--command", f"sudo /var/app/bin/connectors -c {CLI_CONFIG_FILE} connector list", ] run_gcloud_cmd(cmd) with connector_service_config( es_host, es_username, es_password ) as service_config_file: cmd = [ "gcloud", "compute", "scp", service_config_file, f"{name}:{CONNECTOR_SERVICE_CONFIG_FILE}", "--zone", vm_zone, ] run_gcloud_cmd(cmd) for scenario in scenarios["scenarios"]: try: file_name = None rendered_connector_configuration = render_connector_configuration( scenario["connector_configuration"] ) with tempfile.NamedTemporaryFile(delete=False, mode="w") as fp: file_name = fp.name fp.write(json.dumps(rendered_connector_configuration)) fp.close() cmd = [ "gcloud", "compute", "scp", fp.name, f"{name}:~/{os.path.basename(scenario['connector_configuration'])}", "--zone", vm_zone, ] run_gcloud_cmd(cmd) finally: if file_name is not None and os.path.isfile(file_name): os.remove(file_name) # prepare connector native = "--native" if scenario["native"] else "" connector_command = [ "connector", "create", "--name", scenario["connector_name"], "--index-name", scenario["index_name"], "--service-type", scenario["service_type"], "--index-language", scenario["index_language"], native, "--from-file", os.path.basename(scenario["connector_configuration"]), "--update-config", "--connector-service-config", CONNECTOR_SERVICE_CONFIG_FILE, ] cmd = [ "gcloud", "compute", "ssh", name, "--zone", vm_zone, "--command", f"sudo /var/app/bin/connectors -c {CLI_CONFIG_FILE} {' '.join(connector_command)}", ] result = run_gcloud_cmd(cmd) connector_id = re.search("ID:\s([\w\-_]+)", result.stdout.decode("utf-8"))[1] # Start the service cmd = [ "gcloud", "compute", "ssh", name, "--zone", vm_zone, "--command", f"sudo /var/app/bin/elastic-ingest -c {CONNECTOR_SERVICE_CONFIG_FILE} --debug --filebeat >~/service.log 2>&1 &", ] run_gcloud_cmd(cmd) for test in scenario["tests"]: click.echo(f"Run test: {test['name']}") cmd = [ "gcloud", "compute", "ssh", name, "--zone", vm_zone, "--command", f"sudo /var/app/bin/connectors -c {CLI_CONFIG_FILE} job start -i {connector_id} -t {test['job_type']} --format json", ] result = run_gcloud_cmd(cmd) job_id = json.loads(result.stdout.decode("utf-8"))["id"] timeout = 0 timeout_step = test["timeout"] / 5 while True: cmd = [ "gcloud", "compute", "ssh", name, "--zone", vm_zone, "--command", f"sudo /var/app/bin/connectors -c {CLI_CONFIG_FILE} job view {job_id} --format json", ] result = run_gcloud_cmd(cmd) job = json.loads(result.stdout.decode("utf-8")) if job["job_status"] != test["match"]["status"]: time.sleep(timeout_step) timeout += timeout_step else: click.echo(click.style(f"Test {test['name']} passed", fg="green")) break if timeout >= test["timeout"]: click.echo(click.style(f"Test {test['name']} failed", fg="red")) break # TODO Read all the fields in one call def read_from_vault(key): """ Reads a secret from Vault and returns its value """ _, secret_prefix, field = key.split(":") cmd = ["vault", "read", "-field", field, f"{VAULT_SECRETS_PREFIX}/{secret_prefix}"] result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) # noqa: S603 if result.returncode != 0: click.echo(result.stdout, err=True) raise click.Abort(result.stdout) return result.stdout.decode("utf-8") def run_gcloud_cmd(cmd): """ Runs a gcloud command and raises an exception if the command failed """ result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) # noqa: S603 if result.returncode != 0: click.echo(result.stdout, err=True) raise click.Abort(result.stdout) return result def main(args=None): cli() if __name__ == "__main__": main()