Merge pull request #13197 from bboreham/tsdb-lint
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (0) (push) Waiting to run
CI / Build Prometheus for common architectures (1) (push) Waiting to run
CI / Build Prometheus for common architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (0) (push) Waiting to run
CI / Build Prometheus for all architectures (1) (push) Waiting to run
CI / Build Prometheus for all architectures (10) (push) Waiting to run
CI / Build Prometheus for all architectures (11) (push) Waiting to run
CI / Build Prometheus for all architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (3) (push) Waiting to run
CI / Build Prometheus for all architectures (4) (push) Waiting to run
CI / Build Prometheus for all architectures (5) (push) Waiting to run
CI / Build Prometheus for all architectures (6) (push) Waiting to run
CI / Build Prometheus for all architectures (7) (push) Waiting to run
CI / Build Prometheus for all architectures (8) (push) Waiting to run
CI / Build Prometheus for all architectures (9) (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run

Clean up some issues in tsdb tests
This commit is contained in:
Jan Fajerski 2025-01-27 21:18:09 +01:00 committed by GitHub
commit 2ae706be8f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 185 additions and 180 deletions

View file

@ -1575,12 +1575,13 @@ func TestHeadCompactionWithHistograms(t *testing.T) {
func TestSparseHistogramSpaceSavings(t *testing.T) {
t.Skip()
cases := []struct {
type testcase struct {
numSeriesPerSchema int
numBuckets int
numSpans int
gapBetweenSpans int
}{
}
cases := []testcase{
{1, 15, 1, 0},
{1, 50, 1, 0},
{1, 100, 1, 0},
@ -1692,7 +1693,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
}()
wg.Add(1)
go func() {
go func(c testcase) {
defer wg.Done()
// Ingest histograms the old way.
@ -1740,7 +1741,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
oldULIDs, err = compactor.Write(oldHead.opts.ChunkDirRoot, oldHead, mint, maxt, nil)
require.NoError(t, err)
require.Len(t, oldULIDs, 1)
}()
}(c)
wg.Wait()

View file

@ -426,64 +426,65 @@ func TestDeleteSimple(t *testing.T) {
},
}
Outer:
for _, c := range cases {
db := openTestDB(t, nil, nil)
defer func() {
require.NoError(t, db.Close())
}()
t.Run("", func(t *testing.T) {
db := openTestDB(t, nil, nil)
defer func() {
require.NoError(t, db.Close())
}()
ctx := context.Background()
app := db.Appender(ctx)
ctx := context.Background()
app := db.Appender(ctx)
smpls := make([]float64, numSamples)
for i := int64(0); i < numSamples; i++ {
smpls[i] = rand.Float64()
app.Append(0, labels.FromStrings("a", "b"), i, smpls[i])
}
require.NoError(t, app.Commit())
// TODO(gouthamve): Reset the tombstones somehow.
// Delete the ranges.
for _, r := range c.Intervals {
require.NoError(t, db.Delete(ctx, r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
}
// Compare the result.
q, err := db.Querier(0, numSamples)
require.NoError(t, err)
res := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
expSamples := make([]chunks.Sample, 0, len(c.remaint))
for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts], nil, nil})
}
expss := newMockSeriesSet([]storage.Series{
storage.NewListSeries(labels.FromStrings("a", "b"), expSamples),
})
for {
eok, rok := expss.Next(), res.Next()
require.Equal(t, eok, rok)
if !eok {
require.Empty(t, res.Warnings())
continue Outer
smpls := make([]float64, numSamples)
for i := int64(0); i < numSamples; i++ {
smpls[i] = rand.Float64()
app.Append(0, labels.FromStrings("a", "b"), i, smpls[i])
}
sexp := expss.At()
sres := res.At()
require.Equal(t, sexp.Labels(), sres.Labels())
require.NoError(t, app.Commit())
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil)
// TODO(gouthamve): Reset the tombstones somehow.
// Delete the ranges.
for _, r := range c.Intervals {
require.NoError(t, db.Delete(ctx, r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
}
require.Equal(t, errExp, errRes)
require.Equal(t, smplExp, smplRes)
}
// Compare the result.
q, err := db.Querier(0, numSamples)
require.NoError(t, err)
res := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
expSamples := make([]chunks.Sample, 0, len(c.remaint))
for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts], nil, nil})
}
expss := newMockSeriesSet([]storage.Series{
storage.NewListSeries(labels.FromStrings("a", "b"), expSamples),
})
for {
eok, rok := expss.Next(), res.Next()
require.Equal(t, eok, rok)
if !eok {
require.Empty(t, res.Warnings())
break
}
sexp := expss.At()
sres := res.At()
require.Equal(t, sexp.Labels(), sres.Labels())
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil)
require.Equal(t, errExp, errRes)
require.Equal(t, smplExp, smplRes)
}
})
}
}
@ -759,64 +760,65 @@ func TestDB_SnapshotWithDelete(t *testing.T) {
},
}
Outer:
for _, c := range cases {
// TODO(gouthamve): Reset the tombstones somehow.
// Delete the ranges.
for _, r := range c.intervals {
require.NoError(t, db.Delete(ctx, r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
}
// create snapshot
snap := t.TempDir()
require.NoError(t, db.Snapshot(snap, true))
// reopen DB from snapshot
newDB, err := Open(snap, nil, nil, nil, nil)
require.NoError(t, err)
defer func() { require.NoError(t, newDB.Close()) }()
// Compare the result.
q, err := newDB.Querier(0, numSamples)
require.NoError(t, err)
defer func() { require.NoError(t, q.Close()) }()
res := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
expSamples := make([]chunks.Sample, 0, len(c.remaint))
for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts], nil, nil})
}
expss := newMockSeriesSet([]storage.Series{
storage.NewListSeries(labels.FromStrings("a", "b"), expSamples),
})
if len(expSamples) == 0 {
require.False(t, res.Next())
continue
}
for {
eok, rok := expss.Next(), res.Next()
require.Equal(t, eok, rok)
if !eok {
require.Empty(t, res.Warnings())
continue Outer
t.Run("", func(t *testing.T) {
// TODO(gouthamve): Reset the tombstones somehow.
// Delete the ranges.
for _, r := range c.intervals {
require.NoError(t, db.Delete(ctx, r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
}
sexp := expss.At()
sres := res.At()
require.Equal(t, sexp.Labels(), sres.Labels())
// create snapshot
snap := t.TempDir()
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil)
require.NoError(t, db.Snapshot(snap, true))
require.Equal(t, errExp, errRes)
require.Equal(t, smplExp, smplRes)
}
// reopen DB from snapshot
newDB, err := Open(snap, nil, nil, nil, nil)
require.NoError(t, err)
defer func() { require.NoError(t, newDB.Close()) }()
// Compare the result.
q, err := newDB.Querier(0, numSamples)
require.NoError(t, err)
defer func() { require.NoError(t, q.Close()) }()
res := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
expSamples := make([]chunks.Sample, 0, len(c.remaint))
for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts], nil, nil})
}
expss := newMockSeriesSet([]storage.Series{
storage.NewListSeries(labels.FromStrings("a", "b"), expSamples),
})
if len(expSamples) == 0 {
require.False(t, res.Next())
return
}
for {
eok, rok := expss.Next(), res.Next()
require.Equal(t, eok, rok)
if !eok {
require.Empty(t, res.Warnings())
break
}
sexp := expss.At()
sres := res.At()
require.Equal(t, sexp.Labels(), sres.Labels())
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil)
require.Equal(t, errExp, errRes)
require.Equal(t, smplExp, smplRes)
}
})
}
}
@ -2250,49 +2252,51 @@ func TestDB_LabelNames(t *testing.T) {
require.NoError(t, err)
}
for _, tst := range tests {
ctx := context.Background()
db := openTestDB(t, nil, nil)
defer func() {
require.NoError(t, db.Close())
}()
t.Run("", func(t *testing.T) {
ctx := context.Background()
db := openTestDB(t, nil, nil)
defer func() {
require.NoError(t, db.Close())
}()
appendSamples(db, 0, 4, tst.sampleLabels1)
appendSamples(db, 0, 4, tst.sampleLabels1)
// Testing head.
headIndexr, err := db.head.Index()
require.NoError(t, err)
labelNames, err := headIndexr.LabelNames(ctx)
require.NoError(t, err)
require.Equal(t, tst.exp1, labelNames)
require.NoError(t, headIndexr.Close())
// Testing disk.
err = db.Compact(ctx)
require.NoError(t, err)
// All blocks have same label names, hence check them individually.
// No need to aggregate and check.
for _, b := range db.Blocks() {
blockIndexr, err := b.Index()
// Testing head.
headIndexr, err := db.head.Index()
require.NoError(t, err)
labelNames, err = blockIndexr.LabelNames(ctx)
labelNames, err := headIndexr.LabelNames(ctx)
require.NoError(t, err)
require.Equal(t, tst.exp1, labelNames)
require.NoError(t, blockIndexr.Close())
}
require.NoError(t, headIndexr.Close())
// Adding more samples to head with new label names
// so that we can test (head+disk).LabelNames(ctx) (the union).
appendSamples(db, 5, 9, tst.sampleLabels2)
// Testing disk.
err = db.Compact(ctx)
require.NoError(t, err)
// All blocks have same label names, hence check them individually.
// No need to aggregate and check.
for _, b := range db.Blocks() {
blockIndexr, err := b.Index()
require.NoError(t, err)
labelNames, err = blockIndexr.LabelNames(ctx)
require.NoError(t, err)
require.Equal(t, tst.exp1, labelNames)
require.NoError(t, blockIndexr.Close())
}
// Testing DB (union).
q, err := db.Querier(math.MinInt64, math.MaxInt64)
require.NoError(t, err)
var ws annotations.Annotations
labelNames, ws, err = q.LabelNames(ctx, nil)
require.NoError(t, err)
require.Empty(t, ws)
require.NoError(t, q.Close())
require.Equal(t, tst.exp2, labelNames)
// Adding more samples to head with new label names
// so that we can test (head+disk).LabelNames(ctx) (the union).
appendSamples(db, 5, 9, tst.sampleLabels2)
// Testing DB (union).
q, err := db.Querier(math.MinInt64, math.MaxInt64)
require.NoError(t, err)
var ws annotations.Annotations
labelNames, ws, err = q.LabelNames(ctx, nil)
require.NoError(t, err)
require.Empty(t, ws)
require.NoError(t, q.Close())
require.Equal(t, tst.exp2, labelNames)
})
}
}

View file

@ -1594,7 +1594,6 @@ func TestDelete_e2e(t *testing.T) {
for i := 0; i < numRanges; i++ {
q, err := NewBlockQuerier(hb, 0, 100000)
require.NoError(t, err)
defer q.Close()
ss := q.Select(context.Background(), true, nil, del.ms...)
// Build the mockSeriesSet.
matchedSeries := make([]storage.Series, 0, len(matched))
@ -1635,6 +1634,7 @@ func TestDelete_e2e(t *testing.T) {
}
require.NoError(t, ss.Err())
require.Empty(t, ss.Warnings())
require.NoError(t, q.Close())
}
}
}

View file

@ -694,9 +694,9 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
go func() {
defer close(decodedCh)
var err error
dec := record.NewDecoder(syms)
for r.Next() {
var err error
rec := r.Record()
switch dec.Type(rec) {
case record.Samples:

View file

@ -2633,42 +2633,42 @@ func BenchmarkSetMatcher(b *testing.B) {
}
for _, c := range cases {
dir := b.TempDir()
var (
blocks []*Block
prefilledLabels []map[string]string
generatedSeries []storage.Series
)
for i := int64(0); i < int64(c.numBlocks); i++ {
mint := i * int64(c.numSamplesPerSeriesPerBlock)
maxt := mint + int64(c.numSamplesPerSeriesPerBlock) - 1
if len(prefilledLabels) == 0 {
generatedSeries = genSeries(c.numSeries, 10, mint, maxt)
for _, s := range generatedSeries {
prefilledLabels = append(prefilledLabels, s.Labels().Map())
}
} else {
generatedSeries = populateSeries(prefilledLabels, mint, maxt)
}
block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil, nil)
require.NoError(b, err)
blocks = append(blocks, block)
defer block.Close()
}
qblocks := make([]storage.Querier, 0, len(blocks))
for _, blk := range blocks {
q, err := NewBlockQuerier(blk, math.MinInt64, math.MaxInt64)
require.NoError(b, err)
qblocks = append(qblocks, q)
}
sq := storage.NewMergeQuerier(qblocks, nil, storage.ChainedSeriesMerge)
defer sq.Close()
benchMsg := fmt.Sprintf("nSeries=%d,nBlocks=%d,cardinality=%d,pattern=\"%s\"", c.numSeries, c.numBlocks, c.cardinality, c.pattern)
b.Run(benchMsg, func(b *testing.B) {
dir := b.TempDir()
var (
blocks []*Block
prefilledLabels []map[string]string
generatedSeries []storage.Series
)
for i := int64(0); i < int64(c.numBlocks); i++ {
mint := i * int64(c.numSamplesPerSeriesPerBlock)
maxt := mint + int64(c.numSamplesPerSeriesPerBlock) - 1
if len(prefilledLabels) == 0 {
generatedSeries = genSeries(c.numSeries, 10, mint, maxt)
for _, s := range generatedSeries {
prefilledLabels = append(prefilledLabels, s.Labels().Map())
}
} else {
generatedSeries = populateSeries(prefilledLabels, mint, maxt)
}
block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil, nil)
require.NoError(b, err)
blocks = append(blocks, block)
defer block.Close()
}
qblocks := make([]storage.Querier, 0, len(blocks))
for _, blk := range blocks {
q, err := NewBlockQuerier(blk, math.MinInt64, math.MaxInt64)
require.NoError(b, err)
qblocks = append(qblocks, q)
}
sq := storage.NewMergeQuerier(qblocks, nil, storage.ChainedSeriesMerge)
defer sq.Close()
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {