Fix bug in populateBlock where it could return while some asyncBlockWriter goroutines were still running.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>
This commit is contained in:
Peter Štibraný 2023-01-09 17:15:35 +01:00
parent f3d1f7756f
commit fd284d4084
3 changed files with 105 additions and 1 deletions

View file

@ -16,6 +16,7 @@ import (
var errAsyncBlockWriterNotRunning = errors.New("asyncBlockWriter doesn't run anymore")
// asyncBlockWriter runs a background goroutine that writes series and chunks to the block asynchronously.
// All calls on asyncBlockWriter must be made from a single goroutine; it is not safe for concurrent use from multiple goroutines.
type asyncBlockWriter struct {
chunkPool chunkenc.Pool // Where to return chunks after writing.

View file

@ -1053,8 +1053,18 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
blockWriters := make([]*asyncBlockWriter, len(outBlocks))
for ix := range outBlocks {
blockWriters[ix] = newAsyncBlockWriter(c.chunkPool, outBlocks[ix].chunkw, outBlocks[ix].indexw, sema)
defer blockWriters[ix].closeAsync() // Make sure to close writer to stop goroutine.
}
defer func() {
// Stop all async writers.
for ix := range outBlocks {
blockWriters[ix].closeAsync()
}
// And wait until they have finished, to make sure that they no longer update chunk or index writers.
for ix := range outBlocks {
_, _ = blockWriters[ix].waitFinished()
}
}()
var chksIter chunks.Iterator

View file

@ -31,6 +31,7 @@ import (
"github.com/pkg/errors"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"golang.org/x/sync/semaphore"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
@ -2276,3 +2277,95 @@ func TestLeveledCompactor_plan_overlapping_disabled(t *testing.T) {
}
}
}
// TestAsyncBlockWriterSuccess feeds a fixed number of series through an
// asyncBlockWriter whose index writer already knows every symbol, and verifies
// that the reported stats match what was written. It also checks that
// closeAsync and waitFinished can be called repeatedly with stable results.
func TestAsyncBlockWriterSuccess(t *testing.T) {
	cw, err := chunks.NewWriter(t.TempDir())
	require.NoError(t, err)

	const series = 100

	// Prepare the index and add all symbols up front.
	iw, err := index.NewWriter(context.Background(), filepath.Join(t.TempDir(), indexFilename))
	// Fail here with a clear message instead of dereferencing an unusable writer below.
	require.NoError(t, err)

	require.NoError(t, iw.AddSymbol("__name__"))
	for ix := 0; ix < series; ix++ {
		s := fmt.Sprintf("s_%3d", ix)
		require.NoError(t, iw.AddSymbol(s))
	}

	// Async block writer expects the index writer to be ready to receive series.
	abw := newAsyncBlockWriter(chunkenc.NewPool(), cw, iw, semaphore.NewWeighted(int64(1)))

	for ix := 0; ix < series; ix++ {
		s := fmt.Sprintf("s_%3d", ix)
		require.NoError(t, abw.addSeries(labels.FromStrings("__name__", s), []chunks.Meta{{Chunk: randomChunk(t), MinTime: 0, MaxTime: math.MaxInt64}}))
	}

	// Signal that no more series are coming.
	abw.closeAsync()

	// We can do this repeatedly.
	abw.closeAsync()
	abw.closeAsync()

	// Wait for the result.
	stats, err := abw.waitFinished()
	require.NoError(t, err)
	require.Equal(t, uint64(series), stats.NumSeries)
	require.Equal(t, uint64(series), stats.NumChunks)

	// We get the same result on subsequent calls to waitFinished.
	for i := 0; i < 5; i++ {
		newstats, err := abw.waitFinished()
		require.NoError(t, err)
		require.Equal(t, stats, newstats)

		// We can call closeAsync again, as long as it's on the same goroutine.
		abw.closeAsync()
	}
}
// TestAsyncBlockWriterFailure queues a series whose symbols were never added to
// the index writer and verifies that waitFinished reports the resulting
// "unknown symbol" error, and keeps reporting the same error on repeated calls.
func TestAsyncBlockWriterFailure(t *testing.T) {
	cw, err := chunks.NewWriter(t.TempDir())
	require.NoError(t, err)

	// We don't write symbols to this index writer, so adding series next will fail.
	iw, err := index.NewWriter(context.Background(), filepath.Join(t.TempDir(), indexFilename))
	// Creating the writer itself must succeed; only the later series write is expected to fail.
	require.NoError(t, err)

	// Async block writer expects the index writer to be ready to receive series.
	abw := newAsyncBlockWriter(chunkenc.NewPool(), cw, iw, semaphore.NewWeighted(int64(1)))

	// Adding a single series doesn't fail, as it just puts it onto the queue.
	require.NoError(t, abw.addSeries(labels.FromStrings("__name__", "test"), []chunks.Meta{{Chunk: randomChunk(t), MinTime: 0, MaxTime: math.MaxInt64}}))

	// Signal that no more series are coming.
	abw.closeAsync()

	// We can do this repeatedly.
	abw.closeAsync()
	abw.closeAsync()

	// Wait for the result; this time we get an error due to missing symbols.
	_, err = abw.waitFinished()
	require.Error(t, err)
	require.ErrorContains(t, err, "unknown symbol")

	// We get the same error on each repeated call to waitFinished.
	for i := 0; i < 5; i++ {
		_, nerr := abw.waitFinished()
		require.Equal(t, err, nerr)

		// We can call closeAsync again, as long as it's on the same goroutine.
		abw.closeAsync()
	}
}
// randomChunk returns a new XOR chunk filled with a random number of samples
// (between 0 and 119), each with a random timestamp and value. The test fails
// immediately if an appender cannot be obtained for the chunk.
func randomChunk(t *testing.T) chunkenc.Chunk {
	// Attribute any failure below to the calling test rather than to this helper.
	t.Helper()

	chunk := chunkenc.NewXORChunk()
	app, err := chunk.Appender()
	require.NoError(t, err)

	// The sample count may be zero, in which case the chunk is returned empty.
	for i, l := 0, rand.Intn(120); i < l; i++ {
		app.Append(rand.Int63(), rand.Float64())
	}
	return chunk
}