From 06f1ba73eb730239aced75b02aa271a3307ad185 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Wed, 3 Jul 2019 07:23:13 -0600 Subject: [PATCH] Provide flag to compress the tsdb WAL Signed-off-by: Chris Marchbanks --- cmd/prometheus/main.go | 4 + storage/remote/wal_watcher_test.go | 701 +++++++++++++++-------------- storage/tsdb/tsdb.go | 4 + 3 files changed, 374 insertions(+), 335 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index f6d936acc5..7b68003da7 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -207,6 +207,9 @@ func main() { a.Flag("storage.tsdb.allow-overlapping-blocks", "[EXPERIMENTAL] Allow overlapping blocks, which in turn enables vertical compaction and vertical query merge."). Default("false").BoolVar(&cfg.tsdb.AllowOverlappingBlocks) + a.Flag("storage.tsdb.wal-compression", "Compress the tsdb WAL."). + Default("false").BoolVar(&cfg.tsdb.WALCompression) + a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload."). Default("1m").PlaceHolder("").SetValue(&cfg.RemoteFlushDeadline) @@ -671,6 +674,7 @@ func main() { "RetentionDuration", cfg.tsdb.RetentionDuration, "WALSegmentSize", cfg.tsdb.WALSegmentSize, "AllowOverlappingBlocks", cfg.tsdb.AllowOverlappingBlocks, + "WALCompression", cfg.tsdb.WALCompression, ) startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000) diff --git a/storage/remote/wal_watcher_test.go b/storage/remote/wal_watcher_test.go index 34c0b9f36c..a039a82605 100644 --- a/storage/remote/wal_watcher_test.go +++ b/storage/remote/wal_watcher_test.go @@ -92,71 +92,75 @@ func TestTailSamples(t *testing.T) { pageSize := 32 * 1024 const seriesCount = 10 const samplesCount = 250 - now := time.Now() + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + now := time.Now() - dir, err := ioutil.TempDir("", "readCheckpoint") - testutil.Ok(t, err) - defer os.RemoveAll(dir) + dir, err := ioutil.TempDir("", "readCheckpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) - wdir := path.Join(dir, "wal") - err = os.Mkdir(wdir, 0777) - testutil.Ok(t, err) + wdir := path.Join(dir, "wal") + err = os.Mkdir(wdir, 0777) + testutil.Ok(t, err) - enc := tsdb.RecordEncoder{} - w, err := wal.NewSize(nil, nil, wdir, 128*pageSize, false) - testutil.Ok(t, err) + enc := tsdb.RecordEncoder{} + w, err := wal.NewSize(nil, nil, wdir, 128*pageSize, compress) + testutil.Ok(t, err) - // Write to the initial segment then checkpoint. - for i := 0; i < seriesCount; i++ { - ref := i + 100 - series := enc.Series([]tsdb.RefSeries{ - tsdb.RefSeries{ - Ref: uint64(ref), - Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, - }, - }, nil) - testutil.Ok(t, w.Log(series)) + // Write to the initial segment then checkpoint. + for i := 0; i < seriesCount; i++ { + ref := i + 100 + series := enc.Series([]tsdb.RefSeries{ + tsdb.RefSeries{ + Ref: uint64(ref), + Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, + }, + }, nil) + testutil.Ok(t, w.Log(series)) - for j := 0; j < samplesCount; j++ { - inner := rand.Intn(ref + 1) - sample := enc.Samples([]tsdb.RefSample{ - tsdb.RefSample{ - Ref: uint64(inner), - T: int64(now.UnixNano()) + 1, - V: float64(i), - }, - }, nil) - testutil.Ok(t, w.Log(sample)) - } + for j := 0; j < samplesCount; j++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]tsdb.RefSample{ + tsdb.RefSample{ + Ref: uint64(inner), + T: int64(now.UnixNano()) + 1, + V: float64(i), + }, + }, nil) + testutil.Ok(t, w.Log(sample)) + } + } + + // Start read after checkpoint, no more data written. + first, last, err := w.Segments() + testutil.Ok(t, err) + + wt := newWriteToMock() + watcher := NewWALWatcher(nil, "", wt, dir) + watcher.startTime = now.UnixNano() + + // Set the Watcher's metrics so they're not nil pointers. + watcher.setMetrics() + for i := first; i <= last; i++ { + segment, err := wal.OpenReadSegment(wal.SegmentName(watcher.walDir, i)) + testutil.Ok(t, err) + defer segment.Close() + + reader := wal.NewLiveReader(nil, liveReaderMetrics, segment) + // Use tail true so we can ensure we got the right number of samples. + watcher.readSegment(reader, i, true) + } + + expectedSeries := seriesCount + expectedSamples := seriesCount * samplesCount + retry(t, defaultRetryInterval, defaultRetries, func() bool { + return wt.checkNumLabels() >= expectedSeries + }) + testutil.Equals(t, expectedSeries, wt.checkNumLabels()) + testutil.Equals(t, expectedSamples, wt.samplesAppended) + }) } - - // Start read after checkpoint, no more data written. - first, last, err := w.Segments() - testutil.Ok(t, err) - - wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir) - watcher.startTime = now.UnixNano() - - // Set the Watcher's metrics so they're not nil pointers. - watcher.setMetrics() - for i := first; i <= last; i++ { - segment, err := wal.OpenReadSegment(wal.SegmentName(watcher.walDir, i)) - testutil.Ok(t, err) - defer segment.Close() - - reader := wal.NewLiveReader(nil, liveReaderMetrics, segment) - // Use tail true so we can ensure we got the right number of samples. - watcher.readSegment(reader, i, true) - } - - expectedSeries := seriesCount - expectedSamples := seriesCount * samplesCount - retry(t, defaultRetryInterval, defaultRetries, func() bool { - return wt.checkNumLabels() >= expectedSeries - }) - testutil.Equals(t, expectedSeries, wt.checkNumLabels()) - testutil.Equals(t, expectedSamples, wt.samplesAppended) } func TestReadToEndNoCheckpoint(t *testing.T) { @@ -164,61 +168,65 @@ func TestReadToEndNoCheckpoint(t *testing.T) { const seriesCount = 10 const samplesCount = 250 - dir, err := ioutil.TempDir("", "readToEnd_noCheckpoint") - testutil.Ok(t, err) - defer os.RemoveAll(dir) - wdir := path.Join(dir, "wal") - err = os.Mkdir(wdir, 0777) - testutil.Ok(t, err) + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + dir, err := ioutil.TempDir("", "readToEnd_noCheckpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + wdir := path.Join(dir, "wal") + err = os.Mkdir(wdir, 0777) + testutil.Ok(t, err) - w, err := wal.NewSize(nil, nil, wdir, 128*pageSize, false) - testutil.Ok(t, err) + w, err := wal.NewSize(nil, nil, wdir, 128*pageSize, compress) + testutil.Ok(t, err) - var recs [][]byte + var recs [][]byte - enc := tsdb.RecordEncoder{} + enc := tsdb.RecordEncoder{} - for i := 0; i < seriesCount; i++ { - series := enc.Series([]tsdb.RefSeries{ - tsdb.RefSeries{ - Ref: uint64(i), - Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, - }, - }, nil) - recs = append(recs, series) - for j := 0; j < samplesCount; j++ { - sample := enc.Samples([]tsdb.RefSample{ - tsdb.RefSample{ - Ref: uint64(j), - T: int64(i), - V: float64(i), - }, - }, nil) + for i := 0; i < seriesCount; i++ { + series := enc.Series([]tsdb.RefSeries{ + tsdb.RefSeries{ + Ref: uint64(i), + Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, + }, + }, nil) + recs = append(recs, series) + for j := 0; j < samplesCount; j++ { + sample := enc.Samples([]tsdb.RefSample{ + tsdb.RefSample{ + Ref: uint64(j), + T: int64(i), + V: float64(i), + }, + }, nil) - recs = append(recs, sample) + recs = append(recs, sample) - // Randomly batch up records. - if rand.Intn(4) < 3 { - testutil.Ok(t, w.Log(recs...)) - recs = recs[:0] + // Randomly batch up records. + if rand.Intn(4) < 3 { + testutil.Ok(t, w.Log(recs...)) + recs = recs[:0] + } + } } - } + testutil.Ok(t, w.Log(recs...)) + + _, _, err = w.Segments() + testutil.Ok(t, err) + + wt := newWriteToMock() + watcher := NewWALWatcher(nil, "", wt, dir) + go watcher.Start() + + expected := seriesCount + retry(t, defaultRetryInterval, defaultRetries, func() bool { + return wt.checkNumLabels() >= expected + }) + watcher.Stop() + testutil.Equals(t, expected, wt.checkNumLabels()) + }) } - testutil.Ok(t, w.Log(recs...)) - - _, _, err = w.Segments() - testutil.Ok(t, err) - - wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir) - go watcher.Start() - - expected := seriesCount - retry(t, defaultRetryInterval, defaultRetries, func() bool { - return wt.checkNumLabels() >= expected - }) - watcher.Stop() - testutil.Equals(t, expected, wt.checkNumLabels()) } func TestReadToEndWithCheckpoint(t *testing.T) { @@ -228,79 +236,83 @@ func TestReadToEndWithCheckpoint(t *testing.T) { const seriesCount = 10 const samplesCount = 250 - dir, err := ioutil.TempDir("", "readToEnd_withCheckpoint") - testutil.Ok(t, err) - defer os.RemoveAll(dir) + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + dir, err := ioutil.TempDir("", "readToEnd_withCheckpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) - wdir := path.Join(dir, "wal") - err = os.Mkdir(wdir, 0777) - testutil.Ok(t, err) + wdir := path.Join(dir, "wal") + err = os.Mkdir(wdir, 0777) + testutil.Ok(t, err) - enc := tsdb.RecordEncoder{} - w, err := wal.NewSize(nil, nil, wdir, segmentSize, false) - testutil.Ok(t, err) + enc := tsdb.RecordEncoder{} + w, err := wal.NewSize(nil, nil, wdir, segmentSize, compress) + testutil.Ok(t, err) - // Write to the initial segment then checkpoint. - for i := 0; i < seriesCount; i++ { - ref := i + 100 - series := enc.Series([]tsdb.RefSeries{ - tsdb.RefSeries{ - Ref: uint64(ref), - Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, - }, - }, nil) - testutil.Ok(t, w.Log(series)) + // Write to the initial segment then checkpoint. + for i := 0; i < seriesCount; i++ { + ref := i + 100 + series := enc.Series([]tsdb.RefSeries{ + tsdb.RefSeries{ + Ref: uint64(ref), + Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, + }, + }, nil) + testutil.Ok(t, w.Log(series)) - for j := 0; j < samplesCount; j++ { - inner := rand.Intn(ref + 1) - sample := enc.Samples([]tsdb.RefSample{ - tsdb.RefSample{ - Ref: uint64(inner), - T: int64(i), - V: float64(i), - }, - }, nil) - testutil.Ok(t, w.Log(sample)) - } + for j := 0; j < samplesCount; j++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]tsdb.RefSample{ + tsdb.RefSample{ + Ref: uint64(inner), + T: int64(i), + V: float64(i), + }, + }, nil) + testutil.Ok(t, w.Log(sample)) + } + } + + tsdb.Checkpoint(w, 0, 1, func(x uint64) bool { return true }, 0) + w.Truncate(1) + + // Write more records after checkpointing. + for i := 0; i < seriesCount; i++ { + series := enc.Series([]tsdb.RefSeries{ + tsdb.RefSeries{ + Ref: uint64(i), + Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, + }, + }, nil) + testutil.Ok(t, w.Log(series)) + + for j := 0; j < samplesCount; j++ { + sample := enc.Samples([]tsdb.RefSample{ + tsdb.RefSample{ + Ref: uint64(j), + T: int64(i), + V: float64(i), + }, + }, nil) + testutil.Ok(t, w.Log(sample)) + } + } + + _, _, err = w.Segments() + testutil.Ok(t, err) + wt := newWriteToMock() + watcher := NewWALWatcher(nil, "", wt, dir) + go watcher.Start() + + expected := seriesCount * 2 + retry(t, defaultRetryInterval, defaultRetries, func() bool { + return wt.checkNumLabels() >= expected + }) + watcher.Stop() + testutil.Equals(t, expected, wt.checkNumLabels()) + }) } - - tsdb.Checkpoint(w, 0, 1, func(x uint64) bool { return true }, 0) - w.Truncate(1) - - // Write more records after checkpointing. - for i := 0; i < seriesCount; i++ { - series := enc.Series([]tsdb.RefSeries{ - tsdb.RefSeries{ - Ref: uint64(i), - Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, - }, - }, nil) - testutil.Ok(t, w.Log(series)) - - for j := 0; j < samplesCount; j++ { - sample := enc.Samples([]tsdb.RefSample{ - tsdb.RefSample{ - Ref: uint64(j), - T: int64(i), - V: float64(i), - }, - }, nil) - testutil.Ok(t, w.Log(sample)) - } - } - - _, _, err = w.Segments() - testutil.Ok(t, err) - wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir) - go watcher.Start() - - expected := seriesCount * 2 - retry(t, defaultRetryInterval, defaultRetries, func() bool { - return wt.checkNumLabels() >= expected - }) - watcher.Stop() - testutil.Equals(t, expected, wt.checkNumLabels()) } func TestReadCheckpoint(t *testing.T) { @@ -308,61 +320,65 @@ func TestReadCheckpoint(t *testing.T) { const seriesCount = 10 const samplesCount = 250 - dir, err := ioutil.TempDir("", "readCheckpoint") - testutil.Ok(t, err) - defer os.RemoveAll(dir) + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + dir, err := ioutil.TempDir("", "readCheckpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) - wdir := path.Join(dir, "wal") - err = os.Mkdir(wdir, 0777) - testutil.Ok(t, err) + wdir := path.Join(dir, "wal") + err = os.Mkdir(wdir, 0777) + testutil.Ok(t, err) - os.Create(wal.SegmentName(wdir, 30)) + os.Create(wal.SegmentName(wdir, 30)) - enc := tsdb.RecordEncoder{} - w, err := wal.NewSize(nil, nil, wdir, 128*pageSize, false) - testutil.Ok(t, err) + enc := tsdb.RecordEncoder{} + w, err := wal.NewSize(nil, nil, wdir, 128*pageSize, compress) + testutil.Ok(t, err) - // Write to the initial segment then checkpoint. - for i := 0; i < seriesCount; i++ { - ref := i + 100 - series := enc.Series([]tsdb.RefSeries{ - tsdb.RefSeries{ - Ref: uint64(ref), - Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, - }, - }, nil) - testutil.Ok(t, w.Log(series)) + // Write to the initial segment then checkpoint. + for i := 0; i < seriesCount; i++ { + ref := i + 100 + series := enc.Series([]tsdb.RefSeries{ + tsdb.RefSeries{ + Ref: uint64(ref), + Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, + }, + }, nil) + testutil.Ok(t, w.Log(series)) - for j := 0; j < samplesCount; j++ { - inner := rand.Intn(ref + 1) - sample := enc.Samples([]tsdb.RefSample{ - tsdb.RefSample{ - Ref: uint64(inner), - T: int64(i), - V: float64(i), - }, - }, nil) - testutil.Ok(t, w.Log(sample)) - } + for j := 0; j < samplesCount; j++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]tsdb.RefSample{ + tsdb.RefSample{ + Ref: uint64(inner), + T: int64(i), + V: float64(i), + }, + }, nil) + testutil.Ok(t, w.Log(sample)) + } + } + tsdb.Checkpoint(w, 30, 31, func(x uint64) bool { return true }, 0) + w.Truncate(32) + + // Start read after checkpoint, no more data written. + _, _, err = w.Segments() + testutil.Ok(t, err) + + wt := newWriteToMock() + watcher := NewWALWatcher(nil, "", wt, dir) + // watcher. + go watcher.Start() + + expectedSeries := seriesCount + retry(t, defaultRetryInterval, defaultRetries, func() bool { + return wt.checkNumLabels() >= expectedSeries + }) + watcher.Stop() + testutil.Equals(t, expectedSeries, wt.checkNumLabels()) + }) } - tsdb.Checkpoint(w, 30, 31, func(x uint64) bool { return true }, 0) - w.Truncate(32) - - // Start read after checkpoint, no more data written. - _, _, err = w.Segments() - testutil.Ok(t, err) - - wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir) - // watcher. - go watcher.Start() - - expectedSeries := seriesCount - retry(t, defaultRetryInterval, defaultRetries, func() bool { - return wt.checkNumLabels() >= expectedSeries - }) - watcher.Stop() - testutil.Equals(t, expectedSeries, wt.checkNumLabels()) } func TestReadCheckpointMultipleSegments(t *testing.T) { @@ -372,65 +388,69 @@ func TestReadCheckpointMultipleSegments(t *testing.T) { const seriesCount = 20 const samplesCount = 300 - dir, err := ioutil.TempDir("", "readCheckpoint") - testutil.Ok(t, err) - defer os.RemoveAll(dir) + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + dir, err := ioutil.TempDir("", "readCheckpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) - wdir := path.Join(dir, "wal") - err = os.Mkdir(wdir, 0777) - testutil.Ok(t, err) + wdir := path.Join(dir, "wal") + err = os.Mkdir(wdir, 0777) + testutil.Ok(t, err) - enc := tsdb.RecordEncoder{} - w, err := wal.NewSize(nil, nil, wdir, pageSize, false) - testutil.Ok(t, err) + enc := tsdb.RecordEncoder{} + w, err := wal.NewSize(nil, nil, wdir, pageSize, compress) + testutil.Ok(t, err) - // Write a bunch of data. - for i := 0; i < segments; i++ { - for j := 0; j < seriesCount; j++ { - ref := j + (i * 100) - series := enc.Series([]tsdb.RefSeries{ - tsdb.RefSeries{ - Ref: uint64(ref), - Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", j)}}, - }, - }, nil) - testutil.Ok(t, w.Log(series)) + // Write a bunch of data. + for i := 0; i < segments; i++ { + for j := 0; j < seriesCount; j++ { + ref := j + (i * 100) + series := enc.Series([]tsdb.RefSeries{ + tsdb.RefSeries{ + Ref: uint64(ref), + Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", j)}}, + }, + }, nil) + testutil.Ok(t, w.Log(series)) - for k := 0; k < samplesCount; k++ { - inner := rand.Intn(ref + 1) - sample := enc.Samples([]tsdb.RefSample{ - tsdb.RefSample{ - Ref: uint64(inner), - T: int64(i), - V: float64(i), - }, - }, nil) - testutil.Ok(t, w.Log(sample)) + for k := 0; k < samplesCount; k++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]tsdb.RefSample{ + tsdb.RefSample{ + Ref: uint64(inner), + T: int64(i), + V: float64(i), + }, + }, nil) + testutil.Ok(t, w.Log(sample)) + } + } } - } + + // At this point we should have at least 6 segments, lets create a checkpoint dir of the first 5. + checkpointDir := dir + "/wal/checkpoint.000004" + err = os.Mkdir(checkpointDir, 0777) + testutil.Ok(t, err) + for i := 0; i <= 4; i++ { + err := os.Rename(wal.SegmentName(dir+"/wal", i), wal.SegmentName(checkpointDir, i)) + testutil.Ok(t, err) + } + + wt := newWriteToMock() + watcher := NewWALWatcher(nil, "", wt, dir) + watcher.maxSegment = -1 + + // Set the Watcher's metrics so they're not nil pointers. + watcher.setMetrics() + + lastCheckpoint, _, err := tsdb.LastCheckpoint(watcher.walDir) + testutil.Ok(t, err) + + err = watcher.readCheckpoint(lastCheckpoint) + testutil.Ok(t, err) + }) } - - // At this point we should have at least 6 segments, lets create a checkpoint dir of the first 5. - checkpointDir := dir + "/wal/checkpoint.000004" - err = os.Mkdir(checkpointDir, 0777) - testutil.Ok(t, err) - for i := 0; i <= 4; i++ { - err := os.Rename(wal.SegmentName(dir+"/wal", i), wal.SegmentName(checkpointDir, i)) - testutil.Ok(t, err) - } - - wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir) - watcher.maxSegment = -1 - - // Set the Watcher's metrics so they're not nil pointers. - watcher.setMetrics() - - lastCheckpoint, _, err := tsdb.LastCheckpoint(watcher.walDir) - testutil.Ok(t, err) - - err = watcher.readCheckpoint(lastCheckpoint) - testutil.Ok(t, err) } func TestCheckpointSeriesReset(t *testing.T) { @@ -439,71 +459,82 @@ func TestCheckpointSeriesReset(t *testing.T) { // in order to get enough segments for us to checkpoint. const seriesCount = 20 const samplesCount = 350 - - dir, err := ioutil.TempDir("", "seriesReset") - testutil.Ok(t, err) - defer os.RemoveAll(dir) - - wdir := path.Join(dir, "wal") - err = os.Mkdir(wdir, 0777) - testutil.Ok(t, err) - - enc := tsdb.RecordEncoder{} - w, err := wal.NewSize(nil, nil, wdir, segmentSize, false) - testutil.Ok(t, err) - - // Write to the initial segment, then checkpoint later. - for i := 0; i < seriesCount; i++ { - ref := i + 100 - series := enc.Series([]tsdb.RefSeries{ - tsdb.RefSeries{ - Ref: uint64(ref), - Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, - }, - }, nil) - testutil.Ok(t, w.Log(series)) - - for j := 0; j < samplesCount; j++ { - inner := rand.Intn(ref + 1) - sample := enc.Samples([]tsdb.RefSample{ - tsdb.RefSample{ - Ref: uint64(inner), - T: int64(i), - V: float64(i), - }, - }, nil) - testutil.Ok(t, w.Log(sample)) - } + testCases := []struct { + compress bool + segments int + }{ + {compress: false, segments: 14}, + {compress: true, segments: 13}, } - _, _, err = w.Segments() - testutil.Ok(t, err) + for _, tc := range testCases { + t.Run(fmt.Sprintf("compress=%t", tc.compress), func(t *testing.T) { + dir, err := ioutil.TempDir("", "seriesReset") + testutil.Ok(t, err) + defer os.RemoveAll(dir) - wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir) - watcher.maxSegment = -1 - go watcher.Start() + wdir := path.Join(dir, "wal") + err = os.Mkdir(wdir, 0777) + testutil.Ok(t, err) - expected := seriesCount - retry(t, defaultRetryInterval, defaultRetries, func() bool { - return wt.checkNumLabels() >= expected - }) - testutil.Equals(t, seriesCount, wt.checkNumLabels()) + enc := tsdb.RecordEncoder{} + w, err := wal.NewSize(nil, nil, wdir, segmentSize, tc.compress) + testutil.Ok(t, err) - _, err = tsdb.Checkpoint(w, 2, 4, func(x uint64) bool { return true }, 0) - testutil.Ok(t, err) + // Write to the initial segment, then checkpoint later. + for i := 0; i < seriesCount; i++ { + ref := i + 100 + series := enc.Series([]tsdb.RefSeries{ + tsdb.RefSeries{ + Ref: uint64(ref), + Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, + }, + }, nil) + testutil.Ok(t, w.Log(series)) - err = w.Truncate(5) - testutil.Ok(t, err) + for j := 0; j < samplesCount; j++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]tsdb.RefSample{ + tsdb.RefSample{ + Ref: uint64(inner), + T: int64(i), + V: float64(i), + }, + }, nil) + testutil.Ok(t, w.Log(sample)) + } + } - _, cpi, err := tsdb.LastCheckpoint(path.Join(dir, "wal")) - testutil.Ok(t, err) - err = watcher.garbageCollectSeries(cpi + 1) - testutil.Ok(t, err) + _, _, err = w.Segments() + testutil.Ok(t, err) - watcher.Stop() - // If you modify the checkpoint and truncate segment #'s run the test to see how - // many series records you end up with and change the last Equals check accordingly - // or modify the Equals to Assert(len(wt.seriesLabels) < seriesCount*10) - testutil.Equals(t, 14, wt.checkNumLabels()) + wt := newWriteToMock() + watcher := NewWALWatcher(nil, "", wt, dir) + watcher.maxSegment = -1 + go watcher.Start() + + expected := seriesCount + retry(t, defaultRetryInterval, defaultRetries, func() bool { + return wt.checkNumLabels() >= expected + }) + testutil.Equals(t, seriesCount, wt.checkNumLabels()) + + _, err = tsdb.Checkpoint(w, 2, 4, func(x uint64) bool { return true }, 0) + testutil.Ok(t, err) + + err = w.Truncate(5) + testutil.Ok(t, err) + + _, cpi, err := tsdb.LastCheckpoint(path.Join(dir, "wal")) + testutil.Ok(t, err) + err = watcher.garbageCollectSeries(cpi + 1) + testutil.Ok(t, err) + + watcher.Stop() + // If you modify the checkpoint and truncate segment #'s run the test to see how + // many series records you end up with and change the last Equals check accordingly + // or modify the Equals to Assert(len(wt.seriesLabels) < seriesCount*10) + testutil.Equals(t, tc.segments, wt.checkNumLabels()) + }) + } } diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go index 55f59c8cb7..42cb343536 100644 --- a/storage/tsdb/tsdb.go +++ b/storage/tsdb/tsdb.go @@ -130,6 +130,9 @@ type Options struct { // When true it disables the overlapping blocks check. // This in-turn enables vertical compaction and vertical query merge. AllowOverlappingBlocks bool + + // When true records in the WAL will be compressed. + WALCompression bool } var ( @@ -195,6 +198,7 @@ func Open(path string, l log.Logger, r prometheus.Registerer, opts *Options) (*t BlockRanges: rngs, NoLockfile: opts.NoLockfile, AllowOverlappingBlocks: opts.AllowOverlappingBlocks, + WALCompression: opts.WALCompression, }) if err != nil { return nil, err