func()

in connection.go [316:389]


// newConnection wraps conn in a Connection that shares ch's common state, and
// starts the connection's frame reader and writer goroutines before returning.
//
// A non-empty outboundHP marks the connection as outbound and is added to the
// connection's log fields; an empty value leaves the direction as inbound.
// initialID seeds the connection's message ID counter. remotePeer,
// remotePeerAddress and events are stored on the returned Connection, and
// baseCtx is handed to ch.connContext to produce the connection's base context.
//
// Failing to apply a configured ToS priority is logged and does not stop the
// connection from being created.
func (ch *Channel) newConnection(baseCtx context.Context, conn net.Conn, initialID uint32, outboundHP string, remotePeer PeerInfo, remotePeerAddress peerAddressComponents, events connectionEvents) *Connection {
	opts := ch.connectionOptions.withDefaults()
	id := _nextConnID.Inc()

	// Build the per-connection logger: fixed identification fields first, then
	// the optional outbound host:port, then the resolved direction.
	logger := ch.log.WithFields(
		LogField{"connID", id},
		LogField{"localAddr", conn.LocalAddr().String()},
		LogField{"remoteAddr", conn.RemoteAddr().String()},
		LogField{"remoteHostPort", remotePeer.HostPort},
		LogField{"remoteIsEphemeral", remotePeer.IsEphemeral},
		LogField{"remoteProcess", remotePeer.ProcessName},
	)

	direction := inbound
	if outboundHP != "" {
		direction = outbound
		logger = logger.WithFields(LogField{"outboundHP", outboundHP})
	}
	logger = logger.WithFields(LogField{"connectionDirection", direction})

	localPeer := ch.PeerInfo()
	nowNanos := ch.timeNow().UnixNano()

	sysConn := getSysConn(conn, logger)
	sendCh := make(chan *Frame, opts.getSendBufferSize(remotePeer.ProcessName))

	c := &Connection{
		channelConnectionCommon: ch.channelConnectionCommon,

		connID:             id,
		conn:               conn,
		sysConn:            sysConn,
		connDirection:      direction,
		opts:               opts,
		state:              connectionActive,
		sendCh:             sendCh,
		stopCh:             make(chan struct{}),
		localPeerInfo:      localPeer,
		remotePeerInfo:     remotePeer,
		remotePeerAddress:  remotePeerAddress,
		outboundHP:         outboundHP,
		inbound:            newMessageExchangeSet(logger, messageExchangeSetInbound),
		outbound:           newMessageExchangeSet(logger, messageExchangeSetOutbound),
		internalHandlers:   ch.internalHandlers,
		handler:            ch.handler,
		events:             events,
		commonStatsTags:    ch.commonStatsTags,
		healthCheckHistory: newHealthHistory(),
		// Both activity timestamps start at the creation time.
		lastActivityRead:  *atomic.NewInt64(nowNanos),
		lastActivityWrite: *atomic.NewInt64(nowNanos),
		baseContext:       ch.connContext(baseCtx, conn),
	}

	// ToS marking is best-effort: report the failure and carry on.
	if opts.TosPriority > 0 {
		err := ch.setConnectionTosPriority(opts.TosPriority, conn)
		if err != nil {
			logger.WithFields(ErrField(err)).Error("Failed to set ToS priority.")
		}
	}

	c.log = logger
	c.nextMessageID.Store(initialID)

	// Wire the exchange-set callbacks back to this connection.
	c.inbound.onAdded = c.onExchangeAdded
	c.inbound.onRemoved = c.checkExchanges
	c.outbound.onAdded = c.onExchangeAdded
	c.outbound.onRemoved = c.checkExchanges
	c.outbound.onCancel = c.onCancel

	// A relayer is attached only when the channel has a relay host configured.
	if ch.RelayHost() != nil {
		c.relay = NewRelayer(ch, c)
	}

	// Connections are considered active from the moment they are created.
	c.callOnActive()

	// Start the I/O loops last, once every field above has been set.
	go c.readFrames(id)
	go c.writeFrames(id)
	return c
}