From f4fbe47254856612559153051a87e76b083f6eaf Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 27 Nov 2023 15:13:01 +0000 Subject: [PATCH 1/3] tsdb tests: avoid capture-by-reference in goroutines Only one version of the variable is captured; this is a source of race conditions. Signed-off-by: Bryan Boreham --- tsdb/compact_test.go | 9 +++++---- tsdb/head_wal.go | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 310fe8f25a..6e3db15eb4 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1575,12 +1575,13 @@ func TestHeadCompactionWithHistograms(t *testing.T) { func TestSparseHistogramSpaceSavings(t *testing.T) { t.Skip() - cases := []struct { + type testcase struct { numSeriesPerSchema int numBuckets int numSpans int gapBetweenSpans int - }{ + } + cases := []testcase{ {1, 15, 1, 0}, {1, 50, 1, 0}, {1, 100, 1, 0}, @@ -1692,7 +1693,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) { }() wg.Add(1) - go func() { + go func(c testcase) { defer wg.Done() // Ingest histograms the old way. @@ -1740,7 +1741,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) { oldULIDs, err = compactor.Write(oldHead.opts.ChunkDirRoot, oldHead, mint, maxt, nil) require.NoError(t, err) require.Len(t, oldULIDs, 1) - }() + }(c) wg.Wait() diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index e9557c59f6..0afe84a875 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -694,9 +694,9 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch go func() { defer close(decodedCh) - var err error dec := record.NewDecoder(syms) for r.Next() { + var err error rec := r.Record() switch dec.Type(rec) { case record.Samples: From 2f615a200d8591db573f9993fd2690720193030f Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 27 Nov 2023 15:15:29 +0000 Subject: [PATCH 2/3] tsdb tests: restrict some 'defer' operations 'defer' only runs at the end of the function, so introduce some more functions / move the start, so that 'defer' can run at the end of the logical block. Signed-off-by: Bryan Boreham --- tsdb/db_test.go | 284 ++++++++++++++++++++++--------------------- tsdb/querier_test.go | 68 +++++------ 2 files changed, 178 insertions(+), 174 deletions(-) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index b858e6f524..7dc60a7304 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -426,64 +426,65 @@ func TestDeleteSimple(t *testing.T) { }, } -Outer: for _, c := range cases { - db := openTestDB(t, nil, nil) - defer func() { - require.NoError(t, db.Close()) - }() + t.Run("", func(t *testing.T) { + db := openTestDB(t, nil, nil) + defer func() { + require.NoError(t, db.Close()) + }() - ctx := context.Background() - app := db.Appender(ctx) + ctx := context.Background() + app := db.Appender(ctx) - smpls := make([]float64, numSamples) - for i := int64(0); i < numSamples; i++ { - smpls[i] = rand.Float64() - app.Append(0, labels.FromStrings("a", "b"), i, smpls[i]) - } - - require.NoError(t, app.Commit()) - - // TODO(gouthamve): Reset the tombstones somehow. - // Delete the ranges. - for _, r := range c.Intervals { - require.NoError(t, db.Delete(ctx, r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) - } - - // Compare the result. - q, err := db.Querier(0, numSamples) - require.NoError(t, err) - - res := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) - - expSamples := make([]chunks.Sample, 0, len(c.remaint)) - for _, ts := range c.remaint { - expSamples = append(expSamples, sample{ts, smpls[ts], nil, nil}) - } - - expss := newMockSeriesSet([]storage.Series{ - storage.NewListSeries(labels.FromStrings("a", "b"), expSamples), - }) - - for { - eok, rok := expss.Next(), res.Next() - require.Equal(t, eok, rok) - - if !eok { - require.Empty(t, res.Warnings()) - continue Outer + smpls := make([]float64, numSamples) + for i := int64(0); i < numSamples; i++ { + smpls[i] = rand.Float64() + app.Append(0, labels.FromStrings("a", "b"), i, smpls[i]) } - sexp := expss.At() - sres := res.At() - require.Equal(t, sexp.Labels(), sres.Labels()) + require.NoError(t, app.Commit()) - smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) - smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) + // TODO(gouthamve): Reset the tombstones somehow. + // Delete the ranges. + for _, r := range c.Intervals { + require.NoError(t, db.Delete(ctx, r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) + } - require.Equal(t, errExp, errRes) - require.Equal(t, smplExp, smplRes) - } + // Compare the result. + q, err := db.Querier(0, numSamples) + require.NoError(t, err) + + res := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + + expSamples := make([]chunks.Sample, 0, len(c.remaint)) + for _, ts := range c.remaint { + expSamples = append(expSamples, sample{ts, smpls[ts], nil, nil}) + } + + expss := newMockSeriesSet([]storage.Series{ + storage.NewListSeries(labels.FromStrings("a", "b"), expSamples), + }) + + for { + eok, rok := expss.Next(), res.Next() + require.Equal(t, eok, rok) + + if !eok { + require.Empty(t, res.Warnings()) + break + } + sexp := expss.At() + sres := res.At() + + require.Equal(t, sexp.Labels(), sres.Labels()) + + smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) + smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) + + require.Equal(t, errExp, errRes) + require.Equal(t, smplExp, smplRes) + } + }) } } @@ -759,64 +760,65 @@ func TestDB_SnapshotWithDelete(t *testing.T) { }, } -Outer: for _, c := range cases { - // TODO(gouthamve): Reset the tombstones somehow. - // Delete the ranges. - for _, r := range c.intervals { - require.NoError(t, db.Delete(ctx, r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) - } - - // create snapshot - snap := t.TempDir() - - require.NoError(t, db.Snapshot(snap, true)) - - // reopen DB from snapshot - newDB, err := Open(snap, nil, nil, nil, nil) - require.NoError(t, err) - defer func() { require.NoError(t, newDB.Close()) }() - - // Compare the result. - q, err := newDB.Querier(0, numSamples) - require.NoError(t, err) - defer func() { require.NoError(t, q.Close()) }() - - res := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) - - expSamples := make([]chunks.Sample, 0, len(c.remaint)) - for _, ts := range c.remaint { - expSamples = append(expSamples, sample{ts, smpls[ts], nil, nil}) - } - - expss := newMockSeriesSet([]storage.Series{ - storage.NewListSeries(labels.FromStrings("a", "b"), expSamples), - }) - - if len(expSamples) == 0 { - require.False(t, res.Next()) - continue - } - - for { - eok, rok := expss.Next(), res.Next() - require.Equal(t, eok, rok) - - if !eok { - require.Empty(t, res.Warnings()) - continue Outer + t.Run("", func(t *testing.T) { + // TODO(gouthamve): Reset the tombstones somehow. + // Delete the ranges. + for _, r := range c.intervals { + require.NoError(t, db.Delete(ctx, r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) } - sexp := expss.At() - sres := res.At() - require.Equal(t, sexp.Labels(), sres.Labels()) + // create snapshot + snap := t.TempDir() - smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) - smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) + require.NoError(t, db.Snapshot(snap, true)) - require.Equal(t, errExp, errRes) - require.Equal(t, smplExp, smplRes) - } + // reopen DB from snapshot + newDB, err := Open(snap, nil, nil, nil, nil) + require.NoError(t, err) + defer func() { require.NoError(t, newDB.Close()) }() + + // Compare the result. + q, err := newDB.Querier(0, numSamples) + require.NoError(t, err) + defer func() { require.NoError(t, q.Close()) }() + + res := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + + expSamples := make([]chunks.Sample, 0, len(c.remaint)) + for _, ts := range c.remaint { + expSamples = append(expSamples, sample{ts, smpls[ts], nil, nil}) + } + + expss := newMockSeriesSet([]storage.Series{ + storage.NewListSeries(labels.FromStrings("a", "b"), expSamples), + }) + + if len(expSamples) == 0 { + require.False(t, res.Next()) + return + } + + for { + eok, rok := expss.Next(), res.Next() + require.Equal(t, eok, rok) + + if !eok { + require.Empty(t, res.Warnings()) + break + } + sexp := expss.At() + sres := res.At() + + require.Equal(t, sexp.Labels(), sres.Labels()) + + smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) + smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) + + require.Equal(t, errExp, errRes) + require.Equal(t, smplExp, smplRes) + } + }) } } @@ -2250,49 +2252,51 @@ func TestDB_LabelNames(t *testing.T) { require.NoError(t, err) } for _, tst := range tests { - ctx := context.Background() - db := openTestDB(t, nil, nil) - defer func() { - require.NoError(t, db.Close()) - }() + t.Run("", func(t *testing.T) { + ctx := context.Background() + db := openTestDB(t, nil, nil) + defer func() { + require.NoError(t, db.Close()) + }() - appendSamples(db, 0, 4, tst.sampleLabels1) + appendSamples(db, 0, 4, tst.sampleLabels1) - // Testing head. - headIndexr, err := db.head.Index() - require.NoError(t, err) - labelNames, err := headIndexr.LabelNames(ctx) - require.NoError(t, err) - require.Equal(t, tst.exp1, labelNames) - require.NoError(t, headIndexr.Close()) - - // Testing disk. - err = db.Compact(ctx) - require.NoError(t, err) - // All blocks have same label names, hence check them individually. - // No need to aggregate and check. - for _, b := range db.Blocks() { - blockIndexr, err := b.Index() + // Testing head. + headIndexr, err := db.head.Index() require.NoError(t, err) - labelNames, err = blockIndexr.LabelNames(ctx) + labelNames, err := headIndexr.LabelNames(ctx) require.NoError(t, err) require.Equal(t, tst.exp1, labelNames) - require.NoError(t, blockIndexr.Close()) - } + require.NoError(t, headIndexr.Close()) - // Adding more samples to head with new label names - // so that we can test (head+disk).LabelNames(ctx) (the union). - appendSamples(db, 5, 9, tst.sampleLabels2) + // Testing disk. + err = db.Compact(ctx) + require.NoError(t, err) + // All blocks have same label names, hence check them individually. + // No need to aggregate and check. + for _, b := range db.Blocks() { + blockIndexr, err := b.Index() + require.NoError(t, err) + labelNames, err = blockIndexr.LabelNames(ctx) + require.NoError(t, err) + require.Equal(t, tst.exp1, labelNames) + require.NoError(t, blockIndexr.Close()) + } - // Testing DB (union). - q, err := db.Querier(math.MinInt64, math.MaxInt64) - require.NoError(t, err) - var ws annotations.Annotations - labelNames, ws, err = q.LabelNames(ctx, nil) - require.NoError(t, err) - require.Empty(t, ws) - require.NoError(t, q.Close()) - require.Equal(t, tst.exp2, labelNames) + // Adding more samples to head with new label names + // so that we can test (head+disk).LabelNames(ctx) (the union). + appendSamples(db, 5, 9, tst.sampleLabels2) + + // Testing DB (union). + q, err := db.Querier(math.MinInt64, math.MaxInt64) + require.NoError(t, err) + var ws annotations.Annotations + labelNames, ws, err = q.LabelNames(ctx, nil) + require.NoError(t, err) + require.Empty(t, ws) + require.NoError(t, q.Close()) + require.Equal(t, tst.exp2, labelNames) + }) } } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 2d83b04ba3..9a7743ec12 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -2633,42 +2633,42 @@ func BenchmarkSetMatcher(b *testing.B) { } for _, c := range cases { - dir := b.TempDir() - - var ( - blocks []*Block - prefilledLabels []map[string]string - generatedSeries []storage.Series - ) - for i := int64(0); i < int64(c.numBlocks); i++ { - mint := i * int64(c.numSamplesPerSeriesPerBlock) - maxt := mint + int64(c.numSamplesPerSeriesPerBlock) - 1 - if len(prefilledLabels) == 0 { - generatedSeries = genSeries(c.numSeries, 10, mint, maxt) - for _, s := range generatedSeries { - prefilledLabels = append(prefilledLabels, s.Labels().Map()) - } - } else { - generatedSeries = populateSeries(prefilledLabels, mint, maxt) - } - block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil, nil) - require.NoError(b, err) - blocks = append(blocks, block) - defer block.Close() - } - - qblocks := make([]storage.Querier, 0, len(blocks)) - for _, blk := range blocks { - q, err := NewBlockQuerier(blk, math.MinInt64, math.MaxInt64) - require.NoError(b, err) - qblocks = append(qblocks, q) - } - - sq := storage.NewMergeQuerier(qblocks, nil, storage.ChainedSeriesMerge) - defer sq.Close() - benchMsg := fmt.Sprintf("nSeries=%d,nBlocks=%d,cardinality=%d,pattern=\"%s\"", c.numSeries, c.numBlocks, c.cardinality, c.pattern) b.Run(benchMsg, func(b *testing.B) { + dir := b.TempDir() + + var ( + blocks []*Block + prefilledLabels []map[string]string + generatedSeries []storage.Series + ) + for i := int64(0); i < int64(c.numBlocks); i++ { + mint := i * int64(c.numSamplesPerSeriesPerBlock) + maxt := mint + int64(c.numSamplesPerSeriesPerBlock) - 1 + if len(prefilledLabels) == 0 { + generatedSeries = genSeries(c.numSeries, 10, mint, maxt) + for _, s := range generatedSeries { + prefilledLabels = append(prefilledLabels, s.Labels().Map()) + } + } else { + generatedSeries = populateSeries(prefilledLabels, mint, maxt) + } + block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil, nil) + require.NoError(b, err) + blocks = append(blocks, block) + defer block.Close() + } + + qblocks := make([]storage.Querier, 0, len(blocks)) + for _, blk := range blocks { + q, err := NewBlockQuerier(blk, math.MinInt64, math.MaxInt64) + require.NoError(b, err) + qblocks = append(qblocks, q) + } + + sq := storage.NewMergeQuerier(qblocks, nil, storage.ChainedSeriesMerge) + defer sq.Close() + b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { From 6ba25ba93fc9c6f0b1eebed7f30c476f90eca1d6 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 27 Nov 2023 15:17:08 +0000 Subject: [PATCH 3/3] tsdb tests: avoid 'defer' till end of function 'defer' only runs at the end of the function, so explicitly close the querier after we finish with it. Also check it didn't error. Signed-off-by: Bryan Boreham --- tsdb/head_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 748922ac6b..33b54a756f 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1594,7 +1594,6 @@ func TestDelete_e2e(t *testing.T) { for i := 0; i < numRanges; i++ { q, err := NewBlockQuerier(hb, 0, 100000) require.NoError(t, err) - defer q.Close() ss := q.Select(context.Background(), true, nil, del.ms...) // Build the mockSeriesSet. matchedSeries := make([]storage.Series, 0, len(matched)) @@ -1635,6 +1634,7 @@ func TestDelete_e2e(t *testing.T) { } require.NoError(t, ss.Err()) require.Empty(t, ss.Warnings()) + require.NoError(t, q.Close()) } } }