fix block alignment, add sample alignment

Signed-off-by: jessicagreben <jessicagrebens@gmail.com>
This commit is contained in:
jessicagreben 2021-03-15 12:44:58 -07:00
parent 7c26642460
commit e3a8132bb3
3 changed files with 41 additions and 26 deletions

View file

@ -877,7 +877,6 @@ func importRules(url *url.URL, start, end, outputDir string, evalInterval time.D
} }
ruleImporter := newRuleImporter(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), cfg, v1.NewAPI(client)) ruleImporter := newRuleImporter(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), cfg, v1.NewAPI(client))
errs := ruleImporter.loadGroups(ctx, files) errs := ruleImporter.loadGroups(ctx, files)
for _, err := range errs { for _, err := range errs {
if err != nil { if err != nil {

View file

@ -57,7 +57,6 @@ type ruleImporterConfig struct {
// newRuleImporter creates a new rule importer that can be used to parse and evaluate recording rule files and create new series // newRuleImporter creates a new rule importer that can be used to parse and evaluate recording rule files and create new series
// written to disk in blocks. // written to disk in blocks.
func newRuleImporter(logger log.Logger, config ruleImporterConfig, apiClient queryRangeAPI) *ruleImporter { func newRuleImporter(logger log.Logger, config ruleImporterConfig, apiClient queryRangeAPI) *ruleImporter {
return &ruleImporter{ return &ruleImporter{
logger: logger, logger: logger,
config: config, config: config,
@ -84,7 +83,7 @@ func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) {
stimeWithAlignment := group.EvalTimestamp(importer.config.start.UnixNano()) stimeWithAlignment := group.EvalTimestamp(importer.config.start.UnixNano())
for i, r := range group.Rules() { for i, r := range group.Rules() {
level.Info(importer.logger).Log("backfiller", fmt.Sprintf("processing rule %d, name: %s", i+1, r.Name())) level.Info(importer.logger).Log("backfiller", fmt.Sprintf("processing rule %d, name: %s", i+1, r.Name()))
if err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), stimeWithAlignment, importer.config.end, group.Interval()); err != nil { if err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), stimeWithAlignment, importer.config.end, group); err != nil {
errs = append(errs, err) errs = append(errs, err)
} }
} }
@ -93,18 +92,23 @@ func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) {
} }
// importRule queries a prometheus API to evaluate rules at times in the past. // importRule queries a prometheus API to evaluate rules at times in the past.
func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time, interval time.Duration) (err error) { func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time, grp *rules.Group) (err error) {
blockDuration := tsdb.DefaultBlockDuration blockDuration := tsdb.DefaultBlockDuration
startOfBlock := blockDuration * (start.Unix() / blockDuration) startInMs := start.Unix() * int64(time.Second/time.Millisecond)
for t := startOfBlock; t <= end.Unix(); t = t + blockDuration { startOfBlock := blockDuration * (startInMs / blockDuration)
endOfBlock := t + blockDuration endInMs := end.Unix() * int64(time.Second/time.Millisecond)
for s := startOfBlock; s <= endInMs; s = s + blockDuration {
endOfBlock := s + blockDuration - 1
currStart := max(s/int64(time.Second/time.Millisecond), start.Unix())
startWithAlignment := grp.EvalTimestamp(time.Unix(currStart, 0).UnixNano())
val, warnings, err := importer.apiClient.QueryRange(ctx, val, warnings, err := importer.apiClient.QueryRange(ctx,
ruleExpr, ruleExpr,
v1.Range{ v1.Range{
Start: time.Unix(t, 0), Start: startWithAlignment,
End: time.Unix(endOfBlock, 0), End: time.Unix(min(endOfBlock/int64(time.Second/time.Millisecond), end.Unix()), 0),
Step: interval, Step: grp.Interval(),
}, },
) )
if err != nil { if err != nil {
@ -174,8 +178,7 @@ func newMultipleAppender(ctx context.Context, blockWriter *tsdb.BlockWriter) *mu
} }
// multipleAppender keeps track of how many series have been added to the current appender. // multipleAppender keeps track of how many series have been added to the current appender.
// If the max samples have been added, then all series are flushed to disk and committed and a new // If the max samples have been added, then all series are committed and a new appender is created.
// appender is created.
type multipleAppender struct { type multipleAppender struct {
maxSamplesInMemory int maxSamplesInMemory int
currentSampleCount int currentSampleCount int
@ -215,3 +218,17 @@ func (m *multipleAppender) flushAndCommit(ctx context.Context) error {
} }
return nil return nil
} }
// max returns the larger of the two int64 values x and y.
func max(x, y int64) int64 {
	if y > x {
		return y
	}
	return x
}
// min returns the smaller of the two int64 values x and y.
func min(x, y int64) int64 {
	if y < x {
		return y
	}
	return x
}

View file

@ -46,6 +46,12 @@ func (mockAPI mockQueryRangeAPI) QueryRange(ctx context.Context, query string, r
return mockAPI.samples, v1.Warnings{}, nil return mockAPI.samples, v1.Warnings{}, nil
} }
// getTestProdData returns an empty, non-nil set of sample streams for use in tests.
func getTestProdData() []*model.SampleStream {
	return []*model.SampleStream{}
}
// TestBackfillRuleIntegration is an integration test that runs all the rule importer code to confirm the parts work together. // TestBackfillRuleIntegration is an integration test that runs all the rule importer code to confirm the parts work together.
func TestBackfillRuleIntegration(t *testing.T) { func TestBackfillRuleIntegration(t *testing.T) {
var testCases = []struct { var testCases = []struct {
@ -105,7 +111,7 @@ func TestBackfillRuleIntegration(t *testing.T) {
require.Equal(t, "grp2_rule1_expr", g2Rules[0].Query().String()) require.Equal(t, "grp2_rule1_expr", g2Rules[0].Query().String())
require.Equal(t, 0, len(g2Rules[0].Labels())) require.Equal(t, 0, len(g2Rules[0].Labels()))
// Backfill all recording rules then check the blocks to confirm the right data was created. // Backfill all recording rules then check the blocks to confirm the correct data was created.
errs = ruleImporter.importAll(ctx) errs = ruleImporter.importAll(ctx)
for _, err := range errs { for _, err := range errs {
require.NoError(t, err) require.NoError(t, err)
@ -159,22 +165,15 @@ func TestBackfillRuleIntegration(t *testing.T) {
func newTestRuleImporter(ctx context.Context, start time.Time, tmpDir string, testSamples model.Matrix) (*ruleImporter, error) { func newTestRuleImporter(ctx context.Context, start time.Time, tmpDir string, testSamples model.Matrix) (*ruleImporter, error) {
logger := log.NewNopLogger() logger := log.NewNopLogger()
cfg := ruleImporterConfig{ cfg := ruleImporterConfig{
Start: start.Add(-1 * time.Hour), outputDir: tmpDir,
End: start, start: start.Add(-1 * time.Hour),
EvalInterval: 60 * time.Second, end: start,
} evalInterval: 60 * time.Second,
writer, err := tsdb.NewBlockWriter(logger,
tmpDir,
tsdb.DefaultBlockDuration,
)
if err != nil {
return nil, err
} }
app := newMultipleAppender(ctx, testMaxSampleCount, writer)
return newRuleImporter(logger, cfg, mockQueryRangeAPI{ return newRuleImporter(logger, cfg, mockQueryRangeAPI{
samples: testSamples, samples: testSamples,
}, app), nil }), nil
} }
func createSingleRuleTestFiles(path string) error { func createSingleRuleTestFiles(path string) error {