Address comments

Signed-off-by: Annanay <annanayagarwal@gmail.com>
This commit is contained in:
Annanay 2020-07-30 16:41:13 +05:30
parent f40e4579b7
commit 89129cd39a
9 changed files with 78 additions and 90 deletions

View file

@ -300,7 +300,6 @@ func TestScrapePoolReload(t *testing.T) {
func TestScrapePoolAppender(t *testing.T) { func TestScrapePoolAppender(t *testing.T) {
cfg := &config.ScrapeConfig{} cfg := &config.ScrapeConfig{}
app := &nopAppendable{} app := &nopAppendable{}
ctx := context.Background()
sp, _ := newScrapePool(cfg, app, 0, nil) sp, _ := newScrapePool(cfg, app, 0, nil)
loop := sp.newLoop(scrapeLoopOptions{ loop := sp.newLoop(scrapeLoopOptions{
@ -309,7 +308,7 @@ func TestScrapePoolAppender(t *testing.T) {
appl, ok := loop.(*scrapeLoop) appl, ok := loop.(*scrapeLoop)
testutil.Assert(t, ok, "Expected scrapeLoop but got %T", loop) testutil.Assert(t, ok, "Expected scrapeLoop but got %T", loop)
wrapped := appl.appender(ctx) wrapped := appl.appender(context.Background())
tl, ok := wrapped.(*timeLimitAppender) tl, ok := wrapped.(*timeLimitAppender)
testutil.Assert(t, ok, "Expected timeLimitAppender but got %T", wrapped) testutil.Assert(t, ok, "Expected timeLimitAppender but got %T", wrapped)
@ -324,7 +323,7 @@ func TestScrapePoolAppender(t *testing.T) {
appl, ok = loop.(*scrapeLoop) appl, ok = loop.(*scrapeLoop)
testutil.Assert(t, ok, "Expected scrapeLoop but got %T", loop) testutil.Assert(t, ok, "Expected scrapeLoop but got %T", loop)
wrapped = appl.appender(ctx) wrapped = appl.appender(context.Background())
sl, ok := wrapped.(*limitAppender) sl, ok := wrapped.(*limitAppender)
testutil.Assert(t, ok, "Expected limitAppender but got %T", wrapped) testutil.Assert(t, ok, "Expected limitAppender but got %T", wrapped)
@ -375,9 +374,8 @@ func TestScrapePoolRaces(t *testing.T) {
func TestScrapeLoopStopBeforeRun(t *testing.T) { func TestScrapeLoopStopBeforeRun(t *testing.T) {
scraper := &testScraper{} scraper := &testScraper{}
ctx := context.Background()
sl := newScrapeLoop(ctx, sl := newScrapeLoop(context.Background(),
scraper, scraper,
nil, nil, nil, nil,
nopMutator, nopMutator,
@ -438,9 +436,8 @@ func TestScrapeLoopStop(t *testing.T) {
scraper = &testScraper{} scraper = &testScraper{}
app = func(ctx context.Context) storage.Appender { return appender } app = func(ctx context.Context) storage.Appender { return appender }
) )
ctx := context.Background()
sl := newScrapeLoop(ctx, sl := newScrapeLoop(context.Background(),
scraper, scraper,
nil, nil, nil, nil,
nopMutator, nopMutator,
@ -978,9 +975,8 @@ func TestScrapeLoopAppend(t *testing.T) {
discoveryLabels := &Target{ discoveryLabels := &Target{
labels: labels.FromStrings(test.discoveryLabels...), labels: labels.FromStrings(test.discoveryLabels...),
} }
ctx := context.Background()
sl := newScrapeLoop(ctx, sl := newScrapeLoop(context.Background(),
nil, nil, nil, nil, nil, nil,
func(l labels.Labels) labels.Labels { func(l labels.Labels) labels.Labels {
return mutateSampleLabels(l, discoveryLabels, test.honorLabels, nil) return mutateSampleLabels(l, discoveryLabels, test.honorLabels, nil)
@ -996,7 +992,7 @@ func TestScrapeLoopAppend(t *testing.T) {
now := time.Now() now := time.Now()
slApp := sl.appender(ctx) slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", now) _, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", now)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit()) testutil.Ok(t, slApp.Commit())
@ -1024,9 +1020,8 @@ func TestScrapeLoopAppend(t *testing.T) {
func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) { func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
// collectResultAppender's AddFast always returns ErrNotFound if we don't give it a next. // collectResultAppender's AddFast always returns ErrNotFound if we don't give it a next.
app := &collectResultAppender{} app := &collectResultAppender{}
ctx := context.Background()
sl := newScrapeLoop(ctx, sl := newScrapeLoop(context.Background(),
nil, nil, nil, nil, nil, nil,
nopMutator, nopMutator,
nopMutator, nopMutator,
@ -1050,7 +1045,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
sl.cache.addRef(mets, fakeRef, lset, hash) sl.cache.addRef(mets, fakeRef, lset, hash)
now := time.Now() now := time.Now()
slApp := sl.appender(ctx) slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(metric), "", now) _, _, _, err := sl.append(slApp, []byte(metric), "", now)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit()) testutil.Ok(t, slApp.Commit())
@ -1069,9 +1064,8 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
func TestScrapeLoopAppendSampleLimit(t *testing.T) { func TestScrapeLoopAppendSampleLimit(t *testing.T) {
resApp := &collectResultAppender{} resApp := &collectResultAppender{}
app := &limitAppender{Appender: resApp, limit: 1} app := &limitAppender{Appender: resApp, limit: 1}
ctx := context.Background()
sl := newScrapeLoop(ctx, sl := newScrapeLoop(context.Background(),
nil, nil, nil, nil, nil, nil,
func(l labels.Labels) labels.Labels { func(l labels.Labels) labels.Labels {
if l.Has("deleteme") { if l.Has("deleteme") {
@ -1094,7 +1088,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
beforeMetricValue := beforeMetric.GetCounter().GetValue() beforeMetricValue := beforeMetric.GetCounter().GetValue()
now := time.Now() now := time.Now()
slApp := sl.appender(ctx) slApp := sl.appender(context.Background())
total, added, seriesAdded, err := sl.append(app, []byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now) total, added, seriesAdded, err := sl.append(app, []byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now)
if err != errSampleLimit { if err != errSampleLimit {
t.Fatalf("Did not see expected sample limit error: %s", err) t.Fatalf("Did not see expected sample limit error: %s", err)
@ -1125,7 +1119,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
testutil.Equals(t, want, resApp.rolledbackResult, "Appended samples not as expected") testutil.Equals(t, want, resApp.rolledbackResult, "Appended samples not as expected")
now = time.Now() now = time.Now()
slApp = sl.appender(ctx) slApp = sl.appender(context.Background())
total, added, seriesAdded, err = sl.append(slApp, []byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "", now) total, added, seriesAdded, err = sl.append(slApp, []byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "", now)
if err != errSampleLimit { if err != errSampleLimit {
t.Fatalf("Did not see expected sample limit error: %s", err) t.Fatalf("Did not see expected sample limit error: %s", err)
@ -1144,9 +1138,8 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
defer s.Close() defer s.Close()
capp := &collectResultAppender{} capp := &collectResultAppender{}
ctx := context.Background()
sl := newScrapeLoop(ctx, sl := newScrapeLoop(context.Background(),
nil, nil, nil, nil, nil, nil,
nopMutator, nopMutator,
nopMutator, nopMutator,
@ -1157,12 +1150,12 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
) )
now := time.Now() now := time.Now()
slApp := sl.appender(ctx) slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "", now) _, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "", now)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit()) testutil.Ok(t, slApp.Commit())
slApp = sl.appender(ctx) slApp = sl.appender(context.Background())
_, _, _, err = sl.append(slApp, []byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute)) _, _, _, err = sl.append(slApp, []byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit()) testutil.Ok(t, slApp.Commit())
@ -1185,9 +1178,8 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
func TestScrapeLoopAppendStaleness(t *testing.T) { func TestScrapeLoopAppendStaleness(t *testing.T) {
app := &collectResultAppender{} app := &collectResultAppender{}
ctx := context.Background()
sl := newScrapeLoop(ctx, sl := newScrapeLoop(context.Background(),
nil, nil, nil, nil, nil, nil,
nopMutator, nopMutator,
nopMutator, nopMutator,
@ -1198,12 +1190,12 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
) )
now := time.Now() now := time.Now()
slApp := sl.appender(ctx) slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte("metric_a 1\n"), "", now) _, _, _, err := sl.append(slApp, []byte("metric_a 1\n"), "", now)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit()) testutil.Ok(t, slApp.Commit())
slApp = sl.appender(ctx) slApp = sl.appender(context.Background())
_, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second)) _, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit()) testutil.Ok(t, slApp.Commit())
@ -1230,8 +1222,7 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
app := &collectResultAppender{} app := &collectResultAppender{}
ctx := context.Background() sl := newScrapeLoop(context.Background(),
sl := newScrapeLoop(ctx,
nil, nil, nil, nil, nil, nil,
nopMutator, nopMutator,
nopMutator, nopMutator,
@ -1242,12 +1233,12 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
) )
now := time.Now() now := time.Now()
slApp := sl.appender(ctx) slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte("metric_a 1 1000\n"), "", now) _, _, _, err := sl.append(slApp, []byte("metric_a 1 1000\n"), "", now)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit()) testutil.Ok(t, slApp.Commit())
slApp = sl.appender(ctx) slApp = sl.appender(context.Background())
_, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second)) _, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit()) testutil.Ok(t, slApp.Commit())
@ -1342,9 +1333,8 @@ func (app *errorAppender) AddFast(ref uint64, t int64, v float64) error {
func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) { func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) {
app := &errorAppender{} app := &errorAppender{}
ctx := context.Background()
sl := newScrapeLoop(ctx, sl := newScrapeLoop(context.Background(),
nil, nil,
nil, nil, nil, nil,
nopMutator, nopMutator,
@ -1356,7 +1346,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
) )
now := time.Unix(1, 0) now := time.Unix(1, 0)
slApp := sl.appender(ctx) slApp := sl.appender(context.Background())
total, added, seriesAdded, err := sl.append(slApp, []byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now) total, added, seriesAdded, err := sl.append(slApp, []byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit()) testutil.Ok(t, slApp.Commit())
@ -1376,8 +1366,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
app := &collectResultAppender{} app := &collectResultAppender{}
ctx := context.Background() sl := newScrapeLoop(context.Background(),
sl := newScrapeLoop(ctx,
nil, nil,
nil, nil, nil, nil,
nopMutator, nopMutator,
@ -1394,7 +1383,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
) )
now := time.Now().Add(20 * time.Minute) now := time.Now().Add(20 * time.Minute)
slApp := sl.appender(ctx) slApp := sl.appender(context.Background())
total, added, seriesAdded, err := sl.append(slApp, []byte("normal 1\n"), "", now) total, added, seriesAdded, err := sl.append(slApp, []byte("normal 1\n"), "", now)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit()) testutil.Ok(t, slApp.Commit())
@ -1567,12 +1556,11 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
s := teststorage.New(t) s := teststorage.New(t)
defer s.Close() defer s.Close()
ctx := context.Background() app := s.Appender(context.Background())
app := s.Appender(ctx)
capp := &collectResultAppender{next: app} capp := &collectResultAppender{next: app}
sl := newScrapeLoop(ctx, sl := newScrapeLoop(context.Background(),
nil, nil, nil, nil, nil, nil,
nopMutator, nopMutator,
nopMutator, nopMutator,
@ -1582,7 +1570,7 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
) )
now := time.Now() now := time.Now()
slApp := sl.appender(ctx) slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now) _, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit()) testutil.Ok(t, slApp.Commit())
@ -1601,12 +1589,11 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
s := teststorage.New(t) s := teststorage.New(t)
defer s.Close() defer s.Close()
ctx := context.Background() app := s.Appender(context.Background())
app := s.Appender(ctx)
capp := &collectResultAppender{next: app} capp := &collectResultAppender{next: app}
sl := newScrapeLoop(ctx, sl := newScrapeLoop(context.Background(),
nil, nil, nil, nil, nil, nil,
nopMutator, nopMutator,
nopMutator, nopMutator,
@ -1616,7 +1603,7 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
) )
now := time.Now() now := time.Now()
slApp := sl.appender(ctx) slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now) _, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit()) testutil.Ok(t, slApp.Commit())
@ -1678,11 +1665,10 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
s := teststorage.New(t) s := teststorage.New(t)
defer s.Close() defer s.Close()
ctx := context.Background() app := s.Appender(context.Background())
app := s.Appender(ctx)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx, sl := newScrapeLoop(context.Background(),
&testScraper{}, &testScraper{},
nil, nil, nil, nil,
func(l labels.Labels) labels.Labels { func(l labels.Labels) labels.Labels {
@ -1699,7 +1685,7 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
) )
defer cancel() defer cancel()
slApp := sl.appender(ctx) slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "", time.Time{}) _, _, _, err := sl.append(slApp, []byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "", time.Time{})
testutil.NotOk(t, err) testutil.NotOk(t, err)
testutil.Ok(t, slApp.Rollback()) testutil.Ok(t, slApp.Rollback())

View file

@ -169,8 +169,8 @@ func (s *Storage) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.C
} }
// Appender implements storage.Storage. // Appender implements storage.Storage.
func (s *Storage) Appender(_ context.Context) storage.Appender { func (s *Storage) Appender(ctx context.Context) storage.Appender {
return s.rws.Appender() return s.rws.Appender(ctx)
} }
// Close the background processing of the storage queues. // Close the background processing of the storage queues.

View file

@ -14,6 +14,7 @@
package remote package remote
import ( import (
"context"
"fmt" "fmt"
"sync" "sync"
"time" "time"
@ -170,7 +171,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
} }
// Appender implements storage.Storage. // Appender implements storage.Storage.
func (rws *WriteStorage) Appender() storage.Appender { func (rws *WriteStorage) Appender(_ context.Context) storage.Appender {
return &timestampTracker{ return &timestampTracker{
writeStorage: rws, writeStorage: rws,
} }

View file

@ -328,7 +328,7 @@ func createHead(tb testing.TB, series []storage.Series, chunkDir string) *Head {
head, err := NewHead(nil, nil, nil, 2*60*60*1000, chunkDir, nil, DefaultStripeSize, nil) head, err := NewHead(nil, nil, nil, 2*60*60*1000, chunkDir, nil, DefaultStripeSize, nil)
testutil.Ok(tb, err) testutil.Ok(tb, err)
app := head.Appender() app := head.Appender(context.Background())
for _, s := range series { for _, s := range series {
ref := uint64(0) ref := uint64(0)
it := s.Iterator() it := s.Iterator()

View file

@ -711,7 +711,7 @@ func (db *DB) run() {
// Appender opens a new appender against the database. // Appender opens a new appender against the database.
func (db *DB) Appender(ctx context.Context) storage.Appender { func (db *DB) Appender(ctx context.Context) storage.Appender {
return dbAppender{db: db, Appender: db.head.Appender()} return dbAppender{db: db, Appender: db.head.Appender(ctx)}
} }
// dbAppender wraps the DB's head appender and triggers compactions on commit // dbAppender wraps the DB's head appender and triggers compactions on commit

View file

@ -1203,7 +1203,7 @@ func TestSizeRetention(t *testing.T) {
} }
// Add some data to the WAL. // Add some data to the WAL.
headApp := db.Head().Appender() headApp := db.Head().Appender(context.Background())
for _, m := range headBlocks { for _, m := range headBlocks {
series := genSeries(100, 10, m.MinTime, m.MaxTime) series := genSeries(100, 10, m.MinTime, m.MaxTime)
for _, s := range series { for _, s := range series {

View file

@ -14,6 +14,7 @@
package tsdb package tsdb
import ( import (
"context"
"fmt" "fmt"
"math" "math"
"path/filepath" "path/filepath"
@ -1009,7 +1010,7 @@ func (a *initAppender) Rollback() error {
} }
// Appender returns a new Appender on the database. // Appender returns a new Appender on the database.
func (h *Head) Appender() storage.Appender { func (h *Head) Appender(_ context.Context) storage.Appender {
h.metrics.activeAppenders.Inc() h.metrics.activeAppenders.Inc()
// The head cache might not have a starting point yet. The init appender // The head cache might not have a starting point yet. The init appender

View file

@ -267,14 +267,14 @@ func TestHead_WALMultiRef(t *testing.T) {
testutil.Ok(t, head.Init(0)) testutil.Ok(t, head.Init(0))
app := head.Appender() app := head.Appender(context.Background())
ref1, err := app.Add(labels.FromStrings("foo", "bar"), 100, 1) ref1, err := app.Add(labels.FromStrings("foo", "bar"), 100, 1)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(head.metrics.chunksCreated)) testutil.Equals(t, 1.0, prom_testutil.ToFloat64(head.metrics.chunksCreated))
// Add another sample outside chunk range to mmap a chunk. // Add another sample outside chunk range to mmap a chunk.
app = head.Appender() app = head.Appender(context.Background())
_, err = app.Add(labels.FromStrings("foo", "bar"), 1500, 2) _, err = app.Add(labels.FromStrings("foo", "bar"), 1500, 2)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
@ -282,14 +282,14 @@ func TestHead_WALMultiRef(t *testing.T) {
testutil.Ok(t, head.Truncate(1600)) testutil.Ok(t, head.Truncate(1600))
app = head.Appender() app = head.Appender(context.Background())
ref2, err := app.Add(labels.FromStrings("foo", "bar"), 1700, 3) ref2, err := app.Add(labels.FromStrings("foo", "bar"), 1700, 3)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
testutil.Equals(t, 3.0, prom_testutil.ToFloat64(head.metrics.chunksCreated)) testutil.Equals(t, 3.0, prom_testutil.ToFloat64(head.metrics.chunksCreated))
// Add another sample outside chunk range to mmap a chunk. // Add another sample outside chunk range to mmap a chunk.
app = head.Appender() app = head.Appender(context.Background())
_, err = app.Add(labels.FromStrings("foo", "bar"), 2000, 4) _, err = app.Add(labels.FromStrings("foo", "bar"), 2000, 4)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
@ -540,7 +540,7 @@ func TestHeadDeleteSimple(t *testing.T) {
for _, c := range cases { for _, c := range cases {
head, w := newTestHead(t, 1000, compress) head, w := newTestHead(t, 1000, compress)
app := head.Appender() app := head.Appender(context.Background())
for _, smpl := range smplsAll { for _, smpl := range smplsAll {
_, err := app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) _, err := app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v)
testutil.Ok(t, err) testutil.Ok(t, err)
@ -554,7 +554,7 @@ func TestHeadDeleteSimple(t *testing.T) {
} }
// Add more samples. // Add more samples.
app = head.Appender() app = head.Appender(context.Background())
for _, smpl := range c.addSamples { for _, smpl := range c.addSamples {
_, err := app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) _, err := app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v)
testutil.Ok(t, err) testutil.Ok(t, err)
@ -621,7 +621,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
}() }()
numSamples := int64(10) numSamples := int64(10)
app := hb.Appender() app := hb.Appender(context.Background())
smpls := make([]float64, numSamples) smpls := make([]float64, numSamples)
for i := int64(0); i < numSamples; i++ { for i := int64(0); i < numSamples; i++ {
smpls[i] = rand.Float64() smpls[i] = rand.Float64()
@ -645,7 +645,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
testutil.Equals(t, 0, len(res.Warnings())) testutil.Equals(t, 0, len(res.Warnings()))
// Add again and test for presence. // Add again and test for presence.
app = hb.Appender() app = hb.Appender(context.Background())
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 11, 1) _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 11, 1)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
@ -671,7 +671,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
hb, w := newTestHead(t, int64(numSamples)*10, false) hb, w := newTestHead(t, int64(numSamples)*10, false)
for i := 0; i < numSamples; i++ { for i := 0; i < numSamples; i++ {
app := hb.Appender() app := hb.Appender(context.Background())
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, int64(i), 0) _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, int64(i), 0)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
@ -763,7 +763,7 @@ func TestDelete_e2e(t *testing.T) {
testutil.Ok(t, hb.Close()) testutil.Ok(t, hb.Close())
}() }()
app := hb.Appender() app := hb.Appender(context.Background())
for _, l := range lbls { for _, l := range lbls {
ls := labels.New(l...) ls := labels.New(l...)
series := []tsdbutil.Sample{} series := []tsdbutil.Sample{}
@ -1114,7 +1114,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
h.initTime(0) h.initTime(0)
app := h.appender() app := h.Appender(context.Background())
lset := labels.FromStrings("a", "1") lset := labels.FromStrings("a", "1")
_, err := app.Add(lset, 2100, 1) _, err := app.Add(lset, 2100, 1)
testutil.Ok(t, err) testutil.Ok(t, err)
@ -1144,7 +1144,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
h.initTime(0) h.initTime(0)
app := h.appender() app := h.Appender(context.Background())
lset := labels.FromStrings("a", "1") lset := labels.FromStrings("a", "1")
_, err := app.Add(lset, 2100, 1) _, err := app.Add(lset, 2100, 1)
testutil.Ok(t, err) testutil.Ok(t, err)
@ -1175,7 +1175,7 @@ func TestHead_LogRollback(t *testing.T) {
testutil.Ok(t, h.Close()) testutil.Ok(t, h.Close())
}() }()
app := h.Appender() app := h.Appender(context.Background())
_, err := app.Add(labels.FromStrings("a", "b"), 1, 2) _, err := app.Add(labels.FromStrings("a", "b"), 1, 2)
testutil.Ok(t, err) testutil.Ok(t, err)
@ -1373,7 +1373,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) {
testutil.Ok(t, h.Close()) testutil.Ok(t, h.Close())
}() }()
add := func(ts int64) { add := func(ts int64) {
app := h.Appender() app := h.Appender(context.Background())
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, ts, 0) _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, ts, 0)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
@ -1404,7 +1404,7 @@ func TestAddDuplicateLabelName(t *testing.T) {
}() }()
add := func(labels labels.Labels, labelName string) { add := func(labels labels.Labels, labelName string) {
app := h.Appender() app := h.Appender(context.Background())
_, err := app.Add(labels, 0, 0) _, err := app.Add(labels, 0, 0)
testutil.NotOk(t, err) testutil.NotOk(t, err)
testutil.Equals(t, fmt.Sprintf(`label name "%s" is not unique: invalid sample`, labelName), err.Error()) testutil.Equals(t, fmt.Sprintf(`label name "%s" is not unique: invalid sample`, labelName), err.Error())
@ -1458,7 +1458,7 @@ func TestMemSeriesIsolation(t *testing.T) {
if h.MinTime() == math.MaxInt64 { if h.MinTime() == math.MaxInt64 {
app = &initAppender{head: h} app = &initAppender{head: h}
} else { } else {
a := h.appender() a := h.Appender(context.Background())
a.cleanupAppendIDsBelow = 0 a.cleanupAppendIDsBelow = 0
app = a app = a
} }
@ -1489,7 +1489,7 @@ func TestMemSeriesIsolation(t *testing.T) {
testutil.Equals(t, 999, lastValue(hb, 999)) testutil.Equals(t, 999, lastValue(hb, 999))
// Cleanup appendIDs below 500. // Cleanup appendIDs below 500.
app := hb.appender() app := hb.Appender(context.Background())
app.cleanupAppendIDsBelow = 500 app.cleanupAppendIDsBelow = 500
_, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) _, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
testutil.Ok(t, err) testutil.Ok(t, err)
@ -1508,7 +1508,7 @@ func TestMemSeriesIsolation(t *testing.T) {
// Cleanup appendIDs below 1000, which means the sample buffer is // Cleanup appendIDs below 1000, which means the sample buffer is
// the only thing with appendIDs. // the only thing with appendIDs.
app = hb.appender() app = hb.Appender(context.Background())
app.cleanupAppendIDsBelow = 1000 app.cleanupAppendIDsBelow = 1000
_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
testutil.Ok(t, err) testutil.Ok(t, err)
@ -1522,7 +1522,7 @@ func TestMemSeriesIsolation(t *testing.T) {
i++ i++
// Cleanup appendIDs below 1001, but with a rollback. // Cleanup appendIDs below 1001, but with a rollback.
app = hb.appender() app = hb.Appender(context.Background())
app.cleanupAppendIDsBelow = 1001 app.cleanupAppendIDsBelow = 1001
_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
testutil.Ok(t, err) testutil.Ok(t, err)
@ -1556,7 +1556,7 @@ func TestMemSeriesIsolation(t *testing.T) {
// Cleanup appendIDs below 1000, which means the sample buffer is // Cleanup appendIDs below 1000, which means the sample buffer is
// the only thing with appendIDs. // the only thing with appendIDs.
app = hb.appender() app = hb.Appender(context.Background())
_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
i++ i++
testutil.Ok(t, err) testutil.Ok(t, err)
@ -1569,7 +1569,7 @@ func TestMemSeriesIsolation(t *testing.T) {
testutil.Equals(t, 1001, lastValue(hb, 1003)) testutil.Equals(t, 1001, lastValue(hb, 1003))
// Cleanup appendIDs below 1002, but with a rollback. // Cleanup appendIDs below 1002, but with a rollback.
app = hb.appender() app = hb.Appender(context.Background())
_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app.Rollback()) testutil.Ok(t, app.Rollback())
@ -1587,13 +1587,13 @@ func TestIsolationRollback(t *testing.T) {
testutil.Ok(t, hb.Close()) testutil.Ok(t, hb.Close())
}() }()
app := hb.Appender() app := hb.Appender(context.Background())
_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0) _, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
testutil.Equals(t, uint64(1), hb.iso.lowWatermark()) testutil.Equals(t, uint64(1), hb.iso.lowWatermark())
app = hb.Appender() app = hb.Appender(context.Background())
_, err = app.Add(labels.FromStrings("foo", "bar"), 1, 1) _, err = app.Add(labels.FromStrings("foo", "bar"), 1, 1)
testutil.Ok(t, err) testutil.Ok(t, err)
_, err = app.Add(labels.FromStrings("foo", "bar", "foo", "baz"), 2, 2) _, err = app.Add(labels.FromStrings("foo", "bar", "foo", "baz"), 2, 2)
@ -1601,7 +1601,7 @@ func TestIsolationRollback(t *testing.T) {
testutil.Ok(t, app.Rollback()) testutil.Ok(t, app.Rollback())
testutil.Equals(t, uint64(2), hb.iso.lowWatermark()) testutil.Equals(t, uint64(2), hb.iso.lowWatermark())
app = hb.Appender() app = hb.Appender(context.Background())
_, err = app.Add(labels.FromStrings("foo", "bar"), 3, 3) _, err = app.Add(labels.FromStrings("foo", "bar"), 3, 3)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
@ -1614,18 +1614,18 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) {
testutil.Ok(t, hb.Close()) testutil.Ok(t, hb.Close())
}() }()
app1 := hb.Appender() app1 := hb.Appender(context.Background())
_, err := app1.Add(labels.FromStrings("foo", "bar"), 0, 0) _, err := app1.Add(labels.FromStrings("foo", "bar"), 0, 0)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app1.Commit()) testutil.Ok(t, app1.Commit())
testutil.Equals(t, uint64(1), hb.iso.lowWatermark(), "Low watermark should by 1 after 1st append.") testutil.Equals(t, uint64(1), hb.iso.lowWatermark(), "Low watermark should by 1 after 1st append.")
app1 = hb.Appender() app1 = hb.Appender(context.Background())
_, err = app1.Add(labels.FromStrings("foo", "bar"), 1, 1) _, err = app1.Add(labels.FromStrings("foo", "bar"), 1, 1)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should be two, even if append is not committed yet.") testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should be two, even if append is not committed yet.")
app2 := hb.Appender() app2 := hb.Appender(context.Background())
_, err = app2.Add(labels.FromStrings("foo", "baz"), 1, 1) _, err = app2.Add(labels.FromStrings("foo", "baz"), 1, 1)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app2.Commit()) testutil.Ok(t, app2.Commit())
@ -1668,10 +1668,10 @@ func TestIsolationWithoutAdd(t *testing.T) {
testutil.Ok(t, hb.Close()) testutil.Ok(t, hb.Close())
}() }()
app := hb.Appender() app := hb.Appender(context.Background())
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
app = hb.Appender() app = hb.Appender(context.Background())
_, err := app.Add(labels.FromStrings("foo", "baz"), 1, 1) _, err := app.Add(labels.FromStrings("foo", "baz"), 1, 1)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
@ -1767,7 +1767,7 @@ func testHeadSeriesChunkRace(t *testing.T) {
testutil.Ok(t, h.Close()) testutil.Ok(t, h.Close())
}() }()
testutil.Ok(t, h.Init(0)) testutil.Ok(t, h.Init(0))
app := h.Appender() app := h.Appender(context.Background())
s2, err := app.Add(labels.FromStrings("foo2", "bar"), 5, 0) s2, err := app.Add(labels.FromStrings("foo2", "bar"), 5, 0)
testutil.Ok(t, err) testutil.Ok(t, err)
@ -1816,7 +1816,7 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) {
expectedLabelValues = []string{"d", "e", "f"} expectedLabelValues = []string{"d", "e", "f"}
) )
app := head.Appender() app := head.Appender(context.Background())
for i, name := range expectedLabelNames { for i, name := range expectedLabelNames {
_, err := app.Add(labels.Labels{{Name: name, Value: expectedLabelValues[i]}}, seriesTimestamps[i], 0) _, err := app.Add(labels.Labels{{Name: name, Value: expectedLabelValues[i]}}, seriesTimestamps[i], 0)
testutil.Ok(t, err) testutil.Ok(t, err)
@ -1861,28 +1861,28 @@ func TestErrReuseAppender(t *testing.T) {
testutil.Ok(t, head.Close()) testutil.Ok(t, head.Close())
}() }()
app := head.Appender() app := head.Appender(context.Background())
_, err := app.Add(labels.Labels{{Name: "test", Value: "test"}}, 0, 0) _, err := app.Add(labels.Labels{{Name: "test", Value: "test"}}, 0, 0)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
testutil.NotOk(t, app.Commit()) testutil.NotOk(t, app.Commit())
testutil.NotOk(t, app.Rollback()) testutil.NotOk(t, app.Rollback())
app = head.Appender() app = head.Appender(context.Background())
_, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 1, 0) _, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 1, 0)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app.Rollback()) testutil.Ok(t, app.Rollback())
testutil.NotOk(t, app.Rollback()) testutil.NotOk(t, app.Rollback())
testutil.NotOk(t, app.Commit()) testutil.NotOk(t, app.Commit())
app = head.Appender() app = head.Appender(context.Background())
_, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 2, 0) _, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 2, 0)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
testutil.NotOk(t, app.Rollback()) testutil.NotOk(t, app.Rollback())
testutil.NotOk(t, app.Commit()) testutil.NotOk(t, app.Commit())
app = head.Appender() app = head.Appender(context.Background())
_, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 3, 0) _, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 3, 0)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app.Rollback()) testutil.Ok(t, app.Rollback())

View file

@ -38,7 +38,7 @@ func CreateHead(samples []*MetricSample, chunkRange int64, chunkDir string, logg
if err != nil { if err != nil {
return nil, err return nil, err
} }
app := head.Appender() app := head.Appender(context.TODO())
for _, sample := range samples { for _, sample := range samples {
_, err = app.Add(sample.Labels, sample.TimestampMs, sample.Value) _, err = app.Add(sample.Labels, sample.TimestampMs, sample.Value)
if err != nil { if err != nil {