func Handshake()

in internal/remoting/handshake.go [36:84]


func Handshake(ctx context.Context, conn net.Conn) error {
	if err := sendHandshake(ctx, conn); err != nil {
		return fmt.Errorf("Write handshake to remote failed, err=%s. ", err.Error())
	}

	waitRespTimeout := 5 * time.Second
	waitTimeout := time.After(waitRespTimeout)
	for {
		select {
		case <-waitTimeout:
			return fmt.Errorf("Wait handshake response timeout, timeout=%s ", waitRespTimeout.String())
		default:
			var dataLen uint32
			hdrBuf := make([]byte, constants.TransportHeaderSize)
			n, err := io.ReadFull(conn, hdrBuf)
			if err == io.EOF {
				continue
			}
			if n < constants.TransportHeaderSize {
				logger.Errorf("Read header from connection failed, read bytes=%d but expect bytes=%d", n, constants.TransportHeaderSize)
				continue
			}

			dataLen = binary.BigEndian.Uint32(hdrBuf)
			dataBuf := make([]byte, dataLen)
			n, err = io.ReadFull(conn, dataBuf)
			if err == io.EOF {
				continue
			}
			if n < int(dataLen) {
				logger.Errorf("Read payload from connection failed, read bytes=%d but expect bytes=%d", n, dataLen)
				continue
			}

			msg, err := trans.ReadAkkaMsg(dataBuf)
			if err != nil {
				return fmt.Errorf("handshake read akka msg err=%+v ", err)
			}
			if controlMsg := msg.Instruction; controlMsg != nil && controlMsg.CommandType != nil {
				if int32(*controlMsg.CommandType) == int32(akka.CommandType_ASSOCIATE) {
					logger.Infof("Receive handshake msg, msg=%+v ", controlMsg)
					return nil
				}
			} else {
				return fmt.Errorf("Receive unknown msg type when wait handshake response, msg=%+v ", msg)
			}
		}
	}
}