Merge pull request #17 from grafana/symbols-cleanup

Splitting compactor: remove unused symbols from index of compacted block
Peter Štibraný 2021-10-06 14:36:45 +02:00 committed by GitHub
commit b3ae917f96
4 changed files with 483 additions and 11 deletions

tsdb/compact.go

@@ -749,6 +749,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
 	var (
 		sets        []storage.ChunkSeriesSet
+		symbolsSets []storage.ChunkSeriesSet // series sets used for finding symbols. Only used when doing sharding.
 		symbols     index.StringIter
 		closers     []io.Closer
 		overlapping bool
@@ -808,23 +809,41 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
 		all = indexr.SortedPostings(all)
 		// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
 		sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1))
-		syms := indexr.Symbols()
-		if i == 0 {
-			symbols = syms
-			continue
+
+		if len(outBlocks) > 1 {
+			// To iterate series when populating symbols, we cannot reuse postings we just got, but need to get a new copy.
+			// Postings can only be iterated once.
+			k, v = index.AllPostingsKey()
+			all, err = indexr.Postings(k, v)
+			if err != nil {
+				return err
+			}
+			all = indexr.SortedPostings(all)
+			// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
+			symbolsSets = append(symbolsSets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1))
+		} else {
+			syms := indexr.Symbols()
+			if i == 0 {
+				symbols = syms
+				continue
+			}
+			symbols = NewMergedStringIter(symbols, syms)
 		}
-		symbols = NewMergedStringIter(symbols, syms)
 	}
 
-	for symbols.Next() {
-		for _, ob := range outBlocks {
-			if err := ob.indexw.AddSymbol(symbols.At()); err != nil {
+	if len(outBlocks) == 1 {
+		for symbols.Next() {
+			if err := outBlocks[0].indexw.AddSymbol(symbols.At()); err != nil {
 				return errors.Wrap(err, "add symbol")
 			}
 		}
-	}
-	if symbols.Err() != nil {
-		return errors.Wrap(symbols.Err(), "next symbol")
+		if symbols.Err() != nil {
+			return errors.Wrap(symbols.Err(), "next symbol")
+		}
+	} else {
+		if err := c.populateSymbols(symbolsSets, outBlocks); err != nil {
+			return err
+		}
 	}
 
 	var (
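For context on the single-output-block path above: NewMergedStringIter combines the sorted symbol tables of the input blocks into one sorted, deduplicated stream, which is then fed to the index writer. A minimal, self-contained sketch of that merge over plain string slices (the function name is illustrative, not part of the PR; the real code works with index.StringIter):

```go
package main

import "fmt"

// mergeSymbols returns the sorted union of two sorted, deduplicated symbol
// lists. This mirrors, in spirit, what chaining NewMergedStringIter over the
// per-block symbol iterators produces for the output block's symbols table.
func mergeSymbols(a, b []string) []string {
	out := make([]string, 0, len(a)+len(b))
	for len(a) > 0 && len(b) > 0 {
		switch {
		case a[0] < b[0]:
			out, a = append(out, a[0]), a[1:]
		case a[0] > b[0]:
			out, b = append(out, b[0]), b[1:]
		default: // same symbol present in both blocks: emit it once
			out, a, b = append(out, a[0]), a[1:], b[1:]
		}
	}
	out = append(out, a...)
	return append(out, b...)
}

func main() {
	blockA := []string{"", "__name__", "api", "job"}
	blockB := []string{"", "__name__", "job", "web"}
	// Prints the union, with the leading "" kept as a valid symbol.
	fmt.Println(mergeSymbols(blockA, blockB))
}
```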
@@ -895,3 +914,102 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
 
 	return nil
 }
+
+// How many symbols we buffer in memory per output block.
+const inMemorySymbolsLimit = 1_000_000
+
+// populateSymbols writes symbols to output blocks. We need to iterate through all series to find
+// which output block each series belongs to. We collect symbols per sharded block, and then add
+// sorted symbols to each block's index.
+func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlocks []shardedBlock) error {
+	if len(outBlocks) == 0 {
+		return errors.New("no output block")
+	}
+
+	batchers := make([]*symbolsBatcher, len(outBlocks))
+	for ix := range outBlocks {
+		batchers[ix] = newSymbolsBatcher(inMemorySymbolsLimit, outBlocks[ix].tmpDir)
+
+		// Always include the empty symbol. Blocks created from Head always have it in the symbols table,
+		// and if we only included symbols from series, we would skip it.
+		// It may not be required, but it's small and better safe than sorry.
+		if err := batchers[ix].addSymbol(""); err != nil {
+			return errors.Wrap(err, "addSymbol to batcher")
+		}
+	}
+
+	seriesSet := sets[0]
+	if len(sets) > 1 {
+		seriesSet = storage.NewMergeChunkSeriesSet(sets, c.mergeFunc)
+	}
+
+	for seriesSet.Next() {
+		if err := c.ctx.Err(); err != nil {
+			return err
+		}
+
+		s := seriesSet.At()
+
+		obIx := s.Labels().Hash() % uint64(len(outBlocks))
+
+		for _, l := range s.Labels() {
+			if err := batchers[obIx].addSymbol(l.Name); err != nil {
+				return errors.Wrap(err, "addSymbol to batcher")
+			}
+			if err := batchers[obIx].addSymbol(l.Value); err != nil {
+				return errors.Wrap(err, "addSymbol to batcher")
+			}
+		}
+	}
+
+	for ix := range outBlocks {
+		if err := c.ctx.Err(); err != nil {
+			return err
+		}
+
+		// Flush the batcher to write remaining symbols.
+		if err := batchers[ix].flushSymbols(true); err != nil {
+			return errors.Wrap(err, "flushing batcher")
+		}
+
+		it, err := newSymbolsIterator(batchers[ix].symbolFiles())
+		if err != nil {
+			return errors.Wrap(err, "opening symbols iterator")
+		}
+
+		// Each symbols iterator must be closed to close underlying files.
+		closeIt := it
+		defer func() {
+			if closeIt != nil {
+				_ = closeIt.Close()
+			}
+		}()
+
+		var sym string
+		for sym, err = it.NextSymbol(); err == nil; sym, err = it.NextSymbol() {
+			err = outBlocks[ix].indexw.AddSymbol(sym)
+			if err != nil {
+				return errors.Wrap(err, "AddSymbol")
+			}
+		}
+		if err != io.EOF {
+			return errors.Wrap(err, "iterating symbols")
+		}
+
+		// If err == io.EOF, we have iterated through all symbols. We can close the underlying
+		// files now.
+		closeIt = nil
+		_ = it.Close()
+
+		// Delete symbol files from symbolsBatcher. We don't need to perform this cleanup if populateSymbols
+		// or compaction fails, because in that case the compactor already removes the entire (temp) output block directory.
+		for _, fn := range batchers[ix].symbolFiles() {
+			if err := os.Remove(fn); err != nil {
+				return errors.Wrap(err, "deleting symbols file")
+			}
+		}
+	}
+
+	return nil
+}
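The routing step at the heart of populateSymbols is the obIx computation: a series' symbols go to whichever output block the series itself hashes to, so each sharded block's symbols table ends up containing exactly the symbols its own series use, plus "". A compressed, self-contained sketch of that idea (the label type and the FNV hash are stand-ins for illustration; the real code uses Prometheus' labels.Labels with its Hash() method, and spills symbols to disk via symbolsBatcher instead of keeping per-shard sets in memory):

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// label is a hypothetical stand-in for Prometheus' labels.Label.
type label struct{ name, value string }

// shardFor routes a series to one of n output blocks, analogous to
// s.Labels().Hash() % uint64(len(outBlocks)) in populateSymbols.
func shardFor(lbls []label, n int) uint64 {
	h := fnv.New64a()
	for _, l := range lbls {
		h.Write([]byte(l.name))
		h.Write([]byte{0}) // separator, illustrative only
		h.Write([]byte(l.value))
		h.Write([]byte{0})
	}
	return h.Sum64() % uint64(n)
}

func main() {
	series := [][]label{
		{{"__name__", "http_requests_total"}, {"job", "api"}},
		{{"__name__", "http_requests_total"}, {"job", "web"}},
	}
	const shards = 2

	// Collect symbols (label names and values) per shard. Each shard's set
	// seeds the empty symbol, just as populateSymbols does via addSymbol("").
	perShard := make([]map[string]struct{}, shards)
	for ix := range perShard {
		perShard[ix] = map[string]struct{}{"": {}}
	}
	for _, s := range series {
		ix := shardFor(s, shards)
		for _, l := range s {
			perShard[ix][l.name] = struct{}{}
			perShard[ix][l.value] = struct{}{}
		}
	}

	// The index writer requires symbols in sorted order, hence the sort here
	// (and the sorted merge machinery in symbols_batch.go).
	for ix, set := range perShard {
		syms := make([]string, 0, len(set))
		for s := range set {
			syms = append(syms, s)
		}
		sort.Strings(syms)
		fmt.Println("shard", ix, syms)
	}
}
```

This is also the invariant the new test below asserts: every symbol in a sharded block's symbols table is used by some series in that block, and vice versa.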

tsdb/compact_test.go

@@ -559,6 +559,14 @@ func TestCompaction_CompactWithSplitting(t *testing.T) {
 				require.Equal(t, ts, blockID.Time())
 			}
 
+			// Symbols found in series.
+			seriesSymbols := map[string]struct{}{}
+
+			// We always expect to find the "" symbol in the symbols table, even if it's not in the series.
+			// Head compaction always includes it, and then it survives additional non-sharded compactions.
+			// Our splitting compaction preserves it too.
+			seriesSymbols[""] = struct{}{}
+
 			block, err := OpenBlock(log.NewNopLogger(), filepath.Join(dir, blockID.String()), nil)
 			require.NoError(t, err)
@@ -585,8 +593,26 @@ func TestCompaction_CompactWithSplitting(t *testing.T) {
 				require.NoError(t, idxr.Series(ref, &lbls, nil))
 				require.Equal(t, uint64(shardIndex), lbls.Hash()%shardCount)
+
+				// Collect all symbols used by series.
+				for _, l := range lbls {
+					seriesSymbols[l.Name] = struct{}{}
+					seriesSymbols[l.Value] = struct{}{}
+				}
 			}
 			require.NoError(t, p.Err())
 
+			// Check that all symbols in the symbols table are actually used by series.
+			symIt := idxr.Symbols()
+			for symIt.Next() {
+				w := symIt.At()
+				_, ok := seriesSymbols[w]
+				require.True(t, ok, "not found in series: '%s'", w)
+				delete(seriesSymbols, w)
+			}
+
+			// Check that the symbols table covered all symbols found in series.
+			require.Equal(t, 0, len(seriesSymbols))
 		}
 
 		require.Equal(t, uint64(series), totalSeries)

tsdb/symbols_batch.go (new file, 272 additions)

@@ -0,0 +1,272 @@
package tsdb
import (
"container/heap"
"encoding/gob"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/tsdb/errors"
)
// symbolsBatcher keeps a buffer of symbols in memory. Once the buffer reaches the size limit (number of symbols),
// the batcher writes the currently buffered symbols to a file. At the end, any remaining symbols must be flushed.
// After writing all batches, symbolsBatcher has a list of files that can be used together with newSymbolsIterator
// to iterate through all previously added symbols in sorted order.
type symbolsBatcher struct {
dir string
limit int
buffer map[string]struct{} // using map to deduplicate
symbolsFiles []string // paths of symbol files that have been successfully written.
}
func newSymbolsBatcher(limit int, dir string) *symbolsBatcher {
return &symbolsBatcher{
limit: limit,
dir: dir,
buffer: make(map[string]struct{}, limit),
}
}
func (sw *symbolsBatcher) addSymbol(sym string) error {
sw.buffer[sym] = struct{}{}
return sw.flushSymbols(false)
}
func (sw *symbolsBatcher) flushSymbols(force bool) error {
if !force && len(sw.buffer) < sw.limit {
return nil
}
sortedSymbols := make([]string, 0, len(sw.buffer))
for s := range sw.buffer {
sortedSymbols = append(sortedSymbols, s)
}
sort.Strings(sortedSymbols)
symbolsFile := filepath.Join(sw.dir, fmt.Sprintf("symbols_%d", len(sw.symbolsFiles)))
err := writeSymbolsToFile(symbolsFile, sortedSymbols)
if err == nil {
sw.buffer = make(map[string]struct{}, sw.limit)
sw.symbolsFiles = append(sw.symbolsFiles, symbolsFile)
}
return err
}
func (sw *symbolsBatcher) symbolFiles() []string {
return sw.symbolsFiles
}
func writeSymbolsToFile(filename string, symbols []string) error {
f, err := os.Create(filename)
if err != nil {
return err
}
// Snappy is used for buffering and to create smaller files.
sn := snappy.NewBufferedWriter(f)
enc := gob.NewEncoder(sn)
errs := errors.NewMulti()
for _, s := range symbols {
err := enc.Encode(s)
if err != nil {
errs.Add(err)
break
}
}
errs.Add(sn.Close())
errs.Add(f.Close())
return errs.Err()
}
// Implements heap.Interface using symbols from files.
type symbolsHeap []*symbolsFile
// Len implements sort.Interface.
func (s *symbolsHeap) Len() int {
return len(*s)
}
// Less implements sort.Interface.
func (s *symbolsHeap) Less(i, j int) bool {
iw, ierr := (*s)[i].Peek()
if ierr != nil {
// The empty string sorts first, so the error will be returned before any other result.
iw = ""
}
jw, jerr := (*s)[j].Peek()
if jerr != nil {
jw = ""
}
return iw < jw
}
// Swap implements sort.Interface.
func (s *symbolsHeap) Swap(i, j int) {
(*s)[i], (*s)[j] = (*s)[j], (*s)[i]
}
// Push implements heap.Interface. Push should add x as element Len().
func (s *symbolsHeap) Push(x interface{}) {
*s = append(*s, x.(*symbolsFile))
}
// Pop implements heap.Interface. Pop should remove and return element Len() - 1.
func (s *symbolsHeap) Pop() interface{} {
l := len(*s)
res := (*s)[l-1]
*s = (*s)[:l-1]
return res
}
type symbolsIterator struct {
files []*os.File
heap symbolsHeap
// To avoid returning duplicates, we remember the last returned symbol. We want to support "" as a valid
// symbol, so we use a pointer to a string instead.
lastReturned *string
}
func newSymbolsIterator(filenames []string) (*symbolsIterator, error) {
files, err := openFiles(filenames)
if err != nil {
return nil, err
}
var symFiles []*symbolsFile
for _, f := range files {
symFiles = append(symFiles, newSymbolsFile(f))
}
h := &symbolsIterator{
files: files,
heap: symFiles,
}
heap.Init(&h.heap)
return h, nil
}
// NextSymbol advances the iterator and returns the next symbol.
// If there is no next element, returns err == io.EOF.
func (sit *symbolsIterator) NextSymbol() (string, error) {
for len(sit.heap) > 0 {
result, err := sit.heap[0].Next()
if err == io.EOF {
// End of file: remove it from the heap, and try the next file.
heap.Remove(&sit.heap, 0)
continue
}
if err != nil {
return "", err
}
heap.Fix(&sit.heap, 0)
if sit.lastReturned != nil && *sit.lastReturned == result {
// Duplicate symbol, try next one.
continue
}
sit.lastReturned = &result
return result, nil
}
return "", io.EOF
}
// Close all files.
func (sit *symbolsIterator) Close() error {
errs := errors.NewMulti()
for _, f := range sit.files {
errs.Add(f.Close())
}
return errs.Err()
}
type symbolsFile struct {
dec *gob.Decoder
nextValid bool // if true, nextSymbol and nextErr have the next symbol (possibly "")
nextSymbol string
nextErr error
}
func newSymbolsFile(f *os.File) *symbolsFile {
sn := snappy.NewReader(f)
dec := gob.NewDecoder(sn)
return &symbolsFile{
dec: dec,
}
}
// Peek returns the next symbol or error, but also preserves them for subsequent Peek or Next calls.
func (sf *symbolsFile) Peek() (string, error) {
if sf.nextValid {
return sf.nextSymbol, sf.nextErr
}
sf.nextValid = true
sf.nextSymbol, sf.nextErr = sf.readNext()
return sf.nextSymbol, sf.nextErr
}
// Next advances the iterator and returns the next symbol or error.
func (sf *symbolsFile) Next() (string, error) {
if sf.nextValid {
defer func() {
sf.nextValid = false
sf.nextSymbol = ""
sf.nextErr = nil
}()
return sf.nextSymbol, sf.nextErr
}
return sf.readNext()
}
func (sf *symbolsFile) readNext() (string, error) {
var s string
err := sf.dec.Decode(&s)
// Decode returns io.EOF at the end.
if err != nil {
return "", err
}
return s, nil
}
func openFiles(filenames []string) ([]*os.File, error) {
var result []*os.File
for _, fn := range filenames {
f, err := os.Open(fn)
if err != nil {
// Close files opened so far.
for _, sf := range result {
_ = sf.Close()
}
return nil, err
}
result = append(result, f)
}
return result, nil
}
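The heap machinery above is easier to see in miniature. Below is a self-contained analogue of the symbolsHeap / symbolsIterator k-way merge, with in-memory cursors standing in for the gob+snappy-backed symbolsFiles (so Peek's error handling is omitted); the dedup-via-string-pointer trick from NextSymbol is kept so that "" remains a valid symbol. Names here are illustrative, not part of the PR:

```go
package main

import (
	"container/heap"
	"fmt"
)

// cursor is a simplified stand-in for symbolsFile: each one walks a
// sorted list of symbols, held in memory rather than read from a file.
type cursor struct{ items []string }

func (c *cursor) peek() string { return c.items[0] }

type cursorHeap []*cursor

func (h cursorHeap) Len() int            { return len(h) }
func (h cursorHeap) Less(i, j int) bool  { return h[i].peek() < h[j].peek() }
func (h cursorHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *cursorHeap) Push(x interface{}) { *h = append(*h, x.(*cursor)) }
func (h *cursorHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

func main() {
	h := &cursorHeap{
		{items: []string{"", "job", "web"}},
		{items: []string{"", "api", "job"}},
	}
	heap.Init(h)

	var last *string // pointer, so "" is distinguishable from "nothing returned yet"
	for h.Len() > 0 {
		top := (*h)[0]
		sym := top.items[0]
		top.items = top.items[1:]
		if len(top.items) == 0 {
			heap.Remove(h, 0) // cursor exhausted, like hitting io.EOF on a symbols file
		} else {
			heap.Fix(h, 0) // top cursor advanced; restore heap order
		}
		if last != nil && *last == sym {
			continue // duplicate across cursors; skip, as NextSymbol does
		}
		s := sym
		last = &s
		fmt.Printf("%q\n", sym) // prints "", "api", "job", "web" in order
	}
}
```

The error-first ordering in the real symbolsHeap.Less (an error peeks as "") serves the same purpose: a failing file surfaces immediately at the top of the heap instead of being silently drained last.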

tsdb/symbols_batch_test.go (new file, 56 additions)

@@ -0,0 +1,56 @@
package tsdb
import (
"fmt"
"io"
"testing"
"github.com/stretchr/testify/require"
)
func TestSymbolsBatchAndIteration(t *testing.T) {
dir := t.TempDir()
b := newSymbolsBatcher(100, dir)
allWords := map[string]struct{}{}
for i := 0; i < 10; i++ {
require.NoError(t, b.addSymbol(""))
allWords[""] = struct{}{}
for j := 0; j < 123; j++ {
w := fmt.Sprintf("word_%d_%d", i%3, j)
require.NoError(t, b.addSymbol(w))
allWords[w] = struct{}{}
}
}
require.NoError(t, b.flushSymbols(true))
it, err := newSymbolsIterator(b.symbolFiles())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, it.Close())
})
first := true
var w, prev string
for w, err = it.NextSymbol(); err == nil; w, err = it.NextSymbol() {
if !first {
require.True(t, w != "")
require.True(t, prev < w)
}
first = false
_, known := allWords[w]
require.True(t, known)
delete(allWords, w)
prev = w
}
require.Equal(t, io.EOF, err)
require.Equal(t, 0, len(allWords))
}
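Assuming the usual repository layout, with both new files in the tsdb package, this unit test can be exercised in isolation with `go test ./tsdb -run TestSymbolsBatchAndIteration`. With the batch limit of 100 and roughly 1,240 additions (of which 370 are distinct), the test forces multiple flushed batch files per run, so the heap merge and cross-file deduplication paths are both covered.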