in internal/actor/task_actor.go [63:153]
func (a *taskActor) Receive(ctx actor.Context) {
switch msg := ctx.Message().(type) {
case *schedulerx.ContainerReportTaskStatusRequest:
if err := a.handleTaskStatus(msg); err != nil {
logger.Errorf("handleTaskStatus failed, err=%s", err.Error())
}
case *schedulerx.ContainerBatchReportTaskStatuesRequest:
// send to atLeastOnceDeliveryRoutingActor
a.handleBatchTaskStatues(ctx, msg)
case *schedulerx.WorkerMapTaskRequest:
if err := a.handleMapTask(ctx, msg); err != nil {
logger.Errorf("handleMapTask failed, err=%s", err.Error())
}
case *schedulerx.WorkerMapTaskResponse:
actorcomm.WorkerMapTaskRespMsgSender() <- msg
case *schedulerx.PullTaskFromMasterRequest:
a.handlePullTasks(ctx, msg)
case *actorcomm.SchedulerWrappedMsg:
switch innerMsg := msg.Msg.(type) {
case *schedulerx.ContainerBatchReportTaskStatuesRequest:
// send to atLeastOnceDeliveryRoutingActor
a.handleBatchTaskStatues(ctx, innerMsg)
case *schedulerx.WorkerMapTaskResponse:
actorcomm.WorkerMapTaskRespMsgSender() <- innerMsg
case *schedulerx.PullTaskFromMasterRequest:
a.handlePullTasks(ctx, innerMsg)
case *schedulerx.WorkerReportJobInstanceProgressRequest:
// forward to server
ctx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), msg)
case *schedulerx.WorkerBatchUpdateTaskStatusRequest:
// forward to server
serverPid := actorcomm.SchedulerxServerPid(context.Background())
result, err := ctx.RequestFuture(serverPid, msg, 5*time.Second).Result()
if err != nil {
logger.Errorf("Send WorkerBatchUpdateTaskStatusRequest timeout, jobInstanceId=%d, serverAddr=%s", innerMsg.JobInstanceId, serverPid.Address)
} else {
actorcomm.WorkerBatchUpdateTaskStatusRespMsgSender() <- result.(*schedulerx.WorkerBatchUpdateTaskStatusResponse)
}
case *schedulerx.WorkerQueryJobInstanceStatusRequest:
// forward to server
serverPid := actorcomm.SchedulerxServerPid(context.Background())
result, err := ctx.RequestFuture(serverPid, msg, 30*time.Second).Result()
if err != nil {
logger.Errorf("Send WorkerQueryJobInstanceStatusRequest timeout, jobInstanceId=%d, serverAddr=%s", innerMsg.JobInstanceId, serverPid.Address)
} else {
actorcomm.WorkerQueryJobInstanceStatusRespMsgSender() <- result.(*schedulerx.WorkerQueryJobInstanceStatusResponse)
}
case *schedulerx.WorkerClearTasksRequest:
// forward to server
serverPid := actorcomm.SchedulerxServerPid(context.Background())
result, err := ctx.RequestFuture(serverPid, msg, 5*time.Second).Result()
if err != nil {
logger.Errorf("Send WorkerClearTasksRequest timeout, jobInstanceId=%d, serverAddr=%s", innerMsg.JobInstanceId, serverPid.Address)
} else {
actorcomm.WorkerClearTasksRespMsgSender() <- result.(*schedulerx.WorkerClearTasksResponse)
}
case *schedulerx.WorkerBatchCreateTasksRequest:
// forward to server
serverPid := actorcomm.SchedulerxServerPid(context.Background())
result, err := ctx.RequestFuture(serverPid, msg, 90*time.Second).Result()
if err != nil {
logger.Errorf("Send WorkerBatchCreateTasksRequest timeout, jobInstanceId=%d, serverAddr=%s", innerMsg.JobInstanceId, serverPid.Address)
} else {
actorcomm.WorkerBatchCreateTasksRespMsgSender() <- result.(*schedulerx.WorkerBatchCreateTasksResponse)
}
case *schedulerx.WorkerPullTasksRequest:
// forward to server
serverPid := actorcomm.SchedulerxServerPid(context.Background())
result, err := ctx.RequestFuture(serverPid, msg, 30*time.Second).Result()
if err != nil {
logger.Errorf("Send WorkerPullTasksRequest timeout, jobInstanceId=%d, serverAddr=%s", innerMsg.JobInstanceId, serverPid.Address)
} else {
actorcomm.WorkerPullTasksRespMsgSender() <- result.(*schedulerx.WorkerPullTasksResponse)
}
case *schedulerx.WorkerBatchReportTaskStatuesRequest:
// forward to server
serverPid := actorcomm.SchedulerxServerPid(context.Background())
ctx.Send(serverPid, msg)
case *schedulerx.WorkerReportTaskListStatusRequest:
serverPid := actorcomm.SchedulerxServerPid(context.Background())
result, err := ctx.RequestFuture(serverPid, innerMsg, 30*time.Second).Result()
if err != nil {
logger.Errorf("Send WorkerReportTaskListStatusRequest timeout, jobInstanceId=%d, serverAddr=%s", innerMsg.JobInstanceId, serverPid.Address)
} else {
actorcomm.WorkerReportTaskListStatusRespMsgSender() <- result.(*schedulerx.WorkerReportTaskListStatusResponse)
}
default:
logger.Errorf("Receive unknown message in taskActor, msg=%+v", msg)
}
}
}