add changes per comments, fix tests

Signed-off-by: jessicagreben <jessicagrebens@gmail.com>
This commit is contained in:
jessicagreben 2021-03-20 12:38:30 -07:00
parent e3a8132bb3
commit 8de4da3716
5 changed files with 39 additions and 38 deletions

2
.gitignore vendored
View file

@ -11,7 +11,7 @@ benchmark.txt
/cmd/prometheus/data
/cmd/prometheus/debug
/benchout
backfilldata
/cmd/promtool/data
!/.travis.yml
!/.promu.yml

View file

@ -148,11 +148,11 @@ func main() {
importFilePath := openMetricsImportCmd.Arg("input file", "OpenMetrics file to read samples from.").Required().String()
importDBPath := openMetricsImportCmd.Arg("output directory", "Output directory for generated blocks.").Default(defaultDBPath).String()
importRulesCmd := importCmd.Command("rules", "Create new blocks of data from Prometheus data for new rules from recording rule files.")
importRulesStart := importRulesCmd.Flag("start", "The time to start backfilling the new rule from. It is required. Start time should be RFC3339 or Unix timestamp.").
Required().String()
importRulesEnd := importRulesCmd.Flag("end", "If an end time is provided, all recording rules in the rule files provided will be backfilled to the end time. Default will backfill up to 3 hours ago. End time should be RFC3339 or Unix timestamp.").String()
importRulesOutputDir := importRulesCmd.Flag("output-dir", "The filepath on the local filesystem to write the output to. Output will be blocks containing the data of the backfilled recording rules. Don't use an active Prometheus data directory. If command is run many times with same start/end time, it will create duplicate series.").Default("backfilldata/").String()
importRulesURL := importRulesCmd.Flag("url", "The URL for the Prometheus API with the data where the rule will be backfilled from.").Default("http://localhost:9090").URL()
importRulesStart := importRulesCmd.Flag("start", "The time to start backfilling the new rule from. Must be a RFC3339 formated date or Unix timestamp. Required.").
Required().String()
importRulesEnd := importRulesCmd.Flag("end", "If an end time is provided, all recording rules in the rule files provided will be backfilled to the end time. Default will backfill up to 3 hours ago. Must be a RFC3339 formated date or Unix timestamp.").String()
importRulesOutputDir := importRulesCmd.Flag("output-dir", "Output directory for generated blocks.").Default("data/").String()
importRulesEvalInterval := importRulesCmd.Flag("eval-interval", "How frequently to evaluate rules when backfilling if a value is not set in the recording rule files.").
Default("60s").Duration()
importRulesFiles := importRulesCmd.Arg(
@ -842,7 +842,7 @@ func importRules(url *url.URL, start, end, outputDir string, evalInterval time.D
var stime, etime time.Time
var err error
if end == "" {
etime = time.Now().Add(-3 * time.Hour)
etime = time.Now().UTC().Add(-3 * time.Hour)
} else {
etime, err = parseTime(end)
if err != nil {

View file

@ -78,11 +78,11 @@ func (importer *ruleImporter) loadGroups(ctx context.Context, filenames []string
// importAll evaluates all the recording rules and creates new time series and writes them to disk in blocks.
func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) {
for name, group := range importer.groups {
level.Info(importer.logger).Log("backfiller", fmt.Sprintf("processing group, name: %s", name))
level.Info(importer.logger).Log("backfiller", "processing group", "name", name)
stimeWithAlignment := group.EvalTimestamp(importer.config.start.UnixNano())
for i, r := range group.Rules() {
level.Info(importer.logger).Log("backfiller", fmt.Sprintf("processing rule %d, name: %s", i+1, r.Name()))
level.Info(importer.logger).Log("backfiller", "processing rule", "id", i, "name", r.Name())
if err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), stimeWithAlignment, importer.config.end, group); err != nil {
errs = append(errs, err)
}
@ -95,19 +95,18 @@ func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) {
func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time, grp *rules.Group) (err error) {
blockDuration := tsdb.DefaultBlockDuration
startInMs := start.Unix() * int64(time.Second/time.Millisecond)
startOfBlock := blockDuration * (startInMs / blockDuration)
endInMs := end.Unix() * int64(time.Second/time.Millisecond)
for s := startOfBlock; s <= endInMs; s = s + blockDuration {
endOfBlock := s + blockDuration - 1
for startOfBlock := blockDuration * (startInMs / blockDuration); startOfBlock <= endInMs; startOfBlock = startOfBlock + blockDuration {
endOfBlock := startOfBlock + blockDuration - 1
currStart := max(s/int64(time.Second/time.Millisecond), start.Unix())
startWithAlignment := grp.EvalTimestamp(time.Unix(currStart, 0).UnixNano())
currStart := max(startOfBlock/int64(time.Second/time.Millisecond), start.Unix())
startWithAlignment := grp.EvalTimestamp(time.Unix(currStart, 0).UTC().UnixNano())
val, warnings, err := importer.apiClient.QueryRange(ctx,
ruleExpr,
v1.Range{
Start: startWithAlignment,
End: time.Unix(min(endOfBlock/int64(time.Second/time.Millisecond), end.Unix()), 0),
End: time.Unix(min(endOfBlock/int64(time.Second/time.Millisecond), end.Unix()), 0).UTC(),
Step: grp.Interval(),
},
)
@ -115,7 +114,7 @@ func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName
return errors.Wrap(err, "query range")
}
if warnings != nil {
level.Warn(importer.logger).Log("backfiller", fmt.Sprintf("warnings QueryRange api: %v", warnings))
level.Warn(importer.logger).Log("msg", "Range query returned warnings.", "warnings", warnings)
}
// To prevent races with compaction, a block writer only allows appending samples
@ -136,6 +135,7 @@ func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName
switch val.Type() {
case model.ValMatrix:
matrix = val.(model.Matrix)
for _, sample := range matrix {
currentLabels := make(labels.Labels, 0, len(sample.Metric)+len(ruleLabels)+1)
currentLabels = append(currentLabels, labels.Label{

View file

@ -30,14 +30,6 @@ import (
"github.com/stretchr/testify/require"
)
const (
testMaxSampleCount = 50
testValue = 123
)
var testTime = model.Time(time.Now().Add(-20 * time.Minute).Unix())
var testTime2 = model.Time(time.Now().Add(-30 * time.Minute).Unix())
type mockQueryRangeAPI struct {
samples model.Matrix
}
@ -46,14 +38,19 @@ func (mockAPI mockQueryRangeAPI) QueryRange(ctx context.Context, query string, r
return mockAPI.samples, v1.Warnings{}, nil
}
func getTestProdData() []*model.SampleStream {
var result = []*model.SampleStream{}
return result
}
// TestBackfillRuleIntegration is an integration test that runs all the rule importer code to confirm the parts work together.
func TestBackfillRuleIntegration(t *testing.T) {
const (
testMaxSampleCount = 50
testValue = 123
testValue2 = 98
)
var (
start = time.Date(2009, time.November, 10, 6, 34, 0, 0, time.UTC)
testTime = model.Time(start.Add(-9 * time.Hour).Unix())
testTime2 = model.Time(start.Add(-8 * time.Hour).Unix())
)
var testCases = []struct {
name string
runcount int
@ -63,8 +60,8 @@ func TestBackfillRuleIntegration(t *testing.T) {
samples []*model.SampleStream
}{
{"no samples", 1, 0, 0, 0, []*model.SampleStream{}},
{"run importer once", 1, 1, 4, 4, []*model.SampleStream{{Metric: model.Metric{"name1": "val1"}, Values: []model.SamplePair{{Timestamp: testTime, Value: testValue}}}}},
{"one importer twice", 2, 1, 4, 4, []*model.SampleStream{{Metric: model.Metric{"name1": "val1"}, Values: []model.SamplePair{{Timestamp: testTime, Value: testValue}, {Timestamp: testTime2, Value: testValue}}}}},
{"run importer once", 1, 8, 4, 4, []*model.SampleStream{{Metric: model.Metric{"name1": "val1"}, Values: []model.SamplePair{{Timestamp: testTime, Value: testValue}}}}},
{"one importer twice", 2, 8, 4, 8, []*model.SampleStream{{Metric: model.Metric{"name1": "val1"}, Values: []model.SamplePair{{Timestamp: testTime, Value: testValue}, {Timestamp: testTime2, Value: testValue2}}}}},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
@ -73,11 +70,10 @@ func TestBackfillRuleIntegration(t *testing.T) {
defer func() {
require.NoError(t, os.RemoveAll(tmpDir))
}()
start := time.Now().UTC()
ctx := context.Background()
// Execute the test more than once to simulate running the rule importer twice with the same data.
// We expect that duplicate blocks with the same series are created when run more than once.
// We expect duplicate blocks with the same series are created when run more than once.
for i := 0; i < tt.runcount; i++ {
ruleImporter, err := newTestRuleImporter(ctx, start, tmpDir, tt.samples)
require.NoError(t, err)
@ -123,7 +119,7 @@ func TestBackfillRuleIntegration(t *testing.T) {
require.NoError(t, err)
blocks := db.Blocks()
require.Equal(t, i+tt.expectedBlockCount, len(blocks))
require.Equal(t, (i+1)*tt.expectedBlockCount, len(blocks))
q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
require.NoError(t, err)
@ -147,8 +143,11 @@ func TestBackfillRuleIntegration(t *testing.T) {
for it.Next() {
samplesCount++
ts, v := it.At()
require.Equal(t, float64(testValue), v)
if v == testValue {
require.Equal(t, int64(testTime), ts)
} else {
require.Equal(t, int64(testTime2), ts)
}
}
require.NoError(t, it.Err())
}
@ -166,8 +165,8 @@ func newTestRuleImporter(ctx context.Context, start time.Time, tmpDir string, te
logger := log.NewNopLogger()
cfg := ruleImporterConfig{
outputDir: tmpDir,
start: start.Add(-1 * time.Hour),
end: start,
start: start.Add(-10 * time.Hour),
end: start.Add(-7 * time.Hour),
evalInterval: 60 * time.Second,
}

View file

@ -24,6 +24,7 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
)
@ -93,6 +94,7 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
// Because of this block intervals are always +1 than the total samples it includes.
maxt := w.head.MaxTime() + 1
level.Info(w.logger).Log("msg", "flushing", "series_count", w.head.NumSeries(), "mint", timestamp.Time(mint), "maxt", timestamp.Time(maxt))
compactor, err := NewLeveledCompactor(ctx,
nil,