Address review feedback.

This commit is contained in:
Peter Štibraný 2021-10-06 11:04:37 +02:00
parent d116268e59
commit b7b70066ae
3 changed files with 54 additions and 42 deletions

View file

@@ -810,23 +810,25 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1)) sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1))
// To iterate series when populating symbols, we cannot reuse postings we just got, but need to get a new copy. if len(outBlocks) > 1 {
// Postings can only be iterated once. // To iterate series when populating symbols, we cannot reuse postings we just got, but need to get a new copy.
k, v = index.AllPostingsKey() // Postings can only be iterated once.
all, err = indexr.Postings(k, v) k, v = index.AllPostingsKey()
if err != nil { all, err = indexr.Postings(k, v)
return err if err != nil {
return err
}
all = indexr.SortedPostings(all)
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
symbolsSets = append(symbolsSets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1))
} else {
syms := indexr.Symbols()
if i == 0 {
symbols = syms
continue
}
symbols = NewMergedStringIter(symbols, syms)
} }
all = indexr.SortedPostings(all)
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
symbolsSets = append(symbolsSets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1))
syms := indexr.Symbols()
if i == 0 {
symbols = syms
continue
}
symbols = NewMergedStringIter(symbols, syms)
} }
if len(outBlocks) == 1 { if len(outBlocks) == 1 {
@@ -913,6 +915,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
return nil return nil
} }
// How many symbols we buffer in memory per output block.
const inMemorySymbolsLimit = 1_000_000
// populateSymbols writes symbols to output blocks. We need to iterate through all series to find // populateSymbols writes symbols to output blocks. We need to iterate through all series to find
// which series belongs to what block. We collect symbols per sharded block, and then add sorted symbols to // which series belongs to what block. We collect symbols per sharded block, and then add sorted symbols to
// block's index. // block's index.
@@ -923,7 +928,7 @@ func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlo
batchers := make([]*symbolsBatcher, len(outBlocks)) batchers := make([]*symbolsBatcher, len(outBlocks))
for ix := range outBlocks { for ix := range outBlocks {
batchers[ix] = newSymbolsBatcher(10000, outBlocks[ix].tmpDir) batchers[ix] = newSymbolsBatcher(inMemorySymbolsLimit, outBlocks[ix].tmpDir)
// Always include empty symbol. Blocks created from Head always have it in the symbols table, // Always include empty symbol. Blocks created from Head always have it in the symbols table,
// and if we only include symbols from series, we would skip it. // and if we only include symbols from series, we would skip it.
@@ -997,7 +1002,8 @@ func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlo
closeIt = nil closeIt = nil
_ = it.Close() _ = it.Close()
// Delete symbol files from symbolsBatcher. // Delete symbol files from symbolsBatcher. We don't need to perform the cleanup if populateSymbols
// or compaction fails, because in that case compactor already removes entire (temp) output block directory.
for _, fn := range batchers[ix].symbolFiles() { for _, fn := range batchers[ix].symbolFiles() {
if err := os.Remove(fn); err != nil { if err := os.Remove(fn); err != nil {
return errors.Wrap(err, "deleting symbols file") return errors.Wrap(err, "deleting symbols file")

View file

@@ -70,7 +70,7 @@ func writeSymbolsToFile(filename string, symbols []string) error {
return err return err
} }
// Snappy is used to for buffering and smaller files. // Snappy is used for buffering and to create smaller files.
sn := snappy.NewBufferedWriter(f) sn := snappy.NewBufferedWriter(f)
enc := gob.NewEncoder(sn) enc := gob.NewEncoder(sn)
@@ -89,16 +89,19 @@ func writeSymbolsToFile(filename string, symbols []string) error {
return errs.Err() return errs.Err()
} }
// Implements heap.Interface using symbols from files.
type symbolsHeap []*symbolsFile type symbolsHeap []*symbolsFile
// Len implements sort.Interface.
func (s *symbolsHeap) Len() int { func (s *symbolsHeap) Len() int {
return len(*s) return len(*s)
} }
// Less implements sort.Interface.
func (s *symbolsHeap) Less(i, j int) bool { func (s *symbolsHeap) Less(i, j int) bool {
iw, ierr := (*s)[i].Peek() iw, ierr := (*s)[i].Peek()
if ierr != nil { if ierr != nil {
// empty string will be sorted first, so error will be returned before any other result. // Empty string will be sorted first, so error will be returned before any other result.
iw = "" iw = ""
} }
@@ -110,16 +113,17 @@ func (s *symbolsHeap) Less(i, j int) bool {
return iw < jw return iw < jw
} }
// Swap implements sort.Interface.
func (s *symbolsHeap) Swap(i, j int) { func (s *symbolsHeap) Swap(i, j int) {
(*s)[i], (*s)[j] = (*s)[j], (*s)[i] (*s)[i], (*s)[j] = (*s)[j], (*s)[i]
} }
// Push implements heap.Interface. Push should add x as element Len().
func (s *symbolsHeap) Push(x interface{}) { func (s *symbolsHeap) Push(x interface{}) {
if f, ok := x.(*symbolsFile); ok { *s = append(*s, x.(*symbolsFile))
*s = append(*s, f)
}
} }
// Pop implements heap.Interface. Pop should remove and return element Len() - 1.
func (s *symbolsHeap) Pop() interface{} { func (s *symbolsHeap) Pop() interface{} {
l := len(*s) l := len(*s)
res := (*s)[l-1] res := (*s)[l-1]
@@ -160,31 +164,30 @@ func newSymbolsIterator(filenames []string) (*symbolsIterator, error) {
// NextSymbol advances iterator forward, and returns next symbol. // NextSymbol advances iterator forward, and returns next symbol.
// If there is no next element, returns err == io.EOF. // If there is no next element, returns err == io.EOF.
func (sit *symbolsIterator) NextSymbol() (string, error) { func (sit *symbolsIterator) NextSymbol() (string, error) {
again: for len(sit.heap) > 0 {
if len(sit.heap) == 0 { result, err := sit.heap[0].Next()
return "", io.EOF if err == io.EOF {
} // End of file, remove it from heap, and try next file.
heap.Remove(&sit.heap, 0)
continue
}
result, err := sit.heap[0].Next() if err != nil {
if err == io.EOF { return "", err
// End of file, remove it, and try next file. }
heap.Remove(&sit.heap, 0)
goto again
}
if err != nil { heap.Fix(&sit.heap, 0)
return "", err
}
heap.Fix(&sit.heap, 0) if sit.lastReturned != nil && *sit.lastReturned == result {
// Duplicate symbol, try next one.
continue
}
if sit.lastReturned == nil || *sit.lastReturned != result {
sit.lastReturned = &result sit.lastReturned = &result
return result, nil return result, nil
} }
// Duplicate symbol, try next one. return "", io.EOF
goto again
} }
// Close all files. // Close all files.
@@ -224,7 +227,7 @@ func (sf *symbolsFile) Peek() (string, error) {
return sf.nextSymbol, sf.nextErr return sf.nextSymbol, sf.nextErr
} }
// Next advances iterator and returns next symbol or error. // Next advances iterator and returns the next symbol or error.
func (sf *symbolsFile) Next() (string, error) { func (sf *symbolsFile) Next() (string, error) {
if sf.nextValid { if sf.nextValid {
defer func() { defer func() {
@@ -256,7 +259,7 @@ func openFiles(filenames []string) ([]*os.File, error) {
f, err := os.Open(fn) f, err := os.Open(fn)
if err != nil { if err != nil {
// Close opened files so far. // Close files opened so far.
for _, sf := range result { for _, sf := range result {
_ = sf.Close() _ = sf.Close()
} }

View file

@@ -32,6 +32,9 @@ func TestSymbolsBatchAndIteration(t *testing.T) {
it, err := newSymbolsIterator(b.symbolFiles()) it, err := newSymbolsIterator(b.symbolFiles())
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, it.Close())
})
first := true first := true
var w, prev string var w, prev string