def query()

in api-reference-examples/python/pytx/scripts/get_data.py [0:0]


def _select_query(args, result_limit):

    """
    Translate args.object into the pieces needed to run a query.

    Returns a tuple (engine, fields, parameters, windowed):

    * engine     -- class whose objects(**parameters) is called
    * fields     -- field specs used for the CSV header and for each row
    * parameters -- keyword arguments for engine.objects, without the
                    since/until date window
    * windowed   -- True when since/until must be added for each day

    Raises ValueError when args.object is not one of the supported types.
    """

    if args.object == 'exchange_member':

        # no date window here, hence windowed=False
        return ThreatExchangeMember, [XM.ID, XM.NAME], dict(), False

    if args.object == 'malware_analysis':

        fields = [
            MA.ID,
            MA.ADDED_ON,
            MA.CRX,
            MA.IMPHASH,
            MA.MD5,
            MA.PASSWORD,
            MA.PE_RICH_HEADER,
            MA.SAMPLE_TYPE,
            MA.SAMPLE_SIZE_COMPRESSED,
            MA.SHA1,
            MA.SHA256,
            MA.SHARE_LEVEL,
            MA.SSDEEP,
            MA.STATUS,
            MA.VICTIM_COUNT,
            MA.XPI,
        ]

        # copy first: extending Malware._default_fields in place would
        # change the class attribute for every later query
        param_fields = list(Malware._default_fields)
        if args.full_sample:
            param_fields += ['sample_size', 'sample']

        parameters = dict(
            fields=param_fields,
            limit=result_limit,
            text=args.text,
            strict_text=args.strict_text,
            sample_type=args.malware_type,
            status=args.status,
            share_level=args.share_level
        )

        return Malware, fields, parameters, True

    if args.object == 'malware_family':

        fields = [
            MF.ID,
            MF.ADDED_ON,
            MF.ALIASES,
            MF.DESCRIPTION,
            MF.FAMILY_TYPE,
            MF.MALICIOUS,
            MF.NAME,
            MF.SAMPLE_COUNT
        ]

        parameters = dict(
            fields=MalwareFamily._fields,
            limit=result_limit,
            text=args.text,
            strict_text=args.strict_text
        )

        return MalwareFamily, fields, parameters, True

    if args.object == 'threat_descriptor':

        # a two-element list is passed to the utils helpers as one field
        # spec (presumably a nested lookup -- confirm against utils)
        fields = [
            TD.ID,
            TD.ADDED_ON,
            TD.CONFIDENCE,
            TD.DESCRIPTION,
            TD.EXPIRED_ON,
            [TD.INDICATOR, TI.INDICATOR],
            [TD.INDICATOR, TI.TYPE],
            [TD.INDICATOR, TI.ID],
            TD.LAST_UPDATED,
            [TD.OWNER, XM.ID],
            [TD.OWNER, XM.NAME],
            [TD.OWNER, XM.EMAIL],
            TD.PRECISION,
            TD.RAW_INDICATOR,
            TD.REVIEW_STATUS,
            TD.SEVERITY,
            TD.SHARE_LEVEL,
            TD.STATUS
        ]

        parameters = dict(
            fields=ThreatDescriptor._fields,
            include_expired=args.include_expired,
            min_confidence=args.confidence_lb,
            max_confidence=args.confidence_ub,
            owner=args.owner,
            review_status=args.review_status,
            share_level=args.share_level,
            status=args.status,
            limit=result_limit,
            text=args.text,
            strict_text=args.strict_text,
            type_=args.indicator_type
        )

        return ThreatDescriptor, fields, parameters, True

    if args.object == 'threat_indicator':

        fields = [TI.ID, TI.INDICATOR, TI.TYPE]

        parameters = dict(
            fields=ThreatIndicator._fields,
            limit=result_limit,
            text=args.text,
            strict_text=args.strict_text,
            type_=args.indicator_type
        )

        return ThreatIndicator, fields, parameters, True

    raise ValueError('Unsupported object type: %r' % (args.object,))


def query(args):

    """
    Query the ThreatExchange API at the specified endpoint.

    One request is issued per day for args.days_back days, using the
    since/until strings from utils.get_time_params(args.end_date, day,
    '%d-%m-%Y'). Results are written as CSV to args.output, or to
    /dev/stdout when args.output is empty.

    The output is opened once and the header row is written once, so the
    rows of every day end up in the same file. Object types without a
    date window (exchange_member) are queried a single time.

    Does nothing when args.days_back yields no days. Raises ValueError,
    before the output is opened, when args.object is not supported.
    """

    # maximum number of indicators to fetch
    result_limit = 1000

    # write results to this stream
    output_stream = '/dev/stdout' if not args.output else args.output

    days = range(args.days_back)
    if not days:
        # no days requested: leave any existing output untouched
        return

    # resolve the object type first so a bad value cannot truncate the output
    engine, fields, base_parameters, windowed = _select_query(
        args, result_limit)

    if not windowed:
        # the request does not depend on the day, so repeating it would
        # only duplicate rows
        days = days[:1]

    # NOTE(review): 'wb' matches the Python 2 csv module; under Python 3
    # csv.writer needs a text-mode file -- confirm the target interpreter
    with open(output_stream, 'wb') as ostream:

        print('Writing to %s...' % output_stream)

        writer = csv.writer(ostream)
        writer.writerow([utils.convert_to_header(f) for f in fields])

        for day in days:

            parameters = dict(base_parameters)

            if windowed:
                # format date parameters for HTTP request
                _, until_str, _, since_str = utils.get_time_params(
                    args.end_date, day, '%d-%m-%Y')
                parameters.update(since=since_str, until=until_str)

            objects = engine.objects(**parameters)

            # NOTE(review): each row starts with its per-day index, for
            # which the header has no column; kept as-is so existing
            # consumers see the same row layout -- confirm it is intended
            for i, o in enumerate(objects):
                data = [i] + [utils.get_data_field(f, o) for f in fields]
                writer.writerow(data)