mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 22:37:27 -08:00
Handle duplicate & out of order values in same txn
Add docs about not erroring out on exact dupes. Moved tests to require.* Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
parent
085991c9da
commit
adaf4d2099
7
head.go
7
head.go
|
@ -357,6 +357,9 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
|
|||
if t < c.maxTime {
|
||||
return ErrOutOfOrderSample
|
||||
}
|
||||
|
||||
// We are allowing exact duplicates as we can encounter them in valid cases
|
||||
// like federation and erroring out at that time would be extremely noisy.
|
||||
if c.maxTime == t && math.Float64bits(ms.lastValue) != math.Float64bits(v) {
|
||||
return ErrAmendSample
|
||||
}
|
||||
|
@ -634,8 +637,8 @@ func (s *memSeries) append(t int64, v float64) bool {
|
|||
c.minTime = t
|
||||
} else {
|
||||
c = s.head()
|
||||
// Skip duplicate samples.
|
||||
if c.maxTime == t && s.lastValue != v {
|
||||
// Skip duplicate and out of order samples.
|
||||
if c.maxTime >= t {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
|
103
head_test.go
103
head_test.go
|
@ -92,24 +92,16 @@ func TestAmendDatapointCausesError(t *testing.T) {
|
|||
defer os.RemoveAll(tmpdir)
|
||||
|
||||
hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating head block: %s", err)
|
||||
}
|
||||
require.NoError(t, err, "Error creating head block")
|
||||
|
||||
app := hb.Appender()
|
||||
_, err = app.Add(labels.Labels{}, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add sample: %s", err)
|
||||
}
|
||||
if err = app.Commit(); err != nil {
|
||||
t.Fatalf("Unexpected error committing appender: %s", err)
|
||||
}
|
||||
require.NoError(t, err, "Failed to add sample")
|
||||
require.NoError(t, app.Commit(), "Unexpected error committing appender")
|
||||
|
||||
app = hb.Appender()
|
||||
_, err = app.Add(labels.Labels{}, 0, 1)
|
||||
if err != ErrAmendSample {
|
||||
t.Fatalf("Expected error amending sample, got: %s", err)
|
||||
}
|
||||
require.Equal(t, ErrAmendSample, err)
|
||||
}
|
||||
|
||||
func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
|
||||
|
@ -117,24 +109,16 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
|
|||
defer os.RemoveAll(tmpdir)
|
||||
|
||||
hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating head block: %s", err)
|
||||
}
|
||||
require.NoError(t, err, "Error creating head block")
|
||||
|
||||
app := hb.Appender()
|
||||
_, err = app.Add(labels.Labels{}, 0, math.NaN())
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add sample: %s", err)
|
||||
}
|
||||
if err = app.Commit(); err != nil {
|
||||
t.Fatalf("Unexpected error committing appender: %s", err)
|
||||
}
|
||||
require.NoError(t, err, "Failed to add sample")
|
||||
require.NoError(t, app.Commit(), "Unexpected error committing appender")
|
||||
|
||||
app = hb.Appender()
|
||||
_, err = app.Add(labels.Labels{}, 0, math.NaN())
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error adding duplicate NaN sample, got: %s", err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
|
||||
|
@ -142,22 +126,69 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
|
|||
defer os.RemoveAll(tmpdir)
|
||||
|
||||
hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating head block: %s", err)
|
||||
}
|
||||
require.NoError(t, err, "Error creating head block")
|
||||
|
||||
app := hb.Appender()
|
||||
_, err = app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add sample: %s", err)
|
||||
}
|
||||
if err = app.Commit(); err != nil {
|
||||
t.Fatalf("Unexpected error committing appender: %s", err)
|
||||
}
|
||||
require.NoError(t, err, "Failed to add sample")
|
||||
require.NoError(t, app.Commit(), "Unexpected error committing appender")
|
||||
|
||||
app = hb.Appender()
|
||||
_, err = app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000002))
|
||||
if err != ErrAmendSample {
|
||||
t.Fatalf("Expected error amending sample, got: %s", err)
|
||||
}
|
||||
require.Equal(t, ErrAmendSample, err)
|
||||
}
|
||||
|
||||
func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
|
||||
tmpdir, _ := ioutil.TempDir("", "test")
|
||||
defer os.RemoveAll(tmpdir)
|
||||
|
||||
hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Append AmendedValue
|
||||
app := hb.Appender()
|
||||
_, err = app.Add(labels.Labels{{"a", "b"}}, 0, 1)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(labels.Labels{{"a", "b"}}, 0, 2)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
require.Equal(t, uint64(1), hb.Meta().Stats.NumSamples)
|
||||
|
||||
// Make sure the right value is stored.
|
||||
q := hb.Querier(0, 10)
|
||||
ss := q.Select(labels.NewEqualMatcher("a", "b"))
|
||||
require.True(t, ss.Next())
|
||||
it := ss.At().Iterator()
|
||||
require.True(t, it.Next())
|
||||
ts, v := it.At()
|
||||
require.Equal(t, int64(0), ts)
|
||||
require.Equal(t, float64(1), v)
|
||||
require.False(t, it.Next())
|
||||
require.False(t, ss.Next())
|
||||
require.NoError(t, q.Close())
|
||||
|
||||
// Append Out of Order Value
|
||||
app = hb.Appender()
|
||||
_, err = app.Add(labels.Labels{{"a", "b"}}, 10, 3)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(labels.Labels{{"a", "b"}}, 7, 5)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
require.Equal(t, uint64(2), hb.Meta().Stats.NumSamples)
|
||||
|
||||
q = hb.Querier(0, 10)
|
||||
ss = q.Select(labels.NewEqualMatcher("a", "b"))
|
||||
require.True(t, ss.Next())
|
||||
it = ss.At().Iterator()
|
||||
require.True(t, it.Next())
|
||||
ts, v = it.At()
|
||||
require.Equal(t, int64(0), ts)
|
||||
require.Equal(t, float64(1), v)
|
||||
require.True(t, it.Next())
|
||||
ts, v = it.At()
|
||||
require.Equal(t, int64(10), ts)
|
||||
require.Equal(t, float64(3), v)
|
||||
require.False(t, it.Next())
|
||||
require.False(t, ss.Next())
|
||||
require.NoError(t, q.Close())
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue