mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 06:17:27 -08:00
35aab01de0
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
371 lines
13 KiB
Go
371 lines
13 KiB
Go
// Copyright 2018 The Prometheus Authors
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package wlog
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"testing"
|
|
|
|
"github.com/go-kit/log"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/prometheus/prometheus/model/histogram"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
|
"github.com/prometheus/prometheus/tsdb/record"
|
|
"github.com/prometheus/prometheus/util/testutil"
|
|
)
|
|
|
|
func TestLastCheckpoint(t *testing.T) {
|
|
dir := t.TempDir()
|
|
|
|
_, _, err := LastCheckpoint(dir)
|
|
require.Equal(t, record.ErrNotFound, err)
|
|
|
|
require.NoError(t, os.MkdirAll(filepath.Join(dir, "checkpoint.0000"), 0o777))
|
|
s, k, err := LastCheckpoint(dir)
|
|
require.NoError(t, err)
|
|
require.Equal(t, filepath.Join(dir, "checkpoint.0000"), s)
|
|
require.Equal(t, 0, k)
|
|
|
|
require.NoError(t, os.MkdirAll(filepath.Join(dir, "checkpoint.xyz"), 0o777))
|
|
s, k, err = LastCheckpoint(dir)
|
|
require.NoError(t, err)
|
|
require.Equal(t, filepath.Join(dir, "checkpoint.0000"), s)
|
|
require.Equal(t, 0, k)
|
|
|
|
require.NoError(t, os.MkdirAll(filepath.Join(dir, "checkpoint.1"), 0o777))
|
|
s, k, err = LastCheckpoint(dir)
|
|
require.NoError(t, err)
|
|
require.Equal(t, filepath.Join(dir, "checkpoint.1"), s)
|
|
require.Equal(t, 1, k)
|
|
|
|
require.NoError(t, os.MkdirAll(filepath.Join(dir, "checkpoint.1000"), 0o777))
|
|
s, k, err = LastCheckpoint(dir)
|
|
require.NoError(t, err)
|
|
require.Equal(t, filepath.Join(dir, "checkpoint.1000"), s)
|
|
require.Equal(t, 1000, k)
|
|
|
|
require.NoError(t, os.MkdirAll(filepath.Join(dir, "checkpoint.99999999"), 0o777))
|
|
s, k, err = LastCheckpoint(dir)
|
|
require.NoError(t, err)
|
|
require.Equal(t, filepath.Join(dir, "checkpoint.99999999"), s)
|
|
require.Equal(t, 99999999, k)
|
|
|
|
require.NoError(t, os.MkdirAll(filepath.Join(dir, "checkpoint.100000000"), 0o777))
|
|
s, k, err = LastCheckpoint(dir)
|
|
require.NoError(t, err)
|
|
require.Equal(t, filepath.Join(dir, "checkpoint.100000000"), s)
|
|
require.Equal(t, 100000000, k)
|
|
}
|
|
|
|
func TestDeleteCheckpoints(t *testing.T) {
|
|
dir := t.TempDir()
|
|
|
|
require.NoError(t, DeleteCheckpoints(dir, 0))
|
|
|
|
require.NoError(t, os.MkdirAll(filepath.Join(dir, "checkpoint.00"), 0o777))
|
|
require.NoError(t, os.MkdirAll(filepath.Join(dir, "checkpoint.01"), 0o777))
|
|
require.NoError(t, os.MkdirAll(filepath.Join(dir, "checkpoint.02"), 0o777))
|
|
require.NoError(t, os.MkdirAll(filepath.Join(dir, "checkpoint.03"), 0o777))
|
|
|
|
require.NoError(t, DeleteCheckpoints(dir, 2))
|
|
|
|
files, err := os.ReadDir(dir)
|
|
require.NoError(t, err)
|
|
fns := []string{}
|
|
for _, f := range files {
|
|
fns = append(fns, f.Name())
|
|
}
|
|
require.Equal(t, []string{"checkpoint.02", "checkpoint.03"}, fns)
|
|
|
|
require.NoError(t, os.MkdirAll(filepath.Join(dir, "checkpoint.99999999"), 0o777))
|
|
require.NoError(t, os.MkdirAll(filepath.Join(dir, "checkpoint.100000000"), 0o777))
|
|
require.NoError(t, os.MkdirAll(filepath.Join(dir, "checkpoint.100000001"), 0o777))
|
|
|
|
require.NoError(t, DeleteCheckpoints(dir, 100000000))
|
|
|
|
files, err = os.ReadDir(dir)
|
|
require.NoError(t, err)
|
|
fns = []string{}
|
|
for _, f := range files {
|
|
fns = append(fns, f.Name())
|
|
}
|
|
require.Equal(t, []string{"checkpoint.100000000", "checkpoint.100000001"}, fns)
|
|
}
|
|
|
|
func TestCheckpoint(t *testing.T) {
|
|
makeHistogram := func(i int) *histogram.Histogram {
|
|
return &histogram.Histogram{
|
|
Count: 5 + uint64(i*4),
|
|
ZeroCount: 2 + uint64(i),
|
|
ZeroThreshold: 0.001,
|
|
Sum: 18.4 * float64(i+1),
|
|
Schema: 1,
|
|
PositiveSpans: []histogram.Span{
|
|
{Offset: 0, Length: 2},
|
|
{Offset: 1, Length: 2},
|
|
},
|
|
PositiveBuckets: []int64{int64(i + 1), 1, -1, 0},
|
|
}
|
|
}
|
|
makeFloatHistogram := func(i int) *histogram.FloatHistogram {
|
|
return &histogram.FloatHistogram{
|
|
Count: 5 + float64(i*4),
|
|
ZeroCount: 2 + float64(i),
|
|
ZeroThreshold: 0.001,
|
|
Sum: 18.4 * float64(i+1),
|
|
Schema: 1,
|
|
PositiveSpans: []histogram.Span{
|
|
{Offset: 0, Length: 2},
|
|
{Offset: 1, Length: 2},
|
|
},
|
|
PositiveBuckets: []float64{float64(i + 1), 1, -1, 0},
|
|
}
|
|
}
|
|
|
|
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
|
|
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
|
|
dir := t.TempDir()
|
|
|
|
var enc record.Encoder
|
|
// Create a dummy segment to bump the initial number.
|
|
seg, err := CreateSegment(dir, 100)
|
|
require.NoError(t, err)
|
|
require.NoError(t, seg.Close())
|
|
|
|
// Manually create checkpoint for 99 and earlier.
|
|
w, err := New(nil, nil, filepath.Join(dir, "checkpoint.0099"), compress)
|
|
require.NoError(t, err)
|
|
|
|
// Add some data we expect to be around later.
|
|
err = w.Log(enc.Series([]record.RefSeries{
|
|
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
|
|
{Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")},
|
|
}, nil))
|
|
require.NoError(t, err)
|
|
// Log an unknown record, that might have come from a future Prometheus version.
|
|
require.NoError(t, w.Log([]byte{255}))
|
|
require.NoError(t, w.Close())
|
|
|
|
// Start a WAL and write records to it as usual.
|
|
w, err = NewSize(nil, nil, dir, 64*1024, compress)
|
|
require.NoError(t, err)
|
|
|
|
samplesInWAL, histogramsInWAL, floatHistogramsInWAL := 0, 0, 0
|
|
var last int64
|
|
for i := 0; ; i++ {
|
|
_, n, err := Segments(w.Dir())
|
|
require.NoError(t, err)
|
|
if n >= 106 {
|
|
break
|
|
}
|
|
// Write some series initially.
|
|
if i == 0 {
|
|
b := enc.Series([]record.RefSeries{
|
|
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
|
|
{Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")},
|
|
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
|
|
{Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")},
|
|
}, nil)
|
|
require.NoError(t, w.Log(b))
|
|
|
|
b = enc.Metadata([]record.RefMetadata{
|
|
{Ref: 2, Unit: "unit", Help: "help"},
|
|
{Ref: 3, Unit: "unit", Help: "help"},
|
|
{Ref: 4, Unit: "unit", Help: "help"},
|
|
{Ref: 5, Unit: "unit", Help: "help"},
|
|
}, nil)
|
|
require.NoError(t, w.Log(b))
|
|
}
|
|
// Write samples until the WAL has enough segments.
|
|
// Make them have drifting timestamps within a record to see that they
|
|
// get filtered properly.
|
|
b := enc.Samples([]record.RefSample{
|
|
{Ref: 0, T: last, V: float64(i)},
|
|
{Ref: 1, T: last + 10000, V: float64(i)},
|
|
{Ref: 2, T: last + 20000, V: float64(i)},
|
|
{Ref: 3, T: last + 30000, V: float64(i)},
|
|
}, nil)
|
|
require.NoError(t, w.Log(b))
|
|
samplesInWAL += 4
|
|
h := makeHistogram(i)
|
|
b = enc.HistogramSamples([]record.RefHistogramSample{
|
|
{Ref: 0, T: last, H: h},
|
|
{Ref: 1, T: last + 10000, H: h},
|
|
{Ref: 2, T: last + 20000, H: h},
|
|
{Ref: 3, T: last + 30000, H: h},
|
|
}, nil)
|
|
require.NoError(t, w.Log(b))
|
|
histogramsInWAL += 4
|
|
fh := makeFloatHistogram(i)
|
|
b = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{
|
|
{Ref: 0, T: last, FH: fh},
|
|
{Ref: 1, T: last + 10000, FH: fh},
|
|
{Ref: 2, T: last + 20000, FH: fh},
|
|
{Ref: 3, T: last + 30000, FH: fh},
|
|
}, nil)
|
|
require.NoError(t, w.Log(b))
|
|
floatHistogramsInWAL += 4
|
|
|
|
b = enc.Exemplars([]record.RefExemplar{
|
|
{Ref: 1, T: last, V: float64(i), Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i))},
|
|
}, nil)
|
|
require.NoError(t, w.Log(b))
|
|
|
|
// Write changing metadata for each series. In the end, only the latest
|
|
// version should end up in the checkpoint.
|
|
b = enc.Metadata([]record.RefMetadata{
|
|
{Ref: 0, Unit: fmt.Sprintf("%d", last), Help: fmt.Sprintf("%d", last)},
|
|
{Ref: 1, Unit: fmt.Sprintf("%d", last), Help: fmt.Sprintf("%d", last)},
|
|
{Ref: 2, Unit: fmt.Sprintf("%d", last), Help: fmt.Sprintf("%d", last)},
|
|
{Ref: 3, Unit: fmt.Sprintf("%d", last), Help: fmt.Sprintf("%d", last)},
|
|
}, nil)
|
|
require.NoError(t, w.Log(b))
|
|
|
|
last += 100
|
|
}
|
|
require.NoError(t, w.Close())
|
|
|
|
stats, err := Checkpoint(log.NewNopLogger(), w, 100, 106, func(x chunks.HeadSeriesRef) bool {
|
|
return x%2 == 0
|
|
}, last/2)
|
|
require.NoError(t, err)
|
|
require.NoError(t, w.Truncate(107))
|
|
require.NoError(t, DeleteCheckpoints(w.Dir(), 106))
|
|
require.Equal(t, histogramsInWAL+floatHistogramsInWAL+samplesInWAL, stats.TotalSamples)
|
|
require.Greater(t, stats.DroppedSamples, 0)
|
|
|
|
// Only the new checkpoint should be left.
|
|
files, err := os.ReadDir(dir)
|
|
require.NoError(t, err)
|
|
require.Len(t, files, 1)
|
|
require.Equal(t, "checkpoint.00000106", files[0].Name())
|
|
|
|
sr, err := NewSegmentsReader(filepath.Join(dir, "checkpoint.00000106"))
|
|
require.NoError(t, err)
|
|
defer sr.Close()
|
|
|
|
dec := record.NewDecoder(labels.NewSymbolTable())
|
|
var series []record.RefSeries
|
|
var metadata []record.RefMetadata
|
|
r := NewReader(sr)
|
|
|
|
samplesInCheckpoint, histogramsInCheckpoint, floatHistogramsInCheckpoint := 0, 0, 0
|
|
for r.Next() {
|
|
rec := r.Record()
|
|
|
|
switch dec.Type(rec) {
|
|
case record.Series:
|
|
series, err = dec.Series(rec, series)
|
|
require.NoError(t, err)
|
|
case record.Samples:
|
|
samples, err := dec.Samples(rec, nil)
|
|
require.NoError(t, err)
|
|
for _, s := range samples {
|
|
require.GreaterOrEqual(t, s.T, last/2, "sample with wrong timestamp")
|
|
}
|
|
samplesInCheckpoint += len(samples)
|
|
case record.HistogramSamples:
|
|
histograms, err := dec.HistogramSamples(rec, nil)
|
|
require.NoError(t, err)
|
|
for _, h := range histograms {
|
|
require.GreaterOrEqual(t, h.T, last/2, "histogram with wrong timestamp")
|
|
}
|
|
histogramsInCheckpoint += len(histograms)
|
|
case record.FloatHistogramSamples:
|
|
floatHistograms, err := dec.FloatHistogramSamples(rec, nil)
|
|
require.NoError(t, err)
|
|
for _, h := range floatHistograms {
|
|
require.GreaterOrEqual(t, h.T, last/2, "float histogram with wrong timestamp")
|
|
}
|
|
floatHistogramsInCheckpoint += len(floatHistograms)
|
|
case record.Exemplars:
|
|
exemplars, err := dec.Exemplars(rec, nil)
|
|
require.NoError(t, err)
|
|
for _, e := range exemplars {
|
|
require.GreaterOrEqual(t, e.T, last/2, "exemplar with wrong timestamp")
|
|
}
|
|
case record.Metadata:
|
|
metadata, err = dec.Metadata(rec, metadata)
|
|
require.NoError(t, err)
|
|
}
|
|
}
|
|
require.NoError(t, r.Err())
|
|
// Making sure we replayed some samples. We expect >50% samples to be still present.
|
|
require.Greater(t, float64(samplesInCheckpoint)/float64(samplesInWAL), 0.5)
|
|
require.Less(t, float64(samplesInCheckpoint)/float64(samplesInWAL), 0.8)
|
|
require.Greater(t, float64(histogramsInCheckpoint)/float64(histogramsInWAL), 0.5)
|
|
require.Less(t, float64(histogramsInCheckpoint)/float64(histogramsInWAL), 0.8)
|
|
require.Greater(t, float64(floatHistogramsInCheckpoint)/float64(floatHistogramsInWAL), 0.5)
|
|
require.Less(t, float64(floatHistogramsInCheckpoint)/float64(floatHistogramsInWAL), 0.8)
|
|
|
|
expectedRefSeries := []record.RefSeries{
|
|
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
|
|
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
|
|
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
|
|
}
|
|
testutil.RequireEqual(t, expectedRefSeries, series)
|
|
|
|
expectedRefMetadata := []record.RefMetadata{
|
|
{Ref: 0, Unit: fmt.Sprintf("%d", last-100), Help: fmt.Sprintf("%d", last-100)},
|
|
{Ref: 2, Unit: fmt.Sprintf("%d", last-100), Help: fmt.Sprintf("%d", last-100)},
|
|
{Ref: 4, Unit: "unit", Help: "help"},
|
|
}
|
|
sort.Slice(metadata, func(i, j int) bool { return metadata[i].Ref < metadata[j].Ref })
|
|
require.Equal(t, expectedRefMetadata, metadata)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
|
|
// Create a new wlog with invalid data.
|
|
dir := t.TempDir()
|
|
w, err := NewSize(nil, nil, dir, 64*1024, CompressionNone)
|
|
require.NoError(t, err)
|
|
var enc record.Encoder
|
|
require.NoError(t, w.Log(enc.Series([]record.RefSeries{
|
|
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "2")},
|
|
}, nil)))
|
|
require.NoError(t, w.Close())
|
|
|
|
// Corrupt data.
|
|
f, err := os.OpenFile(filepath.Join(w.Dir(), "00000000"), os.O_WRONLY, 0o666)
|
|
require.NoError(t, err)
|
|
_, err = f.WriteAt([]byte{42}, 1)
|
|
require.NoError(t, err)
|
|
require.NoError(t, f.Close())
|
|
|
|
// Run the checkpoint and since the wlog contains corrupt data this should return an error.
|
|
_, err = Checkpoint(log.NewNopLogger(), w, 0, 1, nil, 0)
|
|
require.Error(t, err)
|
|
|
|
// Walk the wlog dir to make sure there are no tmp folder left behind after the error.
|
|
err = filepath.Walk(w.Dir(), func(path string, info os.FileInfo, err error) error {
|
|
if err != nil {
|
|
return fmt.Errorf("access err %q: %w", path, err)
|
|
}
|
|
if info.IsDir() && strings.HasSuffix(info.Name(), ".tmp") {
|
|
return fmt.Errorf("wlog dir contains temporary folder:%s", info.Name())
|
|
}
|
|
return nil
|
|
})
|
|
require.NoError(t, err)
|
|
}
|