mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
Fix memSafeIterator.Seek() (#8748)
* Add range query test cases This includes a couple of failing ones that double count some points due to the iterator seek bug. Co-authored-by: Oleg Zaytsev <mail@olegzaytsev.com> Signed-off-by: Fiona Liao <fiona.y.liao@gmail.com> * Add Seek() implementation for memSafeIterator Previously, calling memSafeIterator.Seek() would call the Seek() method on its embedded iterator. This was causing the embedded iterator and the memSafeIterator to get out of sync because when the embedded Seek() moved to the next element of the embedded iterator, memSafeIterator didn't "know" about it. memSafeIterator has to "know" when the embedded iterator has moved to be able to work out when it should be reading from its buffer rather than the embedded iterator. Used same logic as for xorIterator.Seek() (which in runtime is used as the embedded iterator) - return false if the iterator has an error and try to move to next element if the required time hasn't been reached, or if no elements have been read yet. The memSafeIterator.Next() method is being called so memSafeIterator.i is always accurate. Signed-off-by: Fiona Liao <fiona.y.liao@gmail.com> * Add tsdb package test Signed-off-by: Fiona Liao <fiona.y.liao@gmail.com> Co-authored-by: Oleg Zaytsev <mail@olegzaytsev.com>
This commit is contained in:
parent
5047d36a77
commit
9b83d8330a
|
@ -2327,3 +2327,111 @@ func TestEngineOptsValidation(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRangeQuery(t *testing.T) {
|
||||
cases := []struct {
|
||||
Name string
|
||||
Load string
|
||||
Query string
|
||||
Result parser.Value
|
||||
Start time.Time
|
||||
End time.Time
|
||||
Interval time.Duration
|
||||
}{
|
||||
{
|
||||
Name: "sum_over_time with all values",
|
||||
Load: `load 30s
|
||||
bar 0 1 10 100 1000`,
|
||||
Query: "sum_over_time(bar[30s])",
|
||||
Result: Matrix{Series{
|
||||
Points: []Point{{V: 0, T: 0}, {V: 11, T: 60000}, {V: 1100, T: 120000}},
|
||||
Metric: labels.Labels{}},
|
||||
},
|
||||
Start: time.Unix(0, 0),
|
||||
End: time.Unix(120, 0),
|
||||
Interval: 60 * time.Second,
|
||||
},
|
||||
{
|
||||
Name: "sum_over_time with trailing values",
|
||||
Load: `load 30s
|
||||
bar 0 1 10 100 1000 0 0 0 0`,
|
||||
Query: "sum_over_time(bar[30s])",
|
||||
Result: Matrix{Series{
|
||||
Points: []Point{{V: 0, T: 0}, {V: 11, T: 60000}, {V: 1100, T: 120000}},
|
||||
Metric: labels.Labels{}},
|
||||
},
|
||||
Start: time.Unix(0, 0),
|
||||
End: time.Unix(120, 0),
|
||||
Interval: 60 * time.Second,
|
||||
},
|
||||
{
|
||||
Name: "sum_over_time with all values long",
|
||||
Load: `load 30s
|
||||
bar 0 1 10 100 1000 10000 100000 1000000 10000000`,
|
||||
Query: "sum_over_time(bar[30s])",
|
||||
Result: Matrix{Series{
|
||||
Points: []Point{{V: 0, T: 0}, {V: 11, T: 60000}, {V: 1100, T: 120000}, {V: 110000, T: 180000}, {V: 11000000, T: 240000}},
|
||||
Metric: labels.Labels{}},
|
||||
},
|
||||
Start: time.Unix(0, 0),
|
||||
End: time.Unix(240, 0),
|
||||
Interval: 60 * time.Second,
|
||||
},
|
||||
{
|
||||
Name: "sum_over_time with all values random",
|
||||
Load: `load 30s
|
||||
bar 5 17 42 2 7 905 51`,
|
||||
Query: "sum_over_time(bar[30s])",
|
||||
Result: Matrix{Series{
|
||||
Points: []Point{{V: 5, T: 0}, {V: 59, T: 60000}, {V: 9, T: 120000}, {V: 956, T: 180000}},
|
||||
Metric: labels.Labels{}},
|
||||
},
|
||||
Start: time.Unix(0, 0),
|
||||
End: time.Unix(180, 0),
|
||||
Interval: 60 * time.Second,
|
||||
},
|
||||
{
|
||||
Name: "metric query",
|
||||
Load: `load 30s
|
||||
metric 1+1x4`,
|
||||
Query: "metric",
|
||||
Result: Matrix{Series{
|
||||
Points: []Point{{V: 1, T: 0}, {V: 3, T: 60000}, {V: 5, T: 120000}},
|
||||
Metric: labels.Labels{labels.Label{Name: "__name__", Value: "metric"}}},
|
||||
},
|
||||
Start: time.Unix(0, 0),
|
||||
End: time.Unix(120, 0),
|
||||
Interval: 1 * time.Minute,
|
||||
},
|
||||
{
|
||||
Name: "metric query with trailing values",
|
||||
Load: `load 30s
|
||||
metric 1+1x8`,
|
||||
Query: "metric",
|
||||
Result: Matrix{Series{
|
||||
Points: []Point{{V: 1, T: 0}, {V: 3, T: 60000}, {V: 5, T: 120000}},
|
||||
Metric: labels.Labels{labels.Label{Name: "__name__", Value: "metric"}}},
|
||||
},
|
||||
Start: time.Unix(0, 0),
|
||||
End: time.Unix(120, 0),
|
||||
Interval: 1 * time.Minute,
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.Name, func(t *testing.T) {
|
||||
test, err := NewTest(t, c.Load)
|
||||
require.NoError(t, err)
|
||||
defer test.Close()
|
||||
|
||||
err = test.Run()
|
||||
require.NoError(t, err)
|
||||
|
||||
qry, err := test.QueryEngine().NewRangeQuery(test.Queryable(), c.Query, c.Start, c.End, c.Interval)
|
||||
require.NoError(t, err)
|
||||
|
||||
res := qry.Exec(test.Context())
|
||||
require.NoError(t, res.Err)
|
||||
require.Equal(t, c.Result, res.Value)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
17
tsdb/head.go
17
tsdb/head.go
|
@ -2538,6 +2538,23 @@ type memSafeIterator struct {
|
|||
buf [4]sample
|
||||
}
|
||||
|
||||
func (it *memSafeIterator) Seek(t int64) bool {
|
||||
if it.Err() != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
ts, _ := it.At()
|
||||
|
||||
for t > ts || it.i == -1 {
|
||||
if !it.Next() {
|
||||
return false
|
||||
}
|
||||
ts, _ = it.At()
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (it *memSafeIterator) Next() bool {
|
||||
if it.i+1 >= it.stopAfter {
|
||||
return false
|
||||
|
|
|
@ -2067,3 +2067,71 @@ func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) {
|
|||
require.Equal(b, 9, len(actualValues))
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "iterator_seek")
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
require.NoError(t, os.RemoveAll(dir))
|
||||
}()
|
||||
// This is usually taken from the Head, but passing manually here.
|
||||
chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
require.NoError(t, chunkDiskMapper.Close())
|
||||
}()
|
||||
|
||||
s := newMemSeries(labels.Labels{}, 1, 500, nil)
|
||||
|
||||
for i := 0; i < 7; i++ {
|
||||
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
|
||||
require.True(t, ok, "sample append failed")
|
||||
}
|
||||
|
||||
it := s.iterator(s.chunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil)
|
||||
_, ok := it.(*memSafeIterator)
|
||||
require.True(t, ok)
|
||||
|
||||
// First point.
|
||||
ok = it.Seek(0)
|
||||
require.True(t, ok)
|
||||
ts, val := it.At()
|
||||
require.Equal(t, int64(0), ts)
|
||||
require.Equal(t, float64(0), val)
|
||||
|
||||
// Advance one point.
|
||||
ok = it.Next()
|
||||
require.True(t, ok)
|
||||
ts, val = it.At()
|
||||
require.Equal(t, int64(1), ts)
|
||||
require.Equal(t, float64(1), val)
|
||||
|
||||
// Seeking an older timestamp shouldn't cause the iterator to go backwards.
|
||||
ok = it.Seek(0)
|
||||
require.True(t, ok)
|
||||
ts, val = it.At()
|
||||
require.Equal(t, int64(1), ts)
|
||||
require.Equal(t, float64(1), val)
|
||||
|
||||
// Seek into the buffer.
|
||||
ok = it.Seek(3)
|
||||
require.True(t, ok)
|
||||
ts, val = it.At()
|
||||
require.Equal(t, int64(3), ts)
|
||||
require.Equal(t, float64(3), val)
|
||||
|
||||
// Iterate through the rest of the buffer.
|
||||
for i := 4; i < 7; i++ {
|
||||
ok = it.Next()
|
||||
require.True(t, ok)
|
||||
ts, val = it.At()
|
||||
require.Equal(t, int64(i), ts)
|
||||
require.Equal(t, float64(i), val)
|
||||
}
|
||||
|
||||
// Run out of elements in the iterator.
|
||||
ok = it.Next()
|
||||
require.False(t, ok)
|
||||
ok = it.Seek(7)
|
||||
require.False(t, ok)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue