in scripts/testing/cli.py [0:0]
def run_scenarios(name, es_host, es_username, es_password, vm_zone, test_case):
"""
Runs the scenarios from the test case file
"""
scenarios = yaml.safe_load(open(test_case))
with cli_config(es_host, es_username, es_password) as cli_config_file:
cmd = [
"gcloud",
"compute",
"scp",
cli_config_file,
f"{name}:~/{CLI_CONFIG_FILE}",
"--zone",
vm_zone,
]
run_gcloud_cmd(cmd)
cmd = [
"gcloud",
"compute",
"ssh",
name,
"--zone",
vm_zone,
"--command",
f"sudo /var/app/bin/connectors -c {CLI_CONFIG_FILE} connector list",
]
run_gcloud_cmd(cmd)
with connector_service_config(
es_host, es_username, es_password
) as service_config_file:
cmd = [
"gcloud",
"compute",
"scp",
service_config_file,
f"{name}:{CONNECTOR_SERVICE_CONFIG_FILE}",
"--zone",
vm_zone,
]
run_gcloud_cmd(cmd)
for scenario in scenarios["scenarios"]:
try:
file_name = None
rendered_connector_configuration = render_connector_configuration(
scenario["connector_configuration"]
)
with tempfile.NamedTemporaryFile(delete=False, mode="w") as fp:
file_name = fp.name
fp.write(json.dumps(rendered_connector_configuration))
fp.close()
cmd = [
"gcloud",
"compute",
"scp",
fp.name,
f"{name}:~/{os.path.basename(scenario['connector_configuration'])}",
"--zone",
vm_zone,
]
run_gcloud_cmd(cmd)
finally:
if file_name is not None and os.path.isfile(file_name):
os.remove(file_name)
# prepare connector
native = "--native" if scenario["native"] else ""
connector_command = [
"connector",
"create",
"--name",
scenario["connector_name"],
"--index-name",
scenario["index_name"],
"--service-type",
scenario["service_type"],
"--index-language",
scenario["index_language"],
native,
"--from-file",
os.path.basename(scenario["connector_configuration"]),
"--update-config",
"--connector-service-config",
CONNECTOR_SERVICE_CONFIG_FILE,
]
cmd = [
"gcloud",
"compute",
"ssh",
name,
"--zone",
vm_zone,
"--command",
f"sudo /var/app/bin/connectors -c {CLI_CONFIG_FILE} {' '.join(connector_command)}",
]
result = run_gcloud_cmd(cmd)
connector_id = re.search("ID:\s([\w\-_]+)", result.stdout.decode("utf-8"))[1]
# Start the service
cmd = [
"gcloud",
"compute",
"ssh",
name,
"--zone",
vm_zone,
"--command",
f"sudo /var/app/bin/elastic-ingest -c {CONNECTOR_SERVICE_CONFIG_FILE} --debug --filebeat >~/service.log 2>&1 &",
]
run_gcloud_cmd(cmd)
for test in scenario["tests"]:
click.echo(f"Run test: {test['name']}")
cmd = [
"gcloud",
"compute",
"ssh",
name,
"--zone",
vm_zone,
"--command",
f"sudo /var/app/bin/connectors -c {CLI_CONFIG_FILE} job start -i {connector_id} -t {test['job_type']} --format json",
]
result = run_gcloud_cmd(cmd)
job_id = json.loads(result.stdout.decode("utf-8"))["id"]
timeout = 0
timeout_step = test["timeout"] / 5
while True:
cmd = [
"gcloud",
"compute",
"ssh",
name,
"--zone",
vm_zone,
"--command",
f"sudo /var/app/bin/connectors -c {CLI_CONFIG_FILE} job view {job_id} --format json",
]
result = run_gcloud_cmd(cmd)
job = json.loads(result.stdout.decode("utf-8"))
if job["job_status"] != test["match"]["status"]:
time.sleep(timeout_step)
timeout += timeout_step
else:
click.echo(click.style(f"Test {test['name']} passed", fg="green"))
break
if timeout >= test["timeout"]:
click.echo(click.style(f"Test {test['name']} failed", fg="red"))
break