api/query_handler.go (319 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
"encoding/json"
"github.com/m3db/m3/src/x/sync"
"github.com/uber/aresdb/cluster/topology"
"net/http"
"github.com/uber/aresdb/memstore"
"github.com/uber/aresdb/query"
queryCom "github.com/uber/aresdb/query/common"
"github.com/uber/aresdb/utils"
"time"
"github.com/gorilla/mux"
apiCom "github.com/uber/aresdb/api/common"
"github.com/uber/aresdb/common"
)
// QueryHandler handles query execution.
type QueryHandler struct {
shardOwner topology.ShardOwner
memStore memstore.MemStore
deviceManager *query.DeviceManager
workerPool sync.WorkerPool
}
// NewQueryHandler creates a new QueryHandler.
func NewQueryHandler(
memStore memstore.MemStore,
shardOwner topology.ShardOwner,
cfg common.QueryConfig,
maxConcurrentQueries int,
) *QueryHandler {
workerPool := sync.NewWorkerPool(maxConcurrentQueries)
workerPool.Init()
return &QueryHandler{
memStore: memStore,
shardOwner: shardOwner,
deviceManager: query.NewDeviceManager(cfg),
workerPool: workerPool,
}
}
// GetDeviceManager returns the device manager of query handler.
func (handler *QueryHandler) GetDeviceManager() *query.DeviceManager {
return handler.deviceManager
}
// Register registers http handlers.
func (handler *QueryHandler) Register(router *mux.Router, wrappers ...utils.HTTPHandlerWrapper) {
router.HandleFunc("/aql", utils.ApplyHTTPWrappers(handler.HandleAQL, wrappers...)).Methods(http.MethodGet, http.MethodPost)
router.HandleFunc("/sql", utils.ApplyHTTPWrappers(handler.HandleSQL, wrappers...)).Methods(http.MethodGet, http.MethodPost)
}
// HandleAQL swagger:route POST /query/aql queryAQL
// query in AQL
//
// Consumes:
// - application/json
// - application/hll
//
// Produces:
// - application/json
//
// Responses:
// default: errorResponse
// 200: aqlResponse
// 400: aqlResponse
func (handler *QueryHandler) HandleAQL(w *utils.ResponseWriter, r *http.Request) {
// default device to negative value to differentiate 0 from empty
aqlRequest := apiCom.AQLRequest{Device: -1}
if err := apiCom.ReadRequest(r, &aqlRequest); err != nil {
w.WriteErrorWithCode(http.StatusBadRequest, err)
return
}
done := make(chan struct{})
available := handler.workerPool.GoIfAvailable(func() {
defer close(done)
handler.handleAQLInternal(aqlRequest, w, r)
})
if !available {
w.WriteError(apiCom.ErrQueryServiceNotAvailable)
return
}
<-done
}
func (handler *QueryHandler) handleAQLInternal(aqlRequest apiCom.AQLRequest, w *utils.ResponseWriter, r *http.Request) {
var err error
var duration time.Duration
var qcs []*query.AQLQueryContext
var statusCode int
defer func() {
var errStr string
if err != nil {
errStr = err.Error()
}
l := utils.GetQueryLogger().With(
"error", errStr,
"request", aqlRequest,
"queries_enabled_", aqlRequest.Body.Queries,
"duration", duration,
"statusCode", statusCode,
"contexts_enabled_", qcs,
"headers", r.Header,
)
if statusCode == http.StatusOK {
l.Info("All queries succeeded")
} else {
l.Error("Some of the queries finished with error")
}
}()
if aqlRequest.Query != "" {
// Override from query parameter
err = json.Unmarshal([]byte(aqlRequest.Query), &aqlRequest.Body)
if err != nil {
statusCode = http.StatusBadRequest
w.WriteError(utils.APIError{
Code: statusCode,
Message: ErrMsgFailedToUnmarshalRequest,
Cause: err,
})
return
}
}
if aqlRequest.Body.Queries == nil {
statusCode = http.StatusBadRequest
w.WriteError(ErrMissingParameter)
return
}
returnHLL := aqlRequest.Accept == utils.HTTPContentTypeHyperLogLog
if aqlRequest.DeviceChoosingTimeout <= 0 {
aqlRequest.DeviceChoosingTimeout = -1
}
queryTimer := utils.GetRootReporter().GetTimer(utils.QueryLatency)
start := utils.Now()
var requestResponseWriter QueryResponseWriter
if !returnHLL && canEagerFlush(aqlRequest.Body.Queries) {
statusCode = http.StatusOK
aqlQuery := aqlRequest.Body.Queries[0]
qc := &query.AQLQueryContext{
Query: &aqlQuery,
ReturnHLLData: false,
DataOnly: aqlRequest.DataOnly != 0,
}
qc.InitQCHelper()
qc.Compile(handler.memStore, handler.shardOwner)
qc.ResponseWriter = w
if qc.Error != nil {
err = qc.Error
statusCode = http.StatusBadRequest
w.WriteErrorWithCode(statusCode, err)
return
}
// for logging purpose only
qcs = append(qcs, qc)
qc.FindDeviceForQuery(handler.memStore, aqlRequest.Device, handler.deviceManager, aqlRequest.DeviceChoosingTimeout)
if qc.Error != nil {
err = qc.Error
statusCode = http.StatusServiceUnavailable
w.WriteErrorWithCode(statusCode, err)
return
}
defer handler.deviceManager.ReleaseReservedMemory(qc.Device, qc.Query)
qc.ProcessQuery(handler.memStore)
if qc.Error != nil {
err = qc.Error
utils.GetQueryLogger().With(
"error", err,
"query", aqlQuery,
"context", qc,
).Error("Error happened when processing query")
statusCode = http.StatusInternalServerError
return
}
if !qc.DataOnly {
w.Write([]byte(`]}]`))
if aqlRequest.Verbose > 0 {
w.Write([]byte(`,"context":`))
qcBytes, _ := json.Marshal(qcs)
w.Write(qcBytes)
}
w.Write([]byte(`}`))
}
utils.GetRootReporter().GetChildCounter(map[string]string{
"table": aqlQuery.Table,
}, utils.QueryRowsReturned).Inc(int64(qc.ResultsRowsFlushed()))
} else {
requestResponseWriter = getReponseWriter(returnHLL, len(aqlRequest.Body.Queries))
var qc *query.AQLQueryContext
for i, aqlQuery := range aqlRequest.Body.Queries {
qc, statusCode = handleQuery(handler.memStore, handler.shardOwner, handler.deviceManager, aqlRequest, aqlQuery)
if aqlRequest.Verbose > 0 {
requestResponseWriter.ReportQueryContext(qc)
}
if qc.Error != nil {
requestResponseWriter.ReportError(i, aqlQuery.Table, qc.Error, statusCode)
} else {
requestResponseWriter.ReportResult(i, qc)
qc.ReleaseHostResultsBuffers()
utils.GetRootReporter().GetChildCounter(map[string]string{
"table": aqlQuery.Table,
}, utils.QuerySucceeded).Inc(1)
}
qcs = append(qcs, qc)
}
}
duration = utils.Now().Sub(start)
queryTimer.Record(duration)
if requestResponseWriter != nil {
requestResponseWriter.Respond(w)
statusCode = requestResponseWriter.GetStatusCode()
}
return
}
func handleQuery(memStore memstore.MemStore, shardOwner topology.ShardOwner, deviceManager *query.DeviceManager, aqlRequest apiCom.AQLRequest, aqlQuery queryCom.AQLQuery) (qc *query.AQLQueryContext, statusCode int) {
qc = &query.AQLQueryContext{
Query: &aqlQuery,
ReturnHLLData: aqlRequest.Accept == utils.HTTPContentTypeHyperLogLog,
DataOnly: aqlRequest.DataOnly != 0,
}
qc.InitQCHelper()
qc.Compile(memStore, shardOwner)
for tableName := range qc.TableSchemaByName {
utils.GetRootReporter().GetChildCounter(map[string]string{
"table": tableName,
}, utils.QueryReceived).Inc(1)
}
if aqlRequest.Debug > 0 || aqlRequest.Profiling != "" {
qc.Debug = true
aqlRequest.Verbose = 1
}
qc.Profiling = aqlRequest.Profiling
// Compilation error, should be bad request
if qc.Error != nil {
statusCode = http.StatusBadRequest
return
}
// Find a device that meets the resource requirement of this query
// Use query specified device as hint
qc.FindDeviceForQuery(memStore, aqlRequest.Device, deviceManager, aqlRequest.DeviceChoosingTimeout)
// Unable to find a device for the query.
if qc.Error != nil {
// Unable to fulfill this request due to resource not available, clients need to try sometimes later.
statusCode = http.StatusServiceUnavailable
return
}
defer deviceManager.ReleaseReservedMemory(qc.Device, qc.Query)
// Execute.
qc.ProcessQuery(memStore)
if qc.Error != nil {
utils.GetQueryLogger().With(
"error", qc.Error,
"query", aqlQuery,
"context", qc,
).Error("Error happened when processing query")
statusCode = http.StatusInternalServerError
} else {
// Report
utils.GetRootReporter().GetChildCounter(map[string]string{
"table": aqlQuery.Table,
}, utils.QueryRowsReturned).Inc(int64(qc.ResultsRowsFlushed()))
}
return
}
func getReponseWriter(returnHLL bool, nQueries int) QueryResponseWriter {
if returnHLL {
return NewHLLQueryResponseWriter()
}
return NewJSONQueryResponseWriter(nQueries)
}
// QueryResponseWriter defines the interface to write query result and error to final response.
type QueryResponseWriter interface {
ReportError(queryIndex int, table string, err error, statusCode int)
ReportQueryContext(*query.AQLQueryContext)
ReportResult(int, *query.AQLQueryContext)
// param compress means whether client accepts compressed results
Respond(w *utils.ResponseWriter)
GetStatusCode() int
}
// JSONQueryResponseWriter writes query result as json.
type JSONQueryResponseWriter struct {
response queryCom.AQLResponse
statusCode int
}
// NewJSONQueryResponseWriter creates a new JSONQueryResponseWriter.
func NewJSONQueryResponseWriter(nQueries int) QueryResponseWriter {
return &JSONQueryResponseWriter{
response: queryCom.AQLResponse{
Results: make([]queryCom.AQLQueryResult, nQueries),
},
statusCode: http.StatusOK,
}
}
// ReportError writes the error of the query to the response.
func (w *JSONQueryResponseWriter) ReportError(queryIndex int, table string, err error, statusCode int) {
// Usually larger status code means more severe problem.
if statusCode > w.statusCode {
w.statusCode = statusCode
}
if w.response.Errors == nil {
w.response.Errors = make([]error, len(w.response.Results))
}
w.response.Errors[queryIndex] = err
utils.GetRootReporter().GetChildCounter(map[string]string{
"table": table,
}, utils.QueryFailed).Inc(1)
}
// ReportQueryContext writes the query context to the response.
func (w *JSONQueryResponseWriter) ReportQueryContext(qc *query.AQLQueryContext) {
bs, _ := json.Marshal(qc)
w.response.QueryContext = append(w.response.QueryContext, string(bs))
}
// ReportResult writes the query result to the response.
func (w *JSONQueryResponseWriter) ReportResult(queryIndex int, qc *query.AQLQueryContext) {
qc.Postprocess()
if qc.Error != nil {
w.ReportError(queryIndex, qc.Query.Table, qc.Error, http.StatusInternalServerError)
}
w.response.Results[queryIndex] = qc.Results
}
// Respond writes the final response into ResponseWriter.
func (w *JSONQueryResponseWriter) Respond(rw *utils.ResponseWriter) {
rw.WriteObjectWithCode(w.statusCode, w.response)
}
// GetStatusCode returns the status code written into response.
func (w *JSONQueryResponseWriter) GetStatusCode() int {
return w.statusCode
}
// HLLQueryResponseWriter writes query result as application/hll. For more inforamtion, please refer to
// https://github.com/uber/aresdb/wiki/HyperLogLog.
type HLLQueryResponseWriter struct {
response *queryCom.HLLQueryResults
statusCode int
}
// NewHLLQueryResponseWriter creates a new HLLQueryResponseWriter.
func NewHLLQueryResponseWriter() QueryResponseWriter {
w := HLLQueryResponseWriter{
response: queryCom.NewHLLQueryResults(),
statusCode: http.StatusOK,
}
return &w
}
// ReportError writes the error of the query to the response.
func (w *HLLQueryResponseWriter) ReportError(queryIndex int, table string, err error, statusCode int) {
if statusCode > w.statusCode {
w.statusCode = statusCode
}
w.response.WriteError(err)
}
// ReportQueryContext writes the query context to the response. Since the format of application/hll is not
// designed for human reading, we will ignore storing query context in response for now.
func (w *HLLQueryResponseWriter) ReportQueryContext(qc *query.AQLQueryContext) {
}
// ReportResult writes the query result to the response.
func (w *HLLQueryResponseWriter) ReportResult(queryIndex int, qc *query.AQLQueryContext) {
w.response.WriteResult(qc.HLLQueryResult)
}
// Respond writes the final response into ResponseWriter.
func (w *HLLQueryResponseWriter) Respond(rw *utils.ResponseWriter) {
rw.Header().Set(utils.HTTPContentTypeHeaderKey, utils.HTTPContentTypeHyperLogLog)
rw.WriteBytesWithCode(w.statusCode, w.response.GetBytes())
}
// GetStatusCode returns the status code written into response.
func (w *HLLQueryResponseWriter) GetStatusCode() int {
return w.statusCode
}
// for now we only eager flush when
// 1. there's only 1 query in the request
// 2. the query is non aggregate query
func canEagerFlush(queries []queryCom.AQLQuery) bool {
if len(queries) != 1 {
return false
}
aqlQuery := queries[0]
return len(aqlQuery.Measures) == 1 && aqlQuery.Measures[0].Expr == "1"
}