in server/proxy/proxy.go [252:301]
func HandleUserTCPConnection(pxy Proxy, userConn net.Conn, serverCfg config.ServerCommonConf) {
xl := xlog.FromContextSafe(pxy.Context())
defer userConn.Close()
// server plugin hook
rc := pxy.GetResourceController()
content := &plugin.NewUserConnContent{
User: pxy.GetUserInfo(),
ProxyName: pxy.GetName(),
ProxyType: pxy.GetConf().GetBaseInfo().ProxyType,
RemoteAddr: userConn.RemoteAddr().String(),
}
_, err := rc.PluginManager.NewUserConn(content)
if err != nil {
xl.Warn("the user conn [%s] was rejected, err:%v", content.RemoteAddr, err)
return
}
// try all connections from the pool
workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
if err != nil {
return
}
defer workConn.Close()
var local io.ReadWriteCloser = workConn
cfg := pxy.GetConf().GetBaseInfo()
xl.Trace("handler user tcp connection, use_encryption: %t, use_compression: %t", cfg.UseEncryption, cfg.UseCompression)
if cfg.UseEncryption {
local, err = frpIo.WithEncryption(local, []byte(serverCfg.Token))
if err != nil {
xl.Error("create encryption stream error: %v", err)
return
}
}
if cfg.UseCompression {
local = frpIo.WithCompression(local)
}
xl.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
name := pxy.GetName()
proxyType := pxy.GetConf().GetBaseInfo().ProxyType
metrics.Server.OpenConnection(name, proxyType)
inCount, outCount := frpIo.Join(local, userConn)
metrics.Server.CloseConnection(name, proxyType)
metrics.Server.AddTrafficIn(name, proxyType, inCount)
metrics.Server.AddTrafficOut(name, proxyType, outCount)
xl.Debug("join connections closed")
}