diff --git a/checkpoint_test.go b/checkpoint_test.go
index 074bb46e8..daa54df19 100644
--- a/checkpoint_test.go
+++ b/checkpoint_test.go
@@ -15,7 +15,6 @@ package tsdb
 
 import (
-	"fmt"
 	"io/ioutil"
 	"os"
 	"path/filepath"
@@ -82,7 +81,7 @@ func TestDeleteCheckpoints(t *testing.T) {
 func TestCheckpoint(t *testing.T) {
 	dir, err := ioutil.TempDir("", "test_checkpoint")
 	testutil.Ok(t, err)
-	fmt.Println(dir)
+	defer os.RemoveAll(dir)
 
 	var enc RecordEncoder
 	// Create a dummy segment to bump the initial number.
@@ -138,11 +137,10 @@ func TestCheckpoint(t *testing.T) {
 	}
 	testutil.Ok(t, w.Close())
 
-	stats, err = Checkpoint(nil, w, 100, 106, func(x uint64) bool {
+	_, err = Checkpoint(nil, w, 100, 106, func(x uint64) bool {
 		return x%2 == 0
 	}, last/2)
 	testutil.Ok(t, err)
-	testutil.Equals(t, 106, stats.HighSegment)
 
 	// Only the new checkpoint should be left.
 	files, err := fileutil.ReadDir(dir)
diff --git a/db.go b/db.go
index fcfbeeeb2..df17991eb 100644
--- a/db.go
+++ b/db.go
@@ -37,6 +37,7 @@ import (
 	"github.com/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/tsdb/fileutil"
 	"github.com/prometheus/tsdb/labels"
+	"github.com/prometheus/tsdb/wal"
 	"golang.org/x/sync/errgroup"
 )
 
@@ -221,18 +222,18 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 		return nil, errors.Wrap(err, "create leveled compactor")
 	}
 
-	wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, opts.WALFlushInterval, r)
+	wlog, err := wal.New(l, r, filepath.Join(dir, "wal"))
 	if err != nil {
 		return nil, err
 	}
-	db.head, err = NewHead(r, l, wal, opts.BlockRanges[0])
+	db.head, err = NewHead(r, l, wlog, opts.BlockRanges[0])
 	if err != nil {
 		return nil, err
 	}
 	if err := db.reload(); err != nil {
 		return nil, err
 	}
-	if err := db.head.ReadWAL(); err != nil {
+	if err := db.head.Init(); err != nil {
 		return nil, errors.Wrap(err, "read WAL")
 	}
 
diff --git a/head.go b/head.go
index 372842865..4e12369da 100644
--- a/head.go
+++ b/head.go
@@ -15,6 +15,7 @@ package tsdb
 
 import (
 	"math"
+	"path/filepath"
 	"runtime"
 	"sort"
 	"strings"
@@ -30,6 +31,7 @@ import (
 	"github.com/prometheus/tsdb/chunks"
 	"github.com/prometheus/tsdb/index"
 	"github.com/prometheus/tsdb/labels"
+	"github.com/prometheus/tsdb/wal"
 )
 
 var (
@@ -53,9 +55,10 @@ var (
 type Head struct {
 	chunkRange int64
 	metrics    *headMetrics
-	wal        WAL
+	wal        *wal.WAL
 	logger     log.Logger
 	appendPool sync.Pool
+	bytesPool  sync.Pool
 
 	minTime, maxTime int64
 	lastSeriesID     uint64
@@ -169,13 +172,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
 }
 
 // NewHead opens the head block in dir.
-func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (*Head, error) {
+func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64) (*Head, error) {
 	if l == nil {
 		l = log.NewNopLogger()
 	}
-	if wal == nil {
-		wal = NopWAL()
-	}
 	if chunkRange < 1 {
 		return nil, errors.Errorf("invalid chunk range %d", chunkRange)
 	}
@@ -206,6 +206,8 @@ func (h *Head) processWALSamples(
 ) (unknownRefs uint64) {
 	defer close(output)
 
+	maxt := h.MaxTime()
+
 	for samples := range input {
 		for _, s := range samples {
 			if s.T < mint || s.Ref%total != partition {
 				h.metrics.chunksCreated.Inc()
 				h.metrics.chunks.Inc()
 			}
+			if s.T > maxt {
+				maxt = s.T
+			}
 		}
 		output <- samples
 	}
+
+	for {
+		ht := h.MaxTime()
+		if maxt <= ht {
+			break
+		}
+		if atomic.CompareAndSwapInt64(&h.maxTime, ht, maxt) {
+			break
+		}
+	}
+
 	return unknownRefs
 }
 
-// ReadWAL initializes the head by consuming the write ahead log.
-func (h *Head) ReadWAL() error {
-	defer h.postings.EnsureOrder()
-
-	r := h.wal.Reader()
+func (h *Head) loadWAL(r *wal.Reader) error {
 	mint := h.MinTime()
 
 	// Track number of samples that referenced a series we don't know about
@@ -263,49 +275,71 @@ func (h *Head) ReadWAL() error {
 		input = output
 	}
 
-	// TODO(fabxc): series entries spread between samples can starve the sample workers.
-	// Even with bufferd channels, this can impact startup time with lots of series churn.
-	// We must not paralellize series creation itself but could make the indexing asynchronous.
-	seriesFunc := func(series []RefSeries) {
-		for _, s := range series {
-			h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
+	var (
+		dec     RecordDecoder
+		series  []RefSeries
+		samples []RefSample
+		tstones []Stone
+	)
+	for r.Next() {
+		series, samples, tstones = series[:0], samples[:0], tstones[:0]
+		rec := r.Record()
 
-			if h.lastSeriesID < s.Ref {
-				h.lastSeriesID = s.Ref
+		switch dec.Type(rec) {
+		case RecordSeries:
+			series, err := dec.Series(rec, series)
+			if err != nil {
+				return errors.Wrap(err, "decode series")
 			}
-		}
-	}
-	samplesFunc := func(samples []RefSample) {
-		// We split up the samples into chunks of 5000 samples or less.
-		// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
-		// cause thousands of very large in flight buffers occupying large amounts
-		// of unused memory.
-		for len(samples) > 0 {
-			n := 5000
-			if len(samples) < n {
-				n = len(samples)
-			}
-			var buf []RefSample
-			select {
-			case buf = <-input:
-			default:
-			}
-			firstInput <- append(buf[:0], samples[:n]...)
-			samples = samples[n:]
-		}
-	}
-	deletesFunc := func(stones []Stone) {
-		for _, s := range stones {
-			for _, itv := range s.intervals {
-				if itv.Maxt < mint {
-					continue
+			for _, s := range series {
+				h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
+
+				if h.lastSeriesID < s.Ref {
+					h.lastSeriesID = s.Ref
 				}
-				h.tombstones.addInterval(s.ref, itv)
 			}
+		case RecordSamples:
+			samples, err := dec.Samples(rec, samples)
+			if err != nil {
+				return errors.Wrap(err, "decode samples")
+			}
+			// We split up the samples into chunks of 5000 samples or less.
+			// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
+			// cause thousands of very large in flight buffers occupying large amounts
+			// of unused memory.
+			for len(samples) > 0 {
+				n := 5000
+				if len(samples) < n {
+					n = len(samples)
+				}
+				var buf []RefSample
+				select {
+				case buf = <-input:
+				default:
+				}
+				firstInput <- append(buf[:0], samples[:n]...)
+				samples = samples[n:]
+			}
+		case RecordTombstones:
+			tstones, err := dec.Tombstones(rec, tstones)
+			if err != nil {
+				return errors.Wrap(err, "decode tombstones")
+			}
+			for _, s := range tstones {
+				for _, itv := range s.intervals {
+					if itv.Maxt < mint {
+						continue
+					}
+					h.tombstones.addInterval(s.ref, itv)
+				}
+			}
+		default:
+			return errors.Errorf("invalid record type %v", dec.Type(rec))
 		}
 	}
-
-	err := r.Read(seriesFunc, samplesFunc, deletesFunc)
+	if r.Err() != nil {
+		return errors.Wrap(r.Err(), "read records")
+	}
 
 	// Signal termination to first worker and wait for last one to close its output channel.
 	close(firstInput)
@@ -313,16 +347,58 @@
 	}
 	wg.Wait()
 
-	if err != nil {
-		return errors.Wrap(err, "consume WAL")
-	}
 	if unknownRefs > 0 {
-		level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs)
+		level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs)
 	}
 	return nil
 }
 
-// Truncate removes all data before mint from the head block and truncates its WAL.
+// Init loads data from the write ahead log and prepares the head for writes.
+func (h *Head) Init() error {
+	defer h.postings.EnsureOrder()
+
+	if h.wal == nil {
+		return nil
+	}
+
+	// Backfill the checkpoint first if it exists.
+	cp, n, err := LastCheckpoint(h.wal.Dir())
+	if err != nil && err != ErrNotFound {
+		return errors.Wrap(err, "find last checkpoint")
+	}
+	if err == nil {
+		sr, err := wal.NewSegmentsReader(filepath.Join(h.wal.Dir(), cp))
+		if err != nil {
+			return errors.Wrap(err, "open checkpoint")
+		}
+		defer sr.Close()
+
+		// A corrupted checkpoint is a hard error for now and requires user
+		// intervention. There's likely little data that can be recovered anyway.
+		if err := h.loadWAL(wal.NewReader(sr)); err != nil {
+			return errors.Wrap(err, "backfill checkpoint")
+		}
+		n++
+	}
+
+	// Backfill segments from the last checkpoint onwards
+	sr, err := wal.NewSegmentsRangeReader(h.wal.Dir(), n, -1)
+	if err != nil {
+		return errors.Wrap(err, "open WAL segments")
+	}
+	defer sr.Close()
+
+	err = h.loadWAL(wal.NewReader(sr))
+	if err == nil {
+		return nil
+	}
+	if err := h.wal.Repair(err); err != nil {
+		return errors.Wrap(err, "repair corrupted WAL")
+	}
+	return nil
+}
+
+// Truncate removes old data before mint from the head.
 func (h *Head) Truncate(mint int64) error {
 	initialize := h.MinTime() == math.MinInt64
 
@@ -348,18 +424,37 @@ func (h *Head) Truncate(mint int64) error {
 	level.Info(h.logger).Log("msg", "head GC completed", "duration", time.Since(start))
 	h.metrics.gcDuration.Observe(time.Since(start).Seconds())
 
+	if h.wal == nil {
+		return nil
+	}
 	start = time.Now()
 
+	m, n, err := h.wal.Segments()
+	if err != nil {
+		return errors.Wrap(err, "get segment range")
+	}
+	n-- // Never consider last segment for checkpoint.
+	if n < 0 {
+		return nil // no segments yet.
+	}
+	// The lower third of segments should contain mostly obsolete samples.
+	// If we have too few segments, it's not worth checkpointing yet.
+	n = m + (n-m)/3
+	if n <= m {
+		return nil
+	}
+
 	keep := func(id uint64) bool {
 		return h.series.getByID(id) != nil
 	}
-	if err := h.wal.Truncate(mint, keep); err == nil {
-		level.Info(h.logger).Log("msg", "WAL truncation completed", "duration", time.Since(start))
-	} else {
-		level.Error(h.logger).Log("msg", "WAL truncation failed", "err", err, "duration", time.Since(start))
+	if _, err = Checkpoint(h.logger, h.wal, m, n, keep, mint); err != nil {
+		return errors.Wrap(err, "create checkpoint")
 	}
 	h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
+	level.Info(h.logger).Log("msg", "WAL checkpoint complete",
+		"low", m, "high", n, "duration", time.Since(start))
+
 	return nil
 }
 
@@ -468,6 +563,18 @@ func (h *Head) putAppendBuffer(b []RefSample) {
 	h.appendPool.Put(b[:0])
 }
 
+func (h *Head) getBytesBuffer() []byte {
+	b := h.bytesPool.Get()
+	if b == nil {
+		return make([]byte, 0, 1024)
+	}
+	return b.([]byte)
+}
+
+func (h *Head) putBytesBuffer(b []byte) {
+	h.bytesPool.Put(b[:0])
+}
+
 type headAppender struct {
 	head       *Head
 	mint, maxt int64
@@ -520,15 +627,42 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
 	return nil
 }
 
+func (a *headAppender) log() error {
+	if a.head.wal == nil {
+		return nil
+	}
+
+	buf := a.head.getBytesBuffer()
+	defer func() { a.head.putBytesBuffer(buf) }()
+
+	var rec []byte
+	var enc RecordEncoder
+
+	if len(a.series) > 0 {
+		rec = enc.Series(a.series, buf)
+		buf = rec[:0]
+
+		if err := a.head.wal.Log(rec); err != nil {
+			return errors.Wrap(err, "log series")
+		}
+	}
+	if len(a.samples) > 0 {
+		rec = enc.Samples(a.samples, buf)
+		buf = rec[:0]
+
+		if err := a.head.wal.Log(rec); err != nil {
+			return errors.Wrap(err, "log samples")
+		}
+	}
+	return nil
+}
+
 func (a *headAppender) Commit() error {
 	defer a.head.metrics.activeAppenders.Dec()
 	defer a.head.putAppendBuffer(a.samples)
 
-	if err := a.head.wal.LogSeries(a.series); err != nil {
-		return err
-	}
-	if err := a.head.wal.LogSamples(a.samples); err != nil {
-		return errors.Wrap(err, "WAL log samples")
+	if err := a.log(); err != nil {
+		return errors.Wrap(err, "write to WAL")
 	}
 
 	total := len(a.samples)
@@ -568,7 +702,8 @@ func (a *headAppender) Rollback() error {
 
 	// Series are created in the head memory regardless of rollback. Thus we have
 	// to log them to the WAL in any case.
-	return a.head.wal.LogSeries(a.series)
+	a.samples = nil
+	return a.log()
 }
 
 // Delete all samples in the range of [mint, maxt] for series that satisfy the given
@@ -601,8 +736,12 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 	if p.Err() != nil {
 		return p.Err()
 	}
-	if err := h.wal.LogDeletes(stones); err != nil {
-		return err
+	var enc RecordEncoder
+
+	if h.wal != nil {
+		if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil {
+			return err
+		}
 	}
 	for _, s := range stones {
 		h.tombstones.addInterval(s.ref, s.intervals[0])
@@ -694,6 +833,9 @@ func (h *Head) MaxTime() int64 {
 
 // Close flushes the WAL and closes the head.
 func (h *Head) Close() error {
+	if h.wal == nil {
+		return nil
+	}
 	return h.wal.Close()
 }
diff --git a/head_test.go b/head_test.go
index 9a8c89364..b06a66c26 100644
--- a/head_test.go
+++ b/head_test.go
@@ -14,7 +14,9 @@ package tsdb
 
 import (
+	"io/ioutil"
 	"math/rand"
+	"os"
 	"testing"
 
 	"github.com/prometheus/tsdb/chunkenc"
@@ -22,6 +24,7 @@ import (
 	"github.com/prometheus/tsdb/index"
 	"github.com/prometheus/tsdb/labels"
 	"github.com/prometheus/tsdb/testutil"
+	"github.com/prometheus/tsdb/wal"
 )
 
 func BenchmarkCreateSeries(b *testing.B) {
@@ -42,42 +45,50 @@ func BenchmarkCreateSeries(b *testing.B) {
 	}
 }
 
-type memoryWAL struct {
-	nopWAL
-	entries []interface{}
-}
-
-func (w *memoryWAL) LogSeries(s []RefSeries) error {
-	w.entries = append(w.entries, s)
-	return nil
-}
-
-func (w *memoryWAL) LogSamples(s []RefSample) error {
-	w.entries = append(w.entries, s)
-	return nil
-}
-
-func (w *memoryWAL) LogDeletes(s []Stone) error {
-	w.entries = append(w.entries, s)
-	return nil
-}
-
-func (w *memoryWAL) Reader() WALReader {
-	return w
-}
-
-func (w *memoryWAL) Read(series func([]RefSeries), samples func([]RefSample), deletes func([]Stone)) error {
-	for _, e := range w.entries {
-		switch v := e.(type) {
+func populateTestWAL(t testing.TB, w *wal.WAL, recs []interface{}) {
+	var enc RecordEncoder
+	for _, r := range recs {
+		switch v := r.(type) {
 		case []RefSeries:
-			series(v)
+			testutil.Ok(t, w.Log(enc.Series(v, nil)))
 		case []RefSample:
-			samples(v)
+			testutil.Ok(t, w.Log(enc.Samples(v, nil)))
 		case []Stone:
-			deletes(v)
+			testutil.Ok(t, w.Log(enc.Tombstones(v, nil)))
 		}
 	}
-	return nil
+}
+
+func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
+	sr, err := wal.NewSegmentsReader(dir)
+	testutil.Ok(t, err)
+	defer sr.Close()
+
+	var dec RecordDecoder
+	r := wal.NewReader(sr)
+
+	for r.Next() {
+		rec := r.Record()
+
+		switch dec.Type(rec) {
+		case RecordSeries:
+			series, err := dec.Series(rec, nil)
+			testutil.Ok(t, err)
+			recs = append(recs, series)
+		case RecordSamples:
+			samples, err := dec.Samples(rec, nil)
+			testutil.Ok(t, err)
+			recs = append(recs, samples)
+		case RecordTombstones:
+			tstones, err := dec.Tombstones(rec, nil)
+			testutil.Ok(t, err)
+			recs = append(recs, tstones)
+		default:
+			t.Fatalf("unknown record type")
+		}
+	}
+	testutil.Ok(t, r.Err())
+	return recs
 }
 
 func TestHead_ReadWAL(t *testing.T) {
@@ -100,13 +111,19 @@ func TestHead_ReadWAL(t *testing.T) {
 			{Ref: 50, T: 101, V: 6},
 		},
 	}
-	wal := &memoryWAL{entries: entries}
+	dir, err := ioutil.TempDir("", "test_read_wal")
+	testutil.Ok(t, err)
+	defer os.RemoveAll(dir)
 
-	head, err := NewHead(nil, nil, wal, 1000)
+	w, err := wal.New(nil, nil, dir)
+	testutil.Ok(t, err)
+	populateTestWAL(t, w, entries)
+
+	head, err := NewHead(nil, nil, w, 1000)
 	testutil.Ok(t, err)
 	defer head.Close()
 
-	testutil.Ok(t, head.ReadWAL())
+	testutil.Ok(t, head.Init())
 
 	testutil.Equals(t, uint64(100), head.lastSeriesID)
 
 	s10 := head.series.getByID(10)
@@ -259,13 +276,19 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
 			{Ref: 50, T: 90, V: 1},
 		},
 	}
-	wal := &memoryWAL{entries: entries}
+	dir, err := ioutil.TempDir("", "test_delete_series")
+	testutil.Ok(t, err)
+	defer os.RemoveAll(dir)
 
-	head, err := NewHead(nil, nil, wal, 1000)
+	w, err := wal.New(nil, nil, dir)
+	testutil.Ok(t, err)
+	populateTestWAL(t, w, entries)
+
+	head, err := NewHead(nil, nil, w, 1000)
 	testutil.Ok(t, err)
 	defer head.Close()
 
-	testutil.Ok(t, head.ReadWAL())
+	testutil.Ok(t, head.Init())
 
 	testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1")))
 }
@@ -705,7 +728,7 @@ func TestMemSeries_append(t *testing.T) {
 
 func TestGCChunkAccess(t *testing.T) {
 	// Put a chunk, select it. GC it and then access it.
-	h, err := NewHead(nil, nil, NopWAL(), 1000)
+	h, err := NewHead(nil, nil, nil, 1000)
 	testutil.Ok(t, err)
 	defer h.Close()
 
@@ -745,7 +768,7 @@ func TestGCChunkAccess(t *testing.T) {
 
 func TestGCSeriesAccess(t *testing.T) {
 	// Put a series, select it. GC it and then access it.
-	h, err := NewHead(nil, nil, NopWAL(), 1000)
+	h, err := NewHead(nil, nil, nil, 1000)
 	testutil.Ok(t, err)
 	defer h.Close()
 
@@ -786,7 +809,12 @@ func TestGCSeriesAccess(t *testing.T) {
 }
 
 func TestHead_LogRollback(t *testing.T) {
-	w := &memoryWAL{}
+	dir, err := ioutil.TempDir("", "wal_rollback")
+	testutil.Ok(t, err)
+	defer os.RemoveAll(dir)
+
+	w, err := wal.New(nil, nil, dir)
+	testutil.Ok(t, err)
 
 	h, err := NewHead(nil, nil, w, 1000)
 	testutil.Ok(t, err)
@@ -795,9 +823,11 @@ func TestHead_LogRollback(t *testing.T) {
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Rollback())
 
-	testutil.Equals(t, 1, len(w.entries))
+	recs := readTestWAL(t, w.Dir())
 
-	series, ok := w.entries[0].([]RefSeries)
-	testutil.Assert(t, ok, "expected series record but got %+v", w.entries[0])
+	testutil.Equals(t, 1, len(recs))
+
+	series, ok := recs[0].([]RefSeries)
+	testutil.Assert(t, ok, "expected series record but got %+v", recs[0])
 	testutil.Equals(t, series, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}})
 }
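
Reviewer note (not part of the patch): a minimal end-to-end sketch of how the migrated pieces fit together from a caller's perspective. It assumes the `Options.BlockRanges` field and nil-logger/registerer handling that `Open` already uses above, plus the package's existing `DB.Appender` interface; the `"data"` directory and the series labels are placeholders, and production error handling is elided.

```go
package main

import (
	"log"

	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/labels"
)

func main() {
	// Open now constructs the head with wal.New (instead of OpenSegmentWAL)
	// and replays the last checkpoint plus all later segments via head.Init()
	// before the DB accepts appends.
	db, err := tsdb.Open("data", nil, nil, &tsdb.Options{
		BlockRanges: []int64{2 * 60 * 60 * 1000}, // 2h head block range in ms (placeholder)
	})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close() // closes the head; Head.Close is now nil-safe for WAL-less heads

	// Commit encodes pending series and samples into WAL records and writes
	// them through headAppender.log(); a head opened with a nil *wal.WAL
	// simply skips the logging step.
	app := db.Appender()
	if _, err := app.Add(labels.FromStrings("job", "demo"), 1000, 3.14); err != nil {
		log.Fatal(err)
	}
	if err := app.Commit(); err != nil {
		log.Fatal(err)
	}
}
```

The key behavioral change is in truncation: `Head.Truncate` no longer rewrites WAL segments in place. It instead runs `Checkpoint` over roughly the lower third of completed segments, dropping series with no live references and samples older than `mint`, and `Init` later backfills that checkpoint before replaying the remaining segments, repairing the WAL only when replay of the segments (not the checkpoint) fails.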