diff --git a/tsdb/compact.go b/tsdb/compact.go index 9026b37bf2..7c6e142166 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -330,7 +330,9 @@ func splitByRange(ds []dirMeta, tr int64) [][]dirMeta { return splitDirs } -func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta { +// CompactBlockMetas merges many block metas into one, combining it's source blocks together +// and adjusting compaction level. +func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta { res := &BlockMeta{ ULID: uid, MinTime: blocks[0].MinTime, @@ -415,7 +417,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u uid = ulid.MustNew(ulid.Now(), rand.Reader) - meta := compactBlockMetas(uid, metas...) + meta := CompactBlockMetas(uid, metas...) err = c.write(dest, meta, blocks...) if err == nil { if meta.Stats.NumSamples == 0 { @@ -527,7 +529,6 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error { } // write creates a new block that is the union of the provided blocks into dir. -// It cleans up all files of the old blocks after completing successfully. func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) { dir := filepath.Join(dest, meta.ULID.String()) tmp := dir + tmpForCreationBlockDirSuffix @@ -633,7 +634,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } df = nil - // Block successfully written, make visible and remove old ones. + // Block successfully written, make it visible in destination dir by moving it from tmp one. if err := fileutil.Replace(tmp, dir); err != nil { return errors.Wrap(err, "rename block dir") } @@ -715,7 +716,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, symbols = syms continue } - symbols = newMergedStringIter(symbols, syms) + symbols = NewMergedStringIter(symbols, syms) } for symbols.Next() { diff --git a/tsdb/querier.go b/tsdb/querier.go index f666bb7904..e27c9cf5f9 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -485,7 +485,7 @@ type populateWithDelGenericSeriesIterator struct { i int err error - bufIter *deletedIterator + bufIter *DeletedIterator intervals tombstones.Intervals currDelIter chunkenc.Iterator @@ -501,7 +501,7 @@ func newPopulateWithDelGenericSeriesIterator( chunks: chunks, chks: chks, i: -1, - bufIter: &deletedIterator{}, + bufIter: &DeletedIterator{}, intervals: intervals, } } @@ -520,10 +520,10 @@ func (p *populateWithDelGenericSeriesIterator) next() bool { return false } - p.bufIter.intervals = p.bufIter.intervals[:0] + p.bufIter.Intervals = p.bufIter.Intervals[:0] for _, interval := range p.intervals { if p.currChkMeta.OverlapsClosedInterval(interval.Mint, interval.Maxt) { - p.bufIter.intervals = p.bufIter.intervals.Add(interval) + p.bufIter.Intervals = p.bufIter.Intervals.Add(interval) } } @@ -534,14 +534,14 @@ func (p *populateWithDelGenericSeriesIterator) next() bool { // // TODO think how to avoid the typecasting to verify when it is head block. _, isSafeChunk := p.currChkMeta.Chunk.(*safeChunk) - if len(p.bufIter.intervals) == 0 && !(isSafeChunk && p.currChkMeta.MaxTime == math.MaxInt64) { + if len(p.bufIter.Intervals) == 0 && !(isSafeChunk && p.currChkMeta.MaxTime == math.MaxInt64) { // If there are no overlap with deletion intervals AND it's NOT an "open" head chunk, we can take chunk as it is. p.currDelIter = nil return true } // We don't want full chunk or it's potentially still opened, take just part of it. - p.bufIter.it = p.currChkMeta.Chunk.Iterator(nil) + p.bufIter.Iter = p.currChkMeta.Chunk.Iterator(nil) p.currDelIter = p.bufIter return true } @@ -723,7 +723,8 @@ func (b *blockChunkSeriesSet) At() storage.ChunkSeries { } } -func newMergedStringIter(a index.StringIter, b index.StringIter) index.StringIter { +// NewMergedStringIter returns string iterator that allows to merge symbols on demand and stream result. +func NewMergedStringIter(a index.StringIter, b index.StringIter) index.StringIter { return &mergedStringIter{a: a, b: b, aok: a.Next(), bok: b.Next()} } @@ -767,35 +768,35 @@ func (m mergedStringIter) Err() error { return m.b.Err() } -// deletedIterator wraps an Iterator and makes sure any deleted metrics are not -// returned. -type deletedIterator struct { - it chunkenc.Iterator - - intervals tombstones.Intervals +// DeletedIterator wraps chunk Iterator and makes sure any deleted metrics are not returned. +type DeletedIterator struct { + // Iter is an Iterator to be wrapped. + Iter chunkenc.Iterator + // Intervals are the deletion intervals. + Intervals tombstones.Intervals } -func (it *deletedIterator) At() (int64, float64) { - return it.it.At() +func (it *DeletedIterator) At() (int64, float64) { + return it.Iter.At() } -func (it *deletedIterator) Seek(t int64) bool { - if it.it.Err() != nil { +func (it *DeletedIterator) Seek(t int64) bool { + if it.Iter.Err() != nil { return false } - if ok := it.it.Seek(t); !ok { + if ok := it.Iter.Seek(t); !ok { return false } // Now double check if the entry falls into a deleted interval. ts, _ := it.At() - for _, itv := range it.intervals { + for _, itv := range it.Intervals { if ts < itv.Mint { return true } if ts > itv.Maxt { - it.intervals = it.intervals[1:] + it.Intervals = it.Intervals[1:] continue } @@ -807,12 +808,12 @@ func (it *deletedIterator) Seek(t int64) bool { return true } -func (it *deletedIterator) Next() bool { +func (it *DeletedIterator) Next() bool { Outer: - for it.it.Next() { - ts, _ := it.it.At() + for it.Iter.Next() { + ts, _ := it.Iter.At() - for _, tr := range it.intervals { + for _, tr := range it.Intervals { if tr.InBounds(ts) { continue Outer } @@ -821,14 +822,14 @@ Outer: return true } - it.intervals = it.intervals[1:] + it.Intervals = it.Intervals[1:] } return true } return false } -func (it *deletedIterator) Err() error { return it.it.Err() } +func (it *DeletedIterator) Err() error { return it.Iter.Err() } type nopChunkReader struct { emptyChunk chunkenc.Chunk diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 7da9372fe8..7b983c0223 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -1007,7 +1007,7 @@ func TestDeletedIterator(t *testing.T) { for _, c := range cases { i := int64(-1) - it := &deletedIterator{it: chk.Iterator(nil), intervals: c.r[:]} + it := &DeletedIterator{Iter: chk.Iterator(nil), Intervals: c.r[:]} ranges := c.r[:] for it.Next() { i++ @@ -1069,7 +1069,7 @@ func TestDeletedIterator_WithSeek(t *testing.T) { } for _, c := range cases { - it := &deletedIterator{it: chk.Iterator(nil), intervals: c.r[:]} + it := &DeletedIterator{Iter: chk.Iterator(nil), Intervals: c.r[:]} require.Equal(t, c.ok, it.Seek(c.seek)) if c.ok {