def lambda_handler()

in functions/source/job-creation/job_creation.py [0:0]


def _first_row_mrn(dataframe):
    """Return the MRN found in the first row of ``dataframe`` as a string.

    The ``MRN`` column takes precedence over ``mrn`` when both are present.

    Raises
    ------
    ValueError
        If the dataframe has neither an ``mrn`` nor an ``MRN`` column.
    """
    for column in ('MRN', 'mrn'):
        if column in dataframe.columns:
            return str(dataframe[column][0])
    raise ValueError(
        "Source CSV has neither an 'mrn' nor an 'MRN' column; "
        "cannot determine the patient MRN.")


def _reviewer_label(dataframe, review_count):
    """Pick the reviewer label that is appended to the task title.

    A case that has already been reviewed at least once is always labelled
    for physician review. Otherwise the first row's ``second_reviewer_id`` is
    used, then ``first_reviewer_id``, then a fixed "ready for ICP" label.
    """
    if review_count >= 1:
        return 'Ready-for-physician-review'
    if 'second_reviewer_id' in dataframe.columns:
        return dataframe['second_reviewer_id'][0]
    if 'first_reviewer_id' in dataframe.columns:
        return dataframe['first_reviewer_id'][0]
    return 'Ready_for_ICP_review'


def lambda_handler(event, context):
    """
    Create a SageMaker Ground Truth labeling job for an uploaded CSV.

    Receives an S3 event (triggered by the addition of files to
    s3://ipac-clabsi-production/source-csv/), reads the CSV named by the
    first record, writes an input manifest built from the dataframe to
    ``manifests/<filename>.manifest`` in the same bucket, and sends a
    labeling job creation request to the SageMaker client.

    Only ``event['Records'][0]`` is processed; any further records in the
    event are ignored.

    Parameters
    ----------
    event : dict
        S3 notification event. Must contain
        ``Records[0].s3.bucket.name`` and ``Records[0].s3.object.key``.

    context : object
        AWS Lambda runtime information for the handler. It is only printed.

    Returns
    -------
        Sagemaker labeling job status description
        (the ``LabelingJobStatus`` field of ``describe_labeling_job``).

    Raises
    ------
    ValueError
        If the CSV has neither an ``mrn`` nor an ``MRN`` column.
    Exception
        Any other error raised while reading the CSV, writing the manifest
        or creating the job is printed and re-raised unchanged.
    """
    print(context)
    s3_client = boto3.client('s3')

    # Locate the uploaded object. S3 URL-encodes keys in event payloads.
    s3_record = event['Records'][0]['s3']
    bucket = s3_record['bucket']['name']
    key = urllib.parse.unquote_plus(s3_record['object']['key'],
                                    encoding='utf-8')
    filename = os.path.basename(key)
    manifest_path = 'manifests/' + filename + '.manifest'
    # The part of the file name before the first dot names the output prefix.
    mrn = filename.split(".")[0]

    try:
        # Read the csv file
        csv_obj = s3_client.get_object(Bucket=bucket, Key=key)
        csv_string = csv_obj['Body'].read().decode('utf-8')
        dataframe = pd.read_csv(StringIO(csv_string))

        # Remove NaN from the dataframe, because json can not handle NaN.
        # NOTE(review): a NaN inside an existing PR column would become the
        # string 'None' here and break the increment below - confirm that
        # PR is always fully populated.
        dataframe.fillna('None', inplace=True)

        # Drop the "Unnamed: N" columns pandas creates for header-less
        # columns (typically a previously saved index).
        dataframe.drop(
            columns=[col for col in dataframe.columns
                     if col.startswith('Unn')],
            inplace=True)

        # PR means "previously reviewed".
        # If this dataframe has been previously through the job creation
        # pipeline it will have the column PR; increase it by 1 to track
        # the number of times this case has been reviewed.
        if 'PR' in dataframe.columns:
            print('increasing pr by 1')
            dataframe.loc[:, 'PR'] += 1
        else:
            dataframe.loc[:, 'PR'] = 0
        previously_reviewed = dataframe['PR'].max()
        review_count = dataframe['PR'][0]

        mrn_id = _first_row_mrn(dataframe)

        # gen_data_dict / convert are defined elsewhere in this module.
        data = gen_data_dict(dataframe, bucket)
        data['csv_bucket'] = os.environ['PRODUCTION']
        data['csv_path'] = key
        data['pr'] = previously_reviewed
        data['mrn'] = mrn_id

        # Write the input manifest next to the source data.
        s3write = boto3.resource('s3')
        s3object = s3write.Object(bucket, manifest_path)
        s3object.put(
            Body=json.dumps(data, default=convert).encode('UTF-8'),
            ServerSideEncryption="aws:kms")

        # Handle UI template
        complete_template_path_uri = "s3://{}/{}".format(
            os.environ['PRODUCTION'],
            os.environ['TEMPLATE_PATH'],
        )

        # Writing the patient MRN to CreationTime file,
        #  so it can be tracked, and relaunched if expired.
        write_timeline(mrn_id, review_count)

        worker_id = _reviewer_label(dataframe, review_count)

        # Job name is the name in the Ground Truth queue, it has to be
        # unique. The timestamp has minute resolution, so two jobs for the
        # same MRN and review count within one minute would collide.
        job_name = (
            'MRN-{mrn_id}-reviewed-{review_number}-times-{creation_time}'
            .format(
                mrn_id=mrn_id,
                review_number=review_count,
                # strftime without a time tuple formats the current
                # local time.
                creation_time=time.strftime('%Y-%m-%d-%H-%M'),
            )
        )
        print('Job_name', job_name)

        # Task name is the job name and description on the user UI
        task_name = 'mrn:{}-- Number of previous reviews:{}--{}'.format(
            mrn_id, review_count, worker_id)
        print('Task name:', task_name)

        # Manifest file paths
        input_manifest_uri = "s3://{}/{}".format(bucket, manifest_path)
        output_manifest_uri = 's3://{}/output/{}'.format(bucket, mrn)

        # Ground Truth job request building. All limits and ARNs are
        # deployment configuration read from environment variables.
        human_task_config = {
            "AnnotationConsolidationConfig": {
                "AnnotationConsolidationLambdaArn":
                os.environ['POST_LABEL_ARN'],
            },
            "PreHumanTaskLambdaArn": os.environ['PRE_LABEL_ARN'],
            # How many data objects are sent to the workteam at a time.
            "MaxConcurrentTaskCount": int(
                os.environ['MAX_CONCURRENT_TASK_COUNT']),
            # How many workers must label each data object.
            "NumberOfHumanWorkersPerDataObject": int(
                os.environ['NUMBER_OF_HUMAN_WORKERS_PER_DATA_OBJECT']),
            # How long a pending task stays available to the work team.
            "TaskAvailabilityLifetimeInSeconds": int(
                os.environ['TASK_AVAILABILITY_LIFE_TIME_IN_SECONDS']),
            "TaskDescription": task_name,
            # How long a worker has to finish a single task.
            "TaskTimeLimitInSeconds": int(
                os.environ['TASK_TIME_LIMIT_IN_SECONDS']),
            "TaskTitle": task_name,
            "UiConfig": {
                "UiTemplateS3Uri": complete_template_path_uri,
            },
            "WorkteamArn": os.environ['PRIVATE_WORK_TEAM_ARN'],
        }

        # Creating the Ground truth label job request
        ground_truth_request = {
            "InputConfig": {
                "DataSource": {
                    "S3DataSource": {
                        "ManifestS3Uri": input_manifest_uri,
                    },
                },
                "DataAttributes": {
                    "ContentClassifiers": [
                        "FreeOfPersonallyIdentifiableInformation",
                        "FreeOfAdultContent",
                    ]
                },
            },
            "OutputConfig": {
                "S3OutputPath": output_manifest_uri,
            },
            "HumanTaskConfig": human_task_config,
            "LabelingJobName": job_name,
            "RoleArn": os.environ['GROUNDTRUTH_ROLE'],
            "LabelAttributeName": "category",
        }

        # Submit ground truth job
        sagemaker_client = boto3.client('sagemaker')
        sagemaker_client.create_labeling_job(**ground_truth_request)

        return sagemaker_client.describe_labeling_job(
            LabelingJobName=job_name)['LabelingJobStatus']

    except Exception as error:
        # Surface the failure in the Lambda log, then let it propagate with
        # its original traceback so the invocation is marked as failed.
        print(error)
        raise