in hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java [362:464]
public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload,
Exception exception) {
HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) payload;
switch (fn.getFunctionId()) {
case GET_CLUSTER_CONTROLLER_INFO: {
try {
handle.send(mid, info, null);
} catch (IPCException e) {
e.printStackTrace();
}
return;
}
case GET_JOB_STATUS: {
HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
workQueue.schedule(new GetJobStatusWork(ClusterControllerService.this, gjsf.getJobId(),
new IPCResponder<JobStatus>(handle, mid)));
return;
}
case GET_JOB_INFO: {
HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf = (HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
workQueue.schedule(new GetJobInfoWork(ClusterControllerService.this, gjsf.getJobId(),
new IPCResponder<JobInfo>(handle, mid)));
return;
}
case START_JOB: {
HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
JobId jobId = createJobId();
workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getDeploymentId(),
sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
return;
}
case GET_DATASET_DIRECTORY_SERIVICE_INFO: {
workQueue.schedule(new GetDatasetDirectoryServiceInfoWork(ClusterControllerService.this,
new IPCResponder<NetworkAddress>(handle, mid)));
return;
}
case GET_DATASET_RESULT_STATUS: {
HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
workQueue.schedule(new GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(),
gdrlf.getResultSetId(), new IPCResponder<Status>(handle, mid)));
return;
}
case GET_DATASET_RESULT_LOCATIONS: {
HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this,
gdrlf.getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
new IPCResponder<DatasetDirectoryRecord[]>(handle, mid)));
return;
}
case WAIT_FOR_COMPLETION: {
HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
workQueue.schedule(new WaitForJobCompletionWork(ClusterControllerService.this, wfcf.getJobId(),
new IPCResponder<Object>(handle, mid)));
return;
}
case GET_NODE_CONTROLLERS_INFO: {
workQueue.schedule(new GetNodeControllersInfoWork(ClusterControllerService.this,
new IPCResponder<Map<String, NodeControllerInfo>>(handle, mid)));
return;
}
case GET_CLUSTER_TOPOLOGY: {
try {
handle.send(mid, ccContext.getClusterTopology(), null);
} catch (IPCException e) {
e.printStackTrace();
}
return;
}
case CLI_DEPLOY_BINARY: {
HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
workQueue.schedule(new CliDeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs(),
dbf.getDeploymentId(), new IPCResponder<DeploymentId>(handle, mid)));
return;
}
case CLI_UNDEPLOY_BINARY: {
HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf = (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
workQueue.schedule(new CliUnDeployBinaryWork(ClusterControllerService.this, udbf.getDeploymentId(),
new IPCResponder<DeploymentId>(handle, mid)));
return;
}
case CLUSTER_SHUTDOWN: {
workQueue.schedule(new ClusterShutdownWork(ClusterControllerService.this,
new IPCResponder<Boolean>(handle, mid)));
return;
}
}
try {
handle.send(mid, null, new IllegalArgumentException("Unknown function " + fn.getFunctionId()));
} catch (IPCException e) {
e.printStackTrace();
}
}