tools/stix-to-ecs/stix_to_ecs.py (425 lines of code) (raw):
# coding: utf-8
from __future__ import annotations
import pathlib
import json
import copy
import argparse
import sys
import elasticsearch
import typing
import getpass
import dataclasses
from stix2 import pattern_visitor
from stix2 import patterns
AUTHOR = ("Cyril François (@cyril-t-f)", "RoDerick Hines (@roderickch01)")
VERSION = "0.3.1"
T = typing.TypeVar("T")
Json = dict[str, T]
ECSIndicator = STIXIndicator = Json
ECSIndicators = STIXIndicators = list[Json]
MARKING_TO_TLP = {
"613f2e26-407d-48c7-9eca-b8e91df99dc9": "clear",
"34098fce-860f-48ae-8e50-ebd3cc5e41da": "green",
"f88d31f6-486f-44da-b317-01333bde0b82": "amber",
"826578e1-40ad-459f-bc73-ede076f81f37": "amber_strict",
"5e57c739-391a-4eb3-b6be-7d15ca92d5ed": "red",
}
STIX_ECS_WORD_MAPPING = {
"hashes": "hash",
"MD5": "md5",
"SHA-1": "sha1",
"SHA-256": "sha256",
}
ECS_WORD_FIELD_MAPPING = {
"domain-name": "domain",
"ipv4-addr": "ip",
"ipv6-addr": "ip",
}
# ctf -> Add an ECS type here to disable it
UNSUPPORTED_ECS_INDICATOR_TYPES = {
"cryptographic-key",
}
@dataclasses.dataclass
class ElasticInfo(object):
cloud_id: str | None
api_key: str | None
url: str | None
username: str | None
password: str | None
index: str | None
verify_certs: bool
def check(self) -> str | None:
if not self.index:
return "`index` is missing"
if self.cloud_id and self.url:
return "`cloud_id` and `url` can't be both provided"
if not self.cloud_id and not self.url:
return "Neither `cloud_id` nor `url` are provided"
if self.api_key and (self.username or self.password):
return "`api_key` and `username` or `password` can't be both provided"
if not self.api_key and not self.password:
return "Neither `api_key` nor `password` are provided"
if not self.api_key:
for k in ("username", "password"):
if not self.__getattribute__(k):
return f"`{k}` is missing"
return None
@dataclasses.dataclass
class Options(object):
input: pathlib.Path
output: pathlib.Path
recursive: bool
provider: str
elastic_info: ElasticInfo
class STIXToECSPatternParser(object):
"""
A class to parse and convert a STIX pattern into ECS data.
"""
def __init__(self, pattern: str) -> None:
"""
Intialize the parser then parse and convert the given STIX pattern into ECS data.
:param pattern: The STIX pattern to parse and convert.
"""
self.__loot = list()
self.__type = ""
self.__field_name = ""
self.__data: dict = dict()
self.__visit(pattern_visitor.create_pattern_object(pattern))
if not self.__loot:
raise RuntimeError("No loot, parsing failed")
self.__set_type()
if not self.__type:
raise RuntimeError("No type, parsing failed")
self.__set_field_name()
if not self.__field_name:
raise RuntimeError("No field name, parsing failed")
self.__set_data()
if not self.__data:
raise RuntimeError("No data, parsing failed")
def __set_data(self) -> STIXToECSPatternParser:
match self.__type:
case "file":
self.__set_file_data()
case _:
self.__set_other_data()
return self
def __set_field_name(self) -> STIXToECSPatternParser:
self.__field_name = convert_ecs_word_to_field(self.__type)
return self
def __set_file_data(self) -> STIXToECSPatternParser:
for x in self.__loot:
if not x[1] in self.__data.keys():
self.__data[x[1]] = dict()
# ctf -> I.e property: "sha256:..., sha1:..., etc"
# ctf -> Will break if property length is > 2, i.e "sha256:sha1:..."
properties = x[2].split(":")
match len(properties):
case 1:
self.__data[x[1]] = properties[0]
case 2:
self.__data[x[1]][properties[0]] = properties[1]
case _:
raise NotImplemented(f"Properties length is > 2")
return self
def __set_other_data(self) -> STIXToECSPatternParser:
# ctf -> Fragile, expect to break if pattern is more complex. Need counter example if any.
self.__data = self.__loot[0][2]
return self
def __set_type(self) -> STIXToECSPatternParser:
self.__type = self.__loot[0][0]
for x in self.__loot[1:]:
if x[0] != self.__type:
raise RuntimeError(
f"Inconsistent types detected in pattern, must be {self.__type} but found {x[0]}"
)
return self
def __visit(self, o) -> STIXToECSPatternParser:
ot = type(o)
match ot:
case patterns.ObservationExpression:
self.__visit(o.operand)
case patterns.OrBooleanExpression | patterns.AndBooleanExpression:
for op in o.operands:
self.__visit(op)
case patterns.ParentheticalExpression:
self.__visit(o.expression)
case patterns.EqualityComparisonExpression:
self.__visit_equality_comparison_expression(o)
case _:
raise NotImplemented(ot)
return self
def __visit_equality_comparison_expression(self, o) -> STIXToECSPatternParser:
lhs = o.lhs
rhs = o.rhs
if not (t := type(lhs)) == patterns.ObjectPath:
raise NotImplemented(t)
if not (t := type(rhs)) == patterns.StringConstant:
raise NotImplemented(t)
self.__loot.append(
(
convert_stix_word_to_ecs_word(lhs.object_type_name),
convert_stix_word_to_ecs_word(lhs.property_path[0].property_name),
":".join(
[
convert_stix_word_to_ecs_word(x.property_name)
for x in lhs.property_path[1:]
]
+ [rhs.value]
),
)
)
return self
@property
def data(self) -> dict:
"""
Get ECS data.
"""
return copy.deepcopy(self.__data)
@property
def description(self) -> str:
"""
Get ECS description.
"""
if type(self.data) is dict and (hash := self.data.get("hash", None)):
# ctf -> SHA256 may not be always available, in this case do we want to crash or take an other algo?
observable = (
hash.get("sha256", None)
or hash.get("sha1", None)
or hash.get("md5", None)
)
if not observable:
raise RuntimeError(
"Missing SHA256, SHA1, or MD5 observable from set of hashes."
)
else:
observable = self.data
return f"Simple indicator of observable {{{observable}}}"
@property
def field_name(self) -> str:
"""
Get ECS field name.
"""
return self.__field_name
@property
def type(self) -> str:
"""
Get ECS type.
"""
return self.__type
def convert_ecs_word_to_field(word: str) -> str:
"""
The function convert the given ECS word into an ECS field.
:param word: The ECS word to convert to field.
:return: The ECS field.
"""
return ECS_WORD_FIELD_MAPPING.get(word, word)
def convert_stix_indicator_to_ecs_indicator(
stix_indicator: dict, provider: str | None
) -> dict:
"""
The function convert a STIX indicator into an ECS indicator.
:param stix_indicator: The STIX indicator to be converted.
:param provider: An optional provider string that will be used to override the parsed provider.
:return: The ECS indicator.
"""
parser = STIXToECSPatternParser(stix_indicator["pattern"])
tmp = dict()
tmp[parser.field_name] = parser.data
tmp["type"] = parser.type
tmp["description"] = parser.description
if first_seen := stix_indicator.get("created", None):
tmp["first_seen"] = first_seen
if provider:
tmp["provider"] = provider
elif provider_ := stix_indicator.get("created_by_ref", None):
tmp["provider"] = provider_
if external_references := stix_indicator.get("external_references", None):
tmp["reference"] = [x["url"] for x in external_references]
if labels := stix_indicator.get("labels", None):
tmp["tags"] = labels
if modified := stix_indicator.get("modified", None):
tmp["modified_at"] = modified
if (markings := stix_indicator.get("object_marking_refs", None)) and (
tlp := parse_tlp(markings)
):
tmp["marking"] = {"tlp": tlp}
return {"threat": {"indicator": tmp}}
def convert_stix_indicators_to_ecs_indicators(
stix_indicators: list[dict], provider: str | None
) -> list[dict]:
"""
The function convert a list of STIX indicators into a list of ECS indicators.
:param stix_indicators: The list of STIX indicators to be converted.
:param provider: An optional provider string that will be used to override the parsed provider.
:return: The list of ECS indicators.
"""
return [
convert_stix_indicator_to_ecs_indicator(x, provider)
for x in filter(is_stix_indicator, stix_indicators)
]
def convert_stix_word_to_ecs_word(word: str) -> str:
"""
The function convert the given STIX word into an ECS word.
:param word: The STIX word to convert into a ECS word.
:return: The ECS word.
"""
return STIX_ECS_WORD_MAPPING.get(word, word)
def flatten_list(l: list[list[T]]) -> list[T]:
tmp = list()
for x in l:
tmp += x
return tmp
def format_ecs_indicator_for_elastic(ecs_indicator: ECSIndicator) -> ECSIndicator:
"""
The function format an ECS indicator for Elastic.
:param ecs_indicator: The ECS indicator to be formatted.
:return: The formatted ECS indicator.
"""
result = copy.deepcopy(ecs_indicator)
result["@timestamp"] = result["threat"]["indicator"]["first_seen"]
result["event"] = {"category": "threat", "kind": "enrichment", "type": "indicator"}
return result
def get_json_files(path: pathlib.Path, recursive: bool) -> list[pathlib.Path]:
"""
The function generate the list of Json files at path, if the parameter is a file it returns [path].
:param path: The root directory path where we want to get Json files or the path to a file.
:param recursive: Enable recursive traversal of directory.
:return: The list of found Json files or [path] if path is a file.
"""
return (
[path]
if path.is_file()
else list(path.rglob("*.json") if recursive else path.glob("*.json"))
)
def is_stix_indicator(stix_object: dict) -> bool:
"""
The function check if a STIX object is an indicator.
:param stix_object: The STIX object to be checked.
:return: True if the STIX object is an indicator, False otherwise.
"""
return stix_object["type"] == "indicator"
def is_supported_ecs_indicator(ecs_indicator: dict) -> bool:
"""
The function check if a STIX object is an indicator.
:param stix_object: The STIX object to be checked.
:return: True if the STIX object is an indicator, False otherwise.
"""
return (
ecs_indicator["threat"]["indicator"]["type"]
not in UNSUPPORTED_ECS_INDICATOR_TYPES
)
def load_stix_objects_from_file(input_path: pathlib.Path) -> list[dict]:
"""
The function load STIX objects from a file.
:param input_path: The path of the file containing STIX objects.
:return: A list of STIX objects.
"""
with input_path.open("r") as f:
objects = json.load(f).get("objects", None)
if not objects:
raise RuntimeError('"objects field doesn\'t exist"')
return [dict(x) for x in objects]
def main() -> None:
options = get_options()
if not options:
exit(1)
files = get_json_files(options.input, options.recursive)
results = process_stix_files(files, options.provider)
if not options.output and not options.elastic_info:
print(json.dumps(flatten_list(results), indent=4))
return
if options.output:
write_ecs_files(zip(files, results), options.output)
if options.elastic_info:
write_ecs_to_elastic(flatten_list(results), options.elastic_info)
def get_options() -> Options | None:
parser = build_argument_parser()
args = parser.parse_args()
if args.recursive and not args.input.is_dir():
print("Flag -r can only be used if input is a directory")
parser.print_usage()
return None
elastic_info = None
if args.elastic:
if args.configuration:
tmp = {
k: (v if v else None)
for k, v in json.loads(args.configuration.read_text()).items()
}
elastic_info = ElasticInfo(
tmp["cloud_id"],
tmp["api_key"],
tmp["url"],
tmp["username"],
tmp["password"],
tmp["index"],
verify_certs=args.verify_certs,
)
else:
elastic_info = ElasticInfo(
args.cloud_id,
(
getpass.getpass(
"Please enter your API key to connect to the Elastic cluster\n"
)
if not args.username
else None
),
args.url,
args.username,
(
getpass.getpass(
"Please enter your password to connect to the Elastic cluster\n"
)
if args.username
else None
),
args.index,
args.verify_certs,
)
if msg := elastic_info.check():
print(msg)
parser.print_usage()
return None
return Options(args.input, args.output, args.recursive, args.provider, elastic_info)
def build_argument_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
sys.argv[0],
description=f"Convert STIX indicator(s) into ECS indicator(s) - Version {VERSION}",
)
parser.add_argument(
"-i",
"--input",
type=pathlib.Path,
help="STIX input file or directory",
required=True,
)
parser.add_argument(
"-o",
"--output",
type=pathlib.Path,
help="ECS output directory",
)
parser.add_argument(
"-r",
"--recursive",
help="Recursive processing when input is a directory",
action="store_true",
)
parser.add_argument(
"-e", "--elastic", action="store_true", help="Use Elastic cloud configuration"
)
parser.add_argument("-p", "--provider", help="Override ECS provider")
parser.add_argument(
"-c",
"--configuration",
type=pathlib.Path,
help="Path to the configuration file used to connect to the Elastic cluster, used with --elastic",
)
parser.add_argument(
"--cloud-id",
help="The cloud ID of the Elastic cluster, required with --elastic unless configuration file is provided (--configuration), can't be provided along --url",
)
parser.add_argument(
"--url",
type=str,
help="The URL of the Elastic cluster, required with --elastic unless configuration file is provided (--configuration), can't be provided along --cloud-id",
)
parser.add_argument(
"--username",
type=str,
help="The username of the Elastic cluster, required with --elastic unless a configuration file is provided (--configuration)",
)
parser.add_argument(
"--password",
type=str,
help="The password of the Elastic cluster, required with --elastic unless a configuration file is provided (--configuration)",
)
parser.add_argument(
"--index",
type=str,
help="Elastic cluster's index where ECS indicators will be written, required with --elastic unless configuration file is provided (--configuration)",
)
parser.add_argument(
"-x",
"--insecure",
action="store_false",
dest="verify_certs",
help="Disable TLS certificate verification when connecting to the Elastic cluster",
)
return parser
def parse_tlp(markings: list[str]) -> str | None:
"""
The function parse a TLP string from a list of marking definitions if any.
:param markings: A list of marking definitions.
:return: A TLP if found, None otherwise.
"""
for x in markings:
if tlp := MARKING_TO_TLP.get(x.replace("marking-definition--", ""), None):
return tlp
else:
return None
def process_stix_file(input_file: pathlib.Path, provider: str | None) -> list[dict]:
"""
The function load objects from a STIX file and generate a list of ECS indicators for each compatible STIX indicator object.
:param input_file: Path of the file to be processed.
:param provider: An optional provider string that will be used to override the parsed provider.
:return: A list of ECS indicators.
"""
return list(
filter(
is_supported_ecs_indicator,
convert_stix_indicators_to_ecs_indicators(
load_stix_objects_from_file(input_file), provider
),
)
)
def process_stix_files(
input_files: list[pathlib.Path], provider: str | None
) -> list[list[dict]]:
"""
The function will process a list of STIX files.
:param input_files: The list of files to be processed.
:param provider: An optional provider string that will be used to override the parsed provider.
:return: A list containing ECS indicators for each processed file.
"""
return [process_stix_file(x, provider) for x in input_files]
def write_ecs_files(
ecs_files: typing.Iterable[tuple[pathlib.Path, ECSIndicators]],
output_path: pathlib.Path,
) -> None:
"""
The function write each set of indicators to their files in the given directory.
:param ecs_files: The list of tuples containing the output file path and the associated set of indicators.
:param output_path: The path of the directory where file will be written.
"""
output_path.mkdir(exist_ok=True)
for x in ecs_files:
with output_path.joinpath(f"{x[0].stem}.ecs.ndjson").open("w") as f:
f.write("\n".join(json.dumps(x) for x in x[1]))
def write_ecs_to_elastic(
ecs_indicators: ECSIndicators, elastic_info: ElasticInfo
) -> None:
"""
The function writes each ECS indicator to the given Elastic cluster and index.
:param ecs_indicators: ECS Indicators to export to the Elastic cluster index.
:param elastic_info: Object containing connection information about the Elastic cluster.
"""
elastic = elasticsearch.Elasticsearch(
cloud_id=elastic_info.cloud_id,
api_key=elastic_info.api_key,
hosts=[elastic_info.url] if elastic_info.url else None,
basic_auth=(
(elastic_info.username, elastic_info.password)
if elastic_info.username
else None
),
verify_certs=elastic_info.verify_certs,
)
if not elastic.ping():
raise RuntimeError(f"Can't connect to the Elastic cluster: {elastic.info()}")
for x in map(format_ecs_indicator_for_elastic, ecs_indicators):
elastic.index(index=elastic_info.index, document=x)
print(f"Data successfully exported to the `{elastic_info.index}` index")
if __name__ == "__main__":
main()