diff --git a/CHANGELOG.md b/CHANGELOG.md index 12156d9df..d0795087a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,8 @@ ## master / unreleased - [FEATURE] Provide option to compress WAL records using Snappy. [#609](https://github.com/prometheus/tsdb/pull/609) - [BUGFIX] Re-calculate block size when calling `block.Delete`. -- [CHANGE] The meta file `BlockStats` no longer holds size information. This is now dynamically calculated and kept in memory. It also includes the meta file size which was not included before. + - [BUGFIX] Re-encode all head chunks at compaction that are open (being appended to) or outside the Maxt block range. This avoids writing out corrupt data. It happens when snapshotting with the head included. + - [CHANGE] The meta file `BlockStats` no longer holds size information. This is now dynamically calculated and kept in memory. It also includes the meta file size which was not included before. ## 0.8.0 - [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic. diff --git a/block_test.go b/block_test.go index 6718051a7..c56d08c44 100644 --- a/block_test.go +++ b/block_test.go @@ -16,6 +16,7 @@ package tsdb import ( "context" "encoding/binary" + "errors" "io/ioutil" "math/rand" @@ -56,7 +57,7 @@ func TestSetCompactionFailed(t *testing.T) { testutil.Ok(t, os.RemoveAll(tmpdir)) }() - blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 0)) + blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1)) b, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) testutil.Equals(t, false, b.meta.Compaction.Failed) @@ -133,7 +134,7 @@ func TestCorruptedChunk(t *testing.T) { testutil.Ok(t, os.RemoveAll(tmpdir)) }() - blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 0)) + blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1)) files, err := sequenceFiles(chunkDir(blockDir)) testutil.Ok(t, err) testutil.Assert(t, len(files) > 0, "No chunk created.") @@ -213,7 +214,9 @@ func createBlock(tb testing.TB, dir string, series []Series) string { testutil.Ok(tb, os.MkdirAll(dir, 0777)) - ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime(), nil) + // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). + // Because of this block intervals are always +1 than the total samples it includes. + ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil) testutil.Ok(tb, err) return filepath.Join(dir, ulid.String()) } @@ -265,7 +268,7 @@ func genSeries(totalSeries, labelCount int, mint, maxt int64) []Series { lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j) } samples := make([]tsdbutil.Sample, 0, maxt-mint+1) - for t := mint; t <= maxt; t++ { + for t := mint; t < maxt; t++ { samples = append(samples, sample{t: t, v: rand.Float64()}) } series[i] = newSeries(lbls, samples) diff --git a/chunks/chunks.go b/chunks/chunks.go index 70cb119c5..9ce8c57da 100644 --- a/chunks/chunks.go +++ b/chunks/chunks.go @@ -51,7 +51,9 @@ type Meta struct { Ref uint64 Chunk chunkenc.Chunk - MinTime, MaxTime int64 // time range the data covers + // Time range the data covers. + // When MaxTime == math.MaxInt64 the chunk is still open and being appended to. + MinTime, MaxTime int64 } // writeHash writes the chunk encoding and raw data into the provided hash. @@ -218,7 +220,7 @@ func MergeOverlappingChunks(chks []Meta) ([]Meta, error) { // So never overlaps with newChks[last-1] or anything before that. if c.MinTime > newChks[last].MaxTime { newChks = append(newChks, c) - last += 1 + last++ continue } nc := &newChks[last] diff --git a/compact.go b/compact.go index 10368250d..e19b7ed76 100644 --- a/compact.go +++ b/compact.go @@ -757,6 +757,21 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } for i, chk := range chks { + // Re-encode head chunks that are still open (being appended to) or + // outside the compacted MaxTime range. + // The chunk.Bytes() method is not safe for open chunks hence the re-encoding. + // This happens when snapshotting the head block. + // + // Block time range is half-open: [meta.MinTime, meta.MaxTime) and + // chunks are closed hence the chk.MaxTime >= meta.MaxTime check. + // + // TODO think how to avoid the typecasting to verify when it is head block. + if _, isHeadChunk := chk.Chunk.(*safeChunk); isHeadChunk && chk.MaxTime >= meta.MaxTime { + dranges = append(dranges, Interval{Mint: meta.MaxTime, Maxt: math.MaxInt64}) + + } else + // Sanity check for disk blocks. + // chk.MaxTime == meta.MaxTime shouldn't happen as well, but will brake many users so not checking for that. if chk.MinTime < meta.MinTime || chk.MaxTime > meta.MaxTime { return errors.Errorf("found chunk with minTime: %d maxTime: %d outside of compacted minTime: %d maxTime: %d", chk.MinTime, chk.MaxTime, meta.MinTime, meta.MaxTime) @@ -774,12 +789,21 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges} + + var ( + t int64 + v float64 + ) for it.Next() { - ts, v := it.At() - app.Append(ts, v) + t, v = it.At() + app.Append(t, v) + } + if err := it.Err(); err != nil { + return errors.Wrap(err, "iterate chunk while re-encoding") } chks[i].Chunk = newChunk + chks[i].MaxTime = t } } diff --git a/db.go b/db.go index 257a258f1..e07f7d3e7 100644 --- a/db.go +++ b/db.go @@ -943,8 +943,20 @@ func (db *DB) Snapshot(dir string, withHead bool) error { if !withHead { return nil } - _, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime(), nil) - return errors.Wrap(err, "snapshot head block") + + mint := db.head.MinTime() + maxt := db.head.MaxTime() + head := &rangeHead{ + head: db.head, + mint: mint, + maxt: maxt, + } + // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). + // Because of this block intervals are always +1 than the total samples it includes. + if _, err := db.compactor.Write(dir, head, mint, maxt+1, nil); err != nil { + return errors.Wrap(err, "snapshot head block") + } + return nil } // Querier returns a new querier over the data partition for the given time range. diff --git a/db_test.go b/db_test.go index 27ab82cdf..715450ee8 100644 --- a/db_test.go +++ b/db_test.go @@ -489,6 +489,62 @@ func TestDB_Snapshot(t *testing.T) { testutil.Equals(t, 1000.0, sum) } +// TestDB_Snapshot_ChunksOutsideOfCompactedRange ensures that a snapshot removes chunks samples +// that are outside the set block time range. +// See https://github.com/prometheus/prometheus/issues/5105 +func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) { + db, delete := openTestDB(t, nil) + defer delete() + + app := db.Appender() + mint := int64(1414141414000) + for i := 0; i < 1000; i++ { + _, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + testutil.Ok(t, app.Rollback()) + + snap, err := ioutil.TempDir("", "snap") + testutil.Ok(t, err) + + // Hackingly introduce "race", by having lower max time then maxTime in last chunk. + db.head.maxTime = db.head.maxTime - 10 + + defer func() { + testutil.Ok(t, os.RemoveAll(snap)) + }() + testutil.Ok(t, db.Snapshot(snap, true)) + testutil.Ok(t, db.Close()) + + // Reopen DB from snapshot. + db, err = Open(snap, nil, nil, nil) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, db.Close()) }() + + querier, err := db.Querier(mint, mint+1000) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, querier.Close()) }() + + // Sum values. + seriesSet, err := querier.Select(labels.NewEqualMatcher("foo", "bar")) + testutil.Ok(t, err) + + sum := 0.0 + for seriesSet.Next() { + series := seriesSet.At().Iterator() + for series.Next() { + _, v := series.At() + sum += v + } + testutil.Ok(t, series.Err()) + } + testutil.Ok(t, seriesSet.Err()) + + // Since we snapshotted with MaxTime - 10, so expect 10 less samples. + testutil.Equals(t, 1000.0-10, sum) +} + func TestDB_SnapshotWithDelete(t *testing.T) { numSamples := int64(10) @@ -930,7 +986,7 @@ func TestTombstoneCleanFail(t *testing.T) { // totalBlocks should be >=2 so we have enough blocks to trigger compaction failure. totalBlocks := 2 for i := 0; i < totalBlocks; i++ { - blockDir := createBlock(t, db.Dir(), genSeries(1, 1, 0, 0)) + blockDir := createBlock(t, db.Dir(), genSeries(1, 1, 0, 1)) block, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) // Add some some fake tombstones to trigger the compaction. @@ -974,7 +1030,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6 return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") } - block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 0)), nil) + block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil) testutil.Ok(c.t, err) testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere. c.blocks = append(c.blocks, block) diff --git a/head.go b/head.go index 44c71c74c..5e2eae858 100644 --- a/head.go +++ b/head.go @@ -1298,9 +1298,15 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks if !c.OverlapsClosedInterval(h.mint, h.maxt) { continue } + // Set the head chunks as open (being appended to). + maxTime := c.maxTime + if s.headChunk == c { + maxTime = math.MaxInt64 + } + *chks = append(*chks, chunks.Meta{ MinTime: c.minTime, - MaxTime: c.maxTime, + MaxTime: maxTime, Ref: packChunkID(s.ref, uint64(s.chunkID(i))), }) }