backend/app/extraction/ExternalTranscriptionExtractor.scala (144 lines of code) (raw):
package extraction
import com.amazonaws.services.sqs.AmazonSQS
import com.amazonaws.services.sqs.model.{MessageAttributeValue, SendMessageRequest}
import model.manifest.Blob
import org.joda.time.DateTime
import play.api.libs.json.{JsError, JsResult, JsValue, Json, Reads}
import services.index.Index
import services.{ObjectStorage, TranscribeConfig}
import utils._
import utils.attempt.Failure
import java.util.UUID
import scala.concurrent.ExecutionContext
import scala.jdk.CollectionConverters.MapHasAsJava
case class SignedUrl(url: String, key: String)
object SignedUrl {
implicit val formats = Json.format[SignedUrl]
}
case class OutputBucketUrls(text: SignedUrl, srt: SignedUrl, json: SignedUrl)
case class OutputBucketKeys(text: String, srt: String, json: String)
case class TranscriptionJob(id: String, originalFilename: String, inputSignedUrl: String, sentTimestamp: String,
userEmail: String, transcriptDestinationService: String, outputBucketUrls: OutputBucketUrls,
languageCode: String, translate: Boolean, translationOutputBucketUrls: OutputBucketUrls,
diarize: Boolean = false, engine: String = "whispercpp")
object OutputBucketUrls {
implicit val formats = Json.format[OutputBucketUrls]
}
object OutputBucketKeys {
implicit val formats = Json.format[OutputBucketKeys]
}
object TranscriptionJob {
implicit val formats = Json.format[TranscriptionJob]
}
sealed trait TranscriptionOutput {
def id: String
def originalFilename: String
def userEmail: String
def isTranslation: Boolean
def status: String
}
case class TranscriptionOutputSuccess(
id: String,
originalFilename: String,
userEmail: String,
isTranslation: Boolean,
status: String = "SUCCESS",
languageCode: String,
outputBucketKeys: OutputBucketKeys,
translationOutputBucketKeys: Option[OutputBucketKeys]
) extends TranscriptionOutput
case class TranscriptionOutputFailure(
id: String,
originalFilename: String,
userEmail: String,
isTranslation: Boolean,
status: String = "FAILURE"
) extends TranscriptionOutput
object TranscriptionOutputSuccess {
implicit val format = Json.format[TranscriptionOutputSuccess]
}
object TranscriptionOutputFailure {
implicit val format = Json.format[TranscriptionOutputFailure]
}
object TranscriptionOutput {
// Custom Reads to handle both message types
implicit val transcriptionOutputReads: Reads[TranscriptionOutput] = new Reads[TranscriptionOutput] {
def reads(json: JsValue): JsResult[TranscriptionOutput] = {
(json \ "status").as[String] match {
case "SUCCESS" => json.validate[TranscriptionOutputSuccess]
case "FAILURE" => json.validate[TranscriptionOutputFailure]
case other => JsError(s"Unknown status type: $other")
}
}
}
}
// The transcription types are matched with types in transcription service
// https://github.com/guardian/transcription-service/blob/main/packages/common/src/types.ts
class ExternalTranscriptionExtractor(index: Index, transcribeConfig: TranscribeConfig, transcriptionStorage: ObjectStorage, outputStorage: ObjectStorage, amazonSQSClient: AmazonSQS)(implicit executionContext: ExecutionContext) extends ExternalExtractor with Logging {
val mimeTypes: Set[String] = Set(
"audio/wav",
"audio/vnd.wave",
"audio/x-aiff", // converted and transcribed. But preview doesn't work
"audio/mpeg",
"audio/aac", // tika can't detect this!!
"audio/vorbis", // Converted by ffmpeg but failed in whisper
"audio/opus",
"audio/amr", // converted and transcribed. But preview doesn't work
"audio/amr-wb", // Couldn't find a sample to test
"audio/x-caf", // Couldn't find a sample to test
"audio/mp4", // Couldn't find a sample to test
"audio/x-ms-wma", // converted and transcribed. But preview doesn't work
"video/3gpp",
"video/mp4", // quicktime detected for some of mp4 samples
"video/quicktime",
"video/x-flv", // converted and transcribed. But preview doesn't work
"video/x-ms-wmv", // converted and transcribed. But preview doesn't work
"video/x-msvideo", // converted and transcribed. But preview doesn't work
"video/x-m4v",
"video/mpeg" // converted and transcribed. But preview doesn't work
)
def canProcessMimeType: String => Boolean = mimeTypes.contains
override def indexing = true
// set a low priority as transcription takes a long time, we don't want to block up the workers
override def priority = 2
private def getOutputBucketUrls(blobUri: String): Either[Failure, OutputBucketUrls] = {
val srtKey = s"srt/$blobUri.srt"
val jsonKey = s"json/$blobUri.json"
val textKey = s"text/$blobUri.txt"
val bucketUrls = for {
srt <- outputStorage.getUploadSignedUrl(srtKey)
json <- outputStorage.getUploadSignedUrl(jsonKey)
text <- outputStorage.getUploadSignedUrl(textKey)
} yield {
OutputBucketUrls(
SignedUrl(text, textKey),
SignedUrl(srt, srtKey),
SignedUrl(json, jsonKey)
)
}
bucketUrls
}
override def triggerExtraction(blob: Blob, params: ExtractionParams): Either[Failure, Unit] = {
val transcriptionJob = for {
downloadSignedUrl <- transcriptionStorage.getSignedUrl (blob.uri.toStoragePath)
transcriptsOutputSignedUrls <- getOutputBucketUrls(blob.uri.value)
translationOutputSignedUrls <- getOutputBucketUrls(s"${blob.uri.value}-translation")
} yield {
TranscriptionJob(blob.uri.value, blob.uri.value, downloadSignedUrl, DateTime.now().toString, "giant", "Giant",
transcriptsOutputSignedUrls, "auto", true, translationOutputSignedUrls)
}
transcriptionJob.flatMap {
job => {
try {
logger.info(s"sending message to Transcription Service Queue")
val sendMessageCommand = new SendMessageRequest()
.withQueueUrl(transcribeConfig.transcriptionServiceQueueUrl)
.withMessageBody(Json.stringify(Json.toJson(job)))
.withMessageGroupId(UUID.randomUUID().toString)
.withMessageAttributes(
Map("BlobId" -> new MessageAttributeValue().withDataType("String").withStringValue(blob.uri.value)).asJava
)
Right(amazonSQSClient.sendMessage(sendMessageCommand))
} catch {
case e: Failure => Left(e)
}
}
}
}
}