diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index e2fb6b26f8..7945597008 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -984,9 +984,9 @@ func (s *readyStorage) ChunkQuerier(ctx context.Context, mint, maxt int64) (stor } // Appender implements the Storage interface. -func (s *readyStorage) Appender() storage.Appender { +func (s *readyStorage) Appender(ctx context.Context) storage.Appender { if x := s.get(); x != nil { - return x.Appender() + return x.Appender(ctx) } return notReadyAppender{} } diff --git a/cmd/prometheus/main_test.go b/cmd/prometheus/main_test.go index f556437b26..a8c907f03f 100644 --- a/cmd/prometheus/main_test.go +++ b/cmd/prometheus/main_test.go @@ -263,7 +263,7 @@ func TestTimeMetrics(t *testing.T) { "prometheus_tsdb_head_max_time_seconds", )) - app := db.Appender() + app := db.Appender(context.Background()) _, err = app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 1) testutil.Ok(t, err) _, err = app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, 1) diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index 04f0877bce..78940c1c91 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -199,7 +199,7 @@ func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount in total := uint64(0) for i := 0; i < scrapeCount; i++ { - app := b.storage.Appender() + app := b.storage.Appender(context.TODO()) ts += timeDelta for _, s := range scrape { diff --git a/promql/bench_test.go b/promql/bench_test.go index b9b12c0aa7..f1ea31ceb6 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -68,7 +68,7 @@ func BenchmarkRangeQuery(b *testing.B) { numIntervals := 8640 + 10000 for s := 0; s < numIntervals; s++ { - a := storage.Appender() + a := storage.Appender(context.Background()) ts := int64(s * 10000) // 10s interval. for i, metric := range metrics { err := a.AddFast(refs[i], ts, float64(s)) diff --git a/promql/functions_test.go b/promql/functions_test.go index fd27502639..00e028b987 100644 --- a/promql/functions_test.go +++ b/promql/functions_test.go @@ -40,7 +40,7 @@ func TestDeriv(t *testing.T) { } engine := NewEngine(opts) - a := storage.Appender() + a := storage.Appender(context.Background()) metric := labels.FromStrings("__name__", "foo") a.Add(metric, 1493712816939, 1.0) diff --git a/promql/test.go b/promql/test.go index ff2549144e..bcd90200f8 100644 --- a/promql/test.go +++ b/promql/test.go @@ -433,7 +433,7 @@ func (t *Test) exec(tc testCommand) error { t.clear() case *loadCmd: - app := t.storage.Appender() + app := t.storage.Appender(t.context) if err := cmd.append(app); err != nil { app.Rollback() return err @@ -644,7 +644,7 @@ func (ll *LazyLoader) clear() { // appendTill appends the defined time series to the storage till the given timestamp (in milliseconds). func (ll *LazyLoader) appendTill(ts int64) error { - app := ll.storage.Appender() + app := ll.storage.Appender(ll.Context()) for h, smpls := range ll.loadCmd.defs { m := ll.loadCmd.metrics[h] for i, s := range smpls { diff --git a/rules/manager.go b/rules/manager.go index c4344fa68e..c9b7e45d85 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -352,7 +352,7 @@ func (g *Group) run(ctx context.Context) { select { case <-g.managerDone: case <-time.After(2 * g.interval): - g.cleanupStaleSeries(now) + g.cleanupStaleSeries(ctx, now) } }(time.Now()) }() @@ -588,7 +588,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { numDuplicates = 0 ) - app := g.opts.Appendable.Appender() + app := g.opts.Appendable.Appender(ctx) seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i])) defer func() { if err := app.Commit(); err != nil { @@ -636,14 +636,14 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } }(i, rule) } - g.cleanupStaleSeries(ts) + g.cleanupStaleSeries(ctx, ts) } -func (g *Group) cleanupStaleSeries(ts time.Time) { +func (g *Group) cleanupStaleSeries(ctx context.Context, ts time.Time) { if len(g.staleSeries) == 0 { return } - app := g.opts.Appendable.Appender() + app := g.opts.Appendable.Appender(ctx) for _, s := range g.staleSeries { // Rule that produced series no longer configured, mark it stale. _, err := app.Add(s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) diff --git a/rules/manager_test.go b/rules/manager_test.go index a865097d4e..875fe9b1f3 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -541,7 +541,7 @@ func TestStaleness(t *testing.T) { }) // A time series that has two samples and then goes stale. - app := st.Appender() + app := st.Appender(context.Background()) app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 0, 1) app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2) app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN)) @@ -865,7 +865,7 @@ func TestNotify(t *testing.T) { Opts: opts, }) - app := storage.Appender() + app := storage.Appender(context.Background()) app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2) app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, 3) app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 5000, 3) diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index da78cb5dd0..095ea7cac3 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -14,13 +14,14 @@ package scrape import ( + "context" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" ) type nopAppendable struct{} -func (a nopAppendable) Appender() storage.Appender { +func (a nopAppendable) Appender(_ context.Context) storage.Appender { return nopAppender{} } diff --git a/scrape/scrape.go b/scrape/scrape.go index ce0e333321..48c679e922 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -230,7 +230,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed return mutateSampleLabels(l, opts.target, opts.honorLabels, opts.mrc) }, func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) }, - func() storage.Appender { return appender(app.Appender(), opts.limit) }, + func(ctx context.Context) storage.Appender { return appender(app.Appender(ctx), opts.limit) }, cache, jitterSeed, opts.honorTimestamps, @@ -611,7 +611,7 @@ type scrapeLoop struct { jitterSeed uint64 honorTimestamps bool - appender func() storage.Appender + appender func(ctx context.Context) storage.Appender sampleMutator labelsMutator reportSampleMutator labelsMutator @@ -873,7 +873,7 @@ func newScrapeLoop(ctx context.Context, buffers *pool.Pool, sampleMutator labelsMutator, reportSampleMutator labelsMutator, - appender func() storage.Appender, + appender func(ctx context.Context) storage.Appender, cache *scrapeCache, jitterSeed uint64, honorTimestamps bool, @@ -986,7 +986,7 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time } } - app := sl.appender() + app := sl.appender(scrapeCtx) var err error defer func() { if err != nil { @@ -1003,13 +1003,13 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time total, added, seriesAdded, appErr := sl.append(app, b, contentType, start) if appErr != nil { app.Rollback() - app = sl.appender() + app = sl.appender(scrapeCtx) level.Debug(sl.l).Log("msg", "Append failed", "err", appErr) // The append failed, probably due to a parse error or sample limit. // Call sl.append again with an empty scrape to trigger stale markers. if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil { app.Rollback() - app = sl.appender() + app = sl.appender(scrapeCtx) level.Warn(sl.l).Log("msg", "Append failed", "err", err) } } @@ -1066,7 +1066,7 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int // Call sl.append again with an empty scrape to trigger stale markers. // If the target has since been recreated and scraped, the // stale markers will be out of order and ignored. - app := sl.appender() + app := sl.appender(sl.ctx) var err error defer func() { if err != nil { @@ -1080,7 +1080,7 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int }() if _, _, _, err = sl.append(app, []byte{}, "", staleTime); err != nil { app.Rollback() - app = sl.appender() + app = sl.appender(sl.ctx) level.Warn(sl.l).Log("msg", "Stale append failed", "err", err) } if err = sl.reportStale(app, staleTime); err != nil { diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 5240f78cac..b8227910f8 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -300,6 +300,7 @@ func TestScrapePoolReload(t *testing.T) { func TestScrapePoolAppender(t *testing.T) { cfg := &config.ScrapeConfig{} app := &nopAppendable{} + ctx := context.Background() sp, _ := newScrapePool(cfg, app, 0, nil) loop := sp.newLoop(scrapeLoopOptions{ @@ -308,7 +309,7 @@ func TestScrapePoolAppender(t *testing.T) { appl, ok := loop.(*scrapeLoop) testutil.Assert(t, ok, "Expected scrapeLoop but got %T", loop) - wrapped := appl.appender() + wrapped := appl.appender(ctx) tl, ok := wrapped.(*timeLimitAppender) testutil.Assert(t, ok, "Expected timeLimitAppender but got %T", wrapped) @@ -323,7 +324,7 @@ func TestScrapePoolAppender(t *testing.T) { appl, ok = loop.(*scrapeLoop) testutil.Assert(t, ok, "Expected scrapeLoop but got %T", loop) - wrapped = appl.appender() + wrapped = appl.appender(ctx) sl, ok := wrapped.(*limitAppender) testutil.Assert(t, ok, "Expected limitAppender but got %T", wrapped) @@ -374,8 +375,9 @@ func TestScrapePoolRaces(t *testing.T) { func TestScrapeLoopStopBeforeRun(t *testing.T) { scraper := &testScraper{} + ctx := context.Background() - sl := newScrapeLoop(context.Background(), + sl := newScrapeLoop(ctx, scraper, nil, nil, nopMutator, @@ -434,10 +436,11 @@ func TestScrapeLoopStop(t *testing.T) { signal = make(chan struct{}, 1) appender = &collectResultAppender{} scraper = &testScraper{} - app = func() storage.Appender { return appender } + app = func(ctx context.Context) storage.Appender { return appender } ) + ctx := context.Background() - sl := newScrapeLoop(context.Background(), + sl := newScrapeLoop(ctx, scraper, nil, nil, nopMutator, @@ -499,7 +502,7 @@ func TestScrapeLoopRun(t *testing.T) { errc = make(chan error) scraper = &testScraper{} - app = func() storage.Appender { return &nopAppender{} } + app = func(ctx context.Context) storage.Appender { return &nopAppender{} } ) ctx, cancel := context.WithCancel(context.Background()) @@ -605,14 +608,14 @@ func TestScrapeLoopMetadata(t *testing.T) { nil, nil, nopMutator, nopMutator, - func() storage.Appender { return nopAppender{} }, + func(ctx context.Context) storage.Appender { return nopAppender{} }, cache, 0, true, ) defer cancel() - slApp := sl.appender() + slApp := sl.appender(ctx) total, _, _, err := sl.append(slApp, []byte(`# TYPE test_metric counter # HELP test_metric some help text # UNIT test_metric metric @@ -661,7 +664,7 @@ func TestScrapeLoopSeriesAdded(t *testing.T) { ) defer cancel() - slApp := sl.appender() + slApp := sl.appender(ctx) total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{}) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -669,7 +672,7 @@ func TestScrapeLoopSeriesAdded(t *testing.T) { testutil.Equals(t, 1, added) testutil.Equals(t, 1, seriesAdded) - slApp = sl.appender() + slApp = sl.appender(ctx) total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{}) testutil.Ok(t, slApp.Commit()) testutil.Ok(t, err) @@ -683,7 +686,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { var ( signal = make(chan struct{}, 1) scraper = &testScraper{} - app = func() storage.Appender { return appender } + app = func(ctx context.Context) storage.Appender { return appender } ) ctx, cancel := context.WithCancel(context.Background()) @@ -736,7 +739,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) { var ( signal = make(chan struct{}, 1) scraper = &testScraper{} - app = func() storage.Appender { return appender } + app = func(ctx context.Context) storage.Appender { return appender } numScrapes = 0 ) @@ -795,7 +798,7 @@ func TestScrapeLoopCache(t *testing.T) { var ( signal = make(chan struct{}, 1) scraper = &testScraper{} - app = func() storage.Appender { appender.next = s.Appender(); return appender } + app = func(ctx context.Context) storage.Appender { appender.next = s.Appender(ctx); return appender } ) ctx, cancel := context.WithCancel(context.Background()) @@ -863,13 +866,13 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) { s := teststorage.New(t) defer s.Close() - sapp := s.Appender() + sapp := s.Appender(context.Background()) appender := &collectResultAppender{next: sapp} var ( signal = make(chan struct{}, 1) scraper = &testScraper{} - app = func() storage.Appender { return appender } + app = func(ctx context.Context) storage.Appender { return appender } ) ctx, cancel := context.WithCancel(context.Background()) @@ -975,8 +978,9 @@ func TestScrapeLoopAppend(t *testing.T) { discoveryLabels := &Target{ labels: labels.FromStrings(test.discoveryLabels...), } + ctx := context.Background() - sl := newScrapeLoop(context.Background(), + sl := newScrapeLoop(ctx, nil, nil, nil, func(l labels.Labels) labels.Labels { return mutateSampleLabels(l, discoveryLabels, test.honorLabels, nil) @@ -984,7 +988,7 @@ func TestScrapeLoopAppend(t *testing.T) { func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, discoveryLabels) }, - func() storage.Appender { return app }, + func(ctx context.Context) storage.Appender { return app }, nil, 0, true, @@ -992,7 +996,7 @@ func TestScrapeLoopAppend(t *testing.T) { now := time.Now() - slApp := sl.appender() + slApp := sl.appender(ctx) _, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", now) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -1020,12 +1024,13 @@ func TestScrapeLoopAppend(t *testing.T) { func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) { // collectResultAppender's AddFast always returns ErrNotFound if we don't give it a next. app := &collectResultAppender{} + ctx := context.Background() - sl := newScrapeLoop(context.Background(), + sl := newScrapeLoop(ctx, nil, nil, nil, nopMutator, nopMutator, - func() storage.Appender { return app }, + func(ctx context.Context) storage.Appender { return app }, nil, 0, true, @@ -1045,7 +1050,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) { sl.cache.addRef(mets, fakeRef, lset, hash) now := time.Now() - slApp := sl.appender() + slApp := sl.appender(ctx) _, _, _, err := sl.append(slApp, []byte(metric), "", now) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -1064,8 +1069,9 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) { func TestScrapeLoopAppendSampleLimit(t *testing.T) { resApp := &collectResultAppender{} app := &limitAppender{Appender: resApp, limit: 1} + ctx := context.Background() - sl := newScrapeLoop(context.Background(), + sl := newScrapeLoop(ctx, nil, nil, nil, func(l labels.Labels) labels.Labels { if l.Has("deleteme") { @@ -1074,7 +1080,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) { return l }, nopMutator, - func() storage.Appender { return app }, + func(ctx context.Context) storage.Appender { return app }, nil, 0, true, @@ -1088,7 +1094,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) { beforeMetricValue := beforeMetric.GetCounter().GetValue() now := time.Now() - slApp := sl.appender() + slApp := sl.appender(ctx) total, added, seriesAdded, err := sl.append(app, []byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now) if err != errSampleLimit { t.Fatalf("Did not see expected sample limit error: %s", err) @@ -1119,7 +1125,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) { testutil.Equals(t, want, resApp.rolledbackResult, "Appended samples not as expected") now = time.Now() - slApp = sl.appender() + slApp = sl.appender(ctx) total, added, seriesAdded, err = sl.append(slApp, []byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "", now) if err != errSampleLimit { t.Fatalf("Did not see expected sample limit error: %s", err) @@ -1138,24 +1144,25 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) { defer s.Close() capp := &collectResultAppender{} + ctx := context.Background() - sl := newScrapeLoop(context.Background(), + sl := newScrapeLoop(ctx, nil, nil, nil, nopMutator, nopMutator, - func() storage.Appender { capp.next = s.Appender(); return capp }, + func(ctx context.Context) storage.Appender { capp.next = s.Appender(ctx); return capp }, nil, 0, true, ) now := time.Now() - slApp := sl.appender() + slApp := sl.appender(ctx) _, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "", now) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) - slApp = sl.appender() + slApp = sl.appender(ctx) _, _, _, err = sl.append(slApp, []byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute)) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -1178,24 +1185,25 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) { func TestScrapeLoopAppendStaleness(t *testing.T) { app := &collectResultAppender{} + ctx := context.Background() - sl := newScrapeLoop(context.Background(), + sl := newScrapeLoop(ctx, nil, nil, nil, nopMutator, nopMutator, - func() storage.Appender { return app }, + func(ctx context.Context) storage.Appender { return app }, nil, 0, true, ) now := time.Now() - slApp := sl.appender() + slApp := sl.appender(ctx) _, _, _, err := sl.append(slApp, []byte("metric_a 1\n"), "", now) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) - slApp = sl.appender() + slApp = sl.appender(ctx) _, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second)) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -1222,23 +1230,24 @@ func TestScrapeLoopAppendStaleness(t *testing.T) { func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { app := &collectResultAppender{} - sl := newScrapeLoop(context.Background(), + ctx := context.Background() + sl := newScrapeLoop(ctx, nil, nil, nil, nopMutator, nopMutator, - func() storage.Appender { return app }, + func(ctx context.Context) storage.Appender { return app }, nil, 0, true, ) now := time.Now() - slApp := sl.appender() + slApp := sl.appender(ctx) _, _, _, err := sl.append(slApp, []byte("metric_a 1 1000\n"), "", now) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) - slApp = sl.appender() + slApp = sl.appender(ctx) _, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second)) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -1257,7 +1266,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) { var ( scraper = &testScraper{} appender = &collectResultAppender{} - app = func() storage.Appender { return appender } + app = func(ctx context.Context) storage.Appender { return appender } ) ctx, cancel := context.WithCancel(context.Background()) @@ -1285,7 +1294,7 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) { var ( scraper = &testScraper{} appender = &collectResultAppender{} - app = func() storage.Appender { return appender } + app = func(ctx context.Context) storage.Appender { return appender } ) ctx, cancel := context.WithCancel(context.Background()) @@ -1333,20 +1342,21 @@ func (app *errorAppender) AddFast(ref uint64, t int64, v float64) error { func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) { app := &errorAppender{} + ctx := context.Background() - sl := newScrapeLoop(context.Background(), + sl := newScrapeLoop(ctx, nil, nil, nil, nopMutator, nopMutator, - func() storage.Appender { return app }, + func(ctx context.Context) storage.Appender { return app }, nil, 0, true, ) now := time.Unix(1, 0) - slApp := sl.appender() + slApp := sl.appender(ctx) total, added, seriesAdded, err := sl.append(slApp, []byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -1366,12 +1376,13 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { app := &collectResultAppender{} - sl := newScrapeLoop(context.Background(), + ctx := context.Background() + sl := newScrapeLoop(ctx, nil, nil, nil, nopMutator, nopMutator, - func() storage.Appender { + func(ctx context.Context) storage.Appender { return &timeLimitAppender{ Appender: app, maxTime: timestamp.FromTime(time.Now().Add(10 * time.Minute)), @@ -1383,7 +1394,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { ) now := time.Now().Add(20 * time.Minute) - slApp := sl.appender() + slApp := sl.appender(ctx) total, added, seriesAdded, err := sl.append(slApp, []byte("normal 1\n"), "", now) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -1556,21 +1567,22 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) { s := teststorage.New(t) defer s.Close() - app := s.Appender() + ctx := context.Background() + app := s.Appender(ctx) capp := &collectResultAppender{next: app} - sl := newScrapeLoop(context.Background(), + sl := newScrapeLoop(ctx, nil, nil, nil, nopMutator, nopMutator, - func() storage.Appender { return capp }, + func(ctx context.Context) storage.Appender { return capp }, nil, 0, true, ) now := time.Now() - slApp := sl.appender() + slApp := sl.appender(ctx) _, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -1589,21 +1601,22 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) { s := teststorage.New(t) defer s.Close() - app := s.Appender() + ctx := context.Background() + app := s.Appender(ctx) capp := &collectResultAppender{next: app} - sl := newScrapeLoop(context.Background(), + sl := newScrapeLoop(ctx, nil, nil, nil, nopMutator, nopMutator, - func() storage.Appender { return capp }, + func(ctx context.Context) storage.Appender { return capp }, nil, 0, false, ) now := time.Now() - slApp := sl.appender() + slApp := sl.appender(ctx) _, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -1636,7 +1649,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { defer cancel() // We add a good and a bad metric to check that both are discarded. - slApp := sl.appender() + slApp := sl.appender(ctx) _, _, _, err := sl.append(slApp, []byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{}) testutil.NotOk(t, err) testutil.Ok(t, slApp.Rollback()) @@ -1648,7 +1661,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { testutil.Ok(t, series.Err()) // We add a good metric to check that it is recorded. - slApp = sl.appender() + slApp = sl.appender(ctx) _, _, _, err = sl.append(slApp, []byte("test_metric{le=\"500\"} 1\n"), "", time.Time{}) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -1665,7 +1678,8 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) { s := teststorage.New(t) defer s.Close() - app := s.Appender() + ctx := context.Background() + app := s.Appender(ctx) ctx, cancel := context.WithCancel(context.Background()) sl := newScrapeLoop(ctx, @@ -1678,14 +1692,14 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) { return l }, nopMutator, - func() storage.Appender { return app }, + func(ctx context.Context) storage.Appender { return app }, nil, 0, true, ) defer cancel() - slApp := sl.appender() + slApp := sl.appender(ctx) _, _, _, err := sl.append(slApp, []byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "", time.Time{}) testutil.NotOk(t, err) testutil.Ok(t, slApp.Rollback()) @@ -1901,7 +1915,7 @@ func TestScrapeAddFast(t *testing.T) { ) defer cancel() - slApp := sl.appender() + slApp := sl.appender(ctx) _, _, _, err := sl.append(slApp, []byte("up 1\n"), "", time.Time{}) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -1912,7 +1926,7 @@ func TestScrapeAddFast(t *testing.T) { v.ref++ } - slApp = sl.appender() + slApp = sl.appender(ctx) _, _, _, err = sl.append(slApp, []byte("up 1\n"), "", time.Time{}.Add(time.Second)) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) diff --git a/storage/fanout.go b/storage/fanout.go index 35ae7e9a4a..c7c7382010 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -122,11 +122,11 @@ func (f *fanout) ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQueri return NewMergeChunkQuerier(primary, secondaries, NewCompactingChunkSeriesMerger(ChainedSeriesMerge)), nil } -func (f *fanout) Appender() Appender { - primary := f.primary.Appender() +func (f *fanout) Appender(ctx context.Context) Appender { + primary := f.primary.Appender(ctx) secondaries := make([]Appender, 0, len(f.secondaries)) for _, storage := range f.secondaries { - secondaries = append(secondaries, storage.Appender()) + secondaries = append(secondaries, storage.Appender(ctx)) } return &fanoutAppender{ logger: f.logger, diff --git a/storage/fanout/fanout_test.go b/storage/fanout/fanout_test.go index d263297ea0..8a730e9ed8 100644 --- a/storage/fanout/fanout_test.go +++ b/storage/fanout/fanout_test.go @@ -31,10 +31,11 @@ func TestSelectSorted(t *testing.T) { outputLabel := labels.FromStrings(model.MetricNameLabel, "a") inputTotalSize := 0 + ctx := context.Background() priStorage := teststorage.New(t) defer priStorage.Close() - app1 := priStorage.Appender() + app1 := priStorage.Appender(ctx) app1.Add(inputLabel, 0, 0) inputTotalSize++ app1.Add(inputLabel, 1000, 1) @@ -46,7 +47,7 @@ func TestSelectSorted(t *testing.T) { remoteStorage1 := teststorage.New(t) defer remoteStorage1.Close() - app2 := remoteStorage1.Appender() + app2 := remoteStorage1.Appender(ctx) app2.Add(inputLabel, 3000, 3) inputTotalSize++ app2.Add(inputLabel, 4000, 4) @@ -59,7 +60,7 @@ func TestSelectSorted(t *testing.T) { remoteStorage2 := teststorage.New(t) defer remoteStorage2.Close() - app3 := remoteStorage2.Appender() + app3 := remoteStorage2.Appender(ctx) app3.Add(inputLabel, 6000, 6) inputTotalSize++ app3.Add(inputLabel, 7000, 7) @@ -100,7 +101,7 @@ func TestSelectSorted(t *testing.T) { }) t.Run("chunk querier", func(t *testing.T) { t.Skip("TODO(bwplotka: Unskip when db will implement ChunkQuerier.") - querier, err := fanoutStorage.ChunkQuerier(context.Background(), 0, 8000) + querier, err := fanoutStorage.ChunkQuerier(ctx, 0, 8000) testutil.Ok(t, err) defer querier.Close() @@ -222,7 +223,7 @@ type errChunkQuerier struct{ errQuerier } func (errStorage) ChunkQuerier(_ context.Context, _, _ int64) (storage.ChunkQuerier, error) { return errChunkQuerier{}, nil } -func (errStorage) Appender() storage.Appender { return nil } +func (errStorage) Appender(_ context.Context) storage.Appender { return nil } func (errStorage) StartTime() (int64, error) { return 0, nil } func (errStorage) Close() error { return nil } diff --git a/storage/interface.go b/storage/interface.go index 6ebb038424..10f581e88a 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -34,7 +34,7 @@ var ( // Appendable allows creating appenders. type Appendable interface { // Appender returns a new appender for the storage. - Appender() Appender + Appender(ctx context.Context) Appender } // SampleAndChunkQueryable allows retrieving samples as well as encoded samples in form of chunks. diff --git a/storage/remote/storage.go b/storage/remote/storage.go index fd4a0529b2..656272988e 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -169,7 +169,7 @@ func (s *Storage) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.C } // Appender implements storage.Storage. -func (s *Storage) Appender() storage.Appender { +func (s *Storage) Appender(_ context.Context) storage.Appender { return s.rws.Appender() } diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index fdad0fd8c7..271a2365ea 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -911,7 +911,8 @@ func TestDisableAutoCompactions(t *testing.T) { // Trigger a compaction to check that it was skipped and // no new blocks were created when compaction is disabled. db.DisableCompactions() - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) for i := int64(0); i < 3; i++ { _, err := app.Add(label, i*blockRange, 0) testutil.Ok(t, err) @@ -1027,7 +1028,8 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { defaultLabel := labels.FromStrings("foo", "bar") // Add some data to the head that is enough to trigger a compaction. - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) _, err := app.Add(defaultLabel, 1, 0) testutil.Ok(t, err) _, err = app.Add(defaultLabel, 2, 0) diff --git a/tsdb/db.go b/tsdb/db.go index e02ad5fae0..2dc91aad24 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -710,7 +710,7 @@ func (db *DB) run() { } // Appender opens a new appender against the database. -func (db *DB) Appender() storage.Appender { +func (db *DB) Appender(ctx context.Context) storage.Appender { return dbAppender{db: db, Appender: db.head.Appender()} } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 2c26cebf3d..d3f80ca459 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -138,7 +138,8 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { testutil.Ok(t, db.Close()) }() - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) _, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0) testutil.Ok(t, err) @@ -169,9 +170,10 @@ func TestNoPanicAfterWALCorrutpion(t *testing.T) { // This is to ensure that all samples can be read from the mmaped chunks when the WAL is corrupted. var expSamples []tsdbutil.Sample var maxt int64 + ctx := context.Background() { for { - app := db.Appender() + app := db.Appender(ctx) _, err := app.Add(labels.FromStrings("foo", "bar"), maxt, 0) expSamples = append(expSamples, sample{t: maxt, v: 0}) testutil.Ok(t, err) @@ -226,7 +228,7 @@ func TestDataNotAvailableAfterRollback(t *testing.T) { testutil.Ok(t, db.Close()) }() - app := db.Appender() + app := db.Appender(context.Background()) _, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0) testutil.Ok(t, err) @@ -248,7 +250,8 @@ func TestDBAppenderAddRef(t *testing.T) { testutil.Ok(t, db.Close()) }() - app1 := db.Appender() + ctx := context.Background() + app1 := db.Appender(ctx) ref1, err := app1.Add(labels.FromStrings("a", "b"), 123, 0) testutil.Ok(t, err) @@ -260,7 +263,7 @@ func TestDBAppenderAddRef(t *testing.T) { err = app1.Commit() testutil.Ok(t, err) - app2 := db.Appender() + app2 := db.Appender(ctx) // first ref should already work in next transaction. err = app2.AddFast(ref1, 125, 0) @@ -302,7 +305,8 @@ func TestAppendEmptyLabelsIgnored(t *testing.T) { testutil.Ok(t, db.Close()) }() - app1 := db.Appender() + ctx := context.Background() + app1 := db.Appender(ctx) ref1, err := app1.Add(labels.FromStrings("a", "b"), 123, 0) testutil.Ok(t, err) @@ -354,7 +358,8 @@ Outer: testutil.Ok(t, db.Close()) }() - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) smpls := make([]float64, numSamples) for i := int64(0); i < numSamples; i++ { @@ -413,12 +418,13 @@ func TestAmendDatapointCausesError(t *testing.T) { testutil.Ok(t, db.Close()) }() - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 0) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) - app = db.Appender() + app = db.Appender(ctx) _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1) testutil.Equals(t, storage.ErrDuplicateSampleForTimestamp, err) testutil.Ok(t, app.Rollback()) @@ -430,12 +436,13 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) { testutil.Ok(t, db.Close()) }() - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.NaN()) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) - app = db.Appender() + app = db.Appender(ctx) _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.NaN()) testutil.Ok(t, err) } @@ -446,12 +453,13 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { testutil.Ok(t, db.Close()) }() - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000001)) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) - app = db.Appender() + app = db.Appender(ctx) _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000002)) testutil.Equals(t, storage.ErrDuplicateSampleForTimestamp, err) } @@ -462,7 +470,8 @@ func TestEmptyLabelsetCausesError(t *testing.T) { testutil.Ok(t, db.Close()) }() - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) _, err := app.Add(labels.Labels{}, 0, 0) testutil.NotOk(t, err) testutil.Equals(t, "empty labelset: invalid sample", err.Error()) @@ -475,7 +484,8 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { }() // Append AmendedValue. - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1) testutil.Ok(t, err) _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 2) @@ -493,7 +503,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { }, ssMap) // Append Out of Order Value. - app = db.Appender() + app = db.Appender(ctx) _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 10, 3) testutil.Ok(t, err) _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 7, 5) @@ -514,7 +524,8 @@ func TestDB_Snapshot(t *testing.T) { db := openTestDB(t, nil, nil) // append data - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) mint := int64(1414141414000) for i := 0; i < 1000; i++ { _, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0) @@ -563,7 +574,8 @@ func TestDB_Snapshot(t *testing.T) { func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) { db := openTestDB(t, nil, nil) - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) mint := int64(1414141414000) for i := 0; i < 1000; i++ { _, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0) @@ -615,7 +627,8 @@ func TestDB_SnapshotWithDelete(t *testing.T) { db := openTestDB(t, nil, nil) - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) smpls := make([]float64, numSamples) for i := int64(0); i < numSamples; i++ { @@ -760,7 +773,8 @@ func TestDB_e2e(t *testing.T) { testutil.Ok(t, db.Close()) }() - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) for _, l := range lbls { lset := labels.New(l...) @@ -864,7 +878,8 @@ func TestWALFlushedOnDBClose(t *testing.T) { lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}} - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) _, err := app.Add(lbls, 0, 1) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) @@ -938,7 +953,7 @@ func TestWALSegmentSizeOptions(t *testing.T) { db := openTestDB(t, opts, nil) for i := int64(0); i < 155; i++ { - app := db.Appender() + app := db.Appender(context.Background()) ref, err := app.Add(labels.Labels{labels.Label{Name: "wal" + fmt.Sprintf("%d", i), Value: "size"}}, i, rand.Float64()) testutil.Ok(t, err) for j := int64(1); j <= 78; j++ { @@ -960,7 +975,8 @@ func TestTombstoneClean(t *testing.T) { db := openTestDB(t, nil, nil) - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) smpls := make([]float64, numSamples) for i := int64(0); i < numSamples; i++ { @@ -1287,7 +1303,8 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { labels.FromStrings("labelname", "labelvalue"), } - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) for _, lbls := range labelpairs { _, err := app.Add(lbls, 0, 1) testutil.Ok(t, err) @@ -1467,7 +1484,8 @@ func TestChunkAtBlockBoundary(t *testing.T) { testutil.Ok(t, db.Close()) }() - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) blockRange := db.compactor.(*LeveledCompactor).ranges[0] label := labels.FromStrings("foo", "bar") @@ -1523,7 +1541,8 @@ func TestQuerierWithBoundaryChunks(t *testing.T) { testutil.Ok(t, db.Close()) }() - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) blockRange := db.compactor.(*LeveledCompactor).ranges[0] label := labels.FromStrings("foo", "bar") @@ -1572,7 +1591,8 @@ func TestInitializeHeadTimestamp(t *testing.T) { testutil.Equals(t, int64(math.MinInt64), db.head.MaxTime()) // First added sample initializes the writable range. - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) _, err = app.Add(labels.FromStrings("a", "b"), 1000, 1) testutil.Ok(t, err) @@ -1669,6 +1689,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { func TestNoEmptyBlocks(t *testing.T) { db := openTestDB(t, nil, []int64{100}) + ctx := context.Background() defer func() { testutil.Ok(t, db.Close()) }() @@ -1688,7 +1709,7 @@ func TestNoEmptyBlocks(t *testing.T) { }) t.Run("Test no blocks after deleting all samples from head.", func(t *testing.T) { - app := db.Appender() + app := db.Appender(ctx) _, err := app.Add(defaultLabel, 1, 0) testutil.Ok(t, err) _, err = app.Add(defaultLabel, 2, 0) @@ -1705,7 +1726,7 @@ func TestNoEmptyBlocks(t *testing.T) { testutil.Equals(t, len(db.Blocks()), len(actBlocks)) testutil.Equals(t, 0, len(actBlocks)) - app = db.Appender() + app = db.Appender(ctx) _, err = app.Add(defaultLabel, 1, 0) testutil.Assert(t, err == storage.ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed") @@ -1730,7 +1751,7 @@ func TestNoEmptyBlocks(t *testing.T) { t.Run(`When no new block is created from head, and there are some blocks on disk compaction should not run into infinite loop (was seen during development).`, func(t *testing.T) { oldBlocks := db.Blocks() - app := db.Appender() + app := db.Appender(ctx) currentTime := db.Head().MaxTime() _, err := app.Add(defaultLabel, currentTime, 0) testutil.Ok(t, err) @@ -1813,7 +1834,8 @@ func TestDB_LabelNames(t *testing.T) { // Appends samples into the database. appendSamples := func(db *DB, mint, maxt int64, sampleLabels [][2]string) { t.Helper() - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) for i := mint; i <= maxt; i++ { for _, tuple := range sampleLabels { label := labels.FromStrings(tuple[0], tuple[1]) @@ -1880,7 +1902,8 @@ func TestCorrectNumTombstones(t *testing.T) { defaultLabel := labels.FromStrings("foo", "bar") defaultMatcher := labels.MustNewMatcher(labels.MatchEqual, defaultLabel[0].Name, defaultLabel[0].Value) - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) for i := int64(0); i < 3; i++ { for j := int64(0); j < 15; j++ { _, err := app.Add(defaultLabel, i*blockRange+j, 0) @@ -2283,6 +2306,7 @@ func TestVerticalCompaction(t *testing.T) { // will not overlap with the first block created by the next compaction. func TestBlockRanges(t *testing.T) { logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + ctx := context.Background() dir, err := ioutil.TempDir("", "test_storage") testutil.Ok(t, err) @@ -2298,7 +2322,7 @@ func TestBlockRanges(t *testing.T) { defer func() { os.RemoveAll(dir) }() - app := db.Appender() + app := db.Appender(ctx) lbl := labels.Labels{{Name: "a", Value: "b"}} _, err = app.Add(lbl, firstBlockMaxT-1, rand.Float64()) if err == nil { @@ -2327,7 +2351,7 @@ func TestBlockRanges(t *testing.T) { // Test that wal records are skipped when an existing block covers the same time ranges // and compaction doesn't create an overlapping block. - app = db.Appender() + app = db.Appender(ctx) db.DisableCompactions() _, err = app.Add(lbl, secondBlockMaxt+1, rand.Float64()) testutil.Ok(t, err) @@ -2350,7 +2374,7 @@ func TestBlockRanges(t *testing.T) { testutil.Equals(t, 3, len(db.Blocks()), "db doesn't include expected number of blocks") testutil.Equals(t, db.Blocks()[2].Meta().MaxTime, thirdBlockMaxt, "unexpected maxt of the last block") - app = db.Appender() + app = db.Appender(ctx) _, err = app.Add(lbl, thirdBlockMaxt+rangeToTriggerCompaction, rand.Float64()) // Trigger a compaction testutil.Ok(t, err) testutil.Ok(t, app.Commit()) @@ -2411,7 +2435,7 @@ func TestDBReadOnly(t *testing.T) { dbSizeBeforeAppend, err := fileutil.DirSize(dbWritable.Dir()) testutil.Ok(t, err) - app := dbWritable.Appender() + app := dbWritable.Appender(context.Background()) _, err = app.Add(labels.FromStrings("foo", "bar"), dbWritable.Head().MaxTime()+1, 0) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) @@ -2481,6 +2505,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) err error maxt int + ctx = context.Background() ) // Bootstrap the db. @@ -2496,7 +2521,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { db, err := Open(dbDir, logger, nil, nil) testutil.Ok(t, err) db.DisableCompactions() - app := db.Appender() + app := db.Appender(ctx) maxt = 1000 for i := 0; i < maxt; i++ { _, err := app.Add(labels.FromStrings(defaultLabelName, "flush"), int64(i), 1.0) @@ -2560,12 +2585,13 @@ func TestDBCannotSeePartialCommits(t *testing.T) { stop := make(chan struct{}) firstInsert := make(chan struct{}) + ctx := context.Background() // Insert data in batches. go func() { iter := 0 for { - app := db.Appender() + app := db.Appender(ctx) for j := 0; j < 100; j++ { _, err := app.Add(labels.FromStrings("foo", "bar", "a", strconv.Itoa(j)), int64(iter), float64(iter)) @@ -2631,7 +2657,8 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) { testutil.Ok(t, err) defer querierBeforeAdd.Close() - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) _, err = app.Add(labels.FromStrings("foo", "bar"), 0, 0) testutil.Ok(t, err) @@ -2929,7 +2956,8 @@ func TestCompactHead(t *testing.T) { db, err := Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg) testutil.Ok(t, err) - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) var expSamples []sample maxt := 100 for i := 0; i < maxt; i++ { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 368ed8c8cf..64281cbb43 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -14,6 +14,7 @@ package tsdb import ( + "context" "fmt" "io/ioutil" "math" @@ -1692,7 +1693,8 @@ func TestOutOfOrderSamplesMetric(t *testing.T) { }() db.DisableCompactions() - app := db.Appender() + ctx := context.Background() + app := db.Appender(ctx) for i := 1; i <= 5; i++ { _, err = app.Add(labels.FromStrings("a", "b"), int64(i), 99) testutil.Ok(t, err) @@ -1701,7 +1703,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) { // Test out of order metric. testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) - app = db.Appender() + app = db.Appender(ctx) _, err = app.Add(labels.FromStrings("a", "b"), 2, 99) testutil.Equals(t, storage.ErrOutOfOrderSample, err) testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) @@ -1716,7 +1718,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) { testutil.Ok(t, app.Commit()) // Compact Head to test out of bound metric. - app = db.Appender() + app = db.Appender(ctx) _, err = app.Add(labels.FromStrings("a", "b"), DefaultBlockDuration*2, 99) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) @@ -1725,7 +1727,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) { testutil.Ok(t, db.Compact()) testutil.Assert(t, db.head.minValidTime > 0, "") - app = db.Appender() + app = db.Appender(ctx) _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime-2, 99) testutil.Equals(t, storage.ErrOutOfBounds, err) testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples)) @@ -1736,7 +1738,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) { testutil.Ok(t, app.Commit()) // Some more valid samples for out of order. - app = db.Appender() + app = db.Appender(ctx) for i := 1; i <= 5; i++ { _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime+DefaultBlockDuration+int64(i), 99) testutil.Ok(t, err) @@ -1744,7 +1746,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) { testutil.Ok(t, app.Commit()) // Test out of order metric. - app = db.Appender() + app = db.Appender(ctx) _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime+DefaultBlockDuration+2, 99) testutil.Equals(t, storage.ErrOutOfOrderSample, err) testutil.Equals(t, 4.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))