in broker/query_plan_non_agg.go [164:306]
func (nqp *NonAggQueryPlan) Execute(ctx context.Context, w http.ResponseWriter) (err error) {
	var headersBytes []byte
	headersBytes, err = json.Marshal(nqp.headers)
	if err != nil {
		return
	}
	_, err = w.Write([]byte(`{"headers":`))
	if err != nil {
		return
	}
	_, err = w.Write(headersBytes)
	if err != nil {
		return
	}
	_, err = w.Write([]byte(`,"matrixData":[`))
	if err != nil {
		return
	}
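	// Fan out one goroutine per data node; each pushes its scan result (or error)
	// onto resultChan, or gives up if doneChan has already been closed.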
	for _, node := range nqp.nodes {
		go func(n *StreamingScanNode) {
			// keep the node's error local so concurrent goroutines do not race on the outer named return
			bs, nodeErr := n.Execute(ctx)
			utils.GetLogger().With("dataSize", len(bs), "error", nodeErr).Debug("sending result to result channel")
			select {
			case <-nqp.doneChan:
				utils.GetLogger().Debug("cancel pushing to result channel")
				return
			case nqp.resultChan <- streamingScanNoderesult{
				data: bs,
				err:  nodeErr,
			}:
			}
		}(node)
	}
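	// Collect results as they arrive, flushing each batch into the response and
	// stopping early once enough rows have been written.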
	dataNodeWaitStart := utils.Now()
	// processedFirstBatch tracks whether any rows have been flushed yet, so that
	// later batches know to prepend a comma separator
	processedFirstBatch := false
	for i := 0; i < len(nqp.nodes); i++ {
		if nqp.getRowsWanted() == 0 {
			utils.GetLogger().Debug("got enough rows, exiting")
			close(nqp.doneChan)
			break
		}
		res := <-nqp.resultChan
		if i == 0 {
			// only log time waited for the fastest datanode for now
			utils.GetRootReporter().GetTimer(utils.TimeWaitedForDataNode).Record(utils.Now().Sub(dataNodeWaitStart))
		}
		if res.err != nil {
			err = res.err
			return
		}
		if len(res.data) == 0 {
			continue
		}
		// write rows
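		// Two flush strategies: stream the raw bytes straight through when possible,
		// otherwise deserialize so enum translation and the row limit can be applied.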
		if nqp.qc.AQLQuery.Limit < 0 && len(nqp.qc.DimensionEnumReverseDicts) == 0 {
			// no limit and no enums to translate, so flush the raw bytes directly
			utils.GetLogger().Debug("flushing without deserializing")
			if processedFirstBatch {
				w.Write([]byte(`,`))
			}
			w.Write(res.data)
		} else {
			// we have to deserialize
			if len(res.data) == 0 {
				utils.GetLogger().Debug("skipping on empty response")
				continue
			}
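			// The data node payload is a run of JSON rows joined by commas without the
			// enclosing brackets, so wrap it before unmarshalling it as a JSON array.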
			serDeStart := utils.Now()
			res.data = append([]byte("["), res.data...)
			res.data = append(res.data, byte(']'))
			var resultData [][]interface{}
			err = json.Unmarshal(res.data, &resultData)
			if err != nil {
				return
			}
			// translate enum ranks back to their string values via the reverse dictionaries
			for _, row := range resultData {
				for i, col := range row {
					if enumReverseDict, exists := nqp.qc.DimensionEnumReverseDicts[i]; exists {
						if s, ok := col.(string); ok {
							if common.NULLString == s {
								continue
							}
							var enumRank int
							enumRank, err = strconv.Atoi(s)
							if err != nil {
								return utils.StackError(err, "failed to translate enum at col %d of row %v", i, row)
							}
							if enumRank < 0 || enumRank >= len(enumReverseDict) {
								return utils.StackError(nil, "enum rank %d out of range at col %d of row %v", enumRank, i, row)
							}
							row[i] = enumReverseDict[enumRank]
						} else {
							return utils.StackError(nil, "failed to translate enum at col %d of row %v", i, row)
						}
					}
				}
			}
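			// Trim the batch to the number of rows still wanted when a limit is set.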
			var dataToFlush [][]interface{}
			if nqp.qc.AQLQuery.Limit < 0 {
				dataToFlush = resultData
			} else {
				rowsToFlush := nqp.getRowsWanted()
				if rowsToFlush > len(resultData) {
					rowsToFlush = len(resultData)
				}
				dataToFlush = resultData[:rowsToFlush]
			}
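			// Re-serialize the translated (and possibly trimmed) rows and append them
			// to the matrixData array in the response.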
			var bs []byte
			bs, err = json.Marshal(dataToFlush)
			if err != nil {
				return
			}
			if processedFirstBatch {
				w.Write([]byte(`,`))
			}
			// strip the enclosing [ ] so rows from every batch join into a single matrixData array
			w.Write(bs[1 : len(bs)-1])
			nqp.flushed += len(dataToFlush)
			utils.GetLogger().With("nrows", len(dataToFlush)).Debug("flushed rows")
			utils.GetRootReporter().GetTimer(utils.TimeSerDeDataNodeResponse).Record(utils.Now().Sub(serDeStart))
		}
		processedFirstBatch = true
	}
	_, err = w.Write([]byte(`]}`))
	return
}