in detection_rules/ecs.py [0:0]
def download_schemas(refresh_master=True, refresh_all=False, verbose=True):
    """Download additional schemas from ecs releases.

    Queries the GitHub releases API for elastic/ecs and, for every release at or above 1.0.1 that is not
    already reported by ``get_schema_map()``, saves the generated ``ecs_flat`` and ``ecs_nested`` schemas as
    gzip-compressed JSON. Optionally also refreshes the single working copy of the ``master`` flat schema.

    :param refresh_master: when true, replace any existing ``master_*`` schema directory with current master.
    :param refresh_all: when true, do not skip versions that already exist; re-download every eligible release.
    :param verbose: when true, print the files saved for each version.
    :raises requests.HTTPError: if any HTTP request returns an error status (e.g. GitHub rate limiting).
    """
    existing = [] if refresh_all else [Version.parse(v) for v in get_schema_map()]
    minimum_version = Version.parse("1.0.1")

    url = 'https://api.github.com/repos/elastic/ecs/releases'
    releases = requests.get(url)
    # an error response is a JSON object rather than a list of releases; fail loudly instead of iterating it
    releases.raise_for_status()

    for release in releases.json():
        version = Version.parse(release.get('tag_name', '').lstrip('v'))

        # we don't ever want beta
        if not version or version < minimum_version or version in existing:
            continue

        schema_dir = os.path.join(ECS_SCHEMAS_DIR, str(version))

        zipball = requests.get(release['zipball_url'])
        # never hand an error page to unzip
        zipball.raise_for_status()

        saved = []
        with unzip(zipball.content) as archive:
            # the first archive entry is used as the path prefix for every member
            base = archive.namelist()[0]
            members = ['{}generated/ecs/ecs_flat.yml'.format(base), '{}generated/ecs/ecs_nested.yml'.format(base)]

            os.makedirs(schema_dir, exist_ok=True)

            for member in members:
                # load as yaml, save as json
                contents = yaml.safe_load(archive.read(member))
                out_file = os.path.basename(member).replace(".yml", ".json.gz")

                compressed = gzip_compress(json.dumps(contents, sort_keys=True, cls=DateTimeEncoder))
                new_path = get_etc_path(ECS_NAME, str(version), out_file)
                with open(new_path, 'wb') as f:
                    f.write(compressed)

                saved.append(out_file)

        if verbose:
            print('Saved files to {}: \n\t- {}'.format(schema_dir, '\n\t- '.join(saved)))

    # handle working master separately
    if refresh_master:
        # validate both downloads before touching anything on disk, so that a failed request can never
        # leave us with the old master removed and no replacement written
        version_response = requests.get('https://raw.githubusercontent.com/elastic/ecs/master/version')
        version_response.raise_for_status()
        master_ver = Version.parse(version_response.text.strip())

        schema_response = requests.get(
            'https://raw.githubusercontent.com/elastic/ecs/master/generated/ecs/ecs_flat.yml')
        schema_response.raise_for_status()
        master_schema = yaml.safe_load(schema_response.text)

        # the directory is prefixed with "master_" so that we can differentiate the fact that this is a
        # working master version, but first clear out any existing masters, since we only ever want 1 at a time
        for stale_master in glob.glob(os.path.join(ECS_SCHEMAS_DIR, 'master_*')):
            shutil.rmtree(stale_master, ignore_errors=True)

        master_dir = "master_{}".format(master_ver)
        os.makedirs(get_etc_path(ECS_NAME, master_dir), exist_ok=True)

        compressed = gzip_compress(json.dumps(master_schema, sort_keys=True, cls=DateTimeEncoder))
        new_path = get_etc_path(ECS_NAME, master_dir, "ecs_flat.json.gz")
        with open(new_path, 'wb') as f:
            f.write(compressed)

        if verbose:
            print('Saved files to {}: \n\t- {}'.format(master_dir, 'ecs_flat.json.gz'))