mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-13 14:57:40 -08:00
Merge pull request #141 from prometheus/dectmpstr
Fixup allocation regression during compaction
This commit is contained in:
commit
30d29b889c
13
compact.go
13
compact.go
|
@ -17,7 +17,6 @@ import (
|
|||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
|
@ -365,10 +364,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
|
|||
}
|
||||
c.metrics.ran.Inc()
|
||||
c.metrics.duration.Observe(time.Since(t).Seconds())
|
||||
|
||||
// We might have done quite a few allocs. Enforce a GC so they do not accumulate
|
||||
// with subsequent compactions or head GCs.
|
||||
runtime.GC()
|
||||
}(time.Now())
|
||||
|
||||
dir := filepath.Join(dest, meta.ULID.String())
|
||||
|
@ -570,14 +565,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
|||
return errors.Wrap(err, "write postings")
|
||||
}
|
||||
}
|
||||
// Write a postings list containing all series.
|
||||
all := make([]uint64, i)
|
||||
for i := range all {
|
||||
all[i] = uint64(i)
|
||||
}
|
||||
if err := indexw.WritePostings("", "", newListPostings(all)); err != nil {
|
||||
return errors.Wrap(err, "write 'all' postings")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
6
db.go
6
db.go
|
@ -21,6 +21,7 @@ import (
|
|||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
@ -349,9 +350,12 @@ func (db *DB) compact() (changes bool, err error) {
|
|||
}
|
||||
changes = true
|
||||
|
||||
runtime.GC()
|
||||
|
||||
if err := db.reload(); err != nil {
|
||||
return changes, errors.Wrap(err, "reload blocks")
|
||||
}
|
||||
runtime.GC()
|
||||
}
|
||||
|
||||
// Check for compactions of multiple blocks.
|
||||
|
@ -380,10 +384,12 @@ func (db *DB) compact() (changes bool, err error) {
|
|||
return changes, errors.Wrap(err, "delete compacted block")
|
||||
}
|
||||
}
|
||||
runtime.GC()
|
||||
|
||||
if err := db.reload(); err != nil {
|
||||
return changes, errors.Wrap(err, "reload blocks")
|
||||
}
|
||||
runtime.GC()
|
||||
}
|
||||
|
||||
return changes, nil
|
||||
|
|
|
@ -77,6 +77,22 @@ func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
|
|||
func (d *decbuf) be32int() int { return int(d.be32()) }
|
||||
func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
|
||||
|
||||
// uvarintTempStr decodes like uvarintStr but the returned string is
|
||||
// not safe to use if the underyling buffer changes.
|
||||
func (d *decbuf) uvarintTempStr() string {
|
||||
l := d.uvarint64()
|
||||
if d.e != nil {
|
||||
return ""
|
||||
}
|
||||
if len(d.b) < int(l) {
|
||||
d.e = errInvalidSize
|
||||
return ""
|
||||
}
|
||||
s := yoloString(d.b[:l])
|
||||
d.b = d.b[l:]
|
||||
return s
|
||||
}
|
||||
|
||||
func (d *decbuf) uvarintStr() string {
|
||||
l := d.uvarint64()
|
||||
if d.e != nil {
|
||||
|
|
3
head.go
3
head.go
|
@ -15,7 +15,6 @@ package tsdb
|
|||
|
||||
import (
|
||||
"math"
|
||||
"runtime"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
@ -515,8 +514,6 @@ Outer:
|
|||
|
||||
// gc removes data before the minimum timestmap from the head.
|
||||
func (h *Head) gc() {
|
||||
defer runtime.GC()
|
||||
|
||||
// Only data strictly lower than this timestamp must be deleted.
|
||||
mint := h.MinTime()
|
||||
|
||||
|
|
10
index.go
10
index.go
|
@ -335,10 +335,6 @@ func (w *indexWriter) AddSymbols(sym map[string]struct{}) error {
|
|||
|
||||
for _, s := range symbols {
|
||||
w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len())
|
||||
|
||||
// NOTE: len(s) gives the number of runes, not the number of bytes.
|
||||
// Therefore the read-back length for strings with unicode characters will
|
||||
// be off when not using putUvarintStr.
|
||||
w.buf2.putUvarintStr(s)
|
||||
}
|
||||
|
||||
|
@ -636,7 +632,7 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
|
|||
keys := make([]string, 0, keyCount)
|
||||
|
||||
for i := 0; i < keyCount; i++ {
|
||||
keys = append(keys, d2.uvarintStr())
|
||||
keys = append(keys, d2.uvarintTempStr())
|
||||
}
|
||||
res[strings.Join(keys, sep)] = uint32(d2.uvarint())
|
||||
|
||||
|
@ -673,7 +669,7 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) {
|
|||
func (r *indexReader) lookupSymbol(o uint32) (string, error) {
|
||||
d := r.decbufAt(int(o))
|
||||
|
||||
s := d.uvarintStr()
|
||||
s := d.uvarintTempStr()
|
||||
if d.err() != nil {
|
||||
return "", errors.Wrapf(d.err(), "read symbol at %d", o)
|
||||
}
|
||||
|
@ -688,7 +684,7 @@ func (r *indexReader) Symbols() (map[string]struct{}, error) {
|
|||
sym := make(map[string]struct{}, count)
|
||||
|
||||
for ; count > 0; count-- {
|
||||
s := d2.uvarintStr()
|
||||
s := d2.uvarintTempStr()
|
||||
sym[s] = struct{}{}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue