Merge pull request #7675 from JessicaGreben/jg/11-retroactive-rule-eval
Add rule importer to backfill
commit 9549a15c6f
.gitignore
@@ -11,6 +11,7 @@ benchmark.txt
 /cmd/prometheus/data
 /cmd/prometheus/debug
 /benchout
+/cmd/promtool/data
 
 !/.travis.yml
 !/.promu.yml

cmd/promtool/main.go
@@ -28,6 +28,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/go-kit/kit/log"
 	"github.com/google/pprof/profile"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/api"
@@ -147,6 +148,18 @@ func main() {
 	// TODO(aSquare14): add flag to set default block duration
 	importFilePath := openMetricsImportCmd.Arg("input file", "OpenMetrics file to read samples from.").Required().String()
 	importDBPath := openMetricsImportCmd.Arg("output directory", "Output directory for generated blocks.").Default(defaultDBPath).String()
+	importRulesCmd := importCmd.Command("rules", "Create new blocks of data from Prometheus data for new rules from recording rule files.")
+	importRulesURL := importRulesCmd.Flag("url", "The URL for the Prometheus API with the data where the rule will be backfilled from.").Default("http://localhost:9090").URL()
+	importRulesStart := importRulesCmd.Flag("start", "The time to start backfilling the new rule from. Must be a RFC3339 formated date or Unix timestamp. Required.").
+		Required().String()
+	importRulesEnd := importRulesCmd.Flag("end", "If an end time is provided, all recording rules in the rule files provided will be backfilled to the end time. Default will backfill up to 3 hours ago. Must be a RFC3339 formated date or Unix timestamp.").String()
+	importRulesOutputDir := importRulesCmd.Flag("output-dir", "Output directory for generated blocks.").Default("data/").String()
+	importRulesEvalInterval := importRulesCmd.Flag("eval-interval", "How frequently to evaluate rules when backfilling if a value is not set in the recording rule files.").
+		Default("60s").Duration()
+	importRulesFiles := importRulesCmd.Arg(
+		"rule-files",
+		"A list of one or more files containing recording rules to be backfilled. All recording rules listed in the files will be backfilled. Alerting rules are not evaluated.",
+	).Required().ExistingFiles()
 
 	parsedCmd := kingpin.MustParse(app.Parse(os.Args[1:]))
 
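The flags above define the new rule backfill subcommand. As a rough, hypothetical invocation (assuming the subcommand is exposed as `promtool import rules`, which is what the kingpin definitions above suggest), backfilling one rule file from a running server might look like: promtool import rules --url=http://localhost:9090 --start=2020-07-01T00:00:00Z --end=2020-07-07T00:00:00Z --output-dir=data/ --eval-interval=60s rules.yml. Per the flag help text, --start and --end also accept Unix timestamps, and --end defaults to roughly three hours before now.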
@@ -209,6 +222,9 @@ func main() {
 	//TODO(aSquare14): Work on adding support for custom block size.
 	case openMetricsImportCmd.FullCommand():
 		os.Exit(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable))
+
+	case importRulesCmd.FullCommand():
+		os.Exit(checkErr(importRules(*importRulesURL, *importRulesStart, *importRulesEnd, *importRulesOutputDir, *importRulesEvalInterval, *importRulesFiles...)))
 	}
 }
 
@@ -819,3 +835,63 @@ func (j *jsonPrinter) printLabelValues(v model.LabelValues) {
 	//nolint:errcheck
 	json.NewEncoder(os.Stdout).Encode(v)
 }
+
+// importRules backfills recording rules from the files provided. The output are blocks of data
+// at the outputDir location.
+func importRules(url *url.URL, start, end, outputDir string, evalInterval time.Duration, files ...string) error {
+	ctx := context.Background()
+	var stime, etime time.Time
+	var err error
+	if end == "" {
+		etime = time.Now().UTC().Add(-3 * time.Hour)
+	} else {
+		etime, err = parseTime(end)
+		if err != nil {
+			fmt.Fprintln(os.Stderr, "error parsing end time:", err)
+			return err
+		}
+	}
+
+	stime, err = parseTime(start)
+	if err != nil {
+		fmt.Fprintln(os.Stderr, "error parsing start time:", err)
+		return err
+	}
+
+	if !stime.Before(etime) {
+		fmt.Fprintln(os.Stderr, "start time is not before end time")
+		return nil
+	}
+
+	cfg := ruleImporterConfig{
+		outputDir:    outputDir,
+		start:        stime,
+		end:          etime,
+		evalInterval: evalInterval,
+	}
+	client, err := api.NewClient(api.Config{
+		Address: url.String(),
+	})
+	if err != nil {
+		fmt.Fprintln(os.Stderr, "new api client error", err)
+		return err
+	}
+
+	ruleImporter := newRuleImporter(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), cfg, v1.NewAPI(client))
+	errs := ruleImporter.loadGroups(ctx, files)
+	for _, err := range errs {
+		if err != nil {
+			fmt.Fprintln(os.Stderr, "rule importer parse error", err)
+			return err
+		}
+	}
+
+	errs = ruleImporter.importAll(ctx)
+	for _, err := range errs {
+		if err != nil {
+			fmt.Fprintln(os.Stderr, "rule importer error", err)
+		}
+	}
+
+	return err
+}

cmd/promtool/rules.go (new file, 240 lines)

// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/pkg/errors"
	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/pkg/timestamp"
	"github.com/prometheus/prometheus/rules"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb"
	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
)

const maxSamplesInMemory = 5000

type queryRangeAPI interface {
	QueryRange(ctx context.Context, query string, r v1.Range) (model.Value, v1.Warnings, error)
}

type ruleImporter struct {
	logger log.Logger
	config ruleImporterConfig

	apiClient queryRangeAPI

	groups      map[string]*rules.Group
	ruleManager *rules.Manager
}

type ruleImporterConfig struct {
	outputDir    string
	start        time.Time
	end          time.Time
	evalInterval time.Duration
}

// newRuleImporter creates a new rule importer that can be used to parse and evaluate recording rule files and create new series
// written to disk in blocks.
func newRuleImporter(logger log.Logger, config ruleImporterConfig, apiClient queryRangeAPI) *ruleImporter {
	level.Info(logger).Log("backfiller", "new rule importer from start", config.start.Format(time.RFC822), " to end", config.end.Format(time.RFC822))
	return &ruleImporter{
		logger:      logger,
		config:      config,
		apiClient:   apiClient,
		ruleManager: rules.NewManager(&rules.ManagerOptions{}),
	}
}

// loadGroups parses groups from a list of recording rule files.
func (importer *ruleImporter) loadGroups(ctx context.Context, filenames []string) (errs []error) {
	groups, errs := importer.ruleManager.LoadGroups(importer.config.evalInterval, labels.Labels{}, filenames...)
	if errs != nil {
		return errs
	}
	importer.groups = groups
	return nil
}

// importAll evaluates all the recording rules and creates new time series and writes them to disk in blocks.
func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) {
	for name, group := range importer.groups {
		level.Info(importer.logger).Log("backfiller", "processing group", "name", name)

		stimeWithAlignment := group.EvalTimestamp(importer.config.start.UnixNano())
		for i, r := range group.Rules() {
			level.Info(importer.logger).Log("backfiller", "processing rule", "id", i, "name", r.Name())
			if err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), stimeWithAlignment, importer.config.end, group); err != nil {
				errs = append(errs, err)
			}
		}
	}
	return errs
}

// importRule queries a prometheus API to evaluate rules at times in the past.
func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time, grp *rules.Group) (err error) {
	blockDuration := tsdb.DefaultBlockDuration
	startInMs := start.Unix() * int64(time.Second/time.Millisecond)
	endInMs := end.Unix() * int64(time.Second/time.Millisecond)

	for startOfBlock := blockDuration * (startInMs / blockDuration); startOfBlock <= endInMs; startOfBlock = startOfBlock + blockDuration {
		endOfBlock := startOfBlock + blockDuration - 1

		currStart := max(startOfBlock/int64(time.Second/time.Millisecond), start.Unix())
		startWithAlignment := grp.EvalTimestamp(time.Unix(currStart, 0).UTC().UnixNano())
		val, warnings, err := importer.apiClient.QueryRange(ctx,
			ruleExpr,
			v1.Range{
				Start: startWithAlignment,
				End:   time.Unix(min(endOfBlock/int64(time.Second/time.Millisecond), end.Unix()), 0).UTC(),
				Step:  grp.Interval(),
			},
		)
		if err != nil {
			return errors.Wrap(err, "query range")
		}
		if warnings != nil {
			level.Warn(importer.logger).Log("msg", "Range query returned warnings.", "warnings", warnings)
		}

		// To prevent races with compaction, a block writer only allows appending samples
		// that are at most half a block size older than the most recent sample appended so far.
		// However, in the way we use the block writer here, compaction doesn't happen, while we
		// also need to append samples throughout the whole block range. To allow that, we
		// pretend that the block is twice as large here, but only really add sample in the
		// original interval later.
		w, err := tsdb.NewBlockWriter(log.NewNopLogger(), importer.config.outputDir, 2*tsdb.DefaultBlockDuration)
		if err != nil {
			return errors.Wrap(err, "new block writer")
		}
		var closed bool
		defer func() {
			if !closed {
				err = tsdb_errors.NewMulti(err, w.Close()).Err()
			}
		}()
		app := newMultipleAppender(ctx, w)
		var matrix model.Matrix
		switch val.Type() {
		case model.ValMatrix:
			matrix = val.(model.Matrix)

			for _, sample := range matrix {
				currentLabels := make(labels.Labels, 0, len(sample.Metric)+len(ruleLabels)+1)
				currentLabels = append(currentLabels, labels.Label{
					Name:  labels.MetricName,
					Value: ruleName,
				})

				currentLabels = append(currentLabels, ruleLabels...)

				for name, value := range sample.Metric {
					currentLabels = append(currentLabels, labels.Label{
						Name:  string(name),
						Value: string(value),
					})
				}
				for _, value := range sample.Values {
					if err := app.add(ctx, currentLabels, timestamp.FromTime(value.Timestamp.Time()), float64(value.Value)); err != nil {
						return errors.Wrap(err, "add")
					}
				}
			}
		default:
			return errors.New(fmt.Sprintf("rule result is wrong type %s", val.Type().String()))
		}

		if err := app.flushAndCommit(ctx); err != nil {
			return errors.Wrap(err, "flush and commit")
		}
		err = tsdb_errors.NewMulti(err, w.Close()).Err()
		closed = true
	}

	return err
}

func newMultipleAppender(ctx context.Context, blockWriter *tsdb.BlockWriter) *multipleAppender {
	return &multipleAppender{
		maxSamplesInMemory: maxSamplesInMemory,
		writer:             blockWriter,
		appender:           blockWriter.Appender(ctx),
	}
}

// multipleAppender keeps track of how many series have been added to the current appender.
// If the max samples have been added, then all series are commited and a new appender is created.
type multipleAppender struct {
	maxSamplesInMemory int
	currentSampleCount int
	writer             *tsdb.BlockWriter
	appender           storage.Appender
}

func (m *multipleAppender) add(ctx context.Context, l labels.Labels, t int64, v float64) error {
	if _, err := m.appender.Append(0, l, t, v); err != nil {
		return errors.Wrap(err, "multiappender append")
	}
	m.currentSampleCount++
	if m.currentSampleCount >= m.maxSamplesInMemory {
		return m.commit(ctx)
	}
	return nil
}

func (m *multipleAppender) commit(ctx context.Context) error {
	if m.currentSampleCount == 0 {
		return nil
	}
	if err := m.appender.Commit(); err != nil {
		return errors.Wrap(err, "multiappender commit")
	}
	m.appender = m.writer.Appender(ctx)
	m.currentSampleCount = 0
	return nil
}

func (m *multipleAppender) flushAndCommit(ctx context.Context) error {
	if err := m.commit(ctx); err != nil {
		return err
	}
	if _, err := m.writer.Flush(ctx); err != nil {
		return errors.Wrap(err, "multiappender flush")
	}
	return nil
}

func max(x, y int64) int64 {
	if x > y {
		return x
	}
	return y
}

func min(x, y int64) int64 {
	if x < y {
		return x
	}
	return y
}
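The block-alignment arithmetic in importRule is easier to see in isolation. Below is a small, self-contained sketch (not part of the commit) that mirrors the same chunking logic under the assumption of the TSDB default block duration of two hours: it walks the block-aligned windows that overlap the requested [start, end] range and clamps each query window to that range, which is what importRule passes to QueryRange one block at a time. The helper names maxInt64/minInt64 are illustrative stand-ins for the max/min helpers above.

// Standalone sketch of the per-block query windows computed by importRule.
package main

import (
	"fmt"
	"time"
)

// Assumed default block duration in milliseconds (matches tsdb.DefaultBlockDuration).
const blockDurationMs = int64(2 * time.Hour / time.Millisecond)

func main() {
	start := time.Date(2020, 7, 1, 1, 30, 0, 0, time.UTC)
	end := time.Date(2020, 7, 1, 6, 0, 0, 0, time.UTC)

	startInMs := start.Unix() * 1000
	endInMs := end.Unix() * 1000

	// Walk the 2h-aligned blocks that overlap [start, end], clamping each
	// query window to the requested range, as importRule does.
	for startOfBlock := blockDurationMs * (startInMs / blockDurationMs); startOfBlock <= endInMs; startOfBlock += blockDurationMs {
		endOfBlock := startOfBlock + blockDurationMs - 1
		curStart := maxInt64(startOfBlock/1000, start.Unix())
		curEnd := minInt64(endOfBlock/1000, end.Unix())
		fmt.Printf("query %s -> %s\n", time.Unix(curStart, 0).UTC(), time.Unix(curEnd, 0).UTC())
	}
}

func maxInt64(x, y int64) int64 {
	if x > y {
		return x
	}
	return y
}

func minInt64(x, y int64) int64 {
	if x < y {
		return x
	}
	return y
}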

cmd/promtool/rules_test.go (new file, 208 lines)

// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"context"
	"io/ioutil"
	"math"
	"os"
	"path/filepath"
	"testing"
	"time"

	"github.com/go-kit/kit/log"
	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/stretchr/testify/require"
)

type mockQueryRangeAPI struct {
	samples model.Matrix
}

func (mockAPI mockQueryRangeAPI) QueryRange(ctx context.Context, query string, r v1.Range) (model.Value, v1.Warnings, error) {
	return mockAPI.samples, v1.Warnings{}, nil
}

// TestBackfillRuleIntegration is an integration test that runs all the rule importer code to confirm the parts work together.
func TestBackfillRuleIntegration(t *testing.T) {
	const (
		testMaxSampleCount = 50
		testValue          = 123
		testValue2         = 98
	)
	var (
		start     = time.Date(2009, time.November, 10, 6, 34, 0, 0, time.UTC)
		testTime  = model.Time(start.Add(-9 * time.Hour).Unix())
		testTime2 = model.Time(start.Add(-8 * time.Hour).Unix())
	)

	var testCases = []struct {
		name                string
		runcount            int
		expectedBlockCount  int
		expectedSeriesCount int
		expectedSampleCount int
		samples             []*model.SampleStream
	}{
		{"no samples", 1, 0, 0, 0, []*model.SampleStream{}},
		{"run importer once", 1, 8, 4, 4, []*model.SampleStream{{Metric: model.Metric{"name1": "val1"}, Values: []model.SamplePair{{Timestamp: testTime, Value: testValue}}}}},
		{"one importer twice", 2, 8, 4, 8, []*model.SampleStream{{Metric: model.Metric{"name1": "val1"}, Values: []model.SamplePair{{Timestamp: testTime, Value: testValue}, {Timestamp: testTime2, Value: testValue2}}}}},
	}
	for _, tt := range testCases {
		t.Run(tt.name, func(t *testing.T) {
			tmpDir, err := ioutil.TempDir("", "backfilldata")
			require.NoError(t, err)
			defer func() {
				require.NoError(t, os.RemoveAll(tmpDir))
			}()
			ctx := context.Background()

			// Execute the test more than once to simulate running the rule importer twice with the same data.
			// We expect duplicate blocks with the same series are created when run more than once.
			for i := 0; i < tt.runcount; i++ {
				ruleImporter, err := newTestRuleImporter(ctx, start, tmpDir, tt.samples)
				require.NoError(t, err)
				path1 := filepath.Join(tmpDir, "test.file")
				require.NoError(t, createSingleRuleTestFiles(path1))
				path2 := filepath.Join(tmpDir, "test2.file")
				require.NoError(t, createMultiRuleTestFiles(path2))

				// Confirm that the rule files were loaded in correctly.
				errs := ruleImporter.loadGroups(ctx, []string{path1, path2})
				for _, err := range errs {
					require.NoError(t, err)
				}
				require.Equal(t, 3, len(ruleImporter.groups))
				group1 := ruleImporter.groups[path1+";group0"]
				require.NotNil(t, group1)
				const defaultInterval = 60
				require.Equal(t, time.Duration(defaultInterval*time.Second), group1.Interval())
				gRules := group1.Rules()
				require.Equal(t, 1, len(gRules))
				require.Equal(t, "rule1", gRules[0].Name())
				require.Equal(t, "ruleExpr", gRules[0].Query().String())
				require.Equal(t, 1, len(gRules[0].Labels()))

				group2 := ruleImporter.groups[path2+";group2"]
				require.NotNil(t, group2)
				require.Equal(t, time.Duration(defaultInterval*time.Second), group2.Interval())
				g2Rules := group2.Rules()
				require.Equal(t, 2, len(g2Rules))
				require.Equal(t, "grp2_rule1", g2Rules[0].Name())
				require.Equal(t, "grp2_rule1_expr", g2Rules[0].Query().String())
				require.Equal(t, 0, len(g2Rules[0].Labels()))

				// Backfill all recording rules then check the blocks to confirm the correct data was created.
				errs = ruleImporter.importAll(ctx)
				for _, err := range errs {
					require.NoError(t, err)
				}

				opts := tsdb.DefaultOptions()
				opts.AllowOverlappingBlocks = true
				db, err := tsdb.Open(tmpDir, nil, nil, opts)
				require.NoError(t, err)

				blocks := db.Blocks()
				require.Equal(t, (i+1)*tt.expectedBlockCount, len(blocks))

				q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
				require.NoError(t, err)

				selectedSeries := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*"))
				var seriesCount, samplesCount int
				for selectedSeries.Next() {
					seriesCount++
					series := selectedSeries.At()
					if len(series.Labels()) != 3 {
						require.Equal(t, 2, len(series.Labels()))
						x := labels.Labels{
							labels.Label{Name: "__name__", Value: "grp2_rule1"},
							labels.Label{Name: "name1", Value: "val1"},
						}
						require.Equal(t, x, series.Labels())
					} else {
						require.Equal(t, 3, len(series.Labels()))
					}
					it := series.Iterator()
					for it.Next() {
						samplesCount++
						ts, v := it.At()
						if v == testValue {
							require.Equal(t, int64(testTime), ts)
						} else {
							require.Equal(t, int64(testTime2), ts)
						}
					}
					require.NoError(t, it.Err())
				}
				require.NoError(t, selectedSeries.Err())
				require.Equal(t, tt.expectedSeriesCount, seriesCount)
				require.Equal(t, tt.expectedSampleCount, samplesCount)
				require.NoError(t, q.Close())
				require.NoError(t, db.Close())
			}
		})
	}
}

func newTestRuleImporter(ctx context.Context, start time.Time, tmpDir string, testSamples model.Matrix) (*ruleImporter, error) {
	logger := log.NewNopLogger()
	cfg := ruleImporterConfig{
		outputDir:    tmpDir,
		start:        start.Add(-10 * time.Hour),
		end:          start.Add(-7 * time.Hour),
		evalInterval: 60 * time.Second,
	}

	return newRuleImporter(logger, cfg, mockQueryRangeAPI{
		samples: testSamples,
	}), nil
}

func createSingleRuleTestFiles(path string) error {
	recordingRules := `groups:
- name: group0
  rules:
  - record: rule1
    expr: ruleExpr
    labels:
      testlabel11: testlabelvalue11
`
	return ioutil.WriteFile(path, []byte(recordingRules), 0777)
}

func createMultiRuleTestFiles(path string) error {
	recordingRules := `groups:
- name: group1
  rules:
  - record: grp1_rule1
    expr: grp1_rule1_expr
    labels:
      testlabel11: testlabelvalue11
- name: group2
  rules:
  - record: grp2_rule1
    expr: grp2_rule1_expr
  - record: grp2_rule2
    expr: grp2_rule2_expr
    labels:
      testlabel11: testlabelvalue11
`
	return ioutil.WriteFile(path, []byte(recordingRules), 0777)
}
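The integration test above can be run on its own with go test -run TestBackfillRuleIntegration ./cmd/promtool/, which exercises the rule loading, the mocked range queries, and the resulting TSDB blocks end to end.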

rules/manager.go
@@ -216,6 +216,8 @@ type Rule interface {
 	Eval(context.Context, time.Time, QueryFunc, *url.URL) (promql.Vector, error)
 	// String returns a human-readable string representation of the rule.
 	String() string
+	// Query returns the rule query expression.
+	Query() parser.Expr
 	// SetLastErr sets the current error experienced by the rule.
 	SetLastError(error)
 	// LastErr returns the last error experienced by the rule.
@@ -278,7 +280,7 @@ func NewGroup(o GroupOptions) *Group {
 		metrics = NewGroupMetrics(o.Opts.Registerer)
 	}
 
-	key := groupKey(o.File, o.Name)
+	key := GroupKey(o.File, o.Name)
 	metrics.iterationsMissed.WithLabelValues(key)
 	metrics.iterationsScheduled.WithLabelValues(key)
 	metrics.evalTotal.WithLabelValues(key)
@@ -321,7 +323,7 @@ func (g *Group) run(ctx context.Context) {
 	defer close(g.terminated)
 
 	// Wait an initial amount to have consistently slotted intervals.
-	evalTimestamp := g.evalTimestamp().Add(g.interval)
+	evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.interval)
 	select {
 	case <-time.After(time.Until(evalTimestamp)):
 	case <-g.done:
@@ -336,7 +338,7 @@ func (g *Group) run(ctx context.Context) {
 	})
 
 	iter := func() {
-		g.metrics.iterationsScheduled.WithLabelValues(groupKey(g.file, g.name)).Inc()
+		g.metrics.iterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc()
 
 		start := time.Now()
 		g.Eval(ctx, evalTimestamp)
@@ -388,8 +390,8 @@ func (g *Group) run(ctx context.Context) {
 			case <-tick.C:
 				missed := (time.Since(evalTimestamp) / g.interval) - 1
 				if missed > 0 {
-					g.metrics.iterationsMissed.WithLabelValues(groupKey(g.file, g.name)).Add(float64(missed))
-					g.metrics.iterationsScheduled.WithLabelValues(groupKey(g.file, g.name)).Add(float64(missed))
+					g.metrics.iterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
+					g.metrics.iterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
 				}
 				evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
 				iter()
@@ -410,8 +412,8 @@ func (g *Group) run(ctx context.Context) {
 			case <-tick.C:
 				missed := (time.Since(evalTimestamp) / g.interval) - 1
 				if missed > 0 {
-					g.metrics.iterationsMissed.WithLabelValues(groupKey(g.file, g.name)).Add(float64(missed))
-					g.metrics.iterationsScheduled.WithLabelValues(groupKey(g.file, g.name)).Add(float64(missed))
+					g.metrics.iterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
+					g.metrics.iterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
 				}
 				evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
 				iter()
@@ -474,7 +476,7 @@ func (g *Group) GetEvaluationTime() time.Duration {
 
 // setEvaluationTime sets the time in seconds the last evaluation took.
 func (g *Group) setEvaluationTime(dur time.Duration) {
-	g.metrics.groupLastDuration.WithLabelValues(groupKey(g.file, g.name)).Set(dur.Seconds())
+	g.metrics.groupLastDuration.WithLabelValues(GroupKey(g.file, g.name)).Set(dur.Seconds())
 
 	g.mtx.Lock()
 	defer g.mtx.Unlock()
@@ -488,21 +490,20 @@ func (g *Group) GetLastEvaluation() time.Time {
 	return g.lastEvaluation
 }
 
-// setLastEvaluation updates lastEvaluation to the timestamp of when the rule group was last evaluated.
+// setLastEvaluation updates evaluationTimestamp to the timestamp of when the rule group was last evaluated.
 func (g *Group) setLastEvaluation(ts time.Time) {
-	g.metrics.groupLastEvalTime.WithLabelValues(groupKey(g.file, g.name)).Set(float64(ts.UnixNano()) / 1e9)
+	g.metrics.groupLastEvalTime.WithLabelValues(GroupKey(g.file, g.name)).Set(float64(ts.UnixNano()) / 1e9)
 
 	g.mtx.Lock()
 	defer g.mtx.Unlock()
 	g.lastEvaluation = ts
 }
 
-// evalTimestamp returns the immediately preceding consistently slotted evaluation time.
-func (g *Group) evalTimestamp() time.Time {
+// EvalTimestamp returns the immediately preceding consistently slotted evaluation time.
+func (g *Group) EvalTimestamp(startTime int64) time.Time {
 	var (
 		offset = int64(g.hash() % uint64(g.interval))
-		now    = time.Now().UnixNano()
-		adjNow = now - offset
+		adjNow = startTime - offset
 		base   = adjNow - (adjNow % int64(g.interval))
 	)
 
@@ -588,7 +589,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 				rule.SetEvaluationTimestamp(t)
 			}(time.Now())
 
-			g.metrics.evalTotal.WithLabelValues(groupKey(g.File(), g.Name())).Inc()
+			g.metrics.evalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
 
 			vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
 			if err != nil {
@@ -600,9 +601,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 				if _, ok := err.(promql.ErrQueryCanceled); !ok {
 					level.Warn(g.logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
 				}
-				sp.SetTag("error", true)
-				sp.LogKV("error", err)
-				g.metrics.evalFailures.WithLabelValues(groupKey(g.File(), g.Name())).Inc()
+				g.metrics.evalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
 				return
 			}
 			samplesTotal += float64(len(vector))
@@ -671,7 +670,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 		}(i, rule)
 	}
 	if g.metrics != nil {
-		g.metrics.groupSamples.WithLabelValues(groupKey(g.File(), g.Name())).Set(samplesTotal)
+		g.metrics.groupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal)
 	}
 	g.cleanupStaleSeries(ctx, ts)
 }
@@ -965,7 +964,7 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels
 		// check if new group equals with the old group, if yes then skip it.
 		// If not equals, stop it and wait for it to finish the current iteration.
 		// Then copy it into the new group.
-		gn := groupKey(newg.file, newg.name)
+		gn := GroupKey(newg.file, newg.name)
 		oldg, ok := m.groups[gn]
 		delete(m.groups, gn)
 
@@ -1079,7 +1078,7 @@ func (m *Manager) LoadGroups(
 			))
 		}
 
-		groups[groupKey(fn, rg.Name)] = NewGroup(GroupOptions{
+		groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
 			Name:     rg.Name,
 			File:     fn,
 			Interval: itv,
@@ -1094,8 +1093,8 @@ func (m *Manager) LoadGroups(
 	return groups, nil
 }
 
-// Group names need not be unique across filenames.
-func groupKey(file, name string) string {
+// GroupKey group names need not be unique across filenames.
+func GroupKey(file, name string) string {
 	return file + ";" + name
 }
 
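The change from the unexported evalTimestamp() to the exported EvalTimestamp(startTime int64) is what lets the rule importer compute aligned evaluation instants for arbitrary points in the past rather than only for time.Now(). Below is a self-contained sketch (not part of the commit) of that slotting arithmetic; the closing return of EvalTimestamp lies outside the hunk shown above and is assumed here to follow the base+offset pattern visible in the diff.

// Standalone sketch of the "consistently slotted" evaluation-time arithmetic.
package main

import (
	"fmt"
	"time"
)

// slottedTimestamp returns the last slot boundary at or before startTime
// (nanoseconds), shifted by a per-group offset derived from groupHash, so
// different groups evaluate at stable but staggered instants.
func slottedTimestamp(startTime int64, interval time.Duration, groupHash uint64) time.Time {
	offset := int64(groupHash % uint64(interval))
	adjNow := startTime - offset
	base := adjNow - (adjNow % int64(interval))
	return time.Unix(0, base+offset).UTC()
}

func main() {
	interval := 60 * time.Second
	now := time.Date(2020, 7, 1, 12, 0, 37, 0, time.UTC).UnixNano()

	// Two hypothetical groups with different hashes land on different, stable slots.
	fmt.Println(slottedTimestamp(now, interval, 11))
	fmt.Println(slottedTimestamp(now, interval, 42))
}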

tsdb/blockwriter.go
@@ -90,16 +90,11 @@ func (w *BlockWriter) Appender(ctx context.Context) storage.Appender {
 // Flush implements the Writer interface. This is where actual block writing
 // happens. After flush completes, no writes can be done.
 func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
-	seriesCount := w.head.NumSeries()
-	if w.head.NumSeries() == 0 {
-		return ulid.ULID{}, ErrNoSeriesAppended
-	}
-
 	mint := w.head.MinTime()
 	// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
 	// Because of this block intervals are always +1 than the total samples it includes.
 	maxt := w.head.MaxTime() + 1
-	level.Info(w.logger).Log("msg", "flushing", "series_count", seriesCount, "mint", timestamp.Time(mint), "maxt", timestamp.Time(maxt))
+	level.Info(w.logger).Log("msg", "flushing", "series_count", w.head.NumSeries(), "mint", timestamp.Time(mint), "maxt", timestamp.Time(maxt))
 
 	compactor, err := NewLeveledCompactor(ctx,
 		nil,

tsdb/blockwriter_test.go
@@ -36,10 +36,6 @@ func TestBlockWriter(t *testing.T) {
 	w, err := NewBlockWriter(log.NewNopLogger(), outputDir, DefaultBlockDuration)
 	require.NoError(t, err)
 
-	// Flush with no series results in error.
-	_, err = w.Flush(ctx)
-	require.EqualError(t, err, "no series appended, aborting")
-
 	// Add some series.
 	app := w.Appender(ctx)
 	ts1, v1 := int64(44), float64(7)