public void runCommand()

in exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java [51:182]


  /**
   * Executes the given control command against the {@link ControlMessageHandler} obtained from
   * {@code config}, dispatching on the numeric RPC type of the command and reporting the result
   * through the command's outcome listener.
   *
   * <p>For every supported request type the command is cast to the matching
   * {@code ControlTunnel} command class, the corresponding handler method is invoked with the
   * command's message, and the listener's {@code success} (or, where the handler can throw
   * {@link RpcException}, {@code failed}) callback is called. An unsupported RPC type is reported
   * to the listener as an {@link RpcException}.
   *
   * @param cmd command to execute; its RPC type selects the handler method that is invoked
   * @throws UnsupportedOperationException if a custom-type command is neither a
   *         {@code ControlTunnel.CustomMessageSender} nor a
   *         {@code ControlTunnel.SyncCustomMessageSender}
   */
  public void runCommand(RpcCommand cmd) {
    final int rpcType = cmd.getRpcType().getNumber();
    final ControlMessageHandler messageHandler = config.getMessageHandler();

    if (RpcConstants.EXTRA_DEBUGGING) {
      logger.debug("Received bit com message of type {} over local connection manager", rpcType);
    }

    switch (rpcType) {

      case BitControl.RpcType.REQ_CANCEL_FRAGMENT_VALUE: {
        final ControlTunnel.SignalFragment signalFragment = ((ControlTunnel.SignalFragment) cmd);
        final RpcOutcomeListener<Ack> outcomeListener = signalFragment.getOutcomeListener();
        final Ack ackResponse = messageHandler.cancelFragment(signalFragment.getMessage());
        outcomeListener.success(ackResponse, null);
        break;
      }

      case BitControl.RpcType.REQ_CUSTOM_VALUE: {
        // Custom messages carry data bodies and need buffer management; handled separately.
        runCustomMessageCommand(cmd, messageHandler);
        break;
      }

      case BitControl.RpcType.REQ_RECEIVER_FINISHED_VALUE: {
        final ControlTunnel.ReceiverFinished receiverFinished = ((ControlTunnel.ReceiverFinished) cmd);
        final RpcOutcomeListener<Ack> outcomeListener = receiverFinished.getOutcomeListener();
        final Ack ackResponse = messageHandler.receivingFragmentFinished(receiverFinished.getMessage());
        outcomeListener.success(ackResponse, null);
        break;
      }

      case BitControl.RpcType.REQ_FRAGMENT_STATUS_VALUE: {
        final ControlTunnel.SendFragmentStatus fragmentStatus = ((ControlTunnel.SendFragmentStatus) cmd);
        final RpcOutcomeListener<Ack> outcomeListener = fragmentStatus.getOutcomeListener();
        final Ack ackResponse = messageHandler.requestFragmentStatus(fragmentStatus.getMessage());
        outcomeListener.success(ackResponse, null);
        break;
      }

      case BitControl.RpcType.REQ_QUERY_CANCEL_VALUE: {
        final ControlTunnel.CancelQuery cancelQuery = ((ControlTunnel.CancelQuery) cmd);
        final RpcOutcomeListener<Ack> outcomeListener = cancelQuery.getOutcomeListener();
        final Ack ackResponse = messageHandler.requestQueryCancel(cancelQuery.getMessage());
        outcomeListener.success(ackResponse, null);
        break;
      }

      case BitControl.RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: {
        final ControlTunnel.SendFragment sendFragment = ((ControlTunnel.SendFragment) cmd);
        final RpcOutcomeListener<Ack> outcomeListener = sendFragment.getOutcomeListener();

        try {
          final Ack ackResponse = messageHandler.initializeFragment(sendFragment.getMessage());
          outcomeListener.success(ackResponse, null);
        } catch (RpcException ex) {
          outcomeListener.failed(ex);
        }
        break;
      }

      case BitControl.RpcType.REQ_QUERY_STATUS_VALUE: {
        final ControlTunnel.RequestProfile requestProfile = ((ControlTunnel.RequestProfile) cmd);
        final RpcOutcomeListener<UserBitShared.QueryProfile> outcomeListener = requestProfile.getOutcomeListener();

        try {
          final UserBitShared.QueryProfile profile = messageHandler.requestQueryStatus(requestProfile.getMessage());
          outcomeListener.success(profile, null);
        } catch (RpcException ex) {
          outcomeListener.failed(ex);
        }
        break;
      }

      case BitControl.RpcType.REQ_UNPAUSE_FRAGMENT_VALUE: {
        final ControlTunnel.SignalFragment signalFragment = ((ControlTunnel.SignalFragment) cmd);
        final RpcOutcomeListener<Ack> outcomeListener = signalFragment.getOutcomeListener();
        final Ack ackResponse = messageHandler.resumeFragment(signalFragment.getMessage());
        outcomeListener.success(ackResponse, null);
        break;
      }

      default: {
        final RpcException rpcException = new RpcException(String.format("Unsupported control request type %s " +
          "received on LocalControlConnectionManager", rpcType));
        cmd.getOutcomeListener().failed(rpcException);
        break;
      }
    }
  }

  /**
   * Handles a custom-type control command: converts the command's data bodies into a single
   * request buffer, passes the message and that buffer to the handler registry of the given
   * message handler, converts the response data bodies into a response buffer and hands the
   * response to the command's outcome listener.
   *
   * <p>The source data bodies of both request and response are released once conversion has
   * been attempted, whether or not it succeeded. The request buffer is released after the
   * handler has been invoked. Any conversion failure, and any {@link RpcException} raised while
   * handling, is reported through the listener's {@code failed} callback.
   *
   * @param cmd custom command; must be a {@code ControlTunnel.CustomMessageSender} or a
   *        {@code ControlTunnel.SyncCustomMessageSender}
   * @param messageHandler handler whose registry processes the custom message
   * @throws UnsupportedOperationException if {@code cmd} is of neither supported sender type
   */
  private void runCustomMessageCommand(RpcCommand cmd, ControlMessageHandler messageHandler) {
    final ByteBuf[] dataBodies;
    final RpcOutcomeListener<BitControl.CustomMessage> outcomeListener;

    if (cmd instanceof ControlTunnel.CustomMessageSender) {
      final ControlTunnel.CustomMessageSender sender = (ControlTunnel.CustomMessageSender) cmd;
      dataBodies = sender.getDataBodies();
      outcomeListener = sender.getOutcomeListener();
    } else if (cmd instanceof ControlTunnel.SyncCustomMessageSender) {
      final ControlTunnel.SyncCustomMessageSender sender = (ControlTunnel.SyncCustomMessageSender) cmd;
      dataBodies = sender.getDataBodies();
      outcomeListener = sender.getOutcomeListener();
    } else {
      throw new UnsupportedOperationException("Unknown Custom Type control message received");
    }

    DrillBuf reqDrillBuff;
    try {
      reqDrillBuff = convertToByteBuf(dataBodies);
    } catch (Exception ex) {
      outcomeListener.failed(new RpcException("Failed to allocate memory while sending request in " +
        "LocalControlConnectionManager#convertToByteBuf", ex));
      return;
    } finally {
      // The original bodies are no longer needed once conversion has been attempted
      releaseByteBuf(dataBodies);
    }

    try {
      final BitControl.CustomMessage message = (BitControl.CustomMessage) cmd.getMessage();
      final Response response = messageHandler.getHandlerRegistry().handle(message, reqDrillBuff);
      DrillBuf responseBuffer;
      try {
        responseBuffer = convertToByteBuf(response.dBodies);
      } catch (Exception ex) {
        outcomeListener.failed(new RpcException("Failed to allocate memory while sending response in " +
          "LocalControlConnectionManager#convertToByteBuf", ex));
        return;
      } finally {
        // Same as for the request: the response bodies are released after the conversion attempt
        releaseByteBuf(response.dBodies);
      }

      // Passed responseBuffer will be owned by consumer
      outcomeListener.success((BitControl.CustomMessage) response.pBody, responseBuffer);
    } catch (RpcException ex) {
      // Report through the same typed listener used by every other path in this method
      outcomeListener.failed(ex);
    } finally {
      // Release the reqDrillBuff passed into handler
      releaseByteBuf(reqDrillBuff);
    }
  }