func()

in broker/query_plan_non_agg.go [164:306]


// Execute runs every streaming scan node of the plan concurrently and streams
// the combined result to w as one JSON object of the form
//
//	{"headers":<headers>,"matrixData":[<row>,<row>,...]}
//
// Node results are consumed in completion order. When the query has no limit
// and no enum dimension needs translating, each node payload is forwarded
// verbatim; otherwise the payload is decoded, enum ranks are replaced through
// nqp.qc.DimensionEnumReverseDicts, the rows are truncated to the number still
// wanted, and the batch is re-encoded.
//
// The first error encountered (node failure, decode/encode failure, enum
// translation failure or a failed write to w) is returned. Because output is
// streamed, part of the response body may already have been written by then.
func (nqp *NonAggQueryPlan) Execute(ctx context.Context, w http.ResponseWriter) (err error) {
	var headersBytes []byte
	if headersBytes, err = json.Marshal(nqp.headers); err != nil {
		return err
	}
	if _, err = w.Write([]byte(`{"headers":`)); err != nil {
		return err
	}
	if _, err = w.Write(headersBytes); err != nil {
		return err
	}
	if _, err = w.Write([]byte(`,"matrixData":[`)); err != nil {
		return err
	}

	// doneChan tells the scan goroutines to give up delivering their result.
	// It is only ever closed from this goroutine, so a plain flag is enough to
	// guarantee it is closed at most once.
	doneClosed := false
	stopNodes := func() {
		if !doneClosed {
			doneClosed = true
			close(nqp.doneChan)
		}
	}

	// If we return before every node result has been consumed (an error
	// path), release the goroutines that may still be trying to send on
	// resultChan; otherwise they could block forever.
	// NOTE(review): assumes nothing outside Execute closes doneChan — confirm.
	received := 0
	defer func() {
		if received < len(nqp.nodes) {
			stopNodes()
		}
	}()

	for _, node := range nqp.nodes {
		go func(n *StreamingScanNode) {
			// Keep the result goroutine-local: assigning to the enclosing
			// function's named result from several goroutines is a data race.
			var (
				bs      []byte
				execErr error
			)
			bs, execErr = n.Execute(ctx)
			utils.GetLogger().With("dataSize", len(bs), "error", execErr).Debug("sending result to result channel")
			select {
			case <-nqp.doneChan:
				utils.GetLogger().Debug("cancel pushing to result channel")
				return
			case nqp.resultChan <- streamingScanNoderesult{
				data: bs,
				err:  execErr,
			}:
			}
		}(node)
	}

	dataNodeWaitStart := utils.Now()

	// wroteBatch records whether any rows have been emitted yet, i.e. whether
	// the next batch must be preceded by a comma.
	wroteBatch := false
	writeBatch := func(payload []byte) error {
		if wroteBatch {
			if _, werr := w.Write([]byte(`,`)); werr != nil {
				return werr
			}
		}
		_, werr := w.Write(payload)
		return werr
	}

	for i := 0; i < len(nqp.nodes); i++ {
		if nqp.getRowsWanted() == 0 {
			utils.GetLogger().Debug("got enough rows, exiting")
			stopNodes()
			break
		}
		res := <-nqp.resultChan
		received++

		if i == 0 {
			// only log time waited for the fastest datanode for now
			utils.GetRootReporter().GetTimer(utils.TimeWaitedForDataNode).Record(utils.Now().Sub(dataNodeWaitStart))
		}

		if res.err != nil {
			return res.err
		}

		if len(res.data) == 0 {
			utils.GetLogger().Debug("skipping on empty response")
			continue
		}

		if nqp.qc.AQLQuery.Limit < 0 && len(nqp.qc.DimensionEnumReverseDicts) == 0 {
			// no limit, nor need to translate enums, flush data directly
			utils.GetLogger().Debug("flushing without deserializing")
			if err = writeBatch(res.data); err != nil {
				return err
			}
			wroteBatch = true
			continue
		}

		// Slow path: the payload has to be decoded so that enum ranks can be
		// translated and/or the row limit applied.
		serDeStart := utils.Now()

		// The node payload is a comma separated list of rows without the
		// enclosing brackets; wrap it so it decodes as a JSON array.
		wrapped := make([]byte, 0, len(res.data)+2)
		wrapped = append(wrapped, '[')
		wrapped = append(wrapped, res.data...)
		wrapped = append(wrapped, ']')

		var resultData [][]interface{}
		if err = json.Unmarshal(wrapped, &resultData); err != nil {
			return err
		}

		// translate enum
		for _, row := range resultData {
			for colIdx, col := range row {
				enumReverseDict, exists := nqp.qc.DimensionEnumReverseDicts[colIdx]
				if !exists {
					continue
				}
				s, ok := col.(string)
				if !ok {
					return utils.StackError(nil, "failed to translate enum at col %d of row %v", colIdx, row)
				}
				if common.NULLString == s {
					continue
				}
				enumRank, convErr := strconv.Atoi(s)
				if convErr != nil {
					return utils.StackError(convErr, "failed to translate enum at col %d of row %v", colIdx, row)
				}
				if enumRank < 0 || enumRank >= len(enumReverseDict) {
					// No underlying error here: the rank parsed fine but is
					// outside the dictionary.
					return utils.StackError(nil, "failed to translate enum at col %d of row %v", colIdx, row)
				}
				row[colIdx] = enumReverseDict[enumRank]
			}
		}

		dataToFlush := resultData
		if nqp.qc.AQLQuery.Limit >= 0 {
			rowsToFlush := nqp.getRowsWanted()
			if rowsToFlush > len(resultData) {
				rowsToFlush = len(resultData)
			}
			dataToFlush = resultData[:rowsToFlush]
		}
		if len(dataToFlush) == 0 {
			// Nothing to emit; writing a separator for an empty batch would
			// produce malformed JSON.
			continue
		}

		var bs []byte
		if bs, err = json.Marshal(dataToFlush); err != nil {
			return err
		}
		// strip brackets
		if err = writeBatch(bs[1 : len(bs)-1]); err != nil {
			return err
		}
		wroteBatch = true
		nqp.flushed += len(dataToFlush)
		utils.GetLogger().With("nrows", len(dataToFlush)).Debug("flushed rows")
		utils.GetRootReporter().GetTimer(utils.TimeSerDeDataNodeResponse).Record(utils.Now().Sub(serDeStart))
	}

	_, err = w.Write([]byte(`]}`))
	return err
}