python-threatexchange/threatexchange/fb_threatexchange/descriptor.py (147 lines of code) (raw):

#!/usr/bin/env python # Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved """ Wrappers for the json returned by the ThreatExchange API to typed objects. """ import collections import typing as t class ThreatDescriptor(t.NamedTuple): """ Wrapper around ThreatExchange JSON for a ThreatDescriptor. Example: { "id": "3058061737574159" "raw_indicator": "facefacefacefacefacefacefaceface", "type": "HASH_MD5", "owner": { "id": "616912251743987", ..., }, "status": "MALICIOUS", "tags": null, "added_on": "2020-07-01T18:31:15+0000" ... } """ # TODO - do something smarter than this - static # class variable problematic, currently set in main.py MY_APP_ID = -1 # type: ignore # You declared the indicator was in the collaboration label set TRUE_POSITIVE = "true_positive" # type: ignore # You declared the indicator was not in the collaboration label set FALSE_POSITIVE = "false_positive" # type: ignore # Someone declared the indicator was not in the collaboration label set DISPUTED = "disputed" # type: ignore # Special tags to mark whether you (or someone else) # has weighed in on the indicator SPECIAL_TAGS = frozenset((TRUE_POSITIVE, FALSE_POSITIVE, DISPUTED)) # type: ignore id: int raw_indicator: str indicator_type: str owner_id: int tags: t.List[str] status: str added_on: str @classmethod def from_te_json(cls, my_app_id: t.Union[str, int], td_json) -> "ThreatDescriptor": # Hack for now, but nearly refactored out of cls state cls.MY_APP_ID = my_app_id owner_id_str = td_json["owner"]["id"] tags = td_json.get("tags", []) # This is needed because ThreatExchangeAPI.get_threat_descriptors() # does a transform, but other locations do not if isinstance(tags, dict): tags = sorted(tag["text"] for tag in tags["data"]) td = cls( # type: ignore id=int(td_json["id"]), raw_indicator=td_json["raw_indicator"], indicator_type=td_json["type"], owner_id=int(owner_id_str), tags=[tag for tag in tags if tag not in ThreatDescriptor.SPECIAL_TAGS], status=td_json["status"], added_on=td_json["added_on"], ) # Add special tags # TODO - Consider stripping out collab labels # from FALSE_POSITIVE & NON_MALICIOUS # Is this my descriptor? if td.is_mine: if td.status == "NON_MALICIOUS": td.tags.append(ThreatDescriptor.FALSE_POSITIVE) else: td.tags.append(ThreatDescriptor.TRUE_POSITIVE) # Disputed path #1 - mark as non_malicious elif td.status == "NON_MALICIOUS": td.tags.append(ThreatDescriptor.DISPUTED) # Disputed path #2 - react with DISAGREE_WITH_TAGS elif "DISAGREE_WITH_TAGS" in td_json.get("my_reactions", ()): td.tags.append(ThreatDescriptor.FALSE_POSITIVE) elif any( t == "DISAGREE_WITH_TAGS" for r in td_json.get("reactions", []) for t in r ): td.tags.append(ThreatDescriptor.DISPUTED) return td def to_params(self) -> t.Dict[str, t.Any]: params = dict(self.__dict__) params["type"] = params.pop("indicator_type") if not params["tags"]: del params["tags"] return params @property def is_true_positive(self) -> bool: return self.TRUE_POSITIVE in self.tags @property def is_false_positive(self) -> bool: return self.FALSE_POSITIVE in self.tags @property def is_mine(self) -> bool: """This Descriptor is my App's Opinion""" return self.MY_APP_ID == self.owner_id class SimpleDescriptorRollup: """ A simple way to merge opinions on the same indicator. This contains all the information needed for simple SignalType state. """ IS_MY_OPINION = "mine" __slots__ = ["first_descriptor_id", "added_on", "labels"] def __init__( self, first_descriptor_id: int, added_on: str, labels: t.Set[str] ) -> None: self.first_descriptor_id = first_descriptor_id self.added_on = added_on # TODO - convert to int? self.labels = set(labels) @classmethod def from_descriptor(cls, descriptor: ThreatDescriptor) -> "SimpleDescriptorRollup": return cls(descriptor.id, descriptor.added_on, set(descriptor.tags)) @classmethod def from_descriptors( cls, descriptors: t.Iterable[ThreatDescriptor] ) -> "SimpleDescriptorRollup": ret = None for d in descriptors: if not ret: ret = cls.from_descriptor(d) else: ret.merge(d) if ret: return ret raise ValueError("Empty descriptor list!") def merge(self, descriptor: ThreatDescriptor) -> None: # Is the other descriptor mine? If so, unconditionally take it and clear # everything else if descriptor.is_mine: self.added_on = self.IS_MY_OPINION self.first_descriptor_id = descriptor.id self.labels = set(descriptor.tags) return # My descriptor beats my reactions, and I don't want # to take anyone else's opinion elif self.added_on == self.IS_MY_OPINION: return # Is my reaction? elif descriptor.is_false_positive: self.added_on = self.IS_MY_OPINION self.first_descriptor_id = descriptor.id self.labels = set(descriptor.tags) return # Else merge the labels together self.added_on, self.first_descriptor_id = min( (self.added_on, self.first_descriptor_id), (descriptor.added_on, descriptor.id), ) self.labels.union(descriptor.tags) def as_row(self) -> t.Tuple[int, str, str]: """Simple conversion to CSV row""" return self.first_descriptor_id, self.added_on, " ".join(self.labels) @classmethod def from_row(cls, row: t.List) -> "SimpleDescriptorRollup": """Simple conversion from CSV row""" labels = [] if row[2]: labels = row[2].split(" ") return cls(int(row[0]), row[1], set(labels)) @classmethod def from_threat_updates_json( cls, my_app_id: int, te_json: t.Dict[str, t.Any] ) -> t.Optional["SimpleDescriptorRollup"]: if te_json["should_delete"]: return None # https://github.com/facebook/ThreatExchange/issues/834 if not te_json.get("descriptors", {}).get("data"): return None descriptors = [] for descriptor_json in te_json["descriptors"]["data"]: # Look at me ma! I'm modifying input paramaters! descriptor_json["raw_indicator"] = te_json["indicator"] descriptor_json["type"] = te_json["type"] descriptors.append( ThreatDescriptor.from_te_json(my_app_id, descriptor_json) ) return cls.from_descriptors(descriptors) @staticmethod def te_threat_updates_fields() -> t.Tuple[str, ...]: return ( "id", "indicator", "type", "last_updated", "should_delete", "descriptors{%s}" % ",".join( ( "reactions", "my_reactions", "owner{id}", "tags", "status", "added_on", ) ), )