in cloud9/stream.py [0:0]
def stream_connect(headers, set, bearer_token):
    """
    Connect to the Twitter filtered stream, process each Tweet and store it.

    Every non-empty line received from the stream is parsed as JSON and
    flattened into a record with the keys ``id``, ``text``, ``timestamp``,
    ``user_id``, ``drug_mentions``, ``user_city``, ``user_state`` and
    ``user_location``. Retweets (text starting with ``'RT '``) and Tweets not
    detected as English are dropped. The remaining records are enriched with
    the author's location via ``locate`` and written with ``table.put_item``.

    The stream URL is hard-coded to the ``/2/tweets/search/stream`` endpoint,
    requesting the ``created_at`` and ``author_id`` Tweet fields.

    Parameters:
        headers: HTTP headers passed to ``requests.get`` for the stream
            request (presumably the bearer-token authorization header --
            confirm against the caller).
        set: Not read by this function; kept so existing callers still work.
        bearer_token: Not read by this function; kept so existing callers
            still work.

    Returns:
        None. The function returns when the stream ends or when a
        ``requests.exceptions.ChunkedEncodingError`` interrupts it. Errors
        raised while handling a single Tweet are printed and the stream
        continues with the next line.
    """
    # NOTE(review): the nesting below was reconstructed from a source whose
    # indentation had been lost. Confirm that storing/printing is meant to
    # happen only for Tweets that pass the retweet/English filter.
    response = requests.get(
        "https://api.twitter.com/2/tweets/search/stream?tweet.fields=created_at,author_id",
        headers=headers,
        stream=True,
    )
    encoding_count = 0
    try:
        for response_line in response.iter_lines():
            # Skip empty lines in the stream.
            if not response_line:
                continue
            try:
                data = json.loads(response_line)
                tweet = data['data']
                content = {
                    'id': int(tweet['id']),
                    'text': tweet['text'],
                    # "2021-01-01T12:00:00.000Z" -> "2021-01-01 12:00:00.000"
                    'timestamp': str(tweet['created_at']).replace('T', ' ').replace('Z', ''),
                    'user_id': tweet['author_id'],
                }
                # Drug mentioned by Tweet
                content['drug_mentions'] = ', '.join(drug_types(content['text']))
                # Location fields default to empty and are filled in below
                content['user_city'] = ""
                content['user_state'] = ""
                content['user_location'] = ""

                # Filter out retweets and non-English Tweets
                if str(content['text']).startswith('RT ') or detect(content['text']) != 'en':
                    continue

                # List format: ['city OR state OR other', 'city_name OR state_name OR other_name']
                location_list = locate(str(content['user_id']))
                location_kind = location_list[0]
                if location_kind == 'city':
                    city = location_list[1]
                    content['user_city'] = city
                    # States often abbreviated if there is city, we must find
                    # state by city, not abbreviation
                    if city is not None and city in city_to_state_dict:
                        content['user_state'] = city_to_state_dict[city]
                    print("CITY: " + str(content['user_city']))
                elif location_kind == 'state':
                    content['user_state'] = location_list[1]
                    print("STATE: " + str(content['user_state']))
                elif location_kind == 'other':
                    content['user_location'] = location_list[1]
                    print("LOCATION: " + str(content['user_location']))

                table.put_item(Item=content)
                print(content)
                print(json.dumps(data, indent=4, sort_keys=True))
            except Exception as e:
                # Best effort: one malformed or unprocessable Tweet must not
                # take down the whole stream, so report it and keep reading.
                print(str(e))
    except requests.exceptions.ChunkedEncodingError:
        # The connection broke mid-stream; report the count and stop.
        encoding_count += 1
        print(encoding_count)
    finally:
        # With stream=True the connection is only released once the body is
        # fully consumed or the response is closed, so close it explicitly on
        # every exit path.
        response.close()