diff --git a/block.go b/block.go index 365d2f6dc..94f57f206 100644 --- a/block.go +++ b/block.go @@ -199,10 +199,17 @@ func writeTombstoneFile(dir string, tr TombstoneReader) error { pos += int64(n) } } + if err := tr.Err(); err != nil { + return err + } // Write the offset table. buf.reset() buf.putBE32int(len(refs)) + if _, err := f.Write(buf.get()); err != nil { + return err + } + for _, ref := range refs { buf.reset() buf.putBE32(ref) @@ -325,7 +332,7 @@ Outer: // Merge the current and new tombstones. tr := newMapTombstoneReader(ir.tombstones) - str := newSimpleTombstoneReader(vPostings, []trange{mint, maxt}) + str := newSimpleTombstoneReader(vPostings, []trange{{mint, maxt}}) tombreader := newMergedTombstoneReader(tr, str) return writeTombstoneFile(pb.dir, tombreader) @@ -371,13 +378,13 @@ func newTombStoneReader(dir string) (*tombstoneReader, error) { } d = &decbuf{b: b[off:]} - numStones := d.be64int64() + numStones := d.be32int() if err := d.err(); err != nil { return nil, err } return &tombstoneReader{ - stones: b[off+8 : (off+8)+(numStones*12)], + stones: b[off+4:], idx: -1, len: int(numStones), @@ -448,6 +455,7 @@ func newMapTombstoneReader(ts map[uint32][]trange) *mapTombstoneReader { func (t *mapTombstoneReader) Next() bool { if len(t.refs) > 0 { t.cur = t.refs[0] + t.refs = t.refs[1:] return true } diff --git a/block_test.go b/block_test.go new file mode 100644 index 000000000..491c1c138 --- /dev/null +++ b/block_test.go @@ -0,0 +1,47 @@ +package tsdb + +import ( + "io/ioutil" + "math/rand" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestWriteAndReadbackTombStones(t *testing.T) { + tmpdir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(tmpdir) + + ref := uint32(0) + + stones := make(map[uint32][]trange) + // Generate the tombstones. + for i := 0; i < 100; i++ { + ref += uint32(rand.Int31n(10)) + 1 + numRanges := rand.Intn(5) + dranges := make([]trange, numRanges) + mint := rand.Int63n(time.Now().UnixNano()) + for j := 0; j < numRanges; j++ { + dranges[j] = trange{mint, mint + rand.Int63n(1000)} + mint += rand.Int63n(1000) + 1 + } + stones[ref] = dranges + } + + require.NoError(t, writeTombstoneFile(tmpdir, newMapTombstoneReader(stones))) + + restr, err := readTombstoneFile(tmpdir) + require.NoError(t, err) + exptr := newMapTombstoneReader(stones) + // Compare the two readers. + for restr.Next() { + require.True(t, exptr.Next()) + + require.Equal(t, exptr.At(), restr.At()) + } + require.False(t, exptr.Next()) + require.NoError(t, restr.Err()) + require.NoError(t, exptr.Err()) +} diff --git a/chunks.go b/chunks.go index 5e89267e2..ca030b76d 100644 --- a/chunks.go +++ b/chunks.go @@ -47,46 +47,6 @@ type ChunkMeta struct { dranges []trange } -type trange struct { - mint, maxt int64 -} - -func (tr trange) inBounds(t int64) bool { - return t >= tr.mint && t <= tr.maxt -} - -// This adds the new time-range to the existing ones. -// The existing ones must be sorted. -func addNewInterval(existing []trange, n trange) []trange { - for i, r := range existing { - if r.inBounds(n.mint) { - if n.maxt > r.maxt { - existing[i].maxt = n.maxt - } - - return existing - } - if r.inBounds(n.maxt) { - if n.mint < r.maxt { - existing[i].mint = n.mint - } - - return existing - } - - if n.mint < r.mint { - newRange := existing[:i] - newRange = append(newRange, n) - newRange = append(newRange, existing[i:]...) - - return newRange - } - } - - existing = append(existing, n) - return existing -} - // writeHash writes the chunk encoding and raw data into the provided hash. func (cm *ChunkMeta) writeHash(h hash.Hash) error { if _, err := h.Write([]byte{byte(cm.Chunk.Encoding())}); err != nil { @@ -112,6 +72,47 @@ func (cm *ChunkMeta) Iterator() chunks.Iterator { return cm.Chunk.Iterator() } +type trange struct { + mint, maxt int64 +} + +func (tr trange) inBounds(t int64) bool { + return t >= tr.mint && t <= tr.maxt +} + +// This adds the new time-range to the existing ones. +// The existing ones must be sorted and should not be nil. +func addNewInterval(existing []trange, n trange) []trange { + for i, r := range existing { + if r.inBounds(n.mint) { + if n.maxt > r.maxt { + existing[i].maxt = n.maxt + } + + return existing + } + if r.inBounds(n.maxt) { + if n.mint < r.maxt { + existing[i].mint = n.mint + } + + return existing + } + + if n.mint < r.mint { + newRange := make([]trange, i, len(existing[:i])+1) + copy(newRange, existing[:i]) + newRange = append(newRange, n) + newRange = append(newRange, existing[i:]...) + + return newRange + } + } + + existing = append(existing, n) + return existing +} + // deletedIterator wraps an Iterator and makes sure any deleted metrics are not // returned. type deletedIterator struct { @@ -128,10 +129,12 @@ func (it *deletedIterator) Next() bool { Outer: for it.it.Next() { ts, _ := it.it.At() + for _, tr := range it.dranges { if tr.inBounds(ts) { continue Outer } + if ts > tr.maxt { it.dranges = it.dranges[1:] continue @@ -147,7 +150,7 @@ Outer: } func (it *deletedIterator) Err() error { - return it.Err() + return it.it.Err() } // ChunkWriter serializes a time block of chunked series data. diff --git a/chunks_test.go b/chunks_test.go index ae9d69876..2837259f8 100644 --- a/chunks_test.go +++ b/chunks_test.go @@ -14,8 +14,12 @@ package tsdb import ( + "math/rand" + "testing" + "github.com/pkg/errors" "github.com/prometheus/tsdb/chunks" + "github.com/stretchr/testify/require" ) type mockChunkReader map[uint64]chunks.Chunk @@ -32,3 +36,108 @@ func (cr mockChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { func (cr mockChunkReader) Close() error { return nil } + +func TestAddingNewIntervals(t *testing.T) { + cases := []struct { + exist []trange + new trange + + exp []trange + }{ + { + new: trange{1, 2}, + exp: []trange{{1, 2}}, + }, + { + exist: []trange{{1, 10}, {12, 20}, {25, 30}}, + new: trange{21, 23}, + exp: []trange{{1, 10}, {12, 20}, {21, 23}, {25, 30}}, + }, + { + exist: []trange{{1, 10}, {12, 20}, {25, 30}}, + new: trange{21, 25}, + exp: []trange{{1, 10}, {12, 20}, {21, 30}}, + }, + { + exist: []trange{{1, 10}, {12, 20}, {25, 30}}, + new: trange{18, 23}, + exp: []trange{{1, 10}, {12, 23}, {25, 30}}, + }, + // TODO(gouthamve): (below) This is technically right, but fix it in the future. + { + exist: []trange{{1, 10}, {12, 20}, {25, 30}}, + new: trange{9, 23}, + exp: []trange{{1, 23}, {12, 20}, {25, 30}}, + }, + { + exist: []trange{{5, 10}, {12, 20}, {25, 30}}, + new: trange{1, 4}, + exp: []trange{{1, 4}, {5, 10}, {12, 20}, {25, 30}}, + }, + } + + for _, c := range cases { + require.Equal(t, c.exp, addNewInterval(c.exist, c.new)) + } + return +} + +func TestDeletedIterator(t *testing.T) { + chk := chunks.NewXORChunk() + app, err := chk.Appender() + require.NoError(t, err) + // Insert random stuff from (0, 1000). + act := make([]sample, 1000) + for i := 0; i < 1000; i++ { + act[i].t = int64(i) + act[i].v = rand.Float64() + app.Append(act[i].t, act[i].v) + } + + cases := []struct { + r []trange + }{ + {r: []trange{{1, 20}}}, + {r: []trange{{1, 10}, {12, 20}, {21, 23}, {25, 30}}}, + {r: []trange{{1, 10}, {12, 20}, {20, 30}}}, + {r: []trange{{1, 10}, {12, 23}, {25, 30}}}, + {r: []trange{{1, 23}, {12, 20}, {25, 30}}}, + {r: []trange{{1, 23}, {12, 20}, {25, 3000}}}, + {r: []trange{{0, 2000}}}, + {r: []trange{{500, 2000}}}, + {r: []trange{{0, 200}}}, + {r: []trange{{1000, 20000}}}, + } + + for _, c := range cases { + i := int64(-1) + it := &deletedIterator{it: chk.Iterator(), dranges: c.r[:]} + ranges := c.r[:] + for it.Next() { + i++ + for _, tr := range ranges { + if tr.inBounds(i) { + i = tr.maxt + 1 + ranges = ranges[1:] + } + } + + require.True(t, i < 1000) + + ts, v := it.At() + require.Equal(t, act[i].t, ts) + require.Equal(t, act[i].v, v) + } + // There has been an extra call to Next(). + i++ + for _, tr := range ranges { + if tr.inBounds(i) { + i = tr.maxt + 1 + ranges = ranges[1:] + } + } + + require.False(t, i < 1000) + require.NoError(t, it.Err()) + } +}