# stream()
#
# in tools/reddit-comment-streaming/app/stream_analyzed_comments.py [0:0]


def stream(subreddits: str) -> None:
    """
    Start the comment stream and analyze incoming comments.

    Opens a PRAW comment stream for *subreddits* (the value is passed
    straight to ``Reddit.subreddit``, so multi-subreddit input is
    presumably in PRAW's ``"sub1+sub2"`` form -- TODO confirm with callers),
    enriches each comment with profanity, sentiment, subjectivity and
    readability metadata, and publishes the resulting JSON document to a
    Google Cloud Pub/Sub topic.

    Runs forever: on any exception the stream is restarted, with a 2-hour
    back-off when the error looks like an HTTP 429 (rate limit).

    Args:
        subreddits: Subreddit name(s) to stream comments from.
    """
    # NOTE(review): placeholders -- these must be replaced with real IDs
    # (or injected via config) before the publisher can work.
    project_id = "<insert-project-id-here>"
    pubsub_topic = "<insert-topic-id-here>"

    # Configure the batch to publish as soon as there are 10 messages
    # or 1 KiB of data, or 1 second has passed.

    batch_settings = pubsub_v1.types.BatchSettings(
        max_messages=10,  # default 100
        max_bytes=1024,  # default 1 MiB
        max_latency=1,  # default 10 ms
    )
    pubsub_client = pubsub_v1.PublisherClient(batch_settings)
    # Fully-qualified topic path: projects/{project_id}/topics/{topic}.
    topic_path = pubsub_client.topic_path(project_id, pubsub_topic)

    # prevent collect bot comments
    # Known bot accounts whose comments are skipped below.
    bot_list = [
        "AutoModerator", "keepthetips", "MAGIC_EYE_BOT", "Funny_Sentinel",
        "Funny-Mod", "Showerthoughts_Mod", "autotldr", "art_moderator_bot",
        "ApiContraption", "WSBVoteBot", "FittitBot", "Photoshopbattlesbot",
        "dataisbeautiful-bot", "timestamp_bot", "remindditbot", "converter-bot",
        "lntipbot"
    ]

    # Outer supervision loop: any exception below tears down the stream,
    # is logged, and (except for a 429 back-off) the stream restarts
    # immediately with fresh counters.
    while True:

        try:
            # "bot1" is a praw.ini site name holding the Reddit credentials.
            praw_client = praw.Reddit("bot1")
            num_cmts_collected = 0
            # NOTE(review): cmts_processed is incremented but never read
            # within this function -- possibly dead, or used by code
            # outside this view.
            cmts_processed = 0

            cmt_stream = praw_client.subreddit(subreddits)

            # Blocks indefinitely, yielding comments as they arrive.
            for cmt in cmt_stream.stream.comments():

                # throttle to avoid 429 error
                sleep(0.5)

                # empty check
                if cmt:
                    cmtbody = cmt.body
                    author = cmt.author

                    # NOTE(review): `author` is a PRAW Redditor object, not
                    # a str; PRAW objects presumably compare equal to their
                    # name string, so the membership test works -- verify.
                    # Also note a deleted comment's author is None.
                    if author not in bot_list:

                        # Skip empty bodies and very long (>= 5000 char) ones.
                        if len(cmtbody) > 0 and len(cmtbody) < 5000:

                            #censor check
                            if profanity.contains_profanity(str(cmtbody)):
                                is_censored = 1
                            else:
                                is_censored = 0

                            # remove emojis
                            cleaned_cmt = remove_emoji(str(cmtbody))

                            date_fmt = "%Y-%m-%d %H:%M:%S"
                            # comment date
                            # created_utc is a Unix epoch timestamp (UTC).
                            cmt_date = str(
                                datetime.utcfromtimestamp(
                                    cmt.created_utc).strftime(date_fmt))

                            # compartmentalize and localize date for
                            # easier searching
                            local_dt = utc_to_local(
                                datetime.strptime(cmt_date, date_fmt))
                            cmt_timestamp = local_dt.strftime(date_fmt)

                            # comment sentiment and subjectivity
                            sentiment = get_comment_sentiment(cleaned_cmt)
                            pattern_polarity = round(sentiment.polarity, 4)
                            pattern_subjectivity = round(
                                sentiment.subjectivity, 4)

                            # One-hot sentiment buckets from polarity:
                            # > 0.3 positive, [-0.3, 0.3] neutral,
                            # < -0.3 negative.
                            is_positive = 0
                            is_neutral = 0
                            is_negative = 0

                            if pattern_polarity > 0.3:
                                is_positive = 1
                            elif 0.3 >= pattern_polarity >= -0.3:
                                is_neutral = 1
                            else:
                                is_negative = 1

                            # Subjectivity flag: > 0.7 counts as subjective.
                            is_subjective = 0
                            if pattern_subjectivity > 0.7:
                                is_subjective = 1

                            # Readability statistics
                            # Flesch reading ease: higher score = easier text.
                            cmt_read_score = ts.flesch_reading_ease(cleaned_cmt)
                            cmt_read_ease = ""
                            if cmt_read_score >= 80:
                                cmt_read_ease = "easy"
                            elif 80 > cmt_read_score > 50:
                                cmt_read_ease = "standard"
                            else:
                                cmt_read_ease = "difficult"

                            # Consensus US school grade level, e.g. "8th and
                            # 9th grade" (textstat text_standard).
                            cmt_reading_grade_level = ts.text_standard(
                                cleaned_cmt, float_output=False)

                            # censor and lower
                            censored_cmt = profanity.censor(cleaned_cmt).lower()

                            # Payload published to Pub/Sub; str(cmt) yields
                            # the comment's base36 id.
                            cmtjson = {
                                "comment_id": str(cmt),
                                "subreddit": str(cmt.subreddit),
                                "author": str(cmt.author),
                                "comment_text": censored_cmt,
                                "distinguished": cmt.distinguished,
                                "submitter": cmt.is_submitter,
                                "total_words": len(cleaned_cmt.split()),
                                "reading_ease_score": cmt_read_score,
                                "reading_ease": cmt_read_ease,
                                "reading_grade_level": cmt_reading_grade_level,
                                "sentiment_score": pattern_polarity,
                                "censored": is_censored,
                                "positive": is_positive,
                                "neutral": is_neutral,
                                "negative": is_negative,
                                "subjectivity_score": pattern_subjectivity,
                                "subjective": is_subjective,
                                "url": "https://reddit.com" + cmt.permalink,
                                "comment_date": cmt_date,
                                "comment_timestamp": cmt_timestamp,
                                "comment_hour": local_dt.hour,
                                "comment_year": local_dt.year,
                                "comment_month": local_dt.month,
                                "comment_day": local_dt.day
                            }

                            cmts_processed = cmts_processed + 1
                            num_cmts_collected = num_cmts_collected + 1
                            print(num_cmts_collected)
                            push_payload(pubsub_client, topic_path, cmtjson)

        except Exception as err:
            error_msg = " An error has occured in the comment stream:" + str(
                err)
            print(error_msg)

            # If too many requests error, we need to wait longer for throttle
            # to end. otherwise start back up right away.
            # NOTE(review): fragile error-code sniffing -- int() raises
            # ValueError when str(err) contains no digits (killing the
            # supervision loop), and concatenating ALL digits in the message
            # can coincidentally form 429 from unrelated numbers.
            error_code = "".join(filter(str.isdigit, str(err)))
            http_response = int(error_code)
            if http_response == 429:
                error_msg = error_msg + " - Too many requests. Waiting 2 hrs."
                sleep(7200)
            else:
                # NOTE(review): error_msg is extended here but never printed
                # within this view -- possibly truncated, or intentionally
                # discarded before the loop restarts.
                error_msg = error_msg + " - Restarting stream now."