datanode/client/query_client.go (129 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/uber/aresdb/cluster/topology"
"github.com/uber/aresdb/common"
queryCom "github.com/uber/aresdb/query/common"
"github.com/uber/aresdb/utils"
. "io/ioutil"
"net/http"
"net/url"
)
const (
requestIDHeaderKey = "RequestID"
)
// ErrFailedToConnect represents error to connect to datanode
var ErrFailedToConnect = errors.New("Datanode query client failed to connect")
// NewDataNodeQueryClient creates query client to datanode
func NewDataNodeQueryClient(logger common.Logger) DataNodeQueryClient {
return &dataNodeQueryClientImpl{
client: http.Client{},
logger: logger,
}
}
type dataNodeQueryClientImpl struct {
client http.Client
logger common.Logger
}
type aqlRequestBody struct {
Queries []queryCom.AQLQuery `json:"queries"`
}
type aqlRespBody struct {
Results []queryCom.AQLQueryResult `json:"results"`
}
func (dc *dataNodeQueryClientImpl) Query(ctx context.Context, requestID string, host topology.Host, query queryCom.AQLQuery, hll bool) (result queryCom.AQLQueryResult, err error) {
var bs []byte
bs, err = dc.queryRaw(ctx, requestID, host, query, hll)
if err != nil {
return
}
if hll {
var results []queryCom.AQLQueryResult
var errs []error
results, errs, err = queryCom.ParseHLLQueryResults(bs, true)
if err != nil {
dc.logger.With("host", host, "query", query, "error", err, "errors", errs, "hll", hll).Error("datanode query client Query failed")
return
}
if len(results) != 1 {
err = errors.New(fmt.Sprintf("invalid response from datanode, resp: %s", bs))
return
}
result = results[0]
} else {
var respBody aqlRespBody
err = json.Unmarshal(bs, &respBody)
if err != nil || len(respBody.Results) != 1 {
err = errors.New(fmt.Sprintf("invalid response from datanode, resp: %s", bs))
return
}
result = respBody.Results[0]
}
dc.logger.With("host", host, "query", query, "result", result, "hll", hll).Debug("datanode query client Query succeeded")
return
}
func (dc *dataNodeQueryClientImpl) QueryRaw(ctx context.Context, requestID string, host topology.Host, query queryCom.AQLQuery) (bs []byte, err error) {
bs, err = dc.queryRaw(ctx, requestID, host, query, false)
if err == nil {
dc.logger.With("host", host, "query", query).Debug("datanode query client QueryRaw succeeded")
}
return
}
func (dc *dataNodeQueryClientImpl) queryRaw(ctx context.Context, requestID string, host topology.Host, query queryCom.AQLQuery, hll bool) (bs []byte, err error) {
var u *url.URL
if err = ctx.Err(); err != nil {
return
}
if host == nil {
err = utils.StackError(nil, "host is nil")
return
}
u, err = url.Parse(host.Address())
if err != nil {
return
}
u.Scheme = "http"
u.Path = "/query/aql"
q := u.Query()
q.Set("dataonly", "1")
u.RawQuery = q.Encode()
aqlRequestBody := aqlRequestBody{
[]queryCom.AQLQuery{query},
}
var bodyBytes []byte
bodyBytes, err = json.Marshal(aqlRequestBody)
if err != nil {
return
}
var req *http.Request
req, err = http.NewRequest(http.MethodPost, u.String(), bytes.NewBuffer(bodyBytes))
if err != nil {
return
}
req.Header.Add(requestIDHeaderKey, requestID)
if hll {
req.Header.Add(utils.HTTPAcceptTypeHeaderKey, utils.HTTPContentTypeHyperLogLog)
}
req = req.WithContext(ctx)
var res *http.Response
res, err = dc.client.Do(req)
if res != nil {
defer res.Body.Close()
}
if err != nil {
dc.logger.With("err", err).Error("error connecting to datanode")
err = ErrFailedToConnect
return
}
if res.StatusCode != http.StatusOK {
err = errors.New(fmt.Sprintf("got status code %d from datanode", res.StatusCode))
return
}
bs, err = ReadAll(res.Body)
if err != nil {
bs = nil
}
return
}