mirror of
https://github.com/prometheus/prometheus.git
synced 2025-02-02 08:31:11 -08:00
92218ecb9b
Add --ignore-unknown-fields that ignores unknown fields in rule group
files. There are lots of tools in the ecosystem that "like" to extend
the rule group file structure but they are currently unreadable by
promtool if there's anything extra. The purpose of this flag is so that
we could use the "vanilla" promtool instead of rolling our own.
Some examples of tools/code:
https://github.com/grafana/mimir/blob/main/pkg/mimirtool/rules/rwrulefmt/rulefmt.go
8898eb3cc5/pkg/rules/rules.go (L18-L25)
Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
237 lines
7.8 KiB
Go
237 lines
7.8 KiB
Go
// Copyright 2020 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
|
|
"github.com/prometheus/common/model"
|
|
"github.com/prometheus/common/promslog"
|
|
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/model/timestamp"
|
|
"github.com/prometheus/prometheus/rules"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/tsdb"
|
|
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
|
)
|
|
|
|
// maxSamplesInMemory is the default number of appended samples after which a
// multipleAppender commits its current appender and requests a new one.
const maxSamplesInMemory = 5000
|
|
|
|
type queryRangeAPI interface {
|
|
QueryRange(ctx context.Context, query string, r v1.Range, opts ...v1.Option) (model.Value, v1.Warnings, error)
|
|
}
|
|
|
|
type ruleImporter struct {
|
|
logger *slog.Logger
|
|
config ruleImporterConfig
|
|
|
|
apiClient queryRangeAPI
|
|
|
|
groups map[string]*rules.Group
|
|
ruleManager *rules.Manager
|
|
}
|
|
|
|
// ruleImporterConfig carries the settings for a ruleImporter.
type ruleImporterConfig struct {
	// outputDir is the directory handed to the block writer.
	outputDir string
	// start and end bound the time range over which rules are evaluated.
	start, end time.Time
	// evalInterval is the interval passed to the rule manager when loading groups.
	evalInterval time.Duration
	// maxBlockDuration is converted to milliseconds and used to derive the
	// duration of each written block.
	maxBlockDuration time.Duration
}
|
|
|
|
// newRuleImporter creates a new rule importer that can be used to parse and evaluate recording rule files and create new series
|
|
// written to disk in blocks.
|
|
func newRuleImporter(logger *slog.Logger, config ruleImporterConfig, apiClient queryRangeAPI) *ruleImporter {
|
|
logger.Info("new rule importer", "component", "backfiller", "start", config.start.Format(time.RFC822), "end", config.end.Format(time.RFC822))
|
|
return &ruleImporter{
|
|
logger: logger,
|
|
config: config,
|
|
apiClient: apiClient,
|
|
ruleManager: rules.NewManager(&rules.ManagerOptions{}),
|
|
}
|
|
}
|
|
|
|
// loadGroups parses groups from a list of recording rule files.
|
|
func (importer *ruleImporter) loadGroups(_ context.Context, filenames []string) (errs []error) {
|
|
groups, errs := importer.ruleManager.LoadGroups(importer.config.evalInterval, labels.Labels{}, "", nil, false, filenames...)
|
|
if errs != nil {
|
|
return errs
|
|
}
|
|
importer.groups = groups
|
|
return nil
|
|
}
|
|
|
|
// importAll evaluates all the recording rules and creates new time series and writes them to disk in blocks.
|
|
func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) {
|
|
for name, group := range importer.groups {
|
|
importer.logger.Info("processing group", "component", "backfiller", "name", name)
|
|
|
|
for i, r := range group.Rules() {
|
|
importer.logger.Info("processing rule", "component", "backfiller", "id", i, "name", r.Name())
|
|
if err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), importer.config.start, importer.config.end, int64(importer.config.maxBlockDuration/time.Millisecond), group); err != nil {
|
|
errs = append(errs, err)
|
|
}
|
|
}
|
|
}
|
|
return errs
|
|
}
|
|
|
|
// importRule queries a prometheus API to evaluate rules at times in the past.
|
|
func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time,
|
|
maxBlockDuration int64, grp *rules.Group,
|
|
) (err error) {
|
|
blockDuration := getCompatibleBlockDuration(maxBlockDuration)
|
|
startInMs := start.Unix() * int64(time.Second/time.Millisecond)
|
|
endInMs := end.Unix() * int64(time.Second/time.Millisecond)
|
|
|
|
for startOfBlock := blockDuration * (startInMs / blockDuration); startOfBlock <= endInMs; startOfBlock += blockDuration {
|
|
endOfBlock := startOfBlock + blockDuration - 1
|
|
|
|
currStart := max(startOfBlock/int64(time.Second/time.Millisecond), start.Unix())
|
|
startWithAlignment := grp.EvalTimestamp(time.Unix(currStart, 0).UTC().UnixNano())
|
|
for startWithAlignment.Unix() < currStart {
|
|
startWithAlignment = startWithAlignment.Add(grp.Interval())
|
|
}
|
|
end := time.Unix(min(endOfBlock/int64(time.Second/time.Millisecond), end.Unix()), 0).UTC()
|
|
if end.Before(startWithAlignment) {
|
|
break
|
|
}
|
|
val, warnings, err := importer.apiClient.QueryRange(ctx,
|
|
ruleExpr,
|
|
v1.Range{
|
|
Start: startWithAlignment,
|
|
End: end,
|
|
Step: grp.Interval(),
|
|
},
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("query range: %w", err)
|
|
}
|
|
if warnings != nil {
|
|
importer.logger.Warn("Range query returned warnings.", "warnings", warnings)
|
|
}
|
|
|
|
// To prevent races with compaction, a block writer only allows appending samples
|
|
// that are at most half a block size older than the most recent sample appended so far.
|
|
// However, in the way we use the block writer here, compaction doesn't happen, while we
|
|
// also need to append samples throughout the whole block range. To allow that, we
|
|
// pretend that the block is twice as large here, but only really add sample in the
|
|
// original interval later.
|
|
w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), importer.config.outputDir, 2*blockDuration)
|
|
if err != nil {
|
|
return fmt.Errorf("new block writer: %w", err)
|
|
}
|
|
var closed bool
|
|
defer func() {
|
|
if !closed {
|
|
err = tsdb_errors.NewMulti(err, w.Close()).Err()
|
|
}
|
|
}()
|
|
app := newMultipleAppender(ctx, w)
|
|
var matrix model.Matrix
|
|
switch val.Type() {
|
|
case model.ValMatrix:
|
|
matrix = val.(model.Matrix)
|
|
|
|
for _, sample := range matrix {
|
|
lb := labels.NewBuilder(labels.Labels{})
|
|
|
|
for name, value := range sample.Metric {
|
|
lb.Set(string(name), string(value))
|
|
}
|
|
|
|
// Setting the rule labels after the output of the query,
|
|
// so they can override query output.
|
|
ruleLabels.Range(func(l labels.Label) {
|
|
lb.Set(l.Name, l.Value)
|
|
})
|
|
|
|
lb.Set(labels.MetricName, ruleName)
|
|
lbls := lb.Labels()
|
|
|
|
for _, value := range sample.Values {
|
|
if err := app.add(ctx, lbls, timestamp.FromTime(value.Timestamp.Time()), float64(value.Value)); err != nil {
|
|
return fmt.Errorf("add: %w", err)
|
|
}
|
|
}
|
|
}
|
|
default:
|
|
return fmt.Errorf("rule result is wrong type %s", val.Type().String())
|
|
}
|
|
|
|
if err := app.flushAndCommit(ctx); err != nil {
|
|
return fmt.Errorf("flush and commit: %w", err)
|
|
}
|
|
err = tsdb_errors.NewMulti(err, w.Close()).Err()
|
|
closed = true
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func newMultipleAppender(ctx context.Context, blockWriter *tsdb.BlockWriter) *multipleAppender {
|
|
return &multipleAppender{
|
|
maxSamplesInMemory: maxSamplesInMemory,
|
|
writer: blockWriter,
|
|
appender: blockWriter.Appender(ctx),
|
|
}
|
|
}
|
|
|
|
// multipleAppender keeps track of how many series have been added to the current appender.
|
|
// If the max samples have been added, then all series are committed and a new appender is created.
|
|
type multipleAppender struct {
|
|
maxSamplesInMemory int
|
|
currentSampleCount int
|
|
writer *tsdb.BlockWriter
|
|
appender storage.Appender
|
|
}
|
|
|
|
func (m *multipleAppender) add(ctx context.Context, l labels.Labels, t int64, v float64) error {
|
|
if _, err := m.appender.Append(0, l, t, v); err != nil {
|
|
return fmt.Errorf("multiappender append: %w", err)
|
|
}
|
|
m.currentSampleCount++
|
|
if m.currentSampleCount >= m.maxSamplesInMemory {
|
|
return m.commit(ctx)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *multipleAppender) commit(ctx context.Context) error {
|
|
if m.currentSampleCount == 0 {
|
|
return nil
|
|
}
|
|
if err := m.appender.Commit(); err != nil {
|
|
return fmt.Errorf("multiappender commit: %w", err)
|
|
}
|
|
m.appender = m.writer.Appender(ctx)
|
|
m.currentSampleCount = 0
|
|
return nil
|
|
}
|
|
|
|
func (m *multipleAppender) flushAndCommit(ctx context.Context) error {
|
|
if err := m.commit(ctx); err != nil {
|
|
return err
|
|
}
|
|
if _, err := m.writer.Flush(ctx); err != nil {
|
|
return fmt.Errorf("multiappender flush: %w", err)
|
|
}
|
|
return nil
|
|
}
|