Re-encode chunks that are still being appended to when snapshoti… (#641)

* re encode all head chunks outside the time range.

Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com>
This commit is contained in:
Krasi Georgiev 2019-07-03 13:47:31 +03:00 committed by GitHub
parent dc657df591
commit 31f7990d1d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 118 additions and 14 deletions

View file

@ -1,6 +1,7 @@
## master / unreleased ## master / unreleased
- [FEATURE] Provide option to compress WAL records using Snappy. [#609](https://github.com/prometheus/tsdb/pull/609) - [FEATURE] Provide option to compress WAL records using Snappy. [#609](https://github.com/prometheus/tsdb/pull/609)
- [BUGFIX] Re-calculate block size when calling `block.Delete`. - [BUGFIX] Re-calculate block size when calling `block.Delete`.
- [BUGFIX] Re-encode all head chunks at compaction that are open (being appended to) or outside the Maxt block range. This avoids writing out corrupt data. It happens when snapshotting with the head included.
- [CHANGE] The meta file `BlockStats` no longer holds size information. This is now dynamically calculated and kept in memory. It also includes the meta file size which was not included before. - [CHANGE] The meta file `BlockStats` no longer holds size information. This is now dynamically calculated and kept in memory. It also includes the meta file size which was not included before.
## 0.8.0 ## 0.8.0

View file

@ -16,6 +16,7 @@ package tsdb
import ( import (
"context" "context"
"encoding/binary" "encoding/binary"
"errors" "errors"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
@ -56,7 +57,7 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Ok(t, os.RemoveAll(tmpdir)) testutil.Ok(t, os.RemoveAll(tmpdir))
}() }()
blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 0)) blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1))
b, err := OpenBlock(nil, blockDir, nil) b, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, false, b.meta.Compaction.Failed) testutil.Equals(t, false, b.meta.Compaction.Failed)
@ -133,7 +134,7 @@ func TestCorruptedChunk(t *testing.T) {
testutil.Ok(t, os.RemoveAll(tmpdir)) testutil.Ok(t, os.RemoveAll(tmpdir))
}() }()
blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 0)) blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1))
files, err := sequenceFiles(chunkDir(blockDir)) files, err := sequenceFiles(chunkDir(blockDir))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Assert(t, len(files) > 0, "No chunk created.") testutil.Assert(t, len(files) > 0, "No chunk created.")
@ -213,7 +214,9 @@ func createBlock(tb testing.TB, dir string, series []Series) string {
testutil.Ok(tb, os.MkdirAll(dir, 0777)) testutil.Ok(tb, os.MkdirAll(dir, 0777))
ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime(), nil) // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
// Because of this block intervals are always +1 than the total samples it includes.
ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil)
testutil.Ok(tb, err) testutil.Ok(tb, err)
return filepath.Join(dir, ulid.String()) return filepath.Join(dir, ulid.String())
} }
@ -265,7 +268,7 @@ func genSeries(totalSeries, labelCount int, mint, maxt int64) []Series {
lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j) lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j)
} }
samples := make([]tsdbutil.Sample, 0, maxt-mint+1) samples := make([]tsdbutil.Sample, 0, maxt-mint+1)
for t := mint; t <= maxt; t++ { for t := mint; t < maxt; t++ {
samples = append(samples, sample{t: t, v: rand.Float64()}) samples = append(samples, sample{t: t, v: rand.Float64()})
} }
series[i] = newSeries(lbls, samples) series[i] = newSeries(lbls, samples)

View file

@ -51,7 +51,9 @@ type Meta struct {
Ref uint64 Ref uint64
Chunk chunkenc.Chunk Chunk chunkenc.Chunk
MinTime, MaxTime int64 // time range the data covers // Time range the data covers.
// When MaxTime == math.MaxInt64 the chunk is still open and being appended to.
MinTime, MaxTime int64
} }
// writeHash writes the chunk encoding and raw data into the provided hash. // writeHash writes the chunk encoding and raw data into the provided hash.
@ -218,7 +220,7 @@ func MergeOverlappingChunks(chks []Meta) ([]Meta, error) {
// So never overlaps with newChks[last-1] or anything before that. // So never overlaps with newChks[last-1] or anything before that.
if c.MinTime > newChks[last].MaxTime { if c.MinTime > newChks[last].MaxTime {
newChks = append(newChks, c) newChks = append(newChks, c)
last += 1 last++
continue continue
} }
nc := &newChks[last] nc := &newChks[last]

View file

@ -757,6 +757,21 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
} }
for i, chk := range chks { for i, chk := range chks {
// Re-encode head chunks that are still open (being appended to) or
// outside the compacted MaxTime range.
// The chunk.Bytes() method is not safe for open chunks hence the re-encoding.
// This happens when snapshotting the head block.
//
// Block time range is half-open: [meta.MinTime, meta.MaxTime) and
// chunks are closed hence the chk.MaxTime >= meta.MaxTime check.
//
// TODO think how to avoid the typecasting to verify when it is head block.
if _, isHeadChunk := chk.Chunk.(*safeChunk); isHeadChunk && chk.MaxTime >= meta.MaxTime {
dranges = append(dranges, Interval{Mint: meta.MaxTime, Maxt: math.MaxInt64})
} else
// Sanity check for disk blocks.
// chk.MaxTime == meta.MaxTime shouldn't happen as well, but will brake many users so not checking for that.
if chk.MinTime < meta.MinTime || chk.MaxTime > meta.MaxTime { if chk.MinTime < meta.MinTime || chk.MaxTime > meta.MaxTime {
return errors.Errorf("found chunk with minTime: %d maxTime: %d outside of compacted minTime: %d maxTime: %d", return errors.Errorf("found chunk with minTime: %d maxTime: %d outside of compacted minTime: %d maxTime: %d",
chk.MinTime, chk.MaxTime, meta.MinTime, meta.MaxTime) chk.MinTime, chk.MaxTime, meta.MinTime, meta.MaxTime)
@ -774,12 +789,21 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
} }
it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges} it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges}
var (
t int64
v float64
)
for it.Next() { for it.Next() {
ts, v := it.At() t, v = it.At()
app.Append(ts, v) app.Append(t, v)
}
if err := it.Err(); err != nil {
return errors.Wrap(err, "iterate chunk while re-encoding")
} }
chks[i].Chunk = newChunk chks[i].Chunk = newChunk
chks[i].MaxTime = t
} }
} }

14
db.go
View file

@ -943,9 +943,21 @@ func (db *DB) Snapshot(dir string, withHead bool) error {
if !withHead { if !withHead {
return nil return nil
} }
_, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime(), nil)
mint := db.head.MinTime()
maxt := db.head.MaxTime()
head := &rangeHead{
head: db.head,
mint: mint,
maxt: maxt,
}
// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
// Because of this block intervals are always +1 than the total samples it includes.
if _, err := db.compactor.Write(dir, head, mint, maxt+1, nil); err != nil {
return errors.Wrap(err, "snapshot head block") return errors.Wrap(err, "snapshot head block")
} }
return nil
}
// Querier returns a new querier over the data partition for the given time range. // Querier returns a new querier over the data partition for the given time range.
// A goroutine must not handle more than one open Querier. // A goroutine must not handle more than one open Querier.

View file

@ -489,6 +489,62 @@ func TestDB_Snapshot(t *testing.T) {
testutil.Equals(t, 1000.0, sum) testutil.Equals(t, 1000.0, sum)
} }
// TestDB_Snapshot_ChunksOutsideOfCompactedRange ensures that a snapshot removes chunks samples
// that are outside the set block time range.
// See https://github.com/prometheus/prometheus/issues/5105
func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
db, delete := openTestDB(t, nil)
defer delete()
app := db.Appender()
mint := int64(1414141414000)
for i := 0; i < 1000; i++ {
_, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0)
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())
testutil.Ok(t, app.Rollback())
snap, err := ioutil.TempDir("", "snap")
testutil.Ok(t, err)
// Hackingly introduce "race", by having lower max time then maxTime in last chunk.
db.head.maxTime = db.head.maxTime - 10
defer func() {
testutil.Ok(t, os.RemoveAll(snap))
}()
testutil.Ok(t, db.Snapshot(snap, true))
testutil.Ok(t, db.Close())
// Reopen DB from snapshot.
db, err = Open(snap, nil, nil, nil)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, db.Close()) }()
querier, err := db.Querier(mint, mint+1000)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, querier.Close()) }()
// Sum values.
seriesSet, err := querier.Select(labels.NewEqualMatcher("foo", "bar"))
testutil.Ok(t, err)
sum := 0.0
for seriesSet.Next() {
series := seriesSet.At().Iterator()
for series.Next() {
_, v := series.At()
sum += v
}
testutil.Ok(t, series.Err())
}
testutil.Ok(t, seriesSet.Err())
// Since we snapshotted with MaxTime - 10, so expect 10 less samples.
testutil.Equals(t, 1000.0-10, sum)
}
func TestDB_SnapshotWithDelete(t *testing.T) { func TestDB_SnapshotWithDelete(t *testing.T) {
numSamples := int64(10) numSamples := int64(10)
@ -930,7 +986,7 @@ func TestTombstoneCleanFail(t *testing.T) {
// totalBlocks should be >=2 so we have enough blocks to trigger compaction failure. // totalBlocks should be >=2 so we have enough blocks to trigger compaction failure.
totalBlocks := 2 totalBlocks := 2
for i := 0; i < totalBlocks; i++ { for i := 0; i < totalBlocks; i++ {
blockDir := createBlock(t, db.Dir(), genSeries(1, 1, 0, 0)) blockDir := createBlock(t, db.Dir(), genSeries(1, 1, 0, 1))
block, err := OpenBlock(nil, blockDir, nil) block, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err) testutil.Ok(t, err)
// Add some some fake tombstones to trigger the compaction. // Add some some fake tombstones to trigger the compaction.
@ -974,7 +1030,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6
return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail")
} }
block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 0)), nil) block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil)
testutil.Ok(c.t, err) testutil.Ok(c.t, err)
testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere. testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere.
c.blocks = append(c.blocks, block) c.blocks = append(c.blocks, block)

View file

@ -1298,9 +1298,15 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks
if !c.OverlapsClosedInterval(h.mint, h.maxt) { if !c.OverlapsClosedInterval(h.mint, h.maxt) {
continue continue
} }
// Set the head chunks as open (being appended to).
maxTime := c.maxTime
if s.headChunk == c {
maxTime = math.MaxInt64
}
*chks = append(*chks, chunks.Meta{ *chks = append(*chks, chunks.Meta{
MinTime: c.minTime, MinTime: c.minTime,
MaxTime: c.maxTime, MaxTime: maxTime,
Ref: packChunkID(s.ref, uint64(s.chunkID(i))), Ref: packChunkID(s.ref, uint64(s.chunkID(i))),
}) })
} }