Error on amending histograms on append (#11308)

* Error on amending histograms on append

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Rename Matches to Equals

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Ganesh Vernekar 2022-09-19 13:10:30 +05:30 committed by GitHub
parent 7ad36505d5
commit 2474c6fb2c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 253 additions and 4 deletions

View file

@ -155,6 +155,113 @@ func (h *Histogram) CumulativeBucketIterator() BucketIterator {
return &cumulativeBucketIterator{h: h, posSpansIdx: -1}
}
// Equals returns true if the given histogram matches exactly.
// Exact match is when there are no new buckets (even empty) and no missing buckets,
// and all the bucket values match. Spans can have different empty length spans in between,
// but they must represent the same bucket layout to match.
func (h *Histogram) Equals(h2 *Histogram) bool {
if h2 == nil {
return false
}
if h.Schema != h2.Schema || h.ZeroThreshold != h2.ZeroThreshold ||
h.ZeroCount != h2.ZeroCount || h.Count != h2.Count || h.Sum != h2.Sum {
return false
}
if !spansMatch(h.PositiveSpans, h2.PositiveSpans) {
return false
}
if !spansMatch(h.NegativeSpans, h2.NegativeSpans) {
return false
}
if !bucketsMatch(h.PositiveBuckets, h2.PositiveBuckets) {
return false
}
if !bucketsMatch(h.NegativeBuckets, h2.NegativeBuckets) {
return false
}
return true
}
// spansMatch returns true if both spans represent the same bucket layout
// after combining zero length spans with the next non-zero length span.
func spansMatch(s1, s2 []Span) bool {
if len(s1) == 0 && len(s2) == 0 {
return true
}
s1idx, s2idx := 0, 0
for {
if s1idx >= len(s1) {
return allEmptySpans(s2[s2idx:])
}
if s2idx >= len(s2) {
return allEmptySpans(s1[s1idx:])
}
currS1, currS2 := s1[s1idx], s2[s2idx]
s1idx++
s2idx++
if currS1.Length == 0 {
// This span is zero length, so we add consecutive such spans
// until we find a non-zero span.
for ; s1idx < len(s1) && s1[s1idx].Length == 0; s1idx++ {
currS1.Offset += s1[s1idx].Offset
}
if s1idx < len(s1) {
currS1.Offset += s1[s1idx].Offset
currS1.Length = s1[s1idx].Length
s1idx++
}
}
if currS2.Length == 0 {
// This span is zero length, so we add consecutive such spans
// until we find a non-zero span.
for ; s2idx < len(s2) && s2[s2idx].Length == 0; s2idx++ {
currS2.Offset += s2[s2idx].Offset
}
if s2idx < len(s2) {
currS2.Offset += s2[s2idx].Offset
currS2.Length = s2[s2idx].Length
s2idx++
}
}
if currS1.Length == 0 && currS2.Length == 0 {
// The last spans of both set are zero length. Previous spans match.
return true
}
if currS1.Offset != currS2.Offset || currS1.Length != currS2.Length {
return false
}
}
}
func allEmptySpans(s []Span) bool {
for _, ss := range s {
if ss.Length > 0 {
return false
}
}
return true
}
func bucketsMatch(b1, b2 []int64) bool {
if len(b1) != len(b2) {
return false
}
for i, b := range b1 {
if b != b2[i] {
return false
}
}
return true
}
// ToFloat returns a FloatHistogram representation of the Histogram. It is a
// deep copy (e.g. spans are not shared).
func (h *Histogram) ToFloat() *FloatHistogram {

View file

@ -410,3 +410,117 @@ func TestHistogramToFloat(t *testing.T) {
require.Equal(t, h.String(), fh.String())
}
func TestHistogramMatches(t *testing.T) {
h1 := Histogram{
Schema: 3,
Count: 61,
Sum: 2.7,
ZeroThreshold: 0.1,
ZeroCount: 42,
PositiveSpans: []Span{
{Offset: 0, Length: 4},
{Offset: 10, Length: 3},
},
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
NegativeSpans: []Span{
{Offset: 0, Length: 4},
{Offset: 10, Length: 3},
},
NegativeBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
}
h2 := h1.Copy()
require.True(t, h1.Equals(h2))
// Changed spans but same layout.
h2.PositiveSpans = append(h2.PositiveSpans, Span{Offset: 5})
h2.NegativeSpans = append(h2.NegativeSpans, Span{Offset: 2})
require.True(t, h1.Equals(h2))
require.True(t, h2.Equals(&h1))
// Adding empty spans in between.
h2.PositiveSpans[1].Offset = 6
h2.PositiveSpans = []Span{
h2.PositiveSpans[0],
{Offset: 1},
{Offset: 3},
h2.PositiveSpans[1],
h2.PositiveSpans[2],
}
h2.NegativeSpans[1].Offset = 5
h2.NegativeSpans = []Span{
h2.NegativeSpans[0],
{Offset: 2},
{Offset: 3},
h2.NegativeSpans[1],
h2.NegativeSpans[2],
}
require.True(t, h1.Equals(h2))
require.True(t, h2.Equals(&h1))
// All mismatches.
require.False(t, h1.Equals(nil))
h2.Schema = 1
require.False(t, h1.Equals(h2))
h2 = h1.Copy()
h2.Count++
require.False(t, h1.Equals(h2))
h2 = h1.Copy()
h2.Sum++
require.False(t, h1.Equals(h2))
h2 = h1.Copy()
h2.ZeroThreshold++
require.False(t, h1.Equals(h2))
h2 = h1.Copy()
h2.ZeroCount++
require.False(t, h1.Equals(h2))
// Changing value of buckets.
h2 = h1.Copy()
h2.PositiveBuckets[len(h2.PositiveBuckets)-1]++
require.False(t, h1.Equals(h2))
h2 = h1.Copy()
h2.NegativeBuckets[len(h2.NegativeBuckets)-1]++
require.False(t, h1.Equals(h2))
// Changing bucket layout.
h2 = h1.Copy()
h2.PositiveSpans[1].Offset++
require.False(t, h1.Equals(h2))
h2 = h1.Copy()
h2.NegativeSpans[1].Offset++
require.False(t, h1.Equals(h2))
// Adding an empty bucket.
h2 = h1.Copy()
h2.PositiveSpans[0].Offset--
h2.PositiveSpans[0].Length++
h2.PositiveBuckets = append([]int64{0}, h2.PositiveBuckets...)
require.False(t, h1.Equals(h2))
h2 = h1.Copy()
h2.NegativeSpans[0].Offset--
h2.NegativeSpans[0].Length++
h2.NegativeBuckets = append([]int64{0}, h2.NegativeBuckets...)
require.False(t, h1.Equals(h2))
// Adding new bucket.
h2 = h1.Copy()
h2.PositiveSpans = append(h2.PositiveSpans, Span{
Offset: 1,
Length: 1,
})
h2.PositiveBuckets = append(h2.PositiveBuckets, 1)
require.False(t, h1.Equals(h2))
h2 = h1.Copy()
h2.NegativeSpans = append(h2.NegativeSpans, Span{
Offset: 1,
Length: 1,
})
h2.NegativeBuckets = append(h2.NegativeBuckets, 1)
require.False(t, h1.Equals(h2))
}

View file

@ -475,9 +475,37 @@ func TestAmendDatapointCausesError(t *testing.T) {
require.NoError(t, app.Commit())
app = db.Appender(ctx)
_, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 0, 0)
require.NoError(t, err)
_, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 0, 1)
require.Equal(t, storage.ErrDuplicateSampleForTimestamp, err)
require.NoError(t, app.Rollback())
h := histogram.Histogram{
Schema: 3,
Count: 61,
Sum: 2.7,
ZeroThreshold: 0.1,
ZeroCount: 42,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 10, Length: 3},
},
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
}
app = db.Appender(ctx)
_, err = app.AppendHistogram(0, labels.Labels{{Name: "a", Value: "c"}}, 0, h.Copy())
require.NoError(t, err)
require.NoError(t, app.Commit())
app = db.Appender(ctx)
_, err = app.AppendHistogram(0, labels.Labels{{Name: "a", Value: "c"}}, 0, h.Copy())
require.NoError(t, err)
h.Schema = 2
_, err = app.AppendHistogram(0, labels.Labels{{Name: "a", Value: "c"}}, 0, h.Copy())
require.Equal(t, storage.ErrDuplicateSampleForTimestamp, err)
require.NoError(t, app.Rollback())
}
func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {

View file

@ -392,12 +392,12 @@ func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram) error {
if t < c.maxTime {
return storage.ErrOutOfOrderSample
}
// TODO(beorn7): do it for histogram.
// We are allowing exact duplicates as we can encounter them in valid cases
// like federation and erroring out at that time would be extremely noisy.
//if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) {
// return storage.ErrDuplicateSampleForTimestamp
//}
if !h.Equals(s.sampleBuf[3].h) {
return storage.ErrDuplicateSampleForTimestamp
}
return nil
}