in mex.go [326:366]
// newExchange builds a messageExchange for the given message type and ID and
// registers it with this exchange set.
//
// The exchange is created with a frame receive channel buffered to bufferSize
// and a fresh error notifier. Registration happens via addExchange while the
// set's lock is held; the lock is released before any logging or callbacks.
//
// On success, onAdded is invoked and the new exchange is returned. If
// addExchange fails, the error is returned unchanged with a nil exchange; a
// warning is logged when the error is errMexSetShutdown or errDuplicateMex.
func (mexset *messageExchangeSet) newExchange(ctx context.Context, ctxCancel context.CancelFunc, framePool FramePool,
	msgType messageType, msgID uint32, bufferSize int) (*messageExchange, error) {
	if mexset.log.Enabled(LogLevelDebug) {
		mexset.log.Debugf("Creating new %s message exchange for [%v:%d]", mexset.name, msgType, msgID)
	}

	exchange := &messageExchange{
		msgType:   msgType,
		msgID:     msgID,
		ctx:       ctx,
		ctxCancel: ctxCancel,
		recvCh:    make(chan *Frame, bufferSize),
		errCh:     newErrNotifier(),
		mexset:    mexset,
		framePool: framePool,
	}

	// Hold the lock only for the registration itself.
	mexset.Lock()
	err := mexset.addExchange(exchange)
	mexset.Unlock()

	if err == nil {
		mexset.onAdded()

		// TODO(mmihic): Put into a deadline ordered heap so we can garbage collect expired exchanges
		return exchange, nil
	}

	// Registration failed: warn for the known failure modes, then hand the
	// error back to the caller untouched.
	failLog := mexset.log.WithFields(
		LogField{"msgID", exchange.msgID},
		LogField{"msgType", exchange.msgType},
		LogField{"exchange", mexset.name},
	)
	switch err {
	case errMexSetShutdown:
		failLog.Warn("Attempted to create new mex after mexset shutdown.")
	case errDuplicateMex:
		failLog.Warn("Duplicate msg ID for active and new mex.")
	}
	return nil, err
}