def parse_csv_for_mapping()

in source/lambda/wf_publish_topic_model/util/topic.py [0:0]
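
Maps documents referenced in topic-model output back to the ids of the original
records in the raw S3 data feed, emitting one platform/id/job/topic mapping
record per document.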


import json
import logging
import os

import boto3

# assumed module-level setup; the actual file may configure these differently
logger = logging.getLogger(__name__)
s3 = boto3.client("s3")


def parse_csv_for_mapping(platform, job_id, timestamp, topic_content_dict):
    # initialize the list that stores the topic number to id mappings
    topic_id_mapping = []

    # fetch each referenced object from the raw feed bucket and read the specific lines needed to recover each id
    for key_name, records in topic_content_dict.items():
        obj = s3.get_object(Bucket=os.environ["RAW_DATA_FEED"], Key=f"{platform}/{key_name}")
        # split the raw bytes into lines up front; decoding is deferred until a line is actually read
        raw_feed_array = obj["Body"].read().split(b"\n")

        logger.debug(f"Processing key name: {key_name}")
        for record in records:
            logger.debug(f"Processing record: {record}")
            line_number = int(record["docname"].split(":")[1])
            # decode only now, since this line is actually being read
            id_str = raw_feed_array[line_number - 1].decode("utf-8").split(",")[0]
            if id_str:
                topic_id_mapping.append(
                    {
                        "platform": platform,
                        "id_str": id_str,
                        "job_id": job_id,
                        "job_timestamp": timestamp,
                        "topic": record["topic"],
                    }
                )

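    # serialize mapping records for the log only when debug output is enabled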
    if os.environ.get("LOG_LEVEL") == "DEBUG":
        for record in topic_id_mapping:
            logger.debug("Returning Topic ID mapping " + json.dumps(record))
    return topic_id_mapping
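
A minimal usage sketch follows, with the S3 client stubbed out so the input and
output shapes are visible without AWS access; the bucket name, keys, ids, topics,
and job values are all hypothetical:

import io
import os

os.environ["RAW_DATA_FEED"] = "raw-feed-bucket"  # hypothetical bucket name

class StubS3:
    # returns a canned two-line CSV feed; the id is the first comma-separated field
    def get_object(self, Bucket, Key):
        return {"Body": io.BytesIO(b"111,first doc\n222,second doc\n")}

s3 = StubS3()  # rebinds the module-level client for this illustration only

topic_content_dict = {
    "feed-0001.csv": [
        {"docname": "feed-0001.csv:1", "topic": "000"},
        {"docname": "feed-0001.csv:2", "topic": "001"},
    ]
}

mapping = parse_csv_for_mapping("twitter", "job-abc", "2023-01-01T00:00:00Z", topic_content_dict)
# mapping[0] -> {"platform": "twitter", "id_str": "111", "job_id": "job-abc",
#                "job_timestamp": "2023-01-01T00:00:00Z", "topic": "000"}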