mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-26 13:11:11 -08:00
tsdb: Fix chunk handling during histogram recoding
Previously, the maxTime wasn't updated properly in case of a recoding happening. My apologies for reformatting many lines for line length. During the bug hunt, I tried to make things more readable in a reasonably wide editor window. Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
parent
1dc732c130
commit
49be0784b4
|
@ -473,14 +473,14 @@ func (a *HistogramAppender) AppendHistogram(t int64, h *histogram.Histogram) {
|
|||
|
||||
a.writeSumDelta(h.Sum)
|
||||
|
||||
for i, buck := range h.PositiveBuckets {
|
||||
delta := buck - a.pBuckets[i]
|
||||
for i, b := range h.PositiveBuckets {
|
||||
delta := b - a.pBuckets[i]
|
||||
dod := delta - a.pBucketsDelta[i]
|
||||
putVarbitInt(a.b, dod)
|
||||
a.pBucketsDelta[i] = delta
|
||||
}
|
||||
for i, buck := range h.NegativeBuckets {
|
||||
delta := buck - a.nBuckets[i]
|
||||
for i, b := range h.NegativeBuckets {
|
||||
delta := b - a.nBuckets[i]
|
||||
dod := delta - a.nBucketsDelta[i]
|
||||
putVarbitInt(a.b, dod)
|
||||
a.nBucketsDelta[i] = delta
|
||||
|
@ -505,7 +505,8 @@ func (a *HistogramAppender) AppendHistogram(t int64, h *histogram.Histogram) {
|
|||
// Recode converts the current chunk to accommodate an expansion of the set of
|
||||
// (positive and/or negative) buckets used, according to the provided
|
||||
// interjections, resulting in the honoring of the provided new positive and
|
||||
// negative spans.
|
||||
// negative spans. To continue appending, use the returned Appender rather than
|
||||
// the receiver of this method.
|
||||
func (a *HistogramAppender) Recode(
|
||||
positiveInterjections, negativeInterjections []Interjection,
|
||||
positiveSpans, negativeSpans []histogram.Span,
|
||||
|
@ -513,7 +514,7 @@ func (a *HistogramAppender) Recode(
|
|||
// TODO(beorn7): This currently just decodes everything and then encodes
|
||||
// it again with the new span layout. This can probably be done in-place
|
||||
// by editing the chunk. But let's first see how expensive it is in the
|
||||
// big picture.
|
||||
// big picture. Also, in-place editing might create concurrency issues.
|
||||
byts := a.b.bytes()
|
||||
it := newHistogramIterator(byts)
|
||||
hc := NewHistogramChunk()
|
||||
|
|
|
@ -768,7 +768,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
|||
chksIter := s.Iterator()
|
||||
chks = chks[:0]
|
||||
for chksIter.Next() {
|
||||
// We are not iterating in streaming way over chunk as it's more efficient to do bulk write for index and
|
||||
// We are not iterating in streaming way over chunk as
|
||||
// it's more efficient to do bulk write for index and
|
||||
// chunk file purposes.
|
||||
chks = append(chks, chksIter.At())
|
||||
}
|
||||
|
|
|
@ -634,28 +634,31 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
|
|||
// appendHistogram adds the histogram.
|
||||
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
|
||||
func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
|
||||
// Head controls the execution of recoding, so that we own the proper chunk reference afterwards.
|
||||
// We check for Appendable before appendPreprocessor because in case it ends up creating a new chunk,
|
||||
// we need to know if there was also a counter reset or not to set the meta properly.
|
||||
// Head controls the execution of recoding, so that we own the proper
|
||||
// chunk reference afterwards. We check for Appendable before
|
||||
// appendPreprocessor because in case it ends up creating a new chunk,
|
||||
// we need to know if there was also a counter reset or not to set the
|
||||
// meta properly.
|
||||
app, _ := s.app.(*chunkenc.HistogramAppender)
|
||||
var (
|
||||
positiveInterjections, negativeInterjections []chunkenc.Interjection
|
||||
okToAppend, counterReset bool
|
||||
)
|
||||
if app != nil {
|
||||
positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(h)
|
||||
}
|
||||
|
||||
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper)
|
||||
if !sampleInOrder {
|
||||
return sampleInOrder, chunkCreated
|
||||
}
|
||||
|
||||
if app != nil {
|
||||
positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(h)
|
||||
}
|
||||
|
||||
if !chunkCreated {
|
||||
// We have 3 cases here
|
||||
// - !okToAppend -> We need to cut a new chunk.
|
||||
// - okToAppend but we have interjections -> Existing chunk needs recoding before we can append our histogram.
|
||||
// - okToAppend and no interjections -> Chunk is ready to support our histogram.
|
||||
// - okToAppend but we have interjections → Existing chunk needs
|
||||
// recoding before we can append our histogram.
|
||||
// - okToAppend and no interjections → Chunk is ready to support our histogram.
|
||||
if !okToAppend || counterReset {
|
||||
c = s.cutNewHeadChunk(t, chunkenc.EncHistogram, chunkDiskMapper)
|
||||
chunkCreated = true
|
||||
|
@ -663,12 +666,11 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
|
|||
// New buckets have appeared. We need to recode all
|
||||
// prior histogram samples within the chunk before we
|
||||
// can process this one.
|
||||
chunk, app := app.Recode(positiveInterjections, negativeInterjections, h.PositiveSpans, h.NegativeSpans)
|
||||
s.headChunk = &memChunk{
|
||||
minTime: s.headChunk.minTime,
|
||||
maxTime: s.headChunk.maxTime,
|
||||
chunk: chunk,
|
||||
}
|
||||
chunk, app := app.Recode(
|
||||
positiveInterjections, negativeInterjections,
|
||||
h.PositiveSpans, h.NegativeSpans,
|
||||
)
|
||||
c.chunk = chunk
|
||||
s.app = app
|
||||
}
|
||||
}
|
||||
|
@ -704,7 +706,9 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
|
|||
// appendPreprocessor takes care of cutting new chunks and m-mapping old chunks.
|
||||
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
|
||||
// This should be called only when appending data.
|
||||
func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper) (c *memChunk, sampleInOrder, chunkCreated bool) {
|
||||
func (s *memSeries) appendPreprocessor(
|
||||
t int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper,
|
||||
) (c *memChunk, sampleInOrder, chunkCreated bool) {
|
||||
// Based on Gorilla white papers this offers near-optimal compression ratio
|
||||
// so anything bigger that this has diminishing returns and increases
|
||||
// the time range within which we have to decompress all samples.
|
||||
|
@ -774,7 +778,9 @@ func computeChunkEndTime(start, cur, max int64) int64 {
|
|||
return start + (max-start)/n
|
||||
}
|
||||
|
||||
func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk {
|
||||
func (s *memSeries) cutNewHeadChunk(
|
||||
mint int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper,
|
||||
) *memChunk {
|
||||
s.mmapCurrentHeadChunk(chunkDiskMapper)
|
||||
|
||||
s.headChunk = &memChunk{
|
||||
|
|
|
@ -528,16 +528,20 @@ func (b *blockBaseSeriesSet) Err() error {
|
|||
|
||||
func (b *blockBaseSeriesSet) Warnings() storage.Warnings { return nil }
|
||||
|
||||
// populateWithDelGenericSeriesIterator allows to iterate over given chunk metas. In each iteration it ensures
|
||||
// that chunks are trimmed based on given tombstones interval if any.
|
||||
// populateWithDelGenericSeriesIterator allows to iterate over given chunk
|
||||
// metas. In each iteration it ensures that chunks are trimmed based on given
|
||||
// tombstones interval if any.
|
||||
//
|
||||
// populateWithDelGenericSeriesIterator assumes that chunks that would be fully removed by intervals are filtered out in previous phase.
|
||||
// populateWithDelGenericSeriesIterator assumes that chunks that would be fully
|
||||
// removed by intervals are filtered out in previous phase.
|
||||
//
|
||||
// On each iteration currChkMeta is available. If currDelIter is not nil, it means that chunk iterator in currChkMeta
|
||||
// is invalid and chunk rewrite is needed, currDelIter should be used.
|
||||
// On each iteration currChkMeta is available. If currDelIter is not nil, it
|
||||
// means that the chunk iterator in currChkMeta is invalid and a chunk rewrite
|
||||
// is needed, for which currDelIter should be used.
|
||||
type populateWithDelGenericSeriesIterator struct {
|
||||
chunks ChunkReader
|
||||
// chks are expected to be sorted by minTime and should be related to the same, single series.
|
||||
// chks are expected to be sorted by minTime and should be related to
|
||||
// the same, single series.
|
||||
chks []chunks.Meta
|
||||
|
||||
i int
|
||||
|
@ -589,15 +593,17 @@ func (p *populateWithDelGenericSeriesIterator) next() bool {
|
|||
// The chunk.Bytes() method is not safe for open chunks hence the re-encoding.
|
||||
// This happens when snapshotting the head block or just fetching chunks from TSDB.
|
||||
//
|
||||
// TODO think how to avoid the typecasting to verify when it is head block.
|
||||
// TODO(codesome): think how to avoid the typecasting to verify when it is head block.
|
||||
_, isSafeChunk := p.currChkMeta.Chunk.(*safeChunk)
|
||||
if len(p.bufIter.Intervals) == 0 && !(isSafeChunk && p.currChkMeta.MaxTime == math.MaxInt64) {
|
||||
// If there are no overlap with deletion intervals AND it's NOT an "open" head chunk, we can take chunk as it is.
|
||||
// If there is no overlap with deletion intervals AND it's NOT
|
||||
// an "open" head chunk, we can take chunk as it is.
|
||||
p.currDelIter = nil
|
||||
return true
|
||||
}
|
||||
|
||||
// We don't want full chunk or it's potentially still opened, take just part of it.
|
||||
// We don't want the full chunk, or it's potentially still opened, take
|
||||
// just a part of it.
|
||||
p.bufIter.Iter = p.currChkMeta.Chunk.Iterator(nil)
|
||||
p.currDelIter = p.bufIter
|
||||
return true
|
||||
|
@ -703,12 +709,14 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// Empty chunk, this should not happen, as we assume full deletions being filtered before this iterator.
|
||||
// Empty chunk, this should not happen, as we assume full
|
||||
// deletions being filtered before this iterator.
|
||||
p.err = errors.New("populateWithDelChunkSeriesIterator: unexpected empty chunk found while rewriting chunk")
|
||||
return false
|
||||
}
|
||||
|
||||
// Re-encode the chunk if iterator is provider. This means that it has some samples to be deleted or chunk is opened.
|
||||
// Re-encode the chunk if iterator is provider. This means that it has
|
||||
// some samples to be deleted or chunk is opened.
|
||||
var (
|
||||
newChunk chunkenc.Chunk
|
||||
app chunkenc.Appender
|
||||
|
@ -728,8 +736,10 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
|
|||
t, h = p.currDelIter.AtHistogram()
|
||||
p.curr.MinTime = t
|
||||
app.AppendHistogram(t, h)
|
||||
for p.currDelIter.Next() == chunkenc.ValHistogram {
|
||||
// TODO(beorn7): Is it possible that the value type changes during iteration?
|
||||
for vt := p.currDelIter.Next(); vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
|
||||
if vt != chunkenc.ValHistogram {
|
||||
panic(fmt.Errorf("found value type %v in histogram chunk", vt))
|
||||
}
|
||||
t, h = p.currDelIter.AtHistogram()
|
||||
app.AppendHistogram(t, h)
|
||||
}
|
||||
|
@ -742,8 +752,10 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
|
|||
t, v = p.currDelIter.At()
|
||||
p.curr.MinTime = t
|
||||
app.Append(t, v)
|
||||
for p.currDelIter.Next() == chunkenc.ValFloat {
|
||||
// TODO(beorn7): Is it possible that the value type changes during iteration?
|
||||
for vt := p.currDelIter.Next(); vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
|
||||
if vt != chunkenc.ValFloat {
|
||||
panic(fmt.Errorf("found value type %v in float chunk", vt))
|
||||
}
|
||||
t, v = p.currDelIter.At()
|
||||
app.Append(t, v)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue