From fd284d4084cd10da76e65427fb2c8e73c86ad253 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Mon, 9 Jan 2023 17:15:35 +0100 Subject: [PATCH] Fix bug in populateBlock when it could return with some asyncBlockWriter still running. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- tsdb/async_block_writer.go | 1 + tsdb/compact.go | 12 ++++- tsdb/compact_test.go | 93 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 105 insertions(+), 1 deletion(-) diff --git a/tsdb/async_block_writer.go b/tsdb/async_block_writer.go index 7747a52cb..57575cd3f 100644 --- a/tsdb/async_block_writer.go +++ b/tsdb/async_block_writer.go @@ -16,6 +16,7 @@ import ( var errAsyncBlockWriterNotRunning = errors.New("asyncBlockWriter doesn't run anymore") // asyncBlockWriter runs a background goroutine that writes series and chunks to the block asynchronously. +// All calls on asyncBlockWriter must be done from single goroutine, it is not safe for concurrent usage from multiple goroutines. type asyncBlockWriter struct { chunkPool chunkenc.Pool // Where to return chunks after writing. diff --git a/tsdb/compact.go b/tsdb/compact.go index dd59ff435..fff2e7523 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -1053,8 +1053,18 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, blockWriters := make([]*asyncBlockWriter, len(outBlocks)) for ix := range outBlocks { blockWriters[ix] = newAsyncBlockWriter(c.chunkPool, outBlocks[ix].chunkw, outBlocks[ix].indexw, sema) - defer blockWriters[ix].closeAsync() // Make sure to close writer to stop goroutine. } + defer func() { + // Stop all async writers. + for ix := range outBlocks { + blockWriters[ix].closeAsync() + } + + // And wait until they have finished, to make sure that they no longer update chunk or index writers. + for ix := range outBlocks { + _, _ = blockWriters[ix].waitFinished() + } + }() var chksIter chunks.Iterator diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 67c554008..bd582a7ca 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -31,6 +31,7 @@ import ( "github.com/pkg/errors" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" + "golang.org/x/sync/semaphore" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -2276,3 +2277,95 @@ func TestLeveledCompactor_plan_overlapping_disabled(t *testing.T) { } } } + +func TestAsyncBlockWriterSuccess(t *testing.T) { + cw, err := chunks.NewWriter(t.TempDir()) + require.NoError(t, err) + + const series = 100 + // prepare index, add all symbols + iw, err := index.NewWriter(context.Background(), filepath.Join(t.TempDir(), indexFilename)) + + require.NoError(t, iw.AddSymbol("__name__")) + for ix := 0; ix < series; ix++ { + s := fmt.Sprintf("s_%3d", ix) + require.NoError(t, iw.AddSymbol(s)) + } + + // async block writer expects index writer ready to receive series. + abw := newAsyncBlockWriter(chunkenc.NewPool(), cw, iw, semaphore.NewWeighted(int64(1))) + + for ix := 0; ix < series; ix++ { + s := fmt.Sprintf("s_%3d", ix) + require.NoError(t, abw.addSeries(labels.FromStrings("__name__", s), []chunks.Meta{{Chunk: randomChunk(t), MinTime: 0, MaxTime: math.MaxInt64}})) + } + + // signal that no more series are coming + abw.closeAsync() + + // We can do this repeatedly. + abw.closeAsync() + abw.closeAsync() + + // wait for result + stats, err := abw.waitFinished() + require.NoError(t, err) + require.Equal(t, uint64(series), stats.NumSeries) + require.Equal(t, uint64(series), stats.NumChunks) + + // We get the same result on subsequent calls to waitFinished. + for i := 0; i < 5; i++ { + newstats, err := abw.waitFinished() + require.NoError(t, err) + require.Equal(t, stats, newstats) + + // We can call close async again, as long as it's on the same goroutine. + abw.closeAsync() + } +} + +func TestAsyncBlockWriterFailure(t *testing.T) { + cw, err := chunks.NewWriter(t.TempDir()) + require.NoError(t, err) + + // We don't write symbols to this index writer, so adding series next will fail. + iw, err := index.NewWriter(context.Background(), filepath.Join(t.TempDir(), indexFilename)) + + // async block writer expects index writer ready to receive series. + abw := newAsyncBlockWriter(chunkenc.NewPool(), cw, iw, semaphore.NewWeighted(int64(1))) + + // Adding single series doesn't fail, as it just puts it onto the queue. + require.NoError(t, abw.addSeries(labels.FromStrings("__name__", "test"), []chunks.Meta{{Chunk: randomChunk(t), MinTime: 0, MaxTime: math.MaxInt64}})) + + // Signal that no more series are coming. + abw.closeAsync() + + // We can do this repeatedly. + abw.closeAsync() + abw.closeAsync() + + // Wait for result, this time we get error due to missing symbols. + _, err = abw.waitFinished() + require.Error(t, err) + require.ErrorContains(t, err, "unknown symbol") + + // We get the same error on each repeated call to waitFinished. + for i := 0; i < 5; i++ { + _, nerr := abw.waitFinished() + require.Equal(t, err, nerr) + + // We can call close async again, as long as it's on the same goroutine. + abw.closeAsync() + } +} + +func randomChunk(t *testing.T) chunkenc.Chunk { + chunk := chunkenc.NewXORChunk() + l := rand.Int() % 120 + app, err := chunk.Appender() + require.NoError(t, err) + for i := 0; i < l; i++ { + app.Append(rand.Int63(), rand.Float64()) + } + return chunk +}