func OnMsgReceived()

in internal/remoting/handler.go [37:120]


func OnMsgReceived(ctx context.Context) {
	connpool := pool.GetConnPool()

	var dataLen uint32
	hdrBuf := make([]byte, constants.TransportHeaderSize)
	for {
		conn, err := connpool.Get(ctx)
		if err != nil {
			logger.Errorf("OnMsgReceived get conn from pool failed, err=%s", err.Error())
			time.Sleep(100 * time.Millisecond) // maybe network is broken, just wait a moment
			continue
		}

		_ = conn.SetReadDeadline(time.Now().Add(3 * time.Second))
		n, err := io.ReadFull(conn, hdrBuf)
		if err == io.EOF {
			time.Sleep(100 * time.Millisecond) // maybe network is broken, just wait a moment
			continue
		}
		if errors.Is(err, os.ErrDeadlineExceeded) { // timeout, read no data
			continue
		}
		if err != nil { // broke pipe
			// EADDRNOTAVAIL
			connpool.ReconnectTrigger() <- struct{}{}
			logger.Errorf("OnMsgReceived broke pipe, err=%s", err.Error())
			continue
		}
		if n < constants.TransportHeaderSize {
			logger.Errorf("Read header from connection failed, read bytes=%d but expect bytes=%d", n, constants.TransportHeaderSize)
			continue
		}

		dataLen = binary.BigEndian.Uint32(hdrBuf)
		dataBuf := make([]byte, dataLen)
		n, err = io.ReadFull(conn, dataBuf)
		if err == io.EOF {
			continue
		}
		if n < int(dataLen) {
			logger.Errorf("Read payload from connection failed, read bytes=%d but expect bytes=%d", n, dataLen)
			continue
		}

		akkaMsg, err := trans.ReadAkkaMsg(dataBuf)
		if err != nil {
			logger.Errorf("Read raw buffer data failed, err=%s", err.Error())
			continue
		}
		msg, senderPath, err := codec.DecodeAkkaMessage(akkaMsg)
		if err != nil {
			logger.Errorf("Read akka message failed, err=%s", err.Error())
			continue
		}

		switch msg := msg.(type) {
		case *akka.AkkaControlMessage:
			if int32(msg.GetCommandType()) == int32(akka.CommandType_HEARTBEAT) {
				logger.Debugf("Receive heartbeat from server, heartbeat=%+v", msg)
				continue
			} else {
				logger.Warnf("Receive unexpect control message from server, message=%+v", msg)
				continue
			}
		case *schedulerx.ServerSubmitJobInstanceRequest:
			actorcomm.SxMsgReceiver() <- actorcomm.WrapSchedulerxMsg(ctx, msg, senderPath)
		case *schedulerx.ServerKillJobInstanceRequest:
			actorcomm.SxMsgReceiver() <- actorcomm.WrapSchedulerxMsg(ctx, msg, senderPath)
		case *schedulerx.ServerKillTaskRequest:
			actorcomm.SxMsgReceiver() <- actorcomm.WrapSchedulerxMsg(ctx, msg, senderPath)
		case *schedulerx.ServerRetryTasksRequest:
			actorcomm.SxMsgReceiver() <- actorcomm.WrapSchedulerxMsg(ctx, msg, senderPath)
		case *schedulerx.WorkerReportJobInstanceStatusResponse:
			logger.Debugf("Receive WorkerReportJobInstanceStatusResponse from server, resp=%+v", msg)
			continue
		case *schedulerx.WorkerHeartBeatResponse:
			logger.Debugf("Receive heartbeat from server, heartbeat=%+v", msg)
			continue
		default:
			logger.Errorf("Unknown msg type, msg=%+v", msg)
			continue
		}
	}
}