Provide flag to compress the tsdb WAL

Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>
This commit is contained in:
Chris Marchbanks 2019-07-03 07:23:13 -06:00
parent 475ca2ecd0
commit 06f1ba73eb
No known key found for this signature in database
GPG key ID: B7FD940BC86A8E7A
3 changed files with 374 additions and 335 deletions

View file

@ -207,6 +207,9 @@ func main() {
a.Flag("storage.tsdb.allow-overlapping-blocks", "[EXPERIMENTAL] Allow overlapping blocks, which in turn enables vertical compaction and vertical query merge."). a.Flag("storage.tsdb.allow-overlapping-blocks", "[EXPERIMENTAL] Allow overlapping blocks, which in turn enables vertical compaction and vertical query merge.").
Default("false").BoolVar(&cfg.tsdb.AllowOverlappingBlocks) Default("false").BoolVar(&cfg.tsdb.AllowOverlappingBlocks)
a.Flag("storage.tsdb.wal-compression", "Compress the tsdb WAL.").
Default("false").BoolVar(&cfg.tsdb.WALCompression)
a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload."). a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload.").
Default("1m").PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline) Default("1m").PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline)
@ -671,6 +674,7 @@ func main() {
"RetentionDuration", cfg.tsdb.RetentionDuration, "RetentionDuration", cfg.tsdb.RetentionDuration,
"WALSegmentSize", cfg.tsdb.WALSegmentSize, "WALSegmentSize", cfg.tsdb.WALSegmentSize,
"AllowOverlappingBlocks", cfg.tsdb.AllowOverlappingBlocks, "AllowOverlappingBlocks", cfg.tsdb.AllowOverlappingBlocks,
"WALCompression", cfg.tsdb.WALCompression,
) )
startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000) startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000)

View file

@ -92,71 +92,75 @@ func TestTailSamples(t *testing.T) {
pageSize := 32 * 1024 pageSize := 32 * 1024
const seriesCount = 10 const seriesCount = 10
const samplesCount = 250 const samplesCount = 250
now := time.Now() for _, compress := range []bool{false, true} {
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
now := time.Now()
dir, err := ioutil.TempDir("", "readCheckpoint") dir, err := ioutil.TempDir("", "readCheckpoint")
testutil.Ok(t, err) testutil.Ok(t, err)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
wdir := path.Join(dir, "wal") wdir := path.Join(dir, "wal")
err = os.Mkdir(wdir, 0777) err = os.Mkdir(wdir, 0777)
testutil.Ok(t, err) testutil.Ok(t, err)
enc := tsdb.RecordEncoder{} enc := tsdb.RecordEncoder{}
w, err := wal.NewSize(nil, nil, wdir, 128*pageSize, false) w, err := wal.NewSize(nil, nil, wdir, 128*pageSize, compress)
testutil.Ok(t, err) testutil.Ok(t, err)
// Write to the initial segment then checkpoint. // Write to the initial segment then checkpoint.
for i := 0; i < seriesCount; i++ { for i := 0; i < seriesCount; i++ {
ref := i + 100 ref := i + 100
series := enc.Series([]tsdb.RefSeries{ series := enc.Series([]tsdb.RefSeries{
tsdb.RefSeries{ tsdb.RefSeries{
Ref: uint64(ref), Ref: uint64(ref),
Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}},
}, },
}, nil) }, nil)
testutil.Ok(t, w.Log(series)) testutil.Ok(t, w.Log(series))
for j := 0; j < samplesCount; j++ { for j := 0; j < samplesCount; j++ {
inner := rand.Intn(ref + 1) inner := rand.Intn(ref + 1)
sample := enc.Samples([]tsdb.RefSample{ sample := enc.Samples([]tsdb.RefSample{
tsdb.RefSample{ tsdb.RefSample{
Ref: uint64(inner), Ref: uint64(inner),
T: int64(now.UnixNano()) + 1, T: int64(now.UnixNano()) + 1,
V: float64(i), V: float64(i),
}, },
}, nil) }, nil)
testutil.Ok(t, w.Log(sample)) testutil.Ok(t, w.Log(sample))
} }
}
// Start read after checkpoint, no more data written.
first, last, err := w.Segments()
testutil.Ok(t, err)
wt := newWriteToMock()
watcher := NewWALWatcher(nil, "", wt, dir)
watcher.startTime = now.UnixNano()
// Set the Watcher's metrics so they're not nil pointers.
watcher.setMetrics()
for i := first; i <= last; i++ {
segment, err := wal.OpenReadSegment(wal.SegmentName(watcher.walDir, i))
testutil.Ok(t, err)
defer segment.Close()
reader := wal.NewLiveReader(nil, liveReaderMetrics, segment)
// Use tail true so we can ensure we got the right number of samples.
watcher.readSegment(reader, i, true)
}
expectedSeries := seriesCount
expectedSamples := seriesCount * samplesCount
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumLabels() >= expectedSeries
})
testutil.Equals(t, expectedSeries, wt.checkNumLabels())
testutil.Equals(t, expectedSamples, wt.samplesAppended)
})
} }
// Start read after checkpoint, no more data written.
first, last, err := w.Segments()
testutil.Ok(t, err)
wt := newWriteToMock()
watcher := NewWALWatcher(nil, "", wt, dir)
watcher.startTime = now.UnixNano()
// Set the Watcher's metrics so they're not nil pointers.
watcher.setMetrics()
for i := first; i <= last; i++ {
segment, err := wal.OpenReadSegment(wal.SegmentName(watcher.walDir, i))
testutil.Ok(t, err)
defer segment.Close()
reader := wal.NewLiveReader(nil, liveReaderMetrics, segment)
// Use tail true so we can ensure we got the right number of samples.
watcher.readSegment(reader, i, true)
}
expectedSeries := seriesCount
expectedSamples := seriesCount * samplesCount
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumLabels() >= expectedSeries
})
testutil.Equals(t, expectedSeries, wt.checkNumLabels())
testutil.Equals(t, expectedSamples, wt.samplesAppended)
} }
func TestReadToEndNoCheckpoint(t *testing.T) { func TestReadToEndNoCheckpoint(t *testing.T) {
@ -164,61 +168,65 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
const seriesCount = 10 const seriesCount = 10
const samplesCount = 250 const samplesCount = 250
dir, err := ioutil.TempDir("", "readToEnd_noCheckpoint") for _, compress := range []bool{false, true} {
testutil.Ok(t, err) t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
defer os.RemoveAll(dir) dir, err := ioutil.TempDir("", "readToEnd_noCheckpoint")
wdir := path.Join(dir, "wal") testutil.Ok(t, err)
err = os.Mkdir(wdir, 0777) defer os.RemoveAll(dir)
testutil.Ok(t, err) wdir := path.Join(dir, "wal")
err = os.Mkdir(wdir, 0777)
testutil.Ok(t, err)
w, err := wal.NewSize(nil, nil, wdir, 128*pageSize, false) w, err := wal.NewSize(nil, nil, wdir, 128*pageSize, compress)
testutil.Ok(t, err) testutil.Ok(t, err)
var recs [][]byte var recs [][]byte
enc := tsdb.RecordEncoder{} enc := tsdb.RecordEncoder{}
for i := 0; i < seriesCount; i++ { for i := 0; i < seriesCount; i++ {
series := enc.Series([]tsdb.RefSeries{ series := enc.Series([]tsdb.RefSeries{
tsdb.RefSeries{ tsdb.RefSeries{
Ref: uint64(i), Ref: uint64(i),
Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}},
}, },
}, nil) }, nil)
recs = append(recs, series) recs = append(recs, series)
for j := 0; j < samplesCount; j++ { for j := 0; j < samplesCount; j++ {
sample := enc.Samples([]tsdb.RefSample{ sample := enc.Samples([]tsdb.RefSample{
tsdb.RefSample{ tsdb.RefSample{
Ref: uint64(j), Ref: uint64(j),
T: int64(i), T: int64(i),
V: float64(i), V: float64(i),
}, },
}, nil) }, nil)
recs = append(recs, sample) recs = append(recs, sample)
// Randomly batch up records. // Randomly batch up records.
if rand.Intn(4) < 3 { if rand.Intn(4) < 3 {
testutil.Ok(t, w.Log(recs...)) testutil.Ok(t, w.Log(recs...))
recs = recs[:0] recs = recs[:0]
}
}
} }
} testutil.Ok(t, w.Log(recs...))
_, _, err = w.Segments()
testutil.Ok(t, err)
wt := newWriteToMock()
watcher := NewWALWatcher(nil, "", wt, dir)
go watcher.Start()
expected := seriesCount
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumLabels() >= expected
})
watcher.Stop()
testutil.Equals(t, expected, wt.checkNumLabels())
})
} }
testutil.Ok(t, w.Log(recs...))
_, _, err = w.Segments()
testutil.Ok(t, err)
wt := newWriteToMock()
watcher := NewWALWatcher(nil, "", wt, dir)
go watcher.Start()
expected := seriesCount
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumLabels() >= expected
})
watcher.Stop()
testutil.Equals(t, expected, wt.checkNumLabels())
} }
func TestReadToEndWithCheckpoint(t *testing.T) { func TestReadToEndWithCheckpoint(t *testing.T) {
@ -228,79 +236,83 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
const seriesCount = 10 const seriesCount = 10
const samplesCount = 250 const samplesCount = 250
dir, err := ioutil.TempDir("", "readToEnd_withCheckpoint") for _, compress := range []bool{false, true} {
testutil.Ok(t, err) t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
defer os.RemoveAll(dir) dir, err := ioutil.TempDir("", "readToEnd_withCheckpoint")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
wdir := path.Join(dir, "wal") wdir := path.Join(dir, "wal")
err = os.Mkdir(wdir, 0777) err = os.Mkdir(wdir, 0777)
testutil.Ok(t, err) testutil.Ok(t, err)
enc := tsdb.RecordEncoder{} enc := tsdb.RecordEncoder{}
w, err := wal.NewSize(nil, nil, wdir, segmentSize, false) w, err := wal.NewSize(nil, nil, wdir, segmentSize, compress)
testutil.Ok(t, err) testutil.Ok(t, err)
// Write to the initial segment then checkpoint. // Write to the initial segment then checkpoint.
for i := 0; i < seriesCount; i++ { for i := 0; i < seriesCount; i++ {
ref := i + 100 ref := i + 100
series := enc.Series([]tsdb.RefSeries{ series := enc.Series([]tsdb.RefSeries{
tsdb.RefSeries{ tsdb.RefSeries{
Ref: uint64(ref), Ref: uint64(ref),
Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}},
}, },
}, nil) }, nil)
testutil.Ok(t, w.Log(series)) testutil.Ok(t, w.Log(series))
for j := 0; j < samplesCount; j++ { for j := 0; j < samplesCount; j++ {
inner := rand.Intn(ref + 1) inner := rand.Intn(ref + 1)
sample := enc.Samples([]tsdb.RefSample{ sample := enc.Samples([]tsdb.RefSample{
tsdb.RefSample{ tsdb.RefSample{
Ref: uint64(inner), Ref: uint64(inner),
T: int64(i), T: int64(i),
V: float64(i), V: float64(i),
}, },
}, nil) }, nil)
testutil.Ok(t, w.Log(sample)) testutil.Ok(t, w.Log(sample))
} }
}
tsdb.Checkpoint(w, 0, 1, func(x uint64) bool { return true }, 0)
w.Truncate(1)
// Write more records after checkpointing.
for i := 0; i < seriesCount; i++ {
series := enc.Series([]tsdb.RefSeries{
tsdb.RefSeries{
Ref: uint64(i),
Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}},
},
}, nil)
testutil.Ok(t, w.Log(series))
for j := 0; j < samplesCount; j++ {
sample := enc.Samples([]tsdb.RefSample{
tsdb.RefSample{
Ref: uint64(j),
T: int64(i),
V: float64(i),
},
}, nil)
testutil.Ok(t, w.Log(sample))
}
}
_, _, err = w.Segments()
testutil.Ok(t, err)
wt := newWriteToMock()
watcher := NewWALWatcher(nil, "", wt, dir)
go watcher.Start()
expected := seriesCount * 2
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumLabels() >= expected
})
watcher.Stop()
testutil.Equals(t, expected, wt.checkNumLabels())
})
} }
tsdb.Checkpoint(w, 0, 1, func(x uint64) bool { return true }, 0)
w.Truncate(1)
// Write more records after checkpointing.
for i := 0; i < seriesCount; i++ {
series := enc.Series([]tsdb.RefSeries{
tsdb.RefSeries{
Ref: uint64(i),
Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}},
},
}, nil)
testutil.Ok(t, w.Log(series))
for j := 0; j < samplesCount; j++ {
sample := enc.Samples([]tsdb.RefSample{
tsdb.RefSample{
Ref: uint64(j),
T: int64(i),
V: float64(i),
},
}, nil)
testutil.Ok(t, w.Log(sample))
}
}
_, _, err = w.Segments()
testutil.Ok(t, err)
wt := newWriteToMock()
watcher := NewWALWatcher(nil, "", wt, dir)
go watcher.Start()
expected := seriesCount * 2
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumLabels() >= expected
})
watcher.Stop()
testutil.Equals(t, expected, wt.checkNumLabels())
} }
func TestReadCheckpoint(t *testing.T) { func TestReadCheckpoint(t *testing.T) {
@ -308,61 +320,65 @@ func TestReadCheckpoint(t *testing.T) {
const seriesCount = 10 const seriesCount = 10
const samplesCount = 250 const samplesCount = 250
dir, err := ioutil.TempDir("", "readCheckpoint") for _, compress := range []bool{false, true} {
testutil.Ok(t, err) t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
defer os.RemoveAll(dir) dir, err := ioutil.TempDir("", "readCheckpoint")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
wdir := path.Join(dir, "wal") wdir := path.Join(dir, "wal")
err = os.Mkdir(wdir, 0777) err = os.Mkdir(wdir, 0777)
testutil.Ok(t, err) testutil.Ok(t, err)
os.Create(wal.SegmentName(wdir, 30)) os.Create(wal.SegmentName(wdir, 30))
enc := tsdb.RecordEncoder{} enc := tsdb.RecordEncoder{}
w, err := wal.NewSize(nil, nil, wdir, 128*pageSize, false) w, err := wal.NewSize(nil, nil, wdir, 128*pageSize, compress)
testutil.Ok(t, err) testutil.Ok(t, err)
// Write to the initial segment then checkpoint. // Write to the initial segment then checkpoint.
for i := 0; i < seriesCount; i++ { for i := 0; i < seriesCount; i++ {
ref := i + 100 ref := i + 100
series := enc.Series([]tsdb.RefSeries{ series := enc.Series([]tsdb.RefSeries{
tsdb.RefSeries{ tsdb.RefSeries{
Ref: uint64(ref), Ref: uint64(ref),
Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}},
}, },
}, nil) }, nil)
testutil.Ok(t, w.Log(series)) testutil.Ok(t, w.Log(series))
for j := 0; j < samplesCount; j++ { for j := 0; j < samplesCount; j++ {
inner := rand.Intn(ref + 1) inner := rand.Intn(ref + 1)
sample := enc.Samples([]tsdb.RefSample{ sample := enc.Samples([]tsdb.RefSample{
tsdb.RefSample{ tsdb.RefSample{
Ref: uint64(inner), Ref: uint64(inner),
T: int64(i), T: int64(i),
V: float64(i), V: float64(i),
}, },
}, nil) }, nil)
testutil.Ok(t, w.Log(sample)) testutil.Ok(t, w.Log(sample))
} }
}
tsdb.Checkpoint(w, 30, 31, func(x uint64) bool { return true }, 0)
w.Truncate(32)
// Start read after checkpoint, no more data written.
_, _, err = w.Segments()
testutil.Ok(t, err)
wt := newWriteToMock()
watcher := NewWALWatcher(nil, "", wt, dir)
// watcher.
go watcher.Start()
expectedSeries := seriesCount
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumLabels() >= expectedSeries
})
watcher.Stop()
testutil.Equals(t, expectedSeries, wt.checkNumLabels())
})
} }
tsdb.Checkpoint(w, 30, 31, func(x uint64) bool { return true }, 0)
w.Truncate(32)
// Start read after checkpoint, no more data written.
_, _, err = w.Segments()
testutil.Ok(t, err)
wt := newWriteToMock()
watcher := NewWALWatcher(nil, "", wt, dir)
// watcher.
go watcher.Start()
expectedSeries := seriesCount
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumLabels() >= expectedSeries
})
watcher.Stop()
testutil.Equals(t, expectedSeries, wt.checkNumLabels())
} }
func TestReadCheckpointMultipleSegments(t *testing.T) { func TestReadCheckpointMultipleSegments(t *testing.T) {
@ -372,65 +388,69 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
const seriesCount = 20 const seriesCount = 20
const samplesCount = 300 const samplesCount = 300
dir, err := ioutil.TempDir("", "readCheckpoint") for _, compress := range []bool{false, true} {
testutil.Ok(t, err) t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
defer os.RemoveAll(dir) dir, err := ioutil.TempDir("", "readCheckpoint")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
wdir := path.Join(dir, "wal") wdir := path.Join(dir, "wal")
err = os.Mkdir(wdir, 0777) err = os.Mkdir(wdir, 0777)
testutil.Ok(t, err) testutil.Ok(t, err)
enc := tsdb.RecordEncoder{} enc := tsdb.RecordEncoder{}
w, err := wal.NewSize(nil, nil, wdir, pageSize, false) w, err := wal.NewSize(nil, nil, wdir, pageSize, compress)
testutil.Ok(t, err) testutil.Ok(t, err)
// Write a bunch of data. // Write a bunch of data.
for i := 0; i < segments; i++ { for i := 0; i < segments; i++ {
for j := 0; j < seriesCount; j++ { for j := 0; j < seriesCount; j++ {
ref := j + (i * 100) ref := j + (i * 100)
series := enc.Series([]tsdb.RefSeries{ series := enc.Series([]tsdb.RefSeries{
tsdb.RefSeries{ tsdb.RefSeries{
Ref: uint64(ref), Ref: uint64(ref),
Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", j)}}, Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", j)}},
}, },
}, nil) }, nil)
testutil.Ok(t, w.Log(series)) testutil.Ok(t, w.Log(series))
for k := 0; k < samplesCount; k++ { for k := 0; k < samplesCount; k++ {
inner := rand.Intn(ref + 1) inner := rand.Intn(ref + 1)
sample := enc.Samples([]tsdb.RefSample{ sample := enc.Samples([]tsdb.RefSample{
tsdb.RefSample{ tsdb.RefSample{
Ref: uint64(inner), Ref: uint64(inner),
T: int64(i), T: int64(i),
V: float64(i), V: float64(i),
}, },
}, nil) }, nil)
testutil.Ok(t, w.Log(sample)) testutil.Ok(t, w.Log(sample))
}
}
} }
}
// At this point we should have at least 6 segments, lets create a checkpoint dir of the first 5.
checkpointDir := dir + "/wal/checkpoint.000004"
err = os.Mkdir(checkpointDir, 0777)
testutil.Ok(t, err)
for i := 0; i <= 4; i++ {
err := os.Rename(wal.SegmentName(dir+"/wal", i), wal.SegmentName(checkpointDir, i))
testutil.Ok(t, err)
}
wt := newWriteToMock()
watcher := NewWALWatcher(nil, "", wt, dir)
watcher.maxSegment = -1
// Set the Watcher's metrics so they're not nil pointers.
watcher.setMetrics()
lastCheckpoint, _, err := tsdb.LastCheckpoint(watcher.walDir)
testutil.Ok(t, err)
err = watcher.readCheckpoint(lastCheckpoint)
testutil.Ok(t, err)
})
} }
// At this point we should have at least 6 segments, lets create a checkpoint dir of the first 5.
checkpointDir := dir + "/wal/checkpoint.000004"
err = os.Mkdir(checkpointDir, 0777)
testutil.Ok(t, err)
for i := 0; i <= 4; i++ {
err := os.Rename(wal.SegmentName(dir+"/wal", i), wal.SegmentName(checkpointDir, i))
testutil.Ok(t, err)
}
wt := newWriteToMock()
watcher := NewWALWatcher(nil, "", wt, dir)
watcher.maxSegment = -1
// Set the Watcher's metrics so they're not nil pointers.
watcher.setMetrics()
lastCheckpoint, _, err := tsdb.LastCheckpoint(watcher.walDir)
testutil.Ok(t, err)
err = watcher.readCheckpoint(lastCheckpoint)
testutil.Ok(t, err)
} }
func TestCheckpointSeriesReset(t *testing.T) { func TestCheckpointSeriesReset(t *testing.T) {
@ -439,71 +459,82 @@ func TestCheckpointSeriesReset(t *testing.T) {
// in order to get enough segments for us to checkpoint. // in order to get enough segments for us to checkpoint.
const seriesCount = 20 const seriesCount = 20
const samplesCount = 350 const samplesCount = 350
testCases := []struct {
dir, err := ioutil.TempDir("", "seriesReset") compress bool
testutil.Ok(t, err) segments int
defer os.RemoveAll(dir) }{
{compress: false, segments: 14},
wdir := path.Join(dir, "wal") {compress: true, segments: 13},
err = os.Mkdir(wdir, 0777)
testutil.Ok(t, err)
enc := tsdb.RecordEncoder{}
w, err := wal.NewSize(nil, nil, wdir, segmentSize, false)
testutil.Ok(t, err)
// Write to the initial segment, then checkpoint later.
for i := 0; i < seriesCount; i++ {
ref := i + 100
series := enc.Series([]tsdb.RefSeries{
tsdb.RefSeries{
Ref: uint64(ref),
Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}},
},
}, nil)
testutil.Ok(t, w.Log(series))
for j := 0; j < samplesCount; j++ {
inner := rand.Intn(ref + 1)
sample := enc.Samples([]tsdb.RefSample{
tsdb.RefSample{
Ref: uint64(inner),
T: int64(i),
V: float64(i),
},
}, nil)
testutil.Ok(t, w.Log(sample))
}
} }
_, _, err = w.Segments() for _, tc := range testCases {
testutil.Ok(t, err) t.Run(fmt.Sprintf("compress=%t", tc.compress), func(t *testing.T) {
dir, err := ioutil.TempDir("", "seriesReset")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
wt := newWriteToMock() wdir := path.Join(dir, "wal")
watcher := NewWALWatcher(nil, "", wt, dir) err = os.Mkdir(wdir, 0777)
watcher.maxSegment = -1 testutil.Ok(t, err)
go watcher.Start()
expected := seriesCount enc := tsdb.RecordEncoder{}
retry(t, defaultRetryInterval, defaultRetries, func() bool { w, err := wal.NewSize(nil, nil, wdir, segmentSize, tc.compress)
return wt.checkNumLabels() >= expected testutil.Ok(t, err)
})
testutil.Equals(t, seriesCount, wt.checkNumLabels())
_, err = tsdb.Checkpoint(w, 2, 4, func(x uint64) bool { return true }, 0) // Write to the initial segment, then checkpoint later.
testutil.Ok(t, err) for i := 0; i < seriesCount; i++ {
ref := i + 100
series := enc.Series([]tsdb.RefSeries{
tsdb.RefSeries{
Ref: uint64(ref),
Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}},
},
}, nil)
testutil.Ok(t, w.Log(series))
err = w.Truncate(5) for j := 0; j < samplesCount; j++ {
testutil.Ok(t, err) inner := rand.Intn(ref + 1)
sample := enc.Samples([]tsdb.RefSample{
tsdb.RefSample{
Ref: uint64(inner),
T: int64(i),
V: float64(i),
},
}, nil)
testutil.Ok(t, w.Log(sample))
}
}
_, cpi, err := tsdb.LastCheckpoint(path.Join(dir, "wal")) _, _, err = w.Segments()
testutil.Ok(t, err) testutil.Ok(t, err)
err = watcher.garbageCollectSeries(cpi + 1)
testutil.Ok(t, err)
watcher.Stop() wt := newWriteToMock()
// If you modify the checkpoint and truncate segment #'s run the test to see how watcher := NewWALWatcher(nil, "", wt, dir)
// many series records you end up with and change the last Equals check accordingly watcher.maxSegment = -1
// or modify the Equals to Assert(len(wt.seriesLabels) < seriesCount*10) go watcher.Start()
testutil.Equals(t, 14, wt.checkNumLabels())
expected := seriesCount
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumLabels() >= expected
})
testutil.Equals(t, seriesCount, wt.checkNumLabels())
_, err = tsdb.Checkpoint(w, 2, 4, func(x uint64) bool { return true }, 0)
testutil.Ok(t, err)
err = w.Truncate(5)
testutil.Ok(t, err)
_, cpi, err := tsdb.LastCheckpoint(path.Join(dir, "wal"))
testutil.Ok(t, err)
err = watcher.garbageCollectSeries(cpi + 1)
testutil.Ok(t, err)
watcher.Stop()
// If you modify the checkpoint and truncate segment #'s run the test to see how
// many series records you end up with and change the last Equals check accordingly
// or modify the Equals to Assert(len(wt.seriesLabels) < seriesCount*10)
testutil.Equals(t, tc.segments, wt.checkNumLabels())
})
}
} }

View file

@ -130,6 +130,9 @@ type Options struct {
// When true it disables the overlapping blocks check. // When true it disables the overlapping blocks check.
// This in-turn enables vertical compaction and vertical query merge. // This in-turn enables vertical compaction and vertical query merge.
AllowOverlappingBlocks bool AllowOverlappingBlocks bool
// When true records in the WAL will be compressed.
WALCompression bool
} }
var ( var (
@ -195,6 +198,7 @@ func Open(path string, l log.Logger, r prometheus.Registerer, opts *Options) (*t
BlockRanges: rngs, BlockRanges: rngs,
NoLockfile: opts.NoLockfile, NoLockfile: opts.NoLockfile,
AllowOverlappingBlocks: opts.AllowOverlappingBlocks, AllowOverlappingBlocks: opts.AllowOverlappingBlocks,
WALCompression: opts.WALCompression,
}) })
if err != nil { if err != nil {
return nil, err return nil, err