func()

in sender/ConnectionPool.go [61:94]


// Send wraps content in a connectionPoolMessage addressed to exchange and
// routingKey, stamped with the current time and a newly generated UUID, and
// hands it to the pool's wrapper.
//
// If the pool has no wrapper yet, one is set up first with an empty list of
// pending messages; an error from that initial setup is returned to the caller.
//
// When the wrapper rejects the message, the wrapper is finished, the messages
// returned by Finish are passed to setupWrapper to build a replacement, and
// after a one-second pause the same message is offered again. This repeats
// until a Send succeeds, so the call can block for as long as sends keep
// failing. If a replacement wrapper cannot be set up the method panics rather
// than returning an error.
//
// NOTE(review): whether the rejected message is itself included in the slice
// returned by Finish is not visible here — confirm it cannot be sent twice.
func (p *AmqpConnectionPoolImpl) Send(exchange string, routingKey string, content *[]byte) error {
	// First use: build the wrapper with nothing to replay.
	if p.amqpWrapper == nil {
		if err := p.setupWrapper([]connectionPoolMessage{}); err != nil {
			return err
		}
	}

	outgoing := connectionPoolMessage{
		exchange:   exchange,
		routingKey: routingKey,
		content:    content,
		timestamp:  time.Now(),
		msgId:      uuid.New(),
	}

	// Each failed attempt tears the wrapper down and rebuilds it before retrying.
	for p.amqpWrapper.Send(outgoing) != nil {
		log.Print("WARNING ConnectionPool.Send could not send, re-initialising")

		unsent := p.amqpWrapper.Finish()
		log.Printf("WARNING ConnectionPool.Send re-sending %d pending messages", len(unsent))

		if err := p.setupWrapper(unsent); err != nil {
			log.Print("FATAL can't set up another wrapper, exiting")
			panic("Can't set up wrapper")
		}

		// Brief pause so a persistently failing wrapper is not retried in a tight loop.
		time.Sleep(1 * time.Second)
	}

	return nil
}