api-reference-examples/python/pytx/scripts/get_data.py (182 lines of code) (raw):
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
"""
This script demonstrates using pytx to query the ThreatExchange API.
Key parameters:
-o (--object) type of object to query; see main()
-O (--output) output stream (default /dev/stdout)
If no value is supplied for --object:
- The script fetches each type of object
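- It saves results in the working directory, one CSV file per object type (exchange_members.csv, malware_analyses.csv, malware_families.csv, threat_descriptors.csv, threat_indicators.csv)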
Sample usage:
python scripts/get_data.py -o exchange_member -O /dev/stdout
python scripts/get_data.py -o threat_descriptor -O /dev/stdout -t smarturl
"""
import argparse
import csv
from datetime import datetime
from pytx import (
Malware,
MalwareFamily,
ThreatDescriptor,
ThreatExchangeMember,
ThreatIndicator,
utils
)
from pytx.vocabulary import (
Malware as MA,
MalwareFamilies as MF,
ThreatDescriptor as TD,
ThreatExchangeMember as XM,
ThreatIndicator as TI
)
def main():
    """
    Entry point: parse the command line and run one or more queries.

    When --object is given, a single query runs with the arguments as
    parsed. Otherwise every supported object type is queried in turn and
    each result set is written to its own CSV file in the working directory
    (any --output value is overridden in that case).
    """
    args = parse_arguments()

    # An explicit object type means exactly one query, using args untouched.
    if args.object is not None:
        query(args)
        return

    # (object type, destination file) pairs, in the order they are fetched.
    default_runs = (
        ('exchange_member', 'exchange_members.csv'),
        ('malware_analysis', 'malware_analyses.csv'),
        ('malware_family', 'malware_families.csv'),
        ('threat_descriptor', 'threat_descriptors.csv'),
        ('threat_indicator', 'threat_indicators.csv'),
    )
    for object_type, csv_name in default_runs:
        # query() reads both values back off the shared namespace.
        args.object = object_type
        args.output = csv_name
        query(args)
# Object types that query() knows how to fetch.
_SUPPORTED_OBJECTS = (
    'exchange_member',
    'malware_analysis',
    'malware_family',
    'threat_descriptor',
    'threat_indicator',
)


def _open_csv_output(path):
    """Open *path* for writing in the mode this interpreter's csv module needs."""
    # Local import: the file's import block does not bring in sys.
    import sys

    if sys.version_info[0] < 3:
        # Python 2: csv.writer emits byte strings, so the file must be binary.
        return open(path, 'wb')
    # Python 3: csv.writer emits text and handles line endings itself, which
    # is why the csv documentation asks for newline=''.
    return open(path, 'w', newline='', encoding='utf-8')


def _build_query(args, since_str, until_str, result_limit):
    """
    Translate args.object into (engine class, CSV fields, request parameters).

    since_str / until_str bound the time window of the request; they are not
    used for 'exchange_member', whose request takes no parameters.
    Raises ValueError for an object type that is not supported.
    """
    if args.object == 'exchange_member':
        return ThreatExchangeMember, [XM.ID, XM.NAME], dict()

    if args.object == 'malware_analysis':
        fields = [
            MA.ID,
            MA.ADDED_ON,
            MA.CRX,
            MA.IMPHASH,
            MA.MD5,
            MA.PASSWORD,
            MA.PE_RICH_HEADER,
            MA.SAMPLE_TYPE,
            MA.SAMPLE_SIZE_COMPRESSED,
            MA.SHA1,
            MA.SHA256,
            MA.SHARE_LEVEL,
            MA.SSDEEP,
            MA.STATUS,
            MA.VICTIM_COUNT,
            MA.XPI,
        ]
        # Work on a copy: extending Malware._default_fields in place would
        # permanently alter the class attribute for every later request.
        param_fields = list(Malware._default_fields)
        if args.full_sample:
            param_fields += ['sample_size', 'sample']
        parameters = dict(
            fields=param_fields,
            limit=result_limit,
            text=args.text,
            strict_text=args.strict_text,
            sample_type=args.malware_type,
            status=args.status,
            share_level=args.share_level,
            since=since_str,
            until=until_str
        )
        return Malware, fields, parameters

    if args.object == 'malware_family':
        fields = [
            MF.ID,
            MF.ADDED_ON,
            MF.ALIASES,
            MF.DESCRIPTION,
            MF.FAMILY_TYPE,
            MF.MALICIOUS,
            MF.NAME,
            MF.SAMPLE_COUNT
        ]
        parameters = dict(
            fields=MalwareFamily._fields,
            limit=result_limit,
            text=args.text,
            strict_text=args.strict_text,
            since=since_str,
            until=until_str
        )
        return MalwareFamily, fields, parameters

    if args.object == 'threat_descriptor':
        # A two-element list is passed to the pytx helpers as-is; it pairs a
        # descriptor field with a field of the indicator/owner vocabulary.
        fields = [
            TD.ID,
            TD.ADDED_ON,
            TD.CONFIDENCE,
            TD.DESCRIPTION,
            TD.EXPIRED_ON,
            [TD.INDICATOR, TI.INDICATOR],
            [TD.INDICATOR, TI.TYPE],
            [TD.INDICATOR, TI.ID],
            TD.LAST_UPDATED,
            [TD.OWNER, XM.ID],
            [TD.OWNER, XM.NAME],
            [TD.OWNER, XM.EMAIL],
            TD.PRECISION,
            TD.RAW_INDICATOR,
            TD.REVIEW_STATUS,
            TD.SEVERITY,
            TD.SHARE_LEVEL,
            TD.STATUS
        ]
        parameters = dict(
            fields=ThreatDescriptor._fields,
            include_expired=args.include_expired,
            min_confidence=args.confidence_lb,
            max_confidence=args.confidence_ub,
            owner=args.owner,
            review_status=args.review_status,
            share_level=args.share_level,
            status=args.status,
            limit=result_limit,
            text=args.text,
            strict_text=args.strict_text,
            type_=args.indicator_type,
            since=since_str,
            until=until_str
        )
        return ThreatDescriptor, fields, parameters

    if args.object == 'threat_indicator':
        parameters = dict(
            fields=ThreatIndicator._fields,
            limit=result_limit,
            text=args.text,
            strict_text=args.strict_text,
            type_=args.indicator_type,
            since=since_str,
            until=until_str
        )
        return ThreatIndicator, [TI.ID, TI.INDICATOR, TI.TYPE], parameters

    raise ValueError('Unsupported object type: %r' % (args.object,))


def query(args):
    """
    Query the ThreatExchange API at the specified endpoint and write CSV.

    args is the namespace produced by parse_arguments(). The attributes read
    here are object, output, days_back and end_date; the remaining filter
    options are read while the request is built.

    One request is issued per day in the look-back window and all rows go to
    a single output stream: the stream is opened once and the header row is
    written once, so earlier days are not overwritten by later ones.
    'exchange_member' takes no date window and is therefore fetched once.
    Nothing is written, and the output is not opened, when days_back < 1.

    Raises ValueError if args.object is not a supported object type; this is
    checked before the output is opened so an existing file is not truncated.
    """
    # value passed as the `limit` request parameter
    result_limit = 1000

    if args.object not in _SUPPORTED_OBJECTS:
        raise ValueError('Unsupported object type: %r' % (args.object,))

    if args.days_back < 1:
        return

    # write results to this stream
    output_stream = args.output if args.output else '/dev/stdout'

    # Requests that carry no since/until would return the same rows each day.
    day_count = 1 if args.object == 'exchange_member' else args.days_back

    with _open_csv_output(output_stream) as ostream:
        print('Writing to %s...' % output_stream)
        writer = csv.writer(ostream)
        row_number = 0

        for day in range(day_count):
            # format date parameters for HTTP request
            _until, until_str, _since, since_str = utils.get_time_params(
                args.end_date, day, '%d-%m-%Y'
            )
            engine, fields, parameters = _build_query(
                args, since_str, until_str, result_limit
            )
            objects = engine.objects(**parameters)

            if day == 0:
                # NOTE(review): the header has one column fewer than the data
                # rows because each row starts with a running counter. This
                # is kept from the original layout; confirm consumers expect
                # it before adding a header cell for the counter.
                writer.writerow([utils.convert_to_header(f) for f in fields])

            for o in objects:
                writer.writerow(
                    [row_number] + [utils.get_data_field(f, o) for f in fields]
                )
                row_number += 1
def parse_arguments():
    """
    Build the command-line parser and return the parsed arguments.

    Options are registered in the order listed below, which is also the
    order they appear in --help. Flags declared with action='store_true'
    default to False; every other option defaults to None unless a default
    is given explicitly.
    """
    option_specs = [
        ('-d', '--days_back', dict(help='Number of days to look back', type=int, default=1)),
        # The default is computed at call time, as a string of the current UTC time.
        ('-e', '--end_date', dict(help='Date upper bound (inclusive) (UTC)', type=str,
                                  default=str(datetime.utcnow()))),
        ('-f', '--full_sample', dict(help='Full sample', action='store_true')),
        ('-i', '--indicator_type', dict(help='Threat indicator type')),
        ('-L', '--confidence_lb', dict(help='Confidence lower bound', type=int)),
        ('-l', '--share_level', dict(help='Share level')),
        ('-m', '--malware_type', dict(help='Malware sample type')),
        ('-O', '--output', dict(help='Output stream')),
        ('-o', '--object', dict(help='Object type')),
        ('-r', '--review_status', dict(help='Review status')),
        ('-s', '--status', dict(help='Status')),
        ('-T', '--strict_text', dict(help='Strict text query (no wildcards)', action='store_true')),
        ('-t', '--text', dict(help='Text query')),
        ('-U', '--confidence_ub', dict(help='Confidence upper bound', type=int)),
        ('-w', '--owner', dict(help='Comma-separated list of AppIDs')),
        ('-x', '--include_expired', dict(help='Include expired data', action='store_true')),
    ]

    cli = argparse.ArgumentParser()
    for short_flag, long_flag, settings in option_specs:
        cli.add_argument(short_flag, long_flag, **settings)
    return cli.parse_args()
# Run the queries only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()