tsdb: Fix chunk handling during appendHistogram

Previously, the maxTime wasn't updated properly in case of a recoding
happening.

My apologies for reformatting many lines for line length. During the
bug hunt, I tried to make things more readable in a reasonably wide
editor window.

Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
beorn7 2022-07-06 14:34:02 +02:00
parent 642c5758ff
commit 5d14046d28
4 changed files with 81 additions and 39 deletions

View file

@ -473,14 +473,14 @@ func (a *HistogramAppender) AppendHistogram(t int64, h *histogram.Histogram) {
a.writeSumDelta(h.Sum)
for i, buck := range h.PositiveBuckets {
delta := buck - a.pBuckets[i]
for i, b := range h.PositiveBuckets {
delta := b - a.pBuckets[i]
dod := delta - a.pBucketsDelta[i]
putVarbitInt(a.b, dod)
a.pBucketsDelta[i] = delta
}
for i, buck := range h.NegativeBuckets {
delta := buck - a.nBuckets[i]
for i, b := range h.NegativeBuckets {
delta := b - a.nBuckets[i]
dod := delta - a.nBucketsDelta[i]
putVarbitInt(a.b, dod)
a.nBucketsDelta[i] = delta
@ -505,7 +505,8 @@ func (a *HistogramAppender) AppendHistogram(t int64, h *histogram.Histogram) {
// Recode converts the current chunk to accommodate an expansion of the set of
// (positive and/or negative) buckets used, according to the provided
// interjections, resulting in the honoring of the provided new positive and
// negative spans.
// negative spans. To continue appending, use the returned Appender rather than
// the receiver of this method.
func (a *HistogramAppender) Recode(
positiveInterjections, negativeInterjections []Interjection,
positiveSpans, negativeSpans []histogram.Span,
@ -513,7 +514,7 @@ func (a *HistogramAppender) Recode(
// TODO(beorn7): This currently just decodes everything and then encodes
// it again with the new span layout. This can probably be done in-place
// by editing the chunk. But let's first see how expensive it is in the
// big picture.
// big picture. Also, in-place editing might create concurrency issues.
byts := a.b.bytes()
it := newHistogramIterator(byts)
hc := NewHistogramChunk()

View file

@ -768,7 +768,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
chksIter := s.Iterator()
chks = chks[:0]
for chksIter.Next() {
// We are not iterating in streaming way over chunk as it's more efficient to do bulk write for index and
// We are not iterating in streaming way over chunk as
// it's more efficient to do bulk write for index and
// chunk file purposes.
chks = append(chks, chksIter.At())
}

View file

@ -634,28 +634,31 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
// appendHistogram adds the histogram.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
// Head controls the execution of recoding, so that we own the proper chunk reference afterwards.
// We check for Appendable before appendPreprocessor because in case it ends up creating a new chunk,
// we need to know if there was also a counter reset or not to set the meta properly.
// Head controls the execution of recoding, so that we own the proper
// chunk reference afterwards. We check for Appendable before
// appendPreprocessor because in case it ends up creating a new chunk,
// we need to know if there was also a counter reset or not to set the
// meta properly.
app, _ := s.app.(*chunkenc.HistogramAppender)
var (
positiveInterjections, negativeInterjections []chunkenc.Interjection
okToAppend, counterReset bool
)
if app != nil {
positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(h)
}
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
if app != nil {
positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(h)
}
if !chunkCreated {
// We have 3 cases here
// - !okToAppend -> We need to cut a new chunk.
// - okToAppend but we have interjections -> Existing chunk needs recoding before we can append our histogram.
// - okToAppend and no interjections -> Chunk is ready to support our histogram.
// - okToAppend but we have interjections → Existing chunk needs
// recoding before we can append our histogram.
// - okToAppend and no interjections → Chunk is ready to support our histogram.
if !okToAppend || counterReset {
c = s.cutNewHeadChunk(t, chunkenc.EncHistogram, chunkDiskMapper)
chunkCreated = true
@ -663,12 +666,11 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
// New buckets have appeared. We need to recode all
// prior histogram samples within the chunk before we
// can process this one.
chunk, app := app.Recode(positiveInterjections, negativeInterjections, h.PositiveSpans, h.NegativeSpans)
s.headChunk = &memChunk{
minTime: s.headChunk.minTime,
maxTime: s.headChunk.maxTime,
chunk: chunk,
}
chunk, app := app.Recode(
positiveInterjections, negativeInterjections,
h.PositiveSpans, h.NegativeSpans,
)
c.chunk = chunk
s.app = app
}
}
@ -704,7 +706,9 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
// appendPreprocessor takes care of cutting new chunks and m-mapping old chunks.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
// This should be called only when appending data.
func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper) (c *memChunk, sampleInOrder, chunkCreated bool) {
func (s *memSeries) appendPreprocessor(
t int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper,
) (c *memChunk, sampleInOrder, chunkCreated bool) {
// Based on Gorilla white papers this offers near-optimal compression ratio
// so anything bigger that this has diminishing returns and increases
// the time range within which we have to decompress all samples.
@ -774,7 +778,9 @@ func computeChunkEndTime(start, cur, max int64) int64 {
return start + (max-start)/n
}
func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk {
func (s *memSeries) cutNewHeadChunk(
mint int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper,
) *memChunk {
s.mmapCurrentHeadChunk(chunkDiskMapper)
s.headChunk = &memChunk{

View file

@ -528,16 +528,20 @@ func (b *blockBaseSeriesSet) Err() error {
func (b *blockBaseSeriesSet) Warnings() storage.Warnings { return nil }
// populateWithDelGenericSeriesIterator allows to iterate over given chunk metas. In each iteration it ensures
// that chunks are trimmed based on given tombstones interval if any.
// populateWithDelGenericSeriesIterator allows to iterate over given chunk
// metas. In each iteration it ensures that chunks are trimmed based on given
// tombstones interval if any.
//
// populateWithDelGenericSeriesIterator assumes that chunks that would be fully removed by intervals are filtered out in previous phase.
// populateWithDelGenericSeriesIterator assumes that chunks that would be fully
// removed by intervals are filtered out in previous phase.
//
// On each iteration currChkMeta is available. If currDelIter is not nil, it means that chunk iterator in currChkMeta
// is invalid and chunk rewrite is needed, currDelIter should be used.
// On each iteration currChkMeta is available. If currDelIter is not nil, it
// means that the chunk iterator in currChkMeta is invalid and a chunk rewrite
// is needed, for which currDelIter should be used.
type populateWithDelGenericSeriesIterator struct {
chunks ChunkReader
// chks are expected to be sorted by minTime and should be related to the same, single series.
// chks are expected to be sorted by minTime and should be related to
// the same, single series.
chks []chunks.Meta
i int
@ -589,15 +593,17 @@ func (p *populateWithDelGenericSeriesIterator) next() bool {
// The chunk.Bytes() method is not safe for open chunks hence the re-encoding.
// This happens when snapshotting the head block or just fetching chunks from TSDB.
//
// TODO think how to avoid the typecasting to verify when it is head block.
// TODO(codesome): think how to avoid the typecasting to verify when it is head block.
_, isSafeChunk := p.currChkMeta.Chunk.(*safeChunk)
if len(p.bufIter.Intervals) == 0 && !(isSafeChunk && p.currChkMeta.MaxTime == math.MaxInt64) {
// If there are no overlap with deletion intervals AND it's NOT an "open" head chunk, we can take chunk as it is.
// If there is no overlap with deletion intervals AND it's NOT
// an "open" head chunk, we can take chunk as it is.
p.currDelIter = nil
return true
}
// We don't want full chunk or it's potentially still opened, take just part of it.
// We don't want the full chunk, or it's potentially still opened, take
// just a part of it.
p.bufIter.Iter = p.currChkMeta.Chunk.Iterator(nil)
p.currDelIter = p.bufIter
return true
@ -703,12 +709,14 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
return false
}
// Empty chunk, this should not happen, as we assume full deletions being filtered before this iterator.
// Empty chunk, this should not happen, as we assume full
// deletions being filtered before this iterator.
p.err = errors.New("populateWithDelChunkSeriesIterator: unexpected empty chunk found while rewriting chunk")
return false
}
// Re-encode the chunk if iterator is provider. This means that it has some samples to be deleted or chunk is opened.
// Re-encode the chunk if iterator is provider. This means that it has
// some samples to be deleted or chunk is opened.
var (
newChunk chunkenc.Chunk
app chunkenc.Appender
@ -727,10 +735,33 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
var h *histogram.Histogram
t, h = p.currDelIter.AtHistogram()
p.curr.MinTime = t
app.AppendHistogram(t, h)
for p.currDelIter.Next() == chunkenc.ValHistogram {
// TODO(beorn7): Is it possible that the value type changes during iteration?
for vt := p.currDelIter.Next(); vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
if vt != chunkenc.ValHistogram {
err = fmt.Errorf("found value type %v in histogram chunk", vt)
break
}
t, h = p.currDelIter.AtHistogram()
// Defend against corrupted chunks.
pI, nI, okToAppend, counterReset := app.(*chunkenc.HistogramAppender).Appendable(h)
if len(pI)+len(nI) > 0 {
err = fmt.Errorf(
"bucket layout has changed unexpectedly: %d positive and %d negative bucket interjections required",
len(pI), len(nI),
)
break
}
if counterReset {
err = errors.New("detected unexpected counter reset in histogram")
break
}
if !okToAppend {
err = errors.New("unable to append histogram due to unexpected schema change")
break
}
app.AppendHistogram(t, h)
}
case chunkenc.ValFloat:
@ -742,8 +773,11 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
t, v = p.currDelIter.At()
p.curr.MinTime = t
app.Append(t, v)
for p.currDelIter.Next() == chunkenc.ValFloat {
// TODO(beorn7): Is it possible that the value type changes during iteration?
for vt := p.currDelIter.Next(); vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
if vt != chunkenc.ValFloat {
err = fmt.Errorf("found value type %v in float chunk", vt)
break
}
t, v = p.currDelIter.At()
app.Append(t, v)
}