in internal/actor/job_instance_actor.go [68:115]
func (a *jobInstanceActor) Receive(ctx actor.Context) {
switch msg := ctx.Message().(type) {
case *schedulerx.WorkerBatchReportTaskStatuesResponse:
// send to atLeastOnceDeliveryRoutingActor
// actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
// Msg: msg,
// }
// FIXME atLeastOnceDelivery not yet implement, retry 3 times, interval 30s
a.handleReportWorkerStatus(ctx, ctx.Message())
case *schedulerx.WorkerReportJobInstanceStatusResponse:
// send to atLeastOnceDeliveryRoutingActor
// actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
// Msg: msg,
// }
// FIXME atLeastOnceDelivery not yet implement, retry 3 times, interval 30s
a.handleReportWorkerStatus(ctx, ctx.Message())
case *actorcomm.SchedulerWrappedMsg:
switch innerMsg := msg.Msg.(type) {
case *schedulerx.ServerSubmitJobInstanceRequest:
if err := a.handleSubmitJobInstance(ctx, msg); err != nil {
logger.Errorf("handleSubmitJobInstanceRequest failed, err=%s", err.Error())
}
case *schedulerx.ServerKillJobInstanceRequest:
if err := a.handleKillJobInstance(ctx, msg); err != nil {
logger.Errorf("handleKillJobInstanceRequest failed, err=%s", err.Error())
}
case *schedulerx.ServerRetryTasksRequest:
a.handleRetryTasks(ctx, msg)
case *schedulerx.ServerKillTaskRequest:
a.handleKillTask(ctx, msg)
case *schedulerx.ServerCheckTaskMasterRequest:
a.handCheckTaskMaster(ctx, msg)
case *schedulerx.MasterNotifyWorkerPullRequest:
a.handleInitPull(ctx, msg)
case *schedulerx.WorkerReportJobInstanceStatusRequest:
// forward to server
ctx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), msg)
case *schedulerx.WorkerReportJobInstanceStatusResponse, *schedulerx.WorkerBatchReportTaskStatuesResponse:
// send to atLeastOnceDeliveryRoutingActor
actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
Msg: innerMsg,
SenderPath: msg.SenderPath,
}
default:
logger.Errorf("Receive unknown message in jobInstanceActor, msg=%+v", msg)
}
}
}