broker/handler.go (118 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package broker
import (
"context"
"fmt"
"github.com/gorilla/mux"
apiCom "github.com/uber/aresdb/api/common"
"github.com/uber/aresdb/broker/common"
queryCom "github.com/uber/aresdb/query/common"
"github.com/uber/aresdb/query/sql"
"github.com/uber/aresdb/utils"
"net/http"
"sync/atomic"
)
type QueryHandler struct {
exec common.QueryExecutor
nextRequestID int64
instanceID string
}
func NewQueryHandler(executor common.QueryExecutor, instanceID string) QueryHandler {
return QueryHandler{
exec: executor,
instanceID: instanceID,
}
}
func (handler *QueryHandler) Register(router *mux.Router, wrappers ...utils.HTTPHandlerWrapper) {
router.HandleFunc("/sql", utils.ApplyHTTPWrappers(handler.HandleSQL, wrappers...)).Methods(http.MethodPost)
router.HandleFunc("/aql", utils.ApplyHTTPWrappers(handler.HandleAQL, wrappers...)).Methods(http.MethodPost)
}
func (handler *QueryHandler) HandleSQL(w *utils.ResponseWriter, r *http.Request) {
utils.GetRootReporter().GetCounter(utils.SQLQueryReceivedBroker).Inc(1)
var queryReqeust BrokerSQLRequest
start := utils.Now()
var err error
defer func() {
duration := utils.Now().Sub(start)
utils.GetRootReporter().GetTimer(utils.QueryLatencyBroker).Record(duration)
if err != nil {
utils.GetRootReporter().GetCounter(utils.QueryFailedBroker).Inc(1)
utils.GetLogger().With(
"error", err,
"request", queryReqeust).Error("Error happened when processing request")
} else {
utils.GetRootReporter().GetCounter(utils.QuerySucceededBroker).Inc(1)
utils.GetLogger().With("request", queryReqeust).Info("Request succeeded")
}
}()
err = apiCom.ReadRequest(r, &queryReqeust)
if err != nil {
w.WriteError(err)
return
}
sqlParseStart := utils.Now()
var aql *queryCom.AQLQuery
aql, err = sql.Parse(queryReqeust.Body.Query, utils.GetLogger())
utils.GetRootReporter().GetTimer(utils.SQLParsingLatencyBroker).Record(utils.Now().Sub(sqlParseStart))
if err != nil {
w.WriteError(err)
return
}
err = handler.exec.Execute(context.Background(), handler.getReqestID(), aql, queryReqeust.Accept == utils.HTTPContentTypeHyperLogLog, w)
if err != nil {
w.WriteError(err)
return
}
return
}
func (handler *QueryHandler) HandleAQL(w *utils.ResponseWriter, r *http.Request) {
var queryReqeust BrokerAQLRequest
utils.GetRootReporter().GetCounter(utils.AQLQueryReceivedBroker).Inc(1)
start := utils.Now()
var err error
defer func() {
duration := utils.Now().Sub(start)
utils.GetRootReporter().GetTimer(utils.QueryLatencyBroker).Record(duration)
if err != nil {
utils.GetRootReporter().GetCounter(utils.QueryFailedBroker).Inc(1)
utils.GetLogger().With(
"error", err,
"request", queryReqeust).Error("Error happened when processing request")
} else {
utils.GetRootReporter().GetCounter(utils.QuerySucceededBroker).Inc(1)
utils.GetLogger().With("request", queryReqeust).Info("Request succeeded")
}
}()
err = apiCom.ReadRequest(r, &queryReqeust)
if err != nil {
w.WriteError(err)
return
}
err = handler.exec.Execute(context.TODO(), handler.getReqestID(), &queryReqeust.Body.Query, queryReqeust.Accept == utils.HTTPContentTypeHyperLogLog, w)
if err != nil {
w.WriteError(err)
return
}
return
}
func (handler *QueryHandler) getReqestID() string {
newID := atomic.AddInt64(&handler.nextRequestID, 1)
return fmt.Sprintf("%s_%d", handler.instanceID, newID)
}
// BrokerSQLRequest represents SQL query request. Debug mode will
// run **each batch** in synchronized mode and report time
// for each step.
// swagger:parameters querySQL
type BrokerSQLRequest struct {
// in: query
Verbose int `query:"verbose,optional" json:"verbose"`
// in: query
Debug int `query:"debug,optional" json:"debug"`
// in: header
Accept string `header:"Accept,optional" json:"accept"`
// in: header
Origin string `header:"Rpc-Caller,optional" json:"origin"`
// in: body
Body struct {
Query string `json:"query"`
} `body:""`
}
// BrokerAQLRequest represents AQL query request. Debug mode will
// run **each batch** in synchronized mode and report time
// for each step.
// swagger:parameters querySQL
type BrokerAQLRequest struct {
// in: query
Verbose int `query:"verbose,optional" json:"verbose"`
// in: query
Debug int `query:"debug,optional" json:"debug"`
// in: header
Accept string `header:"Accept,optional" json:"accept"`
// in: header
Origin string `header:"Rpc-Caller,optional" json:"origin"`
// in: body
Body struct {
Query queryCom.AQLQuery `json:"query"`
} `body:""`
}