sender/ConnectionPool.go (82 lines of code) (raw):
package sender
import (
"fmt"
"github.com/google/uuid"
"gitlab.com/codmill/customer-projects/guardian/pluto-vs-relay/mocks"
"log"
"sync"
"time"
)
type AmqpConnectionPool interface {
Send(exchange string, routingKey string, content *[]byte) error
}
type connectionPoolMessage struct {
exchange string
routingKey string
content *[]byte
msgId uuid.UUID
timestamp time.Time
}
func (m *connectionPoolMessage) String() string {
return fmt.Sprintf("%s to %s on %s", m.msgId.String(), m.routingKey, m.exchange)
}
type connectionPoolResult struct {
msgId uuid.UUID
isSuccess bool
}
type connectionPoolMap map[uuid.UUID]chan bool
type AmqpConnectionPoolImpl struct {
connection mocks.AmqpConnectionInterface
amqpWrapper ConnectionWrapper
mutex sync.Mutex
}
func NewAmqpConnectionPool(conn mocks.AmqpConnectionInterface) AmqpConnectionPool {
return &AmqpConnectionPoolImpl{
connection: conn,
amqpWrapper: nil,
mutex: sync.Mutex{},
}
}
func (p *AmqpConnectionPoolImpl) setupWrapper(pendingMessages []connectionPoolMessage) error {
p.mutex.Lock()
newWrapper, createErr := NewConnnectionWrapper(p.connection, pendingMessages)
if createErr != nil {
log.Print("Could not create a connection wrapper: ", createErr)
return createErr
}
p.amqpWrapper = newWrapper
p.mutex.Unlock()
return nil
}
func (p *AmqpConnectionPoolImpl) Send(exchange string, routingKey string, content *[]byte) error {
if p.amqpWrapper == nil {
setupErr := p.setupWrapper([]connectionPoolMessage{})
if setupErr != nil {
return setupErr
}
}
msg := connectionPoolMessage{
exchange: exchange,
routingKey: routingKey,
content: content,
timestamp: time.Now(),
msgId: uuid.New(),
}
for {
sendErr := p.amqpWrapper.Send(msg)
if sendErr != nil {
log.Print("WARNING ConnectionPool.Send could not send, re-initialising")
pendingMessages := p.amqpWrapper.Finish()
log.Printf("WARNING ConnectionPool.Send re-sending %d pending messages", len(pendingMessages))
setupErr := p.setupWrapper(pendingMessages)
if setupErr != nil {
log.Print("FATAL can't set up another wrapper, exiting")
panic("Can't set up wrapper")
}
time.Sleep(1 * time.Second)
} else {
break
}
}
return nil
}