in pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java [358:770]
/**
 * Handles a single-stage query end to end: compiles it, performs table-level authorization, resolves the physical
 * OFFLINE/REALTIME tables, enforces database/table QPS quotas and request validation, rewrites and routes the
 * per-table requests, executes them and post-processes the response (metrics, empty-schema fill-in, query logging).
 *
 * <p>Several checks return early without contacting any server: compilation errors or literal-only queries, a
 * missing table or route, quota violations, validation failures, filters that are always false on every side,
 * fully disabled tables, no routable servers, and a timeout that expires before the query is scattered.
 *
 * <p>NOTE: on the OFFLINE-only and REALTIME-only paths the per-table broker request IS {@code serverBrokerRequest}
 * (same object), so in-place rewrites such as dropping an always-true filter or adding query options are also
 * visible through {@code serverBrokerRequest}. On the hybrid path each side works on a deep copy of
 * {@code serverPinotQuery}.
 *
 * @param requestId broker-assigned request id, used in log/error messages, routing and running-query tracking
 * @param query the raw query text; logged, recorded as the REQUEST_SIZE gauge and offered to the multi-stage
 *        compile queue
 * @param sqlNodeAndOptions the parsed query; its parse time is added to the compilation phase timing, its options
 *        are used to derive the database for the multi-stage compile queue, and it is used to extract the client
 *        request id
 * @param request the original JSON request, forwarded to {@code compileRequest}
 * @param requesterIdentity identity used for authorization and query logging; may be null
 * @param requestContext mutable per-request context that receives error codes, fanout type, server tenants and the
 *        number of unavailable segments; its arrival time is used to compute the total query time
 * @param httpHeaders request headers, forwarded to compilation and database extraction; may be null
 * @param accessControl access control used to authorize the server-side broker request
 * @return the broker response, which may be an error response produced by one of the early-return checks
 * @throws Exception propagated from compilation, access-denied handling or query execution
 */
protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions,
    JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
    @Nullable HttpHeaders httpHeaders, AccessControl accessControl)
    throws Exception {
  // Compile the request into PinotQuery
  long compilationStartTimeNs = System.nanoTime();
  CompileResult compileResult =
      compileRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext, httpHeaders,
          accessControl);
  if (compileResult._errorOrLiteralOnlyBrokerResponse != null) {
    /*
     * If the compileRequest method sets the BrokerResponse field, then it is either an error response or
     * a literal-only query. In either case, we can return the response directly.
     */
    return compileResult._errorOrLiteralOnlyBrokerResponse;
  }
  Schema schema = compileResult._schema;
  String tableName = compileResult._tableName;
  String rawTableName = compileResult._rawTableName;
  // pinotQuery is the broker-side query; serverPinotQuery is the one the server requests are built from.
  // They may be the very same object.
  PinotQuery pinotQuery = compileResult._pinotQuery;
  PinotQuery serverPinotQuery = compileResult._serverPinotQuery;
  long compilationEndTimeNs = System.nanoTime();
  // full request compile time = compilationTimeNs + parserTimeNs
  _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REQUEST_COMPILATION,
      (compilationEndTimeNs - compilationStartTimeNs) + sqlNodeAndOptions.getParseTimeNs());

  // Second-stage table-level access control
  // TODO: Modify AccessControl interface to directly take PinotQuery
  BrokerRequest brokerRequest = CalciteSqlCompiler.convertToBrokerRequest(pinotQuery);
  // Skip the second conversion when the server query is the same object as the broker query
  BrokerRequest serverBrokerRequest =
      serverPinotQuery == pinotQuery ? brokerRequest : CalciteSqlCompiler.convertToBrokerRequest(serverPinotQuery);
  // Authorization is evaluated against the server-side request
  AuthorizationResult authorizationResult = accessControl.authorize(requesterIdentity, serverBrokerRequest);
  _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION,
      System.nanoTime() - compilationEndTimeNs);
  if (!authorizationResult.hasAccess()) {
    // There is no return after this call: it is expected to throw, as its name says
    throwAccessDeniedError(requestId, query, requestContext, tableName, authorizationResult);
  }

  // Get the tables hit by the request
  TableRouteProvider routeProvider = _implicitHybridTableRouteProvider;
  TableRouteInfo routeInfo = routeProvider.getTableRouteInfo(tableName, _tableCache, _routingManager);
  if (!routeInfo.isExists()) {
    LOGGER.info("Table not found for request {}: {}", requestId, query);
    requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST);
    return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
  }
  if (!routeInfo.isRouteExists()) {
    LOGGER.info("No table matches for request {}: {}", requestId, query);
    requestContext.setErrorCode(QueryErrorCode.BROKER_RESOURCE_MISSING);
    _brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESOURCE_MISSING_EXCEPTIONS, 1);
    return BrokerResponseNative.NO_TABLE_RESULT;
  }
  String offlineTableName = routeInfo.getOfflineTableName();
  String realtimeTableName = routeInfo.getRealtimeTableName();
  TableConfig offlineTableConfig = routeInfo.getOfflineTableConfig();
  TableConfig realtimeTableConfig = routeInfo.getRealtimeTableConfig();
  TimeBoundaryInfo timeBoundaryInfo = routeInfo.getTimeBoundaryInfo();
  HandlerContext handlerContext = getHandlerContext(offlineTableConfig, realtimeTableConfig);
  validateGroovyScript(serverPinotQuery, handlerContext._disableGroovy);
  if (handlerContext._useApproximateFunction) {
    handleApproximateFunctionOverride(serverPinotQuery);
  }

  // Validate QPS quota: database-level first, then table-level
  String database = DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableName);
  if (!_queryQuotaManager.acquireDatabase(database)) {
    String errorMessage =
        String.format("Request %d: %s exceeds query quota for database: %s", requestId, query, database);
    LOGGER.info(errorMessage);
    requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
    return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, errorMessage);
  }
  if (!_queryQuotaManager.acquire(tableName)) {
    String errorMessage =
        String.format("Request %d: %s exceeds query quota for table: %s", requestId, query, tableName);
    LOGGER.info(errorMessage);
    requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
    _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1);
    return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, errorMessage);
  }

  // Validate the request
  try {
    validateRequest(serverPinotQuery, _queryResponseLimit);
  } catch (Exception e) {
    LOGGER.info("Caught exception while validating request {}: {}, {}", requestId, query, e.getMessage());
    requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
    _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
    return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, e.getMessage());
  }

  _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERIES, 1);
  _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERIES_GLOBAL, 1);
  _brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.REQUEST_SIZE, query.length());

  if (!pinotQuery.isExplain() && _enableMultistageMigrationMetric) {
    // Check if the query is a v2 supported query
    // NOTE(review): this overwrites the table-derived 'database' with the request-derived one (options/headers).
    // The overwritten value is what getEmptyBrokerOnlyResponse/fillEmptyResponseSchema receive below, but only
    // when this branch runs. Confirm the two values are always equivalent or that this is intended.
    database = DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(), httpHeaders);
    // Attempt to add the query to the compile queue; drop if queue is full
    if (!_multistageCompileQueryQueue.offer(Pair.of(query, database))) {
      LOGGER.trace("Not compiling query `{}` using the multi-stage query engine because the query queue is full",
          query);
    }
  }

  // Prepare OFFLINE and REALTIME requests
  BrokerRequest offlineBrokerRequest = null;
  BrokerRequest realtimeBrokerRequest = null;
  if (routeInfo.isHybrid()) {
    // Hybrid: each physical table gets its own deep copy of the server query, so the time boundary filter,
    // expression/timestamp-index overrides and optimizations applied to one side do not leak into the other
    // (nor into serverPinotQuery itself).
    PinotQuery offlinePinotQuery = serverPinotQuery.deepCopy();
    offlinePinotQuery.getDataSource().setTableName(offlineTableName);
    assert timeBoundaryInfo != null;
    attachTimeBoundary(offlinePinotQuery, timeBoundaryInfo, true);
    handleExpressionOverride(offlinePinotQuery, _tableCache.getExpressionOverrideMap(offlineTableName));
    handleTimestampIndexOverride(offlinePinotQuery, offlineTableConfig);
    _queryOptimizer.optimize(offlinePinotQuery, offlineTableConfig, schema);
    offlineBrokerRequest = CalciteSqlCompiler.convertToBrokerRequest(offlinePinotQuery);
    PinotQuery realtimePinotQuery = serverPinotQuery.deepCopy();
    realtimePinotQuery.getDataSource().setTableName(realtimeTableName);
    attachTimeBoundary(realtimePinotQuery, timeBoundaryInfo, false);
    handleExpressionOverride(realtimePinotQuery, _tableCache.getExpressionOverrideMap(realtimeTableName));
    handleTimestampIndexOverride(realtimePinotQuery, realtimeTableConfig);
    _queryOptimizer.optimize(realtimePinotQuery, realtimeTableConfig, schema);
    realtimeBrokerRequest = CalciteSqlCompiler.convertToBrokerRequest(realtimePinotQuery);
    requestContext.setFanoutType(RequestContext.FanoutType.HYBRID);
    requestContext.setOfflineServerTenant(getServerTenant(offlineTableName));
    requestContext.setRealtimeServerTenant(getServerTenant(realtimeTableName));
  } else if (routeInfo.isOffline()) {
    // OFFLINE only: serverPinotQuery is rewritten in place and serverBrokerRequest is reused as the offline
    // request. This presumably relies on serverBrokerRequest wrapping serverPinotQuery rather than a copy.
    setTableName(serverBrokerRequest, offlineTableName);
    handleExpressionOverride(serverPinotQuery, _tableCache.getExpressionOverrideMap(offlineTableName));
    handleTimestampIndexOverride(serverPinotQuery, offlineTableConfig);
    _queryOptimizer.optimize(serverPinotQuery, offlineTableConfig, schema);
    offlineBrokerRequest = serverBrokerRequest;
    requestContext.setFanoutType(RequestContext.FanoutType.OFFLINE);
    requestContext.setOfflineServerTenant(getServerTenant(offlineTableName));
  } else {
    // REALTIME only: same in-place approach as the OFFLINE-only branch
    setTableName(serverBrokerRequest, realtimeTableName);
    handleExpressionOverride(serverPinotQuery, _tableCache.getExpressionOverrideMap(realtimeTableName));
    handleTimestampIndexOverride(serverPinotQuery, realtimeTableConfig);
    _queryOptimizer.optimize(serverPinotQuery, realtimeTableConfig, schema);
    realtimeBrokerRequest = serverBrokerRequest;
    requestContext.setFanoutType(RequestContext.FanoutType.REALTIME);
    requestContext.setRealtimeServerTenant(getServerTenant(realtimeTableName));
  }

  // Check if response can be sent without server query evaluation.
  if (offlineBrokerRequest != null && isFilterAlwaysFalse(offlineBrokerRequest.getPinotQuery())) {
    // We don't need to evaluate offline request
    offlineBrokerRequest = null;
  }
  if (realtimeBrokerRequest != null && isFilterAlwaysFalse(realtimeBrokerRequest.getPinotQuery())) {
    // We don't need to evaluate realtime request
    realtimeBrokerRequest = null;
  }
  if (offlineBrokerRequest == null && realtimeBrokerRequest == null) {
    // Every side was pruned by an always-false filter: answer on the broker without contacting servers
    return getEmptyBrokerOnlyResponse(pinotQuery, serverPinotQuery, requestContext, tableName, requesterIdentity,
        schema, query, database);
  }
  if (offlineBrokerRequest != null && isFilterAlwaysTrue(offlineBrokerRequest.getPinotQuery())) {
    // Drop offline request filter since it is always true
    offlineBrokerRequest.getPinotQuery().setFilterExpression(null);
  }
  if (realtimeBrokerRequest != null && isFilterAlwaysTrue(realtimeBrokerRequest.getPinotQuery())) {
    // Drop realtime request filter since it is always true
    realtimeBrokerRequest.getPinotQuery().setFilterExpression(null);
  }

  // Calculate routing table for the query
  // TODO: Modify RoutingManager interface to directly take PinotQuery
  long routingStartTimeNs = System.nanoTime();
  routeProvider.calculateRoutes(routeInfo, _routingManager, offlineBrokerRequest, realtimeBrokerRequest,
      requestId);
  Set<ServerInstance> offlineExecutionServers = routeInfo.getOfflineExecutionServers();
  Set<ServerInstance> realtimeExecutionServers = routeInfo.getRealtimeExecutionServers();
  List<String> unavailableSegments = routeInfo.getUnavailableSegments();
  int numPrunedSegmentsTotal = routeInfo.getNumPrunedSegmentsTotal();
  // Rewrite the broker requests as the rest of the code expects them to be null or not based on whether the routing
  // calculation was successful or not.
  offlineBrokerRequest = routeInfo.getOfflineBrokerRequest();
  realtimeBrokerRequest = routeInfo.getRealtimeBrokerRequest();
  List<QueryProcessingException> errorMsgs = new ArrayList<>();
  // If all tables in a hybrid are disabled then return an error.
  // Note that if a query is for one of OFFLINE or REALTIME table and the other physical table is disabled,
  // we still want to run the query. So this condition is false.
  // Tested by the tables "hybrid_o_disabled_REALTIME" and "hybrid_r_disabled_OFFLINE" in ImplicitTableRouteTest
  if (routeInfo.isDisabled()) {
    requestContext.setErrorCode(QueryErrorCode.TABLE_IS_DISABLED);
    return BrokerResponseNative.TABLE_IS_DISABLED;
  }
  // Partially disabled tables do not stop the query; they are reported as errors in the response
  List<String> disabledTableNames = routeInfo.getDisabledTableNames();
  if (disabledTableNames != null) {
    for (String name : disabledTableNames) {
      String errorMessage = String.format("%s Table is disabled", name);
      LOGGER.info("{}: {}", errorMessage, query);
      errorMsgs.add(new QueryProcessingException(QueryErrorCode.TABLE_IS_DISABLED, errorMessage));
    }
  }
  // Unavailable segments are reported (with a bounded sample) but do not fail the query on their own
  int numUnavailableSegments = unavailableSegments.size();
  requestContext.setNumUnavailableSegments(numUnavailableSegments);
  if (numUnavailableSegments > 0) {
    String errorMessage;
    if (numUnavailableSegments > MAX_UNAVAILABLE_SEGMENTS_TO_PRINT_IN_QUERY_EXCEPTION) {
      errorMessage = String.format("%d segments unavailable, sampling %d: %s", numUnavailableSegments,
          MAX_UNAVAILABLE_SEGMENTS_TO_PRINT_IN_QUERY_EXCEPTION,
          unavailableSegments.subList(0, MAX_UNAVAILABLE_SEGMENTS_TO_PRINT_IN_QUERY_EXCEPTION));
    } else {
      errorMessage = String.format("%d segments unavailable: %s", numUnavailableSegments, unavailableSegments);
    }
    String realtimeRoutingPolicy = realtimeBrokerRequest != null ? getRoutingPolicy(realtimeTableConfig) : null;
    String offlineRoutingPolicy = offlineBrokerRequest != null ? getRoutingPolicy(offlineTableConfig) : null;
    errorMessage = addRoutingPolicyInErrMsg(errorMessage, realtimeRoutingPolicy, offlineRoutingPolicy);
    errorMsgs.add(new QueryProcessingException(QueryErrorCode.BROKER_SEGMENT_UNAVAILABLE, errorMessage));
    _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_UNAVAILABLE_SEGMENTS, 1);
  }
  if (offlineBrokerRequest == null && realtimeBrokerRequest == null) {
    if (!errorMsgs.isEmpty()) {
      QueryProcessingException firstErrorMsg = errorMsgs.get(0);
      String logTail = errorMsgs.size() > 1 ? errorMsgs.size() + " exceptions found. Logging only the first one"
          : "1 exception found";
      LOGGER.info("No server found for request {}: {}. {} {}", requestId, query, logTail, firstErrorMsg);
      _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NO_SERVER_FOUND_EXCEPTIONS, 1);
      return BrokerResponseNative.fromBrokerErrors(errorMsgs);
    } else {
      // If no route is found, send an empty response
      return getEmptyBrokerOnlyResponse(pinotQuery, serverPinotQuery, requestContext, tableName, requesterIdentity,
          schema, query, database);
    }
  }
  long routingEndTimeNs = System.nanoTime();
  _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.QUERY_ROUTING,
      routingEndTimeNs - routingStartTimeNs);

  // Set timeout in the requests
  // NOTE: time spent is measured from the start of compilation; parse time and anything before compilation is not
  // included here.
  long timeSpentMs = TimeUnit.NANOSECONDS.toMillis(routingEndTimeNs - compilationStartTimeNs);
  // Remaining time in milliseconds for the server query execution
  // NOTE: For hybrid use case, in most cases offline table and real-time table should have the same query timeout
  // configured, but if necessary, we also allow different timeout for them.
  // If the timeout is not the same for offline table and real-time table, use the max of offline table
  // remaining time and realtime table remaining time. Server side will have different remaining time set for
  // each table type, and broker should wait for both types to return.
  long remainingTimeMs = 0;
  try {
    if (offlineBrokerRequest != null) {
      remainingTimeMs =
          setQueryTimeout(offlineTableName, offlineBrokerRequest.getPinotQuery().getQueryOptions(), timeSpentMs);
    }
    if (realtimeBrokerRequest != null) {
      remainingTimeMs = Math.max(remainingTimeMs,
          setQueryTimeout(realtimeTableName, realtimeBrokerRequest.getPinotQuery().getQueryOptions(), timeSpentMs));
    }
  } catch (TimeoutException e) {
    String errorMessage = e.getMessage();
    LOGGER.info("{} {}: {}", errorMessage, requestId, query);
    _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.REQUEST_TIMEOUT_BEFORE_SCATTERED_EXCEPTIONS, 1);
    errorMsgs.add(new QueryProcessingException(QueryErrorCode.BROKER_TIMEOUT, errorMessage));
    return BrokerResponseNative.fromBrokerErrors(errorMsgs);
  }

  // Set the maximum serialized response size per server, and ask server to directly return final response when only
  // one server is queried.
  // NOTE: the two set sizes are summed, so an instance present in both sets would be counted twice.
  int numServers = 0;
  if (offlineExecutionServers != null) {
    numServers += offlineExecutionServers.size();
  }
  if (realtimeExecutionServers != null) {
    numServers += realtimeExecutionServers.size();
  }
  if (offlineBrokerRequest != null) {
    applyServerResponseQueryOptions(offlineBrokerRequest, offlineTableConfig, numServers, serverBrokerRequest);
  }
  if (realtimeBrokerRequest != null) {
    applyServerResponseQueryOptions(realtimeBrokerRequest, realtimeTableConfig, numServers, serverBrokerRequest);
  }

  // Execute the query
  // TODO: Replace ServerStats with ServerRoutingStatsEntry.
  ServerStats serverStats = new ServerStats();
  // TODO: Handle broker specific operations for explain plan queries such as:
  //       - Alias handling
  //       - Compile time function invocation
  //       - Literal only queries
  //       - Any rewrites
  if (pinotQuery.isExplain()) {
    // Update routing tables to only send request to offline servers for OFFLINE and HYBRID tables.
    // TODO: Assess if the Explain Plan Query should also be routed to REALTIME servers for HYBRID tables
    if (offlineExecutionServers != null) {
      // For OFFLINE and HYBRID tables, don't send EXPLAIN query to realtime servers.
      // NOTE(review): only these locals are cleared. routeInfo (which is what processBrokerRequest receives) is not
      // modified here, and realtimeBrokerRequest is not read again in this method. Verify that the realtime side is
      // actually skipped for EXPLAIN; otherwise this only affects the QueryServers used for cancellation tracking.
      realtimeBrokerRequest = null;
      realtimeExecutionServers = null;
    }
  }
  BrokerResponseNative brokerResponse;
  if (isQueryCancellationEnabled()) {
    // Start to track the running query for cancellation just before sending it out to servers to avoid any
    // potential failures that could happen before sending it out, like failures to calculate the routing table etc.
    // TODO: Even tracking the query as late as here, a potential race condition between calling cancel API and
    //       query being sent out to servers can still happen. If cancel request arrives earlier than query being
    //       sent out to servers, the servers miss the cancel request and continue to run the queries. The users
    //       can always list the running queries and cancel query again until it ends. Just that such race
    //       condition makes cancel API less reliable. This should be rare as it assumes sending queries out to
    //       servers takes time, but will address later if needed.
    String clientRequestId = extractClientRequestId(sqlNodeAndOptions);
    onQueryStart(requestId, clientRequestId, query,
        new QueryServers(query, offlineExecutionServers, realtimeExecutionServers));
    try {
      brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, routeInfo,
          remainingTimeMs, serverStats, requestContext);
      brokerResponse.setClientRequestId(clientRequestId);
    } finally {
      // Always stop tracking, even if execution throws
      onQueryFinish(requestId);
      LOGGER.debug("Remove track of running query: {}", requestId);
    }
  } else {
    brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, routeInfo,
        remainingTimeMs, serverStats, requestContext);
  }
  brokerResponse.setTablesQueried(Set.of(rawTableName));
  // Attach the broker-side errors collected above (disabled tables, unavailable segments)
  for (QueryProcessingException errorMsg : errorMsgs) {
    brokerResponse.addException(errorMsg);
  }
  brokerResponse.setNumSegmentsPrunedByBroker(numPrunedSegmentsTotal);
  long executionEndTimeNs = System.nanoTime();
  _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.QUERY_EXECUTION,
      executionEndTimeNs - routingEndTimeNs);

  // Track number of queries with number of groups limit reached
  if (brokerResponse.isNumGroupsLimitReached()) {
    _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED,
        1);
  }

  // server returns STRING as default dataType for all columns in (some) scenarios where no rows are returned
  // this is an attempt to return more faithful information based on other sources
  fillEmptyResponseSchema(pinotQuery, brokerResponse, schema, database, query);

  // Set total query processing time (measured from request arrival, not from compilation start)
  long totalTimeMs = System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis();
  brokerResponse.setTimeUsedMs(totalTimeMs);
  augmentStatistics(requestContext, brokerResponse);
  // include both broker side errorMsgs and server side errorMsgs
  // Meter QUERY_VALIDATION_EXCEPTIONS at most once per query, even if several such exceptions are present
  List<QueryProcessingException> brokerExceptions = brokerResponse.getExceptions();
  brokerExceptions.stream()
      .filter(exception -> exception.getErrorCode() == QueryErrorCode.QUERY_VALIDATION.getId())
      .findFirst()
      .ifPresent(exception -> _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1));
  if (!pinotQuery.isExplain() && QueryOptionsUtils.shouldDropResults(pinotQuery.getQueryOptions())) {
    brokerResponse.setResultTable(null);
  }
  // Secondary-workload queries are timed under a separate timer so they do not skew the primary latency metric
  if (QueryOptionsUtils.isSecondaryWorkload(pinotQuery.getQueryOptions())) {
    _brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.SECONDARY_WORKLOAD_QUERY_TOTAL_TIME_MS, totalTimeMs,
        TimeUnit.MILLISECONDS);
    _brokerMetrics.addTimedValue(BrokerTimer.SECONDARY_WORKLOAD_QUERY_TOTAL_TIME_MS, totalTimeMs,
        TimeUnit.MILLISECONDS);
  } else {
    _brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.QUERY_TOTAL_TIME_MS, totalTimeMs,
        TimeUnit.MILLISECONDS);
    _brokerMetrics.addTimedValue(BrokerTimer.QUERY_TOTAL_TIME_MS, totalTimeMs, TimeUnit.MILLISECONDS);
  }

  // Log query and stats
  _queryLogger.log(
      new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse,
          QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, requesterIdentity, serverStats));
  return brokerResponse;
}

/**
 * Applies the per-server response options to the query options of one physical-table (OFFLINE or REALTIME) request.
 *
 * <p>Always sets the maximum serialized response size per server via {@code setMaxServerResponseSizeBytes}. When
 * exactly one server is queried, it also asks that server to return the final result directly by adding
 * {@code SERVER_RETURN_FINAL_RESULT=true}. An option value that is already present (explicitly set) is left
 * untouched. If the flag was newly added and the table request is a different object than
 * {@code serverBrokerRequest} (the hybrid case, where per-table queries are deep copies), the same flag is also put
 * on {@code serverBrokerRequest} so it can be used in the reduce phase.
 *
 * @param tableBrokerRequest the OFFLINE or REALTIME request whose query options are updated in place
 * @param tableConfig the table config of that physical table
 * @param numServers total number of servers (offline + realtime) the query is sent to
 * @param serverBrokerRequest the server-side broker request that also receives the flag in the hybrid case
 */
private void applyServerResponseQueryOptions(BrokerRequest tableBrokerRequest, TableConfig tableConfig,
    int numServers, BrokerRequest serverBrokerRequest) {
  Map<String, String> queryOptions = tableBrokerRequest.getPinotQuery().getQueryOptions();
  setMaxServerResponseSizeBytes(numServers, queryOptions, tableConfig);
  // putIfAbsent returns null only when the flag was not set before, so an explicit user setting is respected
  if (numServers == 1 && queryOptions.putIfAbsent(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true") == null
      && tableBrokerRequest != serverBrokerRequest) {
    serverBrokerRequest.getPinotQuery().getQueryOptions().put(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true");
  }
}