internal/actor/heartbeat_processor.go (92 lines of code) (raw):
/*
* Copyright (c) 2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package actor
import (
"fmt"
"github.com/asynkron/protoactor-go/actor"
"google.golang.org/protobuf/proto"
"github.com/alibaba/schedulerx-worker-go/internal/actor/common"
"github.com/alibaba/schedulerx-worker-go/internal/proto/akka"
"github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
"github.com/alibaba/schedulerx-worker-go/internal/remoting/codec"
"github.com/alibaba/schedulerx-worker-go/internal/remoting/pool"
"github.com/alibaba/schedulerx-worker-go/internal/remoting/trans"
"github.com/alibaba/schedulerx-worker-go/internal/utils"
"github.com/alibaba/schedulerx-worker-go/logger"
)
var _ actor.Process = &heartbeatProcessor{}
type heartbeatProcessor struct {
connpool pool.ConnPool
}
func newHeartbeatProcessor(connpool pool.ConnPool) actor.Process {
return &heartbeatProcessor{
connpool: connpool,
}
}
func (p *heartbeatProcessor) SendUserMessage(pid *actor.PID, message interface{}) {
if actorcomm.IsSchedulerxServer(pid) {
var (
akkaMsg *akka.AkkaProtocolMessage
err error
)
wrappedMsg, ok := message.(*actorcomm.SchedulerWrappedMsg)
if !ok {
logger.Errorf("Get unknown message, msg=%+v", wrappedMsg)
return
}
conn, err := p.connpool.Get(wrappedMsg.Ctx)
if err != nil {
logger.Errorf("Get conn from pool failed, err=%s", err.Error())
return
}
switch msg := wrappedMsg.Msg.(type) {
case *schedulerx.ServerSubmitJobInstanceResponse:
akkaMsg, err = codec.EncodeAkkaMessage(
msg,
wrappedMsg.SenderPath,
fmt.Sprintf("akka.tcp://%s@%s/user/job_instance_routing/%s", utils.GetWorkerId(), conn.LocalAddr().String(), utils.GenPathTpl()),
"com.alibaba.schedulerx.protocol.Server$ServerSubmitJobInstanceResponse")
case *schedulerx.ServerKillJobInstanceResponse:
akkaMsg, err = codec.EncodeAkkaMessage(
msg,
wrappedMsg.SenderPath,
fmt.Sprintf("akka.tcp://%s@%s/user/job_instance_routing/%s", utils.GetWorkerId(), conn.LocalAddr().String(), utils.GenPathTpl()),
"com.alibaba.schedulerx.protocol.Server$ServerKillJobInstanceResponse")
case *schedulerx.WorkerReportJobInstanceStatusRequest:
akkaMsg, err = codec.EncodeAkkaMessage(
msg,
fmt.Sprintf("akka.tcp://server@%s/", conn.RemoteAddr().String()),
fmt.Sprintf("akka.tcp://%s@%s/user/at_least_once_delivery_routing/%s", utils.GetWorkerId(), conn.LocalAddr().String(), utils.GenPathTpl()),
"com.alibaba.schedulerx.protocol.Worker$WorkerReportJobInstanceStatusRequest",
codec.WithMessageContainerSerializer(),
codec.WithSelectionEnvelopePattern([]*akka.Selection{
{
Type: akka.PatternType_CHILD_NAME.Enum(),
Matcher: proto.String("user"),
},
{
Type: akka.PatternType_CHILD_NAME.Enum(),
Matcher: proto.String("instance_status_router"),
},
}))
default:
logger.Errorf("Unknown akka message type=%+v", msg)
return
}
if err != nil {
logger.Errorf("Encode akka message failed, err=%s", err.Error())
return
}
if err := trans.WriteAkkaMsg(akkaMsg, conn); err != nil {
logger.Errorf("Write akka message failed, err=%s", err.Error())
return
}
}
}
func (p *heartbeatProcessor) SendSystemMessage(pid *actor.PID, message interface{}) {
switch msg := message.(type) {
default:
logger.Errorf("Unknown akka message type=%+v", msg)
return
}
}
func (p *heartbeatProcessor) Stop(pid *actor.PID) {
// do nothing
}