func()

in internal/actor/task_actor.go [63:153]


// Receive dispatches a message delivered to the task actor according to its
// concrete type.
//
// Messages matched directly:
//   - ContainerReportTaskStatusRequest and WorkerMapTaskRequest are passed to
//     handleTaskStatus / handleMapTask; a returned error is logged.
//   - ContainerBatchReportTaskStatuesRequest and PullTaskFromMasterRequest are
//     passed to handleBatchTaskStatues / handlePullTasks.
//   - WorkerMapTaskResponse is pushed onto the channel returned by
//     actorcomm.WorkerMapTaskRespMsgSender.
//   - SchedulerWrappedMsg is unwrapped and dispatched by handleWrappedMsg.
//
// Any other top-level message type is ignored without logging.
func (a *taskActor) Receive(ctx actor.Context) {
	switch msg := ctx.Message().(type) {
	case *schedulerx.ContainerReportTaskStatusRequest:
		if err := a.handleTaskStatus(msg); err != nil {
			logger.Errorf("handleTaskStatus failed, err=%s", err.Error())
		}
	case *schedulerx.ContainerBatchReportTaskStatuesRequest:
		// send to atLeastOnceDeliveryRoutingActor
		a.handleBatchTaskStatues(ctx, msg)
	case *schedulerx.WorkerMapTaskRequest:
		if err := a.handleMapTask(ctx, msg); err != nil {
			logger.Errorf("handleMapTask failed, err=%s", err.Error())
		}
	case *schedulerx.WorkerMapTaskResponse:
		actorcomm.WorkerMapTaskRespMsgSender() <- msg
	case *schedulerx.PullTaskFromMasterRequest:
		a.handlePullTasks(ctx, msg)
	case *actorcomm.SchedulerWrappedMsg:
		a.handleWrappedMsg(ctx, msg)
	}
}

// handleWrappedMsg dispatches a SchedulerWrappedMsg on the type of its Msg
// field. Some payloads are handled locally, the rest are forwarded to the
// server PID obtained from actorcomm.SchedulerxServerPid. For request/response
// payloads the reply is pushed onto the matching actorcomm response channel;
// a failed request or a reply of an unexpected type is logged and dropped.
//
// NOTE(review): most forwarding branches send the wrapper msg to the server
// PID, while the WorkerReportTaskListStatusRequest branch sends innerMsg.
// That asymmetry is kept exactly as it was; confirm which form the receiving
// side expects.
func (a *taskActor) handleWrappedMsg(ctx actor.Context, msg *actorcomm.SchedulerWrappedMsg) {
	// logUnexpected reports a reply whose type does not match the request,
	// instead of letting an unchecked type assertion panic inside the actor.
	logUnexpected := func(reqName string, result interface{}) {
		logger.Errorf("Receive unexpected response for %s in taskActor, resp=%T", reqName, result)
	}

	switch innerMsg := msg.Msg.(type) {
	case *schedulerx.ContainerBatchReportTaskStatuesRequest:
		// send to atLeastOnceDeliveryRoutingActor
		a.handleBatchTaskStatues(ctx, innerMsg)
	case *schedulerx.WorkerMapTaskResponse:
		actorcomm.WorkerMapTaskRespMsgSender() <- innerMsg
	case *schedulerx.PullTaskFromMasterRequest:
		a.handlePullTasks(ctx, innerMsg)
	case *schedulerx.WorkerReportJobInstanceProgressRequest:
		// forward to server (fire-and-forget), using the context carried by the wrapper
		ctx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), msg)
	case *schedulerx.WorkerBatchReportTaskStatuesRequest:
		// forward to server (fire-and-forget)
		ctx.Send(actorcomm.SchedulerxServerPid(context.Background()), msg)
	case *schedulerx.WorkerBatchUpdateTaskStatusRequest:
		const reqName = "WorkerBatchUpdateTaskStatusRequest"
		result, ok := a.forwardRequestToServer(ctx, msg, 5*time.Second, reqName, innerMsg.JobInstanceId)
		if !ok {
			return
		}
		resp, ok := result.(*schedulerx.WorkerBatchUpdateTaskStatusResponse)
		if !ok {
			logUnexpected(reqName, result)
			return
		}
		actorcomm.WorkerBatchUpdateTaskStatusRespMsgSender() <- resp
	case *schedulerx.WorkerQueryJobInstanceStatusRequest:
		const reqName = "WorkerQueryJobInstanceStatusRequest"
		result, ok := a.forwardRequestToServer(ctx, msg, 30*time.Second, reqName, innerMsg.JobInstanceId)
		if !ok {
			return
		}
		resp, ok := result.(*schedulerx.WorkerQueryJobInstanceStatusResponse)
		if !ok {
			logUnexpected(reqName, result)
			return
		}
		actorcomm.WorkerQueryJobInstanceStatusRespMsgSender() <- resp
	case *schedulerx.WorkerClearTasksRequest:
		const reqName = "WorkerClearTasksRequest"
		result, ok := a.forwardRequestToServer(ctx, msg, 5*time.Second, reqName, innerMsg.JobInstanceId)
		if !ok {
			return
		}
		resp, ok := result.(*schedulerx.WorkerClearTasksResponse)
		if !ok {
			logUnexpected(reqName, result)
			return
		}
		actorcomm.WorkerClearTasksRespMsgSender() <- resp
	case *schedulerx.WorkerBatchCreateTasksRequest:
		const reqName = "WorkerBatchCreateTasksRequest"
		result, ok := a.forwardRequestToServer(ctx, msg, 90*time.Second, reqName, innerMsg.JobInstanceId)
		if !ok {
			return
		}
		resp, ok := result.(*schedulerx.WorkerBatchCreateTasksResponse)
		if !ok {
			logUnexpected(reqName, result)
			return
		}
		actorcomm.WorkerBatchCreateTasksRespMsgSender() <- resp
	case *schedulerx.WorkerPullTasksRequest:
		const reqName = "WorkerPullTasksRequest"
		result, ok := a.forwardRequestToServer(ctx, msg, 30*time.Second, reqName, innerMsg.JobInstanceId)
		if !ok {
			return
		}
		resp, ok := result.(*schedulerx.WorkerPullTasksResponse)
		if !ok {
			logUnexpected(reqName, result)
			return
		}
		actorcomm.WorkerPullTasksRespMsgSender() <- resp
	case *schedulerx.WorkerReportTaskListStatusRequest:
		const reqName = "WorkerReportTaskListStatusRequest"
		// This branch sends the unwrapped innerMsg, unlike the branches above.
		result, ok := a.forwardRequestToServer(ctx, innerMsg, 30*time.Second, reqName, innerMsg.JobInstanceId)
		if !ok {
			return
		}
		resp, ok := result.(*schedulerx.WorkerReportTaskListStatusResponse)
		if !ok {
			logUnexpected(reqName, result)
			return
		}
		actorcomm.WorkerReportTaskListStatusRespMsgSender() <- resp
	default:
		logger.Errorf("Receive unknown message in taskActor, msg=%+v", msg)
	}
}

// forwardRequestToServer sends req to the server PID with ctx.RequestFuture
// and waits for the future's result, bounded by timeout. On success it returns
// the raw reply and true. On failure it logs the request name, job instance
// id, server address and the underlying error, and returns (nil, false).
//
// jobInstanceId is only used for logging and is passed through untyped so it
// is formatted exactly as the caller's field value would be.
func (a *taskActor) forwardRequestToServer(ctx actor.Context, req interface{}, timeout time.Duration, reqName string, jobInstanceId interface{}) (interface{}, bool) {
	serverPid := actorcomm.SchedulerxServerPid(context.Background())
	result, err := ctx.RequestFuture(serverPid, req, timeout).Result()
	if err != nil {
		// Include err: the failure is not necessarily a timeout.
		logger.Errorf("Send %s timeout, jobInstanceId=%d, serverAddr=%s, err=%v", reqName, jobInstanceId, serverPid.Address, err)
		return nil, false
	}
	return result, true
}