func()

in mex.go [326:366]


func (mexset *messageExchangeSet) newExchange(ctx context.Context, ctxCancel context.CancelFunc, framePool FramePool,
	msgType messageType, msgID uint32, bufferSize int) (*messageExchange, error) {
	if mexset.log.Enabled(LogLevelDebug) {
		mexset.log.Debugf("Creating new %s message exchange for [%v:%d]", mexset.name, msgType, msgID)
	}

	mex := &messageExchange{
		msgType:   msgType,
		msgID:     msgID,
		ctx:       ctx,
		ctxCancel: ctxCancel,
		recvCh:    make(chan *Frame, bufferSize),
		errCh:     newErrNotifier(),
		mexset:    mexset,
		framePool: framePool,
	}

	mexset.Lock()
	addErr := mexset.addExchange(mex)
	mexset.Unlock()

	if addErr != nil {
		logger := mexset.log.WithFields(
			LogField{"msgID", mex.msgID},
			LogField{"msgType", mex.msgType},
			LogField{"exchange", mexset.name},
		)
		if addErr == errMexSetShutdown {
			logger.Warn("Attempted to create new mex after mexset shutdown.")
		} else if addErr == errDuplicateMex {
			logger.Warn("Duplicate msg ID for active and new mex.")
		}

		return nil, addErr
	}

	mexset.onAdded()

	// TODO(mmihic): Put into a deadline ordered heap so we can garbage collected expired exchanges
	return mex, nil
}