diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index 24e666cb6..32e3feafa 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -1587,6 +1587,11 @@ func (s *readyStorage) Appender(ctx context.Context) storage.Appender {
 
 type notReadyAppender struct{}
 
+// SetHints implements storage.Appender.
+func (n notReadyAppender) SetHints(hints *storage.AppendHints) {
+	panic("unimplemented")
+}
+
 func (n notReadyAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
 	return 0, tsdb.ErrNotReady
 }
diff --git a/rules/fixtures/rules1.yaml b/rules/fixtures/rules1.yaml
new file mode 100644
index 000000000..76fbf71f3
--- /dev/null
+++ b/rules/fixtures/rules1.yaml
@@ -0,0 +1,5 @@
+groups:
+  - name: test_1
+    rules:
+      - record: test_2
+        expr: vector(2)
diff --git a/rules/group.go b/rules/group.go
index 6e98bf52f..8529d65dd 100644
--- a/rules/group.go
+++ b/rules/group.go
@@ -75,6 +75,7 @@ type Group struct {
 
 	// concurrencyController controls the rules evaluation concurrency.
 	concurrencyController RuleConcurrencyController
+	hints                 *storage.AppendHints
 }
 
 // GroupEvalIterationFunc is used to implement and extend rule group
@@ -141,6 +142,7 @@ func NewGroup(o GroupOptions) *Group {
 		metrics:               metrics,
 		evalIterationFunc:     evalIterationFunc,
 		concurrencyController: concurrencyController,
+		hints:                 &storage.AppendHints{DiscardOutOfOrder: true},
 	}
 }
 
@@ -560,6 +562,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 			if s.H != nil {
 				_, err = app.AppendHistogram(0, s.Metric, s.T, nil, s.H)
 			} else {
+				app.SetHints(g.hints)
 				_, err = app.Append(0, s.Metric, s.T, s.F)
 			}
 
@@ -656,6 +659,7 @@ func (g *Group) cleanupStaleSeries(ctx context.Context, ts time.Time) {
 		return
 	}
 	app := g.opts.Appendable.Appender(ctx)
+	app.SetHints(g.hints)
 	queryOffset := g.QueryOffset()
 	for _, s := range g.staleSeries {
 		// Rule that produced series no longer configured, mark it stale.
diff --git a/rules/manager_test.go b/rules/manager_test.go
index b9f6db327..d658d3f8f 100644
--- a/rules/manager_test.go
+++ b/rules/manager_test.go
@@ -1193,6 +1193,53 @@ func countStaleNaN(t *testing.T, st storage.Storage) int {
 	return c
 }
 
+func TestRuleMovedBetweenGroups(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping test in short mode.")
+	}
+
+	storage := teststorage.New(t, 600000)
+	defer storage.Close()
+	opts := promql.EngineOpts{
+		Logger:     nil,
+		Reg:        nil,
+		MaxSamples: 10,
+		Timeout:    10 * time.Second,
+	}
+	engine := promql.NewEngine(opts)
+	ruleManager := NewManager(&ManagerOptions{
+		Appendable: storage,
+		Queryable:  storage,
+		QueryFunc:  EngineQueryFunc(engine, storage),
+		Context:    context.Background(),
+		Logger:     log.NewNopLogger(),
+	})
+	var stopped bool
+	ruleManager.start()
+	defer func() {
+		if !stopped {
+			ruleManager.Stop()
+		}
+	}()
+
+	rule2 := "fixtures/rules2.yaml"
+	rule1 := "fixtures/rules1.yaml"
+
+	// Load initial configuration of rules2
+	require.NoError(t, ruleManager.Update(1*time.Second, []string{rule2}, labels.EmptyLabels(), "", nil))
+
+	// Wait for rule to be evaluated
+	time.Sleep(5 * time.Second)
+
+	// Reload configuration of rules1
+	require.NoError(t, ruleManager.Update(1*time.Second, []string{rule1}, labels.EmptyLabels(), "", nil))
+
+	// Wait for rule to be evaluated in new location and potential staleness marker
+	time.Sleep(5 * time.Second)
+
+	require.Equal(t, 0, countStaleNaN(t, storage)) // Not expecting any stale markers.
+}
+
 func TestGroupHasAlertingRules(t *testing.T) {
 	tests := []struct {
 		group *Group
diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go
index 116fa5c94..028960ccd 100644
--- a/scrape/helpers_test.go
+++ b/scrape/helpers_test.go
@@ -43,6 +43,10 @@ func (a nopAppendable) Appender(_ context.Context) storage.Appender {
 
 type nopAppender struct{}
 
+func (a nopAppender) SetHints(hints *storage.AppendHints) {
+	panic("unimplemented")
+}
+
 func (a nopAppender) Append(storage.SeriesRef, labels.Labels, int64, float64) (storage.SeriesRef, error) {
 	return 0, nil
 }
@@ -107,6 +111,12 @@ type collectResultAppender struct {
 	pendingExemplars []exemplar.Exemplar
 	resultMetadata   []metadata.Metadata
 	pendingMetadata  []metadata.Metadata
+	hints            *storage.AppendHints
+}
+
+// SetHints implements storage.Appender.
+func (a *collectResultAppender) SetHints(hints *storage.AppendHints) {
+	a.hints = hints
 }
 
 func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
diff --git a/scrape/scrape.go b/scrape/scrape.go
index dca4682b1..c1bfc9b3d 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -1829,7 +1829,9 @@ loop:
 	if err == nil {
 		sl.cache.forEachStale(func(lset labels.Labels) bool {
 			// Series no longer exposed, mark it stale.
+			app.SetHints(&storage.AppendHints{DiscardOutOfOrder: true})
 			_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
+			app.SetHints(nil)
 			switch {
 			case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
 				// Do not count these in logging, as this is expected if a target
@@ -1935,7 +1937,7 @@ func (sl *scrapeLoop) report(app storage.Appender, start time.Time, duration tim
 
 func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err error) {
 	ts := timestamp.FromTime(start)
-
+	app.SetHints(&storage.AppendHints{DiscardOutOfOrder: true})
 	stale := math.Float64frombits(value.StaleNaN)
 	b := labels.NewBuilder(labels.EmptyLabels())
 
diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index 9887924c3..d25bb2b67 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -84,6 +84,97 @@ func TestNewScrapePool(t *testing.T) {
 	require.NotNil(t, sp.newLoop, "newLoop function not initialized.")
 }
 
+func TestStorageHandlesOutOfOrderTimestamps(t *testing.T) {
+	// Test with default OutOfOrderTimeWindow (0)
+	t.Run("Out-Of-Order Sample Disabled", func(t *testing.T) {
+		s := teststorage.New(t)
+		defer s.Close()
+
+		runScrapeLoopTest(t, s, false)
+	})
+
+	// Test with specific OutOfOrderTimeWindow (600000)
+	t.Run("Out-Of-Order Sample Enabled", func(t *testing.T) {
+		s := teststorage.New(t, 600000)
+		defer s.Close()
+
+		runScrapeLoopTest(t, s, true)
+	})
+}
+
+func runScrapeLoopTest(t *testing.T, s *teststorage.TestStorage, expectOutOfOrder bool) {
+	// Create an appender for adding samples to the storage.
+	app := s.Appender(context.Background())
+	capp := &collectResultAppender{next: app}
+	sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return capp }, 0)
+
+	// Current time for generating timestamps.
+	now := time.Now()
+
+	// Calculate timestamps for the samples based on the current time.
+	now = now.Truncate(time.Minute) // round down the now timestamp to the nearest minute
+	timestampInorder1 := now
+	timestampOutOfOrder := now.Add(-5 * time.Minute)
+	timestampInorder2 := now.Add(5 * time.Minute)
+
+	slApp := sl.appender(context.Background())
+	_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "", timestampInorder1)
+	require.NoError(t, err)
+
+	_, _, _, err = sl.append(slApp, []byte(`metric_a{a="1",b="1"} 2`), "", timestampOutOfOrder)
+	require.NoError(t, err)
+
+	_, _, _, err = sl.append(slApp, []byte(`metric_a{a="1",b="1"} 3`), "", timestampInorder2)
+	require.NoError(t, err)
+
+	require.NoError(t, slApp.Commit())
+
+	// Query the samples back from the storage.
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	q, err := s.Querier(time.Time{}.UnixNano(), time.Now().UnixNano())
+	require.NoError(t, err)
+	defer q.Close()
+
+	// Use a matcher to filter the metric name.
+	series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "metric_a"))
+
+	var results []floatSample
+	for series.Next() {
+		it := series.At().Iterator(nil)
+		for it.Next() == chunkenc.ValFloat {
+			t, v := it.At()
+			results = append(results, floatSample{
+				metric: series.At().Labels(),
+				t:      t,
+				f:      v,
+			})
+		}
+		require.NoError(t, it.Err())
+	}
+	require.NoError(t, series.Err())
+
+	// Define the expected results
+	want := []floatSample{
+		{
+			metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
+			t:      timestamp.FromTime(timestampInorder1),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
+			t:      timestamp.FromTime(timestampInorder2),
+			f:      3,
+		},
+	}
+
+	if expectOutOfOrder {
+		require.NotEqual(t, want, results, "Expected results to include out-of-order sample:\n%s", results)
+	} else {
+		require.Equal(t, want, results, "Appended samples not as expected:\n%s", results)
+	}
+}
+
 func TestDroppedTargetsList(t *testing.T) {
 	var (
 		app = &nopAppendable{}
@@ -1149,6 +1240,87 @@ func BenchmarkScrapeLoopAppendOM(b *testing.B) {
 	}
 }
 
+func TestSetHintsHandlingStaleness(t *testing.T) {
+	s := teststorage.New(t, 600000)
+	defer s.Close()
+
+	signal := make(chan struct{}, 1)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// Function to run the scrape loop
+	runScrapeLoop := func(ctx context.Context, t *testing.T, cue int, action func(*scrapeLoop)) {
+		var (
+			scraper = &testScraper{}
+			app     = func(ctx context.Context) storage.Appender {
+				return s.Appender(ctx)
+			}
+		)
+		sl := newBasicScrapeLoop(t, ctx, scraper, app, 10*time.Millisecond)
+		numScrapes := 0
+		scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
+			numScrapes++
+			if numScrapes == cue {
+				action(sl)
+			}
+			w.Write([]byte(fmt.Sprintf("metric_a{a=\"1\",b=\"1\"} %d\n", 42+numScrapes)))
+			return nil
+		}
+		sl.run(nil)
+	}
+	go func() {
+		runScrapeLoop(ctx, t, 2, func(sl *scrapeLoop) {
+			go sl.stop()
+			// Wait a bit then start a new target.
+			time.Sleep(100 * time.Millisecond)
+			go func() {
+				runScrapeLoop(ctx, t, 4, func(_ *scrapeLoop) {
+					cancel()
+				})
+				signal <- struct{}{}
+			}()
+		})
+	}()
+
+	select {
+	case <-signal:
+	case <-time.After(10 * time.Second):
+		t.Fatalf("Scrape wasn't stopped.")
+	}
+
+	ctx1, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	q, err := s.Querier(0, time.Now().UnixNano())
+
+	require.NoError(t, err)
+	defer q.Close()
+
+	series := q.Select(ctx1, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "metric_a"))
+
+	var results []floatSample
+	for series.Next() {
+		it := series.At().Iterator(nil)
+		for it.Next() == chunkenc.ValFloat {
+			t, v := it.At()
+			results = append(results, floatSample{
+				metric: series.At().Labels(),
+				t:      t,
+				f:      v,
+			})
+		}
+		require.NoError(t, it.Err())
+	}
+	require.NoError(t, series.Err())
+	var c int
+	for _, s := range results {
+		if value.IsStaleNaN(s.f) {
+			c++
+		}
+	}
+	require.Equal(t, 0, c, "invalid count of staleness markers after stopping the engine")
+}
+
 func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
 	appender := &collectResultAppender{}
 	var (
@@ -3503,7 +3675,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrapeForTimestampedMetrics(t *
 	case <-time.After(5 * time.Second):
 		t.Fatalf("Scrape wasn't stopped.")
 	}
-
+	fmt.Println(appender.resultFloats)
 	// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
 	// each scrape successful or not.
 	require.Len(t, appender.resultFloats, 27, "Appended samples not as expected:\n%s", appender)
diff --git a/storage/fanout.go b/storage/fanout.go
index e52342bc7..ad1c9658c 100644
--- a/storage/fanout.go
+++ b/storage/fanout.go
@@ -148,6 +148,10 @@ type fanoutAppender struct {
 	secondaries []Appender
 }
 
+func (f *fanoutAppender) SetHints(hints *AppendHints) {
+	panic("unimplemented")
+}
+
 func (f *fanoutAppender) Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error) {
 	ref, err := f.primary.Append(ref, l, t, v)
 	if err != nil {
diff --git a/storage/interface.go b/storage/interface.go
index 9654c8833..5df14e715 100644
--- a/storage/interface.go
+++ b/storage/interface.go
@@ -238,6 +238,10 @@ func (f QueryableFunc) Querier(mint, maxt int64) (Querier, error) {
 	return f(mint, maxt)
 }
 
+type AppendHints struct {
+	DiscardOutOfOrder bool
+}
+
 // Appender provides batched appends against a storage.
 // It must be completed with a call to Commit or Rollback and must not be reused afterwards.
 //
@@ -266,6 +270,9 @@ type Appender interface {
 	// Appender has to be discarded after rollback.
 	Rollback() error
 
+	// SetHints sets hints on the appender that apply to subsequent appends.
+	SetHints(hints *AppendHints)
+
 	ExemplarAppender
 	HistogramAppender
 	MetadataUpdater
diff --git a/storage/remote/write.go b/storage/remote/write.go
index eba429084..ac43efb64 100644
--- a/storage/remote/write.go
+++ b/storage/remote/write.go
@@ -284,6 +284,10 @@ type timestampTracker struct {
 	highestRecvTimestamp *maxTimestamp
 }
 
+func (t *timestampTracker) SetHints(hints *storage.AppendHints) {
+	panic("unimplemented")
+}
+
 // Append implements storage.Appender.
 func (t *timestampTracker) Append(_ storage.SeriesRef, _ labels.Labels, ts int64, _ float64) (storage.SeriesRef, error) {
 	t.samples++
diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go
index 5c89a1ab9..266c3a652 100644
--- a/storage/remote/write_handler_test.go
+++ b/storage/remote/write_handler_test.go
@@ -832,6 +832,10 @@ func (m *mockAppendable) Appender(_ context.Context) storage.Appender {
 	return m
 }
 
+func (m *mockAppendable) SetHints(hints *storage.AppendHints) {
+	panic("unimplemented")
+}
+
 func (m *mockAppendable) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
 	if m.appendSampleErr != nil {
 		return 0, m.appendSampleErr
diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go
index 596d5c8a3..42cbb2119 100644
--- a/tsdb/agent/db.go
+++ b/tsdb/agent/db.go
@@ -784,6 +784,10 @@ type appender struct {
 	floatHistogramSeries []*memSeries
 }
 
+func (a *appender) SetHints(hints *storage.AppendHints) {
+	panic("unimplemented")
+}
+
 func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
 	// series references and chunk references are identical for agent mode.
 	headRef := chunks.HeadSeriesRef(ref)
diff --git a/tsdb/head_append.go b/tsdb/head_append.go
index 3dd9a367b..a96d33fb5 100644
--- a/tsdb/head_append.go
+++ b/tsdb/head_append.go
@@ -36,12 +36,17 @@ import (
 // initAppender is a helper to initialize the time bounds of the head
 // upon the first sample it receives.
 type initAppender struct {
-	app  storage.Appender
-	head *Head
+	app   storage.Appender
+	head  *Head
+	hints *storage.AppendHints
 }
 
 var _ storage.GetRef = &initAppender{}
 
+func (a *initAppender) SetHints(hints *storage.AppendHints) {
+	a.hints = hints
+}
+
 func (a *initAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
 	if a.app != nil {
 		return a.app.Append(ref, lset, t, v)
@@ -318,6 +323,11 @@ type headAppender struct {
 
 	appendID, cleanupAppendIDsBelow uint64
 	closed                          bool
+	hints                           *storage.AppendHints
+}
+
+func (a *headAppender) SetHints(hints *storage.AppendHints) {
+	a.hints = hints
 }
 
 func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
@@ -347,13 +357,18 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
 	}
 
 	s.Lock()
+
+	defer s.Unlock()
 	// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
 	// to skip that sample from the WAL and write only in the WBL.
-	_, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, a.oooTimeWindow)
+	isOOO, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, a.oooTimeWindow)
 	if err == nil {
+		if isOOO && a.hints != nil && a.hints.DiscardOutOfOrder {
+			a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
+			return 0, storage.ErrOutOfOrderSample
+		}
 		s.pendingCommit = true
 	}
-	s.Unlock()
 	if delta > 0 {
 		a.head.metrics.oooHistogram.Observe(float64(delta) / 1000)
 	}
diff --git a/util/teststorage/storage.go b/util/teststorage/storage.go
index 7d1f9dda2..e15d591e0 100644
--- a/util/teststorage/storage.go
+++ b/util/teststorage/storage.go
@@ -30,15 +30,15 @@ import (
 
 // New returns a new TestStorage for testing purposes
 // that removes all associated files on closing.
-func New(t testutil.T) *TestStorage {
-	stor, err := NewWithError()
+func New(t testutil.T, outOfOrderTimeWindow ...int64) *TestStorage {
+	stor, err := NewWithError(outOfOrderTimeWindow...)
 	require.NoError(t, err)
 	return stor
 }
 
 // NewWithError returns a new TestStorage for user facing tests, which reports
 // errors directly.
-func NewWithError() (*TestStorage, error) {
+func NewWithError(outOfOrderTimeWindow ...int64) (*TestStorage, error) {
 	dir, err := os.MkdirTemp("", "test_storage")
 	if err != nil {
 		return nil, fmt.Errorf("opening test directory: %w", err)
 	}
@@ -51,6 +51,14 @@ func NewWithError() (*TestStorage, error) {
 	opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond)
 	opts.RetentionDuration = 0
 	opts.EnableNativeHistograms = true
+
+	// Set OutOfOrderTimeWindow if provided, otherwise use default (0)
+	if len(outOfOrderTimeWindow) > 0 {
+		opts.OutOfOrderTimeWindow = outOfOrderTimeWindow[0]
+	} else {
+		opts.OutOfOrderTimeWindow = 0 // Default value is zero
+	}
+
 	db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats())
 	if err != nil {
 		return nil, fmt.Errorf("opening test storage: %w", err)
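To make the intent of the new hint concrete, here is a minimal, self-contained sketch of the pattern this diff introduces. It does not use the Prometheus packages themselves: `memAppender` and the local `AppendHints`/`ErrOutOfOrderSample` below are stand-ins that mirror the shape of the API added in `storage/interface.go`, illustrating how a caller (such as the scrape loop writing staleness markers) sets the hint before appending and clears it again afterwards.

```go
package main

import (
	"errors"
	"fmt"
)

// AppendHints mirrors the struct added to storage/interface.go in the diff above.
type AppendHints struct {
	DiscardOutOfOrder bool
}

var ErrOutOfOrderSample = errors.New("out of order sample")

// memAppender is a toy appender: it tracks the highest timestamp seen and,
// when the DiscardOutOfOrder hint is set, rejects older samples up front
// instead of handing them to an out-of-order ingestion path.
type memAppender struct {
	maxt  int64
	hints *AppendHints
}

func (a *memAppender) SetHints(hints *AppendHints) { a.hints = hints }

func (a *memAppender) Append(t int64, v float64) error {
	if t < a.maxt && a.hints != nil && a.hints.DiscardOutOfOrder {
		// Discard instead of routing the sample to the out-of-order window.
		return ErrOutOfOrderSample
	}
	if t > a.maxt {
		a.maxt = t
	}
	return nil
}

func main() {
	app := &memAppender{}
	_ = app.Append(200, 1) // in-order sample

	// A scrape loop writing staleness markers would set the hint first,
	// much as scrape.go does before appending StaleNaN values.
	app.SetHints(&AppendHints{DiscardOutOfOrder: true})
	err := app.Append(100, 2) // older timestamp: discarded under the hint
	fmt.Println(err)          // "out of order sample"
	app.SetHints(nil) // clear the hint so normal appends behave as before
}
```

As in the diff, the hint lives on the appender rather than being passed with every Append call, so existing call sites stay untouched and only the staleness-marker paths (rule group cleanup, scrape stale reporting) opt in.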