in detection_rules/integrations.py [0:0]
def build_integrations_manifest(overwrite: bool, rule_integrations: list = None,
                                integration: str = None, prerelease: bool = False) -> None:
    """Build or update the local gzip-compressed JSON integrations manifest.

    For every requested integration, the manifests returned by
    ``get_integration_manifests`` are validated with
    ``IntegrationManifestSchema`` (unknown fields excluded) and collected in a
    mapping of ``{integration: {package_version: manifest}}``, which is then
    written to ``MANIFEST_FILE_PATH``.

    Args:
        overwrite: When true, any existing file at ``MANIFEST_FILE_PATH`` is
            deleted and replaced by a file holding only the integrations
            collected by this call. When false, the existing file is read and
            one entry in it is replaced (see the note in the body), so the
            file must already exist.
        rule_integrations: Integrations to collect manifests for. When empty
            or ``None``, ``integration`` is used instead.
        integration: Single integration to collect when ``rule_integrations``
            is empty.
        prerelease: Forwarded unchanged to ``get_integration_manifests``.
    """

    def write_manifests(integrations: dict) -> None:
        """Serialize ``integrations`` as UTF-8 JSON into the gzip manifest file."""
        payload = json.dumps(integrations).encode("utf-8")
        # Context manager guarantees the handle is closed even if write() fails.
        with gzip.open(MANIFEST_FILE_PATH, "wb") as manifest_file:
            manifest_file.write(payload)

    if overwrite and MANIFEST_FILE_PATH.exists():
        MANIFEST_FILE_PATH.unlink()

    # Fall back to the single ``integration`` when no list was supplied.
    targets = rule_integrations or [integration]
    final_integration_manifests = {name: {} for name in targets}

    # The entry merged in non-overwrite mode is the LAST integration processed.
    # NOTE(review): when several rule_integrations are passed with
    # overwrite=False, only that last one is merged into the existing file --
    # confirm whether all collected integrations should be merged instead.
    merge_target = integration
    for name in targets:
        merge_target = name
        for manifest in get_integration_manifests(name, prerelease=prerelease):
            validated_manifest = IntegrationManifestSchema(unknown=EXCLUDE).load(manifest)
            # The version becomes the key, so it is removed from the stored manifest.
            package_version = validated_manifest.pop("version")
            final_integration_manifests[name][package_version] = validated_manifest

    if overwrite:
        write_manifests(final_integration_manifests)
    elif merge_target:
        # Read the current manifest, swap in the refreshed entry, write it back.
        with gzip.open(MANIFEST_FILE_PATH, "rb") as manifest_file:
            manifest_file_contents = json.loads(manifest_file.read().decode("utf-8"))
        manifest_file_contents[merge_target] = final_integration_manifests[merge_target]
        write_manifests(manifest_file_contents)

    print(f"final integrations manifests dumped: {MANIFEST_FILE_PATH}")