From 5ceca3c81089a738a8c3000ec702b95f1dc1215a Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 17 Jan 2017 16:33:58 +0100 Subject: [PATCH] Write to WAL before appending to memory storage --- head.go | 52 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/head.go b/head.go index 5e98ef1b7..42fdc414d 100644 --- a/head.go +++ b/head.go @@ -15,6 +15,23 @@ import ( "github.com/go-kit/kit/log" ) +var ( + // ErrNotFound is returned if a looked up resource was not found. + ErrNotFound = fmt.Errorf("not found") + + // ErrOutOfOrderSample is returned if an appended sample has a + // timestamp larger than the most recent sample. + ErrOutOfOrderSample = errors.New("out of order sample") + + // ErrAmendSample is returned if an appended sample has the same timestamp + // as the most recent sample but a different value. + ErrAmendSample = errors.New("amending sample") + + // ErrOutOfBounds is returned if an appended sample is out of the + // writable time range. + ErrOutOfBounds = errors.New("out of bounds") +) + // headBlock handles reads and writes of time series data within a time window. type headBlock struct { mtx sync.RWMutex @@ -40,7 +57,7 @@ type headBlock struct { // openHeadBlock creates a new empty head block. func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { - wal, err := OpenWAL(dir, log.NewContext(l).With("component", "wal"), 15*time.Second) + wal, err := OpenWAL(dir, log.NewContext(l).With("component", "wal"), 5*time.Second) if err != nil { return nil, err } @@ -67,6 +84,9 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { sample: func(s refdSample) { b.series[s.ref].append(s.t, s.v) + if s.t < b.stats.MinTime { + b.stats.MinTime = s.t + } if s.t > b.stats.MaxTime { b.stats.MaxTime = s.t } @@ -157,7 +177,9 @@ func (a *headAppender) setSeries(hash uint64, lset labels.Labels) (uint64, error // intermediate reference only valid for this batch. // It is indicated by the the LSB of the 4th byte being set to 1. // We use a random ID to avoid collisions when new series are created - // in two subsequent batches. (TODO(fabxc): safe enough?) + // in two subsequent batches. + // TODO(fabxc): Provide method for client to determine whether a ref + // is valid beyond the current transaction. ref := uint64(rand.Int31()) | (1 << 32) if a.newSeries == nil { @@ -260,19 +282,22 @@ func (a *headAppender) Commit() error { if s.ref&(1<<32) > 0 { s.ref = a.refmap[s.ref] } - if !a.series[s.ref].append(s.t, s.v) { - total-- - } } - a.mtx.RUnlock() - // Write all new series and samples to the WAL and add it to the // in-mem database on success. if err := a.wal.Log(a.newLabels, a.samples); err != nil { return err } + for _, s := range a.samples { + if !a.series[s.ref].append(s.t, s.v) { + total-- + } + } + + a.mtx.RUnlock() + a.stats.mtx.Lock() defer a.stats.mtx.Unlock() @@ -441,19 +466,6 @@ func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries { return s } -var ( - ErrNotFound = fmt.Errorf("not found") - // ErrOutOfOrderSample is returned if an appended sample has a - // timestamp larger than the most recent sample. - ErrOutOfOrderSample = errors.New("out of order sample") - - // ErrAmendSample is returned if an appended sample has the same timestamp - // as the most recent sample but a different value. - ErrAmendSample = errors.New("amending sample") - - ErrOutOfBounds = errors.New("out of bounds") -) - func (h *headBlock) fullness() float64 { h.stats.mtx.RLock() defer h.stats.mtx.RUnlock()