public void deliverIncomingMessage()

in hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java [469:598]


        /**
         * Dispatches a message received over IPC onto the cluster controller's work queue.
         * <p>
         * The payload is expected to be a {@link CCNCFunctions.Function}. Its function id selects
         * the work item that is created from the function's fields and scheduled on
         * {@code workQueue}; nothing is executed inline on the calling thread. Function ids that
         * have no handler here are logged and otherwise ignored.
         *
         * @param handle
         *            handle of the connection the message arrived on; only used to send the reply
         *            for {@code GET_NODE_CONTROLLERS_INFO}
         * @param mid
         *            id of the incoming message; only used for diagnostics
         * @param rmid
         *            id of the request this message responds to; not used by this handler
         * @param payload
         *            the function object to dispatch
         * @param exception
         *            error delivered in place of a payload, if any; it is logged when the payload
         *            cannot be dispatched
         */
        public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload,
                Exception exception) {
            // NOTE(review): presumably the IPC layer delivers error messages with a null payload and a
            // non-null exception - confirm against the IPC system. Without this guard such a message
            // would fail with a NullPointerException/ClassCastException and the reported error would
            // never be surfaced.
            if (!(payload instanceof CCNCFunctions.Function)) {
                LOGGER.log(java.util.logging.Level.WARNING,
                        "Ignoring message " + mid + " with unexpected payload: " + payload, exception);
                return;
            }
            CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
            switch (fn.getFunctionId()) {
                case REGISTER_NODE: {
                    CCNCFunctions.RegisterNodeFunction rnf = (CCNCFunctions.RegisterNodeFunction) fn;
                    workQueue.schedule(new RegisterNodeWork(ClusterControllerService.this, rnf.getNodeRegistration()));
                    return;
                }

                case UNREGISTER_NODE: {
                    CCNCFunctions.UnregisterNodeFunction unf = (CCNCFunctions.UnregisterNodeFunction) fn;
                    workQueue.schedule(new UnregisterNodeWork(ClusterControllerService.this, unf.getNodeId()));
                    return;
                }

                case NODE_HEARTBEAT: {
                    CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn;
                    workQueue.schedule(new NodeHeartbeatWork(ClusterControllerService.this, nhf.getNodeId(),
                            nhf.getHeartbeatData()));
                    return;
                }

                case NOTIFY_JOBLET_CLEANUP: {
                    CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn;
                    workQueue.schedule(new JobletCleanupNotificationWork(ClusterControllerService.this, njcf.getJobId(),
                            njcf.getNodeId()));
                    return;
                }

                case NOTIFY_DEPLOY_BINARY: {
                    CCNCFunctions.NotifyDeployBinaryFunction ndbf = (CCNCFunctions.NotifyDeployBinaryFunction) fn;
                    workQueue.schedule(new NotifyDeployBinaryWork(ClusterControllerService.this, ndbf.getDeploymentId(),
                            ndbf.getNodeId(), ndbf.getDeploymentStatus()));
                    return;
                }

                case REPORT_PROFILE: {
                    CCNCFunctions.ReportProfileFunction rpf = (CCNCFunctions.ReportProfileFunction) fn;
                    workQueue.schedule(new ReportProfilesWork(ClusterControllerService.this, rpf.getProfiles()));
                    return;
                }

                case NOTIFY_TASK_COMPLETE: {
                    CCNCFunctions.NotifyTaskCompleteFunction ntcf = (CCNCFunctions.NotifyTaskCompleteFunction) fn;
                    workQueue.schedule(new TaskCompleteWork(ClusterControllerService.this, ntcf.getJobId(),
                            ntcf.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics()));
                    return;
                }

                case NOTIFY_TASK_FAILURE: {
                    CCNCFunctions.NotifyTaskFailureFunction ntff = (CCNCFunctions.NotifyTaskFailureFunction) fn;
                    workQueue.schedule(new TaskFailureWork(ClusterControllerService.this, ntff.getJobId(),
                            ntff.getTaskId(), ntff.getNodeId(), ntff.getExceptions()));
                    return;
                }

                case REGISTER_PARTITION_PROVIDER: {
                    CCNCFunctions.RegisterPartitionProviderFunction rppf =
                            (CCNCFunctions.RegisterPartitionProviderFunction) fn;
                    workQueue.schedule(new RegisterPartitionAvailibilityWork(ClusterControllerService.this,
                            rppf.getPartitionDescriptor()));
                    return;
                }

                case REGISTER_PARTITION_REQUEST: {
                    CCNCFunctions.RegisterPartitionRequestFunction rprf =
                            (CCNCFunctions.RegisterPartitionRequestFunction) fn;
                    workQueue.schedule(new RegisterPartitionRequestWork(ClusterControllerService.this,
                            rprf.getPartitionRequest()));
                    return;
                }

                case REGISTER_RESULT_PARTITION_LOCATION: {
                    CCNCFunctions.RegisterResultPartitionLocationFunction rrplf =
                            (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
                    workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this,
                            rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(),
                            rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress()));
                    return;
                }

                case REPORT_RESULT_PARTITION_WRITE_COMPLETION: {
                    CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrpwcf =
                            (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
                    workQueue.schedule(new ReportResultPartitionWriteCompletionWork(ClusterControllerService.this,
                            rrpwcf.getJobId(), rrpwcf.getResultSetId(), rrpwcf.getPartition()));
                    return;
                }

                case REPORT_RESULT_PARTITION_FAILURE: {
                    CCNCFunctions.ReportResultPartitionFailureFunction rrpff =
                            (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
                    workQueue.schedule(new ReportResultPartitionFailureWork(ClusterControllerService.this,
                            rrpff.getJobId(), rrpff.getResultSetId(), rrpff.getPartition()));
                    return;
                }

                case SEND_APPLICATION_MESSAGE: {
                    CCNCFunctions.SendApplicationMessageFunction samf =
                            (CCNCFunctions.SendApplicationMessageFunction) fn;
                    workQueue.schedule(new ApplicationMessageWork(ClusterControllerService.this, samf.getMessage(),
                            samf.getDeploymentId(), samf.getNodeId()));
                    return;
                }

                case GET_NODE_CONTROLLERS_INFO: {
                    workQueue.schedule(new GetNodeControllersInfoWork(ClusterControllerService.this,
                            new IResultCallback<Map<String, NodeControllerInfo>>() {
                                @Override
                                public void setValue(Map<String, NodeControllerInfo> result) {
                                    // The result is sent back on the originating connection, wrapped in
                                    // its own response function; the request message id is left at -1.
                                    new IPCResponder<CCNCFunctions.GetNodeControllersInfoResponseFunction>(handle, -1)
                                            .setValue(new CCNCFunctions.GetNodeControllersInfoResponseFunction(result));
                                }

                                @Override
                                public void setException(Exception e) {
                                    // No error reply is sent to the requester here; at least record
                                    // the failure instead of dropping it silently.
                                    LOGGER.log(java.util.logging.Level.WARNING,
                                            "Failed to get node controllers info", e);
                                }
                            }));
                    return;
                }

                case STATE_DUMP_RESPONSE: {
                    CCNCFunctions.StateDumpResponseFunction sdrf = (CCNCFunctions.StateDumpResponseFunction) fn;
                    workQueue.schedule(new NotifyStateDumpResponse(ClusterControllerService.this, sdrf.getNodeId(),
                            sdrf.getStateDumpId(), sdrf.getState()));
                    return;
                }

                case SHUTDOWN_RESPONSE: {
                    CCNCFunctions.ShutdownResponseFunction shrf = (CCNCFunctions.ShutdownResponseFunction) fn;
                    workQueue.schedule(new NotifyShutdownWork(ClusterControllerService.this, shrf.getNodeId()));
                    return;
                }

                default:
                    // Function ids without a handler above are reported and dropped.
                    LOGGER.warning("Unknown function: " + fn.getFunctionId());
            }
        }