agent db: make rejecting ooo samples configurable (#14094)

feat: Make OOO ingestion time window configurable for Prometheus Agent.

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>
This commit is contained in:
Sebastian Rabenhorst 2024-06-12 16:07:42 +02:00 committed by GitHub
parent 64a9abb8be
commit 05380aa0ac
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 104 additions and 17 deletions

View file

@ -1197,7 +1197,7 @@ func main() {
}
if agentMode {
// WAL storage.
opts := cfg.agent.ToAgentOptions()
opts := cfg.agent.ToAgentOptions(cfg.tsdb.OutOfOrderTimeWindow)
cancel := make(chan struct{})
g.Add(
func() error {
@ -1233,6 +1233,7 @@ func main() {
"TruncateFrequency", cfg.agent.TruncateFrequency,
"MinWALTime", cfg.agent.MinWALTime,
"MaxWALTime", cfg.agent.MaxWALTime,
"OutOfOrderTimeWindow", cfg.agent.OutOfOrderTimeWindow,
)
localStorage.Set(db, 0)
@ -1736,17 +1737,22 @@ type agentOptions struct {
TruncateFrequency model.Duration
MinWALTime, MaxWALTime model.Duration
NoLockfile bool
OutOfOrderTimeWindow int64
}
func (opts agentOptions) ToAgentOptions() agent.Options {
func (opts agentOptions) ToAgentOptions(outOfOrderTimeWindow int64) agent.Options {
if outOfOrderTimeWindow < 0 {
outOfOrderTimeWindow = 0
}
return agent.Options{
WALSegmentSize: int(opts.WALSegmentSize),
WALCompression: wlog.ParseCompressionType(opts.WALCompression, opts.WALCompressionType),
StripeSize: opts.StripeSize,
TruncateFrequency: time.Duration(opts.TruncateFrequency),
MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)),
MaxWALTime: durationToInt64Millis(time.Duration(opts.MaxWALTime)),
NoLockfile: opts.NoLockfile,
WALSegmentSize: int(opts.WALSegmentSize),
WALCompression: wlog.ParseCompressionType(opts.WALCompression, opts.WALCompressionType),
StripeSize: opts.StripeSize,
TruncateFrequency: time.Duration(opts.TruncateFrequency),
MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)),
MaxWALTime: durationToInt64Millis(time.Duration(opts.MaxWALTime)),
NoLockfile: opts.NoLockfile,
OutOfOrderTimeWindow: outOfOrderTimeWindow,
}
}

View file

@ -3813,6 +3813,10 @@ NOTE: Out-of-order ingestion is an experimental feature, but you do not need any
# into the TSDB, i.e. it is an in-order sample or an out-of-order/out-of-bounds sample
# that is within the out-of-order window, or (b) too-old, i.e. not in-order
# and before the out-of-order window.
#
# When out_of_order_time_window is greater than 0, it also affects experimental agent. It allows
# the agent's WAL to accept out-of-order samples that fall within the specified time window relative
# to the timestamp of the last appended sample for the same series.
[ out_of_order_time_window: <duration> | default = 0s ]
```

View file

@ -81,19 +81,23 @@ type Options struct {
// NoLockfile disables creation and consideration of a lock file.
NoLockfile bool
// OutOfOrderTimeWindow specifies how much out of order is allowed, if any.
OutOfOrderTimeWindow int64
}
// DefaultOptions used for the WAL storage. They are reasonable for setups using
// millisecond-precision timestamps.
func DefaultOptions() *Options {
return &Options{
WALSegmentSize: wlog.DefaultSegmentSize,
WALCompression: wlog.CompressionNone,
StripeSize: tsdb.DefaultStripeSize,
TruncateFrequency: DefaultTruncateFrequency,
MinWALTime: DefaultMinWALTime,
MaxWALTime: DefaultMaxWALTime,
NoLockfile: false,
WALSegmentSize: wlog.DefaultSegmentSize,
WALCompression: wlog.CompressionNone,
StripeSize: tsdb.DefaultStripeSize,
TruncateFrequency: DefaultTruncateFrequency,
MinWALTime: DefaultMinWALTime,
MaxWALTime: DefaultMaxWALTime,
NoLockfile: false,
OutOfOrderTimeWindow: 0,
}
}
@ -812,6 +816,11 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
series.Lock()
defer series.Unlock()
if t <= a.minValidTime(series.lastTs) {
a.metrics.totalOutOfOrderSamples.Inc()
return 0, storage.ErrOutOfOrderSample
}
// NOTE: always modify pendingSamples and sampleSeries together.
a.pendingSamples = append(a.pendingSamples, record.RefSample{
Ref: series.ref,
@ -935,6 +944,11 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int
series.Lock()
defer series.Unlock()
if t <= a.minValidTime(series.lastTs) {
a.metrics.totalOutOfOrderSamples.Inc()
return 0, storage.ErrOutOfOrderSample
}
switch {
case h != nil:
// NOTE: always modify pendingHistograms and histogramSeries together
@ -1103,3 +1117,13 @@ func (a *appender) logSeries() error {
return nil
}
// mintTs returns the minimum timestamp that a sample can have
// and is needed for preventing underflow.
func (a *appender) minValidTime(lastTs int64) int64 {
if lastTs < math.MinInt64+a.opts.OutOfOrderTimeWindow {
return math.MinInt64
}
return lastTs - a.opts.OutOfOrderTimeWindow
}

View file

@ -16,6 +16,7 @@ package agent
import (
"context"
"fmt"
"math"
"path/filepath"
"strconv"
"testing"
@ -761,7 +762,9 @@ func TestDBAllowOOOSamples(t *testing.T) {
)
reg := prometheus.NewRegistry()
s := createTestAgentDB(t, reg, DefaultOptions())
opts := DefaultOptions()
opts.OutOfOrderTimeWindow = math.MaxInt64
s := createTestAgentDB(t, reg, opts)
app := s.Appender(context.TODO())
// Let's add some samples in the [offset, offset+numDatapoints) range.
@ -879,6 +882,56 @@ func TestDBAllowOOOSamples(t *testing.T) {
require.NoError(t, db.Close())
}
func TestDBOutOfOrderTimeWindow(t *testing.T) {
tc := []struct {
outOfOrderTimeWindow, firstTs, secondTs int64
expectedError error
}{
{0, 100, 101, nil},
{0, 100, 100, storage.ErrOutOfOrderSample},
{0, 100, 99, storage.ErrOutOfOrderSample},
{100, 100, 1, nil},
{100, 100, 0, storage.ErrOutOfOrderSample},
}
for _, c := range tc {
t.Run(fmt.Sprintf("outOfOrderTimeWindow=%d, firstTs=%d, secondTs=%d, expectedError=%s", c.outOfOrderTimeWindow, c.firstTs, c.secondTs, c.expectedError), func(t *testing.T) {
reg := prometheus.NewRegistry()
opts := DefaultOptions()
opts.OutOfOrderTimeWindow = c.outOfOrderTimeWindow
s := createTestAgentDB(t, reg, opts)
app := s.Appender(context.TODO())
lbls := labelsForTest(t.Name()+"_histogram", 1)
lset := labels.New(lbls[0]...)
_, err := app.AppendHistogram(0, lset, c.firstTs, tsdbutil.GenerateTestHistograms(1)[0], nil)
require.NoError(t, err)
err = app.Commit()
require.NoError(t, err)
_, err = app.AppendHistogram(0, lset, c.secondTs, tsdbutil.GenerateTestHistograms(1)[0], nil)
require.ErrorIs(t, err, c.expectedError)
lbls = labelsForTest(t.Name(), 1)
lset = labels.New(lbls[0]...)
_, err = app.Append(0, lset, c.firstTs, 0)
require.NoError(t, err)
err = app.Commit()
require.NoError(t, err)
_, err = app.Append(0, lset, c.secondTs, 0)
require.ErrorIs(t, err, c.expectedError)
expectedAppendedSamples := float64(2)
if c.expectedError != nil {
expectedAppendedSamples = 1
}
m := gatherFamily(t, reg, "prometheus_agent_samples_appended_total")
require.Equal(t, expectedAppendedSamples, m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples")
require.Equal(t, expectedAppendedSamples, m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms")
require.NoError(t, s.Close())
})
}
}
func BenchmarkCreateSeries(b *testing.B) {
s := createTestAgentDB(b, nil, DefaultOptions())
defer s.Close()