in python-threatexchange/threatexchange/cli/match_cmd.py [0:0]
def execute(self, settings: CLISettings) -> None:
if not settings.index_store.get_available():
if not settings.in_demo_mode:
raise CommandError("No indices available. Do you need to fetch?")
self.stderr("You haven't built any indices, so we'll call `fetch` for you!")
FetchCommand().execute(settings)
signal_types = settings.get_signal_types_for_content(self.content_type)
if self.only_signal:
signal_types = [self.only_signal]
if self.inline:
signal_types = [
s for s in signal_types if issubclass(s, (TextHasher, MatchesStr))
]
else:
signal_types = [s for s in signal_types if issubclass(s, FileHasher)]
logging.info(
"Signal types that apply: %s",
", ".join(s.get_name() for s in signal_types) or "None!",
)
matchers = []
for s_type in signal_types:
index = settings.index_store.load_index(s_type)
if index is None:
logging.info("No index for %s, skipping", s_type.get_name())
continue
query = None
if self.inline:
if issubclass(s_type, TextHasher):
query = lambda t: index.query(s_type.hash_from_str(t)) # type: ignore
elif issubclass(s_type, MatchesStr):
query = lambda t: index.query(t) # type: ignore
else:
query = lambda f: index.query(s_type.hash_from_file(f)) # type: ignore
if query:
matchers.append((s_type, query))
if not matchers:
self.stderr("No data to match against")
return
for inp in self.input_generator:
seen = set()
for s_type, matcher in matchers:
results: t.List[IndexMatch] = matcher(inp)
for r in results:
metadatas: t.List[t.Tuple[str, FetchedSignalMetadata]] = r.metadata
for collab, fetched_data in metadatas:
if collab in seen:
continue
seen.add(collab)
print(s_type.get_name(), f"- ({collab})", fetched_data)