Merge remote-tracking branch 'upstream/master' into new-metrics

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
This commit is contained in:
Ganesh Vernekar 2018-09-30 20:18:45 +05:30
commit 2638b587f6
No known key found for this signature in database
GPG key ID: 0241A11211763456
4 changed files with 51 additions and 2 deletions

View file

@ -478,7 +478,6 @@ Outer:
err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error { err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
for _, iv := range ivs { for _, iv := range ivs {
stones.addInterval(id, iv) stones.addInterval(id, iv)
pb.meta.Stats.NumTombstones++
} }
return nil return nil
}) })
@ -486,6 +485,7 @@ Outer:
return err return err
} }
pb.tombstones = stones pb.tombstones = stones
pb.meta.Stats.NumTombstones = pb.tombstones.Total()
if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil { if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil {
return err return err

View file

@ -1302,3 +1302,38 @@ func TestInitializeHeadTimestamp(t *testing.T) {
testutil.Equals(t, int64(15000), db.head.MaxTime()) testutil.Equals(t, int64(15000), db.head.MaxTime())
}) })
} }
func TestCorrectNumTombstones(t *testing.T) {
db, close := openTestDB(t, nil)
defer close()
defer db.Close()
blockRange := DefaultOptions.BlockRanges[0]
label := labels.FromStrings("foo", "bar")
app := db.Appender()
for i := int64(0); i < 3; i++ {
for j := int64(0); j < 15; j++ {
_, err := app.Add(label, i*blockRange+j, 0)
testutil.Ok(t, err)
}
}
testutil.Ok(t, app.Commit())
err := db.compact()
testutil.Ok(t, err)
testutil.Equals(t, 1, len(db.blocks))
testutil.Ok(t, db.Delete(0, 1, labels.NewEqualMatcher("foo", "bar")))
testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones)
// {0, 1} and {2, 3} are merged to form 1 tombstone.
testutil.Ok(t, db.Delete(2, 3, labels.NewEqualMatcher("foo", "bar")))
testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones)
testutil.Ok(t, db.Delete(5, 6, labels.NewEqualMatcher("foo", "bar")))
testutil.Equals(t, uint64(2), db.blocks[0].meta.Stats.NumTombstones)
testutil.Ok(t, db.Delete(9, 11, labels.NewEqualMatcher("foo", "bar")))
testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones)
}

View file

@ -2,7 +2,7 @@
The following describes the format of a single chunks file, which is created in the `chunks/` directory of a block. The maximum size per segment file is 512MiB. The following describes the format of a single chunks file, which is created in the `chunks/` directory of a block. The maximum size per segment file is 512MiB.
Chunks in the files are referenced from the index by the in-file offset in the 4 LSB and the segment sequence number in the higher 4 MSBs. Chunks in the files are referenced from the index by uint64 composed of in-file offset (lower 4 bytes) and segment sequence number (upper 4 bytes).
``` ```
┌────────────────────────────────────────┬──────────────────────┐ ┌────────────────────────────────────────┬──────────────────────┐

View file

@ -42,6 +42,9 @@ type TombstoneReader interface {
// Iter calls the given function for each encountered interval. // Iter calls the given function for each encountered interval.
Iter(func(uint64, Intervals) error) error Iter(func(uint64, Intervals) error) error
// Total returns the total count of tombstones.
Total() uint64
// Close any underlying resources // Close any underlying resources
Close() error Close() error
} }
@ -185,6 +188,17 @@ func (t *memTombstones) Iter(f func(uint64, Intervals) error) error {
return nil return nil
} }
func (t *memTombstones) Total() uint64 {
t.mtx.RLock()
defer t.mtx.RUnlock()
total := uint64(0)
for _, ivs := range t.intvlGroups {
total += uint64(len(ivs))
}
return total
}
// addInterval to an existing memTombstones // addInterval to an existing memTombstones
func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) { func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) {
t.mtx.Lock() t.mtx.Lock()