in sender/ConnectionPool.go [61:94]
// Send wraps the given exchange, routing key and content in a
// connectionPoolMessage (timestamped with time.Now and identified by
// uuid.New) and hands it to the pool's current wrapper.
//
// If no wrapper exists yet, one is set up first; an error from that initial
// setup is returned to the caller and nothing is sent.
//
// When the wrapper rejects the message, the wrapper is finished, the messages
// returned by Finish are passed to setupWrapper for a replacement wrapper,
// and after a one-second pause the same message is attempted again. The loop
// only ends once a send succeeds, at which point nil is returned. If the
// replacement wrapper cannot be set up, the method panics.
func (p *AmqpConnectionPoolImpl) Send(exchange string, routingKey string, content *[]byte) error {
	if p.amqpWrapper == nil {
		if err := p.setupWrapper([]connectionPoolMessage{}); err != nil {
			return err
		}
	}

	outgoing := connectionPoolMessage{
		exchange:   exchange,
		routingKey: routingKey,
		content:    content,
		timestamp:  time.Now(),
		msgId:      uuid.New(),
	}

	// Keep retrying the same message until the (possibly replaced) wrapper
	// accepts it.
	for p.amqpWrapper.Send(outgoing) != nil {
		log.Print("WARNING ConnectionPool.Send could not send, re-initialising")

		// Tear down the failing wrapper and carry over whatever it hands back.
		unsent := p.amqpWrapper.Finish()
		log.Printf("WARNING ConnectionPool.Send re-sending %d pending messages", len(unsent))

		if err := p.setupWrapper(unsent); err != nil {
			log.Print("FATAL can't set up another wrapper, exiting")
			panic("Can't set up wrapper")
		}

		// Pause before the next attempt.
		time.Sleep(1 * time.Second)
	}

	return nil
}