in geneve/utils/__init__.py [0:0]
def load_rules(uri, paths=None, basedir=None, *, timeout=17, tries=3):
    """Load rules from a rules source tree or from a rules package.

    If *uri* points at ``epr.elastic.co`` the registry is queried first and,
    for the ``/search`` and ``/package/security_detection_engine/`` endpoints,
    *uri* is replaced by the download location found in the response. The
    (possibly rewritten) *uri* is then opened with ``resource()`` and the
    rule files found in the resulting directory are parsed.

    A directory containing ``manifest.yml`` is treated as a package: its rules
    are JSON files named ``<rule_id>[_<revision>].json`` and only the highest
    revision of each rule id is loaded. Any other directory is read as TOML
    rule files.

    Args:
        uri: Location of the rules, passed to ``resource()``.
        paths: A glob pattern, or an iterable of glob patterns, relative to
            the resource directory. Defaults to ``kibana/security_rule/*.json``
            for packages and ``rules/**/*.toml`` otherwise.
        basedir: Forwarded unchanged to ``resource()``.
        timeout: Timeout handed to ``requests.get`` for the registry query.
        tries: How many times the registry query is attempted. Only
            connection timeouts are retried; it is ignored for non-registry
            URIs.

    Returns:
        A ``(version, rules)`` tuple. ``version`` is the package version
        reported by the registry, or ``None`` when no registry lookup
        resolved one. ``rules`` is a list of ``SimpleNamespace`` objects
        built from each rule's fields plus a ``path`` attribute.

    Raises:
        ValueError: If *tries* is less than 1 for a registry URI, or if a
            registry search does not return exactly one package.
        requests.exceptions.ConnectTimeout: If the last attempt times out
            while connecting.
        requests.exceptions.HTTPError: If the registry answers with an error
            status (raised by ``raise_for_status()``).
    """
    version = None
    uri_parts = urlparse(uri)
    if uri_parts.hostname == "epr.elastic.co":
        import requests

        # With no attempt at all there would be no response to inspect below.
        if tries < 1:
            raise ValueError(f"tries must be at least 1: {tries}")

        for attempt in range(tries):
            try:
                response = requests.get(uri, timeout=timeout)
                break
            except requests.exceptions.ConnectTimeout:
                # Give up only once the final attempt has failed.
                if attempt == tries - 1:
                    raise
        response.raise_for_status()
        payload = response.json()

        if uri_parts.path == "/search":
            # A search answers with a list; it must identify a single package.
            if len(payload) != 1:
                raise ValueError(f"Wrong number of packages: {len(payload)}")
            uri_parts = uri_parts._replace(path=payload[0]["download"], query="")
            uri = urlunparse(uri_parts)
            version = payload[0]["version"]
        elif uri_parts.path.startswith("/package/security_detection_engine/"):
            # A package lookup answers with the package description itself.
            uri_parts = uri_parts._replace(path=payload["download"], query="")
            uri = urlunparse(uri_parts)
            version = payload["version"]

    # Everything that touches the files stays inside the context manager.
    # NOTE(review): presumably the directory may not outlive it - confirm.
    with resource(uri, basedir=basedir, cachedir=dirs.cache) as resource_dir:
        is_package = (resource_dir / "manifest.yml").exists()
        if paths is None:
            paths = "kibana/security_rule/*.json" if is_package else "rules/**/*.toml"
        if isinstance(paths, str):
            paths = (paths,)

        if is_package:
            import json

            # Map rule id -> (revision, file), keeping the highest revision;
            # on equal revisions the file seen first wins.
            latest = {}
            for path in paths:
                for filepath in resource_dir.glob(path):
                    rule_id, *rule_rev = filepath.stem.split("_")
                    # A file name without a "_<revision>" suffix counts as revision 0.
                    rule_rev = int(rule_rev[0]) if rule_rev else 0
                    known = latest.get(rule_id)
                    if known is None or rule_rev > known[0]:
                        latest[rule_id] = (rule_rev, filepath)
            filenames = [filepath for _, filepath in latest.values()]
        else:
            import pytoml

            filenames = (filename for path in paths for filename in resource_dir.glob(path))

        rules = []
        for filename in filenames:
            # Decode explicitly as UTF-8 rather than with the locale's
            # preferred encoding, which is not UTF-8 on every platform.
            with open(filename, encoding="utf-8") as f:
                if is_package:
                    rule = json.load(f)["attributes"]
                else:
                    rule = pytoml.load(f)["rule"]
            # Record the file location relative to the resource directory,
            # without its first path component.
            rule["path"] = Path(".").joinpath(*Path(filename).relative_to(resource_dir).parts[1:])
            rules.append(SimpleNamespace(**rule))
    return version, rules