def create_dnstwist_index()

in detection_rules/main.py [0:0]


def create_dnstwist_index(ctx: click.Context, input_file: click.Path):
    """Create a dnstwist index in Elasticsearch to work with a threat match rule.

    Loads dnstwist results from ``input_file``, names the target index
    ``dnstwist-<first label of the original domain>``, (re)creates that index
    with keyword mappings plus a date ``@timestamp``, and bulk-indexes one
    document per record other than the ``original*`` one.

    Args:
        ctx: click context; ``ctx.obj['es']`` must hold the Elasticsearch client.
        input_file: path of the dnstwist output, read through ``load_dump``.

    Problems (no ``original*`` record, nothing to index, bulk failures) are
    reported through ``client_error``. If the index already exists the user is
    asked to confirm the overwrite; declining aborts the command.
    """
    from elasticsearch import Elasticsearch

    es_client: Elasticsearch = ctx.obj['es']

    click.echo(f'Attempting to load dnstwist data from {input_file}')
    dnstwist_data: list[dict] = load_dump(str(input_file))
    click.echo(f'{len(dnstwist_data)} records loaded')

    # The record flagged by dnstwist as the original carries the monitored domain.
    original_domain = next(
        (r['domain-name'] for r in dnstwist_data if r.get('fuzzer', '') == 'original*'),
        None)
    if original_domain is None:
        client_error(f'No original domain record (fuzzer: original*) found in {input_file}')
        # client_error is presumably fatal; the return only guards against falling through.
        return
    click.echo(f'Original domain name identified: {original_domain}')

    domain = original_domain.split('.')[0]
    domain_index = f'dnstwist-{domain}'

    # Top-level keys every indexed document must carry (filled with None when absent).
    record_fields = [
        "dns-a",
        "dns-aaaa",
        "dns-mx",
        "dns-ns",
        "banner-http",
        "fuzzer",
        "original-domain",
    ]
    # Mapped like the fields above but populated as a nested object, so it is kept separate.
    registered_domain_field = "dns.question.registered_domain"
    timestamp_field = "@timestamp"

    # Build the bulk payload before touching the index so that bad or empty input
    # cannot leave an existing index deleted with nothing to replace it.
    es_updates = []
    now = datetime.utcnow()  # one timestamp shared by every document of this run

    for item in dnstwist_data:
        if item.get('fuzzer') == 'original*':
            continue

        record = item.copy()
        record.setdefault('dns', {}).setdefault('question', {}).setdefault('registered_domain', item.get('domain-name'))

        for field in record_fields:
            record.setdefault(field, None)

        record[timestamp_field] = now

        # Bulk format: an action line followed by the document source.
        es_updates.extend([{'create': {'_index': domain_index}}, record])

    if not es_updates:
        client_error(f'No watchlist domains found in {input_file} for {original_domain}')
        return

    # If index already exists, prompt user to confirm if they want to overwrite.
    # abort=True makes click abort on a negative answer, so reaching the delete means "yes".
    if es_client.indices.exists(index=domain_index):
        click.confirm(
            f"dnstwist index: {domain_index} already exists for {original_domain}. Do you want to overwrite?",
            abort=True)
        es_client.indices.delete(index=domain_index)

    mappings = {
        "mappings": {
            "properties": {f: {"type": "keyword"} for f in [*record_fields, registered_domain_field]}
        }
    }
    mappings["mappings"]["properties"][timestamp_field] = {"type": "date"}

    es_client.indices.create(index=domain_index, body=mappings)

    click.echo(f'Indexing data for domain {original_domain}')

    results = es_client.bulk(body=es_updates)
    if results['errors']:
        # Failed items carry an "error" entry rather than "result"; collect the
        # distinct reasons so repeated failures are reported once.
        failures = set()
        for entry in results['items']:
            outcome = entry['create']
            if outcome['status'] == 201:
                continue
            failure = outcome.get('error', outcome)
            reason = failure.get('reason', failure) if isinstance(failure, dict) else failure
            failures.add(str(reason))
        error = '\n'.join(sorted(failures))
        client_error(f'Errors occurred during indexing:\n{error}')

    click.echo(f'{len(results["items"])} watchlist domains added to index')
    click.echo('Run `prep-rule` and import to Kibana to create alerts on this index')