diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 26c9658c49..60f39e5d56 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -26,6 +26,7 @@ import ( "time" "unsafe" + "github.com/go-kit/kit/log" "github.com/pkg/errors" promlabels "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/textparse" @@ -88,7 +89,10 @@ func (b *writeBenchmark) run() { dir := filepath.Join(b.outPath, "storage") - st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{ + l := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) + + st, err := tsdb.Open(dir, l, nil, &tsdb.Options{ WALFlushInterval: 200 * time.Millisecond, RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds BlockRanges: tsdb.ExponentialBlockRanges(2*60*60*1000, 5, 3), diff --git a/head.go b/head.go index 40a4eb5b92..a5ce94e455 100644 --- a/head.go +++ b/head.go @@ -185,13 +185,18 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( return h, nil } +// ReadWAL initializes the head by consuming the write ahead log. func (h *Head) ReadWAL() error { r := h.wal.Reader() mint := h.MinTime() seriesFunc := func(series []RefSeries) error { for _, s := range series { - h.create(s.Labels.Hash(), s.Labels) + h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) + + if h.lastSeriesID < s.Ref { + h.lastSeriesID = s.Ref + } } return nil } @@ -202,7 +207,8 @@ func (h *Head) ReadWAL() error { } ms := h.series.getByID(s.Ref) if ms == nil { - return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref) + h.logger.Log("msg", "unknown series reference in WAL", "ref", s.Ref) + continue } _, chunkCreated := ms.append(s.T, s.V) if chunkCreated { @@ -210,7 +216,6 @@ func (h *Head) ReadWAL() error { h.metrics.chunks.Inc() } } - return nil } deletesFunc := func(stones []Stone) error { @@ -222,7 +227,6 @@ func (h *Head) ReadWAL() error { h.tombstones.add(s.ref, itv) } } - return nil } @@ -379,17 +383,12 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro if t < a.mint { return 0, ErrOutOfBounds } - hash := lset.Hash() - - s := a.head.series.getByHash(hash, lset) - - if s == nil { - s = a.head.create(hash, lset) + s, created := a.head.getOrCreate(lset.Hash(), lset) + if created { a.series = append(a.series, RefSeries{ Ref: s.ref, Labels: lset, - hash: hash, }) } return s.ref, a.AddFast(s.ref, t, v) @@ -839,20 +838,32 @@ func (h *headIndexReader) LabelIndices() ([][]string, error) { return res, nil } -func (h *Head) create(hash uint64, lset labels.Labels) *memSeries { - h.metrics.series.Inc() - h.metrics.seriesCreated.Inc() +func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool) { + // Just using `getOrSet` below would be semantically sufficient, but we'd create + // a new series on every sample inserted via Add(), which causes allocations + // and makes our series IDs rather random and harder to compress in postings. + s := h.series.getByHash(hash, lset) + if s != nil { + return s, false + } // Optimistically assume that we are the first one to create the series. id := atomic.AddUint64(&h.lastSeriesID, 1) + + return h.getOrCreateWithID(id, hash, lset) +} + +func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool) { s := newMemSeries(lset, id, h.chunkRange) s, created := h.series.getOrSet(hash, s) - // Skip indexing if we didn't actually create the series. if !created { - return s + return s, false } + h.metrics.series.Inc() + h.metrics.seriesCreated.Inc() + h.postings.add(id, lset) h.symMtx.Lock() @@ -870,7 +881,7 @@ func (h *Head) create(hash uint64, lset labels.Labels) *memSeries { h.symbols[l.Value] = struct{}{} } - return s + return s, true } // seriesHashmap is a simple hashmap for memSeries by their label set. It is built diff --git a/head_test.go b/head_test.go index 724dab224a..b7603e78ee 100644 --- a/head_test.go +++ b/head_test.go @@ -21,6 +21,7 @@ import ( "unsafe" "github.com/pkg/errors" + "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" promlabels "github.com/prometheus/prometheus/pkg/labels" @@ -41,7 +42,7 @@ func BenchmarkCreateSeries(b *testing.B) { b.ResetTimer() for _, l := range lbls { - h.create(l.Hash(), l) + h.getOrCreate(l.Hash(), l) } } @@ -83,16 +84,92 @@ func readPrometheusLabels(fn string, n int) ([]labels.Labels, error) { return mets, nil } +type memoryWAL struct { + nopWAL + entries []interface{} +} + +func (w *memoryWAL) Reader() WALReader { + return w +} + +func (w *memoryWAL) Read(series SeriesCB, samples SamplesCB, deletes DeletesCB) error { + for _, e := range w.entries { + switch v := e.(type) { + case []RefSeries: + series(v) + case []RefSample: + samples(v) + case []Stone: + deletes(v) + } + } + return nil +} + +func TestHead_ReadWAL(t *testing.T) { + entries := []interface{}{ + []RefSeries{ + {Ref: 10, Labels: labels.FromStrings("a", "1")}, + {Ref: 11, Labels: labels.FromStrings("a", "2")}, + {Ref: 100, Labels: labels.FromStrings("a", "3")}, + }, + []RefSample{ + {Ref: 0, T: 99, V: 1}, + {Ref: 10, T: 100, V: 2}, + {Ref: 100, T: 100, V: 3}, + }, + []RefSeries{ + {Ref: 50, Labels: labels.FromStrings("a", "4")}, + }, + []RefSample{ + {Ref: 10, T: 101, V: 5}, + {Ref: 50, T: 101, V: 6}, + }, + } + wal := &memoryWAL{entries: entries} + + head, err := NewHead(nil, nil, wal, 1000) + require.NoError(t, err) + + require.NoError(t, head.ReadWAL()) + require.Equal(t, uint64(100), head.lastSeriesID) + + s10 := head.series.getByID(10) + s11 := head.series.getByID(11) + s50 := head.series.getByID(50) + s100 := head.series.getByID(100) + + require.Equal(t, labels.FromStrings("a", "1"), s10.lset) + require.Equal(t, labels.FromStrings("a", "2"), s11.lset) + require.Equal(t, labels.FromStrings("a", "4"), s50.lset) + require.Equal(t, labels.FromStrings("a", "3"), s100.lset) + + expandChunk := func(c chunks.Iterator) (x []sample) { + for c.Next() { + t, v := c.At() + x = append(x, sample{t: t, v: v}) + } + require.NoError(t, c.Err()) + return x + } + + require.Equal(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0))) + require.Equal(t, 0, len(s11.chunks)) + require.Equal(t, []sample{{101, 6}}, expandChunk(s50.iterator(0))) + require.Equal(t, []sample{{100, 3}}, expandChunk(s100.iterator(0))) +} + func TestHead_Truncate(t *testing.T) { h, err := NewHead(nil, nil, nil, 1000) require.NoError(t, err) h.initTime(0) - s1 := h.create(1, labels.FromStrings("a", "1", "b", "1")) - s2 := h.create(2, labels.FromStrings("a", "2", "b", "1")) - s3 := h.create(3, labels.FromStrings("a", "1", "b", "2")) - s4 := h.create(4, labels.FromStrings("a", "2", "b", "2", "c", "1")) + s1, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1")) + s2, _ := h.getOrCreate(2, labels.FromStrings("a", "2", "b", "1")) + s3, _ := h.getOrCreate(3, labels.FromStrings("a", "1", "b", "2")) + s4, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1")) s1.chunks = []*memChunk{ {minTime: 0, maxTime: 999}, diff --git a/wal.go b/wal.go index 9af9a18536..27984ea0ce 100644 --- a/wal.go +++ b/wal.go @@ -99,9 +99,6 @@ type WALReader interface { type RefSeries struct { Ref uint64 Labels labels.Labels - - // hash for the label set. This field is not generally populated. - hash uint64 } // RefSample is a timestamp/value pair associated with a reference to a series. @@ -827,7 +824,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC if err != nil { return errors.Wrap(err, "decode series entry") } - seriesf(series) + if err := seriesf(series); err != nil { + return err + } cf := r.current() @@ -842,7 +841,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC if err != nil { return errors.Wrap(err, "decode samples entry") } - samplesf(samples) + if err := samplesf(samples); err != nil { + return err + } // Update the times for the WAL segment file. cf := r.current() @@ -858,7 +859,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC if err != nil { return errors.Wrap(err, "decode delete entry") } - deletesf(stones) + if err := deletesf(stones); err != nil { + return err + } // Update the times for the WAL segment file. cf := r.current()