public void deliverIncomingMessage()

in hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java [362:464]


        /**
         * Dispatches one incoming client-interface request according to its function id.
         * <p>
         * {@code GET_CLUSTER_CONTROLLER_INFO} and {@code GET_CLUSTER_TOPOLOGY} are answered
         * immediately through {@code handle.send}. Every other recognised function is wrapped in a
         * work item and scheduled on {@code workQueue}, together with an {@link IPCResponder} built
         * from {@code handle} and {@code mid}. A function id with no matching case is answered with
         * an {@link IllegalArgumentException} sent back through {@code handle}.
         * <p>
         * An {@link IPCException} raised by a direct {@code handle.send} call is not propagated;
         * its stack trace is printed instead.
         *
         * @param handle    IPC handle the request arrived on; used for direct replies and handed to
         *                  the responders
         * @param mid       message id of the request, passed to every reply/responder
         * @param rmid      not read by this method
         * @param payload   the request; cast to {@code HyracksClientInterfaceFunctions.Function}
         * @param exception not read by this method
         */
        public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload,
                Exception exception) {
            final HyracksClientInterfaceFunctions.Function request =
                    (HyracksClientInterfaceFunctions.Function) payload;
            // Enclosing service instance, handed to every work item below.
            final ClusterControllerService controller = ClusterControllerService.this;

            switch (request.getFunctionId()) {
                // ---- answered directly on the calling thread ----
                case GET_CLUSTER_CONTROLLER_INFO:
                    try {
                        handle.send(mid, info, null);
                    } catch (IPCException sendFailure) {
                        sendFailure.printStackTrace();
                    }
                    break;

                case GET_CLUSTER_TOPOLOGY:
                    try {
                        handle.send(mid, ccContext.getClusterTopology(), null);
                    } catch (IPCException sendFailure) {
                        sendFailure.printStackTrace();
                    }
                    break;

                // ---- job related requests, scheduled on the work queue ----
                case GET_JOB_STATUS: {
                    HyracksClientInterfaceFunctions.GetJobStatusFunction statusRequest =
                            (HyracksClientInterfaceFunctions.GetJobStatusFunction) request;
                    workQueue.schedule(new GetJobStatusWork(controller, statusRequest.getJobId(),
                            new IPCResponder<JobStatus>(handle, mid)));
                    break;
                }

                case GET_JOB_INFO: {
                    HyracksClientInterfaceFunctions.GetJobInfoFunction infoRequest =
                            (HyracksClientInterfaceFunctions.GetJobInfoFunction) request;
                    workQueue.schedule(new GetJobInfoWork(controller, infoRequest.getJobId(),
                            new IPCResponder<JobInfo>(handle, mid)));
                    break;
                }

                case START_JOB: {
                    HyracksClientInterfaceFunctions.StartJobFunction startRequest =
                            (HyracksClientInterfaceFunctions.StartJobFunction) request;
                    // The id is created here, before the work item is built and scheduled.
                    JobId newJobId = createJobId();
                    workQueue.schedule(new JobStartWork(controller, startRequest.getDeploymentId(),
                            startRequest.getACGGFBytes(), startRequest.getJobFlags(), newJobId,
                            new IPCResponder<JobId>(handle, mid)));
                    break;
                }

                case WAIT_FOR_COMPLETION: {
                    HyracksClientInterfaceFunctions.WaitForCompletionFunction waitRequest =
                            (HyracksClientInterfaceFunctions.WaitForCompletionFunction) request;
                    workQueue.schedule(new WaitForJobCompletionWork(controller, waitRequest.getJobId(),
                            new IPCResponder<Object>(handle, mid)));
                    break;
                }

                // ---- dataset / result requests ----
                case GET_DATASET_DIRECTORY_SERIVICE_INFO:
                    workQueue.schedule(new GetDatasetDirectoryServiceInfoWork(controller,
                            new IPCResponder<NetworkAddress>(handle, mid)));
                    break;

                case GET_DATASET_RESULT_STATUS: {
                    HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction resultStatusRequest =
                            (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) request;
                    workQueue.schedule(new GetResultStatusWork(controller, resultStatusRequest.getJobId(),
                            resultStatusRequest.getResultSetId(), new IPCResponder<Status>(handle, mid)));
                    break;
                }

                case GET_DATASET_RESULT_LOCATIONS: {
                    HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction locationsRequest =
                            (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) request;
                    workQueue.schedule(new GetResultPartitionLocationsWork(controller,
                            locationsRequest.getJobId(), locationsRequest.getResultSetId(),
                            locationsRequest.getKnownRecords(),
                            new IPCResponder<DatasetDirectoryRecord[]>(handle, mid)));
                    break;
                }

                // ---- cluster / deployment requests ----
                case GET_NODE_CONTROLLERS_INFO:
                    workQueue.schedule(new GetNodeControllersInfoWork(controller,
                            new IPCResponder<Map<String, NodeControllerInfo>>(handle, mid)));
                    break;

                case CLI_DEPLOY_BINARY: {
                    HyracksClientInterfaceFunctions.CliDeployBinaryFunction deployRequest =
                            (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) request;
                    workQueue.schedule(new CliDeployBinaryWork(controller, deployRequest.getBinaryURLs(),
                            deployRequest.getDeploymentId(), new IPCResponder<DeploymentId>(handle, mid)));
                    break;
                }

                case CLI_UNDEPLOY_BINARY: {
                    HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction undeployRequest =
                            (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) request;
                    workQueue.schedule(new CliUnDeployBinaryWork(controller, undeployRequest.getDeploymentId(),
                            new IPCResponder<DeploymentId>(handle, mid)));
                    break;
                }

                case CLUSTER_SHUTDOWN:
                    workQueue.schedule(new ClusterShutdownWork(controller,
                            new IPCResponder<Boolean>(handle, mid)));
                    break;

                // ---- anything else: report the unrecognised function id to the caller ----
                default:
                    try {
                        handle.send(mid, null,
                                new IllegalArgumentException("Unknown function " + request.getFunctionId()));
                    } catch (IPCException sendFailure) {
                        sendFailure.printStackTrace();
                    }
                    break;
            }
        }