func()

in tsdb/querier.go [799:1008]


func (p *populateWithDelChunkSeriesIterator) Next() bool {
	if !p.next(true) {
		return false
	}
	p.curr = p.currChkMeta
	if p.currDelIter == nil {
		return true
	}
	valueType := p.currDelIter.Next()
	if valueType == chunkenc.ValNone {
		if err := p.currDelIter.Err(); err != nil {
			p.err = errors.Wrap(err, "iterate chunk while re-encoding")
		}
		return false
	}

	// Re-encode the chunk if iterator is provider. This means that it has
	// some samples to be deleted or chunk is opened.
	var (
		newChunk chunkenc.Chunk
		app      chunkenc.Appender
		t        int64
		err      error
	)
	switch valueType {
	case chunkenc.ValHistogram:
		newChunk = chunkenc.NewHistogramChunk()
		if app, err = newChunk.Appender(); err != nil {
			break
		}

		switch hc := p.currChkMeta.Chunk.(type) {
		case *chunkenc.HistogramChunk:
			newChunk.(*chunkenc.HistogramChunk).SetCounterResetHeader(hc.GetCounterResetHeader())
		case *safeHeadChunk:
			if unwrapped, ok := hc.Chunk.(*chunkenc.HistogramChunk); ok {
				newChunk.(*chunkenc.HistogramChunk).SetCounterResetHeader(unwrapped.GetCounterResetHeader())
			} else {
				err = fmt.Errorf("internal error, could not unwrap safeHeadChunk to histogram chunk: %T", hc.Chunk)
			}
		default:
			err = fmt.Errorf("internal error, unknown chunk type %T when expecting histogram", p.currChkMeta.Chunk)
		}
		if err != nil {
			break
		}

		var h *histogram.Histogram
		t, h = p.currDelIter.AtHistogram()
		p.curr.MinTime = t

		// Detect missing gauge reset hint.
		if h.CounterResetHint == histogram.GaugeType && newChunk.(*chunkenc.HistogramChunk).GetCounterResetHeader() != chunkenc.GaugeType {
			err = fmt.Errorf("found gauge histogram in non gauge chunk")
			break
		}

		app.AppendHistogram(t, h)

		for vt := p.currDelIter.Next(); vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
			if vt != chunkenc.ValHistogram {
				err = fmt.Errorf("found value type %v in histogram chunk", vt)
				break
			}
			t, h = p.currDelIter.AtHistogram()

			// Defend against corrupted chunks.
			if h.CounterResetHint == histogram.GaugeType {
				pI, nI, bpI, bnI, _, _, okToAppend := app.(*chunkenc.HistogramAppender).AppendableGauge(h)
				if !okToAppend {
					err = errors.New("unable to append histogram due to unexpected schema change")
					break
				}
				if len(pI)+len(nI)+len(bpI)+len(bnI) > 0 {
					err = fmt.Errorf(
						"bucket layout has changed unexpectedly: forward %d positive, %d negative, backward %d positive %d negative bucket interjections required",
						len(pI), len(nI), len(bpI), len(bnI),
					)
					break
				}
			} else {
				pI, nI, okToAppend, counterReset := app.(*chunkenc.HistogramAppender).Appendable(h)
				if len(pI)+len(nI) > 0 {
					err = fmt.Errorf(
						"bucket layout has changed unexpectedly: %d positive and %d negative bucket interjections required",
						len(pI), len(nI),
					)
					break
				}
				if counterReset {
					err = errors.New("detected unexpected counter reset in histogram")
					break
				}
				if !okToAppend {
					err = errors.New("unable to append histogram due to unexpected schema change")
					break
				}
			}
			app.AppendHistogram(t, h)
		}
	case chunkenc.ValFloat:
		newChunk = chunkenc.NewXORChunk()
		if app, err = newChunk.Appender(); err != nil {
			break
		}
		var v float64
		t, v = p.currDelIter.At()
		p.curr.MinTime = t
		app.Append(t, v)
		for vt := p.currDelIter.Next(); vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
			if vt != chunkenc.ValFloat {
				err = fmt.Errorf("found value type %v in float chunk", vt)
				break
			}
			t, v = p.currDelIter.At()
			app.Append(t, v)
		}
	case chunkenc.ValFloatHistogram:
		newChunk = chunkenc.NewFloatHistogramChunk()
		if app, err = newChunk.Appender(); err != nil {
			break
		}

		switch hc := p.currChkMeta.Chunk.(type) {
		case *chunkenc.FloatHistogramChunk:
			newChunk.(*chunkenc.FloatHistogramChunk).SetCounterResetHeader(hc.GetCounterResetHeader())
		case *safeHeadChunk:
			if unwrapped, ok := hc.Chunk.(*chunkenc.FloatHistogramChunk); ok {
				newChunk.(*chunkenc.FloatHistogramChunk).SetCounterResetHeader(unwrapped.GetCounterResetHeader())
			} else {
				err = fmt.Errorf("internal error, could not unwrap safeHeadChunk to float histogram chunk: %T", hc.Chunk)
			}
		default:
			err = fmt.Errorf("internal error, unknown chunk type %T when expecting float histogram", p.currChkMeta.Chunk)
		}
		if err != nil {
			break
		}

		var h *histogram.FloatHistogram
		t, h = p.currDelIter.AtFloatHistogram()
		p.curr.MinTime = t

		// Detect missing gauge reset hint.
		if h.CounterResetHint == histogram.GaugeType && newChunk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader() != chunkenc.GaugeType {
			err = fmt.Errorf("found float gauge histogram in non gauge chunk")
			break
		}

		app.AppendFloatHistogram(t, h)

		for vt := p.currDelIter.Next(); vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
			if vt != chunkenc.ValFloatHistogram {
				err = fmt.Errorf("found value type %v in histogram chunk", vt)
				break
			}
			t, h = p.currDelIter.AtFloatHistogram()

			// Defend against corrupted chunks.
			if h.CounterResetHint == histogram.GaugeType {
				pI, nI, bpI, bnI, _, _, okToAppend := app.(*chunkenc.FloatHistogramAppender).AppendableGauge(h)
				if !okToAppend {
					err = errors.New("unable to append histogram due to unexpected schema change")
					break
				}
				if len(pI)+len(nI)+len(bpI)+len(bnI) > 0 {
					err = fmt.Errorf(
						"bucket layout has changed unexpectedly: forward %d positive, %d negative, backward %d positive %d negative bucket interjections required",
						len(pI), len(nI), len(bpI), len(bnI),
					)
					break
				}
			} else {
				pI, nI, okToAppend, counterReset := app.(*chunkenc.FloatHistogramAppender).Appendable(h)
				if len(pI)+len(nI) > 0 {
					err = fmt.Errorf(
						"bucket layout has changed unexpectedly: %d positive and %d negative bucket interjections required",
						len(pI), len(nI),
					)
					break
				}
				if counterReset {
					err = errors.New("detected unexpected counter reset in histogram")
					break
				}
				if !okToAppend {
					err = errors.New("unable to append histogram due to unexpected schema change")
					break
				}
			}

			app.AppendFloatHistogram(t, h)
		}
	default:
		err = fmt.Errorf("populateWithDelChunkSeriesIterator: value type %v unsupported", valueType)
	}

	if err != nil {
		p.err = errors.Wrap(err, "iterate chunk while re-encoding")
		return false
	}
	if err := p.currDelIter.Err(); err != nil {
		p.err = errors.Wrap(err, "iterate chunk while re-encoding")
		return false
	}

	p.curr.Chunk = newChunk
	p.curr.MaxTime = t
	return true
}