Make SeriesSets use tombstones.

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
Goutham Veeramachaneni 2017-05-17 14:49:42 +05:30
parent 34a86af3c6
commit 22c1b5b492
9 changed files with 319 additions and 195 deletions

chunks.go

@@ -41,10 +41,6 @@ type ChunkMeta struct {
Chunk chunks.Chunk
MinTime, MaxTime int64 // time range the data covers
// To handle deleted time-ranges.
deleted bool
dranges []trange
}
// writeHash writes the chunk encoding and raw data into the provided hash.
@@ -58,61 +54,6 @@ func (cm *ChunkMeta) writeHash(h hash.Hash) error {
return nil
}
// Iterator returns a chunks.Iterator that honors any deleted ranges.
// If there is no deleted range, the underlying iterator is returned.
func (cm *ChunkMeta) Iterator() chunks.Iterator {
if cm.Chunk == nil {
return nil
}
if cm.deleted {
return &deletedIterator{it: cm.Chunk.Iterator(), dranges: cm.dranges}
}
return cm.Chunk.Iterator()
}
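For illustration, a minimal sketch of the wrapper's effect, assuming it lives in the tsdb package (where deletedIterator and trange are visible) with the fmt and github.com/prometheus/tsdb/chunks imports:
func exampleDeletedIterator() {
	chk := chunks.NewXORChunk()
	app, err := chk.Appender()
	if err != nil {
		panic(err)
	}
	for ts := int64(0); ts < 10; ts++ {
		app.Append(ts, float64(ts))
	}
	// Hide t in [3, 6]; the wrapper checks dranges on every step.
	it := &deletedIterator{it: chk.Iterator(), dranges: []trange{{3, 6}}}
	for it.Next() {
		t, v := it.At()
		fmt.Println(t, v) // prints t = 0, 1, 2, 7, 8, 9
	}
}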
type trange struct {
mint, maxt int64
}
func (tr trange) inBounds(t int64) bool {
return t >= tr.mint && t <= tr.maxt
}
// This adds the new time-range to the existing ones.
// The existing ones must be sorted.
func addNewInterval(existing []trange, n trange) []trange {
for i, r := range existing {
if r.inBounds(n.mint) {
if n.maxt > r.maxt {
existing[i].maxt = n.maxt
}
return existing
}
if r.inBounds(n.maxt) {
if n.mint < r.maxt {
existing[i].mint = n.mint
}
return existing
}
if n.mint < r.mint {
newRange := make([]trange, i, len(existing[:i])+1)
copy(newRange, existing[:i])
newRange = append(newRange, n)
newRange = append(newRange, existing[i:]...)
return newRange
}
}
existing = append(existing, n)
return existing
}
// deletedIterator wraps an Iterator and skips any samples that fall in the
// deleted time-ranges.
type deletedIterator struct {
@@ -287,27 +228,6 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
maxLen := int64(binary.MaxVarintLen32) // The number of chunks.
for _, c := range chks {
maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding.
// Remove the deleted parts.
if c.deleted {
// TODO(gouthamve): Try to do it in-place somehow?
chk := chunks.NewXORChunk()
app, err := chk.Appender()
if err != nil {
return err
}
it := c.Iterator()
for it.Next() {
ts, v := it.At()
app.Append(ts, v)
}
if err := it.Err(); err != nil {
return err
}
c.Chunk = chk
}
maxLen += int64(len(c.Chunk.Bytes()))
}
newsz := w.n + maxLen
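The deletion-aware rewrite removed from WriteChunks above reappears in the compactor (compact.go below). Extracted into a standalone sketch, with a hypothetical helper name and assuming package scope:
// reencodeWithoutDeleted is a hypothetical helper, not part of this
// commit: it rebuilds a chunk keeping only the samples that survive
// the deleted ranges.
func reencodeWithoutDeleted(c chunks.Chunk, dranges []trange) (chunks.Chunk, error) {
	newChunk := chunks.NewXORChunk()
	app, err := newChunk.Appender()
	if err != nil {
		return nil, err
	}
	it := &deletedIterator{it: c.Iterator(), dranges: dranges}
	for it.Next() {
		ts, v := it.At()
		app.Append(ts, v)
	}
	if err := it.Err(); err != nil {
		return nil, err
	}
	return newChunk, nil
}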

chunks_test.go

@@ -37,51 +37,6 @@ func (cr mockChunkReader) Close() error {
return nil
}
func TestAddingNewIntervals(t *testing.T) {
cases := []struct {
exist []trange
new trange
exp []trange
}{
{
new: trange{1, 2},
exp: []trange{{1, 2}},
},
{
exist: []trange{{1, 10}, {12, 20}, {25, 30}},
new: trange{21, 23},
exp: []trange{{1, 10}, {12, 20}, {21, 23}, {25, 30}},
},
{
exist: []trange{{1, 10}, {12, 20}, {25, 30}},
new: trange{21, 25},
exp: []trange{{1, 10}, {12, 20}, {21, 30}},
},
{
exist: []trange{{1, 10}, {12, 20}, {25, 30}},
new: trange{18, 23},
exp: []trange{{1, 10}, {12, 23}, {25, 30}},
},
// TODO(gouthamve): (below) This is technically right, but fix it in the future.
{
exist: []trange{{1, 10}, {12, 20}, {25, 30}},
new: trange{9, 23},
exp: []trange{{1, 23}, {12, 20}, {25, 30}},
},
{
exist: []trange{{5, 10}, {12, 20}, {25, 30}},
new: trange{1, 4},
exp: []trange{{1, 4}, {5, 10}, {12, 20}, {25, 30}},
},
}
for _, c := range cases {
require.Equal(t, c.exp, addNewInterval(c.exist, c.new))
}
return
}
func TestDeletedIterator(t *testing.T) {
chk := chunks.NewXORChunk()
app, err := chk.Appender()

compact.go

@@ -25,6 +25,7 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/labels"
)
@@ -280,7 +281,7 @@ func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWri
if err != nil {
return nil, err
}
s := newCompactionSeriesSet(b.Index(), b.Chunks(), all)
s := newCompactionSeriesSet(b.Index(), b.Chunks(), b.Tombstones(), all)
if i == 0 {
set = s
@@ -301,14 +302,37 @@ func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWri
)
for set.Next() {
lset, chunks := set.At()
if err := chunkw.WriteChunks(chunks...); err != nil {
lset, chks, ranges := set.At() // Fully deleted chunks were already dropped by the series set.
if len(ranges) > 0 {
// Re-encode the chunk to not have deleted values.
for _, chk := range chks {
// Checks Overlap: http://stackoverflow.com/questions/3269434/
if ranges[0].mint <= chk.MaxTime && chk.MinTime <= ranges[len(ranges)-1].maxt {
newChunk := chunks.NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
return nil, err
}
it := &deletedIterator{it: chk.Chunk.Iterator(), dranges: ranges}
for it.Next() {
ts, v := it.At()
app.Append(ts, v)
}
chk.Chunk = newChunk
}
}
}
if err := chunkw.WriteChunks(chks...); err != nil {
return nil, err
}
indexw.AddSeries(i, lset, chunks...)
indexw.AddSeries(i, lset, chks...)
meta.Stats.NumChunks += uint64(len(chunks))
meta.Stats.NumChunks += uint64(len(chks))
meta.Stats.NumSeries++
for _, l := range lset {
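The overlap check above is the standard closed-interval test: [a1, a2] and [b1, b2] overlap iff a1 <= b2 and b1 <= a2. Since addNewInterval keeps dranges sorted, comparing each chunk against the envelope from ranges[0].mint to ranges[len(ranges)-1].maxt is a cheap, conservative prefilter: a chunk that only overlaps a gap between two deleted ranges is re-encoded unnecessarily, but the result is still correct. Spelled out as a hypothetical helper (not in the commit):
// overlaps reports whether two closed time ranges intersect.
func overlaps(a, b trange) bool {
	return a.mint <= b.maxt && b.mint <= a.maxt
}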
@@ -358,25 +382,28 @@ func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWri
type compactionSet interface {
Next() bool
At() (labels.Labels, []*ChunkMeta)
At() (labels.Labels, []*ChunkMeta, []trange)
Err() error
}
type compactionSeriesSet struct {
p Postings
index IndexReader
chunks ChunkReader
p Postings
index IndexReader
chunks ChunkReader
tombstones TombstoneReader
l labels.Labels
c []*ChunkMeta
err error
l labels.Labels
c []*ChunkMeta
dranges []trange
err error
}
func newCompactionSeriesSet(i IndexReader, c ChunkReader, p Postings) *compactionSeriesSet {
func newCompactionSeriesSet(i IndexReader, c ChunkReader, t TombstoneReader, p Postings) *compactionSeriesSet {
return &compactionSeriesSet{
index: i,
chunks: c,
p: p,
index: i,
chunks: c,
tombstones: t,
p: p,
}
}
@@ -385,15 +412,34 @@ func (c *compactionSeriesSet) Next() bool {
return false
}
if c.tombstones.Seek(c.p.At()) {
s := c.tombstones.At()
if c.p.At() == s.ref {
c.dranges = s.ranges
} else {
c.dranges = nil
}
}
c.l, c.c, c.err = c.index.Series(c.p.At())
if c.err != nil {
return false
}
for _, chk := range c.c {
chk.Chunk, c.err = c.chunks.Chunk(chk.Ref)
if c.err != nil {
return false
// Remove completely deleted chunks; partially deleted ones are re-encoded later during write-out.
if len(c.dranges) > 0 {
chks := make([]*ChunkMeta, 0, len(c.c))
for _, chk := range c.c {
if !(trange{chk.MinTime, chk.MaxTime}.isSubrange(c.dranges)) {
chk.Chunk, c.err = c.chunks.Chunk(chk.Ref)
if c.err != nil {
return false
}
chks = append(chks, chk)
}
}
c.c = chks
}
return true
@@ -406,8 +452,8 @@ func (c *compactionSeriesSet) Err() error {
return c.p.Err()
}
func (c *compactionSeriesSet) At() (labels.Labels, []*ChunkMeta) {
return c.l, c.c
func (c *compactionSeriesSet) At() (labels.Labels, []*ChunkMeta, []trange) {
return c.l, c.c, c.dranges
}
type compactionMerger struct {
@@ -416,6 +462,7 @@ type compactionMerger struct {
aok, bok bool
l labels.Labels
c []*ChunkMeta
dranges []trange
}
type compactionSeries struct {
@@ -443,8 +490,8 @@ func (c *compactionMerger) compare() int {
if !c.bok {
return -1
}
a, _ := c.a.At()
b, _ := c.b.At()
a, _, _ := c.a.At()
b, _, _ := c.b.At()
return labels.Compare(a, b)
}
@@ -456,17 +503,21 @@ func (c *compactionMerger) Next() bool {
d := c.compare()
// Both sets contain the current series. Chain them into a single one.
if d > 0 {
c.l, c.c = c.b.At()
c.l, c.c, c.dranges = c.b.At()
c.bok = c.b.Next()
} else if d < 0 {
c.l, c.c = c.a.At()
c.l, c.c, c.dranges = c.a.At()
c.aok = c.a.Next()
} else {
l, ca := c.a.At()
_, cb := c.b.At()
l, ca, ra := c.a.At()
_, cb, rb := c.b.At()
for _, r := range rb {
ra = addNewInterval(ra, r)
}
c.l = l
c.c = append(ca, cb...)
c.dranges = ra
c.aok = c.a.Next()
c.bok = c.b.Next()
@@ -481,8 +532,8 @@ func (c *compactionMerger) Err() error {
return c.b.Err()
}
func (c *compactionMerger) At() (labels.Labels, []*ChunkMeta) {
return c.l, c.c
func (c *compactionMerger) At() (labels.Labels, []*ChunkMeta, []trange) {
return c.l, c.c, c.dranges
}
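When both inputs contain the series, Next above folds b's deleted ranges into a's one at a time; addNewInterval keeps the result sorted and coalesces overlaps. A small sketch, assuming package scope:
ra := []trange{{1, 10}, {25, 30}}
rb := []trange{{8, 12}, {40, 45}}
for _, r := range rb {
	ra = addNewInterval(ra, r)
}
// ra is now [{1 12} {25 30} {40 45}]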
func renameFile(from, to string) error {

db_test.go

@@ -100,7 +100,7 @@ func TestDataNotAvailableAfterRollback(t *testing.T) {
}
func TestDBAppenderAddRef(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test1")
tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir)
db, err := Open(tmpdir, nil, nil, nil)

head.go

@@ -279,10 +279,12 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier {
series := h.series[:]
return &blockQuerier{
mint: mint,
maxt: maxt,
index: h.Index(),
chunks: h.Chunks(),
mint: mint,
maxt: maxt,
index: h.Index(),
chunks: h.Chunks(),
tombstones: h.Tombstones(),
postingsMapper: func(p Postings) Postings {
ep := make([]uint32, 0, 64)
@@ -586,8 +588,6 @@ func (h *headIndexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error
return nil, nil, ErrNotFound
}
dranges, deleted := h.tombstones[ref]
s := h.series[ref]
metas := make([]*ChunkMeta, 0, len(s.chunks))
@@ -599,9 +599,6 @@ func (h *headIndexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error
MinTime: c.minTime,
MaxTime: c.maxTime,
Ref: (uint64(ref) << 32) | uint64(i),
deleted: deleted,
dranges: dranges,
})
}

querier.go

@@ -150,6 +150,8 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
p: p,
index: q.index,
absent: absent,
tombstones: q.tombstones.Copy(),
},
chunks: q.chunks,
mint: q.mint,
@@ -367,29 +369,35 @@ func (s *mergedSeriesSet) Next() bool {
type chunkSeriesSet interface {
Next() bool
At() (labels.Labels, []*ChunkMeta)
At() (labels.Labels, []*ChunkMeta, stone)
Err() error
}
// baseChunkSeries loads the label set and chunk references for a postings
// list from an index. It filters out series that have labels set that should be unset.
type baseChunkSeries struct {
p Postings
index IndexReader
absent []string // labels that must be unset in results.
p Postings
index IndexReader
tombstones TombstoneReader
absent []string // labels that must be unset in results.
lset labels.Labels
chks []*ChunkMeta
err error
lset labels.Labels
chks []*ChunkMeta
stone stone
err error
}
func (s *baseChunkSeries) At() (labels.Labels, []*ChunkMeta) { return s.lset, s.chks }
func (s *baseChunkSeries) Err() error { return s.err }
func (s *baseChunkSeries) At() (labels.Labels, []*ChunkMeta, stone) {
return s.lset, s.chks, s.stone
}
func (s *baseChunkSeries) Err() error { return s.err }
func (s *baseChunkSeries) Next() bool {
Outer:
for s.p.Next() {
lset, chunks, err := s.index.Series(s.p.At())
ref := s.p.At()
lset, chunks, err := s.index.Series(ref)
if err != nil {
s.err = err
return false
@@ -404,6 +412,19 @@ Outer:
s.lset = lset
s.chks = chunks
if s.tombstones.Seek(ref) && s.tombstones.At().ref == ref {
s.stone = s.tombstones.At()
// Keep only chunks that are not entirely deleted.
chks := make([]*ChunkMeta, 0, len(s.chks))
for _, chk := range s.chks {
if !(trange{chk.MinTime, chk.MaxTime}.isSubrange(s.stone.ranges)) {
chks = append(chks, chk)
}
}
s.chks = chks
}
return true
}
@@ -421,17 +442,20 @@ type populatedChunkSeries struct {
chunks ChunkReader
mint, maxt int64
err error
chks []*ChunkMeta
lset labels.Labels
err error
chks []*ChunkMeta
lset labels.Labels
stone stone
}
func (s *populatedChunkSeries) At() (labels.Labels, []*ChunkMeta) { return s.lset, s.chks }
func (s *populatedChunkSeries) Err() error { return s.err }
func (s *populatedChunkSeries) At() (labels.Labels, []*ChunkMeta, stone) {
return s.lset, s.chks, s.stone
}
func (s *populatedChunkSeries) Err() error { return s.err }
func (s *populatedChunkSeries) Next() bool {
for s.set.Next() {
lset, chks := s.set.At()
lset, chks, stn := s.set.At()
for len(chks) > 0 {
if chks[0].MaxTime >= s.mint {
@@ -458,6 +482,7 @@ func (s *populatedChunkSeries) Next() bool {
s.lset = lset
s.chks = chks
s.stone = stn
return true
}
@@ -478,8 +503,15 @@ type blockSeriesSet struct {
func (s *blockSeriesSet) Next() bool {
for s.set.Next() {
lset, chunks := s.set.At()
s.cur = &chunkSeries{labels: lset, chunks: chunks, mint: s.mint, maxt: s.maxt}
lset, chunks, stn := s.set.At()
s.cur = &chunkSeries{
labels: lset,
chunks: chunks,
mint: s.mint,
maxt: s.maxt,
stone: stn,
}
return true
}
if s.set.Err() != nil {
@@ -498,6 +530,8 @@ type chunkSeries struct {
chunks []*ChunkMeta // in-order chunk refs
mint, maxt int64
stone stone
}
func (s *chunkSeries) Labels() labels.Labels {
@@ -505,7 +539,7 @@ func (s *chunkSeries) Labels() labels.Labels {
}
func (s *chunkSeries) Iterator() SeriesIterator {
return newChunkSeriesIterator(s.chunks, s.mint, s.maxt)
return newChunkSeriesIterator(s.chunks, s.stone, s.mint, s.maxt)
}
// SeriesIterator iterates over the data of a time series.
@@ -602,16 +636,24 @@ type chunkSeriesIterator struct {
cur chunks.Iterator
maxt, mint int64
stone stone
}
func newChunkSeriesIterator(cs []*ChunkMeta, mint, maxt int64) *chunkSeriesIterator {
func newChunkSeriesIterator(cs []*ChunkMeta, s stone, mint, maxt int64) *chunkSeriesIterator {
it := cs[0].Chunk.Iterator()
if len(s.ranges) > 0 {
it = &deletedIterator{it: it, dranges: s.ranges}
}
return &chunkSeriesIterator{
chunks: cs,
i: 0,
cur: cs[0].Chunk.Iterator(),
cur: it,
mint: mint,
maxt: maxt,
stone: s,
}
}
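A usage sketch, assuming package scope, a non-empty chks []*ChunkMeta with Chunk fields populated, and the math import; the stone's ranges hide matching samples while mint and maxt clip the walk:
st := stone{ref: 42, ranges: []trange{{100, 200}}}
it := newChunkSeriesIterator(chks, st, math.MinInt64, math.MaxInt64)
for it.Next() {
	t, v := it.At()
	fmt.Println(t, v) // no sample with 100 <= t <= 200 is yielded
}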
@@ -646,6 +688,9 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
it.i = x
it.cur = it.chunks[x].Chunk.Iterator()
if len(it.stone.ranges) > 0 {
it.cur = &deletedIterator{it: it.cur, dranges: it.stone.ranges}
}
for it.cur.Next() {
t0, _ := it.cur.At()
@@ -677,6 +722,9 @@ func (it *chunkSeriesIterator) Next() bool {
it.i++
it.cur = it.chunks[it.i].Chunk.Iterator()
if len(it.stone.ranges) > 0 {
it.cur = &deletedIterator{it: it.cur, dranges: it.stone.ranges}
}
return it.Next()
}
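The deleted-range wrapping now appears in three places: the constructor, Seek, and Next. A hypothetical helper, not in the commit, could centralize it:
// chunkIterator returns the i-th chunk's iterator, wrapped with the
// series' deleted ranges when any exist.
func (it *chunkSeriesIterator) chunkIterator(i int) chunks.Iterator {
	ci := it.chunks[i].Chunk.Iterator()
	if len(it.stone.ranges) > 0 {
		ci = &deletedIterator{it: ci, dranges: it.stone.ranges}
	}
	return ci
}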

querier_test.go

@@ -378,8 +378,9 @@ Outer:
for _, c := range cases.queries {
ir, cr := createIdxChkReaders(cases.data)
querier := &blockQuerier{
index: ir,
chunks: cr,
index: ir,
chunks: cr,
tombstones: emptyTombstoneReader,
mint: c.mint,
maxt: c.maxt,
@@ -487,13 +488,14 @@ func TestBaseChunkSeries(t *testing.T) {
}
bcs := &baseChunkSeries{
p: newListPostings(tc.postings),
index: mi,
p: newListPostings(tc.postings),
index: mi,
tombstones: emptyTombstoneReader,
}
i := 0
for bcs.Next() {
lset, chks := bcs.At()
lset, chks, _ := bcs.At()
idx := tc.expIdxs[i]
@@ -701,7 +703,7 @@ func TestSeriesIterator(t *testing.T) {
chunkFromSamples(tc.b),
chunkFromSamples(tc.c),
}
res := newChunkSeriesIterator(chkMetas, tc.mint, tc.maxt)
res := newChunkSeriesIterator(chkMetas, stone{}, tc.mint, tc.maxt)
smplValid := make([]sample, 0)
for _, s := range tc.exp {
@@ -772,7 +774,7 @@ func TestSeriesIterator(t *testing.T) {
chunkFromSamples(tc.b),
chunkFromSamples(tc.c),
}
res := newChunkSeriesIterator(chkMetas, tc.mint, tc.maxt)
res := newChunkSeriesIterator(chkMetas, stone{}, tc.mint, tc.maxt)
smplValid := make([]sample, 0)
for _, s := range tc.exp {
@@ -919,8 +921,8 @@ func (m *mockChunkSeriesSet) Next() bool {
return m.i < len(m.l)
}
func (m *mockChunkSeriesSet) At() (labels.Labels, []*ChunkMeta) {
return m.l[m.i], m.cm[m.i]
func (m *mockChunkSeriesSet) At() (labels.Labels, []*ChunkMeta, stone) {
return m.l[m.i], m.cm[m.i], stone{}
}
func (m *mockChunkSeriesSet) Err() error {

tombstones.go

@@ -102,6 +102,8 @@ type TombstoneReader interface {
Next() bool
Seek(ref uint32) bool
At() stone
// Copy returns a copy of the current instance; changes to the copy do not affect the parent.
Copy() TombstoneReader
Err() error
}
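For reference, consuming the interface looks like the sketch below (tr is a hypothetical TombstoneReader value; package scope assumed). Copying first gives the loop its own cursor, so the parent's position is untouched:
stones := tr.Copy()
for stones.Next() {
	s := stones.At()
	fmt.Println(s.ref, s.ranges) // series ref and its deleted ranges
}
if err := stones.Err(); err != nil {
	// handle the error
}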
@@ -195,6 +197,16 @@ func (t *tombstoneReader) At() stone {
return stone{ref: ref, ranges: dranges}
}
func (t *tombstoneReader) Copy() TombstoneReader {
return &tombstoneReader{
stones: t.stones[:],
idx: t.idx,
len: t.len,
b: t.b,
}
}
func (t *tombstoneReader) Err() error {
return t.err
}
@@ -250,6 +262,15 @@ func (t *mapTombstoneReader) At() stone {
return stone{ref: t.cur, ranges: t.stones[t.cur]}
}
func (t *mapTombstoneReader) Copy() TombstoneReader {
return &mapTombstoneReader{
refs: t.refs[:],
cur: t.cur,
stones: t.stones,
}
}
func (t *mapTombstoneReader) Err() error {
return nil
}
@@ -298,6 +319,10 @@ func (t *simpleTombstoneReader) At() stone {
return stone{ref: t.cur, ranges: t.ranges}
}
func (t *simpleTombstoneReader) Copy() TombstoneReader {
return &simpleTombstoneReader{refs: t.refs[:], cur: t.cur, ranges: t.ranges}
}
func (t *simpleTombstoneReader) Err() error {
return nil
}
@@ -371,9 +396,86 @@ func (t *mergedTombstoneReader) At() stone {
return t.cur
}
func (t *mergedTombstoneReader) Copy() TombstoneReader {
return &mergedTombstoneReader{
a: t.a.Copy(),
b: t.b.Copy(),
cur: t.cur,
initialized: t.initialized,
aok: t.aok,
bok: t.bok,
}
}
func (t *mergedTombstoneReader) Err() error {
if t.a.Err() != nil {
return t.a.Err()
}
return t.b.Err()
}
type trange struct {
mint, maxt int64
}
func (tr trange) inBounds(t int64) bool {
return t >= tr.mint && t <= tr.maxt
}
func (tr trange) isSubrange(ranges []trange) bool {
for _, r := range ranges {
if r.inBounds(tr.mint) && r.inBounds(tr.maxt) {
return true
}
}
return false
}
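Note that isSubrange demands a single deleted range that covers the whole span; a union of several ranges does not count. Because addNewInterval coalesces overlapping ranges this is usually adequate, and a false negative only means a chunk is re-encoded instead of dropped. A quick sketch, assuming package scope:
full := trange{5, 8}
fmt.Println(full.isSubrange([]trange{{1, 10}}))         // true: one range covers it
fmt.Println(full.isSubrange([]trange{{1, 6}, {7, 10}})) // false: only the union covers it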
// This adds the new time-range to the existing ones.
// The existing ones must be sorted.
func addNewInterval(existing []trange, n trange) []trange {
for i, r := range existing {
// TODO(gouthamve): Make this codepath easier to digest.
if r.inBounds(n.mint) {
if n.maxt > r.maxt {
existing[i].maxt = n.maxt
}
j := 0
for _, r2 := range existing[i+1:] {
if n.maxt < r2.mint {
break
}
j++
}
if j != 0 {
if existing[i+j].maxt > n.maxt {
existing[i].maxt = existing[i+j].maxt
}
existing = append(existing[:i+1], existing[i+j+1:]...)
}
return existing
}
if r.inBounds(n.maxt) {
if n.mint < r.maxt {
existing[i].mint = n.mint
}
return existing
}
if n.mint < r.mint {
newRange := make([]trange, i, len(existing[:i])+1)
copy(newRange, existing[:i])
newRange = append(newRange, n)
newRange = append(newRange, existing[i:]...)
return newRange
}
}
existing = append(existing, n)
return existing
}
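Compared to the version removed from chunks.go, the j loop above also absorbs any following ranges that the widened interval swallows, fixing the case flagged by the TODO in the removed test. Tracing one insertion, assuming package scope:
merged := addNewInterval([]trange{{1, 10}, {12, 20}, {25, 30}}, trange{9, 23})
// {1, 10} first grows to {1, 23}; the j loop then finds {12, 20}
// swallowed (j = 1) and splices it out.
fmt.Println(merged) // [{1 23} {25 30}]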

tombstones_test.go

@@ -45,3 +45,52 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
require.NoError(t, restr.Err())
require.NoError(t, exptr.Err())
}
func TestAddingNewIntervals(t *testing.T) {
cases := []struct {
exist []trange
new trange
exp []trange
}{
{
new: trange{1, 2},
exp: []trange{{1, 2}},
},
{
exist: []trange{{1, 10}, {12, 20}, {25, 30}},
new: trange{21, 23},
exp: []trange{{1, 10}, {12, 20}, {21, 23}, {25, 30}},
},
{
exist: []trange{{1, 10}, {12, 20}, {25, 30}},
new: trange{21, 25},
exp: []trange{{1, 10}, {12, 20}, {21, 30}},
},
{
exist: []trange{{1, 10}, {12, 20}, {25, 30}},
new: trange{18, 23},
exp: []trange{{1, 10}, {12, 23}, {25, 30}},
},
{
exist: []trange{{1, 10}, {12, 20}, {25, 30}},
new: trange{9, 23},
exp: []trange{{1, 23}, {25, 30}},
},
{
exist: []trange{{1, 10}, {12, 20}, {25, 30}},
new: trange{9, 230},
exp: []trange{{1, 230}},
},
{
exist: []trange{{5, 10}, {12, 20}, {25, 30}},
new: trange{1, 4},
exp: []trange{{1, 4}, {5, 10}, {12, 20}, {25, 30}},
},
}
for _, c := range cases {
require.Equal(t, c.exp, addNewInterval(c.exist, c.new))
}
return
}