sender/ConnectionWrapper.go (111 lines of code) (raw):

package sender import ( "github.com/streadway/amqp" "gitlab.com/codmill/customer-projects/guardian/pluto-vs-relay/mocks" "log" "sync" "time" ) type ConnectionWrapper interface { Send(message connectionPoolMessage) error //call this to shut down the connection. Returns a list of messages that did not get published. Finish() []connectionPoolMessage } type ConnectionWrapperImpl struct { amqpChannel mocks.AmqpChannelInterface terminationChannel chan bool terminationWaiter sync.WaitGroup amqpConfirmChanel chan amqp.Confirmation sendCounter uint64 //this is a map of (delivery_tag, source) that contains content which has not yet been acked awaitingConfirmation map[uint64]connectionPoolMessage mutex sync.Mutex } func NewConnnectionWrapper(conn mocks.AmqpConnectionInterface, pendingMessages []connectionPoolMessage) (ConnectionWrapper, error) { amqpChnl, chnlErr := conn.Channel() if chnlErr != nil { log.Print("ERROR NewConnectionWrapper could not create a channel: ", chnlErr) return nil, chnlErr } w := &ConnectionWrapperImpl{ amqpChannel: amqpChnl, terminationChannel: make(chan bool, 1), terminationWaiter: sync.WaitGroup{}, amqpConfirmChanel: make(chan amqp.Confirmation, 5), sendCounter: 0, awaitingConfirmation: make(map[uint64]connectionPoolMessage), mutex: sync.Mutex{}, } amqpChnl.NotifyPublish(w.amqpConfirmChanel) setNotifyErr := amqpChnl.Confirm(false) if setNotifyErr != nil { log.Print("ERROR NewConnectionWrapper could not enter confirmation mode: ", setNotifyErr) return nil, setNotifyErr } go w.pickUpConfirms() for _, msg := range pendingMessages { sendErr := w.Send(msg) if sendErr != nil { log.Printf("WARNING could not send pending message %s: %s", msg.String(), sendErr) } } return w, nil } /** this is a goroutine that listens for confirmation messages */ func (w *ConnectionWrapperImpl) pickUpConfirms() { w.terminationWaiter.Add(1) for { select { case <-w.terminationChannel: log.Print("INFO ConnectionWrapperImpl.pickUpConfirms requested termination, shutting down") w.terminationWaiter.Done() return case msg := <-w.amqpConfirmChanel: if msg.Ack { w.mutex.Lock() delete(w.awaitingConfirmation, msg.DeliveryTag) w.mutex.Unlock() } else { log.Print("WARNING ConnectionWrapperImpl.pickUpConfirms server rejected message, resending") w.mutex.Lock() originalSource := w.awaitingConfirmation[msg.DeliveryTag] w.mutex.Unlock() go func() { time.Sleep(3 * time.Second) w.Send(originalSource) }() } } } } /** sends content to the channel. If this returns an error, then the ConnectionWrapper object should be disposed and re-initialised */ func (w *ConnectionWrapperImpl) Send(message connectionPoolMessage) error { w.mutex.Lock() defer w.mutex.Unlock() //deliberate - don't release mutex until we have either published or failed. that will ensure //that the counter actually does correspond to the delivery tag w.sendCounter += 1 w.awaitingConfirmation[w.sendCounter] = message return w.amqpChannel.Publish(message.exchange, message.routingKey, true, false, amqp.Publishing{ Headers: map[string]interface{}{"Timestamp": message.timestamp.Format(time.RFC3339Nano)}, ContentEncoding: "utf-8", ContentType: "application/json", Timestamp: message.timestamp, MessageId: message.msgId.String(), Body: *message.content, }) } func (w *ConnectionWrapperImpl) Finish() []connectionPoolMessage { closeErr := w.amqpChannel.Close() if closeErr != nil { log.Print("ERROR ConnectionWrapper.Finish could not close broker channel: ", closeErr) } log.Print("DEBUG ConnectionWrapper.Finish waiting for confirm receive routine to terminate") w.terminationChannel <- true w.terminationWaiter.Wait() log.Printf("DEBUG ConnectionWrapper.Finish done.") pendingMessages := make([]connectionPoolMessage, len(w.awaitingConfirmation)) i := 0 for _, msg := range w.awaitingConfirmation { pendingMessages[i] = msg } return pendingMessages }