in internal/remoting/handshake.go [36:84]
func Handshake(ctx context.Context, conn net.Conn) error {
if err := sendHandshake(ctx, conn); err != nil {
return fmt.Errorf("Write handshake to remote failed, err=%s. ", err.Error())
}
waitRespTimeout := 5 * time.Second
waitTimeout := time.After(waitRespTimeout)
for {
select {
case <-waitTimeout:
return fmt.Errorf("Wait handshake response timeout, timeout=%s ", waitRespTimeout.String())
default:
var dataLen uint32
hdrBuf := make([]byte, constants.TransportHeaderSize)
n, err := io.ReadFull(conn, hdrBuf)
if err == io.EOF {
continue
}
if n < constants.TransportHeaderSize {
logger.Errorf("Read header from connection failed, read bytes=%d but expect bytes=%d", n, constants.TransportHeaderSize)
continue
}
dataLen = binary.BigEndian.Uint32(hdrBuf)
dataBuf := make([]byte, dataLen)
n, err = io.ReadFull(conn, dataBuf)
if err == io.EOF {
continue
}
if n < int(dataLen) {
logger.Errorf("Read payload from connection failed, read bytes=%d but expect bytes=%d", n, dataLen)
continue
}
msg, err := trans.ReadAkkaMsg(dataBuf)
if err != nil {
return fmt.Errorf("handshake read akka msg err=%+v ", err)
}
if controlMsg := msg.Instruction; controlMsg != nil && controlMsg.CommandType != nil {
if int32(*controlMsg.CommandType) == int32(akka.CommandType_ASSOCIATE) {
logger.Infof("Receive handshake msg, msg=%+v ", controlMsg)
return nil
}
} else {
return fmt.Errorf("Receive unknown msg type when wait handshake response, msg=%+v ", msg)
}
}
}
}