This commit is contained in:
Vanshika 2024-09-19 23:03:56 +02:00 committed by GitHub
commit ee7a99c4d5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 300 additions and 9 deletions

View file

@ -1587,6 +1587,11 @@ func (s *readyStorage) Appender(ctx context.Context) storage.Appender {
// notReadyAppender is the Appender served while the storage is still
// starting up; its append methods reject everything with tsdb.ErrNotReady.
type notReadyAppender struct{}

// SetHints implements storage.Appender. Hints are irrelevant before the
// storage is ready, so they are ignored. Panicking here (the previous
// behavior) would crash callers — e.g. the rule manager sets hints
// unconditionally before appending — instead of letting them observe
// tsdb.ErrNotReady from the append methods.
func (n notReadyAppender) SetHints(hints *storage.AppendHints) {}
// Append implements storage.Appender; it always rejects the sample with
// tsdb.ErrNotReady because the underlying storage has not finished starting.
func (n notReadyAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
	return 0, tsdb.ErrNotReady
}

View file

@ -0,0 +1,5 @@
# Test fixture: a single recording-rule group used by the rule-manager tests.
groups:
  - name: test_1
    rules:
      # Records the constant vector(2) on every evaluation.
      - record: test_2
        expr: vector(2)

View file

@ -75,6 +75,7 @@ type Group struct {
// concurrencyController controls the rules evaluation concurrency.
concurrencyController RuleConcurrencyController
hints *storage.AppendHints
}
// GroupEvalIterationFunc is used to implement and extend rule group
@ -141,6 +142,7 @@ func NewGroup(o GroupOptions) *Group {
metrics: metrics,
evalIterationFunc: evalIterationFunc,
concurrencyController: concurrencyController,
hints: &storage.AppendHints{DiscardOutOfOrder: true},
}
}
@ -560,6 +562,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
if s.H != nil {
_, err = app.AppendHistogram(0, s.Metric, s.T, nil, s.H)
} else {
app.SetHints(g.hints)
_, err = app.Append(0, s.Metric, s.T, s.F)
}
@ -656,6 +659,7 @@ func (g *Group) cleanupStaleSeries(ctx context.Context, ts time.Time) {
return
}
app := g.opts.Appendable.Appender(ctx)
app.SetHints(g.hints)
queryOffset := g.QueryOffset()
for _, s := range g.staleSeries {
// Rule that produced series no longer configured, mark it stale.

View file

@ -1193,6 +1193,53 @@ func countStaleNaN(t *testing.T, st storage.Storage) int {
return c
}
// TestRuleMovedBetweenGroups verifies that moving a recording rule from one
// rule file/group to another across a config reload does not leave staleness
// markers in the storage for the series the rule produces.
func TestRuleMovedBetweenGroups(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping test in short mode.")
	}
	// 600000ms out-of-order window so writes that land after the move are
	// still accepted by the storage.
	storage := teststorage.New(t, 600000)
	defer storage.Close()
	opts := promql.EngineOpts{
		Logger:     nil,
		Reg:        nil,
		MaxSamples: 10,
		Timeout:    10 * time.Second,
	}
	engine := promql.NewEngine(opts)
	ruleManager := NewManager(&ManagerOptions{
		Appendable: storage,
		Queryable:  storage,
		QueryFunc:  EngineQueryFunc(engine, storage),
		Context:    context.Background(),
		Logger:     log.NewNopLogger(),
	})
	var stopped bool
	ruleManager.start()
	defer func() {
		if !stopped {
			ruleManager.Stop()
		}
	}()
	rule2 := "fixtures/rules2.yaml"
	rule1 := "fixtures/rules1.yaml"
	// Load initial configuration of rules2
	require.NoError(t, ruleManager.Update(1*time.Second, []string{rule2}, labels.EmptyLabels(), "", nil))
	// Wait for rule to be evaluated (1s interval, so several evaluations).
	// NOTE(review): fixed sleeps make this test slow and potentially flaky on
	// loaded machines; polling the storage for the sample would be more robust.
	time.Sleep(5 * time.Second)
	// Reload configuration of rules1
	require.NoError(t, ruleManager.Update(1*time.Second, []string{rule1}, labels.EmptyLabels(), "", nil))
	// Wait for rule to be evaluated in new location and potential staleness marker
	time.Sleep(5 * time.Second)
	require.Equal(t, 0, countStaleNaN(t, storage)) // Not expecting any stale markers.
}
func TestGroupHasAlertingRules(t *testing.T) {
tests := []struct {
group *Group

View file

@ -43,6 +43,10 @@ func (a nopAppendable) Appender(_ context.Context) storage.Appender {
// nopAppender is an Appender that silently discards everything it is given.
type nopAppender struct{}

// SetHints implements storage.Appender. Like every other method on
// nopAppender it is a no-op; panicking (the previous behavior) would
// contradict the type's do-nothing contract and crash tests that exercise
// code paths — e.g. staleness marking — which set hints before appending.
func (a nopAppender) SetHints(hints *storage.AppendHints) {}
// Append implements storage.Appender; it discards the sample and reports
// success.
func (a nopAppender) Append(storage.SeriesRef, labels.Labels, int64, float64) (storage.SeriesRef, error) {
	return 0, nil
}
@ -107,6 +111,12 @@ type collectResultAppender struct {
pendingExemplars []exemplar.Exemplar
resultMetadata []metadata.Metadata
pendingMetadata []metadata.Metadata
hints *storage.AppendHints
}
// SetHints implements storage.Appender. The hints are recorded so tests can
// assert on them, and forwarded to the wrapped appender (when one is set) so
// the underlying appender behaves as it would in the real appender chain.
// Without forwarding, the hint would be silently dropped whenever this
// test appender wraps a real storage appender.
func (a *collectResultAppender) SetHints(hints *storage.AppendHints) {
	a.hints = hints
	if a.next != nil {
		a.next.SetHints(hints)
	}
}
func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {

View file

@ -1829,7 +1829,9 @@ loop:
if err == nil {
sl.cache.forEachStale(func(lset labels.Labels) bool {
// Series no longer exposed, mark it stale.
app.SetHints(&storage.AppendHints{DiscardOutOfOrder: true})
_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
app.SetHints(nil)
switch {
case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
// Do not count these in logging, as this is expected if a target
@ -1935,7 +1937,7 @@ func (sl *scrapeLoop) report(app storage.Appender, start time.Time, duration tim
func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err error) {
ts := timestamp.FromTime(start)
app.SetHints(&storage.AppendHints{DiscardOutOfOrder: true})
stale := math.Float64frombits(value.StaleNaN)
b := labels.NewBuilder(labels.EmptyLabels())

View file

@ -84,6 +84,97 @@ func TestNewScrapePool(t *testing.T) {
require.NotNil(t, sp.newLoop, "newLoop function not initialized.")
}
// TestStorageHandlesOutOfOrderTimestamps verifies that out-of-order samples
// are dropped when the out-of-order time window is left at its default (0)
// and retained when a non-zero window is configured.
func TestStorageHandlesOutOfOrderTimestamps(t *testing.T) {
	cases := []struct {
		name             string
		oooTimeWindow    []int64
		expectOutOfOrder bool
	}{
		// Default OutOfOrderTimeWindow (0): OOO samples are rejected.
		{name: "Out-Of-Order Sample Disabled", oooTimeWindow: nil, expectOutOfOrder: false},
		// Explicit OutOfOrderTimeWindow (600000ms): OOO samples are kept.
		{name: "Out-Of-Order Sample Enabled", oooTimeWindow: []int64{600000}, expectOutOfOrder: true},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			s := teststorage.New(t, tc.oooTimeWindow...)
			defer s.Close()
			runScrapeLoopTest(t, s, tc.expectOutOfOrder)
		})
	}
}
// runScrapeLoopTest appends three samples for the same series — one at a
// rounded-down "now", one 5 minutes earlier (out of order) and one 5 minutes
// later — then queries the storage back. When expectOutOfOrder is false,
// exactly the two in-order samples must be present; when true, the stored
// data must differ from that two-sample expectation (the OOO sample was kept).
func runScrapeLoopTest(t *testing.T, s *teststorage.TestStorage, expectOutOfOrder bool) {
	// Create an appender for adding samples to the storage.
	app := s.Appender(context.Background())
	capp := &collectResultAppender{next: app}
	sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return capp }, 0)
	// Current time for generating timestamps.
	now := time.Now()
	// Calculate timestamps for the samples based on the current time.
	now = now.Truncate(time.Minute) // round down the now timestamp to the nearest minute
	timestampInorder1 := now
	timestampOutOfOrder := now.Add(-5 * time.Minute)
	timestampInorder2 := now.Add(5 * time.Minute)
	slApp := sl.appender(context.Background())
	_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "", timestampInorder1)
	require.NoError(t, err)
	_, _, _, err = sl.append(slApp, []byte(`metric_a{a="1",b="1"} 2`), "", timestampOutOfOrder)
	require.NoError(t, err)
	_, _, _, err = sl.append(slApp, []byte(`metric_a{a="1",b="1"} 3`), "", timestampInorder2)
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())
	// Query the samples back from the storage.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	q, err := s.Querier(time.Time{}.UnixNano(), time.Now().UnixNano())
	require.NoError(t, err)
	defer q.Close()
	// Use a matcher to filter the metric name.
	series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "metric_a"))
	var results []floatSample
	for series.Next() {
		it := series.At().Iterator(nil)
		for it.Next() == chunkenc.ValFloat {
			t, v := it.At()
			results = append(results, floatSample{
				metric: series.At().Labels(),
				t:      t,
				f:      v,
			})
		}
		require.NoError(t, it.Err())
	}
	require.NoError(t, series.Err())
	// Define the expected results
	want := []floatSample{
		{
			metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
			t:      timestamp.FromTime(timestampInorder1),
			f:      1,
		},
		{
			metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
			t:      timestamp.FromTime(timestampInorder2),
			f:      3,
		},
	}
	if expectOutOfOrder {
		// NOTE(review): this only asserts the results differ from the
		// in-order expectation; it does not pin the exact OOO sample kept.
		require.NotEqual(t, want, results, "Expected results to include out-of-order sample:\n%s", results)
	} else {
		require.Equal(t, want, results, "Appended samples not as expected:\n%s", results)
	}
}
func TestDroppedTargetsList(t *testing.T) {
var (
app = &nopAppendable{}
@ -1149,6 +1240,87 @@ func BenchmarkScrapeLoopAppendOM(b *testing.B) {
}
}
// TestSetHintsHandlingStaleness runs two consecutive scrape loops against the
// same storage (with a 600000ms out-of-order window) and verifies that
// stopping the first loop leaves no staleness markers among the stored
// samples — i.e. the staleness write path discarded the markers as intended.
func TestSetHintsHandlingStaleness(t *testing.T) {
	s := teststorage.New(t, 600000)
	defer s.Close()
	signal := make(chan struct{}, 1)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Function to run the scrape loop
	// Each invocation scrapes every 10ms and invokes `action` once on the
	// scrape whose ordinal equals `cue`.
	runScrapeLoop := func(ctx context.Context, t *testing.T, cue int, action func(*scrapeLoop)) {
		var (
			scraper = &testScraper{}
			app     = func(ctx context.Context) storage.Appender {
				return s.Appender(ctx)
			}
		)
		sl := newBasicScrapeLoop(t, ctx, scraper, app, 10*time.Millisecond)
		numScrapes := 0
		scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
			numScrapes++
			if numScrapes == cue {
				action(sl)
			}
			w.Write([]byte(fmt.Sprintf("metric_a{a=\"1\",b=\"1\"} %d\n", 42+numScrapes)))
			return nil
		}
		sl.run(nil)
	}
	// First loop: stop after the second scrape (which triggers the staleness
	// path), then start a second loop against the same storage; that loop
	// cancels the shared context on its fourth scrape.
	go func() {
		runScrapeLoop(ctx, t, 2, func(sl *scrapeLoop) {
			go sl.stop()
			// Wait a bit then start a new target.
			time.Sleep(100 * time.Millisecond)
			go func() {
				runScrapeLoop(ctx, t, 4, func(_ *scrapeLoop) {
					cancel()
				})
				signal <- struct{}{}
			}()
		})
	}()
	select {
	case <-signal:
	case <-time.After(10 * time.Second):
		t.Fatalf("Scrape wasn't stopped.")
	}
	// Query every stored sample for metric_a back out of the storage.
	ctx1, cancel := context.WithCancel(context.Background())
	defer cancel()
	q, err := s.Querier(0, time.Now().UnixNano())
	require.NoError(t, err)
	defer q.Close()
	series := q.Select(ctx1, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "metric_a"))
	var results []floatSample
	for series.Next() {
		it := series.At().Iterator(nil)
		for it.Next() == chunkenc.ValFloat {
			t, v := it.At()
			results = append(results, floatSample{
				metric: series.At().Labels(),
				t:      t,
				f:      v,
			})
		}
		require.NoError(t, it.Err())
	}
	require.NoError(t, series.Err())
	// Count staleness markers among the returned samples; there must be none.
	var c int
	for _, s := range results {
		if value.IsStaleNaN(s.f) {
			c++
		}
	}
	require.Equal(t, 0, c, "invalid count of staleness markers after stopping the engine")
}
func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
appender := &collectResultAppender{}
var (
@ -3503,7 +3675,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrapeForTimestampedMetrics(t *
case <-time.After(5 * time.Second):
t.Fatalf("Scrape wasn't stopped.")
}
fmt.Println(appender.resultFloats)
// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
// each scrape successful or not.
require.Len(t, appender.resultFloats, 27, "Appended samples not as expected:\n%s", appender)

View file

@ -148,6 +148,10 @@ type fanoutAppender struct {
secondaries []Appender
}
// SetHints implements storage.Appender by propagating the hints to the
// primary and all secondary appenders, so every underlying appender applies
// the same policy (e.g. discarding out-of-order samples when writing
// staleness markers). The previous panic("unimplemented") would have crashed
// any caller: both the rule manager and the scrape loop set hints
// unconditionally before appending through the fanout storage.
func (f *fanoutAppender) SetHints(hints *AppendHints) {
	f.primary.SetHints(hints)
	for _, appender := range f.secondaries {
		appender.SetHints(hints)
	}
}
func (f *fanoutAppender) Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error) {
ref, err := f.primary.Append(ref, l, t, v)
if err != nil {

View file

@ -238,6 +238,10 @@ func (f QueryableFunc) Querier(mint, maxt int64) (Querier, error) {
return f(mint, maxt)
}
// AppendHints carries optional hints that callers can pass to an Appender
// via SetHints to influence how subsequent samples are handled.
type AppendHints struct {
	// DiscardOutOfOrder asks the appender to reject out-of-order samples
	// (returning ErrOutOfOrderSample) even when the storage is configured
	// with an out-of-order time window. Used when appending staleness
	// markers, which must not be ingested through the OOO path.
	DiscardOutOfOrder bool
}
// Appender provides batched appends against a storage.
// It must be completed with a call to Commit or Rollback and must not be reused afterwards.
//
@ -266,6 +270,9 @@ type Appender interface {
// Appender has to be discarded after rollback.
Rollback() error
// SetHints sets optional hints that influence how the appender handles subsequent appends.
SetHints(hints *AppendHints)
ExemplarAppender
HistogramAppender
MetadataUpdater

View file

@ -284,6 +284,10 @@ type timestampTracker struct {
highestRecvTimestamp *maxTimestamp
}
// SetHints implements storage.Appender. timestampTracker only counts samples
// and tracks their timestamps for remote write, so append hints have nothing
// to modify here; they are deliberately ignored. Panicking (the previous
// behavior) would take down the remote-write path as soon as an upstream
// caller set hints on the appender chain.
func (t *timestampTracker) SetHints(hints *storage.AppendHints) {}
// Append implements storage.Appender.
func (t *timestampTracker) Append(_ storage.SeriesRef, _ labels.Labels, ts int64, _ float64) (storage.SeriesRef, error) {
t.samples++

View file

@ -832,6 +832,10 @@ func (m *mockAppendable) Appender(_ context.Context) storage.Appender {
return m
}
// SetHints implements storage.Appender. Left unimplemented: the remote-write
// tests that use this mock never set hints.
// NOTE(review): this panics if a future test path calls SetHints — consider
// making it a no-op or recording the hints like collectResultAppender does.
func (m *mockAppendable) SetHints(hints *storage.AppendHints) {
	panic("unimplemented")
}
func (m *mockAppendable) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if m.appendSampleErr != nil {
return 0, m.appendSampleErr

View file

@ -784,6 +784,10 @@ type appender struct {
floatHistogramSeries []*memSeries
}
// SetHints implements storage.Appender. Agent mode has no out-of-order
// ingestion window, so there is nothing for the hints to modify; they are
// ignored. Panicking here (the previous behavior) would crash agent mode
// outright, since the scrape loop sets hints unconditionally when writing
// staleness markers.
func (a *appender) SetHints(hints *storage.AppendHints) {}
func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
// series references and chunk references are identical for agent mode.
headRef := chunks.HeadSeriesRef(ref)

View file

@ -36,12 +36,17 @@ import (
// initAppender is a helper to initialize the time bounds of the head
// upon the first sample it receives.
type initAppender struct {
	// app is the real head appender; nil until the first sample arrives.
	app  storage.Appender
	head *Head
	// hints set via SetHints; retained so they can be applied to the head
	// appender once it exists. (The rendered diff duplicated the app/head
	// fields — old and new lines interleaved — which would not compile;
	// this is the reconstructed post-change struct.)
	hints *storage.AppendHints
}
var _ storage.GetRef = &initAppender{}
// SetHints implements storage.Appender. The hints are always recorded; if
// the underlying head appender already exists they are also passed straight
// through to it. Merely storing them (the previous behavior) silently
// dropped hints set after initialization, since Append delegates directly to
// a.app without consulting a.hints.
// NOTE(review): confirm the lazy-initialization path applies a.hints to the
// newly created appender as well.
func (a *initAppender) SetHints(hints *storage.AppendHints) {
	a.hints = hints
	if a.app != nil {
		a.app.SetHints(hints)
	}
}
func (a *initAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if a.app != nil {
return a.app.Append(ref, lset, t, v)
@ -318,6 +323,11 @@ type headAppender struct {
appendID, cleanupAppendIDsBelow uint64
closed bool
hints *storage.AppendHints
}
// SetHints implements storage.Appender. The stored hints are consulted by
// Append — e.g. DiscardOutOfOrder makes Append reject out-of-order samples
// with storage.ErrOutOfOrderSample even when an OOO time window is
// configured.
func (a *headAppender) SetHints(hints *storage.AppendHints) {
	a.hints = hints
}
func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
@ -347,13 +357,18 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
}
s.Lock()
defer s.Unlock()
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
_, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, a.oooTimeWindow)
isOOO, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, a.oooTimeWindow)
if err == nil {
if isOOO && a.hints != nil && a.hints.DiscardOutOfOrder {
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
return 0, storage.ErrOutOfOrderSample
}
s.pendingCommit = true
}
s.Unlock()
if delta > 0 {
a.head.metrics.oooHistogram.Observe(float64(delta) / 1000)
}

View file

@ -30,15 +30,15 @@ import (
// New returns a new TestStorage for testing purposes
// that removes all associated files on closing.
func New(t testutil.T) *TestStorage {
stor, err := NewWithError()
func New(t testutil.T, outOfOrderTimeWindow ...int64) *TestStorage {
stor, err := NewWithError(outOfOrderTimeWindow...)
require.NoError(t, err)
return stor
}
// NewWithError returns a new TestStorage for user facing tests, which reports
// errors directly.
func NewWithError() (*TestStorage, error) {
func NewWithError(outOfOrderTimeWindow ...int64) (*TestStorage, error) {
dir, err := os.MkdirTemp("", "test_storage")
if err != nil {
return nil, fmt.Errorf("opening test directory: %w", err)
@ -51,6 +51,14 @@ func NewWithError() (*TestStorage, error) {
opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond)
opts.RetentionDuration = 0
opts.EnableNativeHistograms = true
// Set OutOfOrderTimeWindow if provided, otherwise use default (0)
if len(outOfOrderTimeWindow) > 0 {
opts.OutOfOrderTimeWindow = outOfOrderTimeWindow[0]
} else {
opts.OutOfOrderTimeWindow = 0 // Default value is zero
}
db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats())
if err != nil {
return nil, fmt.Errorf("opening test storage: %w", err)