def stream_connect()

in cloud9/stream.py [0:0]


def _tweet_to_item(tweet):
    """Build the DynamoDB item for one Tweet payload, with empty location fields."""
    # Keys are inserted in the same order as before so printed items look identical.
    return {
        'id': int(tweet['id']),
        'text': tweet['text'],
        # Turns e.g. "2021-01-01T12:00:00.000Z" into "2021-01-01 12:00:00.000"
        # (assumes ISO-8601 created_at values — TODO confirm).
        'timestamp': str(tweet['created_at']).replace('T', ' ').replace('Z', ''),
        'user_id': tweet['author_id'],
        # Drugs mentioned by the Tweet
        'drug_mentions': ', '.join(drug_types(tweet['text'])),
        # Location fields default to empty and are filled by _add_location().
        'user_city': "",
        'user_state': "",
        'user_location': "",
    }


def _add_location(content):
    """Fill content's user_city / user_state / user_location from locate()."""
    # List format: ['city OR state OR other', 'city_name OR state_name OR other_name']
    location_list = locate(str(content['user_id']))
    kind = location_list[0]
    if kind == 'city':
        city = location_list[1]
        content['user_city'] = city
        # States are often abbreviated when a city is given, so the state is
        # looked up by city rather than parsed from the abbreviation.
        if city is not None and city in city_to_state_dict:
            content['user_state'] = city_to_state_dict[city]
        print("CITY: " + str(content['user_city']))
    elif kind == 'state':
        content['user_state'] = location_list[1]
        print("STATE: " + str(content['user_state']))
    elif kind == 'other':
        content['user_location'] = location_list[1]
        print("LOCATION: " + str(content['user_location']))


def _process_stream_line(response_line):
    """Parse one raw stream line, enrich it, store it if eligible, and print it."""
    message = json.loads(response_line)
    if 'data' not in message:
        # NOTE(review): non-Tweet messages (presumably error/status objects) have no
        # 'data' key; report them instead of failing with an opaque KeyError.
        print("Non-Tweet stream message: " + json.dumps(message))
        return
    content = _tweet_to_item(message['data'])
    # Filter out retweets and non-English Tweets; only these are located and stored.
    # Short-circuiting means detect() is never called on retweets.
    text = str(content['text'])
    if not text.startswith('RT ') and detect(content['text']) == 'en':
        _add_location(content)
        table.put_item(Item=content)
    print(content)
    print(json.dumps(message, indent=4, sort_keys=True))


def stream_connect(headers, set, bearer_token):
    """
    Connect to the Twitter v2 filtered stream and process each Tweet.

    Each streamed Tweet is converted to an item holding its id, text, timestamp,
    author id and drug mentions (via ``drug_types``). Tweets that are not retweets
    and are detected as English are enriched with the author's location (via
    ``locate``) and written with ``table.put_item``. Every successfully parsed
    Tweet is printed.

    Parameters:
    headers (dict): HTTP headers sent with the stream request (presumably carrying
        the bearer-token Authorization header — TODO confirm with the caller).
    set: Not used by this function; kept for caller compatibility.
    bearer_token (str): Not used directly by this function; kept for caller
        compatibility.

    Returns:
    None. Returns when the stream ends, when the server rejects the connection
    (non-200 status), or after a ``requests.exceptions.ChunkedEncodingError``.
    Callers that want a long-running stream must reconnect.
    """
    url = "https://api.twitter.com/2/tweets/search/stream?tweet.fields=created_at,author_id"
    # stream=True keeps the connection open while lines are consumed; the
    # with-block guarantees the connection is released on every exit path.
    with requests.get(url, headers=headers, stream=True) as response:
        if response.status_code != 200:
            # Without this check an error body would be parsed as if it were Tweets.
            print("Stream connection failed (HTTP {}): {}".format(
                response.status_code, response.text))
            return
        try:
            for response_line in response.iter_lines():
                if not response_line:
                    # Skip empty lines (presumably keep-alive newlines from the stream).
                    continue
                try:
                    _process_stream_line(response_line)
                except Exception as e:
                    # Best effort: one malformed Tweet or failed lookup/write must
                    # not terminate the whole stream.
                    print("Failed to process stream line ({}): {}".format(
                        type(e).__name__, e))
        except requests.exceptions.ChunkedEncodingError as e:
            # The connection was cut mid-chunk; the caller is expected to reconnect.
            print("Stream interrupted (ChunkedEncodingError): {}".format(e))