in connection.go [316:389]
// newConnection builds a Connection around conn, wires up its exchange-set
// callbacks, marks it active, and starts its readFrames and writeFrames
// goroutines before returning it.
//
// The connection is recorded as outbound when outboundHP is non-empty and as
// inbound otherwise. initialID seeds the connection's next message ID;
// remotePeer, remotePeerAddress and events are stored on the connection as
// given. baseCtx is passed through ch.connContext to derive the connection's
// base context.
func (ch *Channel) newConnection(baseCtx context.Context, conn net.Conn, initialID uint32, outboundHP string, remotePeer PeerInfo, remotePeerAddress peerAddressComponents, events connectionEvents) *Connection {
	connOpts := ch.connectionOptions.withDefaults()
	id := _nextConnID.Inc()

	// Fields attached to every log line emitted through this connection's
	// logger: its ID, both socket addresses, and the remote peer details.
	baseFields := LogFields{
		{"connID", id},
		{"localAddr", conn.LocalAddr().String()},
		{"remoteAddr", conn.RemoteAddr().String()},
		{"remoteHostPort", remotePeer.HostPort},
		{"remoteIsEphemeral", remotePeer.IsEphemeral},
		{"remoteProcess", remotePeer.ProcessName},
	}
	logger := ch.log.WithFields(baseFields...)

	// A non-empty outbound host:port is what distinguishes an outbound
	// connection; only then is the outboundHP field logged.
	direction := inbound
	if outboundHP != "" {
		direction = outbound
		logger = logger.WithFields(LogField{"outboundHP", outboundHP})
	}
	logger = logger.WithFields(LogField{"connectionDirection", direction})

	localPeer := ch.PeerInfo()
	// Read and write activity timestamps both start at creation time.
	createdAt := ch.timeNow().UnixNano()

	newConn := &Connection{
		channelConnectionCommon: ch.channelConnectionCommon,

		connID:        id,
		conn:          conn,
		sysConn:       getSysConn(conn, logger),
		connDirection: direction,
		opts:          connOpts,
		state:         connectionActive,
		sendCh:        make(chan *Frame, connOpts.getSendBufferSize(remotePeer.ProcessName)),
		stopCh:        make(chan struct{}),

		localPeerInfo:     localPeer,
		remotePeerInfo:    remotePeer,
		remotePeerAddress: remotePeerAddress,
		outboundHP:        outboundHP,

		inbound:  newMessageExchangeSet(logger, messageExchangeSetInbound),
		outbound: newMessageExchangeSet(logger, messageExchangeSetOutbound),

		internalHandlers: ch.internalHandlers,
		handler:          ch.handler,
		events:           events,
		commonStatsTags:  ch.commonStatsTags,

		healthCheckHistory: newHealthHistory(),
		lastActivityRead:   *atomic.NewInt64(createdAt),
		lastActivityWrite:  *atomic.NewInt64(createdAt),
		baseContext:        ch.connContext(baseCtx, conn),
	}

	// A ToS priority is applied only when one is configured. A failure is
	// logged and the connection is still returned.
	if connOpts.TosPriority > 0 {
		if err := ch.setConnectionTosPriority(connOpts.TosPriority, conn); err != nil {
			logger.WithFields(ErrField(err)).Error("Failed to set ToS priority.")
		}
	}

	newConn.log = logger
	newConn.nextMessageID.Store(initialID)

	// Both exchange sets report additions and removals back to the
	// connection; only the outbound set gets a cancel callback.
	newConn.inbound.onAdded = newConn.onExchangeAdded
	newConn.inbound.onRemoved = newConn.checkExchanges
	newConn.outbound.onAdded = newConn.onExchangeAdded
	newConn.outbound.onRemoved = newConn.checkExchanges
	newConn.outbound.onCancel = newConn.onCancel

	// A relayer is attached only when the channel has a relay host.
	if ch.RelayHost() != nil {
		newConn.relay = NewRelayer(ch, newConn)
	}

	// Connections are activated as soon as they are created.
	newConn.callOnActive()

	// All wiring above is complete before the frame goroutines start.
	go newConn.readFrames(id)
	go newConn.writeFrames(id)

	return newConn
}