mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-10 07:34:04 -08:00
Merge pull request #73 from Gouthamve/skip-duplicate
Handle duplicate & out of order values in same txn
This commit is contained in:
commit
77c937b8e1
7
head.go
7
head.go
|
@ -357,6 +357,9 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
|
|||
if t < c.maxTime {
|
||||
return ErrOutOfOrderSample
|
||||
}
|
||||
|
||||
// We are allowing exact duplicates as we can encounter them in valid cases
|
||||
// like federation and erroring out at that time would be extremely noisy.
|
||||
if c.maxTime == t && math.Float64bits(ms.lastValue) != math.Float64bits(v) {
|
||||
return ErrAmendSample
|
||||
}
|
||||
|
@ -634,8 +637,8 @@ func (s *memSeries) append(t int64, v float64) bool {
|
|||
c.minTime = t
|
||||
} else {
|
||||
c = s.head()
|
||||
// Skip duplicate samples.
|
||||
if c.maxTime == t && s.lastValue != v {
|
||||
// Skip duplicate and out of order samples.
|
||||
if c.maxTime >= t {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
|
96
head_test.go
96
head_test.go
|
@ -92,24 +92,16 @@ func TestAmendDatapointCausesError(t *testing.T) {
|
|||
defer os.RemoveAll(tmpdir)
|
||||
|
||||
hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating head block: %s", err)
|
||||
}
|
||||
require.NoError(t, err, "Error creating head block")
|
||||
|
||||
app := hb.Appender()
|
||||
_, err = app.Add(labels.Labels{}, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add sample: %s", err)
|
||||
}
|
||||
if err = app.Commit(); err != nil {
|
||||
t.Fatalf("Unexpected error committing appender: %s", err)
|
||||
}
|
||||
require.NoError(t, err, "Failed to add sample")
|
||||
require.NoError(t, app.Commit(), "Unexpected error committing appender")
|
||||
|
||||
app = hb.Appender()
|
||||
_, err = app.Add(labels.Labels{}, 0, 1)
|
||||
if err != ErrAmendSample {
|
||||
t.Fatalf("Expected error amending sample, got: %s", err)
|
||||
}
|
||||
require.Equal(t, ErrAmendSample, err)
|
||||
}
|
||||
|
||||
func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
|
||||
|
@ -117,24 +109,16 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
|
|||
defer os.RemoveAll(tmpdir)
|
||||
|
||||
hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating head block: %s", err)
|
||||
}
|
||||
require.NoError(t, err, "Error creating head block")
|
||||
|
||||
app := hb.Appender()
|
||||
_, err = app.Add(labels.Labels{}, 0, math.NaN())
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add sample: %s", err)
|
||||
}
|
||||
if err = app.Commit(); err != nil {
|
||||
t.Fatalf("Unexpected error committing appender: %s", err)
|
||||
}
|
||||
require.NoError(t, err, "Failed to add sample")
|
||||
require.NoError(t, app.Commit(), "Unexpected error committing appender")
|
||||
|
||||
app = hb.Appender()
|
||||
_, err = app.Add(labels.Labels{}, 0, math.NaN())
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error adding duplicate NaN sample, got: %s", err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
|
||||
|
@ -142,22 +126,62 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
|
|||
defer os.RemoveAll(tmpdir)
|
||||
|
||||
hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating head block: %s", err)
|
||||
}
|
||||
require.NoError(t, err, "Error creating head block")
|
||||
|
||||
app := hb.Appender()
|
||||
_, err = app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add sample: %s", err)
|
||||
}
|
||||
if err = app.Commit(); err != nil {
|
||||
t.Fatalf("Unexpected error committing appender: %s", err)
|
||||
}
|
||||
require.NoError(t, err, "Failed to add sample")
|
||||
require.NoError(t, app.Commit(), "Unexpected error committing appender")
|
||||
|
||||
app = hb.Appender()
|
||||
_, err = app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000002))
|
||||
if err != ErrAmendSample {
|
||||
t.Fatalf("Expected error amending sample, got: %s", err)
|
||||
}
|
||||
require.Equal(t, ErrAmendSample, err)
|
||||
}
|
||||
|
||||
func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
|
||||
tmpdir, _ := ioutil.TempDir("", "test")
|
||||
defer os.RemoveAll(tmpdir)
|
||||
|
||||
hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Append AmendedValue.
|
||||
app := hb.Appender()
|
||||
_, err = app.Add(labels.Labels{{"a", "b"}}, 0, 1)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(labels.Labels{{"a", "b"}}, 0, 2)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
require.Equal(t, uint64(1), hb.Meta().Stats.NumSamples)
|
||||
|
||||
// Make sure the right value is stored.
|
||||
q := hb.Querier(0, 10)
|
||||
ss := q.Select(labels.NewEqualMatcher("a", "b"))
|
||||
ssMap, err := readSeriesSet(ss)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, map[string][]sample{
|
||||
labels.New(labels.Label{"a", "b"}).String(): []sample{{0, 1}},
|
||||
}, ssMap)
|
||||
|
||||
require.NoError(t, q.Close())
|
||||
|
||||
// Append Out of Order Value.
|
||||
app = hb.Appender()
|
||||
_, err = app.Add(labels.Labels{{"a", "b"}}, 10, 3)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(labels.Labels{{"a", "b"}}, 7, 5)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
require.Equal(t, uint64(2), hb.Meta().Stats.NumSamples)
|
||||
|
||||
q = hb.Querier(0, 10)
|
||||
ss = q.Select(labels.NewEqualMatcher("a", "b"))
|
||||
ssMap, err = readSeriesSet(ss)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, map[string][]sample{
|
||||
labels.New(labels.Label{"a", "b"}).String(): []sample{{0, 1}, {10, 3}},
|
||||
}, ssMap)
|
||||
require.NoError(t, q.Close())
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue