in hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java [469:598]
/**
 * Dispatches an incoming IPC message: the payload is interpreted as a
 * {@link CCNCFunctions.Function}, translated into the matching work item, and
 * scheduled on {@code workQueue}. Nothing is executed inline on the calling thread
 * other than the dispatch itself.
 *
 * @param handle    IPC handle the message arrived on; only used to send the reply for
 *                  {@code GET_NODE_CONTROLLERS_INFO}
 * @param mid       id of the incoming message (used for diagnostics only)
 * @param rmid      id of the request this message responds to (unused here)
 * @param payload   the message payload; expected to be a {@link CCNCFunctions.Function}
 * @param exception exception delivered in place of / alongside the payload, if any
 */
public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload,
        Exception exception) {
    // Surface a delivered exception instead of dropping it on the floor.
    if (exception != null) {
        LOGGER.log(java.util.logging.Level.WARNING, "Incoming message " + mid + " carried an exception",
                exception);
    }
    // Guard against a null or foreign payload: the unconditional cast/dereference would
    // otherwise throw ClassCastException/NullPointerException on the delivering thread.
    // NOTE(review): presumably payload is null whenever exception is set - confirm against the IPC layer.
    if (!(payload instanceof CCNCFunctions.Function)) {
        LOGGER.warning("Ignoring message " + mid + " with unexpected payload: " + payload);
        return;
    }
    CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
    switch (fn.getFunctionId()) {
        case REGISTER_NODE: {
            CCNCFunctions.RegisterNodeFunction rnf = (CCNCFunctions.RegisterNodeFunction) fn;
            workQueue.schedule(new RegisterNodeWork(ClusterControllerService.this, rnf.getNodeRegistration()));
            return;
        }
        case UNREGISTER_NODE: {
            CCNCFunctions.UnregisterNodeFunction unf = (CCNCFunctions.UnregisterNodeFunction) fn;
            workQueue.schedule(new UnregisterNodeWork(ClusterControllerService.this, unf.getNodeId()));
            return;
        }
        case NODE_HEARTBEAT: {
            CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn;
            workQueue.schedule(new NodeHeartbeatWork(ClusterControllerService.this, nhf.getNodeId(),
                    nhf.getHeartbeatData()));
            return;
        }
        case NOTIFY_JOBLET_CLEANUP: {
            CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn;
            workQueue.schedule(new JobletCleanupNotificationWork(ClusterControllerService.this, njcf.getJobId(),
                    njcf.getNodeId()));
            return;
        }
        case NOTIFY_DEPLOY_BINARY: {
            CCNCFunctions.NotifyDeployBinaryFunction ndbf = (CCNCFunctions.NotifyDeployBinaryFunction) fn;
            workQueue.schedule(new NotifyDeployBinaryWork(ClusterControllerService.this, ndbf.getDeploymentId(),
                    ndbf.getNodeId(), ndbf.getDeploymentStatus()));
            return;
        }
        case REPORT_PROFILE: {
            CCNCFunctions.ReportProfileFunction rpf = (CCNCFunctions.ReportProfileFunction) fn;
            workQueue.schedule(new ReportProfilesWork(ClusterControllerService.this, rpf.getProfiles()));
            return;
        }
        case NOTIFY_TASK_COMPLETE: {
            CCNCFunctions.NotifyTaskCompleteFunction ntcf = (CCNCFunctions.NotifyTaskCompleteFunction) fn;
            workQueue.schedule(new TaskCompleteWork(ClusterControllerService.this, ntcf.getJobId(),
                    ntcf.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics()));
            return;
        }
        case NOTIFY_TASK_FAILURE: {
            CCNCFunctions.NotifyTaskFailureFunction ntff = (CCNCFunctions.NotifyTaskFailureFunction) fn;
            workQueue.schedule(new TaskFailureWork(ClusterControllerService.this, ntff.getJobId(),
                    ntff.getTaskId(), ntff.getNodeId(), ntff.getExceptions()));
            return;
        }
        case REGISTER_PARTITION_PROVIDER: {
            CCNCFunctions.RegisterPartitionProviderFunction rppf =
                    (CCNCFunctions.RegisterPartitionProviderFunction) fn;
            workQueue.schedule(new RegisterPartitionAvailibilityWork(ClusterControllerService.this,
                    rppf.getPartitionDescriptor()));
            return;
        }
        case REGISTER_PARTITION_REQUEST: {
            CCNCFunctions.RegisterPartitionRequestFunction rprf =
                    (CCNCFunctions.RegisterPartitionRequestFunction) fn;
            workQueue.schedule(new RegisterPartitionRequestWork(ClusterControllerService.this,
                    rprf.getPartitionRequest()));
            return;
        }
        case REGISTER_RESULT_PARTITION_LOCATION: {
            CCNCFunctions.RegisterResultPartitionLocationFunction rrplf =
                    (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
            workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this,
                    rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(),
                    rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress()));
            return;
        }
        case REPORT_RESULT_PARTITION_WRITE_COMPLETION: {
            CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrpwcf =
                    (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
            workQueue.schedule(new ReportResultPartitionWriteCompletionWork(ClusterControllerService.this,
                    rrpwcf.getJobId(), rrpwcf.getResultSetId(), rrpwcf.getPartition()));
            return;
        }
        case REPORT_RESULT_PARTITION_FAILURE: {
            CCNCFunctions.ReportResultPartitionFailureFunction rrpff =
                    (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
            workQueue.schedule(new ReportResultPartitionFailureWork(ClusterControllerService.this,
                    rrpff.getJobId(), rrpff.getResultSetId(), rrpff.getPartition()));
            return;
        }
        case SEND_APPLICATION_MESSAGE: {
            CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn;
            workQueue.schedule(new ApplicationMessageWork(ClusterControllerService.this, rsf.getMessage(),
                    rsf.getDeploymentId(), rsf.getNodeId()));
            return;
        }
        case GET_NODE_CONTROLLERS_INFO: {
            // The only case that replies: the result is wrapped in a response function and
            // sent back over the handle the request arrived on.
            workQueue.schedule(new GetNodeControllersInfoWork(ClusterControllerService.this,
                    new IResultCallback<Map<String, NodeControllerInfo>>() {
                        @Override
                        public void setValue(Map<String, NodeControllerInfo> result) {
                            // NOTE(review): -1 is passed as the responder's second argument, as before;
                            // presumably "no request id to correlate with" - confirm.
                            new IPCResponder<CCNCFunctions.GetNodeControllersInfoResponseFunction>(handle, -1)
                                    .setValue(new CCNCFunctions.GetNodeControllersInfoResponseFunction(result));
                        }

                        @Override
                        public void setException(Exception e) {
                            // No error reply is sent to the requester (unchanged); at least
                            // record the failure rather than swallowing it silently.
                            LOGGER.log(java.util.logging.Level.WARNING,
                                    "Failed to collect node controllers info", e);
                        }
                    }));
            return;
        }
        case STATE_DUMP_RESPONSE: {
            CCNCFunctions.StateDumpResponseFunction dsrf = (StateDumpResponseFunction) fn;
            workQueue.schedule(new NotifyStateDumpResponse(ClusterControllerService.this, dsrf.getNodeId(),
                    dsrf.getStateDumpId(), dsrf.getState()));
            return;
        }
        case SHUTDOWN_RESPONSE: {
            CCNCFunctions.ShutdownResponseFunction sdrf = (ShutdownResponseFunction) fn;
            workQueue.schedule(new NotifyShutdownWork(ClusterControllerService.this, sdrf.getNodeId()));
            return;
        }
    }
    // Reached only when no case above matched (every handled case returns).
    LOGGER.warning("Unknown function: " + fn.getFunctionId());
}