in tsdb/querier.go [799:1008]
func (p *populateWithDelChunkSeriesIterator) Next() bool {
if !p.next(true) {
return false
}
p.curr = p.currChkMeta
if p.currDelIter == nil {
return true
}
valueType := p.currDelIter.Next()
if valueType == chunkenc.ValNone {
if err := p.currDelIter.Err(); err != nil {
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
}
return false
}
// Re-encode the chunk if iterator is provider. This means that it has
// some samples to be deleted or chunk is opened.
var (
newChunk chunkenc.Chunk
app chunkenc.Appender
t int64
err error
)
switch valueType {
case chunkenc.ValHistogram:
newChunk = chunkenc.NewHistogramChunk()
if app, err = newChunk.Appender(); err != nil {
break
}
switch hc := p.currChkMeta.Chunk.(type) {
case *chunkenc.HistogramChunk:
newChunk.(*chunkenc.HistogramChunk).SetCounterResetHeader(hc.GetCounterResetHeader())
case *safeHeadChunk:
if unwrapped, ok := hc.Chunk.(*chunkenc.HistogramChunk); ok {
newChunk.(*chunkenc.HistogramChunk).SetCounterResetHeader(unwrapped.GetCounterResetHeader())
} else {
err = fmt.Errorf("internal error, could not unwrap safeHeadChunk to histogram chunk: %T", hc.Chunk)
}
default:
err = fmt.Errorf("internal error, unknown chunk type %T when expecting histogram", p.currChkMeta.Chunk)
}
if err != nil {
break
}
var h *histogram.Histogram
t, h = p.currDelIter.AtHistogram()
p.curr.MinTime = t
// Detect missing gauge reset hint.
if h.CounterResetHint == histogram.GaugeType && newChunk.(*chunkenc.HistogramChunk).GetCounterResetHeader() != chunkenc.GaugeType {
err = fmt.Errorf("found gauge histogram in non gauge chunk")
break
}
app.AppendHistogram(t, h)
for vt := p.currDelIter.Next(); vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
if vt != chunkenc.ValHistogram {
err = fmt.Errorf("found value type %v in histogram chunk", vt)
break
}
t, h = p.currDelIter.AtHistogram()
// Defend against corrupted chunks.
if h.CounterResetHint == histogram.GaugeType {
pI, nI, bpI, bnI, _, _, okToAppend := app.(*chunkenc.HistogramAppender).AppendableGauge(h)
if !okToAppend {
err = errors.New("unable to append histogram due to unexpected schema change")
break
}
if len(pI)+len(nI)+len(bpI)+len(bnI) > 0 {
err = fmt.Errorf(
"bucket layout has changed unexpectedly: forward %d positive, %d negative, backward %d positive %d negative bucket interjections required",
len(pI), len(nI), len(bpI), len(bnI),
)
break
}
} else {
pI, nI, okToAppend, counterReset := app.(*chunkenc.HistogramAppender).Appendable(h)
if len(pI)+len(nI) > 0 {
err = fmt.Errorf(
"bucket layout has changed unexpectedly: %d positive and %d negative bucket interjections required",
len(pI), len(nI),
)
break
}
if counterReset {
err = errors.New("detected unexpected counter reset in histogram")
break
}
if !okToAppend {
err = errors.New("unable to append histogram due to unexpected schema change")
break
}
}
app.AppendHistogram(t, h)
}
case chunkenc.ValFloat:
newChunk = chunkenc.NewXORChunk()
if app, err = newChunk.Appender(); err != nil {
break
}
var v float64
t, v = p.currDelIter.At()
p.curr.MinTime = t
app.Append(t, v)
for vt := p.currDelIter.Next(); vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
if vt != chunkenc.ValFloat {
err = fmt.Errorf("found value type %v in float chunk", vt)
break
}
t, v = p.currDelIter.At()
app.Append(t, v)
}
case chunkenc.ValFloatHistogram:
newChunk = chunkenc.NewFloatHistogramChunk()
if app, err = newChunk.Appender(); err != nil {
break
}
switch hc := p.currChkMeta.Chunk.(type) {
case *chunkenc.FloatHistogramChunk:
newChunk.(*chunkenc.FloatHistogramChunk).SetCounterResetHeader(hc.GetCounterResetHeader())
case *safeHeadChunk:
if unwrapped, ok := hc.Chunk.(*chunkenc.FloatHistogramChunk); ok {
newChunk.(*chunkenc.FloatHistogramChunk).SetCounterResetHeader(unwrapped.GetCounterResetHeader())
} else {
err = fmt.Errorf("internal error, could not unwrap safeHeadChunk to float histogram chunk: %T", hc.Chunk)
}
default:
err = fmt.Errorf("internal error, unknown chunk type %T when expecting float histogram", p.currChkMeta.Chunk)
}
if err != nil {
break
}
var h *histogram.FloatHistogram
t, h = p.currDelIter.AtFloatHistogram()
p.curr.MinTime = t
// Detect missing gauge reset hint.
if h.CounterResetHint == histogram.GaugeType && newChunk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader() != chunkenc.GaugeType {
err = fmt.Errorf("found float gauge histogram in non gauge chunk")
break
}
app.AppendFloatHistogram(t, h)
for vt := p.currDelIter.Next(); vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
if vt != chunkenc.ValFloatHistogram {
err = fmt.Errorf("found value type %v in histogram chunk", vt)
break
}
t, h = p.currDelIter.AtFloatHistogram()
// Defend against corrupted chunks.
if h.CounterResetHint == histogram.GaugeType {
pI, nI, bpI, bnI, _, _, okToAppend := app.(*chunkenc.FloatHistogramAppender).AppendableGauge(h)
if !okToAppend {
err = errors.New("unable to append histogram due to unexpected schema change")
break
}
if len(pI)+len(nI)+len(bpI)+len(bnI) > 0 {
err = fmt.Errorf(
"bucket layout has changed unexpectedly: forward %d positive, %d negative, backward %d positive %d negative bucket interjections required",
len(pI), len(nI), len(bpI), len(bnI),
)
break
}
} else {
pI, nI, okToAppend, counterReset := app.(*chunkenc.FloatHistogramAppender).Appendable(h)
if len(pI)+len(nI) > 0 {
err = fmt.Errorf(
"bucket layout has changed unexpectedly: %d positive and %d negative bucket interjections required",
len(pI), len(nI),
)
break
}
if counterReset {
err = errors.New("detected unexpected counter reset in histogram")
break
}
if !okToAppend {
err = errors.New("unable to append histogram due to unexpected schema change")
break
}
}
app.AppendFloatHistogram(t, h)
}
default:
err = fmt.Errorf("populateWithDelChunkSeriesIterator: value type %v unsupported", valueType)
}
if err != nil {
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
return false
}
if err := p.currDelIter.Err(); err != nil {
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
return false
}
p.curr.Chunk = newChunk
p.curr.MaxTime = t
return true
}