in exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java [51:182]
public void runCommand(RpcCommand cmd) {
final int rpcType = cmd.getRpcType().getNumber();
final ControlMessageHandler messageHandler = config.getMessageHandler();
if (RpcConstants.EXTRA_DEBUGGING) {
logger.debug("Received bit com message of type {} over local connection manager", rpcType);
}
switch (rpcType) {
case BitControl.RpcType.REQ_CANCEL_FRAGMENT_VALUE: {
final ControlTunnel.SignalFragment signalFragment = ((ControlTunnel.SignalFragment) cmd);
final RpcOutcomeListener<Ack> outcomeListener = signalFragment.getOutcomeListener();
final Ack ackResponse = messageHandler.cancelFragment(signalFragment.getMessage());
outcomeListener.success(ackResponse, null);
break;
}
case BitControl.RpcType.REQ_CUSTOM_VALUE: {
final ByteBuf[] dataBodies;
final RpcOutcomeListener<BitControl.CustomMessage> outcomeListener;
if (cmd instanceof ControlTunnel.CustomMessageSender) {
dataBodies = ((ControlTunnel.CustomMessageSender)cmd).getDataBodies();
outcomeListener = ((ControlTunnel.CustomMessageSender)cmd).getOutcomeListener();
} else if (cmd instanceof ControlTunnel.SyncCustomMessageSender) {
dataBodies = ((ControlTunnel.SyncCustomMessageSender)cmd).getDataBodies();
outcomeListener = ((ControlTunnel.SyncCustomMessageSender)cmd).getOutcomeListener();
} else {
throw new UnsupportedOperationException("Unknown Custom Type control message received");
}
DrillBuf reqDrillBuff;
try {
reqDrillBuff = convertToByteBuf(dataBodies);
} catch (Exception ex) {
outcomeListener.failed(new RpcException("Failed to allocate memory while sending request in " +
"LocalControlConnectionManager#convertToByteBuff", ex));
return;
} finally {
releaseByteBuf(dataBodies);
}
try {
BitControl.CustomMessage message = (BitControl.CustomMessage) cmd.getMessage();
final Response response = messageHandler.getHandlerRegistry().handle(message, reqDrillBuff);
DrillBuf responseBuffer;
try {
responseBuffer = convertToByteBuf(response.dBodies);
} catch (Exception ex) {
outcomeListener.failed(new RpcException("Failed to allocate memory while sending response in " +
"LocalControlConnectionManager#convertToByteBuff", ex));
return;
} finally {
releaseByteBuf(response.dBodies);
}
// Passed responseBuffer will be owned by consumer
outcomeListener.success((BitControl.CustomMessage)response.pBody, responseBuffer);
} catch (RpcException ex) {
cmd.getOutcomeListener().failed(ex);
} finally {
// Release the reqDrillBuff passed into handler
releaseByteBuf(reqDrillBuff);
}
break;
}
case BitControl.RpcType.REQ_RECEIVER_FINISHED_VALUE: {
final ControlTunnel.ReceiverFinished receiverFinished = ((ControlTunnel.ReceiverFinished) cmd);
final RpcOutcomeListener<Ack> outcomeListener = receiverFinished.getOutcomeListener();
final Ack ackResponse = messageHandler.receivingFragmentFinished(receiverFinished.getMessage());
outcomeListener.success(ackResponse, null);
break;
}
case BitControl.RpcType.REQ_FRAGMENT_STATUS_VALUE: {
final ControlTunnel.SendFragmentStatus fragmentStatus = ((ControlTunnel.SendFragmentStatus) cmd);
final RpcOutcomeListener<Ack> outcomeListener = fragmentStatus.getOutcomeListener();
final Ack ackResponse = messageHandler.requestFragmentStatus(fragmentStatus.getMessage());
outcomeListener.success(ackResponse, null);
break;
}
case BitControl.RpcType.REQ_QUERY_CANCEL_VALUE: {
final ControlTunnel.CancelQuery cancelQuery = ((ControlTunnel.CancelQuery) cmd);
final RpcOutcomeListener<Ack> outcomeListener = cancelQuery.getOutcomeListener();
final Ack ackResponse = messageHandler.requestQueryCancel(cancelQuery.getMessage());
outcomeListener.success(ackResponse, null);
break;
}
case BitControl.RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: {
final ControlTunnel.SendFragment sendFragment = ((ControlTunnel.SendFragment) cmd);
final RpcOutcomeListener<Ack> outcomeListener = sendFragment.getOutcomeListener();
try {
final Ack ackResponse = messageHandler.initializeFragment(sendFragment.getMessage());
outcomeListener.success(ackResponse, null);
} catch (RpcException ex) {
outcomeListener.failed(ex);
}
break;
}
case BitControl.RpcType.REQ_QUERY_STATUS_VALUE: {
final ControlTunnel.RequestProfile requestProfile = ((ControlTunnel.RequestProfile) cmd);
final RpcOutcomeListener<UserBitShared.QueryProfile> outcomeListener = requestProfile.getOutcomeListener();
try {
final UserBitShared.QueryProfile profile = messageHandler.requestQueryStatus(requestProfile.getMessage());
outcomeListener.success(profile, null);
} catch (RpcException ex) {
outcomeListener.failed(ex);
}
break;
}
case BitControl.RpcType.REQ_UNPAUSE_FRAGMENT_VALUE: {
final ControlTunnel.SignalFragment signalFragment = ((ControlTunnel.SignalFragment) cmd);
final RpcOutcomeListener<Ack> outcomeListener = signalFragment.getOutcomeListener();
final Ack ackResponse = messageHandler.resumeFragment(signalFragment.getMessage());
outcomeListener.success(ackResponse, null);
break;
}
default:
final RpcException rpcException = new RpcException(String.format("Unsupported control request type %s " +
"received on LocalControlConnectionManager", rpcType));
cmd.getOutcomeListener().failed(rpcException);
}
}