main.go (150 lines of code) (raw):

package main import ( "github.com/streadway/amqp" "gitlab.com/codmill/customer-projects/guardian/pluto-vs-relay/mocks" "gitlab.com/codmill/customer-projects/guardian/pluto-vs-relay/sender" "gitlab.com/codmill/customer-projects/guardian/pluto-vs-relay/vidispine" "log" "net/http" "net/url" "os" "os/signal" "syscall" "time" ) func setUpExchange(conn *amqp.Connection, exchangeName string) { rmqChan, chanErr := conn.Channel() if chanErr != nil { log.Fatal("Could not establish initial connection to rabbitmq: ", chanErr) } defer rmqChan.Close() declErr := rmqChan.ExchangeDeclare(exchangeName, "topic", true, false, false, false, nil) if declErr != nil { log.Fatal("Could not declare channel: ", declErr) } } func setUpNotifications(vidispine_url *url.URL, requestor *vidispine.VSRequestor, callbackUrl *url.URL) { /* we define the information that we are checking for with these three arrays. They all need to have the same number of (top-level) elements. The first element of `expectedNotificationTypes` is another array consisting of the "notification type" parameter for the first entity type and the first element of `requiredSubpaths` is the subpath of `callbackUrl` where we want that notification delivered */ expectedEntityTypes := []string{"job", "metadata", "item", "shape"} expectedNotificationTypes := [][]string{ {"stop", "update", "create"}, {"modify"}, {"create", "delete"}, {"create", "modify", "delete"}, } requiredSubpaths := []string{"/job", "/item/metadata", "/item", "/item/shape"} log.Print("Checking for our notifications in ", vidispine_url.String()) for entityIndex, et := range expectedEntityTypes { for _, nt := range expectedNotificationTypes[entityIndex] { notificationPresent, check_err := SearchForMyNotification(requestor, callbackUrl.String()+requiredSubpaths[entityIndex], et, nt) if check_err != nil { log.Fatal("Could not check for notification: ", check_err) } if !notificationPresent { log.Printf("INFO setUpNotifications missing %s %s notification", et, nt) createErr := CreateNotification(requestor, callbackUrl.String()+requiredSubpaths[entityIndex], et, nt) if createErr != nil { log.Fatal("Could not create notification: ", createErr) } } } } } /** sets up a signal handler to terminate cleanly if we receive SIGINT or SIGTERM */ func handleSignals(rmq mocks.AmqpConnectionInterface) { sigChan := make(chan os.Signal, 1) //buffer of 1 signal in case we're not in receiving state when it comes through signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) go func() { receivedSig := <-sigChan log.Printf("INFO handleSignals received %s, shutting down in 5s", receivedSig.String()) time.Sleep(5 * time.Second) closeErr := rmq.Close() if closeErr != nil { log.Print("ERROR handleSignals could not close connection: ", closeErr) } os.Exit(0) }() } func main() { vidispine_url_str := os.Getenv("VIDISPINE_URL") vidispine_user := os.Getenv("VIDISPINE_USER") vidispine_passwd := os.Getenv("VIDISPINE_PASSWORD") callback_uri_str := os.Getenv("CALLBACK_BASE") rabbitmq_uri_str := os.Getenv("RABBITMQ_URI") exchangeName := os.Getenv("RABBITMQ_EXCHANGE") if vidispine_url_str == "" || vidispine_user == "" || vidispine_passwd == "" { log.Fatal("Please set VIDISPINE_URL, VIDISPINE_USER and VIDISPINE_PASSWORD in the environment") } if callback_uri_str == "" { log.Fatal("Please set CALLBACK_BASE in the environment") } if rabbitmq_uri_str == "" { log.Fatal("Please set RABBITMQ_URI in the environment") } if exchangeName == "" { log.Fatal("Please set RABBITMQ_EXCHANGE in the environment") } vidispine_url, url_parse_err := url.Parse(vidispine_url_str) if url_parse_err != nil { log.Fatal("VIDISPINE_URL is not valid: ", url_parse_err) } callback_url, url_parse_err := url.Parse(callback_uri_str) if url_parse_err != nil { log.Fatal("CALLBACK_BASE is not valid: ", url_parse_err) } rmq, rmqErr := amqp.Dial(rabbitmq_uri_str) if rmqErr != nil { log.Fatal("Could not connect to rabbitmq: ", rmqErr) } conn := &mocks.AmqpConnectionShim{ Connection: rmq, } /* ensure that rabbitmq connection is terminated cleanly even if program exits uncleanly */ defer func() { if r := recover(); r != nil { log.Print("WARNING main Program is existing due to panic") if rmq != nil && !rmq.IsClosed() { log.Print("INFO main Shutting down broker connection") closeErr := rmq.Close() if closeErr != nil { log.Print("ERROR main Could not shut down broker connection but terminating anyway ", closeErr) } } os.Exit(0xFF) } }() /* ensure that rabbiqmq connection is terminated cleanly if we receive termination signal */ handleSignals(conn) requestor := vidispine.NewVSRequestor(*vidispine_url, vidispine_user, vidispine_passwd) setUpNotifications(vidispine_url, requestor, callback_url) setUpExchange(rmq, exchangeName) amqpPool := sender.NewAmqpConnectionPool(conn) jobMessageHandler := VidispineMessageHandler{ ConnectionPool: amqpPool, ExchangeName: exchangeName, ChannelTimeout: 45 * time.Second, } itemMessageHandler := VidispineItemMessageHandler{ ConnectionPool: amqpPool, ExchangeName: exchangeName, ChannelTimeout: 45 * time.Second, } metaMessageHandler := VidispineMetadataMessageHandler{ ConnectionPool: amqpPool, ExchangeName: exchangeName, ChannelTimeout: 45 * time.Second, } shapeMessageHandler := VidispineShapeMessageHandler{ ConnectionPool: amqpPool, ExchangeName: exchangeName, ChannelTimeout: 45 * time.Second, } healthcheckHandler := HealthcheckHandler{} log.Printf("Callback URL path is %s", callback_url.Path) http.Handle(callback_url.Path+"/job", jobMessageHandler) http.Handle(callback_url.Path+"/item/metadata", metaMessageHandler) http.Handle(callback_url.Path+"/item", itemMessageHandler) http.Handle(callback_url.Path+"/item/shape", shapeMessageHandler) http.Handle("/healthcheck", healthcheckHandler) log.Printf("Starting up on port 8080...") startServeErr := http.ListenAndServe(":8080", nil) if startServeErr != nil { log.Fatal(startServeErr) } }