changes in group.go

Signed-off-by: Vanshikav123 <vanshikav928@gmail.com>
This commit is contained in:
Vanshikav123 2024-08-22 17:43:05 +05:30
parent be946a3447
commit ba3e442b1c
4 changed files with 57 additions and 1 deletions

View file

@ -0,0 +1,5 @@
groups:
- name: test_1
rules:
- record: test_2
expr: vector(2)

View file

@ -75,6 +75,7 @@ type Group struct {
// concurrencyController controls the rules evaluation concurrency.
concurrencyController RuleConcurrencyController
hints *storage.AppendHints
}
// GroupEvalIterationFunc is used to implement and extend rule group
@ -141,6 +142,7 @@ func NewGroup(o GroupOptions) *Group {
metrics: metrics,
evalIterationFunc: evalIterationFunc,
concurrencyController: concurrencyController,
hints: &storage.AppendHints{DiscardOutOfOrder: true},
}
}
@ -560,6 +562,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
if s.H != nil {
_, err = app.AppendHistogram(0, s.Metric, s.T, nil, s.H)
} else {
app.SetHints(g.hints)
_, err = app.Append(0, s.Metric, s.T, s.F)
}
@ -656,6 +659,7 @@ func (g *Group) cleanupStaleSeries(ctx context.Context, ts time.Time) {
return
}
app := g.opts.Appendable.Appender(ctx)
app.SetHints(g.hints)
queryOffset := g.QueryOffset()
for _, s := range g.staleSeries {
// Rule that produced series no longer configured, mark it stale.

View file

@ -1189,6 +1189,52 @@ func countStaleNaN(t *testing.T, st storage.Storage) int {
}
return c
}
func TestRuleMovedBetweenGroups(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
storage := teststorage.New(t)
defer storage.Close()
opts := promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
ruleManager := NewManager(&ManagerOptions{
Appendable: storage,
Queryable: storage,
QueryFunc: EngineQueryFunc(engine, storage),
Context: context.Background(),
Logger: log.NewNopLogger(),
})
var stopped bool
ruleManager.start()
defer func() {
if !stopped {
ruleManager.Stop()
}
}()
rule2 := "fixtures/rules2.yaml"
rule1 := "fixtures/rules1.yaml"
// Load initial configuration of rules2
require.NoError(t, ruleManager.Update(1*time.Second, []string{rule2}, labels.EmptyLabels(), "", nil))
// Wait for rule to be evaluated
time.Sleep(5 * time.Second)
// Reload configuration of rules1
require.NoError(t, ruleManager.Update(1*time.Second, []string{rule1}, labels.EmptyLabels(), "", nil))
// Wait for rule to be evaluated in new location and potential staleness marker
time.Sleep(5 * time.Second)
require.Equal(t, 0, countStaleNaN(t, storage)) // Not expecting any stale markers.
}
func TestGroupHasAlertingRules(t *testing.T) {
tests := []struct {

View file

@ -50,7 +50,8 @@ func NewWithError() (*TestStorage, error) {
opts.MinBlockDuration = int64(24 * time.Hour / time.Millisecond)
opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond)
opts.RetentionDuration = 0
opts.EnableNativeHistograms = true
//opts.EnableNativeHistograms = true
opts.OutOfOrderTimeWindow = 600000
db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats())
if err != nil {
return nil, fmt.Errorf("opening test storage: %w", err)