From 259e09fe5f1e6d9cb2fc3371a88b3426bc34beaa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 5 Oct 2021 16:06:38 +0200 Subject: [PATCH 1/4] When doing compaction with splitting, only use symbols from series that belong to given sharded block. --- tsdb/compact.go | 110 ++++++++++++++- tsdb/compact_test.go | 29 ++++ tsdb/symbols_batch.go | 269 +++++++++++++++++++++++++++++++++++++ tsdb/symbols_batch_test.go | 53 ++++++++ 4 files changed, 455 insertions(+), 6 deletions(-) create mode 100644 tsdb/symbols_batch.go create mode 100644 tsdb/symbols_batch_test.go diff --git a/tsdb/compact.go b/tsdb/compact.go index 167378bd75..aa47d62cc3 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -749,6 +749,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, var ( sets []storage.ChunkSeriesSet + symbolsSets []storage.ChunkSeriesSet // series sets used for finding symbols. Only used when doing sharding. symbols index.StringIter closers []io.Closer overlapping bool @@ -808,6 +809,18 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, all = indexr.SortedPostings(all) // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1)) + + // To iterate series when populating symbols, we cannot reuse postings we just got, but need to get a new copy. + // Postings can only be iterated once. + k, v = index.AllPostingsKey() + all, err = indexr.Postings(k, v) + if err != nil { + return err + } + all = indexr.SortedPostings(all) + // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. + symbolsSets = append(symbolsSets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1)) + syms := indexr.Symbols() if i == 0 { symbols = syms @@ -816,15 +829,19 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, symbols = NewMergedStringIter(symbols, syms) } - for symbols.Next() { - for _, ob := range outBlocks { - if err := ob.indexw.AddSymbol(symbols.At()); err != nil { + if len(outBlocks) == 1 { + for symbols.Next() { + if err := outBlocks[0].indexw.AddSymbol(symbols.At()); err != nil { return errors.Wrap(err, "add symbol") } } - } - if symbols.Err() != nil { - return errors.Wrap(symbols.Err(), "next symbol") + if symbols.Err() != nil { + return errors.Wrap(symbols.Err(), "next symbol") + } + } else { + if err := c.populateSymbols(symbolsSets, outBlocks); err != nil { + return err + } } var ( @@ -895,3 +912,84 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, return nil } + +// populateSymbols writes symbols to output blocks. We need to iterate through all series to find +// which series belongs to what block. We collect symbols per sharded block, and then add sorted symbols to +// block's index. +func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlocks []shardedBlock) error { + if len(outBlocks) == 0 { + return errors.New("no output block") + } + + batchers := make([]*symbolsBatcher, len(outBlocks)) + for ix := range outBlocks { + batchers[ix] = newSymbolsBatcher(10000, outBlocks[ix].tmpDir) + } + + seriesSet := sets[0] + if len(sets) > 1 { + seriesSet = storage.NewMergeChunkSeriesSet(sets, c.mergeFunc) + } + + for seriesSet.Next() { + if err := c.ctx.Err(); err != nil { + return err + } + + s := seriesSet.At() + + obIx := s.Labels().Hash() % uint64(len(outBlocks)) + + for _, l := range s.Labels() { + if err := batchers[obIx].addSymbol(l.Name); err != nil { + return errors.Wrap(err, "addSymbol to batcher") + } + if err := batchers[obIx].addSymbol(l.Value); err != nil { + return errors.Wrap(err, "addSymbol to batcher") + } + } + } + + for ix := range outBlocks { + if err := c.ctx.Err(); err != nil { + return err + } + + // Flush the batcher to write remaining symbols. + if err := batchers[ix].flushSymbols(true); err != nil { + return errors.Wrap(err, "flushing batcher") + } + + it, err := newSymbolsIterator(batchers[ix].symbolFiles()) + if err != nil { + return errors.Wrap(err, "opening symbols iterator") + } + + // Each symbols iterator must be closed to close underlying files. + closeIt := it + defer func() { + if closeIt != nil { + _ = closeIt.Close() + } + }() + + var sym string + for sym, err = it.NextSymbol(); err == nil; sym, err = it.NextSymbol() { + err = outBlocks[ix].indexw.AddSymbol(sym) + if err != nil { + return errors.Wrap(err, "AddSymbol") + } + } + + if err != io.EOF { + return errors.Wrap(err, "iterating symbols") + } + + // if err == io.EOF, we have iterated through all symbols. We can close underlying + // files now. + closeIt = nil + _ = it.Close() + } + + return nil +} diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index b24f0b1bd4..29456a80e5 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -559,6 +559,9 @@ func TestCompaction_CompactWithSplitting(t *testing.T) { require.Equal(t, ts, blockID.Time()) } + // Symbols found in series. + seriesSymbols := map[string]struct{}{} + block, err := OpenBlock(log.NewNopLogger(), filepath.Join(dir, blockID.String()), nil) require.NoError(t, err) @@ -585,8 +588,34 @@ func TestCompaction_CompactWithSplitting(t *testing.T) { require.NoError(t, idxr.Series(ref, &lbls, nil)) require.Equal(t, uint64(shardIndex), lbls.Hash()%shardCount) + + // Collect all symbols used by series. + for _, l := range lbls { + seriesSymbols[l.Name] = struct{}{} + seriesSymbols[l.Value] = struct{}{} + } } require.NoError(t, p.Err()) + + // Check that all symbols in symbols table are actually used by series. + symIt := idxr.Symbols() + for symIt.Next() { + w := symIt.At() + + // When shardCount == 1, we're not doing symbols splitting. Head-compacted blocks + // however do have empty string as a symbol in the table. + // Since label name or value cannot be empty string, we will never find it in seriesSymbols. + if w == "" && shardCount <= 1 { + continue + } + + _, ok := seriesSymbols[w] + require.True(t, ok, "not found in series: '%s'", w) + delete(seriesSymbols, w) + } + + // Check that symbols table covered all symbols found from series. + require.Equal(t, 0, len(seriesSymbols)) } require.Equal(t, uint64(series), totalSeries) diff --git a/tsdb/symbols_batch.go b/tsdb/symbols_batch.go new file mode 100644 index 0000000000..283acd344a --- /dev/null +++ b/tsdb/symbols_batch.go @@ -0,0 +1,269 @@ +package tsdb + +import ( + "container/heap" + "encoding/gob" + "fmt" + "io" + "os" + "path/filepath" + "sort" + + "github.com/golang/snappy" + + "github.com/prometheus/prometheus/tsdb/errors" +) + +// symbolsBatcher keeps buffer of symbols in memory. Once the buffer reaches the size limit (number of symbols), +// batcher writes currently buffered symbols to file. At the end remaining symbols must be flushed. After writing +// all batches, symbolsBatcher has list of files that can be used together with newSymbolsIterator to iterate +// through all previously added symbols in sorted order. +type symbolsBatcher struct { + dir string + limit int + + buffer map[string]struct{} // using map to deduplicate + symbolsFiles []string // paths of symbol files that have been successfully written. +} + +func newSymbolsBatcher(limit int, dir string) *symbolsBatcher { + return &symbolsBatcher{ + limit: limit, + dir: dir, + buffer: make(map[string]struct{}, limit), + } +} + +func (sw *symbolsBatcher) addSymbol(sym string) error { + sw.buffer[sym] = struct{}{} + return sw.flushSymbols(false) +} + +func (sw *symbolsBatcher) flushSymbols(force bool) error { + if !force && len(sw.buffer) < sw.limit { + return nil + } + + sortedSymbols := make([]string, 0, len(sw.buffer)) + for s := range sw.buffer { + sortedSymbols = append(sortedSymbols, s) + } + sort.Strings(sortedSymbols) + + symbolsFile := filepath.Join(sw.dir, fmt.Sprintf("symbols_%d", len(sw.symbolsFiles))) + err := writeSymbolsToFile(symbolsFile, sortedSymbols) + if err == nil { + sw.buffer = make(map[string]struct{}, sw.limit) + sw.symbolsFiles = append(sw.symbolsFiles, symbolsFile) + } + + return err +} + +func (sw *symbolsBatcher) symbolFiles() []string { + return sw.symbolsFiles +} + +func writeSymbolsToFile(filename string, symbols []string) error { + f, err := os.Create(filename) + if err != nil { + return err + } + + // Snappy is used to for buffering and smaller files. + sn := snappy.NewBufferedWriter(f) + enc := gob.NewEncoder(sn) + + errs := errors.NewMulti() + + for _, s := range symbols { + err := enc.Encode(s) + if err != nil { + errs.Add(err) + break + } + } + + errs.Add(sn.Close()) + errs.Add(f.Close()) + return errs.Err() +} + +type symbolsHeap []*symbolsFile + +func (s *symbolsHeap) Len() int { + return len(*s) +} + +func (s *symbolsHeap) Less(i, j int) bool { + iw, ierr := (*s)[i].Peek() + if ierr != nil { + // empty string will be sorted first, so error will be returned before any other result. + iw = "" + } + + jw, jerr := (*s)[j].Peek() + if jerr != nil { + jw = "" + } + + return iw < jw +} + +func (s *symbolsHeap) Swap(i, j int) { + (*s)[i], (*s)[j] = (*s)[j], (*s)[i] +} + +func (s *symbolsHeap) Push(x interface{}) { + if f, ok := x.(*symbolsFile); ok { + *s = append(*s, f) + } +} + +func (s *symbolsHeap) Pop() interface{} { + l := len(*s) + res := (*s)[l-1] + *s = (*s)[:l-1] + return res +} + +type symbolsIterator struct { + files []*os.File + heap symbolsHeap + + // To avoid returning duplicates, we remember last returned symbol. We want to support "" as a valid + // symbol, so we use pointer to a string instead. + lastReturned *string +} + +func newSymbolsIterator(filenames []string) (*symbolsIterator, error) { + files, err := openFiles(filenames) + if err != nil { + return nil, err + } + + var symFiles []*symbolsFile + for _, f := range files { + symFiles = append(symFiles, newSymbolsFile(f)) + } + + h := &symbolsIterator{ + files: files, + heap: symFiles, + } + + heap.Init(&h.heap) + + return h, nil +} + +// NextSymbol advances iterator forward, and returns next symbol. +// If there is no next element, returns err == io.EOF. +func (sit *symbolsIterator) NextSymbol() (string, error) { +again: + if len(sit.heap) == 0 { + return "", io.EOF + } + + result, err := sit.heap[0].Next() + if err == io.EOF { + // End of file, remove it, and try next file. + heap.Remove(&sit.heap, 0) + goto again + } + + if err != nil { + return "", err + } + + heap.Fix(&sit.heap, 0) + + if sit.lastReturned == nil || *sit.lastReturned != result { + sit.lastReturned = &result + return result, nil + } + + // Duplicate symbol, try next one. + goto again +} + +// Close all files. +func (sit *symbolsIterator) Close() error { + errs := errors.NewMulti() + for _, f := range sit.files { + errs.Add(f.Close()) + } + return errs.Err() +} + +type symbolsFile struct { + dec *gob.Decoder + + nextValid bool // if true, nextSymbol and nextErr have the next symbol (possibly "") + nextSymbol string + nextErr error +} + +func newSymbolsFile(f *os.File) *symbolsFile { + sn := snappy.NewReader(f) + dec := gob.NewDecoder(sn) + + return &symbolsFile{ + dec: dec, + } +} + +// Peek returns next symbol or error, but also preserves them for subsequent Peek or Next calls. +func (sf *symbolsFile) Peek() (string, error) { + if sf.nextValid { + return sf.nextSymbol, sf.nextErr + } + + sf.nextValid = true + sf.nextSymbol, sf.nextErr = sf.readNext() + return sf.nextSymbol, sf.nextErr +} + +// Next advances iterator and returns next symbol or error. +func (sf *symbolsFile) Next() (string, error) { + if sf.nextValid { + defer func() { + sf.nextValid = false + sf.nextSymbol = "" + sf.nextErr = nil + }() + return sf.nextSymbol, sf.nextErr + } + + return sf.readNext() +} + +func (sf *symbolsFile) readNext() (string, error) { + var s string + err := sf.dec.Decode(&s) + // Decode returns io.EOF at the end. + if err != nil { + return "", err + } + + return s, nil +} + +func openFiles(filenames []string) ([]*os.File, error) { + var result []*os.File + + for _, fn := range filenames { + f, err := os.Open(fn) + + if err != nil { + // Close opened files so far. + for _, sf := range result { + _ = sf.Close() + } + return nil, err + } + + result = append(result, f) + } + return result, nil +} diff --git a/tsdb/symbols_batch_test.go b/tsdb/symbols_batch_test.go new file mode 100644 index 0000000000..06c0cab834 --- /dev/null +++ b/tsdb/symbols_batch_test.go @@ -0,0 +1,53 @@ +package tsdb + +import ( + "fmt" + "io" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSymbolsBatchAndIteration(t *testing.T) { + dir := t.TempDir() + + b := newSymbolsBatcher(100, dir) + + allWords := map[string]struct{}{} + + for i := 0; i < 10; i++ { + require.NoError(t, b.addSymbol("")) + allWords[""] = struct{}{} + + for j := 0; j < 123; j++ { + w := fmt.Sprintf("word_%d_%d", i%3, j) + + require.NoError(t, b.addSymbol(w)) + + allWords[w] = struct{}{} + } + } + + require.NoError(t, b.flushSymbols(true)) + + it, err := newSymbolsIterator(b.symbolFiles()) + require.NoError(t, err) + + first := true + var w, prev string + for w, err = it.NextSymbol(); err == nil; w, err = it.NextSymbol() { + if !first { + require.True(t, w != "") + require.True(t, prev < w) + } + + first = false + + _, known := allWords[w] + require.True(t, known) + delete(allWords, w) + prev = w + } + require.Equal(t, io.EOF, err) + require.Equal(t, 0, len(allWords)) +} From 9a50267ea558bdce8de9d15b416a3cd759acaed1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 5 Oct 2021 17:56:20 +0200 Subject: [PATCH 2/4] Delete symbols files. --- tsdb/compact.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tsdb/compact.go b/tsdb/compact.go index aa47d62cc3..cc5f229c4e 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -989,6 +989,13 @@ func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlo // files now. closeIt = nil _ = it.Close() + + // Delete symbol files from symbolsBatcher. + for _, fn := range batchers[ix].symbolFiles() { + if err := os.Remove(fn); err != nil { + return errors.Wrap(err, "deleting symbols file") + } + } } return nil From d116268e591d224d681176855274e825bf2109a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 6 Oct 2021 10:17:16 +0200 Subject: [PATCH 3/4] Always include empty symbol in the symbol table. --- tsdb/compact.go | 7 +++++++ tsdb/compact_test.go | 13 +++++-------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/tsdb/compact.go b/tsdb/compact.go index cc5f229c4e..37ded7202e 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -924,6 +924,13 @@ func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlo batchers := make([]*symbolsBatcher, len(outBlocks)) for ix := range outBlocks { batchers[ix] = newSymbolsBatcher(10000, outBlocks[ix].tmpDir) + + // Always include empty symbol. Blocks created from Head always have it in the symbols table, + // and if we only include symbols from series, we would skip it. + // It may not be required, but it's small and better be safe than sorry. + if err := batchers[ix].addSymbol(""); err != nil { + return errors.Wrap(err, "addSymbol to batcher") + } } seriesSet := sets[0] diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 29456a80e5..4dfb06a673 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -562,6 +562,11 @@ func TestCompaction_CompactWithSplitting(t *testing.T) { // Symbols found in series. seriesSymbols := map[string]struct{}{} + // We always expect to find "" symbol in the symbols table even if it's not in the series. + // Head compaction always includes it, and then it survives additional non-sharded compactions. + // Our splitting compaction preserves it too. + seriesSymbols[""] = struct{}{} + block, err := OpenBlock(log.NewNopLogger(), filepath.Join(dir, blockID.String()), nil) require.NoError(t, err) @@ -601,14 +606,6 @@ func TestCompaction_CompactWithSplitting(t *testing.T) { symIt := idxr.Symbols() for symIt.Next() { w := symIt.At() - - // When shardCount == 1, we're not doing symbols splitting. Head-compacted blocks - // however do have empty string as a symbol in the table. - // Since label name or value cannot be empty string, we will never find it in seriesSymbols. - if w == "" && shardCount <= 1 { - continue - } - _, ok := seriesSymbols[w] require.True(t, ok, "not found in series: '%s'", w) delete(seriesSymbols, w) From b7b70066ae0e5f01934037af1e491ad43db78a60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 6 Oct 2021 11:04:37 +0200 Subject: [PATCH 4/4] Address review feedback. --- tsdb/compact.go | 42 +++++++++++++++++-------------- tsdb/symbols_batch.go | 51 ++++++++++++++++++++------------------ tsdb/symbols_batch_test.go | 3 +++ 3 files changed, 54 insertions(+), 42 deletions(-) diff --git a/tsdb/compact.go b/tsdb/compact.go index 37ded7202e..3a77bd0be7 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -810,23 +810,25 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1)) - // To iterate series when populating symbols, we cannot reuse postings we just got, but need to get a new copy. - // Postings can only be iterated once. - k, v = index.AllPostingsKey() - all, err = indexr.Postings(k, v) - if err != nil { - return err + if len(outBlocks) > 1 { + // To iterate series when populating symbols, we cannot reuse postings we just got, but need to get a new copy. + // Postings can only be iterated once. + k, v = index.AllPostingsKey() + all, err = indexr.Postings(k, v) + if err != nil { + return err + } + all = indexr.SortedPostings(all) + // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. + symbolsSets = append(symbolsSets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1)) + } else { + syms := indexr.Symbols() + if i == 0 { + symbols = syms + continue + } + symbols = NewMergedStringIter(symbols, syms) } - all = indexr.SortedPostings(all) - // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. - symbolsSets = append(symbolsSets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1)) - - syms := indexr.Symbols() - if i == 0 { - symbols = syms - continue - } - symbols = NewMergedStringIter(symbols, syms) } if len(outBlocks) == 1 { @@ -913,6 +915,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, return nil } +// How many symbols we buffer in memory per output block. +const inMemorySymbolsLimit = 1_000_000 + // populateSymbols writes symbols to output blocks. We need to iterate through all series to find // which series belongs to what block. We collect symbols per sharded block, and then add sorted symbols to // block's index. @@ -923,7 +928,7 @@ func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlo batchers := make([]*symbolsBatcher, len(outBlocks)) for ix := range outBlocks { - batchers[ix] = newSymbolsBatcher(10000, outBlocks[ix].tmpDir) + batchers[ix] = newSymbolsBatcher(inMemorySymbolsLimit, outBlocks[ix].tmpDir) // Always include empty symbol. Blocks created from Head always have it in the symbols table, // and if we only include symbols from series, we would skip it. @@ -997,7 +1002,8 @@ func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlo closeIt = nil _ = it.Close() - // Delete symbol files from symbolsBatcher. + // Delete symbol files from symbolsBatcher. We don't need to perform the cleanup if populateSymbols + // or compaction fails, because in that case compactor already removes entire (temp) output block directory. for _, fn := range batchers[ix].symbolFiles() { if err := os.Remove(fn); err != nil { return errors.Wrap(err, "deleting symbols file") diff --git a/tsdb/symbols_batch.go b/tsdb/symbols_batch.go index 283acd344a..b6f6a3a345 100644 --- a/tsdb/symbols_batch.go +++ b/tsdb/symbols_batch.go @@ -70,7 +70,7 @@ func writeSymbolsToFile(filename string, symbols []string) error { return err } - // Snappy is used to for buffering and smaller files. + // Snappy is used for buffering and to create smaller files. sn := snappy.NewBufferedWriter(f) enc := gob.NewEncoder(sn) @@ -89,16 +89,19 @@ func writeSymbolsToFile(filename string, symbols []string) error { return errs.Err() } +// Implements heap.Interface using symbols from files. type symbolsHeap []*symbolsFile +// Len implements sort.Interface. func (s *symbolsHeap) Len() int { return len(*s) } +// Less implements sort.Interface. func (s *symbolsHeap) Less(i, j int) bool { iw, ierr := (*s)[i].Peek() if ierr != nil { - // empty string will be sorted first, so error will be returned before any other result. + // Empty string will be sorted first, so error will be returned before any other result. iw = "" } @@ -110,16 +113,17 @@ func (s *symbolsHeap) Less(i, j int) bool { return iw < jw } +// Swap implements sort.Interface. func (s *symbolsHeap) Swap(i, j int) { (*s)[i], (*s)[j] = (*s)[j], (*s)[i] } +// Push implements heap.Interface. Push should add x as element Len(). func (s *symbolsHeap) Push(x interface{}) { - if f, ok := x.(*symbolsFile); ok { - *s = append(*s, f) - } + *s = append(*s, x.(*symbolsFile)) } +// Pop implements heap.Interface. Pop should remove and return element Len() - 1. func (s *symbolsHeap) Pop() interface{} { l := len(*s) res := (*s)[l-1] @@ -160,31 +164,30 @@ func newSymbolsIterator(filenames []string) (*symbolsIterator, error) { // NextSymbol advances iterator forward, and returns next symbol. // If there is no next element, returns err == io.EOF. func (sit *symbolsIterator) NextSymbol() (string, error) { -again: - if len(sit.heap) == 0 { - return "", io.EOF - } + for len(sit.heap) > 0 { + result, err := sit.heap[0].Next() + if err == io.EOF { + // End of file, remove it from heap, and try next file. + heap.Remove(&sit.heap, 0) + continue + } - result, err := sit.heap[0].Next() - if err == io.EOF { - // End of file, remove it, and try next file. - heap.Remove(&sit.heap, 0) - goto again - } + if err != nil { + return "", err + } - if err != nil { - return "", err - } + heap.Fix(&sit.heap, 0) - heap.Fix(&sit.heap, 0) + if sit.lastReturned != nil && *sit.lastReturned == result { + // Duplicate symbol, try next one. + continue + } - if sit.lastReturned == nil || *sit.lastReturned != result { sit.lastReturned = &result return result, nil } - // Duplicate symbol, try next one. - goto again + return "", io.EOF } // Close all files. @@ -224,7 +227,7 @@ func (sf *symbolsFile) Peek() (string, error) { return sf.nextSymbol, sf.nextErr } -// Next advances iterator and returns next symbol or error. +// Next advances iterator and returns the next symbol or error. func (sf *symbolsFile) Next() (string, error) { if sf.nextValid { defer func() { @@ -256,7 +259,7 @@ func openFiles(filenames []string) ([]*os.File, error) { f, err := os.Open(fn) if err != nil { - // Close opened files so far. + // Close files opened so far. for _, sf := range result { _ = sf.Close() } diff --git a/tsdb/symbols_batch_test.go b/tsdb/symbols_batch_test.go index 06c0cab834..8b41d89630 100644 --- a/tsdb/symbols_batch_test.go +++ b/tsdb/symbols_batch_test.go @@ -32,6 +32,9 @@ func TestSymbolsBatchAndIteration(t *testing.T) { it, err := newSymbolsIterator(b.symbolFiles()) require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, it.Close()) + }) first := true var w, prev string