Merge pull request #7660 from annanay25/appender-context

Add context to storage.Appendable interface
This commit is contained in:
Frederic Branczyk 2020-07-31 11:40:43 +02:00 committed by GitHub
commit db7c5e204a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 212 additions and 175 deletions

View file

@ -990,9 +990,9 @@ func (s *readyStorage) ChunkQuerier(ctx context.Context, mint, maxt int64) (stor
}
// Appender implements the Storage interface.
func (s *readyStorage) Appender() storage.Appender {
func (s *readyStorage) Appender(ctx context.Context) storage.Appender {
if x := s.get(); x != nil {
return x.Appender()
return x.Appender(ctx)
}
return notReadyAppender{}
}

View file

@ -263,7 +263,7 @@ func TestTimeMetrics(t *testing.T) {
"prometheus_tsdb_head_max_time_seconds",
))
app := db.Appender()
app := db.Appender(context.Background())
_, err = app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 1)
testutil.Ok(t, err)
_, err = app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, 1)

View file

@ -199,7 +199,7 @@ func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount in
total := uint64(0)
for i := 0; i < scrapeCount; i++ {
app := b.storage.Appender()
app := b.storage.Appender(context.TODO())
ts += timeDelta
for _, s := range scrape {

View file

@ -68,7 +68,7 @@ func BenchmarkRangeQuery(b *testing.B) {
numIntervals := 8640 + 10000
for s := 0; s < numIntervals; s++ {
a := storage.Appender()
a := storage.Appender(context.Background())
ts := int64(s * 10000) // 10s interval.
for i, metric := range metrics {
err := a.AddFast(refs[i], ts, float64(s))

View file

@ -40,7 +40,7 @@ func TestDeriv(t *testing.T) {
}
engine := NewEngine(opts)
a := storage.Appender()
a := storage.Appender(context.Background())
metric := labels.FromStrings("__name__", "foo")
a.Add(metric, 1493712816939, 1.0)

View file

@ -433,7 +433,7 @@ func (t *Test) exec(tc testCommand) error {
t.clear()
case *loadCmd:
app := t.storage.Appender()
app := t.storage.Appender(t.context)
if err := cmd.append(app); err != nil {
app.Rollback()
return err
@ -644,7 +644,7 @@ func (ll *LazyLoader) clear() {
// appendTill appends the defined time series to the storage till the given timestamp (in milliseconds).
func (ll *LazyLoader) appendTill(ts int64) error {
app := ll.storage.Appender()
app := ll.storage.Appender(ll.Context())
for h, smpls := range ll.loadCmd.defs {
m := ll.loadCmd.metrics[h]
for i, s := range smpls {

View file

@ -352,7 +352,7 @@ func (g *Group) run(ctx context.Context) {
select {
case <-g.managerDone:
case <-time.After(2 * g.interval):
g.cleanupStaleSeries(now)
g.cleanupStaleSeries(ctx, now)
}
}(time.Now())
}()
@ -588,7 +588,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
numDuplicates = 0
)
app := g.opts.Appendable.Appender()
app := g.opts.Appendable.Appender(ctx)
seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
defer func() {
if err := app.Commit(); err != nil {
@ -636,14 +636,14 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
}
}(i, rule)
}
g.cleanupStaleSeries(ts)
g.cleanupStaleSeries(ctx, ts)
}
func (g *Group) cleanupStaleSeries(ts time.Time) {
func (g *Group) cleanupStaleSeries(ctx context.Context, ts time.Time) {
if len(g.staleSeries) == 0 {
return
}
app := g.opts.Appendable.Appender()
app := g.opts.Appendable.Appender(ctx)
for _, s := range g.staleSeries {
// Rule that produced series no longer configured, mark it stale.
_, err := app.Add(s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))

View file

@ -546,7 +546,7 @@ func TestStaleness(t *testing.T) {
})
// A time series that has two samples and then goes stale.
app := st.Appender()
app := st.Appender(context.Background())
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 0, 1)
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2)
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN))
@ -870,7 +870,7 @@ func TestNotify(t *testing.T) {
Opts: opts,
})
app := storage.Appender()
app := storage.Appender(context.Background())
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2)
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, 3)
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 5000, 3)

View file

@ -14,13 +14,14 @@
package scrape
import (
"context"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
)
type nopAppendable struct{}
func (a nopAppendable) Appender() storage.Appender {
func (a nopAppendable) Appender(_ context.Context) storage.Appender {
return nopAppender{}
}

View file

@ -254,7 +254,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed
return mutateSampleLabels(l, opts.target, opts.honorLabels, opts.mrc)
},
func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
func() storage.Appender { return appender(app.Appender(), opts.limit) },
func(ctx context.Context) storage.Appender { return appender(app.Appender(ctx), opts.limit) },
cache,
jitterSeed,
opts.honorTimestamps,
@ -681,7 +681,7 @@ type scrapeLoop struct {
forcedErr error
forcedErrMtx sync.Mutex
appender func() storage.Appender
appender func(ctx context.Context) storage.Appender
sampleMutator labelsMutator
reportSampleMutator labelsMutator
@ -943,7 +943,7 @@ func newScrapeLoop(ctx context.Context,
buffers *pool.Pool,
sampleMutator labelsMutator,
reportSampleMutator labelsMutator,
appender func() storage.Appender,
appender func(ctx context.Context) storage.Appender,
cache *scrapeCache,
jitterSeed uint64,
honorTimestamps bool,
@ -1035,7 +1035,7 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
buf := bytes.NewBuffer(b)
app := sl.appender()
app := sl.appender(sl.ctx)
var total, added, seriesAdded int
var err, appErr, scrapeErr error
defer func() {
@ -1053,7 +1053,7 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
// Add stale markers.
if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil {
app.Rollback()
app = sl.appender()
app = sl.appender(sl.ctx)
level.Warn(sl.l).Log("msg", "Append failed", "err", err)
}
if errc != nil {
@ -1085,13 +1085,13 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
total, added, seriesAdded, appErr = sl.append(app, b, contentType, start)
if appErr != nil {
app.Rollback()
app = sl.appender()
app = sl.appender(sl.ctx)
level.Debug(sl.l).Log("msg", "Append failed", "err", appErr)
// The append failed, probably due to a parse error or sample limit.
// Call sl.append again with an empty scrape to trigger stale markers.
if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil {
app.Rollback()
app = sl.appender()
app = sl.appender(sl.ctx)
level.Warn(sl.l).Log("msg", "Append failed", "err", err)
}
}
@ -1161,7 +1161,7 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
// Call sl.append again with an empty scrape to trigger stale markers.
// If the target has since been recreated and scraped, the
// stale markers will be out of order and ignored.
app := sl.appender()
app := sl.appender(sl.ctx)
var err error
defer func() {
if err != nil {
@ -1175,7 +1175,7 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
}()
if _, _, _, err = sl.append(app, []byte{}, "", staleTime); err != nil {
app.Rollback()
app = sl.appender()
app = sl.appender(sl.ctx)
level.Warn(sl.l).Log("msg", "Stale append failed", "err", err)
}
if err = sl.reportStale(app, staleTime); err != nil {

View file

@ -412,7 +412,7 @@ func TestScrapePoolAppender(t *testing.T) {
appl, ok := loop.(*scrapeLoop)
testutil.Assert(t, ok, "Expected scrapeLoop but got %T", loop)
wrapped := appl.appender()
wrapped := appl.appender(context.Background())
tl, ok := wrapped.(*timeLimitAppender)
testutil.Assert(t, ok, "Expected timeLimitAppender but got %T", wrapped)
@ -427,7 +427,7 @@ func TestScrapePoolAppender(t *testing.T) {
appl, ok = loop.(*scrapeLoop)
testutil.Assert(t, ok, "Expected scrapeLoop but got %T", loop)
wrapped = appl.appender()
wrapped = appl.appender(context.Background())
sl, ok := wrapped.(*limitAppender)
testutil.Assert(t, ok, "Expected limitAppender but got %T", wrapped)
@ -538,7 +538,7 @@ func TestScrapeLoopStop(t *testing.T) {
signal = make(chan struct{}, 1)
appender = &collectResultAppender{}
scraper = &testScraper{}
app = func() storage.Appender { return appender }
app = func(ctx context.Context) storage.Appender { return appender }
)
sl := newScrapeLoop(context.Background(),
@ -603,7 +603,7 @@ func TestScrapeLoopRun(t *testing.T) {
errc = make(chan error)
scraper = &testScraper{}
app = func() storage.Appender { return &nopAppender{} }
app = func(ctx context.Context) storage.Appender { return &nopAppender{} }
)
ctx, cancel := context.WithCancel(context.Background())
@ -701,7 +701,7 @@ func TestScrapeLoopForcedErr(t *testing.T) {
errc = make(chan error)
scraper = &testScraper{}
app = func() storage.Appender { return &nopAppender{} }
app = func(ctx context.Context) storage.Appender { return &nopAppender{} }
)
ctx, cancel := context.WithCancel(context.Background())
@ -760,14 +760,14 @@ func TestScrapeLoopMetadata(t *testing.T) {
nil, nil,
nopMutator,
nopMutator,
func() storage.Appender { return nopAppender{} },
func(ctx context.Context) storage.Appender { return nopAppender{} },
cache,
0,
true,
)
defer cancel()
slApp := sl.appender()
slApp := sl.appender(ctx)
total, _, _, err := sl.append(slApp, []byte(`# TYPE test_metric counter
# HELP test_metric some help text
# UNIT test_metric metric
@ -816,7 +816,7 @@ func TestScrapeLoopSeriesAdded(t *testing.T) {
)
defer cancel()
slApp := sl.appender()
slApp := sl.appender(ctx)
total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{})
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
@ -824,7 +824,7 @@ func TestScrapeLoopSeriesAdded(t *testing.T) {
testutil.Equals(t, 1, added)
testutil.Equals(t, 1, seriesAdded)
slApp = sl.appender()
slApp = sl.appender(ctx)
total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{})
testutil.Ok(t, slApp.Commit())
testutil.Ok(t, err)
@ -838,7 +838,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
var (
signal = make(chan struct{}, 1)
scraper = &testScraper{}
app = func() storage.Appender { return appender }
app = func(ctx context.Context) storage.Appender { return appender }
)
ctx, cancel := context.WithCancel(context.Background())
@ -891,7 +891,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
var (
signal = make(chan struct{}, 1)
scraper = &testScraper{}
app = func() storage.Appender { return appender }
app = func(ctx context.Context) storage.Appender { return appender }
numScrapes = 0
)
@ -950,7 +950,7 @@ func TestScrapeLoopCache(t *testing.T) {
var (
signal = make(chan struct{}, 1)
scraper = &testScraper{}
app = func() storage.Appender { appender.next = s.Appender(); return appender }
app = func(ctx context.Context) storage.Appender { appender.next = s.Appender(ctx); return appender }
)
ctx, cancel := context.WithCancel(context.Background())
@ -1018,13 +1018,13 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
s := teststorage.New(t)
defer s.Close()
sapp := s.Appender()
sapp := s.Appender(context.Background())
appender := &collectResultAppender{next: sapp}
var (
signal = make(chan struct{}, 1)
scraper = &testScraper{}
app = func() storage.Appender { return appender }
app = func(ctx context.Context) storage.Appender { return appender }
)
ctx, cancel := context.WithCancel(context.Background())
@ -1139,7 +1139,7 @@ func TestScrapeLoopAppend(t *testing.T) {
func(l labels.Labels) labels.Labels {
return mutateReportSampleLabels(l, discoveryLabels)
},
func() storage.Appender { return app },
func(ctx context.Context) storage.Appender { return app },
nil,
0,
true,
@ -1147,7 +1147,7 @@ func TestScrapeLoopAppend(t *testing.T) {
now := time.Now()
slApp := sl.appender()
slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", now)
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
@ -1180,7 +1180,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
nil, nil, nil,
nopMutator,
nopMutator,
func() storage.Appender { return app },
func(ctx context.Context) storage.Appender { return app },
nil,
0,
true,
@ -1200,7 +1200,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
sl.cache.addRef(mets, fakeRef, lset, hash)
now := time.Now()
slApp := sl.appender()
slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(metric), "", now)
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
@ -1229,7 +1229,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
return l
},
nopMutator,
func() storage.Appender { return app },
func(ctx context.Context) storage.Appender { return app },
nil,
0,
true,
@ -1243,7 +1243,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
beforeMetricValue := beforeMetric.GetCounter().GetValue()
now := time.Now()
slApp := sl.appender()
slApp := sl.appender(context.Background())
total, added, seriesAdded, err := sl.append(app, []byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now)
if err != errSampleLimit {
t.Fatalf("Did not see expected sample limit error: %s", err)
@ -1274,7 +1274,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
testutil.Equals(t, want, resApp.rolledbackResult, "Appended samples not as expected")
now = time.Now()
slApp = sl.appender()
slApp = sl.appender(context.Background())
total, added, seriesAdded, err = sl.append(slApp, []byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "", now)
if err != errSampleLimit {
t.Fatalf("Did not see expected sample limit error: %s", err)
@ -1298,19 +1298,19 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
nil, nil, nil,
nopMutator,
nopMutator,
func() storage.Appender { capp.next = s.Appender(); return capp },
func(ctx context.Context) storage.Appender { capp.next = s.Appender(ctx); return capp },
nil,
0,
true,
)
now := time.Now()
slApp := sl.appender()
slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "", now)
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
slApp = sl.appender()
slApp = sl.appender(context.Background())
_, _, _, err = sl.append(slApp, []byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute))
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
@ -1338,19 +1338,19 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
nil, nil, nil,
nopMutator,
nopMutator,
func() storage.Appender { return app },
func(ctx context.Context) storage.Appender { return app },
nil,
0,
true,
)
now := time.Now()
slApp := sl.appender()
slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte("metric_a 1\n"), "", now)
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
slApp = sl.appender()
slApp = sl.appender(context.Background())
_, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second))
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
@ -1381,19 +1381,19 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
nil, nil, nil,
nopMutator,
nopMutator,
func() storage.Appender { return app },
func(ctx context.Context) storage.Appender { return app },
nil,
0,
true,
)
now := time.Now()
slApp := sl.appender()
slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte("metric_a 1 1000\n"), "", now)
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
slApp = sl.appender()
slApp = sl.appender(context.Background())
_, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second))
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
@ -1412,7 +1412,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
var (
scraper = &testScraper{}
appender = &collectResultAppender{}
app = func() storage.Appender { return appender }
app = func(ctx context.Context) storage.Appender { return appender }
)
ctx, cancel := context.WithCancel(context.Background())
@ -1440,7 +1440,7 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
var (
scraper = &testScraper{}
appender = &collectResultAppender{}
app = func() storage.Appender { return appender }
app = func(ctx context.Context) storage.Appender { return appender }
)
ctx, cancel := context.WithCancel(context.Background())
@ -1494,14 +1494,14 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
nil, nil,
nopMutator,
nopMutator,
func() storage.Appender { return app },
func(ctx context.Context) storage.Appender { return app },
nil,
0,
true,
)
now := time.Unix(1, 0)
slApp := sl.appender()
slApp := sl.appender(context.Background())
total, added, seriesAdded, err := sl.append(slApp, []byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now)
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
@ -1526,7 +1526,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
nil, nil,
nopMutator,
nopMutator,
func() storage.Appender {
func(ctx context.Context) storage.Appender {
return &timeLimitAppender{
Appender: app,
maxTime: timestamp.FromTime(time.Now().Add(10 * time.Minute)),
@ -1538,7 +1538,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
)
now := time.Now().Add(20 * time.Minute)
slApp := sl.appender()
slApp := sl.appender(context.Background())
total, added, seriesAdded, err := sl.append(slApp, []byte("normal 1\n"), "", now)
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
@ -1711,7 +1711,7 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
s := teststorage.New(t)
defer s.Close()
app := s.Appender()
app := s.Appender(context.Background())
capp := &collectResultAppender{next: app}
@ -1719,13 +1719,13 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
nil, nil, nil,
nopMutator,
nopMutator,
func() storage.Appender { return capp },
func(ctx context.Context) storage.Appender { return capp },
nil, 0,
true,
)
now := time.Now()
slApp := sl.appender()
slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now)
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
@ -1744,7 +1744,7 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
s := teststorage.New(t)
defer s.Close()
app := s.Appender()
app := s.Appender(context.Background())
capp := &collectResultAppender{next: app}
@ -1752,13 +1752,13 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
nil, nil, nil,
nopMutator,
nopMutator,
func() storage.Appender { return capp },
func(ctx context.Context) storage.Appender { return capp },
nil, 0,
false,
)
now := time.Now()
slApp := sl.appender()
slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now)
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
@ -1791,7 +1791,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
defer cancel()
// We add a good and a bad metric to check that both are discarded.
slApp := sl.appender()
slApp := sl.appender(ctx)
_, _, _, err := sl.append(slApp, []byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{})
testutil.NotOk(t, err)
testutil.Ok(t, slApp.Rollback())
@ -1803,7 +1803,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
testutil.Ok(t, series.Err())
// We add a good metric to check that it is recorded.
slApp = sl.appender()
slApp = sl.appender(ctx)
_, _, _, err = sl.append(slApp, []byte("test_metric{le=\"500\"} 1\n"), "", time.Time{})
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
@ -1820,10 +1820,10 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
s := teststorage.New(t)
defer s.Close()
app := s.Appender()
app := s.Appender(context.Background())
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx,
sl := newScrapeLoop(context.Background(),
&testScraper{},
nil, nil,
func(l labels.Labels) labels.Labels {
@ -1833,14 +1833,14 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
return l
},
nopMutator,
func() storage.Appender { return app },
func(ctx context.Context) storage.Appender { return app },
nil,
0,
true,
)
defer cancel()
slApp := sl.appender()
slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "", time.Time{})
testutil.NotOk(t, err)
testutil.Ok(t, slApp.Rollback())
@ -2057,7 +2057,7 @@ func TestScrapeAddFast(t *testing.T) {
)
defer cancel()
slApp := sl.appender()
slApp := sl.appender(ctx)
_, _, _, err := sl.append(slApp, []byte("up 1\n"), "", time.Time{})
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
@ -2068,7 +2068,7 @@ func TestScrapeAddFast(t *testing.T) {
v.ref++
}
slApp = sl.appender()
slApp = sl.appender(ctx)
_, _, _, err = sl.append(slApp, []byte("up 1\n"), "", time.Time{}.Add(time.Second))
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())

View file

@ -122,11 +122,11 @@ func (f *fanout) ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQueri
return NewMergeChunkQuerier(primary, secondaries, NewCompactingChunkSeriesMerger(ChainedSeriesMerge)), nil
}
func (f *fanout) Appender() Appender {
primary := f.primary.Appender()
func (f *fanout) Appender(ctx context.Context) Appender {
primary := f.primary.Appender(ctx)
secondaries := make([]Appender, 0, len(f.secondaries))
for _, storage := range f.secondaries {
secondaries = append(secondaries, storage.Appender())
secondaries = append(secondaries, storage.Appender(ctx))
}
return &fanoutAppender{
logger: f.logger,

View file

@ -31,10 +31,11 @@ func TestSelectSorted(t *testing.T) {
outputLabel := labels.FromStrings(model.MetricNameLabel, "a")
inputTotalSize := 0
ctx := context.Background()
priStorage := teststorage.New(t)
defer priStorage.Close()
app1 := priStorage.Appender()
app1 := priStorage.Appender(ctx)
app1.Add(inputLabel, 0, 0)
inputTotalSize++
app1.Add(inputLabel, 1000, 1)
@ -46,7 +47,7 @@ func TestSelectSorted(t *testing.T) {
remoteStorage1 := teststorage.New(t)
defer remoteStorage1.Close()
app2 := remoteStorage1.Appender()
app2 := remoteStorage1.Appender(ctx)
app2.Add(inputLabel, 3000, 3)
inputTotalSize++
app2.Add(inputLabel, 4000, 4)
@ -59,7 +60,7 @@ func TestSelectSorted(t *testing.T) {
remoteStorage2 := teststorage.New(t)
defer remoteStorage2.Close()
app3 := remoteStorage2.Appender()
app3 := remoteStorage2.Appender(ctx)
app3.Add(inputLabel, 6000, 6)
inputTotalSize++
app3.Add(inputLabel, 7000, 7)
@ -100,7 +101,7 @@ func TestSelectSorted(t *testing.T) {
})
t.Run("chunk querier", func(t *testing.T) {
t.Skip("TODO(bwplotka: Unskip when db will implement ChunkQuerier.")
querier, err := fanoutStorage.ChunkQuerier(context.Background(), 0, 8000)
querier, err := fanoutStorage.ChunkQuerier(ctx, 0, 8000)
testutil.Ok(t, err)
defer querier.Close()
@ -222,7 +223,7 @@ type errChunkQuerier struct{ errQuerier }
func (errStorage) ChunkQuerier(_ context.Context, _, _ int64) (storage.ChunkQuerier, error) {
return errChunkQuerier{}, nil
}
func (errStorage) Appender() storage.Appender { return nil }
func (errStorage) Appender(_ context.Context) storage.Appender { return nil }
func (errStorage) StartTime() (int64, error) { return 0, nil }
func (errStorage) Close() error { return nil }

View file

@ -33,8 +33,10 @@ var (
// Appendable allows creating appenders.
type Appendable interface {
// Appender returns a new appender for the storage.
Appender() Appender
// Appender returns a new appender for the storage. The implementation
// can choose whether or not to use the context, for deadlines or to check
// for errors.
Appender(ctx context.Context) Appender
}
// SampleAndChunkQueryable allows retrieving samples as well as encoded samples in form of chunks.

View file

@ -169,8 +169,8 @@ func (s *Storage) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.C
}
// Appender implements storage.Storage.
func (s *Storage) Appender() storage.Appender {
return s.rws.Appender()
func (s *Storage) Appender(ctx context.Context) storage.Appender {
return s.rws.Appender(ctx)
}
// Close the background processing of the storage queues.

View file

@ -14,6 +14,7 @@
package remote
import (
"context"
"fmt"
"sync"
"time"
@ -170,7 +171,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
}
// Appender implements storage.Storage.
func (rws *WriteStorage) Appender() storage.Appender {
func (rws *WriteStorage) Appender(_ context.Context) storage.Appender {
return &timestampTracker{
writeStorage: rws,
}

View file

@ -328,7 +328,7 @@ func createHead(tb testing.TB, series []storage.Series, chunkDir string) *Head {
head, err := NewHead(nil, nil, nil, 2*60*60*1000, chunkDir, nil, DefaultStripeSize, nil)
testutil.Ok(tb, err)
app := head.Appender()
app := head.Appender(context.Background())
for _, s := range series {
ref := uint64(0)
it := s.Iterator()

View file

@ -878,7 +878,7 @@ func BenchmarkCompactionFromHead(b *testing.B) {
h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil)
testutil.Ok(b, err)
for ln := 0; ln < labelNames; ln++ {
app := h.Appender()
app := h.Appender(context.Background())
for lv := 0; lv < labelValues; lv++ {
app.Add(labels.FromStrings(fmt.Sprintf("%d", ln), fmt.Sprintf("%d%s%d", lv, postingsBenchSuffix, ln)), 0, 0)
}
@ -911,7 +911,7 @@ func TestDisableAutoCompactions(t *testing.T) {
// Trigger a compaction to check that it was skipped and
// no new blocks were created when compaction is disabled.
db.DisableCompactions()
app := db.Appender()
app := db.Appender(context.Background())
for i := int64(0); i < 3; i++ {
_, err := app.Add(label, i*blockRange, 0)
testutil.Ok(t, err)
@ -1027,7 +1027,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
defaultLabel := labels.FromStrings("foo", "bar")
// Add some data to the head that is enough to trigger a compaction.
app := db.Appender()
app := db.Appender(context.Background())
_, err := app.Add(defaultLabel, 1, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, 2, 0)

View file

@ -710,8 +710,8 @@ func (db *DB) run() {
}
// Appender opens a new appender against the database.
func (db *DB) Appender() storage.Appender {
return dbAppender{db: db, Appender: db.head.Appender()}
func (db *DB) Appender(ctx context.Context) storage.Appender {
return dbAppender{db: db, Appender: db.head.Appender(ctx)}
}
// dbAppender wraps the DB's head appender and triggers compactions on commit

View file

@ -138,7 +138,8 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
testutil.Ok(t, db.Close())
}()
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
testutil.Ok(t, err)
@ -169,9 +170,10 @@ func TestNoPanicAfterWALCorrutpion(t *testing.T) {
// This is to ensure that all samples can be read from the mmaped chunks when the WAL is corrupted.
var expSamples []tsdbutil.Sample
var maxt int64
ctx := context.Background()
{
for {
app := db.Appender()
app := db.Appender(ctx)
_, err := app.Add(labels.FromStrings("foo", "bar"), maxt, 0)
expSamples = append(expSamples, sample{t: maxt, v: 0})
testutil.Ok(t, err)
@ -226,7 +228,7 @@ func TestDataNotAvailableAfterRollback(t *testing.T) {
testutil.Ok(t, db.Close())
}()
app := db.Appender()
app := db.Appender(context.Background())
_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
testutil.Ok(t, err)
@ -248,7 +250,8 @@ func TestDBAppenderAddRef(t *testing.T) {
testutil.Ok(t, db.Close())
}()
app1 := db.Appender()
ctx := context.Background()
app1 := db.Appender(ctx)
ref1, err := app1.Add(labels.FromStrings("a", "b"), 123, 0)
testutil.Ok(t, err)
@ -260,7 +263,7 @@ func TestDBAppenderAddRef(t *testing.T) {
err = app1.Commit()
testutil.Ok(t, err)
app2 := db.Appender()
app2 := db.Appender(ctx)
// first ref should already work in next transaction.
err = app2.AddFast(ref1, 125, 0)
@ -302,7 +305,8 @@ func TestAppendEmptyLabelsIgnored(t *testing.T) {
testutil.Ok(t, db.Close())
}()
app1 := db.Appender()
ctx := context.Background()
app1 := db.Appender(ctx)
ref1, err := app1.Add(labels.FromStrings("a", "b"), 123, 0)
testutil.Ok(t, err)
@ -354,7 +358,8 @@ Outer:
testutil.Ok(t, db.Close())
}()
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
smpls := make([]float64, numSamples)
for i := int64(0); i < numSamples; i++ {
@ -413,12 +418,13 @@ func TestAmendDatapointCausesError(t *testing.T) {
testutil.Ok(t, db.Close())
}()
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
app = db.Appender()
app = db.Appender(ctx)
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1)
testutil.Equals(t, storage.ErrDuplicateSampleForTimestamp, err)
testutil.Ok(t, app.Rollback())
@ -430,12 +436,13 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
testutil.Ok(t, db.Close())
}()
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.NaN())
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
app = db.Appender()
app = db.Appender(ctx)
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.NaN())
testutil.Ok(t, err)
}
@ -446,12 +453,13 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
testutil.Ok(t, db.Close())
}()
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000001))
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
app = db.Appender()
app = db.Appender(ctx)
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000002))
testutil.Equals(t, storage.ErrDuplicateSampleForTimestamp, err)
}
@ -462,7 +470,8 @@ func TestEmptyLabelsetCausesError(t *testing.T) {
testutil.Ok(t, db.Close())
}()
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
_, err := app.Add(labels.Labels{}, 0, 0)
testutil.NotOk(t, err)
testutil.Equals(t, "empty labelset: invalid sample", err.Error())
@ -475,7 +484,8 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
}()
// Append AmendedValue.
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1)
testutil.Ok(t, err)
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 2)
@ -493,7 +503,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
}, ssMap)
// Append Out of Order Value.
app = db.Appender()
app = db.Appender(ctx)
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 10, 3)
testutil.Ok(t, err)
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 7, 5)
@ -514,7 +524,8 @@ func TestDB_Snapshot(t *testing.T) {
db := openTestDB(t, nil, nil)
// append data
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
mint := int64(1414141414000)
for i := 0; i < 1000; i++ {
_, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0)
@ -563,7 +574,8 @@ func TestDB_Snapshot(t *testing.T) {
func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
db := openTestDB(t, nil, nil)
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
mint := int64(1414141414000)
for i := 0; i < 1000; i++ {
_, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0)
@ -615,7 +627,8 @@ func TestDB_SnapshotWithDelete(t *testing.T) {
db := openTestDB(t, nil, nil)
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
smpls := make([]float64, numSamples)
for i := int64(0); i < numSamples; i++ {
@ -760,7 +773,8 @@ func TestDB_e2e(t *testing.T) {
testutil.Ok(t, db.Close())
}()
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
for _, l := range lbls {
lset := labels.New(l...)
@ -864,7 +878,8 @@ func TestWALFlushedOnDBClose(t *testing.T) {
lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}}
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
_, err := app.Add(lbls, 0, 1)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
@ -938,7 +953,7 @@ func TestWALSegmentSizeOptions(t *testing.T) {
db := openTestDB(t, opts, nil)
for i := int64(0); i < 155; i++ {
app := db.Appender()
app := db.Appender(context.Background())
ref, err := app.Add(labels.Labels{labels.Label{Name: "wal" + fmt.Sprintf("%d", i), Value: "size"}}, i, rand.Float64())
testutil.Ok(t, err)
for j := int64(1); j <= 78; j++ {
@ -960,7 +975,8 @@ func TestTombstoneClean(t *testing.T) {
db := openTestDB(t, nil, nil)
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
smpls := make([]float64, numSamples)
for i := int64(0); i < numSamples; i++ {
@ -1187,7 +1203,7 @@ func TestSizeRetention(t *testing.T) {
}
// Add some data to the WAL.
headApp := db.Head().Appender()
headApp := db.Head().Appender(context.Background())
for _, m := range headBlocks {
series := genSeries(100, 10, m.MinTime, m.MaxTime)
for _, s := range series {
@ -1287,7 +1303,8 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
labels.FromStrings("labelname", "labelvalue"),
}
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
for _, lbls := range labelpairs {
_, err := app.Add(lbls, 0, 1)
testutil.Ok(t, err)
@ -1467,7 +1484,8 @@ func TestChunkAtBlockBoundary(t *testing.T) {
testutil.Ok(t, db.Close())
}()
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
blockRange := db.compactor.(*LeveledCompactor).ranges[0]
label := labels.FromStrings("foo", "bar")
@ -1523,7 +1541,8 @@ func TestQuerierWithBoundaryChunks(t *testing.T) {
testutil.Ok(t, db.Close())
}()
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
blockRange := db.compactor.(*LeveledCompactor).ranges[0]
label := labels.FromStrings("foo", "bar")
@ -1572,7 +1591,8 @@ func TestInitializeHeadTimestamp(t *testing.T) {
testutil.Equals(t, int64(math.MinInt64), db.head.MaxTime())
// First added sample initializes the writable range.
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
_, err = app.Add(labels.FromStrings("a", "b"), 1000, 1)
testutil.Ok(t, err)
@ -1669,6 +1689,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
func TestNoEmptyBlocks(t *testing.T) {
db := openTestDB(t, nil, []int64{100})
ctx := context.Background()
defer func() {
testutil.Ok(t, db.Close())
}()
@ -1688,7 +1709,7 @@ func TestNoEmptyBlocks(t *testing.T) {
})
t.Run("Test no blocks after deleting all samples from head.", func(t *testing.T) {
app := db.Appender()
app := db.Appender(ctx)
_, err := app.Add(defaultLabel, 1, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, 2, 0)
@ -1705,7 +1726,7 @@ func TestNoEmptyBlocks(t *testing.T) {
testutil.Equals(t, len(db.Blocks()), len(actBlocks))
testutil.Equals(t, 0, len(actBlocks))
app = db.Appender()
app = db.Appender(ctx)
_, err = app.Add(defaultLabel, 1, 0)
testutil.Assert(t, err == storage.ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed")
@ -1730,7 +1751,7 @@ func TestNoEmptyBlocks(t *testing.T) {
t.Run(`When no new block is created from head, and there are some blocks on disk
compaction should not run into infinite loop (was seen during development).`, func(t *testing.T) {
oldBlocks := db.Blocks()
app := db.Appender()
app := db.Appender(ctx)
currentTime := db.Head().MaxTime()
_, err := app.Add(defaultLabel, currentTime, 0)
testutil.Ok(t, err)
@ -1813,7 +1834,8 @@ func TestDB_LabelNames(t *testing.T) {
// Appends samples into the database.
appendSamples := func(db *DB, mint, maxt int64, sampleLabels [][2]string) {
t.Helper()
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
for i := mint; i <= maxt; i++ {
for _, tuple := range sampleLabels {
label := labels.FromStrings(tuple[0], tuple[1])
@ -1880,7 +1902,8 @@ func TestCorrectNumTombstones(t *testing.T) {
defaultLabel := labels.FromStrings("foo", "bar")
defaultMatcher := labels.MustNewMatcher(labels.MatchEqual, defaultLabel[0].Name, defaultLabel[0].Value)
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
for i := int64(0); i < 3; i++ {
for j := int64(0); j < 15; j++ {
_, err := app.Add(defaultLabel, i*blockRange+j, 0)
@ -2283,6 +2306,7 @@ func TestVerticalCompaction(t *testing.T) {
// will not overlap with the first block created by the next compaction.
func TestBlockRanges(t *testing.T) {
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
ctx := context.Background()
dir, err := ioutil.TempDir("", "test_storage")
testutil.Ok(t, err)
@ -2298,7 +2322,7 @@ func TestBlockRanges(t *testing.T) {
defer func() {
os.RemoveAll(dir)
}()
app := db.Appender()
app := db.Appender(ctx)
lbl := labels.Labels{{Name: "a", Value: "b"}}
_, err = app.Add(lbl, firstBlockMaxT-1, rand.Float64())
if err == nil {
@ -2327,7 +2351,7 @@ func TestBlockRanges(t *testing.T) {
// Test that wal records are skipped when an existing block covers the same time ranges
// and compaction doesn't create an overlapping block.
app = db.Appender()
app = db.Appender(ctx)
db.DisableCompactions()
_, err = app.Add(lbl, secondBlockMaxt+1, rand.Float64())
testutil.Ok(t, err)
@ -2350,7 +2374,7 @@ func TestBlockRanges(t *testing.T) {
testutil.Equals(t, 3, len(db.Blocks()), "db doesn't include expected number of blocks")
testutil.Equals(t, db.Blocks()[2].Meta().MaxTime, thirdBlockMaxt, "unexpected maxt of the last block")
app = db.Appender()
app = db.Appender(ctx)
_, err = app.Add(lbl, thirdBlockMaxt+rangeToTriggerCompaction, rand.Float64()) // Trigger a compaction
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
@ -2411,7 +2435,7 @@ func TestDBReadOnly(t *testing.T) {
dbSizeBeforeAppend, err := fileutil.DirSize(dbWritable.Dir())
testutil.Ok(t, err)
app := dbWritable.Appender()
app := dbWritable.Appender(context.Background())
_, err = app.Add(labels.FromStrings("foo", "bar"), dbWritable.Head().MaxTime()+1, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
@ -2481,6 +2505,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
err error
maxt int
ctx = context.Background()
)
// Bootstrap the db.
@ -2496,7 +2521,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
db, err := Open(dbDir, logger, nil, nil)
testutil.Ok(t, err)
db.DisableCompactions()
app := db.Appender()
app := db.Appender(ctx)
maxt = 1000
for i := 0; i < maxt; i++ {
_, err := app.Add(labels.FromStrings(defaultLabelName, "flush"), int64(i), 1.0)
@ -2560,12 +2585,13 @@ func TestDBCannotSeePartialCommits(t *testing.T) {
stop := make(chan struct{})
firstInsert := make(chan struct{})
ctx := context.Background()
// Insert data in batches.
go func() {
iter := 0
for {
app := db.Appender()
app := db.Appender(ctx)
for j := 0; j < 100; j++ {
_, err := app.Add(labels.FromStrings("foo", "bar", "a", strconv.Itoa(j)), int64(iter), float64(iter))
@ -2631,7 +2657,8 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
testutil.Ok(t, err)
defer querierBeforeAdd.Close()
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
_, err = app.Add(labels.FromStrings("foo", "bar"), 0, 0)
testutil.Ok(t, err)
@ -2929,7 +2956,8 @@ func TestCompactHead(t *testing.T) {
db, err := Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg)
testutil.Ok(t, err)
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
var expSamples []sample
maxt := 100
for i := 0; i < maxt; i++ {

View file

@ -14,6 +14,7 @@
package tsdb
import (
"context"
"fmt"
"math"
"path/filepath"
@ -1007,7 +1008,7 @@ func (a *initAppender) Rollback() error {
}
// Appender returns a new Appender on the database.
func (h *Head) Appender() storage.Appender {
func (h *Head) Appender(_ context.Context) storage.Appender {
h.metrics.activeAppenders.Inc()
// The head cache might not have a starting point yet. The init appender

View file

@ -14,6 +14,7 @@
package tsdb
import (
"context"
"fmt"
"io/ioutil"
"math"
@ -266,14 +267,14 @@ func TestHead_WALMultiRef(t *testing.T) {
testutil.Ok(t, head.Init(0))
app := head.Appender()
app := head.Appender(context.Background())
ref1, err := app.Add(labels.FromStrings("foo", "bar"), 100, 1)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(head.metrics.chunksCreated))
// Add another sample outside chunk range to mmap a chunk.
app = head.Appender()
app = head.Appender(context.Background())
_, err = app.Add(labels.FromStrings("foo", "bar"), 1500, 2)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
@ -281,14 +282,14 @@ func TestHead_WALMultiRef(t *testing.T) {
testutil.Ok(t, head.Truncate(1600))
app = head.Appender()
app = head.Appender(context.Background())
ref2, err := app.Add(labels.FromStrings("foo", "bar"), 1700, 3)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.Equals(t, 3.0, prom_testutil.ToFloat64(head.metrics.chunksCreated))
// Add another sample outside chunk range to mmap a chunk.
app = head.Appender()
app = head.Appender(context.Background())
_, err = app.Add(labels.FromStrings("foo", "bar"), 2000, 4)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
@ -539,7 +540,7 @@ func TestHeadDeleteSimple(t *testing.T) {
for _, c := range cases {
head, w := newTestHead(t, 1000, compress)
app := head.Appender()
app := head.Appender(context.Background())
for _, smpl := range smplsAll {
_, err := app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v)
testutil.Ok(t, err)
@ -553,7 +554,7 @@ func TestHeadDeleteSimple(t *testing.T) {
}
// Add more samples.
app = head.Appender()
app = head.Appender(context.Background())
for _, smpl := range c.addSamples {
_, err := app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v)
testutil.Ok(t, err)
@ -620,7 +621,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
}()
numSamples := int64(10)
app := hb.Appender()
app := hb.Appender(context.Background())
smpls := make([]float64, numSamples)
for i := int64(0); i < numSamples; i++ {
smpls[i] = rand.Float64()
@ -644,7 +645,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
testutil.Equals(t, 0, len(res.Warnings()))
// Add again and test for presence.
app = hb.Appender()
app = hb.Appender(context.Background())
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 11, 1)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
@ -670,7 +671,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
hb, w := newTestHead(t, int64(numSamples)*10, false)
for i := 0; i < numSamples; i++ {
app := hb.Appender()
app := hb.Appender(context.Background())
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, int64(i), 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
@ -762,7 +763,7 @@ func TestDelete_e2e(t *testing.T) {
testutil.Ok(t, hb.Close())
}()
app := hb.Appender()
app := hb.Appender(context.Background())
for _, l := range lbls {
ls := labels.New(l...)
series := []tsdbutil.Sample{}
@ -1174,7 +1175,7 @@ func TestHead_LogRollback(t *testing.T) {
testutil.Ok(t, h.Close())
}()
app := h.Appender()
app := h.Appender(context.Background())
_, err := app.Add(labels.FromStrings("a", "b"), 1, 2)
testutil.Ok(t, err)
@ -1372,7 +1373,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) {
testutil.Ok(t, h.Close())
}()
add := func(ts int64) {
app := h.Appender()
app := h.Appender(context.Background())
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, ts, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
@ -1403,7 +1404,7 @@ func TestAddDuplicateLabelName(t *testing.T) {
}()
add := func(labels labels.Labels, labelName string) {
app := h.Appender()
app := h.Appender(context.Background())
_, err := app.Add(labels, 0, 0)
testutil.NotOk(t, err)
testutil.Equals(t, fmt.Sprintf(`label name "%s" is not unique: invalid sample`, labelName), err.Error())
@ -1586,13 +1587,13 @@ func TestIsolationRollback(t *testing.T) {
testutil.Ok(t, hb.Close())
}()
app := hb.Appender()
app := hb.Appender(context.Background())
_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.Equals(t, uint64(1), hb.iso.lowWatermark())
app = hb.Appender()
app = hb.Appender(context.Background())
_, err = app.Add(labels.FromStrings("foo", "bar"), 1, 1)
testutil.Ok(t, err)
_, err = app.Add(labels.FromStrings("foo", "bar", "foo", "baz"), 2, 2)
@ -1600,7 +1601,7 @@ func TestIsolationRollback(t *testing.T) {
testutil.Ok(t, app.Rollback())
testutil.Equals(t, uint64(2), hb.iso.lowWatermark())
app = hb.Appender()
app = hb.Appender(context.Background())
_, err = app.Add(labels.FromStrings("foo", "bar"), 3, 3)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
@ -1613,18 +1614,18 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) {
testutil.Ok(t, hb.Close())
}()
app1 := hb.Appender()
app1 := hb.Appender(context.Background())
_, err := app1.Add(labels.FromStrings("foo", "bar"), 0, 0)
testutil.Ok(t, err)
testutil.Ok(t, app1.Commit())
testutil.Equals(t, uint64(1), hb.iso.lowWatermark(), "Low watermark should by 1 after 1st append.")
app1 = hb.Appender()
app1 = hb.Appender(context.Background())
_, err = app1.Add(labels.FromStrings("foo", "bar"), 1, 1)
testutil.Ok(t, err)
testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should be two, even if append is not committed yet.")
app2 := hb.Appender()
app2 := hb.Appender(context.Background())
_, err = app2.Add(labels.FromStrings("foo", "baz"), 1, 1)
testutil.Ok(t, err)
testutil.Ok(t, app2.Commit())
@ -1667,10 +1668,10 @@ func TestIsolationWithoutAdd(t *testing.T) {
testutil.Ok(t, hb.Close())
}()
app := hb.Appender()
app := hb.Appender(context.Background())
testutil.Ok(t, app.Commit())
app = hb.Appender()
app = hb.Appender(context.Background())
_, err := app.Add(labels.FromStrings("foo", "baz"), 1, 1)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
@ -1692,7 +1693,8 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
}()
db.DisableCompactions()
app := db.Appender()
ctx := context.Background()
app := db.Appender(ctx)
for i := 1; i <= 5; i++ {
_, err = app.Add(labels.FromStrings("a", "b"), int64(i), 99)
testutil.Ok(t, err)
@ -1701,7 +1703,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
// Test out of order metric.
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
app = db.Appender()
app = db.Appender(ctx)
_, err = app.Add(labels.FromStrings("a", "b"), 2, 99)
testutil.Equals(t, storage.ErrOutOfOrderSample, err)
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
@ -1716,7 +1718,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
testutil.Ok(t, app.Commit())
// Compact Head to test out of bound metric.
app = db.Appender()
app = db.Appender(ctx)
_, err = app.Add(labels.FromStrings("a", "b"), DefaultBlockDuration*2, 99)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
@ -1725,7 +1727,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
testutil.Ok(t, db.Compact())
testutil.Assert(t, db.head.minValidTime.Load() > 0, "")
app = db.Appender()
app = db.Appender(ctx)
_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()-2, 99)
testutil.Equals(t, storage.ErrOutOfBounds, err)
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples))
@ -1736,7 +1738,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
testutil.Ok(t, app.Commit())
// Some more valid samples for out of order.
app = db.Appender()
app = db.Appender(ctx)
for i := 1; i <= 5; i++ {
_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+int64(i), 99)
testutil.Ok(t, err)
@ -1744,7 +1746,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
testutil.Ok(t, app.Commit())
// Test out of order metric.
app = db.Appender()
app = db.Appender(ctx)
_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+2, 99)
testutil.Equals(t, storage.ErrOutOfOrderSample, err)
testutil.Equals(t, 4.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
@ -1765,7 +1767,7 @@ func testHeadSeriesChunkRace(t *testing.T) {
testutil.Ok(t, h.Close())
}()
testutil.Ok(t, h.Init(0))
app := h.Appender()
app := h.Appender(context.Background())
s2, err := app.Add(labels.FromStrings("foo2", "bar"), 5, 0)
testutil.Ok(t, err)
@ -1814,7 +1816,7 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) {
expectedLabelValues = []string{"d", "e", "f"}
)
app := head.Appender()
app := head.Appender(context.Background())
for i, name := range expectedLabelNames {
_, err := app.Add(labels.Labels{{Name: name, Value: expectedLabelValues[i]}}, seriesTimestamps[i], 0)
testutil.Ok(t, err)
@ -1859,28 +1861,28 @@ func TestErrReuseAppender(t *testing.T) {
testutil.Ok(t, head.Close())
}()
app := head.Appender()
app := head.Appender(context.Background())
_, err := app.Add(labels.Labels{{Name: "test", Value: "test"}}, 0, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.NotOk(t, app.Commit())
testutil.NotOk(t, app.Rollback())
app = head.Appender()
app = head.Appender(context.Background())
_, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 1, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Rollback())
testutil.NotOk(t, app.Rollback())
testutil.NotOk(t, app.Commit())
app = head.Appender()
app = head.Appender(context.Background())
_, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 2, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.NotOk(t, app.Rollback())
testutil.NotOk(t, app.Commit())
app = head.Appender()
app = head.Appender(context.Background())
_, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 3, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Rollback())

View file

@ -14,6 +14,7 @@
package tsdb
import (
"context"
"fmt"
"io/ioutil"
"os"
@ -41,7 +42,7 @@ func BenchmarkPostingsForMatchers(b *testing.B) {
testutil.Ok(b, h.Close())
}()
app := h.Appender()
app := h.Appender(context.Background())
addSeries := func(l labels.Labels) {
app.Add(l, 0, 0)
}
@ -147,7 +148,7 @@ func BenchmarkQuerierSelect(b *testing.B) {
h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil)
testutil.Ok(b, err)
defer h.Close()
app := h.Appender()
app := h.Appender(context.Background())
numSeries := 1000000
for i := 0; i < numSeries; i++ {
app.Add(labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, postingsBenchSuffix)), int64(i), 0)

View file

@ -1875,7 +1875,7 @@ func TestPostingsForMatchers(t *testing.T) {
testutil.Ok(t, h.Close())
}()
app := h.Appender()
app := h.Appender(context.Background())
app.Add(labels.FromStrings("n", "1"), 0, 0)
app.Add(labels.FromStrings("n", "1", "i", "a"), 0, 0)
app.Add(labels.FromStrings("n", "1", "i", "b"), 0, 0)

View file

@ -38,7 +38,7 @@ func CreateHead(samples []*MetricSample, chunkRange int64, chunkDir string, logg
if err != nil {
return nil, err
}
app := head.Appender()
app := head.Appender(context.TODO())
for _, sample := range samples {
_, err = app.Add(sample.Labels, sample.TimestampMs, sample.Value)
if err != nil {