def build_integrations_schemas()

in detection_rules/integrations.py
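
Module-level imports assumed by this excerpt (not shown in the source): gzip, json, fnmatch, requests, yaml, and pathlib.Path, plus the project helpers SCHEMA_FILE_PATH, load_integrations_schemas, load_integrations_manifests, unzip, flatten_ecs_schema, and definitions.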


def build_integrations_schemas(overwrite: bool, integration: str | None = None) -> None:
    """Builds a new local copy of integration-schemas.json.gz from EPR integrations."""

    # Start from the existing schema file unless we're overwriting it
    if overwrite and SCHEMA_FILE_PATH.exists():
        SCHEMA_FILE_PATH.unlink()
        final_integration_schemas = {}
    elif SCHEMA_FILE_PATH.exists():
        final_integration_schemas = load_integrations_schemas()
    else:
        final_integration_schemas = {}

    # Load the integration manifests
    integration_manifests = load_integrations_manifests()
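    # load_integrations_manifests evidently returns {package: {version: manifest}},
    # where each manifest carries an EPR 'download' path (used below)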

    # if a single integration is specified, only process that integration
    if integration:
        if integration in integration_manifests:
            integration_manifests = {integration: integration_manifests[integration]}
        else:
            raise ValueError(f"Integration {integration} not found in manifest.")

    # Loop through the packages and versions
    for package, versions in integration_manifests.items():
        print(f"processing {package}")
        final_integration_schemas.setdefault(package, {})
        for version, manifest in versions.items():
            # Skip versions already present in the saved schema file
            if version in final_integration_schemas[package]:
                continue

            # Download the zip file
            download_url = f"https://epr.elastic.co{manifest['download']}"
            # 30s timeout is an arbitrary choice to avoid hanging on a stalled download
            response = requests.get(download_url, timeout=30)
            response.raise_for_status()

            # Create a fresh entry for this package version
            final_integration_schemas[package][version] = {}

            # Open the zip file
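            # (unzip is a project helper; from its use below it wraps the downloaded
            # bytes in a zipfile.ZipFile-like object)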
            with unzip(response.content) as zip_ref:
                for file in zip_ref.namelist():
                    file_data_bytes = zip_ref.read(file)
                    # Field schemas live under */fields/*.yml inside the package
                    if fnmatch.fnmatch(file, '*/fields/*.yml'):
                        integration_name = Path(file).parent.parent.name
                        final_integration_schemas[package][version].setdefault(integration_name, {})
                        schema_fields = yaml.safe_load(file_data_bytes)

                        # Flatten the nested field definitions and merge them into the schema
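                        # flatten_ecs_schema evidently yields dicts with at least
                        # 'name' and 'type' keys (see the comprehension below)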
                        data = flatten_ecs_schema(schema_fields)
                        flat_data = {field['name']: field['type'] for field in data}

                        final_integration_schemas[package][version][integration_name].update(flat_data)

                    # Add machine learning job IDs to the schema for ML packages
                    if (package in map(str.lower, definitions.MACHINE_LEARNING_PACKAGES)
                            and fnmatch.fnmatch(file, '*/ml_module/*ml.json')):
                        ml_module = json.loads(file_data_bytes)
                        job_ids = [job['id'] for job in ml_module['attributes']['jobs']]
                        final_integration_schemas[package][version]['jobs'] = job_ids

                    del file_data_bytes

    # Write the final integration schemas to disk
    with gzip.open(SCHEMA_FILE_PATH, "w") as schema_file:
        schema_file_bytes = json.dumps(final_integration_schemas).encode("utf-8")
        schema_file.write(schema_file_bytes)

    print(f"final integrations manifests dumped: {SCHEMA_FILE_PATH}")