in api-reference-examples/python/pytx/scripts/get_data.py [0:0]
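def _build_query(args, result_limit, since_str, until_str):
    """
    Select the pytx engine, the CSV columns and the request parameters
    for ``args.object`` over one ``since``/``until`` window.

    Args:
        args: parsed command-line namespace; the attributes read depend on
            the object type (``text``, ``strict_text``, ``status``, ...).
        result_limit: value passed as the ``limit`` request parameter.
        since_str, until_str: formatted window bounds for the request.

    Returns:
        ``(engine, fields, parameters)`` where ``engine`` is the pytx class
        whose ``objects()`` is queried, ``fields`` the column descriptors
        for the CSV, and ``parameters`` the keyword arguments for
        ``engine.objects``.

    Raises:
        ValueError: ``args.object`` is not one of the supported types
            (previously this fell through and failed with an unbound
            ``engine`` name).
    """
    if args.object == 'exchange_member':
        return ThreatExchangeMember, [XM.ID, XM.NAME], dict()

    if args.object == 'malware_analysis':
        fields = [
            MA.ID,
            MA.ADDED_ON,
            MA.CRX,
            MA.IMPHASH,
            MA.MD5,
            MA.PASSWORD,
            MA.PE_RICH_HEADER,
            MA.SAMPLE_TYPE,
            MA.SAMPLE_SIZE_COMPRESSED,
            MA.SHA1,
            MA.SHA256,
            MA.SHARE_LEVEL,
            MA.SSDEEP,
            MA.STATUS,
            MA.VICTIM_COUNT,
            MA.XPI,
        ]
        # Copy first: Malware._default_fields is shared class-level state, and
        # extending it with ``+=`` would grow it in place on every call
        # (once per day of the loop) instead of just for this request.
        param_fields = list(Malware._default_fields)
        if args.full_sample:
            param_fields += ['sample_size', 'sample']
        parameters = dict(
            fields=param_fields,
            limit=result_limit,
            text=args.text,
            strict_text=args.strict_text,
            sample_type=args.malware_type,
            status=args.status,
            share_level=args.share_level,
            since=since_str,
            until=until_str
        )
        return Malware, fields, parameters

    if args.object == 'malware_family':
        fields = [
            MF.ID,
            MF.ADDED_ON,
            MF.ALIASES,
            MF.DESCRIPTION,
            MF.FAMILY_TYPE,
            MF.MALICIOUS,
            MF.NAME,
            MF.SAMPLE_COUNT
        ]
        parameters = dict(
            fields=MalwareFamily._fields,
            limit=result_limit,
            text=args.text,
            strict_text=args.strict_text,
            since=since_str,
            until=until_str
        )
        return MalwareFamily, fields, parameters

    if args.object == 'threat_descriptor':
        # Two-element lists address a field of a nested object
        # (e.g. the indicator's type or the owner's name).
        fields = [
            TD.ID,
            TD.ADDED_ON,
            TD.CONFIDENCE,
            TD.DESCRIPTION,
            TD.EXPIRED_ON,
            [TD.INDICATOR, TI.INDICATOR],
            [TD.INDICATOR, TI.TYPE],
            [TD.INDICATOR, TI.ID],
            TD.LAST_UPDATED,
            [TD.OWNER, XM.ID],
            [TD.OWNER, XM.NAME],
            [TD.OWNER, XM.EMAIL],
            TD.PRECISION,
            TD.RAW_INDICATOR,
            TD.REVIEW_STATUS,
            TD.SEVERITY,
            TD.SHARE_LEVEL,
            TD.STATUS
        ]
        parameters = dict(
            fields=ThreatDescriptor._fields,
            include_expired=args.include_expired,
            min_confidence=args.confidence_lb,
            max_confidence=args.confidence_ub,
            owner=args.owner,
            review_status=args.review_status,
            share_level=args.share_level,
            status=args.status,
            limit=result_limit,
            text=args.text,
            strict_text=args.strict_text,
            type_=args.indicator_type,
            since=since_str,
            until=until_str
        )
        return ThreatDescriptor, fields, parameters

    if args.object == 'threat_indicator':
        fields = [TI.ID, TI.INDICATOR, TI.TYPE]
        parameters = dict(
            fields=ThreatIndicator._fields,
            limit=result_limit,
            text=args.text,
            strict_text=args.strict_text,
            type_=args.indicator_type,
            since=since_str,
            until=until_str
        )
        return ThreatIndicator, fields, parameters

    raise ValueError('unsupported object type: %s' % args.object)


def query(args):
    """
    Query the ThreatExchange API at the specified endpoint.

    For each of the ``args.days_back`` days ending at ``args.end_date`` the
    matching objects are fetched and written as CSV (a header row followed
    by one numbered row per object) to ``args.output``, or to
    ``/dev/stdout`` when no output path was given.

    Args:
        args: parsed command-line namespace. ``output``, ``days_back``,
            ``end_date`` and ``object`` are read here; the remaining
            filter attributes are read when the request is built.

    Raises:
        ValueError: ``args.object`` is not a supported object type.
    """
    # maximum number of indicators to fetch per request
    result_limit = 1000
    # write results to this stream
    # NOTE(review): '/dev/stdout' is a POSIX device path; on other platforms
    # sys.stdout would be the portable choice — confirm before changing
    output_stream = '/dev/stdout' if not args.output else args.output
    for day in range(args.days_back):
        # format date parameters for HTTP request; only the string forms
        # are used in the request
        _until, until_str, _since, since_str = utils.get_time_params(args.end_date, day, '%d-%m-%Y')
        # resolve the object type before touching the output so an
        # unsupported type does not truncate an existing file
        engine, fields, parameters = _build_query(args, result_limit, since_str, until_str)
        # NOTE(review): the stream is reopened with 'wb' on every day, so with
        # a regular file only the last day's rows survive — confirm whether
        # that is intended before switching to a single open or append mode
        with open(output_stream, 'wb') as ostream:
            print('Writing to %s...' % output_stream)
            writer = csv.writer(ostream)
            objects = engine.objects(**parameters)
            headers = [utils.convert_to_header(f) for f in fields]
            writer.writerow(headers)
            for i, o in enumerate(objects):
                # leading column is the row index within this day's result set
                data = [i] + [utils.get_data_field(f, o) for f in fields]
                writer.writerow(data)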