mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 22:07:27 -08:00
Simplify how OutOfOrderTimeWindow works (#285)
* Simplify how OutOfOrderTimeWindow works Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Update test Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
parent
c6f3d4ab33
commit
a632c73352
|
@ -314,12 +314,6 @@ func main() {
|
||||||
serverOnlyFlag(a, "storage.tsdb.wal-compression", "Compress the tsdb WAL.").
|
serverOnlyFlag(a, "storage.tsdb.wal-compression", "Compress the tsdb WAL.").
|
||||||
Hidden().Default("true").BoolVar(&cfg.tsdb.WALCompression)
|
Hidden().Default("true").BoolVar(&cfg.tsdb.WALCompression)
|
||||||
|
|
||||||
serverOnlyFlag(a, "storage.tsdb.out-of-order-cap-min", "Minimum capacity for out of order chunks (in samples. between 0 and 255.)").
|
|
||||||
Hidden().Default("4").IntVar(&cfg.tsdb.OutOfOrderCapMin)
|
|
||||||
|
|
||||||
serverOnlyFlag(a, "storage.tsdb.out-of-order-cap-max", "Maximum capacity for out of order chunks (in samples. between 1 and 255.)").
|
|
||||||
Hidden().Default("32").IntVar(&cfg.tsdb.OutOfOrderCapMax)
|
|
||||||
|
|
||||||
serverOnlyFlag(a, "storage.tsdb.head-chunks-write-queue-size", "Size of the queue through which head chunks are written to the disk to be m-mapped, 0 disables the queue completely. Experimental.").
|
serverOnlyFlag(a, "storage.tsdb.head-chunks-write-queue-size", "Size of the queue through which head chunks are written to the disk to be m-mapped, 0 disables the queue completely. Experimental.").
|
||||||
Default("0").IntVar(&cfg.tsdb.HeadChunksWriteQueueSize)
|
Default("0").IntVar(&cfg.tsdb.HeadChunksWriteQueueSize)
|
||||||
|
|
||||||
|
@ -1536,8 +1530,6 @@ type tsdbOptions struct {
|
||||||
MinBlockDuration model.Duration
|
MinBlockDuration model.Duration
|
||||||
MaxBlockDuration model.Duration
|
MaxBlockDuration model.Duration
|
||||||
OutOfOrderTimeWindow int64
|
OutOfOrderTimeWindow int64
|
||||||
OutOfOrderCapMin int
|
|
||||||
OutOfOrderCapMax int
|
|
||||||
EnableExemplarStorage bool
|
EnableExemplarStorage bool
|
||||||
MaxExemplars int64
|
MaxExemplars int64
|
||||||
EnableMemorySnapshotOnShutdown bool
|
EnableMemorySnapshotOnShutdown bool
|
||||||
|
@ -1561,8 +1553,6 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
|
||||||
MaxExemplars: opts.MaxExemplars,
|
MaxExemplars: opts.MaxExemplars,
|
||||||
EnableMemorySnapshotOnShutdown: opts.EnableMemorySnapshotOnShutdown,
|
EnableMemorySnapshotOnShutdown: opts.EnableMemorySnapshotOnShutdown,
|
||||||
OutOfOrderTimeWindow: opts.OutOfOrderTimeWindow,
|
OutOfOrderTimeWindow: opts.OutOfOrderTimeWindow,
|
||||||
OutOfOrderCapMin: int64(opts.OutOfOrderCapMin),
|
|
||||||
OutOfOrderCapMax: int64(opts.OutOfOrderCapMax),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -671,6 +671,9 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
|
||||||
if opts.OutOfOrderCapMax <= 0 {
|
if opts.OutOfOrderCapMax <= 0 {
|
||||||
opts.OutOfOrderCapMax = DefaultOutOfOrderCapMax
|
opts.OutOfOrderCapMax = DefaultOutOfOrderCapMax
|
||||||
}
|
}
|
||||||
|
if opts.OutOfOrderCapMin > opts.OutOfOrderCapMax {
|
||||||
|
opts.OutOfOrderCapMax = opts.OutOfOrderCapMin
|
||||||
|
}
|
||||||
if opts.OutOfOrderTimeWindow < 0 {
|
if opts.OutOfOrderTimeWindow < 0 {
|
||||||
opts.OutOfOrderTimeWindow = 0
|
opts.OutOfOrderTimeWindow = 0
|
||||||
}
|
}
|
||||||
|
|
|
@ -4261,8 +4261,18 @@ func TestOOOAppendAndQuery(t *testing.T) {
|
||||||
|
|
||||||
// At the edge of time window, also it would be "out of bound" without the ooo support.
|
// At the edge of time window, also it would be "out of bound" without the ooo support.
|
||||||
addSample(s1, 60, 65, false)
|
addSample(s1, 60, 65, false)
|
||||||
addSample(s2, 50, 55, false)
|
verifyOOOMinMaxTimes(60, 265)
|
||||||
verifyOOOMinMaxTimes(50, 265)
|
testQuery()
|
||||||
|
|
||||||
|
// This sample is not within the time window w.r.t. the head's maxt, but it is within the window
|
||||||
|
// w.r.t. the series' maxt. But we consider only head's maxt.
|
||||||
|
addSample(s2, 59, 59, true)
|
||||||
|
verifyOOOMinMaxTimes(60, 265)
|
||||||
|
testQuery()
|
||||||
|
|
||||||
|
// Now the sample is within time window w.r.t. the head's maxt.
|
||||||
|
addSample(s2, 60, 65, false)
|
||||||
|
verifyOOOMinMaxTimes(60, 265)
|
||||||
testQuery()
|
testQuery()
|
||||||
|
|
||||||
// Out of time window again.
|
// Out of time window again.
|
||||||
|
@ -4276,7 +4286,7 @@ func TestOOOAppendAndQuery(t *testing.T) {
|
||||||
require.Equal(t, float64(4), prom_testutil.ToFloat64(db.head.metrics.chunksCreated))
|
require.Equal(t, float64(4), prom_testutil.ToFloat64(db.head.metrics.chunksCreated))
|
||||||
addSample(s1, 180, 249, false)
|
addSample(s1, 180, 249, false)
|
||||||
require.Equal(t, float64(6), prom_testutil.ToFloat64(db.head.metrics.chunksCreated))
|
require.Equal(t, float64(6), prom_testutil.ToFloat64(db.head.metrics.chunksCreated))
|
||||||
verifyOOOMinMaxTimes(50, 265)
|
verifyOOOMinMaxTimes(60, 265)
|
||||||
testQuery()
|
testQuery()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -326,50 +326,43 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
|
||||||
// The sample belongs to the out of order chunk if we return true and no error.
|
// The sample belongs to the out of order chunk if we return true and no error.
|
||||||
// An error signifies the sample cannot be handled.
|
// An error signifies the sample cannot be handled.
|
||||||
func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTimeWindow int64) (isOutOfOrder bool, delta int64, err error) {
|
func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTimeWindow int64) (isOutOfOrder bool, delta int64, err error) {
|
||||||
msMaxt := s.maxTime()
|
// Check if we can append in the in-order chunk.
|
||||||
if msMaxt == math.MinInt64 {
|
|
||||||
// The series has no sample and was freshly created.
|
|
||||||
if t >= minValidTime {
|
if t >= minValidTime {
|
||||||
// We can append it in the in-order chunk.
|
if s.head() == nil {
|
||||||
|
// The series has no sample and was freshly created.
|
||||||
return false, 0, nil
|
return false, 0, nil
|
||||||
}
|
}
|
||||||
|
msMaxt := s.maxTime()
|
||||||
// We cannot append it in the in-order head. So we check the oooTimeWindow
|
|
||||||
// w.r.t. the head's maxt.
|
|
||||||
// -1 because for the first sample in the Head, headMaxt will be equal to t.
|
|
||||||
msMaxt = headMaxt - 1
|
|
||||||
}
|
|
||||||
|
|
||||||
if t > msMaxt {
|
if t > msMaxt {
|
||||||
return false, 0, nil
|
return false, 0, nil
|
||||||
}
|
}
|
||||||
|
if t == msMaxt {
|
||||||
if t < msMaxt-oooTimeWindow {
|
|
||||||
if oooTimeWindow > 0 {
|
|
||||||
return true, msMaxt - t, storage.ErrTooOldSample
|
|
||||||
}
|
|
||||||
if t < minValidTime {
|
|
||||||
return false, msMaxt - t, storage.ErrOutOfBounds
|
|
||||||
}
|
|
||||||
return false, msMaxt - t, storage.ErrOutOfOrderSample
|
|
||||||
}
|
|
||||||
|
|
||||||
if t != msMaxt || s.head() == nil {
|
|
||||||
// Sample is ooo and within time window OR series has no active chunk to check for duplicate sample.
|
|
||||||
return true, msMaxt - t, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// We are allowing exact duplicates as we can encounter them in valid cases
|
// We are allowing exact duplicates as we can encounter them in valid cases
|
||||||
// like federation and erroring out at that time would be extremely noisy.
|
// like federation and erroring out at that time would be extremely noisy.
|
||||||
// this only checks against the latest in-order sample.
|
// This only checks against the latest in-order sample.
|
||||||
// the OOO headchunk has its own method to detect these duplicates
|
// The OOO headchunk has its own method to detect these duplicates
|
||||||
if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) {
|
if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) {
|
||||||
return false, 0, storage.ErrDuplicateSampleForTimestamp
|
return false, 0, storage.ErrDuplicateSampleForTimestamp
|
||||||
}
|
}
|
||||||
|
// Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf.
|
||||||
// sample is identical (ts + value) with most current (highest ts) sample in sampleBuf
|
|
||||||
return false, 0, nil
|
return false, 0, nil
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The sample cannot go in the in-order chunk. Check if it can go in the out-of-order chunk.
|
||||||
|
if oooTimeWindow > 0 && t >= headMaxt-oooTimeWindow {
|
||||||
|
return true, headMaxt - t, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// The sample cannot go in both in-order and out-of-order chunk.
|
||||||
|
if oooTimeWindow > 0 {
|
||||||
|
return true, headMaxt - t, storage.ErrTooOldSample
|
||||||
|
}
|
||||||
|
if t < minValidTime {
|
||||||
|
return false, headMaxt - t, storage.ErrOutOfBounds
|
||||||
|
}
|
||||||
|
return false, headMaxt - t, storage.ErrOutOfOrderSample
|
||||||
|
}
|
||||||
|
|
||||||
// AppendExemplar for headAppender assumes the series ref already exists, and so it doesn't
|
// AppendExemplar for headAppender assumes the series ref already exists, and so it doesn't
|
||||||
// use getOrCreate or make any of the lset sanity checks that Append does.
|
// use getOrCreate or make any of the lset sanity checks that Append does.
|
||||||
|
|
Loading…
Reference in a new issue