def submitTranscribeJob()

in pca-server/src/pca/pca-aws-sf-start-transcribe-job.py [0:0]


def submitTranscribeJob(bucket, key, lang_code):
    '''
    Submits the supplied audio file to Transcribe, using the supplied language code.  The method will decide
    whether to call the standard Transcribe APIs or the Call Analytics APIs, based on the mode reported by
    evaluate_transcribe_mode().

    If a job with the generated name already exists and is neither queued nor running then it is deleted
    before the new job is submitted.  If such a job is still queued or running then nothing is submitted,
    and an empty job name is returned alongside the API mode so that the result can always be unpacked
    as a pair.

    @param bucket: Bucket holding the audio file to be tested
    @param key: Key for the audio file in the bucket
    @param lang_code: Configured language code for the audio file
    @return: Tuple of (name of the transcription job, Transcribe API mode) - the job name is "" if a job
             with the same name is already queued or in progress
    '''

    # Work out our API mode for Transcribe, and get our boto3 client
    transcribe = boto3.client('transcribe')
    api_mode, channel_ident = evaluate_transcribe_mode(bucket, key)

    # Generate job-name and find out if a job with that name already exists
    job_name = cf.generateJobName(key)
    current_job_status = check_existing_job_status(job_name, transcribe, api_mode)

    # If there's a job already running then the input file may have been copied - quit
    if current_job_status in ("IN_PROGRESS", "QUEUED"):
        # Return an empty job name, but keep the (job name, api mode) shape of the normal return
        print("A Transcription job named \'{}\' is already in progress - cannot continue.".format(job_name))
        return "", api_mode
    if current_job_status != "":
        # But if an old one exists we can delete it
        delete_existing_job(job_name, transcribe, api_mode)

    # Setup the structures common to both Standard and Call Analytics
    job_settings = {}
    media_settings = {
        'MediaFileUri': 's3://' + bucket + '/' + key
    }

    # Double check that if we have a custom vocab that it actually exists
    if cf.appConfig[cf.CONF_VOCABNAME] != "":
        try:
            vocab_name = cf.appConfig[cf.CONF_VOCABNAME] + '-' + lang_code.lower()
            our_vocab = transcribe.get_vocabulary(VocabularyName=vocab_name)
            if our_vocab["VocabularyState"] == "READY":
                # Only use it if it is ready for use
                job_settings["VocabularyName"] = vocab_name
        except Exception as err:
            # Lookup failed (e.g. it doesn't exist) - best effort, so carry on without it
            print("Custom vocabulary lookup failed - continuing without it: {}".format(err))

    # Double check that if we have a defined vocabulary filter that it exists.  The configuration
    # reads stay inside the try-block so that a missing filter setting remains non-fatal
    try:
        filter_base_name = cf.appConfig[cf.CONF_FILTER_NAME]
        if filter_base_name:
            vocab_filter_name = filter_base_name + '-' + lang_code.lower()
            transcribe.get_vocabulary_filter(VocabularyFilterName=vocab_filter_name)
            job_settings["VocabularyFilterMethod"] = cf.appConfig[cf.CONF_FILTER_MODE]
            job_settings["VocabularyFilterName"] = vocab_filter_name
    except Exception as err:
        # Lookup failed (e.g. it doesn't exist) - best effort, so carry on without it
        print("Vocabulary filter lookup failed - continuing without it: {}".format(err))

    # Get our role ARN from the environment and enable content redaction (if possible, and if wanted)
    role_arn = os.environ["RoleArn"]
    if cf.isTranscriptRedactionEnabled() and (lang_code in cf.appConfig[cf.CONF_REDACTION_LANGS]):
        content_redaction = {'RedactionType': 'PII', 'RedactionOutput': 'redacted_and_unredacted'}
    else:
        content_redaction = None

    # Now sort out the mode-specific parameters
    if api_mode == cf.API_ANALYTICS:
        # CALL ANALYTICS JOB MODE - start with redaction and language
        if content_redaction is not None:
            job_settings["ContentRedaction"] = content_redaction
        job_settings["LanguageOptions"] = [lang_code]

        # Work out where our AGENT channel is - this defaults to channel 0 if no speaker is
        # named "agent", or if that speaker sits beyond the first two configured names
        conf_channels = [speaker_name.lower() for speaker_name in cf.appConfig[cf.CONF_SPEAKER_NAMES]]
        agent_channel_number = conf_channels.index("agent") if "agent" in conf_channels else 0
        if agent_channel_number > 1:
            agent_channel_number = 0

        # Now build up our channel definitions - XOR with 1 gives the customer the other channel (0 <-> 1)
        chan_def_agent = {'ChannelId': agent_channel_number, 'ParticipantRole': 'AGENT'}
        chan_def_cust = {'ChannelId': agent_channel_number ^ 1, 'ParticipantRole': 'CUSTOMER'}

        # Should have a clear run at doing the job now
        kwargs = {
            'CallAnalyticsJobName': job_name,
            'Media': media_settings,
            'OutputLocation': f"s3://{cf.appConfig[cf.CONF_S3BUCKET_OUTPUT]}/{cf.appConfig[cf.CONF_PREFIX_TRANSCRIBE_RESULTS]}/",
            'DataAccessRoleArn': role_arn,
            'Settings': job_settings,
            'ChannelDefinitions': [chan_def_agent, chan_def_cust]
        }

        # Start the Transcribe job, removing any 'None' values on the way
        transcribe.start_call_analytics_job(
            **{k: v for k, v in kwargs.items() if v is not None}
        )
    else:
        # STANDARD TRANSCRIBE JOB MODE - speaker labels and channel identification are set as opposites
        job_settings['ShowSpeakerLabels'] = not channel_ident
        job_settings['ChannelIdentification'] = channel_ident

        # The speaker limit is only sent when we are using speaker labels
        if not channel_ident:
            job_settings["MaxSpeakerLabels"] = int(cf.appConfig[cf.CONF_MAX_SPEAKERS])

        # Job execution settings to allow queueing of standard Transcribe jobs
        execution_settings = {
            "AllowDeferredExecution": True,
            "DataAccessRoleArn": role_arn
        }

        # Should have a clear run at doing the job now
        kwargs = {
            'TranscriptionJobName': job_name,
            'LanguageCode': lang_code,
            'Media': media_settings,
            'OutputBucketName': cf.appConfig[cf.CONF_S3BUCKET_OUTPUT],
            'OutputKey': cf.appConfig[cf.CONF_PREFIX_TRANSCRIBE_RESULTS] + '/',
            'Settings': job_settings,
            'JobExecutionSettings': execution_settings,
            'ContentRedaction': content_redaction
        }

        # Start the Transcribe job, removing any 'None' values on the way (e.g. no redaction wanted)
        transcribe.start_transcription_job(
            **{k: v for k, v in kwargs.items() if v is not None}
        )

    # Return our job name and api mode, as we need to track them
    return job_name, api_mode