diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index 16ae0520d..088661198 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -990,9 +990,9 @@ func (s *readyStorage) ChunkQuerier(ctx context.Context, mint, maxt int64) (stor
 }
 
 // Appender implements the Storage interface.
-func (s *readyStorage) Appender() storage.Appender {
+func (s *readyStorage) Appender(ctx context.Context) storage.Appender {
 	if x := s.get(); x != nil {
-		return x.Appender()
+		return x.Appender(ctx)
 	}
 	return notReadyAppender{}
 }
diff --git a/cmd/prometheus/main_test.go b/cmd/prometheus/main_test.go
index f556437b2..a8c907f03 100644
--- a/cmd/prometheus/main_test.go
+++ b/cmd/prometheus/main_test.go
@@ -263,7 +263,7 @@ func TestTimeMetrics(t *testing.T) {
 		"prometheus_tsdb_head_max_time_seconds",
 	))
 
-	app := db.Appender()
+	app := db.Appender(context.Background())
 	_, err = app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 1)
 	testutil.Ok(t, err)
 	_, err = app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, 1)
diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go
index 9667f7c76..e3da413e4 100644
--- a/cmd/promtool/tsdb.go
+++ b/cmd/promtool/tsdb.go
@@ -199,7 +199,7 @@ func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount in
 	total := uint64(0)
 
 	for i := 0; i < scrapeCount; i++ {
-		app := b.storage.Appender()
+		app := b.storage.Appender(context.TODO())
 		ts += timeDelta
 
 		for _, s := range scrape {
diff --git a/promql/bench_test.go b/promql/bench_test.go
index b9b12c0aa..f1ea31ceb 100644
--- a/promql/bench_test.go
+++ b/promql/bench_test.go
@@ -68,7 +68,7 @@ func BenchmarkRangeQuery(b *testing.B) {
 	numIntervals := 8640 + 10000
 
 	for s := 0; s < numIntervals; s++ {
-		a := storage.Appender()
+		a := storage.Appender(context.Background())
 		ts := int64(s * 10000) // 10s interval.
 		for i, metric := range metrics {
 			err := a.AddFast(refs[i], ts, float64(s))
diff --git a/promql/functions_test.go b/promql/functions_test.go
index fd2750263..00e028b98 100644
--- a/promql/functions_test.go
+++ b/promql/functions_test.go
@@ -40,7 +40,7 @@ func TestDeriv(t *testing.T) {
 	}
 	engine := NewEngine(opts)
 
-	a := storage.Appender()
+	a := storage.Appender(context.Background())
 
 	metric := labels.FromStrings("__name__", "foo")
 	a.Add(metric, 1493712816939, 1.0)
diff --git a/promql/test.go b/promql/test.go
index ff2549144..bcd90200f 100644
--- a/promql/test.go
+++ b/promql/test.go
@@ -433,7 +433,7 @@ func (t *Test) exec(tc testCommand) error {
 		t.clear()
 
 	case *loadCmd:
-		app := t.storage.Appender()
+		app := t.storage.Appender(t.context)
 		if err := cmd.append(app); err != nil {
 			app.Rollback()
 			return err
@@ -644,7 +644,7 @@ func (ll *LazyLoader) clear() {
 
 // appendTill appends the defined time series to the storage till the given timestamp (in milliseconds).
 func (ll *LazyLoader) appendTill(ts int64) error {
-	app := ll.storage.Appender()
+	app := ll.storage.Appender(ll.Context())
 	for h, smpls := range ll.loadCmd.defs {
 		m := ll.loadCmd.metrics[h]
 		for i, s := range smpls {
diff --git a/rules/manager.go b/rules/manager.go
index c4344fa68..c9b7e45d8 100644
--- a/rules/manager.go
+++ b/rules/manager.go
@@ -352,7 +352,7 @@ func (g *Group) run(ctx context.Context) {
 			select {
 			case <-g.managerDone:
 			case <-time.After(2 * g.interval):
-				g.cleanupStaleSeries(now)
+				g.cleanupStaleSeries(ctx, now)
 			}
 		}(time.Now())
 	}()
@@ -588,7 +588,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 				numDuplicates = 0
 			)
 
-			app := g.opts.Appendable.Appender()
+			app := g.opts.Appendable.Appender(ctx)
 			seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
 			defer func() {
 				if err := app.Commit(); err != nil {
@@ -636,14 +636,14 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 		}(i, rule)
 	}
 
-	g.cleanupStaleSeries(ts)
+	g.cleanupStaleSeries(ctx, ts)
 }
 
-func (g *Group) cleanupStaleSeries(ts time.Time) {
+func (g *Group) cleanupStaleSeries(ctx context.Context, ts time.Time) {
 	if len(g.staleSeries) == 0 {
 		return
 	}
-	app := g.opts.Appendable.Appender()
+	app := g.opts.Appendable.Appender(ctx)
 	for _, s := range g.staleSeries {
 		// Rule that produced series no longer configured, mark it stale.
 		_, err := app.Add(s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
diff --git a/rules/manager_test.go b/rules/manager_test.go
index 10e16e6cc..419b42834 100644
--- a/rules/manager_test.go
+++ b/rules/manager_test.go
@@ -546,7 +546,7 @@ func TestStaleness(t *testing.T) {
 	})
 
 	// A time series that has two samples and then goes stale.
-	app := st.Appender()
+	app := st.Appender(context.Background())
 	app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 0, 1)
 	app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2)
 	app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN))
@@ -870,7 +870,7 @@ func TestNotify(t *testing.T) {
 		Opts: opts,
 	})
 
-	app := storage.Appender()
+	app := storage.Appender(context.Background())
 	app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2)
 	app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, 3)
 	app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 5000, 3)
diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go
index da78cb5dd..095ea7cac 100644
--- a/scrape/helpers_test.go
+++ b/scrape/helpers_test.go
@@ -14,13 +14,14 @@
 package scrape
 
 import (
+	"context"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/storage"
 )
 
 type nopAppendable struct{}
 
-func (a nopAppendable) Appender() storage.Appender {
+func (a nopAppendable) Appender(_ context.Context) storage.Appender {
 	return nopAppender{}
 }
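For code outside this repository that implements storage.Appendable, the migration is mechanical: accept the new context argument and, if nothing uses it yet, simply ignore it, just as the nopAppendable test stub above does. A minimal sketch with illustrative names (myAppendable is not part of this change):

	package myexample // illustrative

	import (
		"context"

		"github.com/prometheus/prometheus/storage"
	)

	// myAppendable wraps another Appendable whose appenders do not need the context yet.
	type myAppendable struct {
		next storage.Appendable
	}

	// Appender satisfies the updated storage.Appendable interface; the context is
	// accepted but deliberately left unused here.
	func (a myAppendable) Appender(_ context.Context) storage.Appender {
		return a.next.Appender(context.Background())
	}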
diff --git a/scrape/scrape.go b/scrape/scrape.go
index 91c8e6274..e21ad9c3a 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -254,7 +254,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed
 			return mutateSampleLabels(l, opts.target, opts.honorLabels, opts.mrc)
 		},
 		func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
-		func() storage.Appender { return appender(app.Appender(), opts.limit) },
+		func(ctx context.Context) storage.Appender { return appender(app.Appender(ctx), opts.limit) },
 		cache,
 		jitterSeed,
 		opts.honorTimestamps,
@@ -681,7 +681,7 @@ type scrapeLoop struct {
 	forcedErr    error
 	forcedErrMtx sync.Mutex
 
-	appender            func() storage.Appender
+	appender            func(ctx context.Context) storage.Appender
 	sampleMutator       labelsMutator
 	reportSampleMutator labelsMutator
 
@@ -943,7 +943,7 @@ func newScrapeLoop(ctx context.Context,
 	buffers *pool.Pool,
 	sampleMutator labelsMutator,
 	reportSampleMutator labelsMutator,
-	appender func() storage.Appender,
+	appender func(ctx context.Context) storage.Appender,
 	cache *scrapeCache,
 	jitterSeed uint64,
 	honorTimestamps bool,
@@ -1035,7 +1035,7 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
 	b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
 	buf := bytes.NewBuffer(b)
 
-	app := sl.appender()
+	app := sl.appender(sl.ctx)
 	var total, added, seriesAdded int
 	var err, appErr, scrapeErr error
 	defer func() {
@@ -1053,7 +1053,7 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
 			// Add stale markers.
 			if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil {
 				app.Rollback()
-				app = sl.appender()
+				app = sl.appender(sl.ctx)
 				level.Warn(sl.l).Log("msg", "Append failed", "err", err)
 			}
 			if errc != nil {
@@ -1085,13 +1085,13 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
 	total, added, seriesAdded, appErr = sl.append(app, b, contentType, start)
 	if appErr != nil {
 		app.Rollback()
-		app = sl.appender()
+		app = sl.appender(sl.ctx)
 		level.Debug(sl.l).Log("msg", "Append failed", "err", appErr)
 		// The append failed, probably due to a parse error or sample limit.
 		// Call sl.append again with an empty scrape to trigger stale markers.
 		if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil {
 			app.Rollback()
-			app = sl.appender()
+			app = sl.appender(sl.ctx)
 			level.Warn(sl.l).Log("msg", "Append failed", "err", err)
 		}
 	}
@@ -1161,7 +1161,7 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
 	// Call sl.append again with an empty scrape to trigger stale markers.
 	// If the target has since been recreated and scraped, the
 	// stale markers will be out of order and ignored.
-	app := sl.appender()
+	app := sl.appender(sl.ctx)
 	var err error
 	defer func() {
 		if err != nil {
@@ -1175,7 +1175,7 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
 	}()
 	if _, _, _, err = sl.append(app, []byte{}, "", staleTime); err != nil {
 		app.Rollback()
-		app = sl.appender()
+		app = sl.appender(sl.ctx)
 		level.Warn(sl.l).Log("msg", "Stale append failed", "err", err)
 	}
 	if err = sl.reportStale(app, staleTime); err != nil {
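The scrape loop now obtains its appender through a context-aware factory, so the loop's own context (sl.ctx above) reaches the storage every time an appender is created. A sketch of the factory shape that newScrapeLoop now expects; the extra wrapping done in newScrapePool with the unexported appender helper and opts.limit is omitted, so treat this as illustrative rather than the pool's exact wiring:

	package scrapeexample // illustrative

	import (
		"context"

		"github.com/prometheus/prometheus/storage"
	)

	// newAppenderFunc adapts an Appendable into the factory signature used by the
	// scrape loop: func(ctx context.Context) storage.Appender.
	func newAppenderFunc(app storage.Appendable) func(ctx context.Context) storage.Appender {
		return func(ctx context.Context) storage.Appender {
			// The storage decides whether to use ctx for deadlines or error checks.
			return app.Appender(ctx)
		}
	}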
diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index 7b0e69dd1..4b0bba517 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -412,7 +412,7 @@ func TestScrapePoolAppender(t *testing.T) {
 	appl, ok := loop.(*scrapeLoop)
 	testutil.Assert(t, ok, "Expected scrapeLoop but got %T", loop)
 
-	wrapped := appl.appender()
+	wrapped := appl.appender(context.Background())
 
 	tl, ok := wrapped.(*timeLimitAppender)
 	testutil.Assert(t, ok, "Expected timeLimitAppender but got %T", wrapped)
@@ -427,7 +427,7 @@ func TestScrapePoolAppender(t *testing.T) {
 	appl, ok = loop.(*scrapeLoop)
 	testutil.Assert(t, ok, "Expected scrapeLoop but got %T", loop)
 
-	wrapped = appl.appender()
+	wrapped = appl.appender(context.Background())
 
 	sl, ok := wrapped.(*limitAppender)
 	testutil.Assert(t, ok, "Expected limitAppender but got %T", wrapped)
@@ -538,7 +538,7 @@ func TestScrapeLoopStop(t *testing.T) {
 		signal   = make(chan struct{}, 1)
 		appender = &collectResultAppender{}
 		scraper  = &testScraper{}
-		app      = func() storage.Appender { return appender }
+		app      = func(ctx context.Context) storage.Appender { return appender }
 	)
 
 	sl := newScrapeLoop(context.Background(),
@@ -603,7 +603,7 @@ func TestScrapeLoopRun(t *testing.T) {
 		errc   = make(chan error)
 
 		scraper = &testScraper{}
-		app     = func() storage.Appender { return &nopAppender{} }
+		app     = func(ctx context.Context) storage.Appender { return &nopAppender{} }
 	)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -701,7 +701,7 @@ func TestScrapeLoopForcedErr(t *testing.T) {
 		errc   = make(chan error)
 
 		scraper = &testScraper{}
-		app     = func() storage.Appender { return &nopAppender{} }
+		app     = func(ctx context.Context) storage.Appender { return &nopAppender{} }
 	)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -760,14 +760,14 @@ func TestScrapeLoopMetadata(t *testing.T) {
 		nil, nil,
 		nopMutator,
 		nopMutator,
-		func() storage.Appender { return nopAppender{} },
+		func(ctx context.Context) storage.Appender { return nopAppender{} },
 		cache,
 		0,
 		true,
 	)
 	defer cancel()
 
-	slApp := sl.appender()
+	slApp := sl.appender(ctx)
 	total, _, _, err := sl.append(slApp, []byte(`# TYPE test_metric counter
 # HELP test_metric some help text
 # UNIT test_metric metric
@@ -816,7 +816,7 @@ func TestScrapeLoopSeriesAdded(t *testing.T) {
 	)
 	defer cancel()
 
-	slApp := sl.appender()
+	slApp := sl.appender(ctx)
 	total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{})
 	testutil.Ok(t, err)
 	testutil.Ok(t, slApp.Commit())
@@ -824,7 +824,7 @@ func TestScrapeLoopSeriesAdded(t *testing.T) {
 	testutil.Equals(t, 1, added)
 	testutil.Equals(t, 1, seriesAdded)
 
-	slApp = sl.appender()
+	slApp = sl.appender(ctx)
 	total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{})
 	testutil.Ok(t, slApp.Commit())
 	testutil.Ok(t, err)
@@ -838,7 +838,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
 	var (
 		signal  = make(chan struct{}, 1)
 		scraper = &testScraper{}
-		app     = func() storage.Appender { return appender }
+		app     = func(ctx context.Context) storage.Appender { return appender }
 	)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -891,7 +891,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
 	var (
 		signal  = make(chan struct{}, 1)
 		scraper = &testScraper{}
-		app     = func() storage.Appender { return appender }
+		app     = func(ctx context.Context) storage.Appender { return appender }
 
 		numScrapes = 0
 	)
@@ -950,7 +950,7 @@ func TestScrapeLoopCache(t *testing.T) {
 	var (
 		signal  = make(chan struct{}, 1)
 		scraper = &testScraper{}
-		app     = func() storage.Appender { appender.next = s.Appender(); return appender }
+		app     = func(ctx context.Context) storage.Appender { appender.next = s.Appender(ctx); return appender }
 	)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -1018,13 +1018,13 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
 	s := teststorage.New(t)
 	defer s.Close()
 
-	sapp := s.Appender()
+	sapp := s.Appender(context.Background())
 
 	appender := &collectResultAppender{next: sapp}
 	var (
 		signal  = make(chan struct{}, 1)
 		scraper = &testScraper{}
-		app     = func() storage.Appender { return appender }
+		app     = func(ctx context.Context) storage.Appender { return appender }
 	)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -1139,7 +1139,7 @@ func TestScrapeLoopAppend(t *testing.T) {
 			func(l labels.Labels) labels.Labels {
 				return mutateReportSampleLabels(l, discoveryLabels)
 			},
-			func() storage.Appender { return app },
+			func(ctx context.Context) storage.Appender { return app },
 			nil,
 			0,
 			true,
@@ -1147,7 +1147,7 @@ func TestScrapeLoopAppend(t *testing.T) {
 
 		now := time.Now()
 
-		slApp := sl.appender()
+		slApp := sl.appender(context.Background())
 		_, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", now)
 		testutil.Ok(t, err)
 		testutil.Ok(t, slApp.Commit())
@@ -1180,7 +1180,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
 		nil, nil, nil,
 		nopMutator,
 		nopMutator,
-		func() storage.Appender { return app },
+		func(ctx context.Context) storage.Appender { return app },
 		nil,
 		0,
 		true,
@@ -1200,7 +1200,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
 	sl.cache.addRef(mets, fakeRef, lset, hash)
 
 	now := time.Now()
-	slApp := sl.appender()
+	slApp := sl.appender(context.Background())
 	_, _, _, err := sl.append(slApp, []byte(metric), "", now)
 	testutil.Ok(t, err)
 	testutil.Ok(t, slApp.Commit())
@@ -1229,7 +1229,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
 			return l
 		},
 		nopMutator,
-		func() storage.Appender { return app },
+		func(ctx context.Context) storage.Appender { return app },
 		nil,
 		0,
 		true,
@@ -1243,7 +1243,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
 	beforeMetricValue := beforeMetric.GetCounter().GetValue()
 
 	now := time.Now()
-	slApp := sl.appender()
+	slApp := sl.appender(context.Background())
 	total, added, seriesAdded, err := sl.append(app, []byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now)
 	if err != errSampleLimit {
 		t.Fatalf("Did not see expected sample limit error: %s", err)
@@ -1274,7 +1274,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
 	testutil.Equals(t, want, resApp.rolledbackResult, "Appended samples not as expected")
 
 	now = time.Now()
-	slApp = sl.appender()
+	slApp = sl.appender(context.Background())
 	total, added, seriesAdded, err = sl.append(slApp, []byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "", now)
 	if err != errSampleLimit {
 		t.Fatalf("Did not see expected sample limit error: %s", err)
@@ -1298,19 +1298,19 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
 		nil, nil, nil,
 		nopMutator,
 		nopMutator,
-		func() storage.Appender { capp.next = s.Appender(); return capp },
+		func(ctx context.Context) storage.Appender { capp.next = s.Appender(ctx); return capp },
 		nil,
 		0,
 		true,
 	)
 
 	now := time.Now()
-	slApp := sl.appender()
+	slApp := sl.appender(context.Background())
 	_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "", now)
 	testutil.Ok(t, err)
 	testutil.Ok(t, slApp.Commit())
 
-	slApp = sl.appender()
+	slApp = sl.appender(context.Background())
 	_, _, _, err = sl.append(slApp, []byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute))
 	testutil.Ok(t, err)
 	testutil.Ok(t, slApp.Commit())
@@ -1338,19 +1338,19 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
 		nil, nil, nil,
 		nopMutator,
 		nopMutator,
-		func() storage.Appender { return app },
+		func(ctx context.Context) storage.Appender { return app },
 		nil,
 		0,
 		true,
 	)
 
 	now := time.Now()
-	slApp := sl.appender()
+	slApp := sl.appender(context.Background())
 	_, _, _, err := sl.append(slApp, []byte("metric_a 1\n"), "", now)
 	testutil.Ok(t, err)
 	testutil.Ok(t, slApp.Commit())
 
-	slApp = sl.appender()
+	slApp = sl.appender(context.Background())
 	_, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second))
 	testutil.Ok(t, err)
 	testutil.Ok(t, slApp.Commit())
@@ -1381,19 +1381,19 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
 		nil, nil, nil,
 		nopMutator,
 		nopMutator,
-		func() storage.Appender { return app },
+		func(ctx context.Context) storage.Appender { return app },
 		nil,
 		0,
 		true,
 	)
 
 	now := time.Now()
-	slApp := sl.appender()
+	slApp := sl.appender(context.Background())
 	_, _, _, err := sl.append(slApp, []byte("metric_a 1 1000\n"), "", now)
 	testutil.Ok(t, err)
 	testutil.Ok(t, slApp.Commit())
 
-	slApp = sl.appender()
+	slApp = sl.appender(context.Background())
 	_, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second))
 	testutil.Ok(t, err)
 	testutil.Ok(t, slApp.Commit())
@@ -1412,7 +1412,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
 	var (
 		scraper  = &testScraper{}
 		appender = &collectResultAppender{}
-		app      = func() storage.Appender { return appender }
+		app      = func(ctx context.Context) storage.Appender { return appender }
 	)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -1440,7 +1440,7 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
 	var (
 		scraper  = &testScraper{}
 		appender = &collectResultAppender{}
-		app      = func() storage.Appender { return appender }
+		app      = func(ctx context.Context) storage.Appender { return appender }
 	)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -1494,14 +1494,14 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
 		nil, nil,
 		nopMutator,
 		nopMutator,
-		func() storage.Appender { return app },
+		func(ctx context.Context) storage.Appender { return app },
 		nil,
 		0,
 		true,
 	)
 
 	now := time.Unix(1, 0)
-	slApp := sl.appender()
+	slApp := sl.appender(context.Background())
 	total, added, seriesAdded, err := sl.append(slApp, []byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now)
 	testutil.Ok(t, err)
 	testutil.Ok(t, slApp.Commit())
@@ -1526,7 +1526,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
 		nil, nil,
 		nopMutator,
 		nopMutator,
-		func() storage.Appender {
+		func(ctx context.Context) storage.Appender {
 			return &timeLimitAppender{
 				Appender: app,
 				maxTime:  timestamp.FromTime(time.Now().Add(10 * time.Minute)),
@@ -1538,7 +1538,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
 	)
 
 	now := time.Now().Add(20 * time.Minute)
-	slApp := sl.appender()
+	slApp := sl.appender(context.Background())
 	total, added, seriesAdded, err := sl.append(slApp, []byte("normal 1\n"), "", now)
 	testutil.Ok(t, err)
 	testutil.Ok(t, slApp.Commit())
@@ -1711,7 +1711,7 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
 	s := teststorage.New(t)
 	defer s.Close()
 
-	app := s.Appender()
+	app := s.Appender(context.Background())
 
 	capp := &collectResultAppender{next: app}
 
@@ -1719,13 +1719,13 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
 		nil, nil, nil,
 		nopMutator,
 		nopMutator,
-		func() storage.Appender { return capp },
+		func(ctx context.Context) storage.Appender { return capp },
 		nil,
 		0,
 		true,
 	)
 
 	now := time.Now()
-	slApp := sl.appender()
+	slApp := sl.appender(context.Background())
 	_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now)
 	testutil.Ok(t, err)
 	testutil.Ok(t, slApp.Commit())
@@ -1744,7 +1744,7 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
 	s := teststorage.New(t)
 	defer s.Close()
 
-	app := s.Appender()
+	app := s.Appender(context.Background())
 
 	capp := &collectResultAppender{next: app}
 
@@ -1752,13 +1752,13 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
 		nil, nil, nil,
 		nopMutator,
 		nopMutator,
-		func() storage.Appender { return capp },
+		func(ctx context.Context) storage.Appender { return capp },
 		nil,
 		0,
 		false,
 	)
 
 	now := time.Now()
-	slApp := sl.appender()
+	slApp := sl.appender(context.Background())
 	_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now)
 	testutil.Ok(t, err)
 	testutil.Ok(t, slApp.Commit())
@@ -1791,7 +1791,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
 	defer cancel()
 
 	// We add a good and a bad metric to check that both are discarded.
-	slApp := sl.appender()
+	slApp := sl.appender(ctx)
 	_, _, _, err := sl.append(slApp, []byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{})
 	testutil.NotOk(t, err)
 	testutil.Ok(t, slApp.Rollback())
@@ -1803,7 +1803,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
 	testutil.Ok(t, series.Err())
 
 	// We add a good metric to check that it is recorded.
-	slApp = sl.appender()
+	slApp = sl.appender(ctx)
 	_, _, _, err = sl.append(slApp, []byte("test_metric{le=\"500\"} 1\n"), "", time.Time{})
 	testutil.Ok(t, err)
 	testutil.Ok(t, slApp.Commit())
@@ -1820,10 +1820,10 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
 	s := teststorage.New(t)
 	defer s.Close()
 
-	app := s.Appender()
+	app := s.Appender(context.Background())
 
 	ctx, cancel := context.WithCancel(context.Background())
-	sl := newScrapeLoop(ctx,
+	sl := newScrapeLoop(context.Background(),
 		&testScraper{},
 		nil, nil,
 		func(l labels.Labels) labels.Labels {
@@ -1833,14 +1833,14 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
 			return l
 		},
 		nopMutator,
-		func() storage.Appender { return app },
+		func(ctx context.Context) storage.Appender { return app },
 		nil,
 		0,
 		true,
 	)
 	defer cancel()
 
-	slApp := sl.appender()
+	slApp := sl.appender(context.Background())
 	_, _, _, err := sl.append(slApp, []byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "", time.Time{})
 	testutil.NotOk(t, err)
 	testutil.Ok(t, slApp.Rollback())
@@ -2057,7 +2057,7 @@ func TestScrapeAddFast(t *testing.T) {
 	)
 	defer cancel()
 
-	slApp := sl.appender()
+	slApp := sl.appender(ctx)
 	_, _, _, err := sl.append(slApp, []byte("up 1\n"), "", time.Time{})
 	testutil.Ok(t, err)
 	testutil.Ok(t, slApp.Commit())
@@ -2068,7 +2068,7 @@ func TestScrapeAddFast(t *testing.T) {
 		v.ref++
 	}
 
-	slApp = sl.appender()
+	slApp = sl.appender(ctx)
 	_, _, _, err = sl.append(slApp, []byte("up 1\n"), "", time.Time{}.Add(time.Second))
 	testutil.Ok(t, err)
 	testutil.Ok(t, slApp.Commit())
diff --git a/storage/fanout.go b/storage/fanout.go
index 35ae7e9a4..c7c738201 100644
--- a/storage/fanout.go
+++ b/storage/fanout.go
@@ -122,11 +122,11 @@ func (f *fanout) ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQueri
 	return NewMergeChunkQuerier(primary, secondaries, NewCompactingChunkSeriesMerger(ChainedSeriesMerge)), nil
 }
 
-func (f *fanout) Appender() Appender {
-	primary := f.primary.Appender()
+func (f *fanout) Appender(ctx context.Context) Appender {
+	primary := f.primary.Appender(ctx)
 	secondaries := make([]Appender, 0, len(f.secondaries))
 	for _, storage := range f.secondaries {
-		secondaries = append(secondaries, storage.Appender())
+		secondaries = append(secondaries, storage.Appender(ctx))
 	}
 	return &fanoutAppender{
 		logger: f.logger,
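With this change the fanout storage threads a single caller-supplied context through the primary appender and every secondary appender. A usage sketch; fanoutStorage stands for any storage.Storage built with storage.NewFanout, and error handling is trimmed to the essentials:

	package fanoutexample // illustrative

	import (
		"context"

		"github.com/prometheus/prometheus/pkg/labels"
		"github.com/prometheus/prometheus/storage"
	)

	func appendOneSample(ctx context.Context, fanoutStorage storage.Storage, lset labels.Labels, ts int64, v float64) error {
		app := fanoutStorage.Appender(ctx)
		if _, err := app.Add(lset, ts, v); err != nil {
			// Roll back the whole fanout transaction on any error.
			_ = app.Rollback()
			return err
		}
		return app.Commit()
	}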
diff --git a/storage/fanout/fanout_test.go b/storage/fanout/fanout_test.go
index d263297ea..e9146021d 100644
--- a/storage/fanout/fanout_test.go
+++ b/storage/fanout/fanout_test.go
@@ -31,10 +31,11 @@ func TestSelectSorted(t *testing.T) {
 	outputLabel := labels.FromStrings(model.MetricNameLabel, "a")
 
 	inputTotalSize := 0
+	ctx := context.Background()
 
 	priStorage := teststorage.New(t)
 	defer priStorage.Close()
-	app1 := priStorage.Appender()
+	app1 := priStorage.Appender(ctx)
 	app1.Add(inputLabel, 0, 0)
 	inputTotalSize++
 	app1.Add(inputLabel, 1000, 1)
@@ -46,7 +47,7 @@ func TestSelectSorted(t *testing.T) {
 	remoteStorage1 := teststorage.New(t)
 	defer remoteStorage1.Close()
-	app2 := remoteStorage1.Appender()
+	app2 := remoteStorage1.Appender(ctx)
 	app2.Add(inputLabel, 3000, 3)
 	inputTotalSize++
 	app2.Add(inputLabel, 4000, 4)
@@ -59,7 +60,7 @@ func TestSelectSorted(t *testing.T) {
 	remoteStorage2 := teststorage.New(t)
 	defer remoteStorage2.Close()
-	app3 := remoteStorage2.Appender()
+	app3 := remoteStorage2.Appender(ctx)
 	app3.Add(inputLabel, 6000, 6)
 	inputTotalSize++
 	app3.Add(inputLabel, 7000, 7)
@@ -100,7 +101,7 @@ func TestSelectSorted(t *testing.T) {
 	})
 	t.Run("chunk querier", func(t *testing.T) {
 		t.Skip("TODO(bwplotka: Unskip when db will implement ChunkQuerier.")
-		querier, err := fanoutStorage.ChunkQuerier(context.Background(), 0, 8000)
+		querier, err := fanoutStorage.ChunkQuerier(ctx, 0, 8000)
 		testutil.Ok(t, err)
 		defer querier.Close()
@@ -222,9 +223,9 @@ type errChunkQuerier struct{ errQuerier }
 func (errStorage) ChunkQuerier(_ context.Context, _, _ int64) (storage.ChunkQuerier, error) {
 	return errChunkQuerier{}, nil
 }
-func (errStorage) Appender() storage.Appender { return nil }
-func (errStorage) StartTime() (int64, error)  { return 0, nil }
-func (errStorage) Close() error               { return nil }
+func (errStorage) Appender(_ context.Context) storage.Appender { return nil }
+func (errStorage) StartTime() (int64, error)                   { return 0, nil }
+func (errStorage) Close() error                                { return nil }
 
 func (errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
 	return storage.ErrSeriesSet(errSelect)
diff --git a/storage/interface.go b/storage/interface.go
index 6ebb03842..e01a88ecf 100644
--- a/storage/interface.go
+++ b/storage/interface.go
@@ -33,8 +33,10 @@ var (
 
 // Appendable allows creating appenders.
 type Appendable interface {
-	// Appender returns a new appender for the storage.
-	Appender() Appender
+	// Appender returns a new appender for the storage. The implementation
+	// can choose whether or not to use the context, for deadlines or to check
+	// for errors.
+	Appender(ctx context.Context) Appender
 }
 
 // SampleAndChunkQueryable allows retrieving samples as well as encoded samples in form of chunks.
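The new interface comment deliberately leaves the contract loose: an implementation may ignore the context entirely (as tsdb.Head and remote.WriteStorage do further down) or consult it. As one illustration of the latter, and not something this change itself adds, an Appendable wrapper could refuse to hand out a working appender once the caller's context is already cancelled:

	package ctxappend // illustrative

	import (
		"context"

		"github.com/prometheus/prometheus/pkg/labels"
		"github.com/prometheus/prometheus/storage"
	)

	// failedAppender returns the stored error from every call.
	type failedAppender struct{ err error }

	func (f failedAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) { return 0, f.err }
	func (f failedAppender) AddFast(ref uint64, t int64, v float64) error            { return f.err }
	func (f failedAppender) Commit() error                                           { return f.err }
	func (f failedAppender) Rollback() error                                         { return f.err }

	// cancelAwareAppendable fails fast when the caller's context is already done.
	type cancelAwareAppendable struct{ next storage.Appendable }

	func (a cancelAwareAppendable) Appender(ctx context.Context) storage.Appender {
		if err := ctx.Err(); err != nil {
			return failedAppender{err: err}
		}
		return a.next.Appender(ctx)
	}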
diff --git a/storage/remote/storage.go b/storage/remote/storage.go
index fd4a0529b..a9d60b196 100644
--- a/storage/remote/storage.go
+++ b/storage/remote/storage.go
@@ -169,8 +169,8 @@ func (s *Storage) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.C
 }
 
 // Appender implements storage.Storage.
-func (s *Storage) Appender() storage.Appender {
-	return s.rws.Appender()
+func (s *Storage) Appender(ctx context.Context) storage.Appender {
+	return s.rws.Appender(ctx)
 }
 
 // Close the background processing of the storage queues.
diff --git a/storage/remote/write.go b/storage/remote/write.go
index 28ea8aef1..77a0ab7ce 100644
--- a/storage/remote/write.go
+++ b/storage/remote/write.go
@@ -14,6 +14,7 @@
 package remote
 
 import (
+	"context"
 	"fmt"
 	"sync"
 	"time"
@@ -170,7 +171,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
 }
 
 // Appender implements storage.Storage.
-func (rws *WriteStorage) Appender() storage.Appender {
+func (rws *WriteStorage) Appender(_ context.Context) storage.Appender {
 	return &timestampTracker{
 		writeStorage: rws,
 	}
diff --git a/tsdb/block_test.go b/tsdb/block_test.go
index 1e95353ee..17f403244 100644
--- a/tsdb/block_test.go
+++ b/tsdb/block_test.go
@@ -328,7 +328,7 @@ func createHead(tb testing.TB, series []storage.Series, chunkDir string) *Head {
 	head, err := NewHead(nil, nil, nil, 2*60*60*1000, chunkDir, nil, DefaultStripeSize, nil)
 	testutil.Ok(tb, err)
 
-	app := head.Appender()
+	app := head.Appender(context.Background())
 	for _, s := range series {
 		ref := uint64(0)
 		it := s.Iterator()
diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go
index fdad0fd8c..68a8ce166 100644
--- a/tsdb/compact_test.go
+++ b/tsdb/compact_test.go
@@ -878,7 +878,7 @@ func BenchmarkCompactionFromHead(b *testing.B) {
 	h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil)
 	testutil.Ok(b, err)
 	for ln := 0; ln < labelNames; ln++ {
-		app := h.Appender()
+		app := h.Appender(context.Background())
 		for lv := 0; lv < labelValues; lv++ {
 			app.Add(labels.FromStrings(fmt.Sprintf("%d", ln), fmt.Sprintf("%d%s%d", lv, postingsBenchSuffix, ln)), 0, 0)
 		}
@@ -911,7 +911,7 @@ func TestDisableAutoCompactions(t *testing.T) {
 	// Trigger a compaction to check that it was skipped and
 	// no new blocks were created when compaction is disabled.
 	db.DisableCompactions()
-	app := db.Appender()
+	app := db.Appender(context.Background())
 	for i := int64(0); i < 3; i++ {
 		_, err := app.Add(label, i*blockRange, 0)
 		testutil.Ok(t, err)
@@ -1027,7 +1027,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
 			defaultLabel := labels.FromStrings("foo", "bar")
 
 			// Add some data to the head that is enough to trigger a compaction.
-			app := db.Appender()
+			app := db.Appender(context.Background())
 			_, err := app.Add(defaultLabel, 1, 0)
 			testutil.Ok(t, err)
 			_, err = app.Add(defaultLabel, 2, 0)
diff --git a/tsdb/db.go b/tsdb/db.go
index 57003968d..be05c84d8 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -710,8 +710,8 @@ func (db *DB) run() {
 }
 
 // Appender opens a new appender against the database.
-func (db *DB) Appender() storage.Appender {
-	return dbAppender{db: db, Appender: db.head.Appender()}
+func (db *DB) Appender(ctx context.Context) storage.Appender {
+	return dbAppender{db: db, Appender: db.head.Appender(ctx)}
 }
 
 // dbAppender wraps the DB's head appender and triggers compactions on commit
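DB.Appender simply forwards the context to the head appender, and the head currently discards it (see tsdb/head.go below), so call sites without a request-scoped context, like the tests that follow, pass context.Background() or context.TODO(). A minimal sketch of the call pattern against a *tsdb.DB (names and error handling are illustrative):

	package tsdbexample // illustrative

	import (
		"context"

		"github.com/prometheus/prometheus/pkg/labels"
		"github.com/prometheus/prometheus/tsdb"
	)

	func addOneSample(db *tsdb.DB) error {
		app := db.Appender(context.Background())
		if _, err := app.Add(labels.FromStrings("foo", "bar"), 0, 1); err != nil {
			_ = app.Rollback()
			return err
		}
		return app.Commit()
	}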
diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index 0a6fc8fed..763502484 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -138,7 +138,8 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
 		testutil.Ok(t, db.Close())
 	}()
 
-	app := db.Appender()
+	ctx := context.Background()
+	app := db.Appender(ctx)
 
 	_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
 	testutil.Ok(t, err)
@@ -169,9 +170,10 @@ func TestNoPanicAfterWALCorrutpion(t *testing.T) {
 	// This is to ensure that all samples can be read from the mmaped chunks when the WAL is corrupted.
 	var expSamples []tsdbutil.Sample
 	var maxt int64
+	ctx := context.Background()
 	{
 		for {
-			app := db.Appender()
+			app := db.Appender(ctx)
 			_, err := app.Add(labels.FromStrings("foo", "bar"), maxt, 0)
 			expSamples = append(expSamples, sample{t: maxt, v: 0})
 			testutil.Ok(t, err)
@@ -226,7 +228,7 @@ func TestDataNotAvailableAfterRollback(t *testing.T) {
 		testutil.Ok(t, db.Close())
 	}()
 
-	app := db.Appender()
+	app := db.Appender(context.Background())
 	_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
 	testutil.Ok(t, err)
 
@@ -248,7 +250,8 @@ func TestDBAppenderAddRef(t *testing.T) {
 		testutil.Ok(t, db.Close())
 	}()
 
-	app1 := db.Appender()
+	ctx := context.Background()
+	app1 := db.Appender(ctx)
 
 	ref1, err := app1.Add(labels.FromStrings("a", "b"), 123, 0)
 	testutil.Ok(t, err)
@@ -260,7 +263,7 @@ func TestDBAppenderAddRef(t *testing.T) {
 	err = app1.Commit()
 	testutil.Ok(t, err)
 
-	app2 := db.Appender()
+	app2 := db.Appender(ctx)
 
 	// first ref should already work in next transaction.
 	err = app2.AddFast(ref1, 125, 0)
@@ -302,7 +305,8 @@ func TestAppendEmptyLabelsIgnored(t *testing.T) {
 		testutil.Ok(t, db.Close())
 	}()
 
-	app1 := db.Appender()
+	ctx := context.Background()
+	app1 := db.Appender(ctx)
 
 	ref1, err := app1.Add(labels.FromStrings("a", "b"), 123, 0)
 	testutil.Ok(t, err)
@@ -354,7 +358,8 @@ Outer:
 		testutil.Ok(t, db.Close())
 	}()
 
-	app := db.Appender()
+	ctx := context.Background()
+	app := db.Appender(ctx)
 
 	smpls := make([]float64, numSamples)
 	for i := int64(0); i < numSamples; i++ {
@@ -413,12 +418,13 @@ func TestAmendDatapointCausesError(t *testing.T) {
 		testutil.Ok(t, db.Close())
 	}()
 
-	app := db.Appender()
+	ctx := context.Background()
+	app := db.Appender(ctx)
 	_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 0)
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
 
-	app = db.Appender()
+	app = db.Appender(ctx)
 	_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1)
 	testutil.Equals(t, storage.ErrDuplicateSampleForTimestamp, err)
 	testutil.Ok(t, app.Rollback())
@@ -430,12 +436,13 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
 		testutil.Ok(t, db.Close())
 	}()
 
-	app := db.Appender()
+	ctx := context.Background()
+	app := db.Appender(ctx)
 	_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.NaN())
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
 
-	app = db.Appender()
+	app = db.Appender(ctx)
 	_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.NaN())
 	testutil.Ok(t, err)
 }
@@ -446,12 +453,13 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
 		testutil.Ok(t, db.Close())
 	}()
 
-	app := db.Appender()
+	ctx := context.Background()
+	app := db.Appender(ctx)
 	_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000001))
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
 
-	app = db.Appender()
+	app = db.Appender(ctx)
 	_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000002))
 	testutil.Equals(t, storage.ErrDuplicateSampleForTimestamp, err)
 }
@@ -462,7 +470,8 @@ func TestEmptyLabelsetCausesError(t *testing.T) {
 		testutil.Ok(t, db.Close())
 	}()
 
-	app := db.Appender()
+	ctx := context.Background()
+	app := db.Appender(ctx)
 	_, err := app.Add(labels.Labels{}, 0, 0)
 	testutil.NotOk(t, err)
 	testutil.Equals(t, "empty labelset: invalid sample", err.Error())
@@ -475,7 +484,8 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
 	}()
 
 	// Append AmendedValue.
-	app := db.Appender()
+	ctx := context.Background()
+	app := db.Appender(ctx)
 	_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1)
 	testutil.Ok(t, err)
 	_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 2)
@@ -493,7 +503,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
 	}, ssMap)
 
 	// Append Out of Order Value.
-	app = db.Appender()
+	app = db.Appender(ctx)
 	_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 10, 3)
 	testutil.Ok(t, err)
 	_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 7, 5)
@@ -514,7 +524,8 @@ func TestDB_Snapshot(t *testing.T) {
 	db := openTestDB(t, nil, nil)
 
 	// append data
-	app := db.Appender()
+	ctx := context.Background()
+	app := db.Appender(ctx)
 	mint := int64(1414141414000)
 	for i := 0; i < 1000; i++ {
 		_, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0)
@@ -563,7 +574,8 @@ func TestDB_Snapshot(t *testing.T) {
 func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
 	db := openTestDB(t, nil, nil)
 
-	app := db.Appender()
+	ctx := context.Background()
+	app := db.Appender(ctx)
 	mint := int64(1414141414000)
 	for i := 0; i < 1000; i++ {
 		_, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0)
@@ -615,7 +627,8 @@ func TestDB_SnapshotWithDelete(t *testing.T) {
 
 	db := openTestDB(t, nil, nil)
 
-	app := db.Appender()
+	ctx := context.Background()
+	app := db.Appender(ctx)
 
 	smpls := make([]float64, numSamples)
 	for i := int64(0); i < numSamples; i++ {
@@ -760,7 +773,8 @@ func TestDB_e2e(t *testing.T) {
 		testutil.Ok(t, db.Close())
 	}()
 
-	app := db.Appender()
+	ctx := context.Background()
+	app := db.Appender(ctx)
 
 	for _, l := range lbls {
 		lset := labels.New(l...)
@@ -864,7 +878,8 @@ func TestWALFlushedOnDBClose(t *testing.T) {
 
 	lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}}
 
-	app := db.Appender()
+	ctx := context.Background()
+	app := db.Appender(ctx)
 	_, err := app.Add(lbls, 0, 1)
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
@@ -938,7 +953,7 @@ func TestWALSegmentSizeOptions(t *testing.T) {
 			db := openTestDB(t, opts, nil)
 
 			for i := int64(0); i < 155; i++ {
-				app := db.Appender()
+				app := db.Appender(context.Background())
 				ref, err := app.Add(labels.Labels{labels.Label{Name: "wal" + fmt.Sprintf("%d", i), Value: "size"}}, i, rand.Float64())
 				testutil.Ok(t, err)
 				for j := int64(1); j <= 78; j++ {
@@ -960,7 +975,8 @@ func TestTombstoneClean(t *testing.T) {
 
 	db := openTestDB(t, nil, nil)
 
-	app := db.Appender()
+	ctx := context.Background()
+	app := db.Appender(ctx)
 
 	smpls := make([]float64, numSamples)
 	for i := int64(0); i < numSamples; i++ {
@@ -1187,7 +1203,7 @@ func TestSizeRetention(t *testing.T) {
 	}
 
 	// Add some data to the WAL.
-	headApp := db.Head().Appender()
+	headApp := db.Head().Appender(context.Background())
 	for _, m := range headBlocks {
 		series := genSeries(100, 10, m.MinTime, m.MaxTime)
 		for _, s := range series {
@@ -1287,7 +1303,8 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
 		labels.FromStrings("labelname", "labelvalue"),
 	}
 
-	app := db.Appender()
+	ctx := context.Background()
+	app := db.Appender(ctx)
 	for _, lbls := range labelpairs {
 		_, err := app.Add(lbls, 0, 1)
 		testutil.Ok(t, err)
@@ -1467,7 +1484,8 @@ func TestChunkAtBlockBoundary(t *testing.T) {
 		testutil.Ok(t, db.Close())
 	}()
 
-	app := db.Appender()
+	ctx := context.Background()
+	app := db.Appender(ctx)
 
 	blockRange := db.compactor.(*LeveledCompactor).ranges[0]
 	label := labels.FromStrings("foo", "bar")
@@ -1523,7 +1541,8 @@ func TestQuerierWithBoundaryChunks(t *testing.T) {
 		testutil.Ok(t, db.Close())
 	}()
 
-	app := db.Appender()
+	ctx := context.Background()
+	app := db.Appender(ctx)
 
 	blockRange := db.compactor.(*LeveledCompactor).ranges[0]
 	label := labels.FromStrings("foo", "bar")
@@ -1572,7 +1591,8 @@ func TestInitializeHeadTimestamp(t *testing.T) {
 		testutil.Equals(t, int64(math.MinInt64), db.head.MaxTime())
 
 		// First added sample initializes the writable range.
-		app := db.Appender()
+		ctx := context.Background()
+		app := db.Appender(ctx)
 		_, err = app.Add(labels.FromStrings("a", "b"), 1000, 1)
 		testutil.Ok(t, err)
 
@@ -1669,6 +1689,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
 
 func TestNoEmptyBlocks(t *testing.T) {
 	db := openTestDB(t, nil, []int64{100})
+	ctx := context.Background()
 	defer func() {
 		testutil.Ok(t, db.Close())
 	}()
@@ -1688,7 +1709,7 @@ func TestNoEmptyBlocks(t *testing.T) {
 	})
 
 	t.Run("Test no blocks after deleting all samples from head.", func(t *testing.T) {
-		app := db.Appender()
+		app := db.Appender(ctx)
 		_, err := app.Add(defaultLabel, 1, 0)
 		testutil.Ok(t, err)
 		_, err = app.Add(defaultLabel, 2, 0)
@@ -1705,7 +1726,7 @@ func TestNoEmptyBlocks(t *testing.T) {
 		testutil.Equals(t, len(db.Blocks()), len(actBlocks))
 		testutil.Equals(t, 0, len(actBlocks))
 
-		app = db.Appender()
+		app = db.Appender(ctx)
 		_, err = app.Add(defaultLabel, 1, 0)
 		testutil.Assert(t, err == storage.ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed")
 
@@ -1730,7 +1751,7 @@ func TestNoEmptyBlocks(t *testing.T) {
 	t.Run(`When no new block is created from head, and there are some blocks on disk
 	compaction should not run into infinite loop (was seen during development).`, func(t *testing.T) {
 		oldBlocks := db.Blocks()
-		app := db.Appender()
+		app := db.Appender(ctx)
 		currentTime := db.Head().MaxTime()
 		_, err := app.Add(defaultLabel, currentTime, 0)
 		testutil.Ok(t, err)
@@ -1813,7 +1834,8 @@ func TestDB_LabelNames(t *testing.T) {
 	// Appends samples into the database.
 	appendSamples := func(db *DB, mint, maxt int64, sampleLabels [][2]string) {
 		t.Helper()
-		app := db.Appender()
+		ctx := context.Background()
+		app := db.Appender(ctx)
 		for i := mint; i <= maxt; i++ {
 			for _, tuple := range sampleLabels {
 				label := labels.FromStrings(tuple[0], tuple[1])
@@ -1880,7 +1902,8 @@ func TestCorrectNumTombstones(t *testing.T) {
 	defaultLabel := labels.FromStrings("foo", "bar")
 	defaultMatcher := labels.MustNewMatcher(labels.MatchEqual, defaultLabel[0].Name, defaultLabel[0].Value)
 
-	app := db.Appender()
+	ctx := context.Background()
+	app := db.Appender(ctx)
 	for i := int64(0); i < 3; i++ {
 		for j := int64(0); j < 15; j++ {
 			_, err := app.Add(defaultLabel, i*blockRange+j, 0)
@@ -2283,6 +2306,7 @@ func TestVerticalCompaction(t *testing.T) {
 // will not overlap with the first block created by the next compaction.
 func TestBlockRanges(t *testing.T) {
 	logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
+	ctx := context.Background()
 
 	dir, err := ioutil.TempDir("", "test_storage")
 	testutil.Ok(t, err)
@@ -2298,7 +2322,7 @@ func TestBlockRanges(t *testing.T) {
 	defer func() {
 		os.RemoveAll(dir)
 	}()
-	app := db.Appender()
+	app := db.Appender(ctx)
 	lbl := labels.Labels{{Name: "a", Value: "b"}}
 	_, err = app.Add(lbl, firstBlockMaxT-1, rand.Float64())
 	if err == nil {
@@ -2327,7 +2351,7 @@ func TestBlockRanges(t *testing.T) {
 
 	// Test that wal records are skipped when an existing block covers the same time ranges
 	// and compaction doesn't create an overlapping block.
-	app = db.Appender()
+	app = db.Appender(ctx)
 	db.DisableCompactions()
 	_, err = app.Add(lbl, secondBlockMaxt+1, rand.Float64())
 	testutil.Ok(t, err)
@@ -2350,7 +2374,7 @@ func TestBlockRanges(t *testing.T) {
 	testutil.Equals(t, 3, len(db.Blocks()), "db doesn't include expected number of blocks")
 	testutil.Equals(t, db.Blocks()[2].Meta().MaxTime, thirdBlockMaxt, "unexpected maxt of the last block")
 
-	app = db.Appender()
+	app = db.Appender(ctx)
 	_, err = app.Add(lbl, thirdBlockMaxt+rangeToTriggerCompaction, rand.Float64()) // Trigger a compaction
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
@@ -2411,7 +2435,7 @@ func TestDBReadOnly(t *testing.T) {
 		dbSizeBeforeAppend, err := fileutil.DirSize(dbWritable.Dir())
 		testutil.Ok(t, err)
 
-		app := dbWritable.Appender()
+		app := dbWritable.Appender(context.Background())
 		_, err = app.Add(labels.FromStrings("foo", "bar"), dbWritable.Head().MaxTime()+1, 0)
 		testutil.Ok(t, err)
 		testutil.Ok(t, app.Commit())
@@ -2481,6 +2505,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
 		logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
 		err    error
 		maxt   int
+		ctx    = context.Background()
 	)
 
 	// Bootstrap the db.
@@ -2496,7 +2521,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
 		db, err := Open(dbDir, logger, nil, nil)
 		testutil.Ok(t, err)
 		db.DisableCompactions()
-		app := db.Appender()
+		app := db.Appender(ctx)
 		maxt = 1000
 		for i := 0; i < maxt; i++ {
 			_, err := app.Add(labels.FromStrings(defaultLabelName, "flush"), int64(i), 1.0)
@@ -2560,12 +2585,13 @@ func TestDBCannotSeePartialCommits(t *testing.T) {
 
 	stop := make(chan struct{})
 	firstInsert := make(chan struct{})
+	ctx := context.Background()
 
 	// Insert data in batches.
 	go func() {
 		iter := 0
 		for {
-			app := db.Appender()
+			app := db.Appender(ctx)
 
 			for j := 0; j < 100; j++ {
 				_, err := app.Add(labels.FromStrings("foo", "bar", "a", strconv.Itoa(j)), int64(iter), float64(iter))
@@ -2631,7 +2657,8 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
 	testutil.Ok(t, err)
 	defer querierBeforeAdd.Close()
 
-	app := db.Appender()
+	ctx := context.Background()
+	app := db.Appender(ctx)
 	_, err = app.Add(labels.FromStrings("foo", "bar"), 0, 0)
 	testutil.Ok(t, err)
 
@@ -2929,7 +2956,8 @@ func TestCompactHead(t *testing.T) {
 	db, err := Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg)
 	testutil.Ok(t, err)
 
-	app := db.Appender()
+	ctx := context.Background()
+	app := db.Appender(ctx)
 	var expSamples []sample
 	maxt := 100
 	for i := 0; i < maxt; i++ {
diff --git a/tsdb/head.go b/tsdb/head.go
index a94f41db8..a013f4b30 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -14,6 +14,7 @@
 package tsdb
 
 import (
+	"context"
 	"fmt"
 	"math"
 	"path/filepath"
@@ -1007,7 +1008,7 @@ func (a *initAppender) Rollback() error {
 }
 
 // Appender returns a new Appender on the database.
-func (h *Head) Appender() storage.Appender {
+func (h *Head) Appender(_ context.Context) storage.Appender {
 	h.metrics.activeAppenders.Inc()
 
 	// The head cache might not have a starting point yet. The init appender
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 2cdc9dbc3..622bd146e 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -14,6 +14,7 @@
 package tsdb
 
 import (
+	"context"
 	"fmt"
 	"io/ioutil"
 	"math"
@@ -266,14 +267,14 @@ func TestHead_WALMultiRef(t *testing.T) {
 
 	testutil.Ok(t, head.Init(0))
 
-	app := head.Appender()
+	app := head.Appender(context.Background())
 	ref1, err := app.Add(labels.FromStrings("foo", "bar"), 100, 1)
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
 	testutil.Equals(t, 1.0, prom_testutil.ToFloat64(head.metrics.chunksCreated))
 
 	// Add another sample outside chunk range to mmap a chunk.
-	app = head.Appender()
+	app = head.Appender(context.Background())
 	_, err = app.Add(labels.FromStrings("foo", "bar"), 1500, 2)
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
@@ -281,14 +282,14 @@ func TestHead_WALMultiRef(t *testing.T) {
 
 	testutil.Ok(t, head.Truncate(1600))
 
-	app = head.Appender()
+	app = head.Appender(context.Background())
 	ref2, err := app.Add(labels.FromStrings("foo", "bar"), 1700, 3)
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
 	testutil.Equals(t, 3.0, prom_testutil.ToFloat64(head.metrics.chunksCreated))
 
 	// Add another sample outside chunk range to mmap a chunk.
-	app = head.Appender()
+	app = head.Appender(context.Background())
 	_, err = app.Add(labels.FromStrings("foo", "bar"), 2000, 4)
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
@@ -539,7 +540,7 @@ func TestHeadDeleteSimple(t *testing.T) {
 		for _, c := range cases {
 			head, w := newTestHead(t, 1000, compress)
 
-			app := head.Appender()
+			app := head.Appender(context.Background())
 			for _, smpl := range smplsAll {
 				_, err := app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v)
 				testutil.Ok(t, err)
@@ -553,7 +554,7 @@ func TestHeadDeleteSimple(t *testing.T) {
 			}
 
 			// Add more samples.
-			app = head.Appender()
+			app = head.Appender(context.Background())
 			for _, smpl := range c.addSamples {
 				_, err := app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v)
 				testutil.Ok(t, err)
@@ -620,7 +621,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
 	}()
 
 	numSamples := int64(10)
-	app := hb.Appender()
+	app := hb.Appender(context.Background())
 	smpls := make([]float64, numSamples)
 	for i := int64(0); i < numSamples; i++ {
 		smpls[i] = rand.Float64()
@@ -644,7 +645,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
 	testutil.Equals(t, 0, len(res.Warnings()))
 
 	// Add again and test for presence.
-	app = hb.Appender()
+	app = hb.Appender(context.Background())
 	_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 11, 1)
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
@@ -670,7 +671,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
 	hb, w := newTestHead(t, int64(numSamples)*10, false)
 
 	for i := 0; i < numSamples; i++ {
-		app := hb.Appender()
+		app := hb.Appender(context.Background())
 		_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, int64(i), 0)
 		testutil.Ok(t, err)
 		testutil.Ok(t, app.Commit())
@@ -762,7 +763,7 @@ func TestDelete_e2e(t *testing.T) {
 		testutil.Ok(t, hb.Close())
 	}()
 
-	app := hb.Appender()
+	app := hb.Appender(context.Background())
 	for _, l := range lbls {
 		ls := labels.New(l...)
 		series := []tsdbutil.Sample{}
@@ -1174,7 +1175,7 @@ func TestHead_LogRollback(t *testing.T) {
 				testutil.Ok(t, h.Close())
 			}()
 
-			app := h.Appender()
+			app := h.Appender(context.Background())
 			_, err := app.Add(labels.FromStrings("a", "b"), 1, 2)
 			testutil.Ok(t, err)
 
@@ -1372,7 +1373,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) {
 		testutil.Ok(t, h.Close())
 	}()
 	add := func(ts int64) {
-		app := h.Appender()
+		app := h.Appender(context.Background())
 		_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, ts, 0)
 		testutil.Ok(t, err)
 		testutil.Ok(t, app.Commit())
@@ -1403,7 +1404,7 @@ func TestAddDuplicateLabelName(t *testing.T) {
 	}()
 
 	add := func(labels labels.Labels, labelName string) {
-		app := h.Appender()
+		app := h.Appender(context.Background())
 		_, err := app.Add(labels, 0, 0)
 		testutil.NotOk(t, err)
 		testutil.Equals(t, fmt.Sprintf(`label name "%s" is not unique: invalid sample`, labelName), err.Error())
@@ -1586,13 +1587,13 @@ func TestIsolationRollback(t *testing.T) {
 		testutil.Ok(t, hb.Close())
 	}()
 
-	app := hb.Appender()
+	app := hb.Appender(context.Background())
 	_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
 	testutil.Equals(t, uint64(1), hb.iso.lowWatermark())
 
-	app = hb.Appender()
+	app = hb.Appender(context.Background())
 	_, err = app.Add(labels.FromStrings("foo", "bar"), 1, 1)
 	testutil.Ok(t, err)
 	_, err = app.Add(labels.FromStrings("foo", "bar", "foo", "baz"), 2, 2)
@@ -1600,7 +1601,7 @@ func TestIsolationRollback(t *testing.T) {
 	testutil.Ok(t, app.Rollback())
 	testutil.Equals(t, uint64(2), hb.iso.lowWatermark())
 
-	app = hb.Appender()
+	app = hb.Appender(context.Background())
 	_, err = app.Add(labels.FromStrings("foo", "bar"), 3, 3)
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
@@ -1613,18 +1614,18 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) {
 		testutil.Ok(t, hb.Close())
 	}()
 
-	app1 := hb.Appender()
+	app1 := hb.Appender(context.Background())
 	_, err := app1.Add(labels.FromStrings("foo", "bar"), 0, 0)
 	testutil.Ok(t, err)
 	testutil.Ok(t, app1.Commit())
 	testutil.Equals(t, uint64(1), hb.iso.lowWatermark(), "Low watermark should by 1 after 1st append.")
 
-	app1 = hb.Appender()
+	app1 = hb.Appender(context.Background())
 	_, err = app1.Add(labels.FromStrings("foo", "bar"), 1, 1)
 	testutil.Ok(t, err)
 	testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should be two, even if append is not committed yet.")
 
-	app2 := hb.Appender()
+	app2 := hb.Appender(context.Background())
 	_, err = app2.Add(labels.FromStrings("foo", "baz"), 1, 1)
 	testutil.Ok(t, err)
 	testutil.Ok(t, app2.Commit())
@@ -1667,10 +1668,10 @@ func TestIsolationWithoutAdd(t *testing.T) {
 		testutil.Ok(t, hb.Close())
 	}()
 
-	app := hb.Appender()
+	app := hb.Appender(context.Background())
 	testutil.Ok(t, app.Commit())
 
-	app = hb.Appender()
+	app = hb.Appender(context.Background())
 	_, err := app.Add(labels.FromStrings("foo", "baz"), 1, 1)
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
@@ -1692,7 +1693,8 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
 	}()
 	db.DisableCompactions()
 
-	app := db.Appender()
+	ctx := context.Background()
+	app := db.Appender(ctx)
 	for i := 1; i <= 5; i++ {
 		_, err = app.Add(labels.FromStrings("a", "b"), int64(i), 99)
 		testutil.Ok(t, err)
@@ -1701,7 +1703,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
 
 	// Test out of order metric.
 	testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
-	app = db.Appender()
+	app = db.Appender(ctx)
 	_, err = app.Add(labels.FromStrings("a", "b"), 2, 99)
 	testutil.Equals(t, storage.ErrOutOfOrderSample, err)
 	testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
@@ -1716,7 +1718,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
 	testutil.Ok(t, app.Commit())
 
 	// Compact Head to test out of bound metric.
-	app = db.Appender()
+	app = db.Appender(ctx)
 	_, err = app.Add(labels.FromStrings("a", "b"), DefaultBlockDuration*2, 99)
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
@@ -1725,7 +1727,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
 	testutil.Ok(t, db.Compact())
 	testutil.Assert(t, db.head.minValidTime.Load() > 0, "")
 
-	app = db.Appender()
+	app = db.Appender(ctx)
 	_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()-2, 99)
 	testutil.Equals(t, storage.ErrOutOfBounds, err)
 	testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples))
@@ -1736,7 +1738,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
 	testutil.Ok(t, app.Commit())
 
 	// Some more valid samples for out of order.
-	app = db.Appender()
+	app = db.Appender(ctx)
 	for i := 1; i <= 5; i++ {
 		_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+int64(i), 99)
 		testutil.Ok(t, err)
@@ -1744,7 +1746,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
 	testutil.Ok(t, app.Commit())
 
 	// Test out of order metric.
-	app = db.Appender()
+	app = db.Appender(ctx)
 	_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+2, 99)
 	testutil.Equals(t, storage.ErrOutOfOrderSample, err)
 	testutil.Equals(t, 4.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
@@ -1765,7 +1767,7 @@ func testHeadSeriesChunkRace(t *testing.T) {
 		testutil.Ok(t, h.Close())
 	}()
 	testutil.Ok(t, h.Init(0))
-	app := h.Appender()
+	app := h.Appender(context.Background())
 
 	s2, err := app.Add(labels.FromStrings("foo2", "bar"), 5, 0)
 	testutil.Ok(t, err)
@@ -1814,7 +1816,7 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) {
 		expectedLabelValues = []string{"d", "e", "f"}
 	)
 
-	app := head.Appender()
+	app := head.Appender(context.Background())
 	for i, name := range expectedLabelNames {
 		_, err := app.Add(labels.Labels{{Name: name, Value: expectedLabelValues[i]}}, seriesTimestamps[i], 0)
 		testutil.Ok(t, err)
@@ -1859,28 +1861,28 @@ func TestErrReuseAppender(t *testing.T) {
 		testutil.Ok(t, head.Close())
 	}()
 
-	app := head.Appender()
+	app := head.Appender(context.Background())
 	_, err := app.Add(labels.Labels{{Name: "test", Value: "test"}}, 0, 0)
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
 	testutil.NotOk(t, app.Commit())
 	testutil.NotOk(t, app.Rollback())
 
-	app = head.Appender()
+	app = head.Appender(context.Background())
 	_, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 1, 0)
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Rollback())
 	testutil.NotOk(t, app.Rollback())
 	testutil.NotOk(t, app.Commit())
 
-	app = head.Appender()
+	app = head.Appender(context.Background())
 	_, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 2, 0)
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
 	testutil.NotOk(t, app.Rollback())
 	testutil.NotOk(t, app.Commit())
 
-	app = head.Appender()
+	app = head.Appender(context.Background())
 	_, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 3, 0)
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Rollback())
diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go
index e4eca1c55..212b67372 100644
--- a/tsdb/querier_bench_test.go
+++ b/tsdb/querier_bench_test.go
@@ -14,6 +14,7 @@
 package tsdb
 
 import (
+	"context"
 	"fmt"
 	"io/ioutil"
 	"os"
@@ -41,7 +42,7 @@ func BenchmarkPostingsForMatchers(b *testing.B) {
 		testutil.Ok(b, h.Close())
 	}()
 
-	app := h.Appender()
+	app := h.Appender(context.Background())
 	addSeries := func(l labels.Labels) {
 		app.Add(l, 0, 0)
 	}
@@ -147,7 +148,7 @@ func BenchmarkQuerierSelect(b *testing.B) {
 	h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil)
 	testutil.Ok(b, err)
 	defer h.Close()
-	app := h.Appender()
+	app := h.Appender(context.Background())
 	numSeries := 1000000
 	for i := 0; i < numSeries; i++ {
 		app.Add(labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, postingsBenchSuffix)), int64(i), 0)
diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go
index 743255eb1..da11d399c 100644
--- a/tsdb/querier_test.go
+++ b/tsdb/querier_test.go
@@ -1875,7 +1875,7 @@ func TestPostingsForMatchers(t *testing.T) {
 		testutil.Ok(t, h.Close())
 	}()
 
-	app := h.Appender()
+	app := h.Appender(context.Background())
 	app.Add(labels.FromStrings("n", "1"), 0, 0)
 	app.Add(labels.FromStrings("n", "1", "i", "a"), 0, 0)
 	app.Add(labels.FromStrings("n", "1", "i", "b"), 0, 0)
diff --git a/tsdb/tsdbblockutil.go b/tsdb/tsdbblockutil.go
index 2264abc75..d152594da 100644
--- a/tsdb/tsdbblockutil.go
+++ b/tsdb/tsdbblockutil.go
@@ -38,7 +38,7 @@ func CreateHead(samples []*MetricSample, chunkRange int64, chunkDir string, logg
 	if err != nil {
 		return nil, err
 	}
-	app := head.Appender()
+	app := head.Appender(context.TODO())
 	for _, sample := range samples {
 		_, err = app.Add(sample.Labels, sample.TimestampMs, sample.Value)
 		if err != nil {