Write to WAL before appending to memory storage

This commit is contained in:
Fabian Reinartz 2017-01-17 16:33:58 +01:00
parent 343dd9d94c
commit 5ceca3c810

52
head.go
View file

@ -15,6 +15,23 @@ import (
"github.com/go-kit/kit/log"
)
var (
// ErrNotFound is returned if a looked up resource was not found.
ErrNotFound = fmt.Errorf("not found")
// ErrOutOfOrderSample is returned if an appended sample has a
// timestamp larger than the most recent sample.
ErrOutOfOrderSample = errors.New("out of order sample")
// ErrAmendSample is returned if an appended sample has the same timestamp
// as the most recent sample but a different value.
ErrAmendSample = errors.New("amending sample")
// ErrOutOfBounds is returned if an appended sample is out of the
// writable time range.
ErrOutOfBounds = errors.New("out of bounds")
)
// headBlock handles reads and writes of time series data within a time window.
type headBlock struct {
mtx sync.RWMutex
@ -40,7 +57,7 @@ type headBlock struct {
// openHeadBlock creates a new empty head block.
func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
wal, err := OpenWAL(dir, log.NewContext(l).With("component", "wal"), 15*time.Second)
wal, err := OpenWAL(dir, log.NewContext(l).With("component", "wal"), 5*time.Second)
if err != nil {
return nil, err
}
@ -67,6 +84,9 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
sample: func(s refdSample) {
b.series[s.ref].append(s.t, s.v)
if s.t < b.stats.MinTime {
b.stats.MinTime = s.t
}
if s.t > b.stats.MaxTime {
b.stats.MaxTime = s.t
}
@ -157,7 +177,9 @@ func (a *headAppender) setSeries(hash uint64, lset labels.Labels) (uint64, error
// intermediate reference only valid for this batch.
// It is indicated by the the LSB of the 4th byte being set to 1.
// We use a random ID to avoid collisions when new series are created
// in two subsequent batches. (TODO(fabxc): safe enough?)
// in two subsequent batches.
// TODO(fabxc): Provide method for client to determine whether a ref
// is valid beyond the current transaction.
ref := uint64(rand.Int31()) | (1 << 32)
if a.newSeries == nil {
@ -260,19 +282,22 @@ func (a *headAppender) Commit() error {
if s.ref&(1<<32) > 0 {
s.ref = a.refmap[s.ref]
}
if !a.series[s.ref].append(s.t, s.v) {
total--
}
}
a.mtx.RUnlock()
// Write all new series and samples to the WAL and add it to the
// in-mem database on success.
if err := a.wal.Log(a.newLabels, a.samples); err != nil {
return err
}
for _, s := range a.samples {
if !a.series[s.ref].append(s.t, s.v) {
total--
}
}
a.mtx.RUnlock()
a.stats.mtx.Lock()
defer a.stats.mtx.Unlock()
@ -441,19 +466,6 @@ func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries {
return s
}
var (
ErrNotFound = fmt.Errorf("not found")
// ErrOutOfOrderSample is returned if an appended sample has a
// timestamp larger than the most recent sample.
ErrOutOfOrderSample = errors.New("out of order sample")
// ErrAmendSample is returned if an appended sample has the same timestamp
// as the most recent sample but a different value.
ErrAmendSample = errors.New("amending sample")
ErrOutOfBounds = errors.New("out of bounds")
)
func (h *headBlock) fullness() float64 {
h.stats.mtx.RLock()
defer h.stats.mtx.RUnlock()