in detection_rules/integrations.py [0:0]
def build_integrations_schemas(overwrite: bool, integration: "str | None" = None) -> None:
    """Build a local copy of integration-schemas.json.gz from EPR integrations.

    Downloads each package version listed in the integrations manifest from the
    Elastic Package Registry (EPR), extracts the field schemas (and the ML job
    ids for machine-learning packages), and writes the combined result to
    ``SCHEMA_FILE_PATH`` as gzipped JSON.

    Args:
        overwrite: If True, delete any existing schema file and rebuild from
            scratch; otherwise merge into the existing file, skipping
            package versions that are already saved.
        integration: If given, only process this single integration package.

    Raises:
        ValueError: If ``integration`` is not present in the manifest.
        requests.HTTPError: If a package download fails (via raise_for_status).
    """
    # Local stdlib import: use fnmatch directly instead of relying on glob's
    # internal `glob.fnmatch` re-export, which is an implementation detail.
    import fnmatch

    # Check if the file already exists and handle accordingly
    if overwrite and SCHEMA_FILE_PATH.exists():
        SCHEMA_FILE_PATH.unlink()
        final_integration_schemas = {}
    elif SCHEMA_FILE_PATH.exists():
        final_integration_schemas = load_integrations_schemas()
    else:
        final_integration_schemas = {}

    # Snapshot the package -> version keys already on disk so we can skip
    # re-downloading them below (final_integration_schemas mutates in the loop,
    # so the skip-check must not read it directly).
    saved_integration_schemas = {pkg: set(vers) for pkg, vers in final_integration_schemas.items()}

    # Load the integration manifests
    integration_manifests = load_integrations_manifests()

    # if a single integration is specified, only process that integration
    if integration:
        if integration in integration_manifests:
            integration_manifests = {integration: integration_manifests[integration]}
        else:
            raise ValueError(f"Integration {integration} not found in manifest.")

    # Hoisted out of the per-file loop: lowercase set of ML package names.
    ml_packages = {pkg.lower() for pkg in definitions.MACHINE_LEARNING_PACKAGES}

    # Loop through the packages and versions
    for package, versions in integration_manifests.items():
        print(f"processing {package}")
        final_integration_schemas.setdefault(package, {})
        for version, manifest in versions.items():
            # Skip versions that were already present in the saved file.
            if version in saved_integration_schemas.get(package, ()):
                continue

            # Download the zip file; timeout guards against a hung connection.
            download_url = f"https://epr.elastic.co{manifest['download']}"
            response = requests.get(download_url, timeout=30)
            response.raise_for_status()

            # Update the final integration schemas
            final_integration_schemas[package][version] = {}

            # Open the zip file
            with unzip(response.content) as zip_ref:
                for file in zip_ref.namelist():
                    file_data_bytes = zip_ref.read(file)

                    # Field definition files hold the per-dataset schemas.
                    if fnmatch.fnmatch(file, '*/fields/*.yml'):
                        integration_name = Path(file).parent.parent.name
                        final_integration_schemas[package][version].setdefault(integration_name, {})
                        schema_fields = yaml.safe_load(file_data_bytes)

                        # Flatten the nested schema into {field name: field type}.
                        data = flatten_ecs_schema(schema_fields)
                        flat_data = {field['name']: field['type'] for field in data}
                        final_integration_schemas[package][version][integration_name].update(flat_data)

                    # add machine learning jobs to the schema
                    if package in ml_packages and fnmatch.fnmatch(file, '*/ml_module/*ml.json'):
                        ml_module = json.loads(file_data_bytes)
                        job_ids = [job['id'] for job in ml_module['attributes']['jobs']]
                        final_integration_schemas[package][version]['jobs'] = job_ids

    # Write the final integration schemas to disk as gzipped JSON bytes.
    with gzip.open(SCHEMA_FILE_PATH, "w") as schema_file:
        schema_file_bytes = json.dumps(final_integration_schemas).encode("utf-8")
        schema_file.write(schema_file_bytes)
    print(f"final integrations manifests dumped: {SCHEMA_FILE_PATH}")