detection_rules/beats.py
import json
import os

import requests
import yaml

# Package helpers used below; assumed to be defined elsewhere in this package
# (e.g. detection_rules/utils.py): unzip, gzip_compress, get_etc_path, DateTimeEncoder.
from .utils import DateTimeEncoder, get_etc_path, gzip_compress, unzip


def _decompress_and_save_schema(url, release_name):
    print(f"Downloading beats {release_name}")
    response = requests.get(url)
    print(f"Downloaded {len(response.content) / 1024.0 / 1024.0:.2f} MB release.")

    fs = {}
    parsed = {}

    with unzip(response.content) as archive:
        base_directory = archive.namelist()[0]

        for name in archive.namelist():
            if os.path.basename(name) in ("fields.yml", "fields.common.yml", "config.yml"):
                contents = archive.read(name)

                # chop off the base directory name
                key = name[len(base_directory):]

                if key.startswith("x-pack"):
                    key = key[len("x-pack") + 1:]

                try:
                    decoded = yaml.safe_load(contents)
                except yaml.YAMLError:
                    print(f"Error loading {name}")
                    continue  # skip unparseable files instead of reusing a stale `decoded`

                # create a hierarchical structure mirroring the archive layout
                parsed[key] = decoded
                branch = fs
                directory, base_name = os.path.split(key)
                for limb in directory.split(os.path.sep):
                    branch = branch.setdefault("folders", {}).setdefault(limb, {})

                branch.setdefault("files", {})[base_name] = decoded

    # remove all non-beat directories
    fs = {k: v for k, v in fs.get("folders", {}).items() if k.endswith("beat")}

    print(f"Saving detection_rules/etc/beats_schemas/{release_name}.json.gz")
    compressed = gzip_compress(json.dumps(fs, sort_keys=True, cls=DateTimeEncoder))
    path = get_etc_path("beats_schemas", release_name + ".json.gz")

    with open(path, 'wb') as f:
        f.write(compressed)
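

# For context, a minimal usage sketch (not part of the original module): the tag and
# zipball URL below are assumptions for illustration only; the surrounding module
# presumably resolves the real beats release URL before calling this helper.
if __name__ == "__main__":
    tag = "v8.12.2"  # hypothetical beats release tag
    zip_url = f"https://github.com/elastic/beats/archive/refs/tags/{tag}.zip"
    _decompress_and_save_schema(zip_url, tag.lstrip("v"))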