def lambda_handler()

in sdlf-utils/pipeline-examples/topic-modelling/stageB/lambda/stage-b-compile-data/src/lambda_function.py
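The listing below starts at the handler itself; the module-level setup it relies on (logger, boto3 S3 client, pandas, and the SDLF datalake_library helpers) lives earlier in the file and is not shown. A minimal sketch of what that setup likely looks like, with import paths inferred from how the names are used in the handler rather than copied from the repository:

import io
import tarfile

import boto3
import pandas as pd

# Assumed import paths for the SDLF datalake_library helpers used below (inferred from usage)
from datalake_library import octagon
from datalake_library.octagon import peh
from datalake_library.commons import init_logger
from datalake_library.configuration.resource_configs import KMSConfiguration
from datalake_library.interfaces.s3_interface import S3Interface

logger = init_logger(__name__)
client = boto3.client('s3')
s3_interface = S3Interface()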


def lambda_handler(event, context):
    """Compile Data to a CSV with Topic Model Output

    Arguments:
        event {dict} -- Dictionary with details on Bucket and Keys
        context {object} -- Lambda context object with details on the invocation

    Returns:
        {int} -- 200 on success
    """
    try:
        # Get information passed from the previous Step Functions state
        logger.info('Fetching event data from previous step')
        team = event['body']['team']
        stage = event['body']['pipeline_stage']
        dataset = event['body']['dataset']
        bucket = event['body']['bucket']
        
        # Start Connection to Octagon Client
        logger.info('Initializing Octagon client')
        component = context.function_name.split('-')[-2].title()
        octagon_client = (
            octagon.OctagonClient()
            .with_run_lambda(True)
            .with_configuration_instance(event['body']['env'])
            .build()
        )
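        # Re-attach to the in-flight pipeline execution (by peh_id) so the status update and
        # failure calls further down report against the same execution record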
        peh.PipelineExecutionHistoryAPI(octagon_client).retrieve_pipeline_execution(
            event['body']['job']['peh_id'])
        
        logger.info('Starting to Compile Results')
        
        
        # Here we will get the associated topics and add them to our existing metadata
        # for each of the abstract text files we have in the pre-stage bucket:

        # Get the S3 key of the compressed (.tar.gz) topic model output
        key = "post-stage/{}/{}/".format(team, dataset)
        response = client.list_objects_v2(Bucket=bucket, Prefix=key)
        for obj in response["Contents"]:
            if ".tar.gz" in obj["Key"]:
                key = obj["Key"]

        # Extract the Topic Model Data from the zipped file
        s3_object = client.get_object(Bucket=bucket, Key=key)
        wholefile = s3_object['Body'].read()
        fileobj = io.BytesIO(wholefile)
        tarf = tarfile.open(fileobj=fileobj)
        csv_files = [f.name for f in tarf.getmembers() if f.name.endswith('.csv')]
        
        # Read in both the Doc-Topics and Topic-Terms csv files using Pandas DataFrames
        #   doc-topics.csv (The topics for each abstract document)
        #   topic-terms.csv (The terms associated to each topic - up to 10 terms)
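        #   (Amazon Comprehend topic modeling emits these two files: doc-topics has
        #   docname/topic/proportion columns and topic-terms has topic/term/weight columns;
        #   only docname, topic and term are used below)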
        for name in csv_files:
            if "doc-topics" in name:
                csv_contents = tarf.extractfile(name).read()
                doc_topics = pd.read_csv(io.BytesIO(csv_contents), encoding='utf8')

            if "topic-terms" in name:
                csv_contents = tarf.extractfile(name).read()
                topic_terms = pd.read_csv(io.BytesIO(csv_contents), encoding='utf8')

        # Group All of the Topics as a List for Each Abstract Docname 
        doc_topics_grouped = doc_topics.groupby("docname")["topic"].apply(list).reset_index(name='topic_list')
        
        # Group All of the Terms Associated to each of the Topics Found
        topic_terms_grouped = topic_terms.groupby("topic")["term"].apply(list).reset_index(name='term_list')
        
        
        # For Each Abstract We Will Add a Column with the Associated Topic Terms (i.e. 'term_list')
        main_list = []
        for index, row in doc_topics_grouped.iterrows():
            labels = []
            for topic in row['topic_list']:
                labels.extend(topic_terms_grouped.loc[topic, 'term_list'])
            main_list.append(labels)
        doc_topics_grouped['term_list'] = main_list


        # Now let's pull all the pre-stage metadata we have for each abstract document:

        # List the CSV files in the pre-stage bucket
        key = "pre-stage/{}/{}/medical_data".format(team, dataset)
        response = client.list_objects_v2(Bucket=bucket, Prefix=key)

        # Combine all of the metadata into one large pandas DataFrame
        frames = []
        for contents in response['Contents']:
            if contents['Size'] > 0:
                obj = client.get_object(Bucket=bucket, Key=contents["Key"])
                frames.append(pd.read_csv(io.BytesIO(obj['Body'].read()), encoding='utf8'))
        metadata = pd.concat(frames, ignore_index=True)
            
        
        # IMPORTANT: Now we can merge the Topics and Terms
        # we found for each document with the existing Metadata
        doc_topics_final = pd.merge(metadata, doc_topics_grouped, on='docname')
        
        
        # We will also create a training data CSV (including topics and text only) so new documents
        # can be associated to one of these topics using Multi-Label Classification:
        label_list = []
        for topics in doc_topics_final["topic_list"]:
            label_list.append('|'.join(str(topic) for topic in topics))
        
        # Create Training Data DataFrame from the two columns
        training_data = pd.DataFrame(list(zip(label_list, doc_topics_final["abstract"])), columns=['Labels', 'Abstracts'])
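        # NOTE (assumption): the pipe-delimited label column and headerless CSV written below match
        # the training format expected by Amazon Comprehend custom multi-label classification, which
        # appears to be the intended downstream consumer of this file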

        # Get KMS Key to Encrypt Data
        kms_key = KMSConfiguration(team).get_kms_arn
        
        # Write Our DataFrames with Output to S3 Post-Stage:
        # Write Training data to s3 Post-Stage Bucket
        output_path = "training_data.csv"
        s3_path_key = "post-stage/{}/{}/multilabel_classification/{}".format(team, dataset, output_path)
        training_data.to_csv('/tmp/' + output_path, index=False, header=False)
        s3_interface.upload_object('/tmp/' + output_path, bucket, s3_path_key, kms_key=kms_key)
        
        # Write Final df to s3 Post-Stage Bucket
        output_path = "compile_topics_data.csv"
        s3_path_key = "post-stage/{}/{}/{}".format(team, dataset, output_path)
        doc_topics_final.to_csv('/tmp/' + output_path)
        s3_interface.upload_object('/tmp/' + output_path, bucket, s3_path_key, kms_key=kms_key)

        # Write doc_topics df to s3 Post-Stage Bucket
        output_path = "doc_topics.csv"
        s3_path_key = "post-stage/{}/{}/topic_data/{}".format(team, dataset, output_path)
        doc_topics.to_csv('/tmp/' + output_path)
        s3_interface.upload_object('/tmp/' + output_path, bucket, s3_path_key, kms_key=kms_key)
    
        # Write topic_terms df to s3 Post-Stage Bucket
        output_path = "topic_terms.csv"
        s3_path_key = "post-stage/{}/{}/topic_data/{}".format(team, dataset, output_path)
        topic_terms.to_csv('/tmp/' + output_path)
        s3_interface.upload_object('/tmp/' + output_path, bucket, s3_path_key, kms_key=kms_key)
        
        
        # Update Pipeline Execution in Octagon
        octagon_client.update_pipeline_execution(
            status="{} {} Processing".format(stage, component), component=component)
    except Exception as e:
        logger.error("Fatal error", exc_info=True)
        octagon_client.end_pipeline_execution_failed(component=component,
                                                     issue_comment="{} {} Error: {}".format(stage, component, repr(e)))
        raise e
    return 200
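
For reference, a minimal sketch of the event payload this handler expects, reconstructed from the keys it reads above; the values are placeholders, and the real payload produced by the previous step may carry additional fields:

example_event = {
    "body": {
        "env": "dev",                              # Octagon configuration instance
        "team": "engineering",                     # team name (placeholder)
        "pipeline_stage": "StageB",                # stage name used in status messages
        "dataset": "medicaldata",                  # dataset name (placeholder)
        "bucket": "example-stage-bucket",          # bucket holding the pre-stage and post-stage data
        "job": {
            "peh_id": "00000000-0000-0000-0000-000000000000"   # pipeline execution history id
        }
    }
}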