From 7c26642460418654748c6bd33e1bbe8eab6e727c Mon Sep 17 00:00:00 2001 From: jessicagreben Date: Sun, 14 Mar 2021 10:10:55 -0700 Subject: [PATCH] add block alignment and write in 2 hr blocks Signed-off-by: jessicagreben --- .gitignore | 1 + cmd/promtool/main.go | 37 +++++---------- cmd/promtool/rules.go | 108 +++++++++++++++++++++++++----------------- 3 files changed, 77 insertions(+), 69 deletions(-) diff --git a/.gitignore b/.gitignore index c31d513e23..4e38927c59 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ benchmark.txt /cmd/prometheus/data /cmd/prometheus/debug /benchout +backfilldata !/.travis.yml !/.promu.yml diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index e488293933..1e201bbcf2 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -46,7 +46,6 @@ import ( _ "github.com/prometheus/prometheus/discovery/install" // Register service discovery implementations. "github.com/prometheus/prometheus/discovery/kubernetes" "github.com/prometheus/prometheus/pkg/rulefmt" - "github.com/prometheus/prometheus/tsdb" ) func main() { @@ -151,10 +150,10 @@ func main() { importRulesCmd := importCmd.Command("rules", "Create new blocks of data from Prometheus data for new rules from recording rule files.") importRulesStart := importRulesCmd.Flag("start", "The time to start backfilling the new rule from. It is required. Start time should be RFC3339 or Unix timestamp."). Required().String() - importRulesEnd := importRulesCmd.Flag("end", "If an end time is provided, all recording rules in the rule files provided will be backfilled to the end time. Default will backfill up to 3 hrs ago. End time should be RFC3339 or Unix timestamp.").String() + importRulesEnd := importRulesCmd.Flag("end", "If an end time is provided, all recording rules in the rule files provided will be backfilled to the end time. Default will backfill up to 3 hours ago. End time should be RFC3339 or Unix timestamp.").String() importRulesOutputDir := importRulesCmd.Flag("output-dir", "The filepath on the local filesystem to write the output to. Output will be blocks containing the data of the backfilled recording rules. Don't use an active Prometheus data directory. If command is run many times with same start/end time, it will create duplicate series.").Default("backfilldata/").String() - importRulesURL := importRulesCmd.Flag("url", "The URL for the Prometheus API with the data where the rule will be backfilled from.").Default("http://localhost:9090").String() - importRulesEvalInterval := importRulesCmd.Flag("eval-interval-default", "How frequently to evaluate rules when backfilling if a value is not set in the recording rule files."). + importRulesURL := importRulesCmd.Flag("url", "The URL for the Prometheus API with the data where the rule will be backfilled from.").Default("http://localhost:9090").URL() + importRulesEvalInterval := importRulesCmd.Flag("eval-interval", "How frequently to evaluate rules when backfilling if a value is not set in the recording rule files."). Default("60s").Duration() importRulesFiles := importRulesCmd.Arg( "rule-files", @@ -838,7 +837,7 @@ func (j *jsonPrinter) printLabelValues(v model.LabelValues) { // importRules backfills recording rules from the files provided. The output are blocks of data // at the outputDir location. -func importRules(url, start, end, outputDir string, evalInterval time.Duration, files ...string) error { +func importRules(url *url.URL, start, end, outputDir string, evalInterval time.Duration, files ...string) error { ctx := context.Background() var stime, etime time.Time var err error @@ -863,33 +862,21 @@ func importRules(url, start, end, outputDir string, evalInterval time.Duration, return nil } - logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) - writer, err := tsdb.NewBlockWriter(logger, - outputDir, - tsdb.DefaultBlockDuration, - ) - if err != nil { - fmt.Fprintln(os.Stderr, "new writer error", err) - return err - } - defer func() { - err = writer.Close() - }() - cfg := ruleImporterConfig{ - Start: stime, - End: etime, - EvalInterval: evalInterval, + outputDir: outputDir, + start: stime, + end: etime, + evalInterval: evalInterval, } - c, err := api.NewClient(api.Config{ - Address: url, + client, err := api.NewClient(api.Config{ + Address: url.String(), }) if err != nil { fmt.Fprintln(os.Stderr, "new api client error", err) return err } - const maxSamplesInMemory = 5000 - ruleImporter := newRuleImporter(logger, cfg, v1.NewAPI(c), newMultipleAppender(ctx, maxSamplesInMemory, writer)) + + ruleImporter := newRuleImporter(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), cfg, v1.NewAPI(client)) errs := ruleImporter.loadGroups(ctx, files) for _, err := range errs { diff --git a/cmd/promtool/rules.go b/cmd/promtool/rules.go index 4733019214..6d823794bd 100644 --- a/cmd/promtool/rules.go +++ b/cmd/promtool/rules.go @@ -28,8 +28,11 @@ import ( "github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" ) +const maxSamplesInMemory = 5000 + type queryRangeAPI interface { QueryRange(ctx context.Context, query string, r v1.Range) (model.Value, v1.Warnings, error) } @@ -40,33 +43,32 @@ type ruleImporter struct { apiClient queryRangeAPI - appender *multipleAppender - groups map[string]*rules.Group ruleManager *rules.Manager } type ruleImporterConfig struct { - Start time.Time - End time.Time - EvalInterval time.Duration + outputDir string + start time.Time + end time.Time + evalInterval time.Duration } // newRuleImporter creates a new rule importer that can be used to parse and evaluate recording rule files and create new series // written to disk in blocks. -func newRuleImporter(logger log.Logger, config ruleImporterConfig, apiClient queryRangeAPI, appender *multipleAppender) *ruleImporter { +func newRuleImporter(logger log.Logger, config ruleImporterConfig, apiClient queryRangeAPI) *ruleImporter { + return &ruleImporter{ logger: logger, config: config, apiClient: apiClient, - appender: appender, ruleManager: rules.NewManager(&rules.ManagerOptions{}), } } // loadGroups parses groups from a list of recording rule files. func (importer *ruleImporter) loadGroups(ctx context.Context, filenames []string) (errs []error) { - groups, errs := importer.ruleManager.LoadGroups(importer.config.EvalInterval, labels.Labels{}, filenames...) + groups, errs := importer.ruleManager.LoadGroups(importer.config.evalInterval, labels.Labels{}, filenames...) if errs != nil { return errs } @@ -79,46 +81,53 @@ func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) { for name, group := range importer.groups { level.Info(importer.logger).Log("backfiller", fmt.Sprintf("processing group, name: %s", name)) - stimeWithAlignment := group.EvalTimestamp(importer.config.Start.UnixNano()) + stimeWithAlignment := group.EvalTimestamp(importer.config.start.UnixNano()) for i, r := range group.Rules() { level.Info(importer.logger).Log("backfiller", fmt.Sprintf("processing rule %d, name: %s", i+1, r.Name())) - if err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), stimeWithAlignment, importer.config.End, group.Interval()); err != nil { + if err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), stimeWithAlignment, importer.config.end, group.Interval()); err != nil { errs = append(errs, err) } } } - if err := importer.appender.flushAndCommit(ctx); err != nil { - errs = append(errs, err) - } return errs } // importRule queries a prometheus API to evaluate rules at times in the past. -func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time, interval time.Duration) error { - - // This loop breaks up the calls to the QueryRange API into 2 hr chunks so that we - // don't ever request too much data or take to long to process to avoid timeout. - for start.Before(end) { - currentBlockEnd := start.Add(time.Duration(tsdb.DefaultBlockDuration) * time.Millisecond) - if currentBlockEnd.After(end) { - currentBlockEnd = end - } +func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time, interval time.Duration) (err error) { + blockDuration := tsdb.DefaultBlockDuration + startOfBlock := blockDuration * (start.Unix() / blockDuration) + for t := startOfBlock; t <= end.Unix(); t = t + blockDuration { + endOfBlock := t + blockDuration val, warnings, err := importer.apiClient.QueryRange(ctx, ruleExpr, v1.Range{ - Start: start, - End: end, + Start: time.Unix(t, 0), + End: time.Unix(endOfBlock, 0), Step: interval, }, ) if err != nil { - return err + return errors.Wrap(err, "query range") } if warnings != nil { level.Warn(importer.logger).Log("backfiller", fmt.Sprintf("warnings QueryRange api: %v", warnings)) } + // To prevent races with compaction, a block writer only allows appending samples + // that are at most half a block size older than the most recent sample appended so far. + // However, in the way we use the block writer here, compaction doesn't happen, while we + // also need to append samples throughout the whole block range. To allow that, we + // pretend that the block is twice as large here, but only really add sample in the + // original interval later. + w, err := tsdb.NewBlockWriter(log.NewNopLogger(), importer.config.outputDir, 2*tsdb.DefaultBlockDuration) + if err != nil { + return errors.Wrap(err, "new block writer") + } + defer func() { + err = tsdb_errors.NewMulti(err, w.Close()).Err() + }() + app := newMultipleAppender(ctx, w) var matrix model.Matrix switch val.Type() { case model.ValMatrix: @@ -139,17 +148,29 @@ func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName }) } for _, value := range sample.Values { - if err := importer.appender.add(ctx, currentLabels, timestamp.FromTime(value.Timestamp.Time()), float64(value.Value)); err != nil { - return err + if err := app.add(ctx, currentLabels, timestamp.FromTime(value.Timestamp.Time()), float64(value.Value)); err != nil { + return errors.Wrap(err, "add") } } } default: return errors.New(fmt.Sprintf("rule result is wrong type %s", val.Type().String())) } - start = currentBlockEnd + + if err := app.flushAndCommit(ctx); err != nil { + return errors.Wrap(err, "flush and commit") + } + } + + return err +} + +func newMultipleAppender(ctx context.Context, blockWriter *tsdb.BlockWriter) *multipleAppender { + return &multipleAppender{ + maxSamplesInMemory: maxSamplesInMemory, + writer: blockWriter, + appender: blockWriter.Appender(ctx), } - return nil } // multipleAppender keeps track of how many series have been added to the current appender. @@ -162,36 +183,35 @@ type multipleAppender struct { appender storage.Appender } -func newMultipleAppender(ctx context.Context, maxSamplesInMemory int, blockWriter *tsdb.BlockWriter) *multipleAppender { - return &multipleAppender{ - maxSamplesInMemory: maxSamplesInMemory, - writer: blockWriter, - appender: blockWriter.Appender(ctx), - } -} - func (m *multipleAppender) add(ctx context.Context, l labels.Labels, t int64, v float64) error { if _, err := m.appender.Append(0, l, t, v); err != nil { - return err + return errors.Wrap(err, "multiappender append") } m.currentSampleCount++ if m.currentSampleCount >= m.maxSamplesInMemory { - return m.flushAndCommit(ctx) + return m.commit(ctx) } return nil } -func (m *multipleAppender) flushAndCommit(ctx context.Context) error { +func (m *multipleAppender) commit(ctx context.Context) error { if m.currentSampleCount == 0 { return nil } if err := m.appender.Commit(); err != nil { - return err - } - if _, err := m.writer.Flush(ctx); err != nil { - return err + return errors.Wrap(err, "multiappender commit") } m.appender = m.writer.Appender(ctx) m.currentSampleCount = 0 return nil } + +func (m *multipleAppender) flushAndCommit(ctx context.Context) error { + if err := m.commit(ctx); err != nil { + return err + } + if _, err := m.writer.Flush(ctx); err != nil { + return errors.Wrap(err, "multiappender flush") + } + return nil +}