diff --git a/tsdb/block_test.go b/tsdb/block_test.go
index d0f0a44413..584037c577 100644
--- a/tsdb/block_test.go
+++ b/tsdb/block_test.go
@@ -18,6 +18,7 @@
 import (
 	"encoding/binary"
 	"errors"
+	"hash/crc32"
 	"io/ioutil"
 	"math/rand"
 	"os"
@@ -88,14 +89,16 @@ func TestCreateBlock(t *testing.T) {
 func TestCorruptedChunk(t *testing.T) {
 	for name, test := range map[string]struct {
 		corrFunc func(f *os.File) // Func that applies the corruption.
-		expErr   error
+		openErr  error
+		queryErr error
 	}{
 		"invalid header size": {
 			func(f *os.File) {
 				err := f.Truncate(1)
 				testutil.Ok(t, err)
 			},
-			errors.New("invalid chunk header in segment 0: invalid size"),
+			errors.New("invalid segment header in segment 0: invalid size"),
+			nil,
 		},
 		"invalid magic number": {
 			func(f *os.File) {
@@ -111,6 +114,7 @@ func TestCorruptedChunk(t *testing.T) {
 				testutil.Equals(t, chunks.MagicChunksSize, n)
 			},
 			errors.New("invalid magic number 0"),
+			nil,
 		},
 		"invalid chunk format version": {
 			func(f *os.File) {
@@ -126,6 +130,45 @@ func TestCorruptedChunk(t *testing.T) {
 				testutil.Equals(t, chunks.ChunksFormatVersionSize, n)
 			},
 			errors.New("invalid chunk format version 0"),
+			nil,
+		},
+		"chunk not enough bytes to read the chunk length": {
+			func(f *os.File) {
+				// Truncate one byte after the segment header.
+				err := f.Truncate(chunks.SegmentHeaderSize + 1)
+				testutil.Ok(t, err)
+			},
+			nil,
+			errors.New("segment doesn't include enough bytes to read the chunk size data field - required:13, available:9"),
+		},
+		"chunk not enough bytes to read the data": {
+			func(f *os.File) {
+				fi, err := f.Stat()
+				testutil.Ok(t, err)
+
+				err = f.Truncate(fi.Size() - 1)
+				testutil.Ok(t, err)
+			},
+			nil,
+			errors.New("segment doesn't include enough bytes to read the chunk - required:26, available:25"),
+		},
+		"checksum mismatch": {
+			func(f *os.File) {
+				fi, err := f.Stat()
+				testutil.Ok(t, err)
+
+				// Get the chunk data end offset.
+				chkEndOffset := int(fi.Size()) - crc32.Size
+
+				// Seek to the last byte of chunk data and modify it.
+				_, err = f.Seek(int64(chkEndOffset-1), 0)
+				testutil.Ok(t, err)
+				n, err := f.Write([]byte("x"))
+				testutil.Ok(t, err)
+				testutil.Equals(t, n, 1)
+			},
+			nil,
+			errors.New("checksum mismatch expected:cfc0526c, actual:34815eae"),
 		},
 	} {
 		t.Run(name, func(t *testing.T) {
@@ -135,7 +178,8 @@ func TestCorruptedChunk(t *testing.T) {
 				testutil.Ok(t, os.RemoveAll(tmpdir))
 			}()

-			blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1))
+			series := newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{sample{1, 1}})
+			blockDir := createBlock(t, tmpdir, []Series{series})
 			files, err := sequenceFiles(chunkDir(blockDir))
 			testutil.Ok(t, err)
 			testutil.Assert(t, len(files) > 0, "No chunk created.")
@@ -147,8 +191,21 @@ func TestCorruptedChunk(t *testing.T) {
 			test.corrFunc(f)
 			testutil.Ok(t, f.Close())

-			_, err = OpenBlock(nil, blockDir, nil)
-			testutil.Equals(t, test.expErr.Error(), err.Error())
+			// Check open err.
+			b, err := OpenBlock(nil, blockDir, nil)
+			if test.openErr != nil {
+				testutil.Equals(t, test.openErr.Error(), err.Error())
+				return
+			}
+
+			querier, err := NewBlockQuerier(b, 0, 1)
+			testutil.Ok(t, err)
+			set, err := querier.Select(labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
+			testutil.Ok(t, err)
+
+			// Check query err.
+			testutil.Equals(t, false, set.Next())
+			testutil.Equals(t, test.queryErr.Error(), set.Err().Error())
 		})
 	}
 }
diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go
index 05ff7bc1b5..418541f588 100644
--- a/tsdb/chunks/chunks.go
+++ b/tsdb/chunks/chunks.go
@@ -15,6 +15,7 @@ package chunks

 import (
 	"bufio"
+	"bytes"
 	"encoding/binary"
 	"fmt"
 	"hash"
@@ -31,22 +32,32 @@ import (
 	"github.com/prometheus/prometheus/tsdb/fileutil"
 )

+// Segment header fields constants.
 const (
 	// MagicChunks is 4 bytes at the head of a series file.
 	MagicChunks = 0x85BD40DD
 	// MagicChunksSize is the size in bytes of MagicChunks.
-	MagicChunksSize = 4
+	MagicChunksSize          = 4
+	chunksFormatV1           = 1
+	ChunksFormatVersionSize  = 1
+	segmentHeaderPaddingSize = 3
+	// SegmentHeaderSize defines the total size of the header part.
+	SegmentHeaderSize = MagicChunksSize + ChunksFormatVersionSize + segmentHeaderPaddingSize
+)

-	chunksFormatV1          = 1
-	ChunksFormatVersionSize = 1
-
-	chunkHeaderSize = MagicChunksSize + ChunksFormatVersionSize
+// Chunk fields constants.
+const (
+	// MaxChunkLengthFieldSize defines the maximum size of the data length part.
+	MaxChunkLengthFieldSize = binary.MaxVarintLen32
+	// ChunkEncodingSize defines the size of the chunk encoding part.
+	ChunkEncodingSize = 1
 )

 // Meta holds information about a chunk of data.
 type Meta struct {
 	// Ref and Chunk hold either a reference that can be used to retrieve
 	// chunk data or the data itself.
+	// When it is a reference it is the segment offset at which the chunk bytes start.
 	// Generally, only one of them is set.
 	Ref   uint64
 	Chunk chunkenc.Chunk
@@ -104,11 +115,27 @@ type Writer struct {
 }

 const (
-	defaultChunkSegmentSize = 512 * 1024 * 1024
+	// DefaultChunkSegmentSize is the default chunks segment size.
+	DefaultChunkSegmentSize = 512 * 1024 * 1024
 )

-// NewWriter returns a new writer against the given directory.
+// NewWriterWithSegSize returns a new writer against the given directory
+// and allows setting a custom size for the segments.
+func NewWriterWithSegSize(dir string, segmentSize int64) (*Writer, error) {
+	return newWriter(dir, segmentSize)
+}
+
+// NewWriter returns a new writer against the given directory
+// using the default segment size.
 func NewWriter(dir string) (*Writer, error) {
+	return newWriter(dir, DefaultChunkSegmentSize)
+}
+
+func newWriter(dir string, segmentSize int64) (*Writer, error) {
+	if segmentSize <= 0 {
+		segmentSize = DefaultChunkSegmentSize
+	}
+
 	if err := os.MkdirAll(dir, 0777); err != nil {
 		return nil, err
 	}
@@ -116,13 +143,12 @@ func NewWriter(dir string) (*Writer, error) {
 	if err != nil {
 		return nil, err
 	}
-	cw := &Writer{
+	return &Writer{
 		dirFile:     dirFile,
 		n:           0,
 		crc32:       newCRC32(),
-		segmentSize: defaultChunkSegmentSize,
-	}
-	return cw, nil
+		segmentSize: segmentSize,
+	}, nil
 }

 func (w *Writer) tail() *os.File {
@@ -180,13 +206,15 @@ func (w *Writer) cut() error {
 	}

 	// Write header metadata for new file.
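+	// For illustration, the segment header written below is laid out as:
+	//
+	//   bytes 0-3: MagicChunks (0x85BD40DD, big endian)
+	//   byte  4:   chunk format version (chunksFormatV1)
+	//   bytes 5-7: zero padding
+	//
+	// which adds up to SegmentHeaderSize (8) bytes.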
-	metab := make([]byte, 8)
+	metab := make([]byte, SegmentHeaderSize)
 	binary.BigEndian.PutUint32(metab[:MagicChunksSize], MagicChunks)
 	metab[4] = chunksFormatV1

-	if _, err := f.Write(metab); err != nil {
+	n, err := f.Write(metab)
+	if err != nil {
 		return err
 	}
+	w.n = int64(n)

 	w.files = append(w.files, f)
 	if w.wbuf != nil {
@@ -194,7 +222,6 @@ func (w *Writer) cut() error {
 	} else {
 		w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
 	}
-	w.n = 8

 	return nil
 }
@@ -284,27 +311,87 @@ func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) {
 	return newChunk, nil
 }

+// WriteChunks writes as many chunks as possible to the current segment,
+// cuts a new segment when the current segment is full and
+// writes the rest of the chunks in the new segment.
 func (w *Writer) WriteChunks(chks ...Meta) error {
-	// Calculate maximum space we need and cut a new segment in case
-	// we don't fit into the current one.
-	maxLen := int64(binary.MaxVarintLen32) // The number of chunks.
-	for _, c := range chks {
-		maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding.
-		maxLen += int64(len(c.Chunk.Bytes()))
-		maxLen += 4 // The 4 bytes of crc32
-	}
-	newsz := w.n + maxLen
+	var (
+		batchSize  = int64(0)
+		batchStart = 0
+		batches    = make([][]Meta, 1)
+		batchID    = 0
+		firstBatch = true
+	)

-	if w.wbuf == nil || newsz > w.segmentSize && maxLen <= w.segmentSize {
+	for i, chk := range chks {
+		// Each chunk contains: data length + encoding + the data itself + crc32.
+		chkSize := int64(MaxChunkLengthFieldSize) // The data length is a variable length field so use the maximum possible value.
+		chkSize += ChunkEncodingSize              // The chunk encoding.
+		chkSize += int64(len(chk.Chunk.Bytes()))  // The data itself.
+		chkSize += crc32.Size                     // The 4 bytes of crc32.
+		batchSize += chkSize
+
+		// Cut a new batch when it is not the first chunk (to avoid empty segments) and
+		// the batch is too large to fit in the current segment.
+		cutNewBatch := (i != 0) && (batchSize+SegmentHeaderSize > w.segmentSize)
+
+		// When the segment already has some data then
+		// the first batch size calculation should account for that.
+		if firstBatch && w.n > SegmentHeaderSize {
+			cutNewBatch = batchSize+w.n > w.segmentSize
+			if cutNewBatch {
+				firstBatch = false
+			}
+		}
+
+		if cutNewBatch {
+			batchStart = i
+			batches = append(batches, []Meta{})
+			batchID++
+			batchSize = chkSize
+		}
+		batches[batchID] = chks[batchStart : i+1]
+	}
+
+	// Create a new segment when one doesn't already exist.
+	if w.n == 0 {
 		if err := w.cut(); err != nil {
 			return err
 		}
 	}

+	for i, chks := range batches {
+		if err := w.writeChunks(chks); err != nil {
+			return err
+		}
+		// Cut a new segment only when there are more chunks to write.
+		// Avoid creating a new empty segment at the end of the write.
+		if i < len(batches)-1 {
+			if err := w.cut(); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// writeChunks writes the chunks into the current segment irrespective
+// of the configured segment size limit. A segment should have been already
+// started before calling this.
+func (w *Writer) writeChunks(chks []Meta) error {
+	if len(chks) == 0 {
+		return nil
+	}
+
 	var seq = uint64(w.seq()) << 32
 	for i := range chks {
 		chk := &chks[i]

+		// The reference is set to the segment index and the offset where
+		// the data starts for this chunk.
+		//
+		// The upper 4 bytes are for the segment index and
+		// the lower 4 bytes are for the segment offset where to start reading this chunk.
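+		//
+		// For example, the chunk written at offset w.n of the current segment can
+		// later be located by unpacking the reference the same way Reader.Chunk does:
+		//
+		//   sgmIndex := int(ref >> 32)         // upper 4 bytes: segment index
+		//   chkStart := int((ref << 32) >> 32) // lower 4 bytes: offset inside the segment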
 		chk.Ref = seq | uint64(w.n)

 		n := binary.PutUvarint(w.buf[:], uint64(len(chk.Chunk.Bytes())))
@@ -328,7 +415,6 @@ func (w *Writer) WriteChunks(chks ...Meta) error {
 			return err
 		}
 	}
-
 	return nil
 }

@@ -368,19 +454,23 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
 // Reader implements a ChunkReader for a serialized byte stream
 // of series data.
 type Reader struct {
-	bs   []ByteSlice // The underlying bytes holding the encoded series data.
-	cs   []io.Closer // Closers for resources behind the byte slices.
-	size int64       // The total size of bytes in the reader.
-	pool chunkenc.Pool
+	// The underlying bytes holding the encoded series data.
+	// Each slice holds the data for a different segment.
+	bs    []ByteSlice
+	cs    []io.Closer // Closers for resources behind the byte slices.
+	size  int64       // The total size of bytes in the reader.
+	pool  chunkenc.Pool
+	crc32 hash.Hash
+	buf   [binary.MaxVarintLen32]byte
 }

 func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
-	cr := Reader{pool: pool, bs: bs, cs: cs}
+	cr := Reader{pool: pool, bs: bs, cs: cs, crc32: newCRC32()}
 	var totalSize int64

 	for i, b := range cr.bs {
-		if b.Len() < chunkHeaderSize {
-			return nil, errors.Wrapf(errInvalidSize, "invalid chunk header in segment %d", i)
+		if b.Len() < SegmentHeaderSize {
+			return nil, errors.Wrapf(errInvalidSize, "invalid segment header in segment %d", i)
 		}
 		// Verify magic number.
 		if m := binary.BigEndian.Uint32(b.Range(0, MagicChunksSize)); m != MagicChunks {
@@ -445,28 +535,52 @@ func (s *Reader) Size() int64 {
 // Chunk returns a chunk from a given reference.
 func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
 	var (
-		sgmSeq    = int(ref >> 32)
-		sgmOffset = int((ref << 32) >> 32)
+		// Get the upper 4 bytes.
+		// These contain the segment index.
+		sgmIndex = int(ref >> 32)
+		// Get the lower 4 bytes.
+		// These contain the segment offset where the data for this chunk starts.
+		chkStart = int((ref << 32) >> 32)
 	)
-	if sgmSeq >= len(s.bs) {
-		return nil, errors.Errorf("reference sequence %d out of range", sgmSeq)
-	}
-	chkS := s.bs[sgmSeq]
-	if sgmOffset >= chkS.Len() {
-		return nil, errors.Errorf("offset %d beyond data size %d", sgmOffset, chkS.Len())
+	if sgmIndex >= len(s.bs) {
+		return nil, errors.Errorf("segment index %d out of range", sgmIndex)
+	}
+
+	sgmBytes := s.bs[sgmIndex]
+
+	if chkStart+MaxChunkLengthFieldSize > sgmBytes.Len() {
+		return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, sgmBytes.Len())
 	}
 	// With the minimum chunk length this should never cause us reading
 	// over the end of the slice.
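+	// For illustration, each chunk inside a segment is framed as:
+	//
+	//   len <uvarint> | encoding <1 byte> | data <len bytes> | CRC32 <4 bytes>
+	//
+	// The reference points at the first byte of the length field and the
+	// checksum covers the encoding byte and the chunk data.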
-	chk := chkS.Range(sgmOffset, sgmOffset+binary.MaxVarintLen32)
-
-	chkLen, n := binary.Uvarint(chk)
+	c := sgmBytes.Range(chkStart, chkStart+MaxChunkLengthFieldSize)
+	chkDataLen, n := binary.Uvarint(c)
 	if n <= 0 {
 		return nil, errors.Errorf("reading chunk length failed with %d", n)
 	}
-	chk = chkS.Range(sgmOffset+n, sgmOffset+n+1+int(chkLen))
-	return s.pool.Get(chunkenc.Encoding(chk[0]), chk[1:1+chkLen])
+	chkEncStart := chkStart + n
+	chkEnd := chkEncStart + ChunkEncodingSize + int(chkDataLen) + crc32.Size
+	chkDataStart := chkEncStart + ChunkEncodingSize
+	chkDataEnd := chkEnd - crc32.Size
+
+	if chkEnd > sgmBytes.Len() {
+		return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk - required:%v, available:%v", chkEnd, sgmBytes.Len())
+	}
+
+	sum := sgmBytes.Range(chkEnd-crc32.Size, chkEnd)
+	s.crc32.Reset()
+	if _, err := s.crc32.Write(sgmBytes.Range(chkEncStart, chkDataEnd)); err != nil {
+		return nil, err
+	}
+	if act := s.crc32.Sum(s.buf[:0]); !bytes.Equal(act, sum) {
+		return nil, errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act)
+	}
+
+	chkData := sgmBytes.Range(chkDataStart, chkDataEnd)
+	chkEnc := sgmBytes.Range(chkEncStart, chkEncStart+ChunkEncodingSize)[0]
+	return s.pool.Get(chunkenc.Encoding(chkEnc), chkData)
 }

 func nextSequenceFile(dir string) (string, int, error) {
diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index 6135d3a850..39df39cdac 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -14,7 +14,9 @@
 package tsdb

 import (
+	"encoding/binary"
 	"fmt"
+	"hash/crc32"
 	"io/ioutil"
 	"math"
 	"math/rand"
@@ -22,6 +24,7 @@ import (
 	"path"
 	"path/filepath"
 	"sort"
+	"strconv"
 	"testing"
 	"time"
@@ -2480,3 +2483,187 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
 	testutil.Ok(t, seriesSet.Err())
 	testutil.Equals(t, 1000.0, sum)
 }
+
+// TestChunkWriter ensures that chunk segments are cut at the set segment size and
+// that the resulting segments include the expected chunk data.
+func TestChunkWriter(t *testing.T) {
+	chk1 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}})
+	chk2 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 2}})
+	chk3 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 3}})
+	chk4 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 4}})
+	chk5 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 5}})
+	chunkSize := len(chk1.Chunk.Bytes()) + chunks.MaxChunkLengthFieldSize + chunks.ChunkEncodingSize + crc32.Size
+
+	tests := []struct {
+		chks [][]chunks.Meta
+		segmentSize,
+		expSegmentsCount int
+		expSegmentSizes []int
+	}{
+		// 0: Last chunk ends at the segment boundary so
+		// all chunks should fit in a single segment.
+		{
+			chks: [][]chunks.Meta{
+				[]chunks.Meta{
+					chk1,
+					chk2,
+					chk3,
+				},
+			},
+			segmentSize:      3 * chunkSize,
+			expSegmentSizes:  []int{3 * chunkSize},
+			expSegmentsCount: 1,
+		},
+		// 1: Two chunks can fit in a single segment so the last one should result in a new segment.
+		{
+			chks: [][]chunks.Meta{
+				[]chunks.Meta{
+					chk1,
+					chk2,
+					chk3,
+					chk4,
+					chk5,
+				},
+			},
+			segmentSize:      2 * chunkSize,
+			expSegmentSizes:  []int{2 * chunkSize, 2 * chunkSize, chunkSize},
+			expSegmentsCount: 3,
+		},
+		// 2: When the segment size is smaller than the size of 2 chunks
+		// each chunk should end up in its own segment.
+		{
+			chks: [][]chunks.Meta{
+				[]chunks.Meta{
+					chk1,
+					chk2,
+					chk3,
+				},
+			},
+			segmentSize:      2*chunkSize - 1,
+			expSegmentSizes:  []int{chunkSize, chunkSize, chunkSize},
+			expSegmentsCount: 3,
+		},
+		// 3: When the segment is smaller than a single chunk
+		// the chunk should still be written, ignoring the max segment size.
+		{
+			chks: [][]chunks.Meta{
+				[]chunks.Meta{
+					chk1,
+				},
+			},
+			segmentSize:      chunkSize - 1,
+			expSegmentSizes:  []int{chunkSize},
+			expSegmentsCount: 1,
+		},
+		// 4: All chunks are bigger than the max segment size, but
+		// they should still be written even though this results in segments bigger than the set size.
+		// Each segment will hold a single chunk.
+		{
+			chks: [][]chunks.Meta{
+				[]chunks.Meta{
+					chk1,
+					chk2,
+					chk3,
+				},
+			},
+			segmentSize:      1,
+			expSegmentSizes:  []int{chunkSize, chunkSize, chunkSize},
+			expSegmentsCount: 3,
+		},
+		// 5: Adding multiple batches of chunks.
+		{
+			chks: [][]chunks.Meta{
+				[]chunks.Meta{
+					chk1,
+					chk2,
+					chk3,
+				},
+				[]chunks.Meta{
+					chk4,
+					chk5,
+				},
+			},
+			segmentSize:      3 * chunkSize,
+			expSegmentSizes:  []int{3 * chunkSize, 2 * chunkSize},
+			expSegmentsCount: 2,
+		},
+		// 6: Adding multiple batches of chunks.
+		{
+			chks: [][]chunks.Meta{
+				[]chunks.Meta{
+					chk1,
+				},
+				[]chunks.Meta{
+					chk2,
+					chk3,
+				},
+				[]chunks.Meta{
+					chk4,
+				},
+			},
+			segmentSize:      2 * chunkSize,
+			expSegmentSizes:  []int{2 * chunkSize, 2 * chunkSize},
+			expSegmentsCount: 2,
+		},
+	}
+
+	for i, test := range tests {
+		t.Run(strconv.Itoa(i), func(t *testing.T) {
+			tmpdir, err := ioutil.TempDir("", "test_chunk_writer")
+			testutil.Ok(t, err)
+			defer func() {
+				testutil.Ok(t, os.RemoveAll(tmpdir))
+			}()
+
+			chunkw, err := chunks.NewWriterWithSegSize(tmpdir, chunks.SegmentHeaderSize+int64(test.segmentSize))
+			testutil.Ok(t, err)
+
+			for _, chks := range test.chks {
+				testutil.Ok(t, chunkw.WriteChunks(chks...))
+			}
+
+			testutil.Ok(t, chunkw.Close())
+
+			files, err := ioutil.ReadDir(tmpdir)
+			testutil.Ok(t, err)
+			testutil.Equals(t, test.expSegmentsCount, len(files), "expected segments count mismatch")
+
+			// Verify that all data is written to the segments.
+			sizeExp := 0
+			sizeAct := 0
+
+			for _, chks := range test.chks {
+				for _, chk := range chks {
+					l := make([]byte, binary.MaxVarintLen32)
+					sizeExp += binary.PutUvarint(l, uint64(len(chk.Chunk.Bytes()))) // The length field.
+					sizeExp += chunks.ChunkEncodingSize
+					sizeExp += len(chk.Chunk.Bytes()) // The data itself.
+					sizeExp += crc32.Size             // The 4 bytes of crc32.
+				}
+			}
+			sizeExp += test.expSegmentsCount * chunks.SegmentHeaderSize // The segment header bytes.
+
+			for i, f := range files {
+				size := int(f.Size())
+				// Verify that the segment is the same or smaller than the expected size.
+				testutil.Assert(t, chunks.SegmentHeaderSize+test.expSegmentSizes[i] >= size, "Segment:%v should NOT be bigger than:%v actual:%v", i, chunks.SegmentHeaderSize+test.expSegmentSizes[i], size)
+
+				sizeAct += size
+			}
+			testutil.Equals(t, sizeExp, sizeAct)
+
+			// Check the content of the chunks.
+			r, err := chunks.NewDirReader(tmpdir, nil)
+			testutil.Ok(t, err)
+
+			for _, chks := range test.chks {
+				for _, chkExp := range chks {
+					chkAct, err := r.Chunk(chkExp.Ref)
+					testutil.Ok(t, err)
+					testutil.Equals(t, chkExp.Chunk.Bytes(), chkAct.Bytes())
+				}
+			}
+		})
+	}
+}