diff --git a/wal.go b/wal.go index 5b85ca3c96..b06425e076 100644 --- a/wal.go +++ b/wal.go @@ -299,6 +299,8 @@ func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error { sz = int64(6 + 4 + len(buf)) newsz = w.curN + sz ) + // XXX(fabxc): this currently cuts a new file whenever the WAL was newly opened. + // Probably fine in general but may yield a lot of short files in some cases. if w.cur == nil || w.curN > w.segmentSize || newsz > w.segmentSize && sz <= w.segmentSize { if err := w.cut(); err != nil { return err @@ -431,6 +433,8 @@ func NewWALReader(rs ...io.ReadCloser) *WALReader { } // At returns the last decoded entry of labels or samples. +// The returned slices are only valid until the next call to Next(). Their elements +// have to be copied to preserve them. func (r *WALReader) At() ([]labels.Labels, []refdSample) { return r.labels, r.samples } diff --git a/wal_test.go b/wal_test.go index 773fc4e085..8442dbddc7 100644 --- a/wal_test.go +++ b/wal_test.go @@ -8,6 +8,8 @@ import ( "os" "testing" + "github.com/fabxc/tsdb/labels" + "github.com/coreos/etcd/pkg/fileutil" "github.com/stretchr/testify/require" ) @@ -117,52 +119,88 @@ func TestWAL_cut(t *testing.T) { // Symmetrical test of reading and writing to the WAL via its main interface. func TestWAL_Log_Restore(t *testing.T) { + const ( + numMetrics = 5000 + iterations = 5 + stepSize = 100 + ) // Generate testing data. It does not make semantical sense but // for the purpose of this test. - series, err := readPrometheusLabels("testdata/20k.series", 10000) + series, err := readPrometheusLabels("testdata/20k.series", numMetrics) require.NoError(t, err) - var samples []refdSample - for i := 0; i < 200000; i++ { - samples = append(samples, refdSample{ - ref: uint64(i % 10000), - t: int64(i * 2), - v: rand.Float64(), - }) - } - dir, err := ioutil.TempDir("", "test_wal_log_restore") require.NoError(t, err) defer os.RemoveAll(dir) - w, err := OpenWAL(dir, nil, 0) - require.NoError(t, err) + var ( + recordedSeries [][]labels.Labels + recordedSamples [][]refdSample + ) + var totalSamples int - // Set smaller segment size so we can actually write several files. - w.segmentSize = 300 * 1000 + // Open WAL a bunch of times, validate all previous data can be read, + // write more data to it, close it. + for k := 0; k < numMetrics; k += numMetrics / iterations { + w, err := OpenWAL(dir, nil, 0) + require.NoError(t, err) - for i := 0; i < len(series); i += 100 { - require.NoError(t, w.Log(series[i:i+100], samples[i*10:(i+100)*10])) - } + // Set smaller segment size so we can actually write several files. + w.segmentSize = 1000 * 1000 - require.NoError(t, w.Close()) + r := w.Reader() - w, err = OpenWAL(dir, nil, 0) - r := w.Reader() + var ( + resultSeries [][]labels.Labels + resultSamples [][]refdSample + ) - var i, j int + for r.Next() { + lsets, smpls := r.At() - for r.Next() { - lsets, smpls := r.At() - - if l := len(lsets); l > 0 { - require.Equal(t, series[i:i+l], lsets) - i += l + if len(lsets) > 0 { + clsets := make([]labels.Labels, len(lsets)) + copy(clsets, lsets) + resultSeries = append(resultSeries, clsets) + } + if len(smpls) > 0 { + csmpls := make([]refdSample, len(smpls)) + copy(csmpls, smpls) + resultSamples = append(resultSamples, csmpls) + } } - if l := len(smpls); l > 0 { - require.Equal(t, samples[j:j+l], smpls) - j += l + require.NoError(t, r.Err()) + + require.Equal(t, recordedSamples, resultSamples) + require.Equal(t, recordedSeries, resultSeries) + + series := series[k : k+(numMetrics/iterations)] + + // Insert in batches and generate different amounts of samples for each. + for i := 0; i < len(series); i += stepSize { + var samples []refdSample + + for j := 0; j < i*10; j++ { + samples = append(samples, refdSample{ + ref: uint64(j % 10000), + t: int64(j * 2), + v: rand.Float64(), + }) + } + + lbls := series[i : i+stepSize] + + require.NoError(t, w.Log(lbls, samples)) + + if len(lbls) > 0 { + recordedSeries = append(recordedSeries, lbls) + } + if len(samples) > 0 { + recordedSamples = append(recordedSamples, samples) + totalSamples += len(samples) + } } + + require.NoError(t, w.Close()) } - require.NoError(t, r.Err()) }