mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-26 06:04:05 -08:00
When doing compaction with splitting, only use symbols from series that belong to given sharded block.
This commit is contained in:
parent
04e7926b03
commit
259e09fe5f
110
tsdb/compact.go
110
tsdb/compact.go
|
@ -749,6 +749,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
|
||||||
|
|
||||||
var (
|
var (
|
||||||
sets []storage.ChunkSeriesSet
|
sets []storage.ChunkSeriesSet
|
||||||
|
symbolsSets []storage.ChunkSeriesSet // series sets used for finding symbols. Only used when doing sharding.
|
||||||
symbols index.StringIter
|
symbols index.StringIter
|
||||||
closers []io.Closer
|
closers []io.Closer
|
||||||
overlapping bool
|
overlapping bool
|
||||||
|
@ -808,6 +809,18 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
|
||||||
all = indexr.SortedPostings(all)
|
all = indexr.SortedPostings(all)
|
||||||
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
|
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
|
||||||
sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1))
|
sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1))
|
||||||
|
|
||||||
|
// To iterate series when populating symbols, we cannot reuse postings we just got, but need to get a new copy.
|
||||||
|
// Postings can only be iterated once.
|
||||||
|
k, v = index.AllPostingsKey()
|
||||||
|
all, err = indexr.Postings(k, v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
all = indexr.SortedPostings(all)
|
||||||
|
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
|
||||||
|
symbolsSets = append(symbolsSets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1))
|
||||||
|
|
||||||
syms := indexr.Symbols()
|
syms := indexr.Symbols()
|
||||||
if i == 0 {
|
if i == 0 {
|
||||||
symbols = syms
|
symbols = syms
|
||||||
|
@ -816,15 +829,19 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
|
||||||
symbols = NewMergedStringIter(symbols, syms)
|
symbols = NewMergedStringIter(symbols, syms)
|
||||||
}
|
}
|
||||||
|
|
||||||
for symbols.Next() {
|
if len(outBlocks) == 1 {
|
||||||
for _, ob := range outBlocks {
|
for symbols.Next() {
|
||||||
if err := ob.indexw.AddSymbol(symbols.At()); err != nil {
|
if err := outBlocks[0].indexw.AddSymbol(symbols.At()); err != nil {
|
||||||
return errors.Wrap(err, "add symbol")
|
return errors.Wrap(err, "add symbol")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
if symbols.Err() != nil {
|
||||||
if symbols.Err() != nil {
|
return errors.Wrap(symbols.Err(), "next symbol")
|
||||||
return errors.Wrap(symbols.Err(), "next symbol")
|
}
|
||||||
|
} else {
|
||||||
|
if err := c.populateSymbols(symbolsSets, outBlocks); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -895,3 +912,84 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// populateSymbols writes symbols to output blocks. We need to iterate through all series to find
|
||||||
|
// which series belongs to what block. We collect symbols per sharded block, and then add sorted symbols to
|
||||||
|
// block's index.
|
||||||
|
func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlocks []shardedBlock) error {
|
||||||
|
if len(outBlocks) == 0 {
|
||||||
|
return errors.New("no output block")
|
||||||
|
}
|
||||||
|
|
||||||
|
batchers := make([]*symbolsBatcher, len(outBlocks))
|
||||||
|
for ix := range outBlocks {
|
||||||
|
batchers[ix] = newSymbolsBatcher(10000, outBlocks[ix].tmpDir)
|
||||||
|
}
|
||||||
|
|
||||||
|
seriesSet := sets[0]
|
||||||
|
if len(sets) > 1 {
|
||||||
|
seriesSet = storage.NewMergeChunkSeriesSet(sets, c.mergeFunc)
|
||||||
|
}
|
||||||
|
|
||||||
|
for seriesSet.Next() {
|
||||||
|
if err := c.ctx.Err(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s := seriesSet.At()
|
||||||
|
|
||||||
|
obIx := s.Labels().Hash() % uint64(len(outBlocks))
|
||||||
|
|
||||||
|
for _, l := range s.Labels() {
|
||||||
|
if err := batchers[obIx].addSymbol(l.Name); err != nil {
|
||||||
|
return errors.Wrap(err, "addSymbol to batcher")
|
||||||
|
}
|
||||||
|
if err := batchers[obIx].addSymbol(l.Value); err != nil {
|
||||||
|
return errors.Wrap(err, "addSymbol to batcher")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for ix := range outBlocks {
|
||||||
|
if err := c.ctx.Err(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Flush the batcher to write remaining symbols.
|
||||||
|
if err := batchers[ix].flushSymbols(true); err != nil {
|
||||||
|
return errors.Wrap(err, "flushing batcher")
|
||||||
|
}
|
||||||
|
|
||||||
|
it, err := newSymbolsIterator(batchers[ix].symbolFiles())
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "opening symbols iterator")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Each symbols iterator must be closed to close underlying files.
|
||||||
|
closeIt := it
|
||||||
|
defer func() {
|
||||||
|
if closeIt != nil {
|
||||||
|
_ = closeIt.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
var sym string
|
||||||
|
for sym, err = it.NextSymbol(); err == nil; sym, err = it.NextSymbol() {
|
||||||
|
err = outBlocks[ix].indexw.AddSymbol(sym)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "AddSymbol")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != io.EOF {
|
||||||
|
return errors.Wrap(err, "iterating symbols")
|
||||||
|
}
|
||||||
|
|
||||||
|
// if err == io.EOF, we have iterated through all symbols. We can close underlying
|
||||||
|
// files now.
|
||||||
|
closeIt = nil
|
||||||
|
_ = it.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -559,6 +559,9 @@ func TestCompaction_CompactWithSplitting(t *testing.T) {
|
||||||
require.Equal(t, ts, blockID.Time())
|
require.Equal(t, ts, blockID.Time())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Symbols found in series.
|
||||||
|
seriesSymbols := map[string]struct{}{}
|
||||||
|
|
||||||
block, err := OpenBlock(log.NewNopLogger(), filepath.Join(dir, blockID.String()), nil)
|
block, err := OpenBlock(log.NewNopLogger(), filepath.Join(dir, blockID.String()), nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -585,8 +588,34 @@ func TestCompaction_CompactWithSplitting(t *testing.T) {
|
||||||
require.NoError(t, idxr.Series(ref, &lbls, nil))
|
require.NoError(t, idxr.Series(ref, &lbls, nil))
|
||||||
|
|
||||||
require.Equal(t, uint64(shardIndex), lbls.Hash()%shardCount)
|
require.Equal(t, uint64(shardIndex), lbls.Hash()%shardCount)
|
||||||
|
|
||||||
|
// Collect all symbols used by series.
|
||||||
|
for _, l := range lbls {
|
||||||
|
seriesSymbols[l.Name] = struct{}{}
|
||||||
|
seriesSymbols[l.Value] = struct{}{}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
require.NoError(t, p.Err())
|
require.NoError(t, p.Err())
|
||||||
|
|
||||||
|
// Check that all symbols in symbols table are actually used by series.
|
||||||
|
symIt := idxr.Symbols()
|
||||||
|
for symIt.Next() {
|
||||||
|
w := symIt.At()
|
||||||
|
|
||||||
|
// When shardCount == 1, we're not doing symbols splitting. Head-compacted blocks
|
||||||
|
// however do have empty string as a symbol in the table.
|
||||||
|
// Since label name or value cannot be empty string, we will never find it in seriesSymbols.
|
||||||
|
if w == "" && shardCount <= 1 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
_, ok := seriesSymbols[w]
|
||||||
|
require.True(t, ok, "not found in series: '%s'", w)
|
||||||
|
delete(seriesSymbols, w)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that symbols table covered all symbols found from series.
|
||||||
|
require.Equal(t, 0, len(seriesSymbols))
|
||||||
}
|
}
|
||||||
|
|
||||||
require.Equal(t, uint64(series), totalSeries)
|
require.Equal(t, uint64(series), totalSeries)
|
||||||
|
|
269
tsdb/symbols_batch.go
Normal file
269
tsdb/symbols_batch.go
Normal file
|
@ -0,0 +1,269 @@
|
||||||
|
package tsdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"container/heap"
|
||||||
|
"encoding/gob"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sort"
|
||||||
|
|
||||||
|
"github.com/golang/snappy"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/tsdb/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
// symbolsBatcher keeps buffer of symbols in memory. Once the buffer reaches the size limit (number of symbols),
|
||||||
|
// batcher writes currently buffered symbols to file. At the end remaining symbols must be flushed. After writing
|
||||||
|
// all batches, symbolsBatcher has list of files that can be used together with newSymbolsIterator to iterate
|
||||||
|
// through all previously added symbols in sorted order.
|
||||||
|
type symbolsBatcher struct {
|
||||||
|
dir string
|
||||||
|
limit int
|
||||||
|
|
||||||
|
buffer map[string]struct{} // using map to deduplicate
|
||||||
|
symbolsFiles []string // paths of symbol files that have been successfully written.
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSymbolsBatcher(limit int, dir string) *symbolsBatcher {
|
||||||
|
return &symbolsBatcher{
|
||||||
|
limit: limit,
|
||||||
|
dir: dir,
|
||||||
|
buffer: make(map[string]struct{}, limit),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sw *symbolsBatcher) addSymbol(sym string) error {
|
||||||
|
sw.buffer[sym] = struct{}{}
|
||||||
|
return sw.flushSymbols(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sw *symbolsBatcher) flushSymbols(force bool) error {
|
||||||
|
if !force && len(sw.buffer) < sw.limit {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
sortedSymbols := make([]string, 0, len(sw.buffer))
|
||||||
|
for s := range sw.buffer {
|
||||||
|
sortedSymbols = append(sortedSymbols, s)
|
||||||
|
}
|
||||||
|
sort.Strings(sortedSymbols)
|
||||||
|
|
||||||
|
symbolsFile := filepath.Join(sw.dir, fmt.Sprintf("symbols_%d", len(sw.symbolsFiles)))
|
||||||
|
err := writeSymbolsToFile(symbolsFile, sortedSymbols)
|
||||||
|
if err == nil {
|
||||||
|
sw.buffer = make(map[string]struct{}, sw.limit)
|
||||||
|
sw.symbolsFiles = append(sw.symbolsFiles, symbolsFile)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sw *symbolsBatcher) symbolFiles() []string {
|
||||||
|
return sw.symbolsFiles
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeSymbolsToFile(filename string, symbols []string) error {
|
||||||
|
f, err := os.Create(filename)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Snappy is used to for buffering and smaller files.
|
||||||
|
sn := snappy.NewBufferedWriter(f)
|
||||||
|
enc := gob.NewEncoder(sn)
|
||||||
|
|
||||||
|
errs := errors.NewMulti()
|
||||||
|
|
||||||
|
for _, s := range symbols {
|
||||||
|
err := enc.Encode(s)
|
||||||
|
if err != nil {
|
||||||
|
errs.Add(err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
errs.Add(sn.Close())
|
||||||
|
errs.Add(f.Close())
|
||||||
|
return errs.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
type symbolsHeap []*symbolsFile
|
||||||
|
|
||||||
|
func (s *symbolsHeap) Len() int {
|
||||||
|
return len(*s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *symbolsHeap) Less(i, j int) bool {
|
||||||
|
iw, ierr := (*s)[i].Peek()
|
||||||
|
if ierr != nil {
|
||||||
|
// empty string will be sorted first, so error will be returned before any other result.
|
||||||
|
iw = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
jw, jerr := (*s)[j].Peek()
|
||||||
|
if jerr != nil {
|
||||||
|
jw = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return iw < jw
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *symbolsHeap) Swap(i, j int) {
|
||||||
|
(*s)[i], (*s)[j] = (*s)[j], (*s)[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *symbolsHeap) Push(x interface{}) {
|
||||||
|
if f, ok := x.(*symbolsFile); ok {
|
||||||
|
*s = append(*s, f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *symbolsHeap) Pop() interface{} {
|
||||||
|
l := len(*s)
|
||||||
|
res := (*s)[l-1]
|
||||||
|
*s = (*s)[:l-1]
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
type symbolsIterator struct {
|
||||||
|
files []*os.File
|
||||||
|
heap symbolsHeap
|
||||||
|
|
||||||
|
// To avoid returning duplicates, we remember last returned symbol. We want to support "" as a valid
|
||||||
|
// symbol, so we use pointer to a string instead.
|
||||||
|
lastReturned *string
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSymbolsIterator(filenames []string) (*symbolsIterator, error) {
|
||||||
|
files, err := openFiles(filenames)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var symFiles []*symbolsFile
|
||||||
|
for _, f := range files {
|
||||||
|
symFiles = append(symFiles, newSymbolsFile(f))
|
||||||
|
}
|
||||||
|
|
||||||
|
h := &symbolsIterator{
|
||||||
|
files: files,
|
||||||
|
heap: symFiles,
|
||||||
|
}
|
||||||
|
|
||||||
|
heap.Init(&h.heap)
|
||||||
|
|
||||||
|
return h, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NextSymbol advances iterator forward, and returns next symbol.
|
||||||
|
// If there is no next element, returns err == io.EOF.
|
||||||
|
func (sit *symbolsIterator) NextSymbol() (string, error) {
|
||||||
|
again:
|
||||||
|
if len(sit.heap) == 0 {
|
||||||
|
return "", io.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := sit.heap[0].Next()
|
||||||
|
if err == io.EOF {
|
||||||
|
// End of file, remove it, and try next file.
|
||||||
|
heap.Remove(&sit.heap, 0)
|
||||||
|
goto again
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
heap.Fix(&sit.heap, 0)
|
||||||
|
|
||||||
|
if sit.lastReturned == nil || *sit.lastReturned != result {
|
||||||
|
sit.lastReturned = &result
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Duplicate symbol, try next one.
|
||||||
|
goto again
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close all files.
|
||||||
|
func (sit *symbolsIterator) Close() error {
|
||||||
|
errs := errors.NewMulti()
|
||||||
|
for _, f := range sit.files {
|
||||||
|
errs.Add(f.Close())
|
||||||
|
}
|
||||||
|
return errs.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
type symbolsFile struct {
|
||||||
|
dec *gob.Decoder
|
||||||
|
|
||||||
|
nextValid bool // if true, nextSymbol and nextErr have the next symbol (possibly "")
|
||||||
|
nextSymbol string
|
||||||
|
nextErr error
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSymbolsFile(f *os.File) *symbolsFile {
|
||||||
|
sn := snappy.NewReader(f)
|
||||||
|
dec := gob.NewDecoder(sn)
|
||||||
|
|
||||||
|
return &symbolsFile{
|
||||||
|
dec: dec,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Peek returns next symbol or error, but also preserves them for subsequent Peek or Next calls.
|
||||||
|
func (sf *symbolsFile) Peek() (string, error) {
|
||||||
|
if sf.nextValid {
|
||||||
|
return sf.nextSymbol, sf.nextErr
|
||||||
|
}
|
||||||
|
|
||||||
|
sf.nextValid = true
|
||||||
|
sf.nextSymbol, sf.nextErr = sf.readNext()
|
||||||
|
return sf.nextSymbol, sf.nextErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next advances iterator and returns next symbol or error.
|
||||||
|
func (sf *symbolsFile) Next() (string, error) {
|
||||||
|
if sf.nextValid {
|
||||||
|
defer func() {
|
||||||
|
sf.nextValid = false
|
||||||
|
sf.nextSymbol = ""
|
||||||
|
sf.nextErr = nil
|
||||||
|
}()
|
||||||
|
return sf.nextSymbol, sf.nextErr
|
||||||
|
}
|
||||||
|
|
||||||
|
return sf.readNext()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sf *symbolsFile) readNext() (string, error) {
|
||||||
|
var s string
|
||||||
|
err := sf.dec.Decode(&s)
|
||||||
|
// Decode returns io.EOF at the end.
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func openFiles(filenames []string) ([]*os.File, error) {
|
||||||
|
var result []*os.File
|
||||||
|
|
||||||
|
for _, fn := range filenames {
|
||||||
|
f, err := os.Open(fn)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
// Close opened files so far.
|
||||||
|
for _, sf := range result {
|
||||||
|
_ = sf.Close()
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
result = append(result, f)
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
53
tsdb/symbols_batch_test.go
Normal file
53
tsdb/symbols_batch_test.go
Normal file
|
@ -0,0 +1,53 @@
|
||||||
|
package tsdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSymbolsBatchAndIteration(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
|
||||||
|
b := newSymbolsBatcher(100, dir)
|
||||||
|
|
||||||
|
allWords := map[string]struct{}{}
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
require.NoError(t, b.addSymbol(""))
|
||||||
|
allWords[""] = struct{}{}
|
||||||
|
|
||||||
|
for j := 0; j < 123; j++ {
|
||||||
|
w := fmt.Sprintf("word_%d_%d", i%3, j)
|
||||||
|
|
||||||
|
require.NoError(t, b.addSymbol(w))
|
||||||
|
|
||||||
|
allWords[w] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, b.flushSymbols(true))
|
||||||
|
|
||||||
|
it, err := newSymbolsIterator(b.symbolFiles())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
first := true
|
||||||
|
var w, prev string
|
||||||
|
for w, err = it.NextSymbol(); err == nil; w, err = it.NextSymbol() {
|
||||||
|
if !first {
|
||||||
|
require.True(t, w != "")
|
||||||
|
require.True(t, prev < w)
|
||||||
|
}
|
||||||
|
|
||||||
|
first = false
|
||||||
|
|
||||||
|
_, known := allWords[w]
|
||||||
|
require.True(t, known)
|
||||||
|
delete(allWords, w)
|
||||||
|
prev = w
|
||||||
|
}
|
||||||
|
require.Equal(t, io.EOF, err)
|
||||||
|
require.Equal(t, 0, len(allWords))
|
||||||
|
}
|
Loading…
Reference in a new issue