in detection_rules/packaging.py [0:0]
def create_bulk_index_body(self) -> Tuple[Ndjson, Ndjson]:
"""Create a body to bulk index into a stack."""
package_hash = self.get_package_hash(verbose=False)
now = datetime.datetime.isoformat(datetime.datetime.utcnow())
create = {'create': {'_index': f'rules-repo-{self.name}-{package_hash}'}}
# first doc is summary stats
summary_doc = {
'group_hash': package_hash,
'package_version': self.name,
'rule_count': len(self.rules),
'rule_ids': [],
'rule_names': [],
'rule_hashes': [],
'source': 'repo',
'details': {'datetime_uploaded': now}
}
bulk_upload_docs = Ndjson([create, summary_doc])
importable_rules_docs = Ndjson()
for rule in self.rules:
summary_doc['rule_ids'].append(rule.id)
summary_doc['rule_names'].append(rule.name)
summary_doc['rule_hashes'].append(rule.contents.get_hash())
if rule.id in self.new_ids:
status = 'new'
elif rule.id in self.changed_ids:
status = 'modified'
else:
status = 'unmodified'
bulk_upload_docs.append(create)
relative_path = str(rule.get_base_rule_dir())
if relative_path is None:
raise ValueError(f"Could not find a valid relative path for the rule: {rule.id}")
rule_doc = dict(hash=rule.contents.get_hash(),
source='repo',
datetime_uploaded=now,
status=status,
package_version=self.name,
flat_mitre=ThreatMapping.flatten(rule.contents.data.threat).to_dict(),
relative_path=relative_path)
rule_doc.update(**rule.contents.to_api_format())
bulk_upload_docs.append(rule_doc)
importable_rules_docs.append(rule_doc)
return bulk_upload_docs, importable_rules_docs