Merge pull request #348 from BenoitKnecht/fix-block-boundaries

Make sure blocks don't overlap to avoid outsider chunks
This commit is contained in:
Fabian Reinartz 2018-07-05 12:54:34 +02:00 committed by GitHub
commit 77db94c07e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 122 additions and 21 deletions

View file

@ -447,7 +447,7 @@ Outer:
}
for _, chk := range chks {
if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) {
if chk.OverlapsClosedInterval(mint, maxt) {
// Delete only until the current values and not beyond.
tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
stones[p.At()] = Intervals{{tmin, tmax}}
@ -539,6 +539,13 @@ func (pb *Block) Snapshot(dir string) error {
return nil
}
// Returns true if the block overlaps [mint, maxt].
func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool {
// The block itself is a half-open interval
// [pb.meta.MinTime, pb.meta.MaxTime).
return pb.meta.MinTime <= maxt && mint < pb.meta.MaxTime
}
func clampInterval(a, b, mint, maxt int64) (int64, int64) {
if a < mint {
a = mint

View file

@ -57,6 +57,12 @@ func (cm *Meta) writeHash(h hash.Hash) error {
return nil
}
// Returns true if the chunk overlaps [mint, maxt].
func (cm *Meta) OverlapsClosedInterval(mint, maxt int64) bool {
// The chunk itself is a closed interval [cm.MinTime, cm.MaxTime].
return cm.MinTime <= maxt && mint <= cm.MaxTime
}
var (
errInvalidSize = fmt.Errorf("invalid size")
errInvalidFlag = fmt.Errorf("invalid flag")

View file

@ -592,7 +592,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
if len(dranges) > 0 {
// Re-encode the chunk to not have deleted values.
for i, chk := range chks {
if !intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) {
if !chk.OverlapsClosedInterval(dranges[0].Mint, dranges[len(dranges)-1].Maxt) {
continue
}

19
db.go
View file

@ -360,7 +360,13 @@ func (db *DB) compact() (changes bool, err error) {
head := &rangeHead{
head: db.head,
mint: mint,
maxt: maxt,
// We remove 1 millisecond from maxt because block
// intervals are half-open: [b.MinTime, b.MaxTime). But
// chunk intervals are closed: [c.MinTime, c.MaxTime];
// so in order to make sure that overlaps are evaluated
// consistently, we explicitly remove the last value
// from the block interval here.
maxt: maxt - 1,
}
if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil {
return changes, errors.Wrap(err, "persist head block")
@ -756,8 +762,7 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
defer db.mtx.RUnlock()
for _, b := range db.blocks {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
if b.OverlapsClosedInterval(mint, maxt) {
blocks = append(blocks, b)
}
}
@ -799,8 +804,7 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
defer db.mtx.RUnlock()
for _, b := range db.blocks {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
if b.OverlapsClosedInterval(mint, maxt) {
g.Go(func(b *Block) func() error {
return func() error { return b.Delete(mint, maxt, ms...) }
}(b))
@ -848,11 +852,6 @@ func (db *DB) CleanTombstones() (err error) {
return errors.Wrap(db.reload(), "reload blocks")
}
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
// Checks Overlap: http://stackoverflow.com/questions/3269434/
return amin <= bmax && bmin <= amax
}
func isBlockDir(fi os.FileInfo) bool {
if !fi.IsDir() {
return false

View file

@ -26,6 +26,8 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
)
@ -1096,3 +1098,90 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
{Min: 8, Max: 9}: {nc1[8], nc1[9]}, // 7-10, 8-9
}, OverlappingBlocks(nc1))
}
// Regression test for https://github.com/prometheus/tsdb/issues/347
func TestChunkAtBlockBoundary(t *testing.T) {
db, close := openTestDB(t, nil)
defer close()
defer db.Close()
app := db.Appender()
blockRange := DefaultOptions.BlockRanges[0]
label := labels.FromStrings("foo", "bar")
for i := int64(0); i < 3; i++ {
_, err := app.Add(label, i*blockRange, 0)
testutil.Ok(t, err)
_, err = app.Add(label, i*blockRange+1000, 0)
testutil.Ok(t, err)
}
err := app.Commit()
testutil.Ok(t, err)
_, err = db.compact()
testutil.Ok(t, err)
for _, block := range db.blocks {
r, err := block.Index()
testutil.Ok(t, err)
defer r.Close()
meta := block.Meta()
p, err := r.Postings(index.AllPostingsKey())
testutil.Ok(t, err)
var (
lset labels.Labels
chks []chunks.Meta
)
chunkCount := 0
for p.Next() {
err = r.Series(p.At(), &lset, &chks)
testutil.Ok(t, err)
for _, c := range chks {
testutil.Assert(t, meta.MinTime <= c.MinTime && c.MaxTime <= meta.MaxTime,
"chunk spans beyond block boundaries: [block.MinTime=%d, block.MaxTime=%d]; [chunk.MinTime=%d, chunk.MaxTime=%d]",
meta.MinTime, meta.MaxTime, c.MinTime, c.MaxTime)
chunkCount++
}
}
testutil.Assert(t, chunkCount == 1, "expected 1 chunk in block %s, got %d", meta.ULID, chunkCount)
}
}
func TestQuerierWithBoundaryChunks(t *testing.T) {
db, close := openTestDB(t, nil)
defer close()
defer db.Close()
app := db.Appender()
blockRange := DefaultOptions.BlockRanges[0]
label := labels.FromStrings("foo", "bar")
for i := int64(0); i < 5; i++ {
_, err := app.Add(label, i*blockRange, 0)
testutil.Ok(t, err)
}
err := app.Commit()
testutil.Ok(t, err)
_, err = db.compact()
testutil.Ok(t, err)
testutil.Assert(t, len(db.blocks) >= 3, "invalid test, less than three blocks in DB")
q, err := db.Querier(blockRange, 2*blockRange)
testutil.Ok(t, err)
defer q.Close()
// The requested interval covers 2 blocks, so the querier should contain 2 blocks.
count := len(q.(*querier).blocks)
testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count)
}

18
head.go
View file

@ -735,19 +735,14 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
s.Lock()
c := s.chunk(int(cid))
// This means that the chunk has been garbage collected.
if c == nil {
// This means that the chunk has been garbage collected or is outside
// the specified range.
if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) {
s.Unlock()
return nil, ErrNotFound
}
mint, maxt := c.minTime, c.maxTime
s.Unlock()
// Do not expose chunks that are outside of the specified range.
if c == nil || !intervalOverlap(mint, maxt, h.mint, h.maxt) {
return nil, ErrNotFound
}
return &safeChunk{
Chunk: c.chunk,
s: s,
@ -852,7 +847,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks
for i, c := range s.chunks {
// Do not expose chunks that are outside of the specified range.
if !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) {
if !c.OverlapsClosedInterval(h.mint, h.maxt) {
continue
}
*chks = append(*chks, chunks.Meta{
@ -1291,6 +1286,11 @@ type memChunk struct {
minTime, maxTime int64
}
// Returns true if the chunk overlaps [mint, maxt].
func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool {
return mc.minTime <= maxt && mint <= mc.maxTime
}
type memSafeIterator struct {
chunkenc.Iterator