From b7b70066ae0e5f01934037af1e491ad43db78a60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 6 Oct 2021 11:04:37 +0200 Subject: [PATCH] Address review feedback. --- tsdb/compact.go | 42 +++++++++++++++++-------------- tsdb/symbols_batch.go | 51 ++++++++++++++++++++------------------ tsdb/symbols_batch_test.go | 3 +++ 3 files changed, 54 insertions(+), 42 deletions(-) diff --git a/tsdb/compact.go b/tsdb/compact.go index 37ded7202e..3a77bd0be7 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -810,23 +810,25 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1)) - // To iterate series when populating symbols, we cannot reuse postings we just got, but need to get a new copy. - // Postings can only be iterated once. - k, v = index.AllPostingsKey() - all, err = indexr.Postings(k, v) - if err != nil { - return err + if len(outBlocks) > 1 { + // To iterate series when populating symbols, we cannot reuse postings we just got, but need to get a new copy. + // Postings can only be iterated once. + k, v = index.AllPostingsKey() + all, err = indexr.Postings(k, v) + if err != nil { + return err + } + all = indexr.SortedPostings(all) + // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. + symbolsSets = append(symbolsSets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1)) + } else { + syms := indexr.Symbols() + if i == 0 { + symbols = syms + continue + } + symbols = NewMergedStringIter(symbols, syms) } - all = indexr.SortedPostings(all) - // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. - symbolsSets = append(symbolsSets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1)) - - syms := indexr.Symbols() - if i == 0 { - symbols = syms - continue - } - symbols = NewMergedStringIter(symbols, syms) } if len(outBlocks) == 1 { @@ -913,6 +915,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, return nil } +// How many symbols we buffer in memory per output block. +const inMemorySymbolsLimit = 1_000_000 + // populateSymbols writes symbols to output blocks. We need to iterate through all series to find // which series belongs to what block. We collect symbols per sharded block, and then add sorted symbols to // block's index. @@ -923,7 +928,7 @@ func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlo batchers := make([]*symbolsBatcher, len(outBlocks)) for ix := range outBlocks { - batchers[ix] = newSymbolsBatcher(10000, outBlocks[ix].tmpDir) + batchers[ix] = newSymbolsBatcher(inMemorySymbolsLimit, outBlocks[ix].tmpDir) // Always include empty symbol. Blocks created from Head always have it in the symbols table, // and if we only include symbols from series, we would skip it. @@ -997,7 +1002,8 @@ func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlo closeIt = nil _ = it.Close() - // Delete symbol files from symbolsBatcher. + // Delete symbol files from symbolsBatcher. We don't need to perform the cleanup if populateSymbols + // or compaction fails, because in that case compactor already removes entire (temp) output block directory. for _, fn := range batchers[ix].symbolFiles() { if err := os.Remove(fn); err != nil { return errors.Wrap(err, "deleting symbols file") diff --git a/tsdb/symbols_batch.go b/tsdb/symbols_batch.go index 283acd344a..b6f6a3a345 100644 --- a/tsdb/symbols_batch.go +++ b/tsdb/symbols_batch.go @@ -70,7 +70,7 @@ func writeSymbolsToFile(filename string, symbols []string) error { return err } - // Snappy is used to for buffering and smaller files. + // Snappy is used for buffering and to create smaller files. sn := snappy.NewBufferedWriter(f) enc := gob.NewEncoder(sn) @@ -89,16 +89,19 @@ func writeSymbolsToFile(filename string, symbols []string) error { return errs.Err() } +// Implements heap.Interface using symbols from files. type symbolsHeap []*symbolsFile +// Len implements sort.Interface. func (s *symbolsHeap) Len() int { return len(*s) } +// Less implements sort.Interface. func (s *symbolsHeap) Less(i, j int) bool { iw, ierr := (*s)[i].Peek() if ierr != nil { - // empty string will be sorted first, so error will be returned before any other result. + // Empty string will be sorted first, so error will be returned before any other result. iw = "" } @@ -110,16 +113,17 @@ func (s *symbolsHeap) Less(i, j int) bool { return iw < jw } +// Swap implements sort.Interface. func (s *symbolsHeap) Swap(i, j int) { (*s)[i], (*s)[j] = (*s)[j], (*s)[i] } +// Push implements heap.Interface. Push should add x as element Len(). func (s *symbolsHeap) Push(x interface{}) { - if f, ok := x.(*symbolsFile); ok { - *s = append(*s, f) - } + *s = append(*s, x.(*symbolsFile)) } +// Pop implements heap.Interface. Pop should remove and return element Len() - 1. func (s *symbolsHeap) Pop() interface{} { l := len(*s) res := (*s)[l-1] @@ -160,31 +164,30 @@ func newSymbolsIterator(filenames []string) (*symbolsIterator, error) { // NextSymbol advances iterator forward, and returns next symbol. // If there is no next element, returns err == io.EOF. func (sit *symbolsIterator) NextSymbol() (string, error) { -again: - if len(sit.heap) == 0 { - return "", io.EOF - } + for len(sit.heap) > 0 { + result, err := sit.heap[0].Next() + if err == io.EOF { + // End of file, remove it from heap, and try next file. + heap.Remove(&sit.heap, 0) + continue + } - result, err := sit.heap[0].Next() - if err == io.EOF { - // End of file, remove it, and try next file. - heap.Remove(&sit.heap, 0) - goto again - } + if err != nil { + return "", err + } - if err != nil { - return "", err - } + heap.Fix(&sit.heap, 0) - heap.Fix(&sit.heap, 0) + if sit.lastReturned != nil && *sit.lastReturned == result { + // Duplicate symbol, try next one. + continue + } - if sit.lastReturned == nil || *sit.lastReturned != result { sit.lastReturned = &result return result, nil } - // Duplicate symbol, try next one. - goto again + return "", io.EOF } // Close all files. @@ -224,7 +227,7 @@ func (sf *symbolsFile) Peek() (string, error) { return sf.nextSymbol, sf.nextErr } -// Next advances iterator and returns next symbol or error. +// Next advances iterator and returns the next symbol or error. func (sf *symbolsFile) Next() (string, error) { if sf.nextValid { defer func() { @@ -256,7 +259,7 @@ func openFiles(filenames []string) ([]*os.File, error) { f, err := os.Open(fn) if err != nil { - // Close opened files so far. + // Close files opened so far. for _, sf := range result { _ = sf.Close() } diff --git a/tsdb/symbols_batch_test.go b/tsdb/symbols_batch_test.go index 06c0cab834..8b41d89630 100644 --- a/tsdb/symbols_batch_test.go +++ b/tsdb/symbols_batch_test.go @@ -32,6 +32,9 @@ func TestSymbolsBatchAndIteration(t *testing.T) { it, err := newSymbolsIterator(b.symbolFiles()) require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, it.Close()) + }) first := true var w, prev string