Fix queries on blocks for sparse histograms and add unit test (#9209)

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Ganesh Vernekar 2021-08-16 18:52:29 +05:30 committed by GitHub
parent 42f576aa18
commit eedb86783e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 109 additions and 10 deletions

View file

@ -468,7 +468,11 @@ func (c *chainSampleIterator) Seek(t int64) bool {
}
if len(c.h) > 0 {
c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
c.lastt, _ = c.curr.At()
if c.curr.ChunkEncoding() == chunkenc.EncSHS {
c.lastt, _ = c.curr.AtHistogram()
} else {
c.lastt, _ = c.curr.At()
}
return true
}
c.curr = nil
@ -516,7 +520,11 @@ func (c *chainSampleIterator) Next() bool {
var currt int64
for {
if c.curr.Next() {
currt, _ = c.curr.At()
if c.curr.ChunkEncoding() == chunkenc.EncSHS {
currt, _ = c.curr.AtHistogram()
} else {
currt, _ = c.curr.At()
}
if currt == c.lastt {
// Ignoring sample for the same timestamp.
continue
@ -528,7 +536,13 @@ func (c *chainSampleIterator) Next() bool {
}
// Check current iterator with the top of the heap.
if nextt, _ := c.h[0].At(); currt < nextt {
var nextt int64
if c.h[0].ChunkEncoding() == chunkenc.EncSHS {
nextt, _ = c.h[0].AtHistogram()
} else {
nextt, _ = c.h[0].At()
}
if currt < nextt {
// Current iterator has smaller timestamp than the heap.
break
}
@ -541,7 +555,11 @@ func (c *chainSampleIterator) Next() bool {
}
c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
currt, _ = c.curr.At()
if c.curr.ChunkEncoding() == chunkenc.EncSHS {
currt, _ = c.curr.AtHistogram()
} else {
currt, _ = c.curr.At()
}
if currt != c.lastt {
break
}
@ -565,8 +583,17 @@ func (h samplesIteratorHeap) Len() int { return len(h) }
// Swap exchanges the elements stored at indices i and j of the heap slice.
func (h samplesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
// Less reports whether the element at index i is currently positioned at a
// smaller timestamp than the element at index j.
//
// The timestamp is read with AtHistogram when the element's ChunkEncoding is
// chunkenc.EncSHS and with At otherwise; the value returned alongside the
// timestamp is discarded in both cases.
func (h samplesIteratorHeap) Less(i, j int) bool {
// NOTE(review): this listing appears to be a rendered diff, so the two `:=`
// reads directly below look like the removed lines and the encoding-aware
// branches after them look like their replacement — confirm against the
// actual file before relying on this block as-is.
at, _ := h[i].At()
bt, _ := h[j].At()
var at, bt int64
// Pick the accessor that matches the encoding of element i.
if h[i].ChunkEncoding() == chunkenc.EncSHS {
at, _ = h[i].AtHistogram()
} else {
at, _ = h[i].At()
}
// Same selection for element j; the two elements may use different encodings.
if h[j].ChunkEncoding() == chunkenc.EncSHS {
bt, _ = h[j].AtHistogram()
} else {
bt, _ = h[j].At()
}
return at < bt
}

View file

@ -1692,7 +1692,7 @@ func generateCustomHistograms(numHists, numBuckets, numSpans, gapBetweenSpans, s
return r
}
func TestSparseHistogramCompaction(t *testing.T) {
func TestSparseHistogramCompactionAndQuery(t *testing.T) {
dir, err := ioutil.TempDir("", "test")
require.NoError(t, err)
t.Cleanup(func() {
@ -1709,6 +1709,12 @@ func TestSparseHistogramCompaction(t *testing.T) {
})
db.DisableCompactions()
type timedHist struct {
t int64
h histogram.SparseHistogram
}
expHists := make(map[string][]timedHist)
series1Histograms := generateHistograms(20)
series2Histograms := generateHistograms(20)
idx1, idx2 := -1, -1
@ -1721,16 +1727,54 @@ func TestSparseHistogramCompaction(t *testing.T) {
idx2++
_, err = app.AppendHistogram(0, lbls2, ts, series2Histograms[idx2])
require.NoError(t, err)
l1, l2 := lbls1.String(), lbls2.String()
expHists[l1] = append(expHists[l1], timedHist{t: ts, h: series1Histograms[idx1]})
expHists[l2] = append(expHists[l2], timedHist{t: ts, h: series2Histograms[idx2]})
}
// Add histograms to create 3 blocks via compaction.
testQuery := func() {
q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
require.NoError(t, err)
defer func() {
require.NoError(t, q.Close())
}()
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "a", ".*"))
actHists := make(map[string][]timedHist)
for ss.Next() {
s := ss.At()
it := s.Iterator()
for it.Next() {
ts, h := it.AtHistogram()
actHists[s.Labels().String()] = append(actHists[s.Labels().String()], timedHist{ts, h.Copy()})
}
require.NoError(t, it.Err())
}
require.NoError(t, ss.Err())
require.Equal(t, expHists, actHists)
}
// Add histograms to create 1 block via compaction.
app := db.Appender(context.Background())
for ts := int64(0); ts <= 4*DefaultBlockDuration; ts += DefaultBlockDuration / 2 {
for ts := int64(0); ts <= 2*DefaultBlockDuration; ts += DefaultBlockDuration / 2 {
addNextHists(ts, app)
}
require.NoError(t, app.Commit())
testQuery() // Only the head block.
require.NoError(t, db.Compact())
require.Equal(t, 1, len(db.Blocks()))
testQuery() // 1 persistent block and the head block.
// Add histograms to create 2 more blocks via compaction.
app = db.Appender(context.Background())
for ts := 5 * DefaultBlockDuration / 2; ts <= 4*DefaultBlockDuration; ts += DefaultBlockDuration / 2 {
addNextHists(ts, app)
}
require.NoError(t, app.Commit())
require.NoError(t, db.Compact())
require.Equal(t, 3, len(db.Blocks()))
testQuery() // >1 persistent block (and the head block).
// Another block triggers compaction of the first 3 blocks into 1 block.
app = db.Appender(context.Background())
@ -1740,6 +1784,7 @@ func TestSparseHistogramCompaction(t *testing.T) {
require.NoError(t, app.Commit())
require.NoError(t, db.Compact())
require.Equal(t, 2, len(db.Blocks()))
testQuery()
require.Equal(t, int64(0), db.blocks[0].MinTime())
require.Equal(t, 3*DefaultBlockDuration, db.blocks[0].MaxTime())
@ -1747,13 +1792,40 @@ func TestSparseHistogramCompaction(t *testing.T) {
require.Equal(t, 4*DefaultBlockDuration, db.blocks[1].MaxTime())
// Add tombstones to the first block to make sure that the deletion works for histograms on compaction.
err = db.Delete(0, 2*DefaultBlockDuration, labels.MustNewMatcher(labels.MatchRegexp, "a", ".*"))
delTime := 2 * DefaultBlockDuration
err = db.Delete(0, delTime, labels.MustNewMatcher(labels.MatchRegexp, "a", ".*"))
require.NoError(t, err)
require.Equal(t, uint64(2), db.blocks[0].Meta().Stats.NumTombstones)
// Truncate expected histograms to test the query after deletion.
for k, v := range expHists {
oldCount := len(v)
for i := 0; i < len(v); i++ {
if v[i].t > delTime {
expHists[k] = expHists[k][i:]
break
}
}
require.Less(t, len(expHists[k]), oldCount)
require.Greater(t, len(expHists[k]), 0)
}
testQuery() // Query with tombstones on persistent block.
oldULID := db.blocks[0].Meta().ULID
require.NoError(t, db.Compact())
require.Equal(t, 2, len(db.Blocks()))
newULID := db.blocks[0].Meta().ULID
require.NotEqual(t, oldULID, newULID)
require.Equal(t, uint64(0), db.blocks[0].Meta().Stats.NumTombstones)
testQuery()
// Adding tombstones to head and testing query for that.
// Last sample was ts=5*DefaultBlockDuration, so a tombstone just to cover that.
err = db.Delete((5*DefaultBlockDuration)-1, (5*DefaultBlockDuration)+1, labels.MustNewMatcher(labels.MatchRegexp, "a", ".*"))
require.NoError(t, err)
// Remove last sample from expected.
for k := range expHists {
expHists[k] = expHists[k][:len(expHists[k])-1]
require.Greater(t, len(expHists[k]), 0)
}
testQuery()
}