internal/remoting/handler.go (95 lines of code) (raw):
/*
* Copyright (c) 2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package remoting
import (
"context"
"encoding/binary"
"errors"
"io"
"os"
"time"
"github.com/alibaba/schedulerx-worker-go/internal/actor/common"
"github.com/alibaba/schedulerx-worker-go/internal/constants"
"github.com/alibaba/schedulerx-worker-go/internal/proto/akka"
"github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
"github.com/alibaba/schedulerx-worker-go/internal/remoting/codec"
"github.com/alibaba/schedulerx-worker-go/internal/remoting/pool"
"github.com/alibaba/schedulerx-worker-go/internal/remoting/trans"
"github.com/alibaba/schedulerx-worker-go/logger"
)
// OnMsgReceived runs the loop that receives messages pushed by the server.
//
// Each iteration takes a connection from the pool and reads one frame: a
// header of constants.TransportHeaderSize bytes, whose leading bytes hold the
// payload length as a big-endian uint32, followed by that many payload bytes.
// The payload is unwrapped with trans.ReadAkkaMsg and decoded with
// codec.DecodeAkkaMessage. Job/task requests are forwarded to
// actorcomm.SxMsgReceiver(); heartbeats and status responses are only logged.
//
// Failures are logged and the loop continues. A failure that leaves the
// connection part-way through a frame (a partial header, or any error while
// reading the payload) triggers a reconnect, because the next bytes read would
// no longer start at a frame boundary. The loop returns once ctx is cancelled.
func OnMsgReceived(ctx context.Context) {
	const (
		// readTimeout bounds each header/payload read so an idle connection
		// does not block the loop indefinitely.
		readTimeout = 3 * time.Second
		// retryInterval is the pause after a failed Get or an EOF before retrying.
		retryInterval = 100 * time.Millisecond
	)

	connpool := pool.GetConnPool()
	hdrBuf := make([]byte, constants.TransportHeaderSize)

	// waitRetry sleeps for retryInterval, returning false early if ctx is cancelled.
	waitRetry := func() bool {
		timer := time.NewTimer(retryInterval)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			return false
		case <-timer.C:
			return true
		}
	}

	for ctx.Err() == nil {
		conn, err := connpool.Get(ctx)
		if err != nil {
			logger.Errorf("OnMsgReceived get conn from pool failed, err=%s", err.Error())
			if !waitRetry() { // maybe network is broken, just wait a moment
				return
			}
			continue
		}

		// Header.
		_ = conn.SetReadDeadline(time.Now().Add(readTimeout))
		if n, err := io.ReadFull(conn, hdrBuf); err != nil {
			switch {
			case errors.Is(err, io.EOF):
				// io.ReadFull reports plain EOF only when nothing was read, so no
				// frame was started. Maybe network is broken, just wait a moment.
				if !waitRetry() {
					return
				}
			case errors.Is(err, os.ErrDeadlineExceeded) && n == 0:
				// Timeout, read no data: the connection is simply idle.
			case errors.Is(err, os.ErrDeadlineExceeded):
				// Timed out mid-header. The bytes already consumed cannot be put
				// back, so the stream is misaligned; reconnect to resynchronize.
				connpool.ReconnectTrigger() <- struct{}{}
				logger.Errorf("Read header from connection failed, read bytes=%d but expect bytes=%d", n, constants.TransportHeaderSize)
			default:
				// Broke pipe (e.g. EADDRNOTAVAIL) or an unexpected EOF mid-header.
				connpool.ReconnectTrigger() <- struct{}{}
				logger.Errorf("OnMsgReceived broke pipe, err=%s", err.Error())
			}
			continue
		}

		// Payload.
		// NOTE(review): dataLen comes straight off the wire and is not bounded
		// before allocating; consider enforcing a maximum frame size.
		dataLen := binary.BigEndian.Uint32(hdrBuf)
		dataBuf := make([]byte, dataLen)
		// Fresh deadline: the header read may have used up most of the previous one.
		_ = conn.SetReadDeadline(time.Now().Add(readTimeout))
		if n, err := io.ReadFull(conn, dataBuf); err != nil {
			// The header is already consumed, so any failure here (EOF, timeout,
			// broken pipe) leaves the stream misaligned; reconnect rather than
			// parse payload bytes as the next header.
			connpool.ReconnectTrigger() <- struct{}{}
			logger.Errorf("Read payload from connection failed, read bytes=%d but expect bytes=%d, err=%s", n, dataLen, err.Error())
			continue
		}

		// Decode.
		akkaMsg, err := trans.ReadAkkaMsg(dataBuf)
		if err != nil {
			logger.Errorf("Read raw buffer data failed, err=%s", err.Error())
			continue
		}
		msg, senderPath, err := codec.DecodeAkkaMessage(akkaMsg)
		if err != nil {
			logger.Errorf("Read akka message failed, err=%s", err.Error())
			continue
		}

		// Dispatch. Server requests go to the actor layer; everything else is logged.
		switch msg := msg.(type) {
		case *akka.AkkaControlMessage:
			if int32(msg.GetCommandType()) == int32(akka.CommandType_HEARTBEAT) {
				logger.Debugf("Receive heartbeat from server, heartbeat=%+v", msg)
			} else {
				logger.Warnf("Receive unexpect control message from server, message=%+v", msg)
			}
		case *schedulerx.ServerSubmitJobInstanceRequest:
			actorcomm.SxMsgReceiver() <- actorcomm.WrapSchedulerxMsg(ctx, msg, senderPath)
		case *schedulerx.ServerKillJobInstanceRequest:
			actorcomm.SxMsgReceiver() <- actorcomm.WrapSchedulerxMsg(ctx, msg, senderPath)
		case *schedulerx.ServerKillTaskRequest:
			actorcomm.SxMsgReceiver() <- actorcomm.WrapSchedulerxMsg(ctx, msg, senderPath)
		case *schedulerx.ServerRetryTasksRequest:
			actorcomm.SxMsgReceiver() <- actorcomm.WrapSchedulerxMsg(ctx, msg, senderPath)
		case *schedulerx.WorkerReportJobInstanceStatusResponse:
			logger.Debugf("Receive WorkerReportJobInstanceStatusResponse from server, resp=%+v", msg)
		case *schedulerx.WorkerHeartBeatResponse:
			logger.Debugf("Receive heartbeat from server, heartbeat=%+v", msg)
		default:
			logger.Errorf("Unknown msg type, msg=%+v", msg)
		}
	}
}