in sender/ConnectionWrapper.go [65:90]
// pickUpConfirms consumes publisher confirmations from w.amqpConfirmChanel
// until a value can be received from w.terminationChannel.
//
// For an acknowledged confirmation the delivery tag is removed from
// w.awaitingConfirmation. For a rejected confirmation the entry stored under
// that tag is passed to w.Send again after resendDelay; the entry itself is
// left in the map, exactly as before.
//
// The method registers itself on w.terminationWaiter for its whole lifetime
// and blocks the calling goroutine until termination is requested.
func (w *ConnectionWrapperImpl) pickUpConfirms() {
	// NOTE(review): Add is executed by the worker itself. If this method is
	// started with `go`, a concurrent terminationWaiter.Wait() may return
	// before Add has run - confirm against the caller.
	w.terminationWaiter.Add(1)
	defer w.terminationWaiter.Done()

	const (
		// resendDelay is how long a rejected message waits before it is
		// handed back to Send.
		resendDelay = 3 * time.Second
		// closedChannelBackoff is the pause between polls while the
		// confirmation channel is closed.
		closedChannelBackoff = time.Second
	)

	closedReported := false
	for {
		select {
		case <-w.terminationChannel:
			log.Print("INFO ConnectionWrapperImpl.pickUpConfirms requested termination, shutting down")
			return
		case msg, open := <-w.amqpConfirmChanel:
			if !open {
				// A closed channel is always ready and yields zero values
				// (Ack == false). Treating those as rejections would spin
				// the loop and spawn one resend goroutine per iteration.
				if !closedReported {
					log.Print("WARNING ConnectionWrapperImpl.pickUpConfirms confirmation channel closed, waiting for termination")
					closedReported = true
				}
				// Back off, but stay responsive to termination. The field
				// is read again on the next iteration, as it always was.
				select {
				case <-w.terminationChannel:
					log.Print("INFO ConnectionWrapperImpl.pickUpConfirms requested termination, shutting down")
					return
				case <-time.After(closedChannelBackoff):
				}
				continue
			}
			closedReported = false

			if msg.Ack {
				w.mutex.Lock()
				delete(w.awaitingConfirmation, msg.DeliveryTag)
				w.mutex.Unlock()
				continue
			}

			w.mutex.Lock()
			originalSource, found := w.awaitingConfirmation[msg.DeliveryTag]
			w.mutex.Unlock()
			if !found {
				// Nothing is stored for this tag; resending the map's zero
				// value would publish a message nobody asked for.
				log.Print("WARNING ConnectionWrapperImpl.pickUpConfirms server rejected unknown delivery tag, nothing to resend")
				continue
			}

			log.Print("WARNING ConnectionWrapperImpl.pickUpConfirms server rejected message, resending")
			// Resend on a separate goroutine so the delay does not block
			// processing of further confirmations.
			go func() {
				time.Sleep(resendDelay)
				w.Send(originalSource)
			}()
		}
	}
}