in detection_rules/packaging.py [0:0]
def generate_summary_and_changelog(self, changed_rule_ids, new_rule_ids, removed_rules):
"""Generate stats on package."""
summary = {
'changed': defaultdict(list),
'added': defaultdict(list),
'removed': defaultdict(list),
'unchanged': defaultdict(list)
}
changelog = {
'changed': defaultdict(list),
'added': defaultdict(list),
'removed': defaultdict(list),
'unchanged': defaultdict(list)
}
# Build an index map first
longest_name = 0
indexes = set()
for rule in self.rules:
longest_name = max(longest_name, len(rule.name))
index_list = getattr(rule.contents.data, "index", [])
if index_list:
indexes.update(index_list)
index_map = {index: str(i) for i, index in enumerate(sorted(indexes))}
def get_summary_rule_info(r: TOMLRule):
r = r.contents
rule_str = f'{r.name:<{longest_name}} (v:{r.autobumped_version} t:{r.data.type}'
if isinstance(rule.contents.data, QueryRuleData):
index = rule.contents.data.get("index") or []
rule_str += f'-{r.data.language}'
rule_str += f'(indexes:{"".join(index_map[idx] for idx in index) or "none"}'
return rule_str
def get_markdown_rule_info(r: TOMLRule, sd):
# lookup the rule in the GitHub tag v{major.minor.patch}
data = r.contents.data
rules_dir_link = f'https://github.com/elastic/detection-rules/tree/v{self.name}/rules/{sd}/'
rule_type = data.language if isinstance(data, QueryRuleData) else data.type
return f'`{r.id}` **[{r.name}]({rules_dir_link + os.path.basename(str(r.path))})** (_{rule_type}_)'
for rule in self.rules:
sub_dir = os.path.basename(os.path.dirname(rule.path))
if rule.id in changed_rule_ids:
summary['changed'][sub_dir].append(get_summary_rule_info(rule))
changelog['changed'][sub_dir].append(get_markdown_rule_info(rule, sub_dir))
elif rule.id in new_rule_ids:
summary['added'][sub_dir].append(get_summary_rule_info(rule))
changelog['added'][sub_dir].append(get_markdown_rule_info(rule, sub_dir))
else:
summary['unchanged'][sub_dir].append(get_summary_rule_info(rule))
changelog['unchanged'][sub_dir].append(get_markdown_rule_info(rule, sub_dir))
for rule in self.deprecated_rules:
sub_dir = os.path.basename(os.path.dirname(rule.path))
if rule.id in removed_rules:
summary['removed'][sub_dir].append(rule.name)
changelog['removed'][sub_dir].append(rule.name)
def format_summary_rule_str(rule_dict):
str_fmt = ''
for sd, rules in sorted(rule_dict.items(), key=lambda x: x[0]):
str_fmt += f'\n{sd} ({len(rules)})\n'
str_fmt += '\n'.join(' - ' + s for s in sorted(rules))
return str_fmt or '\nNone'
def format_changelog_rule_str(rule_dict):
str_fmt = ''
for sd, rules in sorted(rule_dict.items(), key=lambda x: x[0]):
str_fmt += f'\n- **{sd}** ({len(rules)})\n'
str_fmt += '\n'.join(' - ' + s for s in sorted(rules))
return str_fmt or '\nNone'
def rule_count(rule_dict):
count = 0
for _, rules in rule_dict.items():
count += len(rules)
return count
today = str(datetime.date.today())
summary_fmt = [f'{sf.capitalize()} ({rule_count(summary[sf])}): \n{format_summary_rule_str(summary[sf])}\n'
for sf in ('added', 'changed', 'removed', 'unchanged') if summary[sf]]
change_fmt = [f'{sf.capitalize()} ({rule_count(changelog[sf])}): \n{format_changelog_rule_str(changelog[sf])}\n'
for sf in ('added', 'changed', 'removed') if changelog[sf]]
summary_str = '\n'.join([
f'Version {self.name}',
f'Generated: {today}',
f'Total Rules: {len(self.rules)}',
f'Package Hash: {self.get_package_hash(verbose=False)}',
'---',
'(v: version, t: rule_type-language)',
'Index Map:\n{}'.format("\n".join(f" {v}: {k}" for k, v in index_map.items())),
'',
'Rules',
*summary_fmt
])
changelog_str = '\n'.join([
f'# Version {self.name}',
f'_Released {today}_',
'',
'### Rules',
*change_fmt,
'',
'### CLI'
])
return summary_str, changelog_str