Fixed race in Chunks method. (#6515)

Added regression test.

Fixes #6512

Before (not deterministic result due to concurrency):
```
=== RUN   TestChunkReader_ConcurrentRead
--- FAIL: TestChunkReader_ConcurrentRead (0.01s)
    db_test.go:2702: unexpected error: checksum mismatch expected:597ad276, actual:597ad276
    db_test.go:2702: unexpected error: checksum mismatch expected:dd0cdbc2, actual:dd0cdbc2
FAIL
```

After succuess on multiple runs.


Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
Bartlomiej Plotka 2019-12-24 22:55:22 +01:00 committed by GitHub
parent 9398b9ac1b
commit 3b8ef6386c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 59 additions and 22 deletions

View file

@ -460,12 +460,10 @@ type Reader struct {
cs []io.Closer // Closers for resources behind the byte slices. cs []io.Closer // Closers for resources behind the byte slices.
size int64 // The total size of bytes in the reader. size int64 // The total size of bytes in the reader.
pool chunkenc.Pool pool chunkenc.Pool
crc32 hash.Hash
buf [binary.MaxVarintLen32]byte
} }
func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) { func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
cr := Reader{pool: pool, bs: bs, cs: cs, crc32: newCRC32()} cr := Reader{pool: pool, bs: bs, cs: cs}
var totalSize int64 var totalSize int64
for i, b := range cr.bs { for i, b := range cr.bs {
@ -541,6 +539,7 @@ func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
// Get the lower 4 bytes. // Get the lower 4 bytes.
// These contain the segment offset where the data for this chunk starts. // These contain the segment offset where the data for this chunk starts.
chkStart = int((ref << 32) >> 32) chkStart = int((ref << 32) >> 32)
chkCRC32 = newCRC32()
) )
if sgmIndex >= len(s.bs) { if sgmIndex >= len(s.bs) {
@ -569,12 +568,12 @@ func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk - required:%v, available:%v", chkEnd, sgmBytes.Len()) return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk - required:%v, available:%v", chkEnd, sgmBytes.Len())
} }
sum := sgmBytes.Range(chkEnd-crc32.Size, chkEnd) sum := sgmBytes.Range(chkDataEnd, chkEnd)
s.crc32.Reset() if _, err := chkCRC32.Write(sgmBytes.Range(chkEncStart, chkDataEnd)); err != nil {
if _, err := s.crc32.Write(sgmBytes.Range(chkEncStart, chkDataEnd)); err != nil {
return nil, err return nil, err
} }
if act := s.crc32.Sum(s.buf[:0]); !bytes.Equal(act, sum) {
if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
return nil, errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act) return nil, errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act)
} }

View file

@ -25,6 +25,7 @@ import (
"path/filepath" "path/filepath"
"sort" "sort"
"strconv" "strconv"
"sync"
"testing" "testing"
"time" "time"
@ -2485,9 +2486,9 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
testutil.Equals(t, 1000.0, sum) testutil.Equals(t, 1000.0, sum)
} }
// TestChunkWriter ensures that chunk segment are cut at the set segment size and // TestChunkWriter_ReadAfterWrite ensures that chunk segment are cut at the set segment size and
// that the resulted segments includes the expected chunks data. // that the resulted segments includes the expected chunks data.
func TestChunkWriter(t *testing.T) { func TestChunkWriter_ReadAfterWrite(t *testing.T) {
chk1 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}}) chk1 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}})
chk2 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 2}}) chk2 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 2}})
chk3 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 3}}) chk3 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 3}})
@ -2611,22 +2612,19 @@ func TestChunkWriter(t *testing.T) {
for i, test := range tests { for i, test := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) { t.Run(strconv.Itoa(i), func(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test_chunk_witer") tempDir, err := ioutil.TempDir("", "test_chunk_writer")
testutil.Ok(t, err) testutil.Ok(t, err)
defer func() { defer func() { testutil.Ok(t, os.RemoveAll(tempDir)) }()
testutil.Ok(t, os.RemoveAll(tmpdir))
}()
chunkw, err := chunks.NewWriterWithSegSize(tmpdir, chunks.SegmentHeaderSize+int64(test.segmentSize)) chunkw, err := chunks.NewWriterWithSegSize(tempDir, chunks.SegmentHeaderSize+int64(test.segmentSize))
testutil.Ok(t, err) testutil.Ok(t, err)
for _, chks := range test.chks { for _, chks := range test.chks {
chunkw.WriteChunks(chks...) testutil.Ok(t, chunkw.WriteChunks(chks...))
} }
testutil.Ok(t, chunkw.Close()) testutil.Ok(t, chunkw.Close())
files, err := ioutil.ReadDir(tmpdir) files, err := ioutil.ReadDir(tempDir)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, test.expSegmentsCount, len(files), "expected segments count mismatch") testutil.Equals(t, test.expSegmentsCount, len(files), "expected segments count mismatch")
@ -2655,7 +2653,7 @@ func TestChunkWriter(t *testing.T) {
testutil.Equals(t, sizeExp, sizeAct) testutil.Equals(t, sizeExp, sizeAct)
// Check the content of the chunks. // Check the content of the chunks.
r, err := chunks.NewDirReader(tmpdir, nil) r, err := chunks.NewDirReader(tempDir, nil)
testutil.Ok(t, err) testutil.Ok(t, err)
for _, chks := range test.chks { for _, chks := range test.chks {
@ -2668,3 +2666,43 @@ func TestChunkWriter(t *testing.T) {
}) })
} }
} }
// TestChunkReader_ConcurrentReads checks that the chunk result can be read concurrently.
// Regression test for https://github.com/prometheus/prometheus/pull/6514.
func TestChunkReader_ConcurrentReads(t *testing.T) {
chks := []chunks.Meta{
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 2}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 3}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 4}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 5}}),
}
tempDir, err := ioutil.TempDir("", "test_chunk_writer")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tempDir)) }()
chunkw, err := chunks.NewWriter(tempDir)
testutil.Ok(t, err)
testutil.Ok(t, chunkw.WriteChunks(chks...))
testutil.Ok(t, chunkw.Close())
r, err := chunks.NewDirReader(tempDir, nil)
testutil.Ok(t, err)
var wg sync.WaitGroup
for _, chk := range chks {
for i := 0; i < 100; i++ {
wg.Add(1)
go func(chunk chunks.Meta) {
defer wg.Done()
chkAct, err := r.Chunk(chunk.Ref)
testutil.Ok(t, err)
testutil.Equals(t, chunk.Chunk.Bytes(), chkAct.Bytes())
}(chk)
}
wg.Wait()
}
}