in detection_rules/devtools.py [0:0]
def generate_schema(token: str, schema: str, schema_version: str, endpoint_target: str, overwrite: bool):
"""Download schemas and generate flattend schema."""
github = GithubClient(token)
client = github.authenticated_client
if schema_version and not Version.parse(schema_version):
raise click.BadParameter(f"Invalid schema version: {schema_version}")
click.echo(f"Generating {schema} schema")
if schema == "endgame":
if not schema_version:
raise click.BadParameter("Schema version required")
schema_manager = EndgameSchemaManager(client, schema_version)
schema_manager.save_schemas(overwrite=overwrite)
# ecs, beats and endpoint schemas are refreshed during release
# these schemas do not require a schema version
if schema == "ecs":
download_schemas(refresh_all=True)
if schema == "beats":
if not schema_version:
download_latest_beats_schema()
refresh_main_schema()
else:
download_beats_schema(schema_version)
# endpoint package custom schemas can be downloaded
# this download requires a specific schema target
if schema == "endpoint":
repo = client.get_repo("elastic/endpoint-package")
contents = repo.get_contents("custom_schemas")
optional_endpoint_targets = [
Path(f.path).name.replace("custom_", "").replace(".yml", "")
for f in contents if f.name.endswith(".yml") or Path(f.path).name == endpoint_target
]
if not endpoint_target:
raise click.BadParameter("Endpoint target required")
if endpoint_target not in optional_endpoint_targets:
raise click.BadParameter(f"""Invalid endpoint schema target: {endpoint_target}