def process_twitter()

in pelican/plugins/asfdata.py [0:0]


def process_twitter(handle, count, debug):
    if debug:
        print(f'-----\ntwitter feed: {handle}')
    bearer_token = twitter_auth()
    if not bearer_token:
        print('WARN: no bearer token for Twitter')
        return sequence_list('twitter',[{
            'text': 'To retrieve tweets supply a valid twitter bearer token in ~/.authtokens'
        }])
    # do not print or display bearer_token as it is a secret
    query = f'from:{handle}'
    tweet_fields = 'tweet.fields=author_id'
    url = f'https://api.twitter.com/2/tweets/search/recent?query={query}&{tweet_fields}'
    headers = {'Authorization': f'Bearer {bearer_token}'}
    try:
        load = connect_to_endpoint(url, headers)
    except Exception as e:
        print(f'ERROR: Cannot connect to Twitter for {handle}: {e}')
        return sequence_list('twitter',[{ 'text': 'Cannot connect to Twitter at present' }])
    result_count = load['meta']['result_count']
    if result_count == 0:
        print(f'WARN: No recent tweets for {handle}')
        return sequence_list('twitter',[{ 'text': 'No recent tweets found' }])
    if 'data' not in load:
        print('WARN: "data" not in Twitter response')
        print(load) # DEBUG; should not happen if result_count > 0
        return sequence_list('twitter',[{
            'text': 'Unable to extract Twitter data'
        }])
    reference = sequence_list('twitter', load['data'])
    if result_count < count:
        v = reference
    else:
        v = reference[:count]
    return v