in detection_rules/main.py [0:0]
def create_dnstwist_index(ctx: click.Context, input_file: click.Path):
"""Create a dnstwist index in Elasticsearch to work with a threat match rule."""
from elasticsearch import Elasticsearch
es_client: Elasticsearch = ctx.obj['es']
click.echo(f'Attempting to load dnstwist data from {input_file}')
dnstwist_data: dict = load_dump(str(input_file))
click.echo(f'{len(dnstwist_data)} records loaded')
original_domain = next(r['domain-name'] for r in dnstwist_data if r.get('fuzzer', '') == 'original*')
click.echo(f'Original domain name identified: {original_domain}')
domain = original_domain.split('.')[0]
domain_index = f'dnstwist-{domain}'
# If index already exists, prompt user to confirm if they want to overwrite
if es_client.indices.exists(index=domain_index):
if click.confirm(
f"dnstwist index: {domain_index} already exists for {original_domain}. Do you want to overwrite?",
abort=True):
es_client.indices.delete(index=domain_index)
fields = [
"dns-a",
"dns-aaaa",
"dns-mx",
"dns-ns",
"banner-http",
"fuzzer",
"original-domain",
"dns.question.registered_domain"
]
timestamp_field = "@timestamp"
mappings = {"mappings": {"properties": {f: {"type": "keyword"} for f in fields}}}
mappings["mappings"]["properties"][timestamp_field] = {"type": "date"}
es_client.indices.create(index=domain_index, body=mappings)
# handle dns.question.registered_domain separately
fields.pop()
es_updates = []
now = datetime.utcnow()
for item in dnstwist_data:
if item['fuzzer'] == 'original*':
continue
record = item.copy()
record.setdefault('dns', {}).setdefault('question', {}).setdefault('registered_domain', item.get('domain-name'))
for field in fields:
record.setdefault(field, None)
record['@timestamp'] = now
es_updates.extend([{'create': {'_index': domain_index}}, record])
click.echo(f'Indexing data for domain {original_domain}')
results = es_client.bulk(body=es_updates)
if results['errors']:
error = {r['create']['result'] for r in results['items'] if r['create']['status'] != 201}
client_error(f'Errors occurred during indexing:\n{error}')
click.echo(f'{len(results["items"])} watchlist domains added to index')
click.echo('Run `prep-rule` and import to Kibana to create alerts on this index')