Addressed comments.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
Bartlomiej Plotka 2020-02-07 16:24:17 +00:00
parent 34426766d8
commit 2cf637fbf5
7 changed files with 30 additions and 22 deletions

View file

@ -67,13 +67,10 @@ func BenchmarkRangeQuery(b *testing.B) {
numIntervals := 8640 + 10000 numIntervals := 8640 + 10000
for s := 0; s < numIntervals; s++ { for s := 0; s < numIntervals; s++ {
a, err := storage.Appender() a := storage.Appender()
if err != nil {
b.Fatal(err)
}
ts := int64(s * 10000) // 10s interval. ts := int64(s * 10000) // 10s interval.
for i, metric := range metrics { for i, metric := range metrics {
err := a.AddFast(metric, refs[i], ts, float64(s)) err := a.AddFast(refs[i], ts, float64(s))
if err != nil { if err != nil {
refs[i], _ = a.Add(metric, ts, float64(s)) refs[i], _ = a.Add(metric, ts, float64(s))
} }

View file

@ -38,15 +38,13 @@ func TestDeriv(t *testing.T) {
} }
engine := NewEngine(opts) engine := NewEngine(opts)
a, err := storage.Appender() a := storage.Appender()
testutil.Ok(t, err)
metric := labels.FromStrings("__name__", "foo") metric := labels.FromStrings("__name__", "foo")
a.Add(metric, 1493712816939, 1.0) a.Add(metric, 1493712816939, 1.0)
a.Add(metric, 1493712846939, 1.0) a.Add(metric, 1493712846939, 1.0)
err = a.Commit() testutil.Ok(t, a.Commit())
testutil.Ok(t, err)
query, err := engine.NewInstantQuery(storage, "deriv(foo[30m])", timestamp.Time(1493712846939)) query, err := engine.NewInstantQuery(storage, "deriv(foo[30m])", timestamp.Time(1493712846939))
testutil.Ok(t, err) testutil.Ok(t, err)

View file

@ -28,8 +28,7 @@ func TestEvaluations(t *testing.T) {
test, err := newTestFromFile(t, fn) test, err := newTestFromFile(t, fn)
testutil.Ok(t, err) testutil.Ok(t, err)
err = test.Run() testutil.Ok(t, test.Run())
testutil.Ok(t, err)
test.Close() test.Close()
} }

View file

@ -99,14 +99,27 @@ func (f QueryableFunc) Querier(ctx context.Context, mint, maxt int64) (Querier,
} }
// Appender provides batched appends against a storage. // Appender provides batched appends against a storage.
// It must be completed with a call to Commit or Rollback and must not be reused afterwards.
//
// Operations on the Appender interface are not goroutine-safe.
type Appender interface { type Appender interface {
// Add adds a sample pair for the given series. A reference number is
// returned which can be used to add further samples in the same or later
// transactions.
// Returned reference numbers are ephemeral and may be rejected in calls
// to AddFast() at any point. Adding the sample via Add() returns a new
// reference number.
// If the reference is 0 it must not be used for caching.
Add(l labels.Labels, t int64, v float64) (uint64, error) Add(l labels.Labels, t int64, v float64) (uint64, error)
// AddFast adds a sample pair for the referenced series. It is generally
// faster than adding a sample by providing its full label set.
AddFast(ref uint64, t int64, v float64) error AddFast(ref uint64, t int64, v float64) error
// Commit submits the collected samples and purges the batch. // Commit submits the collected samples and purges the batch.
Commit() error Commit() error
// Rollback rolls back all modifications made in the appender so far.
Rollback() error Rollback() error
} }

View file

@ -75,9 +75,11 @@ type Appender interface {
// Iterator is a simple iterator that can only get the next value. // Iterator is a simple iterator that can only get the next value.
// Iterator iterates over the samples of a time series. // Iterator iterates over the samples of a time series.
type Iterator interface { type Iterator interface {
// Seek advances the iterator forward to the sample with the timestamp t or first value after t. // Next advances the iterator by one.
// Seek has no effect if requested timestamp is the same or lower than the current iterator position. Next() bool
// Seek returns false if there is no such sample with the timestamp equal or larger than t. // Seek advances the iterator forward to the first sample with the timestamp equal or greater than t.
// If current sample found by previous `Next` or `Seek` operation already has this property, Seek has no effect.
// Seek returns true, if such sample exists, false otherwise.
// Iterator is exhausted when the Seek returns false. // Iterator is exhausted when the Seek returns false.
// TODO(bwplotka): Verify above statements on all implementations with unit test. // TODO(bwplotka): Verify above statements on all implementations with unit test.
Seek(t int64) bool Seek(t int64) bool
@ -85,9 +87,8 @@ type Iterator interface {
// At returns (math.MinInt64, 0.0) before the iterator has advanced. // At returns (math.MinInt64, 0.0) before the iterator has advanced.
// TODO(bwplotka): Verify above statement on all implementations with unit test. // TODO(bwplotka): Verify above statement on all implementations with unit test.
At() (int64, float64) At() (int64, float64)
// Next advances the iterator by one.
Next() bool
// Err returns the current error. // Err returns the current error.
// Err can return undefined value before the iterator has advanced.
Err() error Err() error
} }

View file

@ -350,7 +350,8 @@ func (db *DBReadOnly) FlushWAL(dir string) error {
context.Background(), context.Background(),
nil, nil,
db.logger, db.logger,
ExponentialBlockRanges(time.Duration(DefaultOptions().MinBlockDuration).Milliseconds(), 3, 5), chunkenc.NewPool(), ExponentialBlockRanges(int64(time.Duration(DefaultOptions().MinBlockDuration))/1e6, 3, 5),
chunkenc.NewPool(),
) )
if err != nil { if err != nil {
return errors.Wrap(err, "create leveled compactor") return errors.Wrap(err, "create leveled compactor")
@ -363,7 +364,7 @@ func (db *DBReadOnly) FlushWAL(dir string) error {
// Querier loads the wal and returns a new querier over the data partition for the given time range. // Querier loads the wal and returns a new querier over the data partition for the given time range.
// Current implementation doesn't support multiple Queriers. // Current implementation doesn't support multiple Queriers.
func (db *DBReadOnly) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) { func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
select { select {
case <-db.closed: case <-db.closed:
return nil, ErrClosed return nil, ErrClosed
@ -424,7 +425,7 @@ func (db *DBReadOnly) Querier(_ context.Context, mint, maxt int64) (storage.Quer
head: head, head: head,
} }
return dbWritable.Querier(context.TODO(), mint, maxt) return dbWritable.Querier(ctx, mint, maxt)
} }
// Blocks returns a slice of block readers for persisted blocks. // Blocks returns a slice of block readers for persisted blocks.
@ -503,7 +504,7 @@ func (db *DBReadOnly) Close() error {
return merr.Err() return merr.Err()
} }
// Open returns a new DB in the given directory. If options are empty, default DefaultOptions will be used. // Open returns a new DB in the given directory. If options are empty, DefaultOptions will be used.
func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) { func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) {
var rngs []int64 var rngs []int64
opts, rngs = validateOpts(opts, nil) opts, rngs = validateOpts(opts, nil)

View file

@ -1344,7 +1344,6 @@ func TestDeletedIterator_WithSeek(t *testing.T) {
cases := []struct { cases := []struct {
r tombstones.Intervals r tombstones.Intervals
seek int64
}{ }{
{r: tombstones.Intervals{{Mint: 1, Maxt: 20}}}, {r: tombstones.Intervals{{Mint: 1, Maxt: 20}}},
{r: tombstones.Intervals{{Mint: 1, Maxt: 10}, {Mint: 12, Maxt: 20}, {Mint: 21, Maxt: 23}, {Mint: 25, Maxt: 30}}}, {r: tombstones.Intervals{{Mint: 1, Maxt: 10}, {Mint: 12, Maxt: 20}, {Mint: 21, Maxt: 23}, {Mint: 25, Maxt: 30}}},