in tools/reddit-comment-streaming/app/stream_analyzed_comments.py [0:0]
def _reading_ease_label(score):
    """Map a Flesch reading-ease score to a coarse label.

    Scores of 80 and above are "easy", scores strictly between 50 and 80 are
    "standard", and everything else (including exactly 50) is "difficult".
    """
    if score >= 80:
        return "easy"
    if score > 50:
        return "standard"
    return "difficult"


def _polarity_flags(polarity):
    """Return ``(is_positive, is_neutral, is_negative)`` 0/1 flags.

    Polarity above 0.3 is positive, polarity in [-0.3, 0.3] is neutral and
    anything else is negative.
    """
    if polarity > 0.3:
        return 1, 0, 0
    if polarity >= -0.3:
        return 0, 1, 0
    return 0, 0, 1


def _http_status(err):
    """Best-effort extraction of an HTTP status code from a stream error.

    Prefers a ``response.status_code`` attribute on the exception (response
    exceptions such as prawcore's ``ResponseException`` carry one) and
    otherwise falls back to the first standalone three-digit status-like
    number in the message, e.g. "received 429 HTTP response".

    Returns:
        The status code as an int, or ``None`` if none can be found. Never
        raises for messages without digits.
    """
    import re

    status = getattr(getattr(err, "response", None), "status_code", None)
    if isinstance(status, int):
        return status
    match = re.search(r"\b[1-5]\d\d\b", str(err))
    return int(match.group()) if match else None


def _analyze_comment(cmt):
    """Build the analyzed-comment record that is published for *cmt*.

    Strips emojis, flags and censors profanity, scores sentiment,
    subjectivity and readability, and splits the comment's creation time
    into a UTC date string plus local-time fields.
    """
    body = str(cmt.body)
    is_censored = 1 if profanity.contains_profanity(body) else 0
    cleaned_cmt = remove_emoji(body)

    date_fmt = "%Y-%m-%d %H:%M:%S"
    # Truncate to whole seconds so the UTC string and the local fields are
    # derived from the same instant.
    # NOTE(review): datetime.utcfromtimestamp is deprecated since Python
    # 3.12; kept because utc_to_local (defined elsewhere) may expect a naive
    # datetime — confirm before switching to an aware datetime.
    utc_dt = datetime.utcfromtimestamp(cmt.created_utc).replace(microsecond=0)
    cmt_date = utc_dt.strftime(date_fmt)
    local_dt = utc_to_local(utc_dt)

    sentiment = get_comment_sentiment(cleaned_cmt)
    polarity = round(sentiment.polarity, 4)
    subjectivity = round(sentiment.subjectivity, 4)
    is_positive, is_neutral, is_negative = _polarity_flags(polarity)

    read_score = ts.flesch_reading_ease(cleaned_cmt)
    grade_level = ts.text_standard(cleaned_cmt, float_output=False)
    censored_cmt = profanity.censor(cleaned_cmt).lower()

    return {
        "comment_id": str(cmt),
        "subreddit": str(cmt.subreddit),
        "author": str(cmt.author),
        "comment_text": censored_cmt,
        "distinguished": cmt.distinguished,
        "submitter": cmt.is_submitter,
        "total_words": len(cleaned_cmt.split()),
        "reading_ease_score": read_score,
        "reading_ease": _reading_ease_label(read_score),
        "reading_grade_level": grade_level,
        "sentiment_score": polarity,
        "censored": is_censored,
        "positive": is_positive,
        "neutral": is_neutral,
        "negative": is_negative,
        "subjectivity_score": subjectivity,
        "subjective": 1 if subjectivity > 0.7 else 0,
        "url": "https://reddit.com" + cmt.permalink,
        "comment_date": cmt_date,
        "comment_timestamp": local_dt.strftime(date_fmt),
        "comment_hour": local_dt.hour,
        "comment_year": local_dt.year,
        "comment_month": local_dt.month,
        "comment_day": local_dt.day
    }


def stream(subreddits, *, project_id="<insert-project-id-here>",
           pubsub_topic="<insert-topic-id-here>"):
    """
    Start the comment stream and analyze incoming comments.

    Runs forever. Comments from known bot accounts and comments whose body
    is empty or at least 5000 characters long are skipped; every other
    comment is analyzed and published to Pub/Sub. Any exception restarts
    the stream; when the error carries HTTP status 429 (Too Many Requests)
    the restart is delayed by two hours.

    Args:
        subreddits: Subreddit name(s) passed to ``praw_client.subreddit``.
        project_id: Google Cloud project ID that owns the Pub/Sub topic.
        pubsub_topic: Pub/Sub topic ID that analyzed comments go to.
    """
    # Configure the batch to publish as soon as there are 10 messages
    # or 1 KiB of data, or 1 second has passed.
    batch_settings = pubsub_v1.types.BatchSettings(
        max_messages=10,  # default 100
        max_bytes=1024,  # default 1 MiB
        max_latency=1,  # default 10 ms
    )
    pubsub_client = pubsub_v1.PublisherClient(batch_settings)
    topic_path = pubsub_client.topic_path(project_id, pubsub_topic)

    # Don't collect bot comments. praw compares a Redditor with a str
    # case-insensitively by name, so do the same with an explicit set.
    bot_names = frozenset(name.lower() for name in (
        "AutoModerator", "keepthetips", "MAGIC_EYE_BOT", "Funny_Sentinel",
        "Funny-Mod", "Showerthoughts_Mod", "autotldr", "art_moderator_bot",
        "ApiContraption", "WSBVoteBot", "FittitBot", "Photoshopbattlesbot",
        "dataisbeautiful-bot", "timestamp_bot", "remindditbot",
        "converter-bot", "lntipbot",
    ))

    while True:
        try:
            praw_client = praw.Reddit("bot1")
            num_cmts_collected = 0
            cmt_stream = praw_client.subreddit(subreddits)
            for cmt in cmt_stream.stream.comments():
                # throttle to avoid 429 error
                sleep(0.5)
                if not cmt:
                    continue
                # A deleted author is None; str(None) is never a bot name.
                if str(cmt.author).lower() in bot_names:
                    continue
                if not 0 < len(cmt.body) < 5000:
                    continue
                cmtjson = _analyze_comment(cmt)
                num_cmts_collected += 1
                print(num_cmts_collected)
                push_payload(pubsub_client, topic_path, cmtjson)
        except Exception as err:  # top-level restart boundary
            error_msg = " An error has occured in the comment stream:" + str(
                err)
            # If too many requests error, we need to wait longer for throttle
            # to end. otherwise start back up right away. _http_status never
            # raises, so an error without a status code cannot kill the loop.
            if _http_status(err) == 429:
                print(error_msg + " - Too many requests. Waiting 2 hrs.")
                sleep(7200)
            else:
                print(error_msg + " - Restarting stream now.")