From 2cf637fbf53f8771044420dfc904d45b505987f3 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Fri, 7 Feb 2020 16:24:17 +0000 Subject: [PATCH] Addressed comments. Signed-off-by: Bartlomiej Plotka --- promql/bench_test.go | 7 ++----- promql/functions_test.go | 6 ++---- promql/promql_test.go | 3 +-- storage/interface.go | 13 +++++++++++++ tsdb/chunkenc/chunk.go | 11 ++++++----- tsdb/db.go | 9 +++++---- tsdb/querier_test.go | 3 +-- 7 files changed, 30 insertions(+), 22 deletions(-) diff --git a/promql/bench_test.go b/promql/bench_test.go index 58f12139e..b985bd30b 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -67,13 +67,10 @@ func BenchmarkRangeQuery(b *testing.B) { numIntervals := 8640 + 10000 for s := 0; s < numIntervals; s++ { - a, err := storage.Appender() - if err != nil { - b.Fatal(err) - } + a := storage.Appender() ts := int64(s * 10000) // 10s interval. for i, metric := range metrics { - err := a.AddFast(metric, refs[i], ts, float64(s)) + err := a.AddFast(refs[i], ts, float64(s)) if err != nil { refs[i], _ = a.Add(metric, ts, float64(s)) } diff --git a/promql/functions_test.go b/promql/functions_test.go index 760d6e382..a6aa1cc13 100644 --- a/promql/functions_test.go +++ b/promql/functions_test.go @@ -38,15 +38,13 @@ func TestDeriv(t *testing.T) { } engine := NewEngine(opts) - a, err := storage.Appender() - testutil.Ok(t, err) + a := storage.Appender() metric := labels.FromStrings("__name__", "foo") a.Add(metric, 1493712816939, 1.0) a.Add(metric, 1493712846939, 1.0) - err = a.Commit() - testutil.Ok(t, err) + testutil.Ok(t, a.Commit()) query, err := engine.NewInstantQuery(storage, "deriv(foo[30m])", timestamp.Time(1493712846939)) testutil.Ok(t, err) diff --git a/promql/promql_test.go b/promql/promql_test.go index bb90d9f53..6dd637f47 100644 --- a/promql/promql_test.go +++ b/promql/promql_test.go @@ -28,8 +28,7 @@ func TestEvaluations(t *testing.T) { test, err := newTestFromFile(t, fn) testutil.Ok(t, err) - err = test.Run() - testutil.Ok(t, err) + testutil.Ok(t, test.Run()) test.Close() } diff --git a/storage/interface.go b/storage/interface.go index 6d91ff424..32022c27d 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -99,14 +99,27 @@ func (f QueryableFunc) Querier(ctx context.Context, mint, maxt int64) (Querier, } // Appender provides batched appends against a storage. +// It must be completed with a call to Commit or Rollback and must not be reused afterwards. +// +// Operations on the Appender interface are not goroutine-safe. type Appender interface { + // Add adds a sample pair for the given series. A reference number is + // returned which can be used to add further samples in the same or later + // transactions. + // Returned reference numbers are ephemeral and may be rejected in calls + // to AddFast() at any point. Adding the sample via Add() returns a new + // reference number. + // If the reference is 0 it must not be used for caching. Add(l labels.Labels, t int64, v float64) (uint64, error) + // AddFast adds a sample pair for the referenced series. It is generally + // faster than adding a sample by providing its full label set. AddFast(ref uint64, t int64, v float64) error // Commit submits the collected samples and purges the batch. Commit() error + // Rollback rolls back all modifications made in the appender so far. Rollback() error } diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index e7cb16ede..1e21003d3 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -75,9 +75,11 @@ type Appender interface { // Iterator is a simple iterator that can only get the next value. // Iterator iterates over the samples of a time series. type Iterator interface { - // Seek advances the iterator forward to the sample with the timestamp t or first value after t. - // Seek has no effect if requested timestamp is the same or lower than the current iterator position. - // Seek returns false if there is no such sample with the timestamp equal or larger than t. + // Next advances the iterator by one. + Next() bool + // Seek advances the iterator forward to the first sample with the timestamp equal or greater than t. + // If current sample found by previous `Next` or `Seek` operation already has this property, Seek has no effect. + // Seek returns true, if such sample exists, false otherwise. // Iterator is exhausted when the Seek returns false. // TODO(bwplotka): Verify above statements on all implementations with unit test. Seek(t int64) bool @@ -85,9 +87,8 @@ type Iterator interface { // At returns (math.MinInt64, 0.0) before the iterator has advanced. // TODO(bwplotka): Verify above statement on all implementations with unit test. At() (int64, float64) - // Next advances the iterator by one. - Next() bool // Err returns the current error. + // Err can return undefined value before the iterator has advanced. Err() error } diff --git a/tsdb/db.go b/tsdb/db.go index df1d70b49..03825f93b 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -350,7 +350,8 @@ func (db *DBReadOnly) FlushWAL(dir string) error { context.Background(), nil, db.logger, - ExponentialBlockRanges(time.Duration(DefaultOptions().MinBlockDuration).Milliseconds(), 3, 5), chunkenc.NewPool(), + ExponentialBlockRanges(int64(time.Duration(DefaultOptions().MinBlockDuration))/1e6, 3, 5), + chunkenc.NewPool(), ) if err != nil { return errors.Wrap(err, "create leveled compactor") @@ -363,7 +364,7 @@ func (db *DBReadOnly) FlushWAL(dir string) error { // Querier loads the wal and returns a new querier over the data partition for the given time range. // Current implementation doesn't support multiple Queriers. -func (db *DBReadOnly) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) { +func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { select { case <-db.closed: return nil, ErrClosed @@ -424,7 +425,7 @@ func (db *DBReadOnly) Querier(_ context.Context, mint, maxt int64) (storage.Quer head: head, } - return dbWritable.Querier(context.TODO(), mint, maxt) + return dbWritable.Querier(ctx, mint, maxt) } // Blocks returns a slice of block readers for persisted blocks. @@ -503,7 +504,7 @@ func (db *DBReadOnly) Close() error { return merr.Err() } -// Open returns a new DB in the given directory. If options are empty, default DefaultOptions will be used. +// Open returns a new DB in the given directory. If options are empty, DefaultOptions will be used. func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) { var rngs []int64 opts, rngs = validateOpts(opts, nil) diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 66c02b90b..25bf3d062 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -1343,8 +1343,7 @@ func TestDeletedIterator_WithSeek(t *testing.T) { } cases := []struct { - r tombstones.Intervals - seek int64 + r tombstones.Intervals }{ {r: tombstones.Intervals{{Mint: 1, Maxt: 20}}}, {r: tombstones.Intervals{{Mint: 1, Maxt: 10}, {Mint: 12, Maxt: 20}, {Mint: 21, Maxt: 23}, {Mint: 25, Maxt: 30}}},