in source/lambda/wf_publish_topic_model/util/topic.py [0:0]
def parse_csv_for_mapping(platform, job_id, timestamp, topic_content_dict):
    """Map topic assignments back to the source-record ids in the raw feed.

    For each S3 key in ``topic_content_dict``, fetches the raw feed object
    from the ``RAW_DATA_FEED`` bucket (under the ``platform`` prefix) and
    reads only the specific lines referenced by each record's ``docname``
    to extract the leading CSV column as the record id.

    :param platform: platform name; used as the S3 key prefix and stored on
        every mapping record
    :param job_id: topic-model job identifier stored on every mapping record
    :param timestamp: job timestamp stored on every mapping record
    :param topic_content_dict: mapping of S3 key name -> list of records,
        each with a ``docname`` of the form ``<name>:<line-number>`` (1-based)
        and a ``topic`` value
    :return: list of dicts with keys ``platform``, ``id_str``, ``job_id``,
        ``job_timestamp`` and ``topic``
    """
    topic_id_mapping = []
    # Query each referenced feed object and pull ids from specific lines.
    for feed_key, doc_records in topic_content_dict.items():
        response = s3.get_object(Bucket=os.environ["RAW_DATA_FEED"], Key=f"{platform}/{feed_key}")
        # Split the payload while still in bytes; decoding is deferred until
        # a line is actually read, saving decode work on untouched lines.
        feed_lines = response["Body"].read().split(b"\n")
        logger.debug(f"Processing key name: {feed_key}")
        for doc in doc_records:
            logger.debug(f"Processing record: {doc}")
            # "docname" is "<name>:<line>" with a 1-based line number.
            line_no = int(doc["docname"].split(":")[1])
            # Decode only now that this specific line is being read.
            id_str = feed_lines[line_no - 1].decode("utf-8").split(",")[0]
            if id_str:
                topic_id_mapping.append(
                    {
                        "platform": platform,
                        "id_str": id_str,
                        "job_id": job_id,
                        "job_timestamp": timestamp,
                        "topic": doc["topic"],
                    }
                )
    if os.environ.get("LOG_LEVEL") == "DEBUG":
        for mapping in topic_id_mapping:
            logger.debug("Returning Topic ID mapping " + json.dumps(mapping))
    return topic_id_mapping