in cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/SpeechToTextSDK.scala [498:577]
/** Auxiliary no-argument constructor: delegates to the primary constructor with a uid
  * obtained from `Identifiable.randomUID("ConversationTranscription")`.
  */
def this() = {
  this(Identifiable.randomUID("ConversationTranscription"))
}
/** Transcribes a multi-speaker audio stream with the Speech SDK conversation transcriber.
  *
  * A conversation with a random id is created, every entry of `participants` is added to it,
  * and a `ConversationTranscriber` reading from `stream` joins that conversation. Each result
  * whose reason is `RecognizedSpeech` has its JSON payload pushed onto an internal queue; the
  * session-stopped event pushes `None` as the end-of-stream marker and releases resources.
  *
  * If the extra ffmpeg arguments contain `-t <seconds>`, a watchdog forces the end-of-stream
  * marker and releases resources `<seconds> + 20` seconds after transcription starts.
  *
  * @param stream             audio to transcribe
  * @param audioFormat        audio format, forwarded to `getPullStream`
  * @param uri                service endpoint, forwarded to `getSpeechConfig`
  * @param speechKey          subscription key, forwarded to `getSpeechConfig`
  * @param profanity          profanity setting, forwarded to `getSpeechConfig`
  * @param language           recognition language, forwarded to `getSpeechConfig`
  * @param format             output format, forwarded to `getSpeechConfig`
  * @param defaultAudioFormat fallback audio format, forwarded to `getPullStream`
  * @param participants       speakers registered on the conversation before it is joined
  * @return text transcription of the audio, one parsed [[TranscriptionResponse]] per
  *         recognized result, produced as results arrive
  * @throws NumberFormatException if the value following `-t` is not an integer
  */
def inputStreamToText(stream: InputStream,
                      audioFormat: String,
                      uri: URI,
                      speechKey: String,
                      profanity: String,
                      language: String,
                      format: String,
                      defaultAudioFormat: Option[String],
                      participants: Seq[TranscriptionParticipant]
                     ): Iterator[TranscriptionResponse] = {
  import java.util.concurrent.atomic.AtomicBoolean
  import scala.util.Try

  // Parse the optional "-t <seconds>" limit BEFORE any native resource is created, so that a
  // malformed value fails fast instead of throwing after the transcriber has already started
  // (which would leak every handle below and leave the session running).
  val ffmpegArgs = getExtraFfmpegArgs
  val timeFlagIdx = ffmpegArgs.indexOf("-t")
  val timeLimitSeconds: Option[Long] =
    if (timeFlagIdx >= 0) Some(ffmpegArgs(timeFlagIdx + 1).toLong) else None

  val speechConfig = getSpeechConfig(uri, speechKey, language, profanity, format)
  speechConfig.setProperty("ConversationTranscriptionInRoomAndOnline", "true")
  speechConfig.setServiceProperty("transcriptionMode",
    "RealTimeAndAsync", ServicePropertyChannel.UriQueryParameter)

  val guid = UUID.randomUUID().toString
  val conversation = Conversation.createConversationAsync(speechConfig, guid).get()
  participants.foreach { p =>
    conversation.addParticipantAsync(
      Participant.from(p.name, p.language, p.signature)
    ).get()
  }

  val pullStream = getPullStream(stream, audioFormat, defaultAudioFormat)
  val audioConfig = AudioConfig.fromStreamInput(pullStream)
  val transcriber = new ConversationTranscriber(audioConfig)

  // TODO fix this spelling in 1.15 update
  conversation.getProperties.setProperty("DifferenciateGuestSpeakers", "true")
  transcriber.joinConversationAsync(conversation).get()

  val connection = Connection.fromRecognizer(transcriber)
  connection.setMessageProperty("speech.config", "application",
    s"""{"name":"synapseml", "version": "${BuildInfo.version}"}""")

  // Some(json) carries one recognized result; None marks the end of the stream.
  val queue = new LinkedBlockingQueue[Option[String]]()

  // cleanUp is reachable from the session-stopped callback, the watchdog and the iterator's
  // close hook, possibly on different threads; the flag guarantees the teardown runs once.
  val cleanedUp = new AtomicBoolean(false)

  def cleanUp(): Unit = {
    if (cleanedUp.compareAndSet(false, true)) {
      // Attempt every step even when an earlier one fails so no handle is leaked, then
      // surface the first failure to the caller.
      val failures = Seq[Try[Any]](
        Try(transcriber.stopTranscribingAsync().get()),
        Try(Option(conversation).foreach(_.close())),
        Try(Option(pullStream).foreach(_.close())),
        Try(Option(speechConfig).foreach(_.close())),
        Try(Option(audioConfig).foreach(_.close()))
      ).flatMap(_.failed.toOption)
      failures.headOption.foreach(e => throw e)
    }
  }

  def recognizedHandler(s: Any, e: ConversationTranscriptionEventArgs): Unit = {
    if (e.getResult.getReason eq ResultReason.RecognizedSpeech) {
      queue.put(Some(e.getResult.getProperties.getProperty(PropertyId.SpeechServiceResponse_JsonResult)))
    }
  }

  def sessionStoppedHandler(s: Any, e: SessionEventArgs): Unit = {
    queue.put(None)
    cleanUp()
  }

  transcriber.transcribed.addEventListener(makeEventHandler(recognizedHandler))
  transcriber.sessionStopped.addEventListener(makeEventHandler(sessionStoppedHandler))
  transcriber.startTranscribingAsync().get

  // Watchdog: force end-of-stream after the ffmpeg limit plus a 20 second grace period.
  // Long arithmetic avoids Int overflow when converting seconds to milliseconds.
  timeLimitSeconds.foreach { limit =>
    Future {
      blocking {
        Thread.sleep((limit + 20L) * 1000L)
      }
      queue.put(None)
      cleanUp()
    }(ExecutionContext.global)
  }

  new BlockingQueueIterator[String](queue, cleanUp()).map { jsonString =>
    jsonString.parseJson.convertTo[TranscriptionResponse]
  }
}