server/plugins/defuzzer.py (161 lines of code) (raw):

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This is the query de-fuzzer library for Foal.
It turns a URL search query into an ES query
"""

import calendar
import re
import shlex
import typing

# Fields searched (as a phrase) for every positive free-text term.
_POSITIVE_TEXT_FIELDS = ["from", "body", "subject"]

# Fields a negative free-text term is excluded from; one must_not clause each, in this order.
_NEGATIVE_TEXT_FIELDS = ("subject", "from", "body")

# "header_<param>" form key suffix -> document field name.
# '-' is not allowed in variable names, hence messageid -> message-id.
_HEADER_FIELDS = (
    ("from", "from"),
    ("subject", "subject"),
    ("body", "body"),
    ("to", "to"),
    ("messageid", "message-id"),
)


def _default_daterange() -> dict:
    """Return a fresh copy of the default 30 day date range."""
    return {"gt": "now-30d", "lt": "now+1d"}


def _build_daterange(formdata: dict) -> dict:
    """Translate the date related keys of formdata into a range clause body."""
    # If a month is the only thing, fake start and end.
    # NOTE: this writes "s" and "e" back into the caller's dict (kept for compatibility).
    if "date" in formdata and "e" not in formdata:
        formdata["s"] = formdata["date"]
        formdata["e"] = formdata["date"]

    # Classic start and end month params
    if "s" in formdata and "e" in formdata:
        if not re.match(r"\d{4}-\d{1,2}$", formdata["s"]):
            raise ValueError("Keyword 's' must be of type YYYY-MM")
        if not re.match(r"\d{4}-\d{1,2}$", formdata["e"]):
            raise ValueError("Keyword 'e' must be of type YYYY-MM")
        syear, smonth = formdata["s"].split("-", 1)
        eyear, emonth = formdata["e"].split("-", 1)
        # monthrange() yields (weekday of the 1st, number of days); only the latter is needed.
        # It raises a ValueError subclass for a month outside 1-12.
        _estart, eend = calendar.monthrange(int(eyear), int(emonth))
        return {
            "gt": "%04u/%02u/01 00:00:00" % (int(syear), int(smonth)),
            "lt": "%04u/%02u/%02u 23:59:59" % (int(eyear), int(emonth), eend),
        }

    # Days ago to start, and number of days to match
    if "dfrom" in formdata and "dto" in formdata:
        # Coerce to str so that integer values are validated instead of
        # making re.match() raise TypeError.
        dfrom = str(formdata["dfrom"])
        dto = str(formdata["dto"])
        if not (re.match(r"\d+$", dfrom) and re.match(r"\d+$", dto)):
            raise ValueError("Keywords 'dfrom' and 'dto' must be numeric")
        ef = int(dfrom)
        et = int(dto)
        if ef > 0 and et > 0:
            et = min(et, ef)  # avoid overrunning into the future
            return {
                "gte": "now-%dd" % ef,
                "lte": "now-%dd" % (ef - et),
            }
        # Zero values are ignored and the default range applies.
        return _default_daterange()

    # Advanced date formatting
    if "d" in formdata:
        spec = formdata["d"]

        # The more/less than N days/weeks/months/years ago
        m = re.match(r"^(lte|gte)=([0-9]+[Mwyd])$", spec)
        if m:
            # "lte" (at most N ago) means newer than now-N; "gte" means older than now-N.
            bound = "gt" if m.group(1) == "lte" else "lt"
            return {bound: "now-%s" % m.group(2)}

        # Simple one month listing
        m = re.match(r"^(\d\d\d\d-\d+)$", spec)
        if m:
            dyear, dmonth = m.group(1).split("-", 1)
            month_start = "%04u-%02u-01||/M" % (int(dyear), int(dmonth))
            return {
                "gte": month_start,
                "lte": month_start,
                "format": "yyyy-MM-dd",
            }

        # dfr and dto defining a time span
        m = re.match(r"^dfr=(\d\d\d\d-\d+-\d+)\|dto=(\d\d\d\d-\d+-\d+)$", spec)
        if m:
            syear, smonth, sday = m.group(1).split("-", 2)
            eyear, emonth, eday = m.group(2).split("-", 2)
            return {
                "gt": "%04u/%02u/%02u 00:00:00" % (int(syear), int(smonth), int(sday)),
                "lt": "%04u/%02u/%02u 23:59:59" % (int(eyear), int(emonth), int(eday)),
            }

    # Nothing (recognisable) was specified
    return _default_daterange()


def _build_list_query(formdata: dict, list_override: typing.Optional[str]) -> dict:
    """Build the clause that selects the mailing list(s) to search."""
    if list_override:
        # Certain requests use the full list ID as a single variable. Allow for that if so.
        if list_override.count("@") != 1:
            raise ValueError("list_override must contain exactly one @ character")
        listname, fqdn = list_override.split("@", 1)
    else:
        fqdn = formdata.get("domain", '')  # Must be provided
        listname = formdata.get("list", '')  # Must be provided

    if not fqdn:
        raise ValueError("You must specify a domain part of the mailing list(s) to search, or * for wildcard search.")
    if not listname:
        raise ValueError("You must specify a list part of the mailing list(s) to search, or * for wildcard search.")
    if "@" in listname:
        raise ValueError(
            "The list component of the List ID(s) cannot contain @, please use both list and domain keywords for searching."
        )

    # *@* ??
    if listname == "*" and fqdn == "*":
        return {"wildcard": {"list_raw": "*"}}
    # *@fqdn match?
    if listname == "*":
        return {"wildcard": {"list_raw": {"value": "*.%s>" % fqdn}}}
    # listname@* match?
    if fqdn == "*":
        return {"wildcard": {"list_raw": "<%s.*>" % listname}}
    # Default is to look in a specific list
    return {"term": {"list_raw": "<%s.%s>" % (listname, fqdn)}}


def _build_text_query(query: str) -> typing.Tuple[typing.Optional[dict], typing.List[dict]]:
    """Split a free-text query into one optional must clause and a list of must_not clauses."""
    qs = query.replace(":", "")
    try:
        bits = shlex.split(qs)
    except ValueError:  # Uneven number of quotes, default to split on whitespace instead
        bits = qs.split()

    positives = []
    negatives = []
    for bit in bits:
        if not bit:
            # shlex yields '' for an empty quoted string (e.g. q='""'); nothing to search for.
            continue
        if bit.startswith("--"):
            # Translate -- into a positive '-', so you can find "-1" etc
            positives.append(bit[1:])
        elif bit.startswith("-"):
            # Negatives. A lone '-' carries no term, so it is dropped.
            if len(bit) > 1:
                negatives.append(bit[1:])
        else:
            positives.append(bit)

    must_clause = None
    if positives:
        expanded = [
            {
                "bool": {
                    "should": [
                        {
                            "multi_match": {
                                "fields": list(_POSITIVE_TEXT_FIELDS),
                                "query": term,
                                "type": "phrase",
                            },
                        },
                    ]
                }
            }
            for term in positives
        ]
        # Every positive term has to match: minimum_should_match equals the number of terms.
        must_clause = {"bool": {"minimum_should_match": len(positives), "should": expanded}}

    must_not = [{"match": {field: term}} for term in negatives for field in _NEGATIVE_TEXT_FIELDS]
    return must_clause, must_not


def defuzz(formdata: dict, nodate: bool = False, list_override: typing.Optional[str] = None) -> dict:
    """
    Turn URL search parameters into the body of an ES bool query.

    Recognised keys in formdata:
      - date:            YYYY-MM; used as both "s" and "e" when "e" is absent
                         (both keys are then written back into formdata)
      - s / e:           start and end month, YYYY-MM
      - dfrom / dto:     days ago to start, and number of days to match
      - d:               "lte=<N><M|w|y|d>", "gte=<N><M|w|y|d>", "YYYY-MM" or
                         "dfr=YYYY-MM-DD|dto=YYYY-MM-DD"
      - list / domain:   the two halves of the list ID; "*" acts as a wildcard
      - q:               free text: 'foo bar' (all words), '-foo' (exclude),
                         '--foo' (literal "-foo"), '"a phrase"' (exact string).
                         Colons are stripped and empty terms are ignored.
      - header_<name>:   phrase match on from, subject, body, to or messageid

    If no date key is usable, the range defaults to the last 30 days.

    :param formdata: the search parameters
    :param nodate: if True, leave the date range out of the query. The date
                   parameters are still parsed and validated.
    :param list_override: a full "list@domain" ID, used instead of the
                          list/domain keys when set
    :return: a dict with a "must" list and, if anything is excluded, a "must_not" list
    :raises ValueError: on malformed date parameters or a missing/invalid list ID
    """
    # Dates are handled before the list ID so that date errors are reported first.
    daterange = _build_daterange(formdata)

    must: typing.List[dict] = [_build_list_query(formdata, list_override)]
    must_not: typing.List[dict] = []

    # Append date range if not excluded
    if not nodate:
        must.append({"range": {"date": daterange}})

    # Query string search
    if "q" in formdata:
        text_must, text_must_not = _build_text_query(formdata["q"])
        if text_must is not None:
            must.append(text_must)
        must_not.extend(text_must_not)

    # Header parameters
    for param, field in _HEADER_FIELDS:
        hname = "header_%s" % param
        if hname in formdata:
            must.append({"match_phrase": {field: formdata[hname]}})

    query_as_bool = {"must": must}
    if must_not:
        query_as_bool["must_not"] = must_not
    return query_as_bool