diff --git a/.gitignore b/.gitignore
index c31d513e23..de176eaabd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,6 +11,7 @@ benchmark.txt
 /cmd/prometheus/data
 /cmd/prometheus/debug
 /benchout
+/cmd/promtool/data
 
 !/.travis.yml
 !/.promu.yml
diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go
index 83f214560f..15fd4426f9 100644
--- a/cmd/promtool/main.go
+++ b/cmd/promtool/main.go
@@ -28,6 +28,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/go-kit/kit/log"
 	"github.com/google/pprof/profile"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/api"
@@ -147,6 +148,18 @@ func main() {
 	// TODO(aSquare14): add flag to set default block duration
 	importFilePath := openMetricsImportCmd.Arg("input file", "OpenMetrics file to read samples from.").Required().String()
 	importDBPath := openMetricsImportCmd.Arg("output directory", "Output directory for generated blocks.").Default(defaultDBPath).String()
+	importRulesCmd := importCmd.Command("rules", "Create new blocks of data for new recording rules by backfilling them from existing Prometheus data.")
+	importRulesURL := importRulesCmd.Flag("url", "The URL of the Prometheus API holding the data the rules will be backfilled from.").Default("http://localhost:9090").URL()
+	importRulesStart := importRulesCmd.Flag("start", "The time to start backfilling the new rule from. Must be an RFC3339 formatted date or Unix timestamp. Required.").
+		Required().String()
+	importRulesEnd := importRulesCmd.Flag("end", "If an end time is provided, all recording rules in the given rule files will be backfilled up to that time. Defaults to backfilling up to 3 hours ago. Must be an RFC3339 formatted date or Unix timestamp.").String()
+	importRulesOutputDir := importRulesCmd.Flag("output-dir", "Output directory for generated blocks.").Default("data/").String()
+	importRulesEvalInterval := importRulesCmd.Flag("eval-interval", "How frequently to evaluate rules when backfilling if a value is not set in the recording rule files.").
+		Default("60s").Duration()
+	importRulesFiles := importRulesCmd.Arg(
+		"rule-files",
+		"A list of one or more files containing recording rules to be backfilled. All recording rules listed in the files will be backfilled. Alerting rules are not evaluated.",
+	).Required().ExistingFiles()
 
 	parsedCmd := kingpin.MustParse(app.Parse(os.Args[1:]))
 
@@ -209,6 +222,9 @@ func main() {
 	//TODO(aSquare14): Work on adding support for custom block size.
 	case openMetricsImportCmd.FullCommand():
 		os.Exit(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable))
+
+	case importRulesCmd.FullCommand():
+		os.Exit(checkErr(importRules(*importRulesURL, *importRulesStart, *importRulesEnd, *importRulesOutputDir, *importRulesEvalInterval, *importRulesFiles...)))
 	}
 }
 
@@ -819,3 +835,63 @@ func (j *jsonPrinter) printLabelValues(v model.LabelValues) {
 	//nolint:errcheck
 	json.NewEncoder(os.Stdout).Encode(v)
 }
+
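+// An example invocation of the new subcommand might look as follows (a sketch
+// only: the full command path depends on where importCmd is registered in the
+// kingpin app; the flags are the ones defined above):
+//
+//	promtool import rules --url=http://localhost:9090 \
+//	  --start=2020-11-10T00:00:00Z --end=2020-11-11T00:00:00Z \
+//	  --output-dir=data/ rules1.yml rules2.yml
+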
+// importRules backfills recording rules from the files provided. The output is
+// blocks of data at the outputDir location.
+func importRules(url *url.URL, start, end, outputDir string, evalInterval time.Duration, files ...string) error {
+	ctx := context.Background()
+	var stime, etime time.Time
+	var err error
+	if end == "" {
+		etime = time.Now().UTC().Add(-3 * time.Hour)
+	} else {
+		etime, err = parseTime(end)
+		if err != nil {
+			fmt.Fprintln(os.Stderr, "error parsing end time:", err)
+			return err
+		}
+	}
+
+	stime, err = parseTime(start)
+	if err != nil {
+		fmt.Fprintln(os.Stderr, "error parsing start time:", err)
+		return err
+	}
+
+	if !stime.Before(etime) {
+		return errors.New("start time is not before end time")
+	}
+
+	cfg := ruleImporterConfig{
+		outputDir:    outputDir,
+		start:        stime,
+		end:          etime,
+		evalInterval: evalInterval,
+	}
+	client, err := api.NewClient(api.Config{
+		Address: url.String(),
+	})
+	if err != nil {
+		fmt.Fprintln(os.Stderr, "new API client error", err)
+		return err
+	}
+
+	ruleImporter := newRuleImporter(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), cfg, v1.NewAPI(client))
+	errs := ruleImporter.loadGroups(ctx, files)
+	for _, err := range errs {
+		if err != nil {
+			fmt.Fprintln(os.Stderr, "rule importer parse error", err)
+			return err
+		}
+	}
+
+	errs = ruleImporter.importAll(ctx)
+	for _, err := range errs {
+		if err != nil {
+			fmt.Fprintln(os.Stderr, "rule importer error", err)
+		}
+	}
+	if len(errs) > 0 {
+		return errors.New("error importing rules")
+	}
+
+	return nil
+}
diff --git a/cmd/promtool/rules.go b/cmd/promtool/rules.go
new file mode 100644
index 0000000000..e73b48241c
--- /dev/null
+++ b/cmd/promtool/rules.go
@@ -0,0 +1,240 @@
+// Copyright 2020 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
+	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/timestamp"
+	"github.com/prometheus/prometheus/rules"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb"
+	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
+)
+
+const maxSamplesInMemory = 5000
+
+type queryRangeAPI interface {
+	QueryRange(ctx context.Context, query string, r v1.Range) (model.Value, v1.Warnings, error)
+}
+
+type ruleImporter struct {
+	logger log.Logger
+	config ruleImporterConfig
+
+	apiClient queryRangeAPI
+
+	groups      map[string]*rules.Group
+	ruleManager *rules.Manager
+}
+
+type ruleImporterConfig struct {
+	outputDir    string
+	start        time.Time
+	end          time.Time
+	evalInterval time.Duration
+}
+
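+// The flow implemented below: loadGroups parses the rule files through the
+// rules.Manager, importAll walks every group and rule, and importRule issues
+// range queries against the Prometheus API and writes the results to disk one
+// block at a time.
+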
+// newRuleImporter creates a new rule importer that can be used to parse and
+// evaluate recording rule files and create new series, written to disk in blocks.
+func newRuleImporter(logger log.Logger, config ruleImporterConfig, apiClient queryRangeAPI) *ruleImporter {
+	level.Info(logger).Log("backfiller", "new rule importer", "start", config.start.Format(time.RFC822), "end", config.end.Format(time.RFC822))
+	return &ruleImporter{
+		logger:      logger,
+		config:      config,
+		apiClient:   apiClient,
+		ruleManager: rules.NewManager(&rules.ManagerOptions{}),
+	}
+}
+
+// loadGroups parses groups from a list of recording rule files.
+func (importer *ruleImporter) loadGroups(ctx context.Context, filenames []string) (errs []error) {
+	groups, errs := importer.ruleManager.LoadGroups(importer.config.evalInterval, labels.Labels{}, filenames...)
+	if errs != nil {
+		return errs
+	}
+	importer.groups = groups
+	return nil
+}
+
+// importAll evaluates all the recording rules, creates new time series, and
+// writes them to disk in blocks.
+func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) {
+	for name, group := range importer.groups {
+		level.Info(importer.logger).Log("backfiller", "processing group", "name", name)
+
+		stimeWithAlignment := group.EvalTimestamp(importer.config.start.UnixNano())
+		for i, r := range group.Rules() {
+			level.Info(importer.logger).Log("backfiller", "processing rule", "id", i, "name", r.Name())
+			if err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), stimeWithAlignment, importer.config.end, group); err != nil {
+				errs = append(errs, err)
+			}
+		}
+	}
+	return errs
+}
+
+// importRule queries a Prometheus API to evaluate rules at times in the past.
+func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time, grp *rules.Group) (err error) {
+	blockDuration := tsdb.DefaultBlockDuration
+	startInMs := start.Unix() * int64(time.Second/time.Millisecond)
+	endInMs := end.Unix() * int64(time.Second/time.Millisecond)
+
+	for startOfBlock := blockDuration * (startInMs / blockDuration); startOfBlock <= endInMs; startOfBlock += blockDuration {
+		endOfBlock := startOfBlock + blockDuration - 1
+
+		currStart := max(startOfBlock/int64(time.Second/time.Millisecond), start.Unix())
+		startWithAlignment := grp.EvalTimestamp(time.Unix(currStart, 0).UTC().UnixNano())
+		val, warnings, err := importer.apiClient.QueryRange(ctx,
+			ruleExpr,
+			v1.Range{
+				Start: startWithAlignment,
+				End:   time.Unix(min(endOfBlock/int64(time.Second/time.Millisecond), end.Unix()), 0).UTC(),
+				Step:  grp.Interval(),
+			},
+		)
+		if err != nil {
+			return errors.Wrap(err, "query range")
+		}
+		if warnings != nil {
+			level.Warn(importer.logger).Log("msg", "Range query returned warnings.", "warnings", warnings)
+		}
+
+		// To prevent races with compaction, a block writer only allows appending samples
+		// that are at most half a block size older than the most recent sample appended so far.
+		// However, in the way we use the block writer here, compaction doesn't happen, while we
+		// also need to append samples throughout the whole block range. To allow that, we
+		// pretend that the block is twice as large here, but only really add samples in the
+		// original interval later.
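+		// With the default 2h block duration this means a 4h writer: backfilled
+		// samples may then be up to 2h (half of 4h) older than the newest appended
+		// sample, which is enough to cover the whole original block range.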
+		w, err := tsdb.NewBlockWriter(log.NewNopLogger(), importer.config.outputDir, 2*tsdb.DefaultBlockDuration)
+		if err != nil {
+			return errors.Wrap(err, "new block writer")
+		}
+		var closed bool
+		defer func() {
+			if !closed {
+				err = tsdb_errors.NewMulti(err, w.Close()).Err()
+			}
+		}()
+		app := newMultipleAppender(ctx, w)
+		var matrix model.Matrix
+		switch val.Type() {
+		case model.ValMatrix:
+			matrix = val.(model.Matrix)
+
+			for _, sample := range matrix {
+				currentLabels := make(labels.Labels, 0, len(sample.Metric)+len(ruleLabels)+1)
+				currentLabels = append(currentLabels, labels.Label{
+					Name:  labels.MetricName,
+					Value: ruleName,
+				})
+
+				currentLabels = append(currentLabels, ruleLabels...)
+
+				for name, value := range sample.Metric {
+					currentLabels = append(currentLabels, labels.Label{
+						Name:  string(name),
+						Value: string(value),
+					})
+				}
+				// The TSDB expects label sets to be sorted by name before appending.
+				sort.Sort(currentLabels)
+				for _, value := range sample.Values {
+					if err := app.add(ctx, currentLabels, timestamp.FromTime(value.Timestamp.Time()), float64(value.Value)); err != nil {
+						return errors.Wrap(err, "add")
+					}
+				}
+			}
+		default:
+			return errors.Errorf("rule result is wrong type %s", val.Type().String())
+		}
+
+		if err := app.flushAndCommit(ctx); err != nil {
+			return errors.Wrap(err, "flush and commit")
+		}
+		err = tsdb_errors.NewMulti(err, w.Close()).Err()
+		closed = true
+	}
+
+	return err
+}
+
+func newMultipleAppender(ctx context.Context, blockWriter *tsdb.BlockWriter) *multipleAppender {
+	return &multipleAppender{
+		maxSamplesInMemory: maxSamplesInMemory,
+		writer:             blockWriter,
+		appender:           blockWriter.Appender(ctx),
+	}
+}
+
+// multipleAppender keeps track of how many samples have been added to the
+// current appender. Once the maximum number of samples has been added, all
+// samples are committed and a new appender is created.
+type multipleAppender struct {
+	maxSamplesInMemory int
+	currentSampleCount int
+	writer             *tsdb.BlockWriter
+	appender           storage.Appender
+}
+
+func (m *multipleAppender) add(ctx context.Context, l labels.Labels, t int64, v float64) error {
+	if _, err := m.appender.Append(0, l, t, v); err != nil {
+		return errors.Wrap(err, "multiappender append")
+	}
+	m.currentSampleCount++
+	if m.currentSampleCount >= m.maxSamplesInMemory {
+		return m.commit(ctx)
+	}
+	return nil
+}
+
+func (m *multipleAppender) commit(ctx context.Context) error {
+	if m.currentSampleCount == 0 {
+		return nil
+	}
+	if err := m.appender.Commit(); err != nil {
+		return errors.Wrap(err, "multiappender commit")
+	}
+	m.appender = m.writer.Appender(ctx)
+	m.currentSampleCount = 0
+	return nil
+}
+
+func (m *multipleAppender) flushAndCommit(ctx context.Context) error {
+	if err := m.commit(ctx); err != nil {
+		return err
+	}
+	if _, err := m.writer.Flush(ctx); err != nil {
+		return errors.Wrap(err, "multiappender flush")
+	}
+	return nil
+}
+
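+// Usage note: callers append samples via add, which commits automatically in
+// batches of maxSamplesInMemory, and must finish with flushAndCommit so the
+// final partial batch is committed before the block is flushed to disk.
+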
+func max(x, y int64) int64 {
+	if x > y {
+		return x
+	}
+	return y
+}
+
+func min(x, y int64) int64 {
+	if x < y {
+		return x
+	}
+	return y
+}
diff --git a/cmd/promtool/rules_test.go b/cmd/promtool/rules_test.go
new file mode 100644
index 0000000000..3459e89426
--- /dev/null
+++ b/cmd/promtool/rules_test.go
@@ -0,0 +1,208 @@
+// Copyright 2020 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"context"
+	"io/ioutil"
+	"math"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/tsdb"
+	"github.com/stretchr/testify/require"
+)
+
+type mockQueryRangeAPI struct {
+	samples model.Matrix
+}
+
+func (mockAPI mockQueryRangeAPI) QueryRange(ctx context.Context, query string, r v1.Range) (model.Value, v1.Warnings, error) {
+	return mockAPI.samples, v1.Warnings{}, nil
+}
+
+// TestBackfillRuleIntegration is an integration test that runs all the rule
+// importer code to confirm the parts work together.
+func TestBackfillRuleIntegration(t *testing.T) {
+	const (
+		testMaxSampleCount = 50
+		testValue          = 123
+		testValue2         = 98
+	)
+	var (
+		start     = time.Date(2009, time.November, 10, 6, 34, 0, 0, time.UTC)
+		testTime  = model.Time(start.Add(-9 * time.Hour).Unix())
+		testTime2 = model.Time(start.Add(-8 * time.Hour).Unix())
+	)
+
+	var testCases = []struct {
+		name                string
+		runcount            int
+		expectedBlockCount  int
+		expectedSeriesCount int
+		expectedSampleCount int
+		samples             []*model.SampleStream
+	}{
+		{"no samples", 1, 0, 0, 0, []*model.SampleStream{}},
+		{"run importer once", 1, 8, 4, 4, []*model.SampleStream{{Metric: model.Metric{"name1": "val1"}, Values: []model.SamplePair{{Timestamp: testTime, Value: testValue}}}}},
+		{"run importer twice", 2, 8, 4, 8, []*model.SampleStream{{Metric: model.Metric{"name1": "val1"}, Values: []model.SamplePair{{Timestamp: testTime, Value: testValue}, {Timestamp: testTime2, Value: testValue2}}}}},
+	}
+	for _, tt := range testCases {
+		t.Run(tt.name, func(t *testing.T) {
+			tmpDir, err := ioutil.TempDir("", "backfilldata")
+			require.NoError(t, err)
+			defer func() {
+				require.NoError(t, os.RemoveAll(tmpDir))
+			}()
+			ctx := context.Background()
+
+			// Execute the test more than once to simulate running the rule importer
+			// twice with the same data. We expect duplicate blocks with the same
+			// series to be created when run more than once.
+			for i := 0; i < tt.runcount; i++ {
+				ruleImporter, err := newTestRuleImporter(ctx, start, tmpDir, tt.samples)
+				require.NoError(t, err)
+				path1 := filepath.Join(tmpDir, "test.file")
+				require.NoError(t, createSingleRuleTestFiles(path1))
+				path2 := filepath.Join(tmpDir, "test2.file")
+				require.NoError(t, createMultiRuleTestFiles(path2))
+
+				// Confirm that the rule files were loaded correctly.
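+				// Group keys take the form "<file>;<group name>", hence the
+				// path1+";group0" lookup below (see rules.GroupKey).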
+				errs := ruleImporter.loadGroups(ctx, []string{path1, path2})
+				for _, err := range errs {
+					require.NoError(t, err)
+				}
+				require.Equal(t, 3, len(ruleImporter.groups))
+				group1 := ruleImporter.groups[path1+";group0"]
+				require.NotNil(t, group1)
+				const defaultInterval = 60
+				require.Equal(t, time.Duration(defaultInterval*time.Second), group1.Interval())
+				gRules := group1.Rules()
+				require.Equal(t, 1, len(gRules))
+				require.Equal(t, "rule1", gRules[0].Name())
+				require.Equal(t, "ruleExpr", gRules[0].Query().String())
+				require.Equal(t, 1, len(gRules[0].Labels()))
+
+				group2 := ruleImporter.groups[path2+";group2"]
+				require.NotNil(t, group2)
+				require.Equal(t, time.Duration(defaultInterval*time.Second), group2.Interval())
+				g2Rules := group2.Rules()
+				require.Equal(t, 2, len(g2Rules))
+				require.Equal(t, "grp2_rule1", g2Rules[0].Name())
+				require.Equal(t, "grp2_rule1_expr", g2Rules[0].Query().String())
+				require.Equal(t, 0, len(g2Rules[0].Labels()))
+
+				// Backfill all recording rules, then check the blocks to confirm
+				// the correct data was created.
+				errs = ruleImporter.importAll(ctx)
+				for _, err := range errs {
+					require.NoError(t, err)
+				}
+
+				opts := tsdb.DefaultOptions()
+				opts.AllowOverlappingBlocks = true
+				db, err := tsdb.Open(tmpDir, nil, nil, opts)
+				require.NoError(t, err)
+
+				blocks := db.Blocks()
+				require.Equal(t, (i+1)*tt.expectedBlockCount, len(blocks))
+
+				q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
+				require.NoError(t, err)
+
+				selectedSeries := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*"))
+				var seriesCount, samplesCount int
+				for selectedSeries.Next() {
+					seriesCount++
+					series := selectedSeries.At()
+					if len(series.Labels()) != 3 {
+						require.Equal(t, 2, len(series.Labels()))
+						x := labels.Labels{
+							labels.Label{Name: "__name__", Value: "grp2_rule1"},
+							labels.Label{Name: "name1", Value: "val1"},
+						}
+						require.Equal(t, x, series.Labels())
+					} else {
+						require.Equal(t, 3, len(series.Labels()))
+					}
+					it := series.Iterator()
+					for it.Next() {
+						samplesCount++
+						ts, v := it.At()
+						if v == testValue {
+							require.Equal(t, int64(testTime), ts)
+						} else {
+							require.Equal(t, int64(testTime2), ts)
+						}
+					}
+					require.NoError(t, it.Err())
+				}
+				require.NoError(t, selectedSeries.Err())
+				require.Equal(t, tt.expectedSeriesCount, seriesCount)
+				require.Equal(t, tt.expectedSampleCount, samplesCount)
+				require.NoError(t, q.Close())
+				require.NoError(t, db.Close())
+			}
+		})
+	}
+}
+
+func newTestRuleImporter(ctx context.Context, start time.Time, tmpDir string, testSamples model.Matrix) (*ruleImporter, error) {
+	logger := log.NewNopLogger()
+	cfg := ruleImporterConfig{
+		outputDir:    tmpDir,
+		start:        start.Add(-10 * time.Hour),
+		end:          start.Add(-7 * time.Hour),
+		evalInterval: 60 * time.Second,
+	}
+
+	return newRuleImporter(logger, cfg, mockQueryRangeAPI{
+		samples: testSamples,
+	}), nil
+}
+
+func createSingleRuleTestFiles(path string) error {
+	recordingRules := `groups:
+- name: group0
+  rules:
+  - record: rule1
+    expr: ruleExpr
+    labels:
+      testlabel11: testlabelvalue11
+`
+	return ioutil.WriteFile(path, []byte(recordingRules), 0777)
+}
+
+func createMultiRuleTestFiles(path string) error {
+	recordingRules := `groups:
+- name: group1
+  rules:
+  - record: grp1_rule1
+    expr: grp1_rule1_expr
+    labels:
+      testlabel11: testlabelvalue11
+- name: group2
+  rules:
+  - record: grp2_rule1
+    expr: grp2_rule1_expr
+  - record: grp2_rule2
+    expr: grp2_rule2_expr
+    labels:
+      testlabel11: testlabelvalue11
+`
+	return ioutil.WriteFile(path, []byte(recordingRules), 0777)
+}
diff --git a/rules/manager.go b/rules/manager.go
index 04ce2f3098..3f4ce8d8bd 100644
--- a/rules/manager.go
+++ b/rules/manager.go
@@ -216,6 +216,8 @@ type Rule interface {
 	Eval(context.Context, time.Time, QueryFunc, *url.URL) (promql.Vector, error)
 	// String returns a human-readable string representation of the rule.
 	String() string
+	// Query returns the rule query expression.
+	Query() parser.Expr
 	// SetLastErr sets the current error experienced by the rule.
 	SetLastError(error)
 	// LastErr returns the last error experienced by the rule.
@@ -278,7 +280,7 @@ func NewGroup(o GroupOptions) *Group {
 		metrics = NewGroupMetrics(o.Opts.Registerer)
 	}
 
-	key := groupKey(o.File, o.Name)
+	key := GroupKey(o.File, o.Name)
 	metrics.iterationsMissed.WithLabelValues(key)
 	metrics.iterationsScheduled.WithLabelValues(key)
 	metrics.evalTotal.WithLabelValues(key)
@@ -321,7 +323,7 @@ func (g *Group) run(ctx context.Context) {
 	defer close(g.terminated)
 
 	// Wait an initial amount to have consistently slotted intervals.
-	evalTimestamp := g.evalTimestamp().Add(g.interval)
+	evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.interval)
 	select {
 	case <-time.After(time.Until(evalTimestamp)):
 	case <-g.done:
@@ -336,7 +338,7 @@ func (g *Group) run(ctx context.Context) {
 	})
 
 	iter := func() {
-		g.metrics.iterationsScheduled.WithLabelValues(groupKey(g.file, g.name)).Inc()
+		g.metrics.iterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc()
 
 		start := time.Now()
 		g.Eval(ctx, evalTimestamp)
@@ -388,8 +390,8 @@ func (g *Group) run(ctx context.Context) {
 		case <-tick.C:
 			missed := (time.Since(evalTimestamp) / g.interval) - 1
 			if missed > 0 {
-				g.metrics.iterationsMissed.WithLabelValues(groupKey(g.file, g.name)).Add(float64(missed))
-				g.metrics.iterationsScheduled.WithLabelValues(groupKey(g.file, g.name)).Add(float64(missed))
+				g.metrics.iterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
+				g.metrics.iterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
 			}
 			evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
 			iter()
@@ -410,8 +412,8 @@ func (g *Group) run(ctx context.Context) {
 			case <-tick.C:
 				missed := (time.Since(evalTimestamp) / g.interval) - 1
 				if missed > 0 {
-					g.metrics.iterationsMissed.WithLabelValues(groupKey(g.file, g.name)).Add(float64(missed))
-					g.metrics.iterationsScheduled.WithLabelValues(groupKey(g.file, g.name)).Add(float64(missed))
+					g.metrics.iterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
+					g.metrics.iterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
 				}
 				evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
 				iter()
@@ -474,7 +476,7 @@ func (g *Group) GetEvaluationTime() time.Duration {
 
 // setEvaluationTime sets the time in seconds the last evaluation took.
 func (g *Group) setEvaluationTime(dur time.Duration) {
-	g.metrics.groupLastDuration.WithLabelValues(groupKey(g.file, g.name)).Set(dur.Seconds())
+	g.metrics.groupLastDuration.WithLabelValues(GroupKey(g.file, g.name)).Set(dur.Seconds())
 
 	g.mtx.Lock()
 	defer g.mtx.Unlock()
@@ -488,21 +490,20 @@ func (g *Group) GetLastEvaluation() time.Time {
 	return g.lastEvaluation
 }
 
 // setLastEvaluation updates lastEvaluation to the timestamp of when the rule group was last evaluated.
 func (g *Group) setLastEvaluation(ts time.Time) {
-	g.metrics.groupLastEvalTime.WithLabelValues(groupKey(g.file, g.name)).Set(float64(ts.UnixNano()) / 1e9)
+	g.metrics.groupLastEvalTime.WithLabelValues(GroupKey(g.file, g.name)).Set(float64(ts.UnixNano()) / 1e9)
 
 	g.mtx.Lock()
 	defer g.mtx.Unlock()
 	g.lastEvaluation = ts
 }
 
-// evalTimestamp returns the immediately preceding consistently slotted evaluation time.
-func (g *Group) evalTimestamp() time.Time {
+// EvalTimestamp returns the immediately preceding consistently slotted evaluation time.
+func (g *Group) EvalTimestamp(startTime int64) time.Time {
 	var (
 		offset = int64(g.hash() % uint64(g.interval))
-		now    = time.Now().UnixNano()
-		adjNow = now - offset
+		adjNow = startTime - offset
 		base   = adjNow - (adjNow % int64(g.interval))
 	)
 
@@ -588,7 +589,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 				rule.SetEvaluationTimestamp(t)
 			}(time.Now())
 
-			g.metrics.evalTotal.WithLabelValues(groupKey(g.File(), g.Name())).Inc()
+			g.metrics.evalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
 
 			vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
 			if err != nil {
@@ -600,9 +601,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 				if _, ok := err.(promql.ErrQueryCanceled); !ok {
 					level.Warn(g.logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
 				}
-				sp.SetTag("error", true)
-				sp.LogKV("error", err)
-				g.metrics.evalFailures.WithLabelValues(groupKey(g.File(), g.Name())).Inc()
+				g.metrics.evalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
 				return
 			}
 			samplesTotal += float64(len(vector))
@@ -671,7 +670,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 		}(i, rule)
 	}
 	if g.metrics != nil {
-		g.metrics.groupSamples.WithLabelValues(groupKey(g.File(), g.Name())).Set(samplesTotal)
+		g.metrics.groupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal)
 	}
 	g.cleanupStaleSeries(ctx, ts)
 }
@@ -965,7 +964,7 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels
 		// check if new group equals with the old group, if yes then skip it.
 		// If not equals, stop it and wait for it to finish the current iteration.
 		// Then copy it into the new group.
-		gn := groupKey(newg.file, newg.name)
+		gn := GroupKey(newg.file, newg.name)
 		oldg, ok := m.groups[gn]
 		delete(m.groups, gn)
 
@@ -1079,7 +1078,7 @@ func (m *Manager) LoadGroups(
 			))
 		}
 
-		groups[groupKey(fn, rg.Name)] = NewGroup(GroupOptions{
+		groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
 			Name:     rg.Name,
 			File:     fn,
 			Interval: itv,
@@ -1094,8 +1093,8 @@ func (m *Manager) LoadGroups(
 	return groups, nil
 }
 
-// Group names need not be unique across filenames.
-func groupKey(file, name string) string {
+// GroupKey returns the key of a rule group; group names need not be unique
+// across files, so the key combines the file name and the group name.
+func GroupKey(file, name string) string {
 	return file + ";" + name
 }
diff --git a/tsdb/blockwriter.go b/tsdb/blockwriter.go
index 7ede6c6304..2d19c77050 100644
--- a/tsdb/blockwriter.go
+++ b/tsdb/blockwriter.go
@@ -90,16 +90,11 @@ func (w *BlockWriter) Appender(ctx context.Context) storage.Appender {
 // Flush implements the Writer interface. This is where actual block writing
 // happens. After flush completes, no writes can be done.
 func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
-	seriesCount := w.head.NumSeries()
-	if w.head.NumSeries() == 0 {
-		return ulid.ULID{}, ErrNoSeriesAppended
-	}
-
 	mint := w.head.MinTime()
 	// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
 	// Because of this, block intervals are always one millisecond longer than the
 	// time range of the samples they contain.
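+	// For example, a block whose newest sample is at 01:59:59.999 gets a maxt
+	// of 02:00:00.000, even though no sample exists at that exact instant.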
 	maxt := w.head.MaxTime() + 1
-	level.Info(w.logger).Log("msg", "flushing", "series_count", seriesCount, "mint", timestamp.Time(mint), "maxt", timestamp.Time(maxt))
+	level.Info(w.logger).Log("msg", "flushing", "series_count", w.head.NumSeries(), "mint", timestamp.Time(mint), "maxt", timestamp.Time(maxt))
 
 	compactor, err := NewLeveledCompactor(ctx,
 		nil,
diff --git a/tsdb/blockwriter_test.go b/tsdb/blockwriter_test.go
index 2be2d193cb..9145a31282 100644
--- a/tsdb/blockwriter_test.go
+++ b/tsdb/blockwriter_test.go
@@ -36,10 +36,6 @@ func TestBlockWriter(t *testing.T) {
 	w, err := NewBlockWriter(log.NewNopLogger(), outputDir, DefaultBlockDuration)
 	require.NoError(t, err)
 
-	// Flush with no series results in error.
-	_, err = w.Flush(ctx)
-	require.EqualError(t, err, "no series appended, aborting")
-
 	// Add some series.
 	app := w.Appender(ctx)
 	ts1, v1 := int64(44), float64(7)