protected BrokerResponse doHandleRequest()

in pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java [358:770]


  /**
   * Handles one single-stage query on the broker, from compilation through execution and query logging.
   *
   * <p>The steps, in order, are: compile the request, run table-level authorization, resolve the table route,
   * apply table-config driven rewrites, enforce database and table query quota, validate the query, build the
   * OFFLINE and/or REALTIME broker requests, calculate routing, set timeouts and per-server response options,
   * execute via {@code processBrokerRequest}, then record metrics and log the query.
   *
   * <p>The method returns early, without calling {@code processBrokerRequest}, when:
   * <ul>
   *   <li>compilation already produced a response (error or literal-only query),</li>
   *   <li>the table or its route does not exist,</li>
   *   <li>database or table query quota is exceeded,</li>
   *   <li>request validation throws,</li>
   *   <li>every prepared request has an always-false filter,</li>
   *   <li>the route is reported as disabled,</li>
   *   <li>no broker request remains after routing, or</li>
   *   <li>{@code setQueryTimeout} throws a {@link TimeoutException}.</li>
   * </ul>
   *
   * @param requestId id of the request; used in log messages, route calculation and running-query tracking
   * @param query query text; logged, and its length is reported as the REQUEST_SIZE gauge
   * @param sqlNodeAndOptions parsed query; this method reads its parse time and options from it and passes it on to
   *     {@code compileRequest} and {@code extractClientRequestId}
   * @param request JSON request, passed through to {@code compileRequest}
   * @param requesterIdentity identity of the caller, may be null; passed to compilation, authorization, the empty
   *     response builder and the query logger
   * @param requestContext per-request context; receives error codes, fanout type, server tenants and the number of
   *     unavailable segments, and supplies the request arrival time
   * @param httpHeaders HTTP headers of the request, may be null; passed to compilation and database extraction
   * @param accessControl access control used for the table-level authorization check
   * @return the response produced by one of the early-return paths above, or the response of
   *     {@code processBrokerRequest} augmented with broker-side errors and statistics
   * @throws Exception whatever the invoked helpers propagate; NOTE(review): access denial is reported through
   *     {@code throwAccessDeniedError}, which presumably throws - confirm
   */
  protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions,
      JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
      @Nullable HttpHeaders httpHeaders, AccessControl accessControl)
      throws Exception {
    // Compile the request into PinotQuery
    long compilationStartTimeNs = System.nanoTime();
    CompileResult compileResult =
          compileRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext, httpHeaders,
              accessControl);

    if (compileResult._errorOrLiteralOnlyBrokerResponse != null) {
      /*
       * If the compileRequest method sets the BrokerResponse field, then it is either an error response or
       * a literal-only query. In either case, we can return the response directly.
       */
      return compileResult._errorOrLiteralOnlyBrokerResponse;
    }

    // pinotQuery is the compiled query; serverPinotQuery is the variant that gets rewritten and sent to servers.
    // The two may be the same instance (see the identity check below).
    Schema schema = compileResult._schema;
    String tableName = compileResult._tableName;
    String rawTableName = compileResult._rawTableName;
    PinotQuery pinotQuery = compileResult._pinotQuery;
    PinotQuery serverPinotQuery = compileResult._serverPinotQuery;
    long compilationEndTimeNs = System.nanoTime();
    // full request compile time = compilationTimeNs + parserTimeNs
    _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REQUEST_COMPILATION,
        (compilationEndTimeNs - compilationStartTimeNs) + sqlNodeAndOptions.getParseTimeNs());

    // Second-stage table-level access control
    // TODO: Modify AccessControl interface to directly take PinotQuery
    BrokerRequest brokerRequest = CalciteSqlCompiler.convertToBrokerRequest(pinotQuery);
    // Avoid a second conversion when the server query is the very same object as the compiled query.
    BrokerRequest serverBrokerRequest =
        serverPinotQuery == pinotQuery ? brokerRequest : CalciteSqlCompiler.convertToBrokerRequest(serverPinotQuery);
    AuthorizationResult authorizationResult = accessControl.authorize(requesterIdentity, serverBrokerRequest);

    // The AUTHORIZATION phase is measured from the end of compilation, so it also covers the BrokerRequest
    // conversions above.
    _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION,
        System.nanoTime() - compilationEndTimeNs);

    if (!authorizationResult.hasAccess()) {
      // NOTE(review): there is no return here, so this relies on throwAccessDeniedError throwing - confirm.
      throwAccessDeniedError(requestId, query, requestContext, tableName, authorizationResult);
    }

    // Get the tables hit by the request
    TableRouteProvider routeProvider = _implicitHybridTableRouteProvider;
    TableRouteInfo routeInfo = routeProvider.getTableRouteInfo(tableName, _tableCache, _routingManager);

    if (!routeInfo.isExists()) {
      LOGGER.info("Table not found for request {}: {}", requestId, query);
      requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST);
      return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
    }

    if (!routeInfo.isRouteExists()) {
      LOGGER.info("No table matches for request {}: {}", requestId, query);
      requestContext.setErrorCode(QueryErrorCode.BROKER_RESOURCE_MISSING);
      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESOURCE_MISSING_EXCEPTIONS, 1);
      return BrokerResponseNative.NO_TABLE_RESULT;
    }

    // Physical table names and configs for each table type, as reported by the route.
    // NOTE(review): the values for a table type that is not part of the route are presumably null - confirm.
    String offlineTableName = routeInfo.getOfflineTableName();
    String realtimeTableName = routeInfo.getRealtimeTableName();
    TableConfig offlineTableConfig = routeInfo.getOfflineTableConfig();
    TableConfig realtimeTableConfig = routeInfo.getRealtimeTableConfig();
    TimeBoundaryInfo timeBoundaryInfo = routeInfo.getTimeBoundaryInfo();

    // Table-config driven checks and rewrites applied to the server query before it is split per table type.
    HandlerContext handlerContext = getHandlerContext(offlineTableConfig, realtimeTableConfig);
    validateGroovyScript(serverPinotQuery, handlerContext._disableGroovy);
    if (handlerContext._useApproximateFunction) {
      handleApproximateFunctionOverride(serverPinotQuery);
    }

    // Validate QPS quota
    // The database-level quota is checked before the table-level quota; only the table-level rejection is metered.
    String database = DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableName);
    if (!_queryQuotaManager.acquireDatabase(database)) {
      String errorMessage =
          String.format("Request %d: %s exceeds query quota for database: %s", requestId, query, database);
      LOGGER.info(errorMessage);
      requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
      return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, errorMessage);
    }
    if (!_queryQuotaManager.acquire(tableName)) {
      String errorMessage =
          String.format("Request %d: %s exceeds query quota for table: %s", requestId, query, tableName);
      LOGGER.info(errorMessage);
      requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
      _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1);
      return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, errorMessage);
    }

    // Validate the request
    try {
      validateRequest(serverPinotQuery, _queryResponseLimit);
    } catch (Exception e) {
      // Only the exception message is logged and returned to the caller; the stack trace is not recorded.
      LOGGER.info("Caught exception while validating request {}: {}, {}", requestId, query, e.getMessage());
      requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
      _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
      return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, e.getMessage());
    }

    // Query counters are bumped only for requests that passed authorization, quota and validation.
    _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERIES, 1);
    _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERIES_GLOBAL, 1);
    _brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.REQUEST_SIZE, query.length());

    if (!pinotQuery.isExplain() && _enableMultistageMigrationMetric) {
      // Check if the query is a v2 supported query
      // NOTE(review): this reassigns the method-level 'database' variable. When this branch runs, the later calls
      // to getEmptyBrokerOnlyResponse and fillEmptyResponseSchema receive the value extracted from the query
      // options / headers instead of the one derived from the table name - confirm this is intended.
      database = DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(), httpHeaders);
      // Attempt to add the query to the compile queue; drop if queue is full
      if (!_multistageCompileQueryQueue.offer(Pair.of(query, database))) {
        LOGGER.trace("Not compiling query `{}` using the multi-stage query engine because the query queue is full",
            query);
      }
    }

    // Prepare OFFLINE and REALTIME requests
    // A null request below means "do not query that table type".
    BrokerRequest offlineBrokerRequest = null;
    BrokerRequest realtimeBrokerRequest = null;

    if (routeInfo.isHybrid()) {
      // Hybrid
      // Each side works on its own deep copy of the server query so that the table name, time boundary filter,
      // overrides and optimizations of one side do not affect the other.
      PinotQuery offlinePinotQuery = serverPinotQuery.deepCopy();
      offlinePinotQuery.getDataSource().setTableName(offlineTableName);
      assert timeBoundaryInfo != null;
      // The boolean passed to attachTimeBoundary is true for the offline side and false for the realtime side.
      attachTimeBoundary(offlinePinotQuery, timeBoundaryInfo, true);
      handleExpressionOverride(offlinePinotQuery, _tableCache.getExpressionOverrideMap(offlineTableName));
      handleTimestampIndexOverride(offlinePinotQuery, offlineTableConfig);
      _queryOptimizer.optimize(offlinePinotQuery, offlineTableConfig, schema);
      offlineBrokerRequest = CalciteSqlCompiler.convertToBrokerRequest(offlinePinotQuery);

      PinotQuery realtimePinotQuery = serverPinotQuery.deepCopy();
      realtimePinotQuery.getDataSource().setTableName(realtimeTableName);
      attachTimeBoundary(realtimePinotQuery, timeBoundaryInfo, false);
      handleExpressionOverride(realtimePinotQuery, _tableCache.getExpressionOverrideMap(realtimeTableName));
      handleTimestampIndexOverride(realtimePinotQuery, realtimeTableConfig);
      _queryOptimizer.optimize(realtimePinotQuery, realtimeTableConfig, schema);
      realtimeBrokerRequest = CalciteSqlCompiler.convertToBrokerRequest(realtimePinotQuery);

      requestContext.setFanoutType(RequestContext.FanoutType.HYBRID);
      requestContext.setOfflineServerTenant(getServerTenant(offlineTableName));
      requestContext.setRealtimeServerTenant(getServerTenant(realtimeTableName));
    } else if (routeInfo.isOffline()) {
      // OFFLINE only
      // No copy is made here: the shared server request is renamed and the server query is rewritten in place.
      // NOTE(review): the rewrites target serverPinotQuery while the request sent is serverBrokerRequest; this
      // presumably relies on serverBrokerRequest wrapping that same PinotQuery instance - confirm.
      setTableName(serverBrokerRequest, offlineTableName);
      handleExpressionOverride(serverPinotQuery, _tableCache.getExpressionOverrideMap(offlineTableName));
      handleTimestampIndexOverride(serverPinotQuery, offlineTableConfig);
      _queryOptimizer.optimize(serverPinotQuery, offlineTableConfig, schema);
      offlineBrokerRequest = serverBrokerRequest;

      requestContext.setFanoutType(RequestContext.FanoutType.OFFLINE);
      requestContext.setOfflineServerTenant(getServerTenant(offlineTableName));
    } else {
      // REALTIME only
      // Same in-place handling as the OFFLINE-only branch, using the realtime table name and config.
      setTableName(serverBrokerRequest, realtimeTableName);
      handleExpressionOverride(serverPinotQuery, _tableCache.getExpressionOverrideMap(realtimeTableName));
      handleTimestampIndexOverride(serverPinotQuery, realtimeTableConfig);
      _queryOptimizer.optimize(serverPinotQuery, realtimeTableConfig, schema);
      realtimeBrokerRequest = serverBrokerRequest;

      requestContext.setFanoutType(RequestContext.FanoutType.REALTIME);
      requestContext.setRealtimeServerTenant(getServerTenant(realtimeTableName));
    }

    // Check if response can be sent without server query evaluation.
    if (offlineBrokerRequest != null && isFilterAlwaysFalse(offlineBrokerRequest.getPinotQuery())) {
      // We don't need to evaluate offline request
      offlineBrokerRequest = null;
    }
    if (realtimeBrokerRequest != null && isFilterAlwaysFalse(realtimeBrokerRequest.getPinotQuery())) {
      // We don't need to evaluate realtime request
      realtimeBrokerRequest = null;
    }

    // Nothing left to send to servers: answer with an empty response built on the broker.
    if (offlineBrokerRequest == null && realtimeBrokerRequest == null) {
      return getEmptyBrokerOnlyResponse(pinotQuery, serverPinotQuery, requestContext, tableName, requesterIdentity,
          schema, query, database);
    }

    if (offlineBrokerRequest != null && isFilterAlwaysTrue(offlineBrokerRequest.getPinotQuery())) {
      // Drop offline request filter since it is always true
      offlineBrokerRequest.getPinotQuery().setFilterExpression(null);
    }
    if (realtimeBrokerRequest != null && isFilterAlwaysTrue(realtimeBrokerRequest.getPinotQuery())) {
      // Drop realtime request filter since it is always true
      realtimeBrokerRequest.getPinotQuery().setFilterExpression(null);
    }

    // Calculate routing table for the query
    // TODO: Modify RoutingManager interface to directly take PinotQuery
    long routingStartTimeNs = System.nanoTime();
    routeProvider.calculateRoutes(routeInfo, _routingManager, offlineBrokerRequest, realtimeBrokerRequest,
        requestId);

    // Routing results are read back from routeInfo after calculateRoutes has run.
    Set<ServerInstance> offlineExecutionServers = routeInfo.getOfflineExecutionServers();
    Set<ServerInstance> realtimeExecutionServers = routeInfo.getRealtimeExecutionServers();
    List<String> unavailableSegments = routeInfo.getUnavailableSegments();
    int numPrunedSegmentsTotal = routeInfo.getNumPrunedSegmentsTotal();

    // Rewrite the broker requests as the rest of the code expects them to be null or not based on whether the routing
    // calculation was successful or not.
    offlineBrokerRequest = routeInfo.getOfflineBrokerRequest();
    realtimeBrokerRequest = routeInfo.getRealtimeBrokerRequest();

    // Broker-side errors collected from here on; they are either returned directly or appended to the response.
    List<QueryProcessingException> errorMsgs = new ArrayList<>();

    // If all tables in a hybrid are disabled then return an error.
    // Note that if a query is for one of OFFLINE or REALTIME table and the other physical table is disabled,
    // we still want to run the query. So this condition is false.
    // Tested by the tables "hybrid_o_disabled_REALTIME" and "hybrid_r_disabled_OFFLINE" in ImplicitTableRouteTest
    if (routeInfo.isDisabled()) {
      requestContext.setErrorCode(QueryErrorCode.TABLE_IS_DISABLED);
      return BrokerResponseNative.TABLE_IS_DISABLED;
    }

    // Individual disabled tables do not abort the query; each one is recorded as an error on the response.
    List<String> disabledTableNames = routeInfo.getDisabledTableNames();
    if (disabledTableNames != null) {
      for (String name : disabledTableNames) {
        String errorMessage = String.format("%s Table is disabled", name);
        LOGGER.info("{}: {}", errorMessage, query);
        errorMsgs.add(new QueryProcessingException(QueryErrorCode.TABLE_IS_DISABLED, errorMessage));
      }
    }

    int numUnavailableSegments = unavailableSegments.size();
    requestContext.setNumUnavailableSegments(numUnavailableSegments);

    if (numUnavailableSegments > 0) {
      String errorMessage;
      // Cap the number of segment names included in the message.
      if (numUnavailableSegments > MAX_UNAVAILABLE_SEGMENTS_TO_PRINT_IN_QUERY_EXCEPTION) {
        errorMessage = String.format("%d segments unavailable, sampling %d: %s", numUnavailableSegments,
            MAX_UNAVAILABLE_SEGMENTS_TO_PRINT_IN_QUERY_EXCEPTION,
            unavailableSegments.subList(0, MAX_UNAVAILABLE_SEGMENTS_TO_PRINT_IN_QUERY_EXCEPTION));
      } else {
        errorMessage = String.format("%d segments unavailable: %s", numUnavailableSegments, unavailableSegments);
      }
      // The routing policy is looked up only for table types that still have a request after routing.
      String realtimeRoutingPolicy = realtimeBrokerRequest != null ? getRoutingPolicy(realtimeTableConfig) : null;
      String offlineRoutingPolicy = offlineBrokerRequest != null ? getRoutingPolicy(offlineTableConfig) : null;
      errorMessage = addRoutingPolicyInErrMsg(errorMessage, realtimeRoutingPolicy, offlineRoutingPolicy);
      errorMsgs.add(new QueryProcessingException(QueryErrorCode.BROKER_SEGMENT_UNAVAILABLE, errorMessage));
      _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_UNAVAILABLE_SEGMENTS, 1);
    }

    // Routing left no request for either table type: report the collected errors, or an empty result if none.
    if (offlineBrokerRequest == null && realtimeBrokerRequest == null) {
      if (!errorMsgs.isEmpty()) {
        QueryProcessingException firstErrorMsg = errorMsgs.get(0);
        String logTail = errorMsgs.size() > 1 ? (errorMsgs.size()) + " errorMsgs found. Logging only the first one"
            : "1 exception found";
        LOGGER.info("No server found for request {}: {}. {} {}", requestId, query, logTail, firstErrorMsg);
        _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NO_SERVER_FOUND_EXCEPTIONS, 1);
        return BrokerResponseNative.fromBrokerErrors(errorMsgs);
      } else {
        // If no route is found, send an empty response
        return getEmptyBrokerOnlyResponse(pinotQuery, serverPinotQuery, requestContext, tableName, requesterIdentity,
            schema, query, database);
      }
    }
    // The QUERY_ROUTING phase is recorded only when at least one request survived routing; the early returns above
    // skip it.
    long routingEndTimeNs = System.nanoTime();
    _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.QUERY_ROUTING,
        routingEndTimeNs - routingStartTimeNs);

    // Set timeout in the requests
    // Time already spent is measured from the start of compilation to the end of routing.
    long timeSpentMs = TimeUnit.NANOSECONDS.toMillis(routingEndTimeNs - compilationStartTimeNs);
    // Remaining time in milliseconds for the server query execution
    // NOTE: For hybrid use case, in most cases offline table and real-time table should have the same query timeout
    //       configured, but if necessary, we also allow different timeout for them.
    //       If the timeout is not the same for offline table and real-time table, use the max of offline table
    //       remaining time and realtime table remaining time. Server side will have different remaining time set for
    //       each table type, and broker should wait for both types to return.
    long remainingTimeMs = 0;
    try {
      if (offlineBrokerRequest != null) {
        remainingTimeMs =
            setQueryTimeout(offlineTableName, offlineBrokerRequest.getPinotQuery().getQueryOptions(), timeSpentMs);
      }
      if (realtimeBrokerRequest != null) {
        remainingTimeMs = Math.max(remainingTimeMs,
            setQueryTimeout(realtimeTableName, realtimeBrokerRequest.getPinotQuery().getQueryOptions(), timeSpentMs));
      }
    } catch (TimeoutException e) {
      // The timeout is returned together with any broker-side errors collected earlier.
      String errorMessage = e.getMessage();
      LOGGER.info("{} {}: {}", errorMessage, requestId, query);
      _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.REQUEST_TIMEOUT_BEFORE_SCATTERED_EXCEPTIONS, 1);
      errorMsgs.add(new QueryProcessingException(QueryErrorCode.BROKER_TIMEOUT, errorMessage));
      return BrokerResponseNative.fromBrokerErrors(errorMsgs);
    }

    // Set the maximum serialized response size per server, and ask server to directly return final response when only
    // one server is queried
    // numServers is the combined size of the offline and realtime server sets.
    int numServers = 0;
    if (offlineExecutionServers != null) {
      numServers += offlineExecutionServers.size();
    }
    if (realtimeExecutionServers != null) {
      numServers += realtimeExecutionServers.size();
    }
    if (offlineBrokerRequest != null) {
      Map<String, String> queryOptions = offlineBrokerRequest.getPinotQuery().getQueryOptions();
      setMaxServerResponseSizeBytes(numServers, queryOptions, offlineTableConfig);
      // Set the query option to directly return final result for single server query unless it is explicitly disabled
      if (numServers == 1) {
        // Set the same flag in the original server request to be used in the reduce phase for hybrid table
        // putIfAbsent returns null only when the option was not already present, so an explicit user setting is
        // left untouched and is not copied to the original server request.
        if (queryOptions.putIfAbsent(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true") == null
            && offlineBrokerRequest != serverBrokerRequest) {
          serverBrokerRequest.getPinotQuery().getQueryOptions()
              .put(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true");
        }
      }
    }
    if (realtimeBrokerRequest != null) {
      Map<String, String> queryOptions = realtimeBrokerRequest.getPinotQuery().getQueryOptions();
      setMaxServerResponseSizeBytes(numServers, queryOptions, realtimeTableConfig);
      // Set the query option to directly return final result for single server query unless it is explicitly disabled
      if (numServers == 1) {
        // Set the same flag in the original server request to be used in the reduce phase for hybrid table
        if (queryOptions.putIfAbsent(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true") == null
            && realtimeBrokerRequest != serverBrokerRequest) {
          serverBrokerRequest.getPinotQuery().getQueryOptions()
              .put(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true");
        }
      }
    }

    // Execute the query
    // TODO: Replace ServerStats with ServerRoutingStatsEntry.
    ServerStats serverStats = new ServerStats();
    // TODO: Handle broker specific operations for explain plan queries such as:
    //       - Alias handling
    //       - Compile time function invocation
    //       - Literal only queries
    //       - Any rewrites
    if (pinotQuery.isExplain()) {
      // Update routing tables to only send request to offline servers for OFFLINE and HYBRID tables.
      // TODO: Assess if the Explain Plan Query should also be routed to REALTIME servers for HYBRID tables
      if (offlineExecutionServers != null) {
        // For OFFLINE and HYBRID tables, don't send EXPLAIN query to realtime servers.
        // NOTE(review): only the local variables are cleared here. Within this method they feed QueryServers
        // below, while processBrokerRequest receives routeInfo - confirm that the realtime side is also skipped
        // there for EXPLAIN queries.
        realtimeBrokerRequest = null;
        realtimeExecutionServers = null;
      }
    }
    BrokerResponseNative brokerResponse;
    if (isQueryCancellationEnabled()) {
      // Start to track the running query for cancellation just before sending it out to servers to avoid any
      // potential failures that could happen before sending it out, like failures to calculate the routing table etc.
      // TODO: Even tracking the query as late as here, a potential race condition between calling cancel API and
      //       query being sent out to servers can still happen. If cancel request arrives earlier than query being
      //       sent out to servers, the servers miss the cancel request and continue to run the queries. The users
      //       can always list the running queries and cancel query again until it ends. Just that such race
      //       condition makes cancel API less reliable. This should be rare as it assumes sending queries out to
      //       servers takes time, but will address later if needed.
      String clientRequestId = extractClientRequestId(sqlNodeAndOptions);
      onQueryStart(requestId, clientRequestId, query,
          new QueryServers(query, offlineExecutionServers, realtimeExecutionServers));
      try {
        brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, routeInfo,
            remainingTimeMs, serverStats, requestContext);
        brokerResponse.setClientRequestId(clientRequestId);
      } finally {
        // Always stop tracking the query, including when processBrokerRequest throws.
        onQueryFinish(requestId);
        LOGGER.debug("Remove track of running query: {}", requestId);
      }
    } else {
      brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, routeInfo,
          remainingTimeMs, serverStats, requestContext);
    }
    brokerResponse.setTablesQueried(Set.of(rawTableName));

    // Append the broker-side errors collected during routing (disabled tables, unavailable segments).
    for (QueryProcessingException errorMsg : errorMsgs) {
      brokerResponse.addException(errorMsg);
    }
    brokerResponse.setNumSegmentsPrunedByBroker(numPrunedSegmentsTotal);
    // The QUERY_EXECUTION phase spans from the end of routing to this point, so it includes the timeout and
    // response-size option setup above as well as processBrokerRequest.
    long executionEndTimeNs = System.nanoTime();
    _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.QUERY_EXECUTION,
        executionEndTimeNs - routingEndTimeNs);

    // Track number of queries with number of groups limit reached
    if (brokerResponse.isNumGroupsLimitReached()) {
      _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED,
          1);
    }

    // server returns STRING as default dataType for all columns in (some) scenarios where no rows are returned
    // this is an attempt to return more faithful information based on other sources
    fillEmptyResponseSchema(pinotQuery, brokerResponse, schema, database, query);

    // Set total query processing time
    // Measured in wall-clock milliseconds from the arrival time stored in the request context.
    long totalTimeMs = System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis();
    brokerResponse.setTimeUsedMs(totalTimeMs);
    augmentStatistics(requestContext, brokerResponse);
    // include both broker side errorMsgs and server side errorMsgs
    // The global validation meter is incremented at most once per query, if any exception on the response carries
    // the QUERY_VALIDATION error code.
    List<QueryProcessingException> brokerExceptions = brokerResponse.getExceptions();
    brokerExceptions.stream()
        .filter(exception -> exception.getErrorCode() == QueryErrorCode.QUERY_VALIDATION.getId())
        .findFirst()
        .ifPresent(exception -> _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1));
    // Result rows are discarded on request via query options, except for EXPLAIN queries.
    if (!pinotQuery.isExplain() && QueryOptionsUtils.shouldDropResults(pinotQuery.getQueryOptions())) {
      brokerResponse.setResultTable(null);
    }
    // Total time is reported under a separate timer for secondary-workload queries, both per table and globally.
    if (QueryOptionsUtils.isSecondaryWorkload(pinotQuery.getQueryOptions())) {
      _brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.SECONDARY_WORKLOAD_QUERY_TOTAL_TIME_MS, totalTimeMs,
          TimeUnit.MILLISECONDS);
      _brokerMetrics.addTimedValue(BrokerTimer.SECONDARY_WORKLOAD_QUERY_TOTAL_TIME_MS, totalTimeMs,
          TimeUnit.MILLISECONDS);
    } else {
      _brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.QUERY_TOTAL_TIME_MS, totalTimeMs,
          TimeUnit.MILLISECONDS);
      _brokerMetrics.addTimedValue(BrokerTimer.QUERY_TOTAL_TIME_MS, totalTimeMs, TimeUnit.MILLISECONDS);
    }

    // Log query and stats
    _queryLogger.log(
        new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse,
            QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, requesterIdentity, serverStats));

    return brokerResponse;
  }