in source/lambda/ingestion-youtube/util/comment.py [0:0]
def search_comments(event):
    """Fetch comment threads for the video referenced by *event* and publish new ones.

    Pages through the YouTube ``commentThreads`` API (newest comments first)
    until either a page yields no freshly published records relative to the
    stored tracker timestamp or there are no more pages, then stamps the
    query time in DynamoDB so the next run can skip what was ingested here.

    :param event: EventBridge-style event whose ``detail`` carries
        ``SearchQuery``, ``VideoId`` and ``Title``.
    """
    logger.debug(f"Query handler received event: {json.dumps(event)}")
    youtube = get_youtube_service_resource()

    detail = event["detail"]
    search_query = detail["SearchQuery"]
    video_id = detail["VideoId"]
    title = detail["Title"]

    list_params = {
        "part": "snippet, replies",
        "videoId": video_id,
        "maxResults": 100,
        "order": "time",
        "textFormat": "plainText",
    }

    # Fetch the last-queried tracker for this video; downstream processing
    # uses it to decide whether a comment still needs to be published.
    tracker = ddb_helper.get_query_timestamp(video_id)
    logger.debug(f"Tracker for VideoId: {video_id} is {tracker}")
    last_queried = datetime.fromisoformat(tracker["LAST_QUERIED_TIMESTAMP"]) if tracker else None

    while True:
        request = youtube.commentThreads().list(**list_params)
        try:
            youtube_response = request.execute()
            logger.debug(f"Threads, youtube comments {json.dumps(youtube_response)}")
            published = process_service_response(youtube_response, search_query, last_queried, title)
            page_token = youtube_response.get("nextPageToken", None)
            logger.debug(f"Next page token is {page_token}")
            # The API appears to return the most recent comments first, so
            # once a page produced nothing new (based on the tracker date vs
            # the comments' updatedAt timestamps) fetching older pages would
            # be wasted calls — stop paging instead.
            if page_token and published:
                list_params["pageToken"] = page_token
                continue
            # Pagination is finished (or caught up with the tracker):
            # record this query's timestamp before leaving the loop.
            ddb_helper.update_query_timestamp(video_id)
        except googleapiclient.errors.HttpError as error:
            logger.error(
                f"Error occurred when calling list comments for params: {json.dumps(list_params)} and error is {error}"
            )
        break