mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 13:57:36 -08:00
Add context to Appender interface
Signed-off-by: Annanay <annanayagarwal@gmail.com>
This commit is contained in:
parent
841b13641c
commit
7f98a744e5
|
@ -984,9 +984,9 @@ func (s *readyStorage) ChunkQuerier(ctx context.Context, mint, maxt int64) (stor
|
|||
}
|
||||
|
||||
// Appender implements the Storage interface.
|
||||
func (s *readyStorage) Appender() storage.Appender {
|
||||
func (s *readyStorage) Appender(ctx context.Context) storage.Appender {
|
||||
if x := s.get(); x != nil {
|
||||
return x.Appender()
|
||||
return x.Appender(ctx)
|
||||
}
|
||||
return notReadyAppender{}
|
||||
}
|
||||
|
|
|
@ -263,7 +263,7 @@ func TestTimeMetrics(t *testing.T) {
|
|||
"prometheus_tsdb_head_max_time_seconds",
|
||||
))
|
||||
|
||||
app := db.Appender()
|
||||
app := db.Appender(context.Background())
|
||||
_, err = app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 1)
|
||||
testutil.Ok(t, err)
|
||||
_, err = app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, 1)
|
||||
|
|
|
@ -199,7 +199,7 @@ func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount in
|
|||
total := uint64(0)
|
||||
|
||||
for i := 0; i < scrapeCount; i++ {
|
||||
app := b.storage.Appender()
|
||||
app := b.storage.Appender(context.TODO())
|
||||
ts += timeDelta
|
||||
|
||||
for _, s := range scrape {
|
||||
|
|
|
@ -68,7 +68,7 @@ func BenchmarkRangeQuery(b *testing.B) {
|
|||
numIntervals := 8640 + 10000
|
||||
|
||||
for s := 0; s < numIntervals; s++ {
|
||||
a := storage.Appender()
|
||||
a := storage.Appender(context.Background())
|
||||
ts := int64(s * 10000) // 10s interval.
|
||||
for i, metric := range metrics {
|
||||
err := a.AddFast(refs[i], ts, float64(s))
|
||||
|
|
|
@ -40,7 +40,7 @@ func TestDeriv(t *testing.T) {
|
|||
}
|
||||
engine := NewEngine(opts)
|
||||
|
||||
a := storage.Appender()
|
||||
a := storage.Appender(context.Background())
|
||||
|
||||
metric := labels.FromStrings("__name__", "foo")
|
||||
a.Add(metric, 1493712816939, 1.0)
|
||||
|
|
|
@ -433,7 +433,7 @@ func (t *Test) exec(tc testCommand) error {
|
|||
t.clear()
|
||||
|
||||
case *loadCmd:
|
||||
app := t.storage.Appender()
|
||||
app := t.storage.Appender(t.context)
|
||||
if err := cmd.append(app); err != nil {
|
||||
app.Rollback()
|
||||
return err
|
||||
|
@ -644,7 +644,7 @@ func (ll *LazyLoader) clear() {
|
|||
|
||||
// appendTill appends the defined time series to the storage till the given timestamp (in milliseconds).
|
||||
func (ll *LazyLoader) appendTill(ts int64) error {
|
||||
app := ll.storage.Appender()
|
||||
app := ll.storage.Appender(ll.Context())
|
||||
for h, smpls := range ll.loadCmd.defs {
|
||||
m := ll.loadCmd.metrics[h]
|
||||
for i, s := range smpls {
|
||||
|
|
|
@ -352,7 +352,7 @@ func (g *Group) run(ctx context.Context) {
|
|||
select {
|
||||
case <-g.managerDone:
|
||||
case <-time.After(2 * g.interval):
|
||||
g.cleanupStaleSeries(now)
|
||||
g.cleanupStaleSeries(ctx, now)
|
||||
}
|
||||
}(time.Now())
|
||||
}()
|
||||
|
@ -588,7 +588,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
|
|||
numDuplicates = 0
|
||||
)
|
||||
|
||||
app := g.opts.Appendable.Appender()
|
||||
app := g.opts.Appendable.Appender(ctx)
|
||||
seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
|
||||
defer func() {
|
||||
if err := app.Commit(); err != nil {
|
||||
|
@ -636,14 +636,14 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
|
|||
}
|
||||
}(i, rule)
|
||||
}
|
||||
g.cleanupStaleSeries(ts)
|
||||
g.cleanupStaleSeries(ctx, ts)
|
||||
}
|
||||
|
||||
func (g *Group) cleanupStaleSeries(ts time.Time) {
|
||||
func (g *Group) cleanupStaleSeries(ctx context.Context, ts time.Time) {
|
||||
if len(g.staleSeries) == 0 {
|
||||
return
|
||||
}
|
||||
app := g.opts.Appendable.Appender()
|
||||
app := g.opts.Appendable.Appender(ctx)
|
||||
for _, s := range g.staleSeries {
|
||||
// Rule that produced series no longer configured, mark it stale.
|
||||
_, err := app.Add(s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
|
||||
|
|
|
@ -541,7 +541,7 @@ func TestStaleness(t *testing.T) {
|
|||
})
|
||||
|
||||
// A time series that has two samples and then goes stale.
|
||||
app := st.Appender()
|
||||
app := st.Appender(context.Background())
|
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 0, 1)
|
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2)
|
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN))
|
||||
|
@ -865,7 +865,7 @@ func TestNotify(t *testing.T) {
|
|||
Opts: opts,
|
||||
})
|
||||
|
||||
app := storage.Appender()
|
||||
app := storage.Appender(context.Background())
|
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2)
|
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, 3)
|
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 5000, 3)
|
||||
|
|
|
@ -14,13 +14,14 @@
|
|||
package scrape
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
)
|
||||
|
||||
type nopAppendable struct{}
|
||||
|
||||
func (a nopAppendable) Appender() storage.Appender {
|
||||
func (a nopAppendable) Appender(_ context.Context) storage.Appender {
|
||||
return nopAppender{}
|
||||
}
|
||||
|
||||
|
|
|
@ -230,7 +230,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed
|
|||
return mutateSampleLabels(l, opts.target, opts.honorLabels, opts.mrc)
|
||||
},
|
||||
func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
|
||||
func() storage.Appender { return appender(app.Appender(), opts.limit) },
|
||||
func(ctx context.Context) storage.Appender { return appender(app.Appender(ctx), opts.limit) },
|
||||
cache,
|
||||
jitterSeed,
|
||||
opts.honorTimestamps,
|
||||
|
@ -611,7 +611,7 @@ type scrapeLoop struct {
|
|||
jitterSeed uint64
|
||||
honorTimestamps bool
|
||||
|
||||
appender func() storage.Appender
|
||||
appender func(ctx context.Context) storage.Appender
|
||||
sampleMutator labelsMutator
|
||||
reportSampleMutator labelsMutator
|
||||
|
||||
|
@ -873,7 +873,7 @@ func newScrapeLoop(ctx context.Context,
|
|||
buffers *pool.Pool,
|
||||
sampleMutator labelsMutator,
|
||||
reportSampleMutator labelsMutator,
|
||||
appender func() storage.Appender,
|
||||
appender func(ctx context.Context) storage.Appender,
|
||||
cache *scrapeCache,
|
||||
jitterSeed uint64,
|
||||
honorTimestamps bool,
|
||||
|
@ -986,7 +986,7 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
|
|||
}
|
||||
}
|
||||
|
||||
app := sl.appender()
|
||||
app := sl.appender(scrapeCtx)
|
||||
var err error
|
||||
defer func() {
|
||||
if err != nil {
|
||||
|
@ -1003,13 +1003,13 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
|
|||
total, added, seriesAdded, appErr := sl.append(app, b, contentType, start)
|
||||
if appErr != nil {
|
||||
app.Rollback()
|
||||
app = sl.appender()
|
||||
app = sl.appender(scrapeCtx)
|
||||
level.Debug(sl.l).Log("msg", "Append failed", "err", appErr)
|
||||
// The append failed, probably due to a parse error or sample limit.
|
||||
// Call sl.append again with an empty scrape to trigger stale markers.
|
||||
if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil {
|
||||
app.Rollback()
|
||||
app = sl.appender()
|
||||
app = sl.appender(scrapeCtx)
|
||||
level.Warn(sl.l).Log("msg", "Append failed", "err", err)
|
||||
}
|
||||
}
|
||||
|
@ -1066,7 +1066,7 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
|
|||
// Call sl.append again with an empty scrape to trigger stale markers.
|
||||
// If the target has since been recreated and scraped, the
|
||||
// stale markers will be out of order and ignored.
|
||||
app := sl.appender()
|
||||
app := sl.appender(sl.ctx)
|
||||
var err error
|
||||
defer func() {
|
||||
if err != nil {
|
||||
|
@ -1080,7 +1080,7 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
|
|||
}()
|
||||
if _, _, _, err = sl.append(app, []byte{}, "", staleTime); err != nil {
|
||||
app.Rollback()
|
||||
app = sl.appender()
|
||||
app = sl.appender(sl.ctx)
|
||||
level.Warn(sl.l).Log("msg", "Stale append failed", "err", err)
|
||||
}
|
||||
if err = sl.reportStale(app, staleTime); err != nil {
|
||||
|
|
|
@ -300,6 +300,7 @@ func TestScrapePoolReload(t *testing.T) {
|
|||
func TestScrapePoolAppender(t *testing.T) {
|
||||
cfg := &config.ScrapeConfig{}
|
||||
app := &nopAppendable{}
|
||||
ctx := context.Background()
|
||||
sp, _ := newScrapePool(cfg, app, 0, nil)
|
||||
|
||||
loop := sp.newLoop(scrapeLoopOptions{
|
||||
|
@ -308,7 +309,7 @@ func TestScrapePoolAppender(t *testing.T) {
|
|||
appl, ok := loop.(*scrapeLoop)
|
||||
testutil.Assert(t, ok, "Expected scrapeLoop but got %T", loop)
|
||||
|
||||
wrapped := appl.appender()
|
||||
wrapped := appl.appender(ctx)
|
||||
|
||||
tl, ok := wrapped.(*timeLimitAppender)
|
||||
testutil.Assert(t, ok, "Expected timeLimitAppender but got %T", wrapped)
|
||||
|
@ -323,7 +324,7 @@ func TestScrapePoolAppender(t *testing.T) {
|
|||
appl, ok = loop.(*scrapeLoop)
|
||||
testutil.Assert(t, ok, "Expected scrapeLoop but got %T", loop)
|
||||
|
||||
wrapped = appl.appender()
|
||||
wrapped = appl.appender(ctx)
|
||||
|
||||
sl, ok := wrapped.(*limitAppender)
|
||||
testutil.Assert(t, ok, "Expected limitAppender but got %T", wrapped)
|
||||
|
@ -374,8 +375,9 @@ func TestScrapePoolRaces(t *testing.T) {
|
|||
|
||||
func TestScrapeLoopStopBeforeRun(t *testing.T) {
|
||||
scraper := &testScraper{}
|
||||
ctx := context.Background()
|
||||
|
||||
sl := newScrapeLoop(context.Background(),
|
||||
sl := newScrapeLoop(ctx,
|
||||
scraper,
|
||||
nil, nil,
|
||||
nopMutator,
|
||||
|
@ -434,10 +436,11 @@ func TestScrapeLoopStop(t *testing.T) {
|
|||
signal = make(chan struct{}, 1)
|
||||
appender = &collectResultAppender{}
|
||||
scraper = &testScraper{}
|
||||
app = func() storage.Appender { return appender }
|
||||
app = func(ctx context.Context) storage.Appender { return appender }
|
||||
)
|
||||
ctx := context.Background()
|
||||
|
||||
sl := newScrapeLoop(context.Background(),
|
||||
sl := newScrapeLoop(ctx,
|
||||
scraper,
|
||||
nil, nil,
|
||||
nopMutator,
|
||||
|
@ -499,7 +502,7 @@ func TestScrapeLoopRun(t *testing.T) {
|
|||
errc = make(chan error)
|
||||
|
||||
scraper = &testScraper{}
|
||||
app = func() storage.Appender { return &nopAppender{} }
|
||||
app = func(ctx context.Context) storage.Appender { return &nopAppender{} }
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
@ -605,14 +608,14 @@ func TestScrapeLoopMetadata(t *testing.T) {
|
|||
nil, nil,
|
||||
nopMutator,
|
||||
nopMutator,
|
||||
func() storage.Appender { return nopAppender{} },
|
||||
func(ctx context.Context) storage.Appender { return nopAppender{} },
|
||||
cache,
|
||||
0,
|
||||
true,
|
||||
)
|
||||
defer cancel()
|
||||
|
||||
slApp := sl.appender()
|
||||
slApp := sl.appender(ctx)
|
||||
total, _, _, err := sl.append(slApp, []byte(`# TYPE test_metric counter
|
||||
# HELP test_metric some help text
|
||||
# UNIT test_metric metric
|
||||
|
@ -661,7 +664,7 @@ func TestScrapeLoopSeriesAdded(t *testing.T) {
|
|||
)
|
||||
defer cancel()
|
||||
|
||||
slApp := sl.appender()
|
||||
slApp := sl.appender(ctx)
|
||||
total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{})
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, slApp.Commit())
|
||||
|
@ -669,7 +672,7 @@ func TestScrapeLoopSeriesAdded(t *testing.T) {
|
|||
testutil.Equals(t, 1, added)
|
||||
testutil.Equals(t, 1, seriesAdded)
|
||||
|
||||
slApp = sl.appender()
|
||||
slApp = sl.appender(ctx)
|
||||
total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{})
|
||||
testutil.Ok(t, slApp.Commit())
|
||||
testutil.Ok(t, err)
|
||||
|
@ -683,7 +686,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
|
|||
var (
|
||||
signal = make(chan struct{}, 1)
|
||||
scraper = &testScraper{}
|
||||
app = func() storage.Appender { return appender }
|
||||
app = func(ctx context.Context) storage.Appender { return appender }
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
@ -736,7 +739,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
|
|||
var (
|
||||
signal = make(chan struct{}, 1)
|
||||
scraper = &testScraper{}
|
||||
app = func() storage.Appender { return appender }
|
||||
app = func(ctx context.Context) storage.Appender { return appender }
|
||||
numScrapes = 0
|
||||
)
|
||||
|
||||
|
@ -795,7 +798,7 @@ func TestScrapeLoopCache(t *testing.T) {
|
|||
var (
|
||||
signal = make(chan struct{}, 1)
|
||||
scraper = &testScraper{}
|
||||
app = func() storage.Appender { appender.next = s.Appender(); return appender }
|
||||
app = func(ctx context.Context) storage.Appender { appender.next = s.Appender(ctx); return appender }
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
@ -863,13 +866,13 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
|
|||
s := teststorage.New(t)
|
||||
defer s.Close()
|
||||
|
||||
sapp := s.Appender()
|
||||
sapp := s.Appender(context.Background())
|
||||
|
||||
appender := &collectResultAppender{next: sapp}
|
||||
var (
|
||||
signal = make(chan struct{}, 1)
|
||||
scraper = &testScraper{}
|
||||
app = func() storage.Appender { return appender }
|
||||
app = func(ctx context.Context) storage.Appender { return appender }
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
@ -975,8 +978,9 @@ func TestScrapeLoopAppend(t *testing.T) {
|
|||
discoveryLabels := &Target{
|
||||
labels: labels.FromStrings(test.discoveryLabels...),
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
sl := newScrapeLoop(context.Background(),
|
||||
sl := newScrapeLoop(ctx,
|
||||
nil, nil, nil,
|
||||
func(l labels.Labels) labels.Labels {
|
||||
return mutateSampleLabels(l, discoveryLabels, test.honorLabels, nil)
|
||||
|
@ -984,7 +988,7 @@ func TestScrapeLoopAppend(t *testing.T) {
|
|||
func(l labels.Labels) labels.Labels {
|
||||
return mutateReportSampleLabels(l, discoveryLabels)
|
||||
},
|
||||
func() storage.Appender { return app },
|
||||
func(ctx context.Context) storage.Appender { return app },
|
||||
nil,
|
||||
0,
|
||||
true,
|
||||
|
@ -992,7 +996,7 @@ func TestScrapeLoopAppend(t *testing.T) {
|
|||
|
||||
now := time.Now()
|
||||
|
||||
slApp := sl.appender()
|
||||
slApp := sl.appender(ctx)
|
||||
_, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", now)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, slApp.Commit())
|
||||
|
@ -1020,12 +1024,13 @@ func TestScrapeLoopAppend(t *testing.T) {
|
|||
func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
|
||||
// collectResultAppender's AddFast always returns ErrNotFound if we don't give it a next.
|
||||
app := &collectResultAppender{}
|
||||
ctx := context.Background()
|
||||
|
||||
sl := newScrapeLoop(context.Background(),
|
||||
sl := newScrapeLoop(ctx,
|
||||
nil, nil, nil,
|
||||
nopMutator,
|
||||
nopMutator,
|
||||
func() storage.Appender { return app },
|
||||
func(ctx context.Context) storage.Appender { return app },
|
||||
nil,
|
||||
0,
|
||||
true,
|
||||
|
@ -1045,7 +1050,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
|
|||
sl.cache.addRef(mets, fakeRef, lset, hash)
|
||||
now := time.Now()
|
||||
|
||||
slApp := sl.appender()
|
||||
slApp := sl.appender(ctx)
|
||||
_, _, _, err := sl.append(slApp, []byte(metric), "", now)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, slApp.Commit())
|
||||
|
@ -1064,8 +1069,9 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
|
|||
func TestScrapeLoopAppendSampleLimit(t *testing.T) {
|
||||
resApp := &collectResultAppender{}
|
||||
app := &limitAppender{Appender: resApp, limit: 1}
|
||||
ctx := context.Background()
|
||||
|
||||
sl := newScrapeLoop(context.Background(),
|
||||
sl := newScrapeLoop(ctx,
|
||||
nil, nil, nil,
|
||||
func(l labels.Labels) labels.Labels {
|
||||
if l.Has("deleteme") {
|
||||
|
@ -1074,7 +1080,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
|
|||
return l
|
||||
},
|
||||
nopMutator,
|
||||
func() storage.Appender { return app },
|
||||
func(ctx context.Context) storage.Appender { return app },
|
||||
nil,
|
||||
0,
|
||||
true,
|
||||
|
@ -1088,7 +1094,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
|
|||
beforeMetricValue := beforeMetric.GetCounter().GetValue()
|
||||
|
||||
now := time.Now()
|
||||
slApp := sl.appender()
|
||||
slApp := sl.appender(ctx)
|
||||
total, added, seriesAdded, err := sl.append(app, []byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now)
|
||||
if err != errSampleLimit {
|
||||
t.Fatalf("Did not see expected sample limit error: %s", err)
|
||||
|
@ -1119,7 +1125,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
|
|||
testutil.Equals(t, want, resApp.rolledbackResult, "Appended samples not as expected")
|
||||
|
||||
now = time.Now()
|
||||
slApp = sl.appender()
|
||||
slApp = sl.appender(ctx)
|
||||
total, added, seriesAdded, err = sl.append(slApp, []byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "", now)
|
||||
if err != errSampleLimit {
|
||||
t.Fatalf("Did not see expected sample limit error: %s", err)
|
||||
|
@ -1138,24 +1144,25 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
|
|||
defer s.Close()
|
||||
|
||||
capp := &collectResultAppender{}
|
||||
ctx := context.Background()
|
||||
|
||||
sl := newScrapeLoop(context.Background(),
|
||||
sl := newScrapeLoop(ctx,
|
||||
nil, nil, nil,
|
||||
nopMutator,
|
||||
nopMutator,
|
||||
func() storage.Appender { capp.next = s.Appender(); return capp },
|
||||
func(ctx context.Context) storage.Appender { capp.next = s.Appender(ctx); return capp },
|
||||
nil,
|
||||
0,
|
||||
true,
|
||||
)
|
||||
|
||||
now := time.Now()
|
||||
slApp := sl.appender()
|
||||
slApp := sl.appender(ctx)
|
||||
_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "", now)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, slApp.Commit())
|
||||
|
||||
slApp = sl.appender()
|
||||
slApp = sl.appender(ctx)
|
||||
_, _, _, err = sl.append(slApp, []byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, slApp.Commit())
|
||||
|
@ -1178,24 +1185,25 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
|
|||
|
||||
func TestScrapeLoopAppendStaleness(t *testing.T) {
|
||||
app := &collectResultAppender{}
|
||||
ctx := context.Background()
|
||||
|
||||
sl := newScrapeLoop(context.Background(),
|
||||
sl := newScrapeLoop(ctx,
|
||||
nil, nil, nil,
|
||||
nopMutator,
|
||||
nopMutator,
|
||||
func() storage.Appender { return app },
|
||||
func(ctx context.Context) storage.Appender { return app },
|
||||
nil,
|
||||
0,
|
||||
true,
|
||||
)
|
||||
|
||||
now := time.Now()
|
||||
slApp := sl.appender()
|
||||
slApp := sl.appender(ctx)
|
||||
_, _, _, err := sl.append(slApp, []byte("metric_a 1\n"), "", now)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, slApp.Commit())
|
||||
|
||||
slApp = sl.appender()
|
||||
slApp = sl.appender(ctx)
|
||||
_, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, slApp.Commit())
|
||||
|
@ -1222,23 +1230,24 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
|
|||
|
||||
func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
|
||||
app := &collectResultAppender{}
|
||||
sl := newScrapeLoop(context.Background(),
|
||||
ctx := context.Background()
|
||||
sl := newScrapeLoop(ctx,
|
||||
nil, nil, nil,
|
||||
nopMutator,
|
||||
nopMutator,
|
||||
func() storage.Appender { return app },
|
||||
func(ctx context.Context) storage.Appender { return app },
|
||||
nil,
|
||||
0,
|
||||
true,
|
||||
)
|
||||
|
||||
now := time.Now()
|
||||
slApp := sl.appender()
|
||||
slApp := sl.appender(ctx)
|
||||
_, _, _, err := sl.append(slApp, []byte("metric_a 1 1000\n"), "", now)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, slApp.Commit())
|
||||
|
||||
slApp = sl.appender()
|
||||
slApp = sl.appender(ctx)
|
||||
_, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, slApp.Commit())
|
||||
|
@ -1257,7 +1266,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
|
|||
var (
|
||||
scraper = &testScraper{}
|
||||
appender = &collectResultAppender{}
|
||||
app = func() storage.Appender { return appender }
|
||||
app = func(ctx context.Context) storage.Appender { return appender }
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
@ -1285,7 +1294,7 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
|
|||
var (
|
||||
scraper = &testScraper{}
|
||||
appender = &collectResultAppender{}
|
||||
app = func() storage.Appender { return appender }
|
||||
app = func(ctx context.Context) storage.Appender { return appender }
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
@ -1333,20 +1342,21 @@ func (app *errorAppender) AddFast(ref uint64, t int64, v float64) error {
|
|||
|
||||
func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) {
|
||||
app := &errorAppender{}
|
||||
ctx := context.Background()
|
||||
|
||||
sl := newScrapeLoop(context.Background(),
|
||||
sl := newScrapeLoop(ctx,
|
||||
nil,
|
||||
nil, nil,
|
||||
nopMutator,
|
||||
nopMutator,
|
||||
func() storage.Appender { return app },
|
||||
func(ctx context.Context) storage.Appender { return app },
|
||||
nil,
|
||||
0,
|
||||
true,
|
||||
)
|
||||
|
||||
now := time.Unix(1, 0)
|
||||
slApp := sl.appender()
|
||||
slApp := sl.appender(ctx)
|
||||
total, added, seriesAdded, err := sl.append(slApp, []byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, slApp.Commit())
|
||||
|
@ -1366,12 +1376,13 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
|
|||
|
||||
func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
|
||||
app := &collectResultAppender{}
|
||||
sl := newScrapeLoop(context.Background(),
|
||||
ctx := context.Background()
|
||||
sl := newScrapeLoop(ctx,
|
||||
nil,
|
||||
nil, nil,
|
||||
nopMutator,
|
||||
nopMutator,
|
||||
func() storage.Appender {
|
||||
func(ctx context.Context) storage.Appender {
|
||||
return &timeLimitAppender{
|
||||
Appender: app,
|
||||
maxTime: timestamp.FromTime(time.Now().Add(10 * time.Minute)),
|
||||
|
@ -1383,7 +1394,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
|
|||
)
|
||||
|
||||
now := time.Now().Add(20 * time.Minute)
|
||||
slApp := sl.appender()
|
||||
slApp := sl.appender(ctx)
|
||||
total, added, seriesAdded, err := sl.append(slApp, []byte("normal 1\n"), "", now)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, slApp.Commit())
|
||||
|
@ -1556,21 +1567,22 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
|
|||
s := teststorage.New(t)
|
||||
defer s.Close()
|
||||
|
||||
app := s.Appender()
|
||||
ctx := context.Background()
|
||||
app := s.Appender(ctx)
|
||||
|
||||
capp := &collectResultAppender{next: app}
|
||||
|
||||
sl := newScrapeLoop(context.Background(),
|
||||
sl := newScrapeLoop(ctx,
|
||||
nil, nil, nil,
|
||||
nopMutator,
|
||||
nopMutator,
|
||||
func() storage.Appender { return capp },
|
||||
func(ctx context.Context) storage.Appender { return capp },
|
||||
nil, 0,
|
||||
true,
|
||||
)
|
||||
|
||||
now := time.Now()
|
||||
slApp := sl.appender()
|
||||
slApp := sl.appender(ctx)
|
||||
_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, slApp.Commit())
|
||||
|
@ -1589,21 +1601,22 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
|
|||
s := teststorage.New(t)
|
||||
defer s.Close()
|
||||
|
||||
app := s.Appender()
|
||||
ctx := context.Background()
|
||||
app := s.Appender(ctx)
|
||||
|
||||
capp := &collectResultAppender{next: app}
|
||||
|
||||
sl := newScrapeLoop(context.Background(),
|
||||
sl := newScrapeLoop(ctx,
|
||||
nil, nil, nil,
|
||||
nopMutator,
|
||||
nopMutator,
|
||||
func() storage.Appender { return capp },
|
||||
func(ctx context.Context) storage.Appender { return capp },
|
||||
nil, 0,
|
||||
false,
|
||||
)
|
||||
|
||||
now := time.Now()
|
||||
slApp := sl.appender()
|
||||
slApp := sl.appender(ctx)
|
||||
_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, slApp.Commit())
|
||||
|
@ -1636,7 +1649,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
|
|||
defer cancel()
|
||||
|
||||
// We add a good and a bad metric to check that both are discarded.
|
||||
slApp := sl.appender()
|
||||
slApp := sl.appender(ctx)
|
||||
_, _, _, err := sl.append(slApp, []byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{})
|
||||
testutil.NotOk(t, err)
|
||||
testutil.Ok(t, slApp.Rollback())
|
||||
|
@ -1648,7 +1661,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
|
|||
testutil.Ok(t, series.Err())
|
||||
|
||||
// We add a good metric to check that it is recorded.
|
||||
slApp = sl.appender()
|
||||
slApp = sl.appender(ctx)
|
||||
_, _, _, err = sl.append(slApp, []byte("test_metric{le=\"500\"} 1\n"), "", time.Time{})
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, slApp.Commit())
|
||||
|
@ -1665,7 +1678,8 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
|
|||
s := teststorage.New(t)
|
||||
defer s.Close()
|
||||
|
||||
app := s.Appender()
|
||||
ctx := context.Background()
|
||||
app := s.Appender(ctx)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
sl := newScrapeLoop(ctx,
|
||||
|
@ -1678,14 +1692,14 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
|
|||
return l
|
||||
},
|
||||
nopMutator,
|
||||
func() storage.Appender { return app },
|
||||
func(ctx context.Context) storage.Appender { return app },
|
||||
nil,
|
||||
0,
|
||||
true,
|
||||
)
|
||||
defer cancel()
|
||||
|
||||
slApp := sl.appender()
|
||||
slApp := sl.appender(ctx)
|
||||
_, _, _, err := sl.append(slApp, []byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "", time.Time{})
|
||||
testutil.NotOk(t, err)
|
||||
testutil.Ok(t, slApp.Rollback())
|
||||
|
@ -1901,7 +1915,7 @@ func TestScrapeAddFast(t *testing.T) {
|
|||
)
|
||||
defer cancel()
|
||||
|
||||
slApp := sl.appender()
|
||||
slApp := sl.appender(ctx)
|
||||
_, _, _, err := sl.append(slApp, []byte("up 1\n"), "", time.Time{})
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, slApp.Commit())
|
||||
|
@ -1912,7 +1926,7 @@ func TestScrapeAddFast(t *testing.T) {
|
|||
v.ref++
|
||||
}
|
||||
|
||||
slApp = sl.appender()
|
||||
slApp = sl.appender(ctx)
|
||||
_, _, _, err = sl.append(slApp, []byte("up 1\n"), "", time.Time{}.Add(time.Second))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, slApp.Commit())
|
||||
|
|
|
@ -122,11 +122,11 @@ func (f *fanout) ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQueri
|
|||
return NewMergeChunkQuerier(primary, secondaries, NewCompactingChunkSeriesMerger(ChainedSeriesMerge)), nil
|
||||
}
|
||||
|
||||
func (f *fanout) Appender() Appender {
|
||||
primary := f.primary.Appender()
|
||||
func (f *fanout) Appender(ctx context.Context) Appender {
|
||||
primary := f.primary.Appender(ctx)
|
||||
secondaries := make([]Appender, 0, len(f.secondaries))
|
||||
for _, storage := range f.secondaries {
|
||||
secondaries = append(secondaries, storage.Appender())
|
||||
secondaries = append(secondaries, storage.Appender(ctx))
|
||||
}
|
||||
return &fanoutAppender{
|
||||
logger: f.logger,
|
||||
|
|
|
@ -31,10 +31,11 @@ func TestSelectSorted(t *testing.T) {
|
|||
outputLabel := labels.FromStrings(model.MetricNameLabel, "a")
|
||||
|
||||
inputTotalSize := 0
|
||||
ctx := context.Background()
|
||||
|
||||
priStorage := teststorage.New(t)
|
||||
defer priStorage.Close()
|
||||
app1 := priStorage.Appender()
|
||||
app1 := priStorage.Appender(ctx)
|
||||
app1.Add(inputLabel, 0, 0)
|
||||
inputTotalSize++
|
||||
app1.Add(inputLabel, 1000, 1)
|
||||
|
@ -46,7 +47,7 @@ func TestSelectSorted(t *testing.T) {
|
|||
|
||||
remoteStorage1 := teststorage.New(t)
|
||||
defer remoteStorage1.Close()
|
||||
app2 := remoteStorage1.Appender()
|
||||
app2 := remoteStorage1.Appender(ctx)
|
||||
app2.Add(inputLabel, 3000, 3)
|
||||
inputTotalSize++
|
||||
app2.Add(inputLabel, 4000, 4)
|
||||
|
@ -59,7 +60,7 @@ func TestSelectSorted(t *testing.T) {
|
|||
remoteStorage2 := teststorage.New(t)
|
||||
defer remoteStorage2.Close()
|
||||
|
||||
app3 := remoteStorage2.Appender()
|
||||
app3 := remoteStorage2.Appender(ctx)
|
||||
app3.Add(inputLabel, 6000, 6)
|
||||
inputTotalSize++
|
||||
app3.Add(inputLabel, 7000, 7)
|
||||
|
@ -100,7 +101,7 @@ func TestSelectSorted(t *testing.T) {
|
|||
})
|
||||
t.Run("chunk querier", func(t *testing.T) {
|
||||
t.Skip("TODO(bwplotka: Unskip when db will implement ChunkQuerier.")
|
||||
querier, err := fanoutStorage.ChunkQuerier(context.Background(), 0, 8000)
|
||||
querier, err := fanoutStorage.ChunkQuerier(ctx, 0, 8000)
|
||||
testutil.Ok(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
|
@ -222,7 +223,7 @@ type errChunkQuerier struct{ errQuerier }
|
|||
func (errStorage) ChunkQuerier(_ context.Context, _, _ int64) (storage.ChunkQuerier, error) {
|
||||
return errChunkQuerier{}, nil
|
||||
}
|
||||
func (errStorage) Appender() storage.Appender { return nil }
|
||||
func (errStorage) Appender(_ context.Context) storage.Appender { return nil }
|
||||
func (errStorage) StartTime() (int64, error) { return 0, nil }
|
||||
func (errStorage) Close() error { return nil }
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ var (
|
|||
// Appendable allows creating appenders.
|
||||
type Appendable interface {
|
||||
// Appender returns a new appender for the storage.
|
||||
Appender() Appender
|
||||
Appender(ctx context.Context) Appender
|
||||
}
|
||||
|
||||
// SampleAndChunkQueryable allows retrieving samples as well as encoded samples in form of chunks.
|
||||
|
|
|
@ -169,7 +169,7 @@ func (s *Storage) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.C
|
|||
}
|
||||
|
||||
// Appender implements storage.Storage.
|
||||
func (s *Storage) Appender() storage.Appender {
|
||||
func (s *Storage) Appender(_ context.Context) storage.Appender {
|
||||
return s.rws.Appender()
|
||||
}
|
||||
|
||||
|
|
|
@ -911,7 +911,8 @@ func TestDisableAutoCompactions(t *testing.T) {
|
|||
// Trigger a compaction to check that it was skipped and
|
||||
// no new blocks were created when compaction is disabled.
|
||||
db.DisableCompactions()
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
for i := int64(0); i < 3; i++ {
|
||||
_, err := app.Add(label, i*blockRange, 0)
|
||||
testutil.Ok(t, err)
|
||||
|
@ -1027,7 +1028,8 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
|
|||
defaultLabel := labels.FromStrings("foo", "bar")
|
||||
|
||||
// Add some data to the head that is enough to trigger a compaction.
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
_, err := app.Add(defaultLabel, 1, 0)
|
||||
testutil.Ok(t, err)
|
||||
_, err = app.Add(defaultLabel, 2, 0)
|
||||
|
|
|
@ -710,7 +710,7 @@ func (db *DB) run() {
|
|||
}
|
||||
|
||||
// Appender opens a new appender against the database.
|
||||
func (db *DB) Appender() storage.Appender {
|
||||
func (db *DB) Appender(ctx context.Context) storage.Appender {
|
||||
return dbAppender{db: db, Appender: db.head.Appender()}
|
||||
}
|
||||
|
||||
|
|
108
tsdb/db_test.go
108
tsdb/db_test.go
|
@ -138,7 +138,8 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
|
|||
testutil.Ok(t, db.Close())
|
||||
}()
|
||||
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
|
||||
_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
|
||||
testutil.Ok(t, err)
|
||||
|
@ -169,9 +170,10 @@ func TestNoPanicAfterWALCorrutpion(t *testing.T) {
|
|||
// This is to ensure that all samples can be read from the mmaped chunks when the WAL is corrupted.
|
||||
var expSamples []tsdbutil.Sample
|
||||
var maxt int64
|
||||
ctx := context.Background()
|
||||
{
|
||||
for {
|
||||
app := db.Appender()
|
||||
app := db.Appender(ctx)
|
||||
_, err := app.Add(labels.FromStrings("foo", "bar"), maxt, 0)
|
||||
expSamples = append(expSamples, sample{t: maxt, v: 0})
|
||||
testutil.Ok(t, err)
|
||||
|
@ -226,7 +228,7 @@ func TestDataNotAvailableAfterRollback(t *testing.T) {
|
|||
testutil.Ok(t, db.Close())
|
||||
}()
|
||||
|
||||
app := db.Appender()
|
||||
app := db.Appender(context.Background())
|
||||
_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
|
@ -248,7 +250,8 @@ func TestDBAppenderAddRef(t *testing.T) {
|
|||
testutil.Ok(t, db.Close())
|
||||
}()
|
||||
|
||||
app1 := db.Appender()
|
||||
ctx := context.Background()
|
||||
app1 := db.Appender(ctx)
|
||||
|
||||
ref1, err := app1.Add(labels.FromStrings("a", "b"), 123, 0)
|
||||
testutil.Ok(t, err)
|
||||
|
@ -260,7 +263,7 @@ func TestDBAppenderAddRef(t *testing.T) {
|
|||
err = app1.Commit()
|
||||
testutil.Ok(t, err)
|
||||
|
||||
app2 := db.Appender()
|
||||
app2 := db.Appender(ctx)
|
||||
|
||||
// first ref should already work in next transaction.
|
||||
err = app2.AddFast(ref1, 125, 0)
|
||||
|
@ -302,7 +305,8 @@ func TestAppendEmptyLabelsIgnored(t *testing.T) {
|
|||
testutil.Ok(t, db.Close())
|
||||
}()
|
||||
|
||||
app1 := db.Appender()
|
||||
ctx := context.Background()
|
||||
app1 := db.Appender(ctx)
|
||||
|
||||
ref1, err := app1.Add(labels.FromStrings("a", "b"), 123, 0)
|
||||
testutil.Ok(t, err)
|
||||
|
@ -354,7 +358,8 @@ Outer:
|
|||
testutil.Ok(t, db.Close())
|
||||
}()
|
||||
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
|
||||
smpls := make([]float64, numSamples)
|
||||
for i := int64(0); i < numSamples; i++ {
|
||||
|
@ -413,12 +418,13 @@ func TestAmendDatapointCausesError(t *testing.T) {
|
|||
testutil.Ok(t, db.Close())
|
||||
}()
|
||||
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 0)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, app.Commit())
|
||||
|
||||
app = db.Appender()
|
||||
app = db.Appender(ctx)
|
||||
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1)
|
||||
testutil.Equals(t, storage.ErrDuplicateSampleForTimestamp, err)
|
||||
testutil.Ok(t, app.Rollback())
|
||||
|
@ -430,12 +436,13 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
|
|||
testutil.Ok(t, db.Close())
|
||||
}()
|
||||
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.NaN())
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, app.Commit())
|
||||
|
||||
app = db.Appender()
|
||||
app = db.Appender(ctx)
|
||||
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.NaN())
|
||||
testutil.Ok(t, err)
|
||||
}
|
||||
|
@ -446,12 +453,13 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
|
|||
testutil.Ok(t, db.Close())
|
||||
}()
|
||||
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000001))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, app.Commit())
|
||||
|
||||
app = db.Appender()
|
||||
app = db.Appender(ctx)
|
||||
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000002))
|
||||
testutil.Equals(t, storage.ErrDuplicateSampleForTimestamp, err)
|
||||
}
|
||||
|
@ -462,7 +470,8 @@ func TestEmptyLabelsetCausesError(t *testing.T) {
|
|||
testutil.Ok(t, db.Close())
|
||||
}()
|
||||
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
_, err := app.Add(labels.Labels{}, 0, 0)
|
||||
testutil.NotOk(t, err)
|
||||
testutil.Equals(t, "empty labelset: invalid sample", err.Error())
|
||||
|
@ -475,7 +484,8 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
|
|||
}()
|
||||
|
||||
// Append AmendedValue.
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1)
|
||||
testutil.Ok(t, err)
|
||||
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 2)
|
||||
|
@ -493,7 +503,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
|
|||
}, ssMap)
|
||||
|
||||
// Append Out of Order Value.
|
||||
app = db.Appender()
|
||||
app = db.Appender(ctx)
|
||||
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 10, 3)
|
||||
testutil.Ok(t, err)
|
||||
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 7, 5)
|
||||
|
@ -514,7 +524,8 @@ func TestDB_Snapshot(t *testing.T) {
|
|||
db := openTestDB(t, nil, nil)
|
||||
|
||||
// append data
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
mint := int64(1414141414000)
|
||||
for i := 0; i < 1000; i++ {
|
||||
_, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0)
|
||||
|
@ -563,7 +574,8 @@ func TestDB_Snapshot(t *testing.T) {
|
|||
func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
|
||||
db := openTestDB(t, nil, nil)
|
||||
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
mint := int64(1414141414000)
|
||||
for i := 0; i < 1000; i++ {
|
||||
_, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0)
|
||||
|
@ -615,7 +627,8 @@ func TestDB_SnapshotWithDelete(t *testing.T) {
|
|||
|
||||
db := openTestDB(t, nil, nil)
|
||||
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
|
||||
smpls := make([]float64, numSamples)
|
||||
for i := int64(0); i < numSamples; i++ {
|
||||
|
@ -760,7 +773,8 @@ func TestDB_e2e(t *testing.T) {
|
|||
testutil.Ok(t, db.Close())
|
||||
}()
|
||||
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
|
||||
for _, l := range lbls {
|
||||
lset := labels.New(l...)
|
||||
|
@ -864,7 +878,8 @@ func TestWALFlushedOnDBClose(t *testing.T) {
|
|||
|
||||
lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}}
|
||||
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
_, err := app.Add(lbls, 0, 1)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, app.Commit())
|
||||
|
@ -938,7 +953,7 @@ func TestWALSegmentSizeOptions(t *testing.T) {
|
|||
db := openTestDB(t, opts, nil)
|
||||
|
||||
for i := int64(0); i < 155; i++ {
|
||||
app := db.Appender()
|
||||
app := db.Appender(context.Background())
|
||||
ref, err := app.Add(labels.Labels{labels.Label{Name: "wal" + fmt.Sprintf("%d", i), Value: "size"}}, i, rand.Float64())
|
||||
testutil.Ok(t, err)
|
||||
for j := int64(1); j <= 78; j++ {
|
||||
|
@ -960,7 +975,8 @@ func TestTombstoneClean(t *testing.T) {
|
|||
|
||||
db := openTestDB(t, nil, nil)
|
||||
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
|
||||
smpls := make([]float64, numSamples)
|
||||
for i := int64(0); i < numSamples; i++ {
|
||||
|
@ -1287,7 +1303,8 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
|
|||
labels.FromStrings("labelname", "labelvalue"),
|
||||
}
|
||||
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
for _, lbls := range labelpairs {
|
||||
_, err := app.Add(lbls, 0, 1)
|
||||
testutil.Ok(t, err)
|
||||
|
@ -1467,7 +1484,8 @@ func TestChunkAtBlockBoundary(t *testing.T) {
|
|||
testutil.Ok(t, db.Close())
|
||||
}()
|
||||
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
|
||||
blockRange := db.compactor.(*LeveledCompactor).ranges[0]
|
||||
label := labels.FromStrings("foo", "bar")
|
||||
|
@ -1523,7 +1541,8 @@ func TestQuerierWithBoundaryChunks(t *testing.T) {
|
|||
testutil.Ok(t, db.Close())
|
||||
}()
|
||||
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
|
||||
blockRange := db.compactor.(*LeveledCompactor).ranges[0]
|
||||
label := labels.FromStrings("foo", "bar")
|
||||
|
@ -1572,7 +1591,8 @@ func TestInitializeHeadTimestamp(t *testing.T) {
|
|||
testutil.Equals(t, int64(math.MinInt64), db.head.MaxTime())
|
||||
|
||||
// First added sample initializes the writable range.
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
_, err = app.Add(labels.FromStrings("a", "b"), 1000, 1)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
|
@ -1669,6 +1689,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
|
|||
|
||||
func TestNoEmptyBlocks(t *testing.T) {
|
||||
db := openTestDB(t, nil, []int64{100})
|
||||
ctx := context.Background()
|
||||
defer func() {
|
||||
testutil.Ok(t, db.Close())
|
||||
}()
|
||||
|
@ -1688,7 +1709,7 @@ func TestNoEmptyBlocks(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Test no blocks after deleting all samples from head.", func(t *testing.T) {
|
||||
app := db.Appender()
|
||||
app := db.Appender(ctx)
|
||||
_, err := app.Add(defaultLabel, 1, 0)
|
||||
testutil.Ok(t, err)
|
||||
_, err = app.Add(defaultLabel, 2, 0)
|
||||
|
@ -1705,7 +1726,7 @@ func TestNoEmptyBlocks(t *testing.T) {
|
|||
testutil.Equals(t, len(db.Blocks()), len(actBlocks))
|
||||
testutil.Equals(t, 0, len(actBlocks))
|
||||
|
||||
app = db.Appender()
|
||||
app = db.Appender(ctx)
|
||||
_, err = app.Add(defaultLabel, 1, 0)
|
||||
testutil.Assert(t, err == storage.ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed")
|
||||
|
||||
|
@ -1730,7 +1751,7 @@ func TestNoEmptyBlocks(t *testing.T) {
|
|||
t.Run(`When no new block is created from head, and there are some blocks on disk
|
||||
compaction should not run into infinite loop (was seen during development).`, func(t *testing.T) {
|
||||
oldBlocks := db.Blocks()
|
||||
app := db.Appender()
|
||||
app := db.Appender(ctx)
|
||||
currentTime := db.Head().MaxTime()
|
||||
_, err := app.Add(defaultLabel, currentTime, 0)
|
||||
testutil.Ok(t, err)
|
||||
|
@ -1813,7 +1834,8 @@ func TestDB_LabelNames(t *testing.T) {
|
|||
// Appends samples into the database.
|
||||
appendSamples := func(db *DB, mint, maxt int64, sampleLabels [][2]string) {
|
||||
t.Helper()
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
for i := mint; i <= maxt; i++ {
|
||||
for _, tuple := range sampleLabels {
|
||||
label := labels.FromStrings(tuple[0], tuple[1])
|
||||
|
@ -1880,7 +1902,8 @@ func TestCorrectNumTombstones(t *testing.T) {
|
|||
defaultLabel := labels.FromStrings("foo", "bar")
|
||||
defaultMatcher := labels.MustNewMatcher(labels.MatchEqual, defaultLabel[0].Name, defaultLabel[0].Value)
|
||||
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
for i := int64(0); i < 3; i++ {
|
||||
for j := int64(0); j < 15; j++ {
|
||||
_, err := app.Add(defaultLabel, i*blockRange+j, 0)
|
||||
|
@ -2283,6 +2306,7 @@ func TestVerticalCompaction(t *testing.T) {
|
|||
// will not overlap with the first block created by the next compaction.
|
||||
func TestBlockRanges(t *testing.T) {
|
||||
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
|
||||
ctx := context.Background()
|
||||
|
||||
dir, err := ioutil.TempDir("", "test_storage")
|
||||
testutil.Ok(t, err)
|
||||
|
@ -2298,7 +2322,7 @@ func TestBlockRanges(t *testing.T) {
|
|||
defer func() {
|
||||
os.RemoveAll(dir)
|
||||
}()
|
||||
app := db.Appender()
|
||||
app := db.Appender(ctx)
|
||||
lbl := labels.Labels{{Name: "a", Value: "b"}}
|
||||
_, err = app.Add(lbl, firstBlockMaxT-1, rand.Float64())
|
||||
if err == nil {
|
||||
|
@ -2327,7 +2351,7 @@ func TestBlockRanges(t *testing.T) {
|
|||
|
||||
// Test that wal records are skipped when an existing block covers the same time ranges
|
||||
// and compaction doesn't create an overlapping block.
|
||||
app = db.Appender()
|
||||
app = db.Appender(ctx)
|
||||
db.DisableCompactions()
|
||||
_, err = app.Add(lbl, secondBlockMaxt+1, rand.Float64())
|
||||
testutil.Ok(t, err)
|
||||
|
@ -2350,7 +2374,7 @@ func TestBlockRanges(t *testing.T) {
|
|||
testutil.Equals(t, 3, len(db.Blocks()), "db doesn't include expected number of blocks")
|
||||
testutil.Equals(t, db.Blocks()[2].Meta().MaxTime, thirdBlockMaxt, "unexpected maxt of the last block")
|
||||
|
||||
app = db.Appender()
|
||||
app = db.Appender(ctx)
|
||||
_, err = app.Add(lbl, thirdBlockMaxt+rangeToTriggerCompaction, rand.Float64()) // Trigger a compaction
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, app.Commit())
|
||||
|
@ -2411,7 +2435,7 @@ func TestDBReadOnly(t *testing.T) {
|
|||
|
||||
dbSizeBeforeAppend, err := fileutil.DirSize(dbWritable.Dir())
|
||||
testutil.Ok(t, err)
|
||||
app := dbWritable.Appender()
|
||||
app := dbWritable.Appender(context.Background())
|
||||
_, err = app.Add(labels.FromStrings("foo", "bar"), dbWritable.Head().MaxTime()+1, 0)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, app.Commit())
|
||||
|
@ -2481,6 +2505,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
|
|||
logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
|
||||
err error
|
||||
maxt int
|
||||
ctx = context.Background()
|
||||
)
|
||||
|
||||
// Bootstrap the db.
|
||||
|
@ -2496,7 +2521,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
|
|||
db, err := Open(dbDir, logger, nil, nil)
|
||||
testutil.Ok(t, err)
|
||||
db.DisableCompactions()
|
||||
app := db.Appender()
|
||||
app := db.Appender(ctx)
|
||||
maxt = 1000
|
||||
for i := 0; i < maxt; i++ {
|
||||
_, err := app.Add(labels.FromStrings(defaultLabelName, "flush"), int64(i), 1.0)
|
||||
|
@ -2560,12 +2585,13 @@ func TestDBCannotSeePartialCommits(t *testing.T) {
|
|||
|
||||
stop := make(chan struct{})
|
||||
firstInsert := make(chan struct{})
|
||||
ctx := context.Background()
|
||||
|
||||
// Insert data in batches.
|
||||
go func() {
|
||||
iter := 0
|
||||
for {
|
||||
app := db.Appender()
|
||||
app := db.Appender(ctx)
|
||||
|
||||
for j := 0; j < 100; j++ {
|
||||
_, err := app.Add(labels.FromStrings("foo", "bar", "a", strconv.Itoa(j)), int64(iter), float64(iter))
|
||||
|
@ -2631,7 +2657,8 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
|
|||
testutil.Ok(t, err)
|
||||
defer querierBeforeAdd.Close()
|
||||
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
_, err = app.Add(labels.FromStrings("foo", "bar"), 0, 0)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
|
@ -2929,7 +2956,8 @@ func TestCompactHead(t *testing.T) {
|
|||
|
||||
db, err := Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg)
|
||||
testutil.Ok(t, err)
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
var expSamples []sample
|
||||
maxt := 100
|
||||
for i := 0; i < maxt; i++ {
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
|
@ -1692,7 +1693,8 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
|
|||
}()
|
||||
db.DisableCompactions()
|
||||
|
||||
app := db.Appender()
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
for i := 1; i <= 5; i++ {
|
||||
_, err = app.Add(labels.FromStrings("a", "b"), int64(i), 99)
|
||||
testutil.Ok(t, err)
|
||||
|
@ -1701,7 +1703,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
|
|||
|
||||
// Test out of order metric.
|
||||
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
|
||||
app = db.Appender()
|
||||
app = db.Appender(ctx)
|
||||
_, err = app.Add(labels.FromStrings("a", "b"), 2, 99)
|
||||
testutil.Equals(t, storage.ErrOutOfOrderSample, err)
|
||||
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
|
||||
|
@ -1716,7 +1718,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
|
|||
testutil.Ok(t, app.Commit())
|
||||
|
||||
// Compact Head to test out of bound metric.
|
||||
app = db.Appender()
|
||||
app = db.Appender(ctx)
|
||||
_, err = app.Add(labels.FromStrings("a", "b"), DefaultBlockDuration*2, 99)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, app.Commit())
|
||||
|
@ -1725,7 +1727,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
|
|||
testutil.Ok(t, db.Compact())
|
||||
testutil.Assert(t, db.head.minValidTime > 0, "")
|
||||
|
||||
app = db.Appender()
|
||||
app = db.Appender(ctx)
|
||||
_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime-2, 99)
|
||||
testutil.Equals(t, storage.ErrOutOfBounds, err)
|
||||
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples))
|
||||
|
@ -1736,7 +1738,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
|
|||
testutil.Ok(t, app.Commit())
|
||||
|
||||
// Some more valid samples for out of order.
|
||||
app = db.Appender()
|
||||
app = db.Appender(ctx)
|
||||
for i := 1; i <= 5; i++ {
|
||||
_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime+DefaultBlockDuration+int64(i), 99)
|
||||
testutil.Ok(t, err)
|
||||
|
@ -1744,7 +1746,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
|
|||
testutil.Ok(t, app.Commit())
|
||||
|
||||
// Test out of order metric.
|
||||
app = db.Appender()
|
||||
app = db.Appender(ctx)
|
||||
_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime+DefaultBlockDuration+2, 99)
|
||||
testutil.Equals(t, storage.ErrOutOfOrderSample, err)
|
||||
testutil.Equals(t, 4.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
|
||||
|
|
Loading…
Reference in a new issue