in sender/ConnectionWrapper.go [28:60]
func NewConnnectionWrapper(conn mocks.AmqpConnectionInterface, pendingMessages []connectionPoolMessage) (ConnectionWrapper, error) {
amqpChnl, chnlErr := conn.Channel()
if chnlErr != nil {
log.Print("ERROR NewConnectionWrapper could not create a channel: ", chnlErr)
return nil, chnlErr
}
w := &ConnectionWrapperImpl{
amqpChannel: amqpChnl,
terminationChannel: make(chan bool, 1),
terminationWaiter: sync.WaitGroup{},
amqpConfirmChanel: make(chan amqp.Confirmation, 5),
sendCounter: 0,
awaitingConfirmation: make(map[uint64]connectionPoolMessage),
mutex: sync.Mutex{},
}
amqpChnl.NotifyPublish(w.amqpConfirmChanel)
setNotifyErr := amqpChnl.Confirm(false)
if setNotifyErr != nil {
log.Print("ERROR NewConnectionWrapper could not enter confirmation mode: ", setNotifyErr)
return nil, setNotifyErr
}
go w.pickUpConfirms()
for _, msg := range pendingMessages {
sendErr := w.Send(msg)
if sendErr != nil {
log.Printf("WARNING could not send pending message %s: %s", msg.String(), sendErr)
}
}
return w, nil
}