func()

in internal/actor/job_instance_actor.go [68:115]


// Receive routes the message currently carried by ctx to its handler.
//
// Bare *schedulerx.WorkerBatchReportTaskStatuesResponse and
// *schedulerx.WorkerReportJobInstanceStatusResponse messages go to
// handleReportWorkerStatus. An *actorcomm.SchedulerWrappedMsg is routed by the
// concrete type of its Msg payload (see dispatchWrappedMsg). Any other message
// type falls through the switch and is ignored without logging.
func (a *jobInstanceActor) Receive(ctx actor.Context) {
	switch received := ctx.Message().(type) {
	case *schedulerx.WorkerBatchReportTaskStatuesResponse,
		*schedulerx.WorkerReportJobInstanceStatusResponse:
		// FIXME atLeastOnceDelivery not yet implement, retry 3 times, interval 30s
		// Until then these responses are handled directly here rather than
		// being pushed onto actorcomm.AtLeastOnceDeliveryMsgReceiver().
		a.handleReportWorkerStatus(ctx, received)
	case *actorcomm.SchedulerWrappedMsg:
		a.dispatchWrappedMsg(ctx, received)
	}
}

// dispatchWrappedMsg routes a wrapped scheduler message according to the
// concrete type of wrapped.Msg. Errors returned by the submit and kill
// handlers are logged and not propagated; an unrecognised payload type is
// logged as an error.
func (a *jobInstanceActor) dispatchWrappedMsg(ctx actor.Context, wrapped *actorcomm.SchedulerWrappedMsg) {
	switch wrapped.Msg.(type) {
	case *schedulerx.ServerSubmitJobInstanceRequest:
		err := a.handleSubmitJobInstance(ctx, wrapped)
		if err != nil {
			logger.Errorf("handleSubmitJobInstanceRequest failed, err=%s", err.Error())
		}

	case *schedulerx.ServerKillJobInstanceRequest:
		err := a.handleKillJobInstance(ctx, wrapped)
		if err != nil {
			logger.Errorf("handleKillJobInstanceRequest failed, err=%s", err.Error())
		}

	case *schedulerx.ServerRetryTasksRequest:
		a.handleRetryTasks(ctx, wrapped)

	case *schedulerx.ServerKillTaskRequest:
		a.handleKillTask(ctx, wrapped)

	case *schedulerx.ServerCheckTaskMasterRequest:
		a.handCheckTaskMaster(ctx, wrapped)

	case *schedulerx.MasterNotifyWorkerPullRequest:
		a.handleInitPull(ctx, wrapped)

	case *schedulerx.WorkerReportJobInstanceStatusRequest:
		// Forward the whole wrapper, unchanged, to the server PID resolved
		// from the wrapper's own context.
		serverPid := actorcomm.SchedulerxServerPid(wrapped.Ctx)
		ctx.Send(serverPid, wrapped)

	case *schedulerx.WorkerReportJobInstanceStatusResponse,
		*schedulerx.WorkerBatchReportTaskStatuesResponse:
		// Re-wrap the payload, keeping only Msg and SenderPath, and push it
		// onto the at-least-once delivery channel.
		forwarded := &actorcomm.SchedulerWrappedMsg{
			Msg:        wrapped.Msg,
			SenderPath: wrapped.SenderPath,
		}
		actorcomm.AtLeastOnceDeliveryMsgReceiver() <- forwarded

	default:
		logger.Errorf("Receive unknown message in jobInstanceActor, msg=%+v", wrapped)
	}
}