mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Test multiple full write/restore cylcles in WAL
This commit is contained in:
parent
ca1bc920b7
commit
d3edfb5540
4
wal.go
4
wal.go
|
@ -299,6 +299,8 @@ func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error {
|
||||||
sz = int64(6 + 4 + len(buf))
|
sz = int64(6 + 4 + len(buf))
|
||||||
newsz = w.curN + sz
|
newsz = w.curN + sz
|
||||||
)
|
)
|
||||||
|
// XXX(fabxc): this currently cuts a new file whenever the WAL was newly opened.
|
||||||
|
// Probably fine in general but may yield a lot of short files in some cases.
|
||||||
if w.cur == nil || w.curN > w.segmentSize || newsz > w.segmentSize && sz <= w.segmentSize {
|
if w.cur == nil || w.curN > w.segmentSize || newsz > w.segmentSize && sz <= w.segmentSize {
|
||||||
if err := w.cut(); err != nil {
|
if err := w.cut(); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -431,6 +433,8 @@ func NewWALReader(rs ...io.ReadCloser) *WALReader {
|
||||||
}
|
}
|
||||||
|
|
||||||
// At returns the last decoded entry of labels or samples.
|
// At returns the last decoded entry of labels or samples.
|
||||||
|
// The returned slices are only valid until the next call to Next(). Their elements
|
||||||
|
// have to be copied to preserve them.
|
||||||
func (r *WALReader) At() ([]labels.Labels, []refdSample) {
|
func (r *WALReader) At() ([]labels.Labels, []refdSample) {
|
||||||
return r.labels, r.samples
|
return r.labels, r.samples
|
||||||
}
|
}
|
||||||
|
|
100
wal_test.go
100
wal_test.go
|
@ -8,6 +8,8 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/fabxc/tsdb/labels"
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/fileutil"
|
"github.com/coreos/etcd/pkg/fileutil"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
@ -117,52 +119,88 @@ func TestWAL_cut(t *testing.T) {
|
||||||
|
|
||||||
// Symmetrical test of reading and writing to the WAL via its main interface.
|
// Symmetrical test of reading and writing to the WAL via its main interface.
|
||||||
func TestWAL_Log_Restore(t *testing.T) {
|
func TestWAL_Log_Restore(t *testing.T) {
|
||||||
|
const (
|
||||||
|
numMetrics = 5000
|
||||||
|
iterations = 5
|
||||||
|
stepSize = 100
|
||||||
|
)
|
||||||
// Generate testing data. It does not make semantical sense but
|
// Generate testing data. It does not make semantical sense but
|
||||||
// for the purpose of this test.
|
// for the purpose of this test.
|
||||||
series, err := readPrometheusLabels("testdata/20k.series", 10000)
|
series, err := readPrometheusLabels("testdata/20k.series", numMetrics)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var samples []refdSample
|
|
||||||
for i := 0; i < 200000; i++ {
|
|
||||||
samples = append(samples, refdSample{
|
|
||||||
ref: uint64(i % 10000),
|
|
||||||
t: int64(i * 2),
|
|
||||||
v: rand.Float64(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
dir, err := ioutil.TempDir("", "test_wal_log_restore")
|
dir, err := ioutil.TempDir("", "test_wal_log_restore")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
w, err := OpenWAL(dir, nil, 0)
|
var (
|
||||||
require.NoError(t, err)
|
recordedSeries [][]labels.Labels
|
||||||
|
recordedSamples [][]refdSample
|
||||||
|
)
|
||||||
|
var totalSamples int
|
||||||
|
|
||||||
// Set smaller segment size so we can actually write several files.
|
// Open WAL a bunch of times, validate all previous data can be read,
|
||||||
w.segmentSize = 300 * 1000
|
// write more data to it, close it.
|
||||||
|
for k := 0; k < numMetrics; k += numMetrics / iterations {
|
||||||
|
w, err := OpenWAL(dir, nil, 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
for i := 0; i < len(series); i += 100 {
|
// Set smaller segment size so we can actually write several files.
|
||||||
require.NoError(t, w.Log(series[i:i+100], samples[i*10:(i+100)*10]))
|
w.segmentSize = 1000 * 1000
|
||||||
}
|
|
||||||
|
|
||||||
require.NoError(t, w.Close())
|
r := w.Reader()
|
||||||
|
|
||||||
w, err = OpenWAL(dir, nil, 0)
|
var (
|
||||||
r := w.Reader()
|
resultSeries [][]labels.Labels
|
||||||
|
resultSamples [][]refdSample
|
||||||
|
)
|
||||||
|
|
||||||
var i, j int
|
for r.Next() {
|
||||||
|
lsets, smpls := r.At()
|
||||||
|
|
||||||
for r.Next() {
|
if len(lsets) > 0 {
|
||||||
lsets, smpls := r.At()
|
clsets := make([]labels.Labels, len(lsets))
|
||||||
|
copy(clsets, lsets)
|
||||||
if l := len(lsets); l > 0 {
|
resultSeries = append(resultSeries, clsets)
|
||||||
require.Equal(t, series[i:i+l], lsets)
|
}
|
||||||
i += l
|
if len(smpls) > 0 {
|
||||||
|
csmpls := make([]refdSample, len(smpls))
|
||||||
|
copy(csmpls, smpls)
|
||||||
|
resultSamples = append(resultSamples, csmpls)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if l := len(smpls); l > 0 {
|
require.NoError(t, r.Err())
|
||||||
require.Equal(t, samples[j:j+l], smpls)
|
|
||||||
j += l
|
require.Equal(t, recordedSamples, resultSamples)
|
||||||
|
require.Equal(t, recordedSeries, resultSeries)
|
||||||
|
|
||||||
|
series := series[k : k+(numMetrics/iterations)]
|
||||||
|
|
||||||
|
// Insert in batches and generate different amounts of samples for each.
|
||||||
|
for i := 0; i < len(series); i += stepSize {
|
||||||
|
var samples []refdSample
|
||||||
|
|
||||||
|
for j := 0; j < i*10; j++ {
|
||||||
|
samples = append(samples, refdSample{
|
||||||
|
ref: uint64(j % 10000),
|
||||||
|
t: int64(j * 2),
|
||||||
|
v: rand.Float64(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
lbls := series[i : i+stepSize]
|
||||||
|
|
||||||
|
require.NoError(t, w.Log(lbls, samples))
|
||||||
|
|
||||||
|
if len(lbls) > 0 {
|
||||||
|
recordedSeries = append(recordedSeries, lbls)
|
||||||
|
}
|
||||||
|
if len(samples) > 0 {
|
||||||
|
recordedSamples = append(recordedSamples, samples)
|
||||||
|
totalSamples += len(samples)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
require.NoError(t, w.Close())
|
||||||
}
|
}
|
||||||
require.NoError(t, r.Err())
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue