Make interval overlap comparisons more explicit

Blocks are half-open intervals [a, b), while all other intervals
(chunks, head, ...) are closed intervals [a, b].

Make that distinction explicit by defining `OverlapsClosedInterval()`
methods for blocks and chunks, and using them in place of the more
generic `intervalOverlap()` function.

This change also fixes `db.Querier()` and `db.Delete()`, which could
previously return one extraneous block at the end of the specified
interval.

Signed-off-by: Benoît Knecht <benoit.knecht@fsfe.org>
This commit is contained in:
Benoît Knecht 2018-07-02 10:23:36 +02:00
parent 4ed6b9ed72
commit 1e1b2e163d
5 changed files with 26 additions and 20 deletions

View file

@ -447,7 +447,7 @@ Outer:
} }
for _, chk := range chks { for _, chk := range chks {
if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) { if chk.OverlapsClosedInterval(mint, maxt) {
// Delete only until the current values and not beyond. // Delete only until the current values and not beyond.
tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime) tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
stones[p.At()] = Intervals{{tmin, tmax}} stones[p.At()] = Intervals{{tmin, tmax}}
@ -539,6 +539,13 @@ func (pb *Block) Snapshot(dir string) error {
return nil return nil
} }
// Returns true if the block overlaps [mint, maxt].
func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool {
// The block itself is a half-open interval
// [pb.meta.MinTime, pb.meta.MaxTime).
return pb.meta.MinTime <= maxt && mint < pb.meta.MaxTime
}
func clampInterval(a, b, mint, maxt int64) (int64, int64) { func clampInterval(a, b, mint, maxt int64) (int64, int64) {
if a < mint { if a < mint {
a = mint a = mint

View file

@ -57,6 +57,12 @@ func (cm *Meta) writeHash(h hash.Hash) error {
return nil return nil
} }
// Returns true if the chunk overlaps [mint, maxt].
func (cm *Meta) OverlapsClosedInterval(mint, maxt int64) bool {
// The chunk itself is a closed interval [cm.MinTime, cm.MaxTime].
return cm.MinTime <= maxt && mint <= cm.MaxTime
}
var ( var (
errInvalidSize = fmt.Errorf("invalid size") errInvalidSize = fmt.Errorf("invalid size")
errInvalidFlag = fmt.Errorf("invalid flag") errInvalidFlag = fmt.Errorf("invalid flag")

View file

@ -592,7 +592,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
if len(dranges) > 0 { if len(dranges) > 0 {
// Re-encode the chunk to not have deleted values. // Re-encode the chunk to not have deleted values.
for i, chk := range chks { for i, chk := range chks {
if !intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) { if !chk.OverlapsClosedInterval(dranges[0].Mint, dranges[len(dranges)-1].Maxt) {
continue continue
} }

11
db.go
View file

@ -762,8 +762,7 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
defer db.mtx.RUnlock() defer db.mtx.RUnlock()
for _, b := range db.blocks { for _, b := range db.blocks {
m := b.Meta() if b.OverlapsClosedInterval(mint, maxt) {
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
blocks = append(blocks, b) blocks = append(blocks, b)
} }
} }
@ -805,8 +804,7 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
defer db.mtx.RUnlock() defer db.mtx.RUnlock()
for _, b := range db.blocks { for _, b := range db.blocks {
m := b.Meta() if b.OverlapsClosedInterval(mint, maxt) {
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
g.Go(func(b *Block) func() error { g.Go(func(b *Block) func() error {
return func() error { return b.Delete(mint, maxt, ms...) } return func() error { return b.Delete(mint, maxt, ms...) }
}(b)) }(b))
@ -854,11 +852,6 @@ func (db *DB) CleanTombstones() (err error) {
return errors.Wrap(db.reload(), "reload blocks") return errors.Wrap(db.reload(), "reload blocks")
} }
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
// Checks Overlap: http://stackoverflow.com/questions/3269434/
return amin <= bmax && bmin <= amax
}
func isBlockDir(fi os.FileInfo) bool { func isBlockDir(fi os.FileInfo) bool {
if !fi.IsDir() { if !fi.IsDir() {
return false return false

18
head.go
View file

@ -735,19 +735,14 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
s.Lock() s.Lock()
c := s.chunk(int(cid)) c := s.chunk(int(cid))
// This means that the chunk has been garbage collected. // This means that the chunk has been garbage collected or is outside
if c == nil { // the specified range.
if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) {
s.Unlock() s.Unlock()
return nil, ErrNotFound return nil, ErrNotFound
} }
mint, maxt := c.minTime, c.maxTime
s.Unlock() s.Unlock()
// Do not expose chunks that are outside of the specified range.
if c == nil || !intervalOverlap(mint, maxt, h.mint, h.maxt) {
return nil, ErrNotFound
}
return &safeChunk{ return &safeChunk{
Chunk: c.chunk, Chunk: c.chunk,
s: s, s: s,
@ -852,7 +847,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks
for i, c := range s.chunks { for i, c := range s.chunks {
// Do not expose chunks that are outside of the specified range. // Do not expose chunks that are outside of the specified range.
if !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) { if !c.OverlapsClosedInterval(h.mint, h.maxt) {
continue continue
} }
*chks = append(*chks, chunks.Meta{ *chks = append(*chks, chunks.Meta{
@ -1291,6 +1286,11 @@ type memChunk struct {
minTime, maxTime int64 minTime, maxTime int64
} }
// Returns true if the chunk overlaps [mint, maxt].
func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool {
return mc.minTime <= maxt && mint <= mc.maxTime
}
type memSafeIterator struct { type memSafeIterator struct {
chunkenc.Iterator chunkenc.Iterator