func()

in client/proxy/proxy.go [294:420]


// InWorkConn runs the client side of the XTCP NAT hole punching handshake on
// a freshly started work connection and then serves the visitor's stream.
//
// The sequence is:
//  1. read the NatHoleSid message from conn;
//  2. send a NatHoleClient message to the server's UDP port and wait up to
//     5 seconds for the NatHoleResp carrying the visitor address;
//  3. send detect packets to the visitor from the same local UDP address and
//     report NatHoleClientDetectOK on conn;
//  4. listen on that local UDP address for up to 8 seconds for the visitor to
//     send the sid, echo it back, and wrap the UDP socket in KCP + a mux
//     session;
//  5. accept one mux stream and hand it to HandleTCPWorkConnection.
//
// Every failure is logged and ends the handshake; conn is always closed when
// the method returns.
func (pxy *XTCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
	xl := pxy.xl
	defer conn.Close()

	var natHoleSidMsg msg.NatHoleSid
	if err := msg.ReadMsgInto(conn, &natHoleSidMsg); err != nil {
		xl.Error("xtcp read from workConn error: %v", err)
		return
	}

	natHoleClientMsg := &msg.NatHoleClient{
		ProxyName: pxy.cfg.ProxyName,
		Sid:       natHoleSidMsg.Sid,
	}
	raddr, err := net.ResolveUDPAddr("udp",
		net.JoinHostPort(pxy.clientCfg.ServerAddr, strconv.Itoa(pxy.serverUDPPort)))
	if err != nil {
		xl.Error("resolve server udp addr error: %v", err)
		return
	}
	clientConn, err := net.DialUDP("udp", nil, raddr)
	if err != nil {
		xl.Error("dial server udp addr error: %v", err)
		return
	}
	// Covers the early-return paths; on the success path the socket is closed
	// explicitly below and this second Close is a harmless no-op error.
	defer clientConn.Close()

	if err = msg.WriteMsg(clientConn, natHoleClientMsg); err != nil {
		xl.Error("send natHoleClientMsg to server error: %v", err)
		return
	}

	// Wait for client address at most 5 seconds.
	var natHoleRespMsg msg.NatHoleResp
	_ = clientConn.SetReadDeadline(time.Now().Add(5 * time.Second))

	buf := pool.GetBuf(1024)
	n, err := clientConn.Read(buf)
	if err != nil {
		pool.PutBuf(buf)
		xl.Error("get natHoleRespMsg error: %v", err)
		return
	}
	err = msg.ReadMsgInto(bytes.NewReader(buf[:n]), &natHoleRespMsg)
	// The datagram has been decoded (or rejected); the scratch buffer is not
	// referenced again, so hand it back to the pool on both outcomes.
	pool.PutBuf(buf)
	if err != nil {
		xl.Error("get natHoleRespMsg error: %v", err)
		return
	}

	// Remember the local address while the socket is still open: the visitor
	// will be told to reach us on exactly this address.
	laddr, err := net.ResolveUDPAddr("udp", clientConn.LocalAddr().String())
	if err != nil {
		xl.Error("resolve local udp addr error: %v", err)
		return
	}
	// Release the port now so it can be re-bound by ListenUDP below.
	_ = clientConn.Close()

	if natHoleRespMsg.Error != "" {
		xl.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error)
		return
	}

	xl.Trace("get natHoleRespMsg, sid [%s], client address [%s] visitor address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr, natHoleRespMsg.VisitorAddr)

	// Send detect message
	host, portStr, err := net.SplitHostPort(natHoleRespMsg.VisitorAddr)
	if err != nil {
		xl.Error("get NatHoleResp visitor address [%s] error: %v", natHoleRespMsg.VisitorAddr, err)
		return
	}
	port, err := strconv.ParseInt(portStr, 10, 64)
	if err != nil {
		xl.Error("get natHoleResp visitor address [%s] error: %v", natHoleRespMsg.VisitorAddr, err)
		return
	}
	// Best effort: the result of sending the detect packets is deliberately
	// not checked; the handshake outcome is decided by the visitor's packet
	// arriving (or not) within the listen window below.
	_ = pxy.sendDetectMsg(host, int(port), laddr, []byte(natHoleRespMsg.Sid))
	xl.Trace("send all detect msg done")

	if err := msg.WriteMsg(conn, &msg.NatHoleClientDetectOK{}); err != nil {
		xl.Error("write message error: %v", err)
		return
	}

	// Listen for clientConn's address and wait for visitor connection
	lConn, err := net.ListenUDP("udp", laddr)
	if err != nil {
		xl.Error("listen on visitorConn's local address error: %v", err)
		return
	}
	defer lConn.Close()

	_ = lConn.SetReadDeadline(time.Now().Add(8 * time.Second))
	sidBuf := pool.GetBuf(1024)
	sidLen, uAddr, err := lConn.ReadFromUDP(sidBuf)
	if err != nil {
		pool.PutBuf(sidBuf)
		xl.Warn("get sid from visitor error: %v", err)
		return
	}
	_ = lConn.SetReadDeadline(time.Time{})

	// Compare first, then release the buffer; it must not be touched after
	// PutBuf because the pool may hand it to another goroutine.
	sidOK := string(sidBuf[:sidLen]) == natHoleRespMsg.Sid
	pool.PutBuf(sidBuf)
	if !sidOK {
		xl.Warn("incorrect sid from visitor")
		return
	}
	xl.Info("nat hole connection make success, sid [%s]", natHoleRespMsg.Sid)

	// Echo the sid back to the visitor. The received bytes were just verified
	// to equal natHoleRespMsg.Sid, so send those instead of the pooled buffer.
	if _, err := lConn.WriteToUDP([]byte(natHoleRespMsg.Sid), uAddr); err != nil {
		xl.Error("write uaddr error: %v", err)
		return
	}

	kcpConn, err := frpNet.NewKCPConnFromUDP(lConn, false, uAddr.String())
	if err != nil {
		xl.Error("create kcp connection from udp connection error: %v", err)
		return
	}

	fmuxCfg := fmux.DefaultConfig()
	fmuxCfg.KeepAliveInterval = 5 * time.Second
	fmuxCfg.LogOutput = io.Discard
	sess, err := fmux.Server(kcpConn, fmuxCfg)
	if err != nil {
		xl.Error("create yamux server from kcp connection error: %v", err)
		return
	}
	defer sess.Close()
	muxConn, err := sess.Accept()
	if err != nil {
		xl.Error("accept for yamux connection error: %v", err)
		return
	}

	HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
		muxConn, []byte(pxy.cfg.Sk), m)
}