Refactor WAL into Head and misc improvements

This commit is contained in:
Fabian Reinartz 2017-08-30 18:34:54 +02:00
parent 8209e3ec23
commit 5cf2662074
8 changed files with 633 additions and 598 deletions

View file

@ -37,6 +37,8 @@ type DiskBlock interface {
Delete(mint, maxt int64, m ...labels.Matcher) error
Snapshot(dir string) error
Close() error
}

View file

@ -179,7 +179,7 @@ func (w *chunkWriter) cut() error {
return err
}
p, _, err := nextSequenceFile(w.dirFile.Name(), "")
p, _, err := nextSequenceFile(w.dirFile.Name())
if err != nil {
return err
}
@ -302,7 +302,7 @@ type chunkReader struct {
// newChunkReader returns a new chunkReader based on mmaped files found in dir.
func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) {
files, err := sequenceFiles(dir, "")
files, err := sequenceFiles(dir)
if err != nil {
return nil, err
}

611
db.go
View file

@ -16,19 +16,15 @@ package tsdb
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"math"
"os"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
@ -103,7 +99,6 @@ type DB struct {
metrics *dbMetrics
opts *Options
chunkPool chunks.Pool
appendPool sync.Pool
compactor Compactor
wal WAL
@ -123,33 +118,15 @@ type DB struct {
}
type dbMetrics struct {
activeAppenders prometheus.Gauge
loadedBlocks prometheus.GaugeFunc
reloads prometheus.Counter
reloadsFailed prometheus.Counter
walTruncateDuration prometheus.Summary
samplesAppended prometheus.Counter
headSeries prometheus.Gauge
headSeriesCreated prometheus.Counter
headSeriesRemoved prometheus.Counter
headChunks prometheus.Gauge
headChunksCreated prometheus.Gauge
headChunksRemoved prometheus.Gauge
headGCDuration prometheus.Summary
headMinTime prometheus.GaugeFunc
headMaxTime prometheus.GaugeFunc
compactionsTriggered prometheus.Counter
}
func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
m := &dbMetrics{}
m.activeAppenders = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_active_appenders",
Help: "Number of currently active appender transactions",
})
m.loadedBlocks = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "tsdb_blocks_loaded",
Help: "Number of currently loaded data blocks",
@ -166,57 +143,6 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
Name: "tsdb_reloads_failures_total",
Help: "Number of times the database failed to reload black data from disk.",
})
m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "tsdb_wal_truncate_duration_seconds",
Help: "Duration of WAL truncation.",
})
m.headSeries = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_series",
Help: "Total number of series in the head block.",
})
m.headSeriesCreated = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_series_created_total",
Help: "Total number of series created in the head",
})
m.headSeriesRemoved = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_series_removed_total",
Help: "Total number of series removed in the head",
})
m.headChunks = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_chunks",
Help: "Total number of chunks in the head block.",
})
m.headChunksCreated = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_chunks_created_total",
Help: "Total number of chunks created in the head",
})
m.headChunksRemoved = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_chunks_removed_total",
Help: "Total number of chunks removed in the head",
})
m.headGCDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "tsdb_head_gc_duration_seconds",
Help: "Runtime of garbage collection in the head block.",
})
m.headMinTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "tsdb_head_max_time",
Help: "Maximum timestamp of the head block.",
}, func() float64 {
return float64(db.head.MaxTime())
})
m.headMaxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "tsdb_head_min_time",
Help: "Minimum time bound of the head block.",
}, func() float64 {
return float64(db.head.MinTime())
})
m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_samples_appended_total",
Help: "Total number of appended sampledb.",
})
m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_compactions_triggered_total",
Help: "Total number of triggered compactions for the partition.",
@ -224,23 +150,9 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
if r != nil {
r.MustRegister(
m.activeAppenders,
m.loadedBlocks,
m.reloads,
m.reloadsFailed,
m.walTruncateDuration,
m.headChunks,
m.headChunksCreated,
m.headChunksRemoved,
m.headSeries,
m.headSeriesCreated,
m.headSeriesRemoved,
m.headMinTime,
m.headMaxTime,
m.headGCDuration,
m.samplesAppended,
m.compactionsTriggered,
)
}
@ -260,16 +172,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
opts = DefaultOptions
}
wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, 10*time.Second)
if err != nil {
return nil, err
}
db = &DB{
dir: dir,
logger: l,
opts: opts,
wal: wal,
compactc: make(chan struct{}, 1),
donec: make(chan struct{}),
stopc: make(chan struct{}),
@ -312,14 +218,15 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
db.compactor = NewLeveledCompactor(r, l, copts)
db.head, err = NewHead(l, copts.blockRanges[0])
wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, 10*time.Second)
if err != nil {
return nil, err
}
if err := db.readWAL(db.wal.Reader()); err != nil {
db.head, err = NewHead(r, l, wal, copts.blockRanges[0])
if err != nil {
return nil, err
}
if err := db.reloadBlocks(); err != nil {
if err := db.reload(); err != nil {
return nil, err
}
@ -341,6 +248,7 @@ func (db *DB) run() {
for {
select {
case <-db.stopc:
return
case <-time.After(backoff):
}
@ -364,7 +272,9 @@ func (db *DB) run() {
}
if err1 != nil || err2 != nil {
exponential(backoff, 1*time.Second, 1*time.Minute)
backoff = exponential(backoff, 1*time.Second, 1*time.Minute)
} else {
backoff = 0
}
case <-db.stopc:
@ -391,6 +301,30 @@ func (db *DB) retentionCutoff() (bool, error) {
return retentionCutoff(db.dir, mint)
}
// Appender opens a new appender against the database.
func (db *DB) Appender() Appender {
return dbAppender{db: db, Appender: db.head.Appender()}
}
// dbAppender wraps the DB's head appender and triggers compactions on commit
// if necessary.
type dbAppender struct {
Appender
db *DB
}
func (a dbAppender) Commit() error {
err := a.Appender.Commit()
if a.db.head.MaxTime()-a.db.head.MinTime() > a.db.head.chunkRange/2*3 {
select {
case a.db.compactc <- struct{}{}:
default:
}
}
return err
}
func (db *DB) compact() (changes bool, err error) {
db.cmtx.Lock()
defer db.cmtx.Unlock()
@ -425,7 +359,7 @@ func (db *DB) compact() (changes bool, err error) {
}
changes = true
if err := db.reloadBlocks(); err != nil {
if err := db.reload(); err != nil {
return changes, errors.Wrap(err, "reload blocks")
}
runtime.GC()
@ -458,7 +392,7 @@ func (db *DB) compact() (changes bool, err error) {
}
}
if err := db.reloadBlocks(); err != nil {
if err := db.reload(); err != nil {
return changes, errors.Wrap(err, "reload blocks")
}
runtime.GC()
@ -512,50 +446,7 @@ func (db *DB) getBlock(id ulid.ULID) (DiskBlock, bool) {
return nil, false
}
func (db *DB) readWAL(r WALReader) error {
seriesFunc := func(series []labels.Labels) error {
for _, lset := range series {
db.head.create(lset.Hash(), lset)
db.metrics.headSeries.Inc()
db.metrics.headSeriesCreated.Inc()
}
return nil
}
samplesFunc := func(samples []RefSample) error {
for _, s := range samples {
ms, ok := db.head.series[uint32(s.Ref)]
if !ok {
return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref)
}
_, chunkCreated := ms.append(s.T, s.V)
if chunkCreated {
db.metrics.headChunksCreated.Inc()
db.metrics.headChunks.Inc()
}
}
return nil
}
deletesFunc := func(stones []Stone) error {
for _, s := range stones {
for _, itv := range s.intervals {
db.head.tombstones.add(s.ref, itv)
}
}
return nil
}
if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil {
return errors.Wrap(err, "consume WAL")
}
return nil
}
func (db *DB) reloadBlocks() (err error) {
func (db *DB) reload() (err error) {
defer func() {
if err != nil {
db.metrics.reloadsFailed.Inc()
@ -613,29 +504,11 @@ func (db *DB) reloadBlocks() (err error) {
// Garbage collect data in the head if the most recent persisted block
// covers data of its current time range.
if len(blocks) == 0 {
return
return nil
}
maxt := blocks[len(db.blocks)-1].Meta().MaxTime
if maxt <= db.head.MinTime() {
return
}
start := time.Now()
atomic.StoreInt64(&db.head.minTime, maxt)
series, chunks := db.head.gc()
db.metrics.headSeriesRemoved.Add(float64(series))
db.metrics.headSeries.Sub(float64(series))
db.metrics.headChunksRemoved.Add(float64(chunks))
db.metrics.headChunks.Sub(float64(chunks))
db.logger.Log("msg", "head GC completed", "duration", time.Since(start))
start = time.Now()
if err := db.wal.Truncate(maxt); err != nil {
return errors.Wrapf(err, "truncate WAL at %d", maxt)
}
db.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
db.head.Truncate(maxt)
return nil
}
@ -701,24 +574,28 @@ func (db *DB) EnableCompactions() {
// Snapshot writes the current data to the directory.
func (db *DB) Snapshot(dir string) error {
// if dir == db.dir {
// return errors.Errorf("cannot snapshot into base directory")
// }
// db.cmtx.Lock()
// defer db.cmtx.Unlock()
if dir == db.dir {
return errors.Errorf("cannot snapshot into base directory")
}
if _, err := ulid.Parse(dir); err == nil {
return errors.Errorf("dir must not be a valid ULID")
}
// db.mtx.Lock() // To block any appenders.
// defer db.mtx.Unlock()
db.cmtx.Lock()
defer db.cmtx.Unlock()
// blocks := db.blocks[:]
// for _, b := range blocks {
// db.logger.Log("msg", "snapshotting block", "block", b)
// if err := b.Snapshot(dir); err != nil {
// return errors.Wrap(err, "error snapshotting headblock")
// }
// }
db.mtx.RLock()
defer db.mtx.RUnlock()
return nil
for _, b := range db.blocks {
db.logger.Log("msg", "snapshotting block", "block", b)
if err := b.Snapshot(dir); err != nil {
return errors.Wrap(err, "error snapshotting headblock")
}
}
return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
}
// Querier returns a new querier over the data partition for the given time range.
@ -741,320 +618,9 @@ func (db *DB) Querier(mint, maxt int64) Querier {
tombstones: b.Tombstones(),
})
}
return sq
}
// initAppender is a helper to initialize the time bounds of a the head
// upon the first sample it receives.
type initAppender struct {
app Appender
db *DB
}
func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
if a.app != nil {
return a.app.Add(lset, t, v)
}
for {
// In the init state, the head has a high timestamp of math.MinInt64.
ht := a.db.head.MaxTime()
if ht != math.MinInt64 {
break
}
cr := a.db.opts.BlockRanges[0]
mint, _ := rangeForTimestamp(t, cr)
atomic.CompareAndSwapInt64(&a.db.head.maxTime, ht, t)
atomic.StoreInt64(&a.db.head.minTime, mint-cr)
}
a.app = a.db.appender()
return a.app.Add(lset, t, v)
}
func (a *initAppender) AddFast(ref string, t int64, v float64) error {
if a.app == nil {
return ErrNotFound
}
return a.app.AddFast(ref, t, v)
}
func (a *initAppender) Commit() error {
if a.app == nil {
return nil
}
return a.app.Commit()
}
func (a *initAppender) Rollback() error {
if a.app == nil {
return nil
}
return a.app.Rollback()
}
// Appender returns a new Appender on the database.
func (db *DB) Appender() Appender {
db.metrics.activeAppenders.Inc()
// The head cache might not have a starting point yet. The init appender
// picks up the first appended timestamp as the base.
if db.head.MaxTime() == math.MinInt64 {
return &initAppender{db: db}
}
return db.appender()
}
func (db *DB) appender() *dbAppender {
db.head.mtx.RLock()
return &dbAppender{
db: db,
head: db.head,
wal: db.wal,
mint: db.head.MaxTime() - db.opts.BlockRanges[0]/2,
samples: db.getAppendBuffer(),
highTimestamp: math.MinInt64,
lowTimestamp: math.MaxInt64,
}
}
func (db *DB) getAppendBuffer() []RefSample {
b := db.appendPool.Get()
if b == nil {
return make([]RefSample, 0, 512)
}
return b.([]RefSample)
}
func (db *DB) putAppendBuffer(b []RefSample) {
db.appendPool.Put(b[:0])
}
type dbAppender struct {
db *DB
head *Head
wal WAL
mint int64
newSeries []*hashedLabels
newLabels []labels.Labels
newHashes map[uint64]uint64
samples []RefSample
highTimestamp int64
lowTimestamp int64
}
type hashedLabels struct {
ref uint64
hash uint64
labels labels.Labels
}
func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
if t < a.mint {
return "", ErrOutOfBounds
}
hash := lset.Hash()
refb := make([]byte, 8)
// Series exists already in the block.
if ms := a.head.get(hash, lset); ms != nil {
binary.BigEndian.PutUint64(refb, uint64(ms.ref))
return string(refb), a.AddFast(string(refb), t, v)
}
// Series was added in this transaction previously.
if ref, ok := a.newHashes[hash]; ok {
binary.BigEndian.PutUint64(refb, ref)
// XXX(fabxc): there's no fast path for multiple samples for the same new series
// in the same transaction. We always return the invalid empty ref. It's has not
// been a relevant use case so far and is not worth the trouble.
return "", a.AddFast(string(refb), t, v)
}
// The series is completely new.
if a.newSeries == nil {
a.newHashes = map[uint64]uint64{}
}
// First sample for new series.
ref := uint64(len(a.newSeries))
a.newSeries = append(a.newSeries, &hashedLabels{
ref: ref,
hash: hash,
labels: lset,
})
// First bit indicates its a series created in this transaction.
ref |= (1 << 63)
a.newHashes[hash] = ref
binary.BigEndian.PutUint64(refb, ref)
return "", a.AddFast(string(refb), t, v)
}
func (a *dbAppender) AddFast(ref string, t int64, v float64) error {
if len(ref) != 8 {
return errors.Wrap(ErrNotFound, "invalid ref length")
}
var (
refn = binary.BigEndian.Uint64(yoloBytes(ref))
id = uint32(refn)
inTx = refn&(1<<63) != 0
)
// Distinguish between existing series and series created in
// this transaction.
if inTx {
if id > uint32(len(a.newSeries)-1) {
return errors.Wrap(ErrNotFound, "transaction series ID too high")
}
// TODO(fabxc): we also have to validate here that the
// sample sequence is valid.
// We also have to revalidate it as we switch locks and create
// the new series.
} else {
ms, ok := a.head.series[id]
if !ok {
return errors.Wrap(ErrNotFound, "unknown series")
}
if err := ms.appendable(t, v); err != nil {
return err
}
}
if t < a.mint {
return ErrOutOfBounds
}
if t > a.highTimestamp {
a.highTimestamp = t
}
// if t < a.lowTimestamp {
// a.lowTimestamp = t
// }
a.samples = append(a.samples, RefSample{
Ref: refn,
T: t,
V: v,
})
return nil
}
func (a *dbAppender) createSeries() error {
if len(a.newSeries) == 0 {
return nil
}
a.newLabels = make([]labels.Labels, 0, len(a.newSeries))
base0 := len(a.head.series)
a.head.mtx.RUnlock()
defer a.head.mtx.RLock()
a.head.mtx.Lock()
defer a.head.mtx.Unlock()
base1 := len(a.head.series)
for _, l := range a.newSeries {
// We switched locks and have to re-validate that the series were not
// created by another goroutine in the meantime.
if base1 > base0 {
if ms := a.head.get(l.hash, l.labels); ms != nil {
l.ref = uint64(ms.ref)
continue
}
}
// Series is still new.
a.newLabels = append(a.newLabels, l.labels)
s := a.head.create(l.hash, l.labels)
l.ref = uint64(s.ref)
a.db.metrics.headSeriesCreated.Inc()
a.db.metrics.headSeries.Inc()
}
// Write all new series to the WAL.
if err := a.wal.LogSeries(a.newLabels); err != nil {
return errors.Wrap(err, "WAL log series")
}
return nil
}
func (a *dbAppender) Commit() error {
defer a.head.mtx.RUnlock()
defer a.db.metrics.activeAppenders.Dec()
defer a.db.putAppendBuffer(a.samples)
if err := a.createSeries(); err != nil {
return err
}
// We have to update the refs of samples for series we just created.
for i := range a.samples {
s := &a.samples[i]
if s.Ref&(1<<63) != 0 {
s.Ref = a.newSeries[(s.Ref<<1)>>1].ref
}
}
// Write all new samples to the WAL and add them to the
// in-mem database on success.
if err := a.wal.LogSamples(a.samples); err != nil {
return errors.Wrap(err, "WAL log samples")
}
total := uint64(len(a.samples))
for _, s := range a.samples {
series, ok := a.head.series[uint32(s.Ref)]
if !ok {
return errors.Errorf("series with ID %d not found", s.Ref)
}
ok, chunkCreated := series.append(s.T, s.V)
if !ok {
total--
}
if chunkCreated {
a.db.metrics.headChunks.Inc()
a.db.metrics.headChunksCreated.Inc()
}
}
a.db.metrics.samplesAppended.Add(float64(total))
for {
ht := a.head.MaxTime()
if a.highTimestamp <= ht {
break
}
if a.highTimestamp-a.head.MinTime() > a.head.chunkRange/2*3 {
select {
case a.db.compactc <- struct{}{}:
default:
}
}
if atomic.CompareAndSwapInt64(&a.head.maxTime, ht, a.highTimestamp) {
break
}
}
return nil
}
func (a *dbAppender) Rollback() error {
a.head.mtx.RUnlock()
a.db.metrics.activeAppenders.Dec()
a.db.putAppendBuffer(a.samples)
return nil
}
func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
mint = (t / width) * width
return mint, mint + width
@ -1078,41 +644,10 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
}(b))
}
}
if err := g.Wait(); err != nil {
return err
}
ir := db.head.Index()
pr := newPostingsReader(ir)
p, absent := pr.Select(ms...)
var stones []Stone
Outer:
for p.Next() {
series := db.head.series[p.At()]
for _, abs := range absent {
if series.lset.Get(abs) != "" {
continue Outer
}
}
// Delete only until the current values and not beyond.
t0, t1 := clampInterval(mint, maxt, series.minTime(), series.maxTime())
stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}})
}
if p.Err() != nil {
return p.Err()
}
if err := db.wal.LogDeletes(stones); err != nil {
return err
}
for _, s := range stones {
db.head.tombstones.add(s.ref, s.intervals[0])
}
g.Go(func() error {
return db.head.Delete(mint, maxt, ms...)
})
if err := g.Wait(); err != nil {
return err
@ -1171,7 +706,7 @@ func blockDirs(dir string) ([]string, error) {
return dirs, nil
}
func sequenceFiles(dir, prefix string) ([]string, error) {
func sequenceFiles(dir string) ([]string, error) {
files, err := ioutil.ReadDir(dir)
if err != nil {
return nil, err
@ -1179,24 +714,15 @@ func sequenceFiles(dir, prefix string) ([]string, error) {
var res []string
for _, fi := range files {
if isSequenceFile(fi, prefix) {
res = append(res, filepath.Join(dir, fi.Name()))
if _, err := strconv.ParseUint(fi.Name(), 10, 64); err != nil {
continue
}
res = append(res, filepath.Join(dir, fi.Name()))
}
return res, nil
}
func isSequenceFile(fi os.FileInfo, prefix string) bool {
if !strings.HasPrefix(fi.Name(), prefix) {
return false
}
if _, err := strconv.ParseUint(fi.Name()[len(prefix):], 10, 32); err != nil {
return false
}
return true
}
func nextSequenceFile(dir, prefix string) (string, int, error) {
func nextSequenceFile(dir string) (string, int, error) {
names, err := fileutil.ReadDir(dir)
if err != nil {
return "", 0, err
@ -1204,16 +730,13 @@ func nextSequenceFile(dir, prefix string) (string, int, error) {
i := uint64(0)
for _, n := range names {
if !strings.HasPrefix(n, prefix) {
continue
}
j, err := strconv.ParseUint(n[len(prefix):], 10, 32)
j, err := strconv.ParseUint(n, 10, 64)
if err != nil {
continue
}
i = j
}
return filepath.Join(dir, fmt.Sprintf("%s%0.6d", prefix, i+1)), int(i + 1), nil
return filepath.Join(dir, fmt.Sprintf("%0.6d", i+1)), int(i + 1), nil
}
// The MultiError type implements the error interface, and contains the

View file

@ -33,9 +33,7 @@ func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) {
require.NoError(t, err)
// Do not close the test database by default as it will deadlock on test failures.
return db, func() {
os.RemoveAll(tmpdir)
}
return db, func() { os.RemoveAll(tmpdir) }
}
// Convert a SeriesSet into a form useable with reflect.DeepEqual.

551
head.go
View file

@ -14,14 +14,16 @@
package tsdb
import (
"fmt"
"encoding/binary"
"math"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/labels"
)
@ -47,6 +49,10 @@ var (
type Head struct {
chunkRange int64
mtx sync.RWMutex
metrics *headMetrics
wal WAL
logger log.Logger
appendPool sync.Pool
minTime, maxTime int64
lastSeriesID uint32
@ -65,9 +71,110 @@ type Head struct {
tombstones tombstoneReader
}
type headMetrics struct {
activeAppenders prometheus.Gauge
series prometheus.Gauge
seriesCreated prometheus.Counter
seriesRemoved prometheus.Counter
chunks prometheus.Gauge
chunksCreated prometheus.Gauge
chunksRemoved prometheus.Gauge
gcDuration prometheus.Summary
minTime prometheus.GaugeFunc
maxTime prometheus.GaugeFunc
samplesAppended prometheus.Counter
walTruncateDuration prometheus.Summary
}
func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
m := &headMetrics{}
m.activeAppenders = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_active_appenders",
Help: "Number of currently active appender transactions",
})
m.series = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_series",
Help: "Total number of series in the head block.",
})
m.seriesCreated = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_series_created_total",
Help: "Total number of series created in the head",
})
m.seriesRemoved = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_series_removed_total",
Help: "Total number of series removed in the head",
})
m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_chunks",
Help: "Total number of chunks in the head block.",
})
m.chunksCreated = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_chunks_created_total",
Help: "Total number of chunks created in the head",
})
m.chunksRemoved = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_chunks_removed_total",
Help: "Total number of chunks removed in the head",
})
m.gcDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "tsdb_head_gc_duration_seconds",
Help: "Runtime of garbage collection in the head block.",
})
m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "tsdb_head_max_time",
Help: "Maximum timestamp of the head block.",
}, func() float64 {
return float64(h.MaxTime())
})
m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "tsdb_head_min_time",
Help: "Minimum time bound of the head block.",
}, func() float64 {
return float64(h.MinTime())
})
m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "tsdb_wal_truncate_duration_seconds",
Help: "Duration of WAL truncation.",
})
m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_head_samples_appended_total",
Help: "Total number of appended sampledb.",
})
if r != nil {
r.MustRegister(
m.activeAppenders,
m.chunks,
m.chunksCreated,
m.chunksRemoved,
m.series,
m.seriesCreated,
m.seriesRemoved,
m.minTime,
m.maxTime,
m.gcDuration,
m.walTruncateDuration,
m.samplesAppended,
)
}
return m
}
// NewHead opens the head block in dir.
func NewHead(l log.Logger, chunkRange int64) (*Head, error) {
func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (*Head, error) {
if l == nil {
l = log.NewNopLogger()
}
if wal == nil {
wal = NopWAL{}
}
if chunkRange < 1 {
return nil, errors.Errorf("invalid chunk range %d", chunkRange)
}
h := &Head{
wal: wal,
logger: l,
chunkRange: chunkRange,
minTime: math.MaxInt64,
maxTime: math.MinInt64,
@ -78,15 +185,422 @@ func NewHead(l log.Logger, chunkRange int64) (*Head, error) {
postings: &memPostings{m: make(map[term][]uint32)},
tombstones: newEmptyTombstoneReader(),
}
return h, nil
h.metrics = newHeadMetrics(h, r)
return h, h.readWAL()
}
func (h *Head) readWAL() error {
r := h.wal.Reader(h.MinTime())
seriesFunc := func(series []labels.Labels) error {
for _, lset := range series {
h.create(lset.Hash(), lset)
}
return nil
}
samplesFunc := func(samples []RefSample) error {
for _, s := range samples {
ms, ok := h.series[uint32(s.Ref)]
if !ok {
return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref)
}
_, chunkCreated := ms.append(s.T, s.V)
if chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
}
}
return nil
}
deletesFunc := func(stones []Stone) error {
for _, s := range stones {
for _, itv := range s.intervals {
h.tombstones.add(s.ref, itv)
}
}
return nil
}
if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil {
return errors.Wrap(err, "consume WAL")
}
return nil
}
func (h *Head) String() string {
return "<head>"
}
// Truncate removes all data before mint from the head block and truncates its WAL.
func (h *Head) Truncate(mint int64) {
if h.minTime >= mint {
return
}
atomic.StoreInt64(&h.minTime, mint)
start := time.Now()
h.gc()
h.logger.Log("msg", "head GC completed", "duration", time.Since(start))
h.metrics.gcDuration.Observe(time.Since(start).Seconds())
start = time.Now()
if err := h.wal.Truncate(mint); err == nil {
h.logger.Log("msg", "WAL truncation completed", "duration", time.Since(start))
} else {
h.logger.Log("msg", "WAL truncation failed", "err", err, "duration", time.Since(start))
}
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
}
// initTime initializes a head with the first timestamp. This only needs to be called
// for a compltely fresh head with an empty WAL.
// Returns true if the initialization took an effect.
func (h *Head) initTime(t int64) (initialized bool) {
// In the init state, the head has a high timestamp of math.MinInt64.
if h.MaxTime() != math.MinInt64 {
return false
}
mint, _ := rangeForTimestamp(t, h.chunkRange)
if !atomic.CompareAndSwapInt64(&h.maxTime, math.MinInt64, t) {
return false
}
atomic.StoreInt64(&h.minTime, mint-h.chunkRange)
return true
}
// initAppender is a helper to initialize the time bounds of a the head
// upon the first sample it receives.
type initAppender struct {
app Appender
head *Head
}
func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
if a.app != nil {
return a.app.Add(lset, t, v)
}
if a.head.initTime(t) {
a.app = a.head.appender()
}
return a.app.Add(lset, t, v)
}
func (a *initAppender) AddFast(ref string, t int64, v float64) error {
if a.app == nil {
return ErrNotFound
}
return a.app.AddFast(ref, t, v)
}
func (a *initAppender) Commit() error {
if a.app == nil {
return nil
}
return a.app.Commit()
}
func (a *initAppender) Rollback() error {
if a.app == nil {
return nil
}
return a.app.Rollback()
}
// Appender returns a new Appender on the database.
func (h *Head) Appender() Appender {
h.metrics.activeAppenders.Inc()
// The head cache might not have a starting point yet. The init appender
// picks up the first appended timestamp as the base.
if h.MaxTime() == math.MinInt64 {
return &initAppender{head: h}
}
return h.appender()
}
func (h *Head) appender() *headAppender {
h.mtx.RLock()
return &headAppender{
head: h,
mint: h.MaxTime() - h.chunkRange/2,
samples: h.getAppendBuffer(),
highTimestamp: math.MinInt64,
}
}
func (h *Head) getAppendBuffer() []RefSample {
b := h.appendPool.Get()
if b == nil {
return make([]RefSample, 0, 512)
}
return b.([]RefSample)
}
func (h *Head) putAppendBuffer(b []RefSample) {
h.appendPool.Put(b[:0])
}
type headAppender struct {
head *Head
mint int64
newSeries []*hashedLabels
newLabels []labels.Labels
newHashes map[uint64]uint64
samples []RefSample
highTimestamp int64
}
type hashedLabels struct {
ref uint64
hash uint64
labels labels.Labels
}
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
if t < a.mint {
return "", ErrOutOfBounds
}
hash := lset.Hash()
refb := make([]byte, 8)
// Series exists already in the block.
if ms := a.head.get(hash, lset); ms != nil {
binary.BigEndian.PutUint64(refb, uint64(ms.ref))
return string(refb), a.AddFast(string(refb), t, v)
}
// Series was added in this transaction previously.
if ref, ok := a.newHashes[hash]; ok {
binary.BigEndian.PutUint64(refb, ref)
// XXX(fabxc): there's no fast path for multiple samples for the same new series
// in the same transaction. We always return the invalid empty ref. It's has not
// been a relevant use case so far and is not worth the trouble.
return "", a.AddFast(string(refb), t, v)
}
// The series is completely new.
if a.newSeries == nil {
a.newHashes = map[uint64]uint64{}
}
// First sample for new series.
ref := uint64(len(a.newSeries))
a.newSeries = append(a.newSeries, &hashedLabels{
ref: ref,
hash: hash,
labels: lset,
})
// First bit indicates its a series created in this transaction.
ref |= (1 << 63)
a.newHashes[hash] = ref
binary.BigEndian.PutUint64(refb, ref)
return "", a.AddFast(string(refb), t, v)
}
func (a *headAppender) AddFast(ref string, t int64, v float64) error {
if len(ref) != 8 {
return errors.Wrap(ErrNotFound, "invalid ref length")
}
var (
refn = binary.BigEndian.Uint64(yoloBytes(ref))
id = uint32(refn)
inTx = refn&(1<<63) != 0
)
// Distinguish between existing series and series created in
// this transaction.
if inTx {
if id > uint32(len(a.newSeries)-1) {
return errors.Wrap(ErrNotFound, "transaction series ID too high")
}
// TODO(fabxc): we also have to validate here that the
// sample sequence is valid.
// We also have to revalidate it as we switch locks and create
// the new series.
} else {
ms, ok := a.head.series[id]
if !ok {
return errors.Wrap(ErrNotFound, "unknown series")
}
if err := ms.appendable(t, v); err != nil {
return err
}
}
if t < a.mint {
return ErrOutOfBounds
}
if t > a.highTimestamp {
a.highTimestamp = t
}
a.samples = append(a.samples, RefSample{
Ref: refn,
T: t,
V: v,
})
return nil
}
func (a *headAppender) createSeries() error {
if len(a.newSeries) == 0 {
return nil
}
a.newLabels = make([]labels.Labels, 0, len(a.newSeries))
base0 := len(a.head.series)
a.head.mtx.RUnlock()
defer a.head.mtx.RLock()
a.head.mtx.Lock()
defer a.head.mtx.Unlock()
base1 := len(a.head.series)
for _, l := range a.newSeries {
// We switched locks and have to re-validate that the series were not
// created by another goroutine in the meantime.
if base1 > base0 {
if ms := a.head.get(l.hash, l.labels); ms != nil {
l.ref = uint64(ms.ref)
continue
}
}
// Series is still new.
a.newLabels = append(a.newLabels, l.labels)
s := a.head.create(l.hash, l.labels)
l.ref = uint64(s.ref)
}
// Write all new series to the WAL.
if err := a.head.wal.LogSeries(a.newLabels); err != nil {
return errors.Wrap(err, "WAL log series")
}
return nil
}
func (a *headAppender) Commit() error {
defer a.head.mtx.RUnlock()
defer a.head.metrics.activeAppenders.Dec()
defer a.head.putAppendBuffer(a.samples)
if err := a.createSeries(); err != nil {
return err
}
// We have to update the refs of samples for series we just created.
for i := range a.samples {
s := &a.samples[i]
if s.Ref&(1<<63) != 0 {
s.Ref = a.newSeries[(s.Ref<<1)>>1].ref
}
}
// Write all new samples to the WAL and add them to the
// in-mem database on success.
if err := a.head.wal.LogSamples(a.samples); err != nil {
return errors.Wrap(err, "WAL log samples")
}
total := uint64(len(a.samples))
for _, s := range a.samples {
series, ok := a.head.series[uint32(s.Ref)]
if !ok {
return errors.Errorf("series with ID %d not found", s.Ref)
}
ok, chunkCreated := series.append(s.T, s.V)
if !ok {
total--
}
if chunkCreated {
a.head.metrics.chunks.Inc()
a.head.metrics.chunksCreated.Inc()
}
}
a.head.metrics.samplesAppended.Add(float64(total))
for {
ht := a.head.MaxTime()
if a.highTimestamp <= ht {
break
}
if atomic.CompareAndSwapInt64(&a.head.maxTime, ht, a.highTimestamp) {
break
}
}
return nil
}
func (a *headAppender) Rollback() error {
a.head.mtx.RUnlock()
a.head.metrics.activeAppenders.Dec()
a.head.putAppendBuffer(a.samples)
return nil
}
// Delete all samples in the range of [mint, maxt] for series that satisfy the given
// label matchers.
func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
// Do not delete anything beyond the currently valid range.
mint, maxt = clampInterval(mint, maxt, h.MinTime(), h.MaxTime())
ir := h.indexRange(mint, maxt)
pr := newPostingsReader(ir)
p, absent := pr.Select(ms...)
var stones []Stone
Outer:
for p.Next() {
series := h.series[p.At()]
for _, abs := range absent {
if series.lset.Get(abs) != "" {
continue Outer
}
}
// Delete only until the current values and not beyond.
t0, t1 := clampInterval(mint, maxt, series.minTime(), series.maxTime())
stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}})
}
if p.Err() != nil {
return p.Err()
}
if err := h.wal.LogDeletes(stones); err != nil {
return err
}
for _, s := range stones {
h.tombstones.add(s.ref, s.intervals[0])
}
return nil
}
// gc removes data before the minimum timestmap from the head.
func (h *Head) gc() (seriesRemoved, chunksRemoved int) {
func (h *Head) gc() {
var (
seriesRemoved int
chunksRemoved int
)
// Only data strictly lower than this timestamp must be deleted.
mint := h.MinTime()
@ -102,7 +616,6 @@ func (h *Head) gc() (seriesRemoved, chunksRemoved int) {
if len(s.chunks) == 0 {
deletedHashes[hash] = append(deletedHashes[hash], s.ref)
}
s.mtx.Unlock()
}
}
@ -142,12 +655,12 @@ func (h *Head) gc() (seriesRemoved, chunksRemoved int) {
continue
}
delete(h.series, s.ref)
seriesRemoved++
}
if len(rem) > 0 {
h.hashes[hash] = rem
} else {
delete(h.hashes, hash)
seriesRemoved++
}
}
@ -185,7 +698,10 @@ func (h *Head) gc() (seriesRemoved, chunksRemoved int) {
h.symbols = symbols
h.values = values
return seriesRemoved, chunksRemoved
h.metrics.seriesRemoved.Add(float64(seriesRemoved))
h.metrics.series.Sub(float64(seriesRemoved))
h.metrics.chunksRemoved.Add(float64(chunksRemoved))
h.metrics.chunks.Sub(float64(chunksRemoved))
}
func (h *Head) Tombstones() TombstoneReader {
@ -421,6 +937,9 @@ func (h *Head) get(hash uint64, lset labels.Labels) *memSeries {
}
func (h *Head) create(hash uint64, lset labels.Labels) *memSeries {
h.metrics.series.Inc()
h.metrics.seriesCreated.Inc()
id := atomic.AddUint32(&h.lastSeriesID, 1)
s := newMemSeries(lset, id, h.chunkRange)
@ -524,12 +1043,7 @@ func (s *memSeries) appendable(t int64, v float64) error {
}
func (s *memSeries) chunk(id int) *memChunk {
ix := id - s.firstChunkID
if ix >= len(s.chunks) || ix < 0 {
fmt.Println("get chunk", id, len(s.chunks), s.firstChunkID)
}
return s.chunks[ix]
return s.chunks[id-s.firstChunkID]
}
func (s *memSeries) chunkID(pos int) int {
@ -571,6 +1085,7 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
}
if c.samples > samplesPerChunk/4 && t >= s.nextAt {
c = s.cut(t)
chunkCreated = true
}
s.app.Append(t, v)
@ -603,13 +1118,15 @@ func computeChunkEndTime(start, cur, max int64) int64 {
return start + (max-start)/a
}
func (s *memSeries) iterator(i int) chunks.Iterator {
c := s.chunk(i)
func (s *memSeries) iterator(id int) chunks.Iterator {
c := s.chunk(id)
if i < len(s.chunks)-1 {
// TODO(fabxc): !!! Test this and everything around chunk ID != list pos.
if id-s.firstChunkID < len(s.chunks)-1 {
return c.chunk.Iterator()
}
// Serve the last 4 samples for the last chunk from the series buffer
// as their compressed bytes may be mutated by added samples.
it := &memSafeIterator{
Iterator: c.chunk.Iterator(),
i: -1,

View file

@ -28,15 +28,10 @@ import (
)
func BenchmarkCreateSeries(b *testing.B) {
lbls, err := readPrometheusLabels("cmd/tsdb/testdata.1m", 1e6)
lbls, err := readPrometheusLabels("testdata/all.series", b.N)
require.NoError(b, err)
b.Run("", func(b *testing.B) {
dir, err := ioutil.TempDir("", "create_series_bench")
require.NoError(b, err)
defer os.RemoveAll(dir)
h, err := NewHead(nil, nil, 10000)
h, err := NewHead(nil, nil, nil, 10000)
if err != nil {
require.NoError(b, err)
}
@ -44,10 +39,9 @@ func BenchmarkCreateSeries(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for _, l := range lbls[:b.N] {
for _, l := range lbls {
h.create(l.Hash(), l)
}
})
}
func readPrometheusLabels(fn string, n int) ([]labels.Labels, error) {

11
wal.go
View file

@ -80,7 +80,7 @@ type SegmentWAL struct {
// WAL is a write ahead log that can log new series labels and samples.
// It must be completely read before new entries are logged.
type WAL interface {
Reader() WALReader
Reader(mint int64) WALReader
LogSeries([]labels.Labels) error
LogSamples([]RefSample) error
LogDeletes([]Stone) error
@ -91,10 +91,11 @@ type WAL interface {
type NopWAL struct{}
func (NopWAL) Read(SeriesCB, SamplesCB, DeletesCB) error { return nil }
func (w NopWAL) Reader() WALReader { return w }
func (w NopWAL) Reader(int64) WALReader { return w }
func (NopWAL) LogSeries([]labels.Labels) error { return nil }
func (NopWAL) LogSamples([]RefSample) error { return nil }
func (NopWAL) LogDeletes([]Stone) error { return nil }
func (NopWAL) Truncate(int64) error { return nil }
func (NopWAL) Close() error { return nil }
// WALReader reads entries from a WAL.
@ -162,7 +163,7 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration)
// Reader returns a new reader over the the write ahead log data.
// It must be completely consumed before writing to the WAL.
func (w *SegmentWAL) Reader() WALReader {
func (w *SegmentWAL) Reader(int64) WALReader {
return newWALReader(w, w.logger)
}
@ -210,7 +211,7 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error {
// initSegments finds all existing segment files and opens them in the
// appropriate file modes.
func (w *SegmentWAL) initSegments() error {
fns, err := sequenceFiles(w.dirFile.Name(), "")
fns, err := sequenceFiles(w.dirFile.Name())
if err != nil {
return err
}
@ -268,7 +269,7 @@ func (w *SegmentWAL) cut() error {
}
}
p, _, err := nextSequenceFile(w.dirFile.Name(), "")
p, _, err := nextSequenceFile(w.dirFile.Name())
if err != nil {
return err
}

View file

@ -162,7 +162,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
// Set smaller segment size so we can actually write several files.
w.segmentSize = 1000 * 1000
r := w.Reader()
r := w.Reader(0)
var (
resultSeries [][]labels.Labels
@ -340,7 +340,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
w2, err := OpenSegmentWAL(dir, logger, 0)
require.NoError(t, err)
r := w2.Reader()
r := w2.Reader(0)
serf := func(l []labels.Labels) error {
require.Equal(t, 0, len(l))
return nil
@ -370,7 +370,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
w3, err := OpenSegmentWAL(dir, logger, 0)
require.NoError(t, err)
r = w3.Reader()
r = w3.Reader(0)
i = 0
require.NoError(t, r.Read(serf, samplf, delf))