transcodelauncher/main.go (186 lines of code) (raw):
package main
import (
"context"
"flag"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/elastictranscoder"
"github.com/aws/aws-sdk-go-v2/service/elastictranscoder/types"
"github.com/davecgh/go-spew/spew"
"github.com/guardian/simple-interactive-deliverables/common"
"log"
"os"
"path"
"regexp"
"time"
)
/*
isCompleted returns true if the given string pointer refers to a completed status
(i.e. success/failure/cancelled)
*/
func isCompleted(jobStatus *string) bool {
return *jobStatus == "Complete" || *jobStatus == "Canceled" || *jobStatus == "Error"
}
func getPresetInfo(cli *elastictranscoder.Client, presetId string) *elastictranscoder.ReadPresetOutput {
out, err := cli.ReadPreset(context.Background(), &elastictranscoder.ReadPresetInput{Id: aws.String(presetId)})
if err != nil {
log.Fatalf("Could not read preset %s: %s", presetId, err)
}
return out
}
func appendToBasefile(originalFilename *string, stuff string) string {
xtractor := regexp.MustCompile("(.*)\\.([^.]*)")
fileBase := path.Base(*originalFilename)
var fileStem string
parts := xtractor.FindAllStringSubmatch(fileBase, -1)
if parts != nil {
fileStem = parts[0][1]
} else {
fileStem = fileBase
}
return fileStem + stuff
}
func makeOutputFilename(originalFilename *string, transcode *TranscodeSet) string {
return appendToBasefile(originalFilename, transcode.Suffix+transcode.Extension)
}
func makeOutputThumbsPattern(originalFilename *string, transcode *TranscodeSet) string {
return appendToBasefile(originalFilename, fmt.Sprintf("_{count}%s", transcode.Suffix))
}
func WriteOutputs(awsConfig aws.Config, tableName string, content []*common.Encoding) {
client := dynamodb.NewFromConfig(awsConfig)
for _, entry := range content {
toWrite := entry.ToDynamoDB()
_, err := client.PutItem(context.Background(), &dynamodb.PutItemInput{
Item: toWrite,
TableName: &tableName,
})
if err != nil {
log.Fatal("Could not write item: ", err)
}
}
}
func main() {
inputFile := flag.String("input", "", "Bucket path to input file")
overridePrefix := flag.String("outputprefix", "", "Prefix for intermediate output files. If empty, a randomly generated prefix is used")
pipelineId := flag.String("pipeline", "", "Pipeline ID to run on")
transcodeSet := flag.String("transcodeset", "horizontal_transcode_set", "transcode set yaml to use")
contentId := flag.Int64("contentId", 0, "content ID value to use")
titleId := flag.Int64("titleId", 0, "title ID value to use")
uriBase := flag.String("uribase", "", "base URL on the CDN where content is accessible")
tableName := flag.String("table", "", "name of the table that contains encodings")
noDbOut := flag.Bool("nodb", false, "set this to only run the transcode and not push to encodings endpoint")
cdnPath := flag.String("cdnbucket", "", "If set, copy the files to this location in the form bucket:/path")
flag.Parse()
transcodes, err := LoadTranscodeSet(transcodeSet)
if err != nil {
log.Fatalf("Could not load transcodes from '%s': '%s'", *transcodeSet, err)
}
if *tableName == "" {
log.Fatal("You must specify a table to output encodings to")
}
var outputPrefix string
if overridePrefix == nil {
outputPrefix = *overridePrefix
} else {
outputPrefix = common.GenerateStringIdPathSafe() + "/"
}
awscfg, awsErr := awsconfig.LoadDefaultConfig(context.Background())
if awsErr != nil {
log.Fatalf("Could not connect to AWS: %s", awsErr)
}
cli := elastictranscoder.NewFromConfig(awscfg)
query := &elastictranscoder.ReadPipelineInput{Id: pipelineId}
pipelineInfo, err := cli.ReadPipeline(context.Background(), query)
if err != nil {
log.Fatalf("Could not query pipeline %s: %s", *pipelineId, err)
}
log.Printf("INFO Pipeline %s runs from %s into %s; status %s", *pipelineId, *pipelineInfo.Pipeline.InputBucket, *pipelineInfo.Pipeline.OutputBucket, *pipelineInfo.Pipeline.Status)
outputList := make([]types.CreateJobOutput, len(*transcodes))
for i, transcode := range *transcodes {
outputList[i] = types.CreateJobOutput{
Key: aws.String(makeOutputFilename(inputFile, &transcode)),
PresetId: aws.String(transcode.PresetId),
ThumbnailPattern: aws.String(makeOutputThumbsPattern(inputFile, &transcode)),
}
}
params := &elastictranscoder.CreateJobInput{
PipelineId: aws.String(*pipelineId),
Input: &types.JobInput{
Key: aws.String(*inputFile),
},
OutputKeyPrefix: aws.String(outputPrefix),
Outputs: outputList,
Playlists: nil,
UserMetadata: nil,
}
response, err := cli.CreateJob(context.Background(), params)
if err != nil {
log.Fatalf("Could not start ETS job for '%s': %s", *inputFile, err)
}
log.Printf("Created job with ID %s", *response.Job.Id)
var jobStatus *elastictranscoder.ReadJobOutput
for {
time.Sleep(5 * time.Second)
jobStatus, err = cli.ReadJob(context.Background(), &elastictranscoder.ReadJobInput{Id: response.Job.Id})
if err != nil {
log.Fatalf("Could not check job status: %s", err)
}
log.Printf("Job status is %s", *jobStatus.Job.Status)
if isCompleted(jobStatus.Job.Status) {
break
}
}
fcsId := common.GenerateStringId()
copier := NewCopier(awscfg)
switch *jobStatus.Job.Status {
case "Error":
log.Printf("Job failed, please see the ETS console for job ID '%s' to get the reason", *jobStatus.Job.Id)
os.Exit(1)
case "Canceled":
log.Printf("Job was cancelled in the ETS console")
os.Exit(2)
case "Complete":
log.Printf("Job completed!")
enc := make([]*common.Encoding, len(jobStatus.Job.Outputs))
for i, out := range jobStatus.Job.Outputs {
presetInfo := getPresetInfo(cli, *out.PresetId)
enc[i] = common.JobOutputToEncoding(&out, presetInfo.Preset, int32(*contentId), int32(*titleId), fcsId, *uriBase)
spew.Dump(enc[i])
outputFilepath := *out.Key
if outputPrefix != "" {
outputFilepath = outputPrefix + *out.Key
}
if *cdnPath != "" {
currentPosterName, destPosterName, err := PosterFrameNamesForEncoding(outputFilepath, ".png")
log.Printf("CurrentPosterName is %s from %s", currentPosterName, outputFilepath)
havePoster, err := copier.DoesFileExist(context.Background(), *pipelineInfo.Pipeline.OutputBucket, currentPosterName)
if err != nil {
log.Fatal("Could not locate proxy: ", err)
}
if havePoster {
err = copier.DoCopyDestspec(context.Background(),
*pipelineInfo.Pipeline.OutputBucket,
currentPosterName,
*cdnPath+"/"+destPosterName,
true)
if err != nil {
log.Printf("ERROR Could not copy poster s3://%s/%s: %s", *pipelineInfo.Pipeline.OutputBucket, currentPosterName, err)
}
} else {
log.Printf("WARNING Could not find a poster for %s", *out.Key)
}
err = copier.DoCopyDestspec(context.Background(),
*pipelineInfo.Pipeline.OutputBucket,
outputFilepath,
*cdnPath+"/"+*out.Key,
true)
if err != nil {
log.Printf("ERROR Could not copy s3://%s/%s: %s", *pipelineInfo.Pipeline.OutputBucket, outputFilepath, err)
}
}
}
if !*noDbOut {
WriteOutputs(awscfg, *tableName, enc)
}
default:
log.Fatalf("Got an unexpected job status: '%s'", *jobStatus.Job.Status)
}
}