diff --git a/tsdb/head.go b/tsdb/head.go index 06a7721968..0787bb3d09 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -2374,6 +2374,11 @@ func (s *stripeSeries) getOrSet(hash uint64, lset labels.Labels, createSeries fu return series, true, nil } +type hist struct { + t int64 + h histogram.SparseHistogram +} + type sample struct { t int64 v float64 @@ -2397,6 +2402,7 @@ type memSeries struct { nextAt int64 // Timestamp at which to cut the next chunk. sampleBuf [4]sample + histBuf [4]hist pendingCommit bool // Whether there are samples waiting to be committed to this series. app chunkenc.Appender // Current appender for the chunk. @@ -2635,10 +2641,15 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appen if !ok { c = s.cutNewHeadChunk(t, chunkenc.EncSHS, chunkDiskMapper) chunkCreated = true - } else if len(posInterjections) > 0 || len(negInterjections) > 0 { // new buckets have appeared. we need to recode all prior histograms within the chunk before we can process this one. - s.headChunk.chunk, s.app = app.Recode(posInterjections, negInterjections, sh.PositiveSpans, sh.NegativeSpans) + chunk, app := app.Recode(posInterjections, negInterjections, sh.PositiveSpans, sh.NegativeSpans) + s.headChunk = &memChunk{ + minTime: s.headChunk.minTime, + maxTime: s.headChunk.maxTime, + chunk: chunk, + } + s.app = app } } @@ -2646,6 +2657,11 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appen c.maxTime = t + s.histBuf[0] = s.histBuf[1] + s.histBuf[1] = s.histBuf[2] + s.histBuf[2] = s.histBuf[3] + s.histBuf[3] = hist{t: t, h: sh} + if appendID > 0 { s.txs.add(appendID) } @@ -2800,6 +2816,7 @@ func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper * msIter.total = numSamples msIter.stopAfter = stopAfter msIter.buf = s.sampleBuf + msIter.histBuf = s.histBuf return msIter } return &memSafeIterator{ @@ -2808,8 +2825,9 @@ func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper * i: -1, stopAfter: stopAfter, }, - total: numSamples, - buf: s.sampleBuf, + total: numSamples, + buf: s.sampleBuf, + histBuf: s.histBuf, } } @@ -2844,8 +2862,9 @@ func (it *stopIterator) Next() bool { type memSafeIterator struct { stopIterator - total int - buf [4]sample + total int + buf [4]sample + histBuf [4]hist } func (it *memSafeIterator) Seek(t int64) bool { @@ -2884,6 +2903,14 @@ func (it *memSafeIterator) At() (int64, float64) { return s.t, s.v } +func (it *memSafeIterator) AtHistogram() (int64, histogram.SparseHistogram) { + if it.total-it.i > 4 { + return it.Iterator.AtHistogram() + } + s := it.histBuf[4-(it.total-it.i)] + return s.t, s.h +} + type mmappedChunk struct { ref uint64 numSamples uint16