mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-13 06:47:28 -08:00
Fix race in TSBD while reading/writing histograms (#9051)
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
parent
63232c1e41
commit
9f206a7a05
39
tsdb/head.go
39
tsdb/head.go
|
@ -2374,6 +2374,11 @@ func (s *stripeSeries) getOrSet(hash uint64, lset labels.Labels, createSeries fu
|
|||
return series, true, nil
|
||||
}
|
||||
|
||||
type hist struct {
|
||||
t int64
|
||||
h histogram.SparseHistogram
|
||||
}
|
||||
|
||||
type sample struct {
|
||||
t int64
|
||||
v float64
|
||||
|
@ -2397,6 +2402,7 @@ type memSeries struct {
|
|||
|
||||
nextAt int64 // Timestamp at which to cut the next chunk.
|
||||
sampleBuf [4]sample
|
||||
histBuf [4]hist
|
||||
pendingCommit bool // Whether there are samples waiting to be committed to this series.
|
||||
|
||||
app chunkenc.Appender // Current appender for the chunk.
|
||||
|
@ -2635,10 +2641,15 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appen
|
|||
if !ok {
|
||||
c = s.cutNewHeadChunk(t, chunkenc.EncSHS, chunkDiskMapper)
|
||||
chunkCreated = true
|
||||
|
||||
} else if len(posInterjections) > 0 || len(negInterjections) > 0 {
|
||||
// new buckets have appeared. we need to recode all prior histograms within the chunk before we can process this one.
|
||||
s.headChunk.chunk, s.app = app.Recode(posInterjections, negInterjections, sh.PositiveSpans, sh.NegativeSpans)
|
||||
chunk, app := app.Recode(posInterjections, negInterjections, sh.PositiveSpans, sh.NegativeSpans)
|
||||
s.headChunk = &memChunk{
|
||||
minTime: s.headChunk.minTime,
|
||||
maxTime: s.headChunk.maxTime,
|
||||
chunk: chunk,
|
||||
}
|
||||
s.app = app
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2646,6 +2657,11 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appen
|
|||
|
||||
c.maxTime = t
|
||||
|
||||
s.histBuf[0] = s.histBuf[1]
|
||||
s.histBuf[1] = s.histBuf[2]
|
||||
s.histBuf[2] = s.histBuf[3]
|
||||
s.histBuf[3] = hist{t: t, h: sh}
|
||||
|
||||
if appendID > 0 {
|
||||
s.txs.add(appendID)
|
||||
}
|
||||
|
@ -2800,6 +2816,7 @@ func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *
|
|||
msIter.total = numSamples
|
||||
msIter.stopAfter = stopAfter
|
||||
msIter.buf = s.sampleBuf
|
||||
msIter.histBuf = s.histBuf
|
||||
return msIter
|
||||
}
|
||||
return &memSafeIterator{
|
||||
|
@ -2808,8 +2825,9 @@ func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *
|
|||
i: -1,
|
||||
stopAfter: stopAfter,
|
||||
},
|
||||
total: numSamples,
|
||||
buf: s.sampleBuf,
|
||||
total: numSamples,
|
||||
buf: s.sampleBuf,
|
||||
histBuf: s.histBuf,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2844,8 +2862,9 @@ func (it *stopIterator) Next() bool {
|
|||
type memSafeIterator struct {
|
||||
stopIterator
|
||||
|
||||
total int
|
||||
buf [4]sample
|
||||
total int
|
||||
buf [4]sample
|
||||
histBuf [4]hist
|
||||
}
|
||||
|
||||
func (it *memSafeIterator) Seek(t int64) bool {
|
||||
|
@ -2884,6 +2903,14 @@ func (it *memSafeIterator) At() (int64, float64) {
|
|||
return s.t, s.v
|
||||
}
|
||||
|
||||
func (it *memSafeIterator) AtHistogram() (int64, histogram.SparseHistogram) {
|
||||
if it.total-it.i > 4 {
|
||||
return it.Iterator.AtHistogram()
|
||||
}
|
||||
s := it.histBuf[4-(it.total-it.i)]
|
||||
return s.t, s.h
|
||||
}
|
||||
|
||||
type mmappedChunk struct {
|
||||
ref uint64
|
||||
numSamples uint16
|
||||
|
|
Loading…
Reference in a new issue