Fix appendable: check whether last val was a histogram (#14613)

* Fix appendable: check whether last val was a histogram

When appending a float, we were checking whether lastValue was equal to
current value, but we didn't check whether last value was a float value.

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
This commit is contained in:
Oleg Zaytsev 2024-08-07 15:02:59 +02:00 committed by GitHub
parent ee5bba07c0
commit 0833d2a230
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 87 additions and 3 deletions

View file

@ -16,9 +16,10 @@ package storage
import "fmt" import "fmt"
type errDuplicateSampleForTimestamp struct { type errDuplicateSampleForTimestamp struct {
timestamp int64 timestamp int64
existing float64 existing float64
newValue float64 existingIsHistogram bool
newValue float64
} }
func NewDuplicateFloatErr(t int64, existing, newValue float64) error { func NewDuplicateFloatErr(t int64, existing, newValue float64) error {
@ -29,13 +30,26 @@ func NewDuplicateFloatErr(t int64, existing, newValue float64) error {
} }
} }
// NewDuplicateHistogramToFloatErr describes an error where a new float sample is sent for same timestamp as previous histogram.
func NewDuplicateHistogramToFloatErr(t int64, newValue float64) error {
return errDuplicateSampleForTimestamp{
timestamp: t,
existingIsHistogram: true,
newValue: newValue,
}
}
func (e errDuplicateSampleForTimestamp) Error() string { func (e errDuplicateSampleForTimestamp) Error() string {
if e.timestamp == 0 { if e.timestamp == 0 {
return "duplicate sample for timestamp" return "duplicate sample for timestamp"
} }
if e.existingIsHistogram {
return fmt.Sprintf("duplicate sample for timestamp %d; overrides not allowed: existing is a histogram, new value %g", e.timestamp, e.newValue)
}
return fmt.Sprintf("duplicate sample for timestamp %d; overrides not allowed: existing %g, new value %g", e.timestamp, e.existing, e.newValue) return fmt.Sprintf("duplicate sample for timestamp %d; overrides not allowed: existing %g, new value %g", e.timestamp, e.existing, e.newValue)
} }
// Is implements the anonymous interface checked by errors.Is.
// Every errDuplicateSampleForTimestamp compares equal to the global ErrDuplicateSampleForTimestamp. // Every errDuplicateSampleForTimestamp compares equal to the global ErrDuplicateSampleForTimestamp.
func (e errDuplicateSampleForTimestamp) Is(t error) bool { func (e errDuplicateSampleForTimestamp) Is(t error) bool {
if t == ErrDuplicateSampleForTimestamp { if t == ErrDuplicateSampleForTimestamp {

38
storage/errors_test.go Normal file
View file

@ -0,0 +1,38 @@
// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
)
func TestErrDuplicateSampleForTimestamp(t *testing.T) {
// All errDuplicateSampleForTimestamp are ErrDuplicateSampleForTimestamp
require.ErrorIs(t, ErrDuplicateSampleForTimestamp, errDuplicateSampleForTimestamp{})
// Same type only is if it has same properties.
err := NewDuplicateFloatErr(1_000, 10, 20)
sameErr := NewDuplicateFloatErr(1_000, 10, 20)
differentErr := NewDuplicateFloatErr(1_001, 30, 40)
require.ErrorIs(t, err, sameErr)
require.NotErrorIs(t, err, differentErr)
// Also works when err is wrapped.
require.ErrorIs(t, fmt.Errorf("failed: %w", err), sameErr)
require.NotErrorIs(t, fmt.Errorf("failed: %w", err), differentErr)
}

View file

@ -466,6 +466,9 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTi
// like federation and erroring out at that time would be extremely noisy. // like federation and erroring out at that time would be extremely noisy.
// This only checks against the latest in-order sample. // This only checks against the latest in-order sample.
// The OOO headchunk has its own method to detect these duplicates. // The OOO headchunk has its own method to detect these duplicates.
if s.lastHistogramValue != nil || s.lastFloatHistogramValue != nil {
return false, 0, storage.NewDuplicateHistogramToFloatErr(t, v)
}
if math.Float64bits(s.lastValue) != math.Float64bits(v) { if math.Float64bits(s.lastValue) != math.Float64bits(v) {
return false, 0, storage.NewDuplicateFloatErr(t, s.lastValue, v) return false, 0, storage.NewDuplicateFloatErr(t, s.lastValue, v)
} }

View file

@ -5919,6 +5919,35 @@ func TestPostingsCardinalityStats(t *testing.T) {
require.Equal(t, statsForSomeLabel1, head.PostingsCardinalityStats("n", 1)) require.Equal(t, statsForSomeLabel1, head.PostingsCardinalityStats("n", 1))
} }
func TestHeadAppender_AppendFloatWithSameTimestampAsPreviousHistogram(t *testing.T) {
head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
t.Cleanup(func() { head.Close() })
ls := labels.FromStrings(labels.MetricName, "test")
{
// Append a float 10.0 @ 1_000
app := head.Appender(context.Background())
_, err := app.Append(0, ls, 1_000, 10.0)
require.NoError(t, err)
require.NoError(t, app.Commit())
}
{
// Append a float histogram @ 2_000
app := head.Appender(context.Background())
h := tsdbutil.GenerateTestHistogram(1)
_, err := app.AppendHistogram(0, ls, 2_000, h, nil)
require.NoError(t, err)
require.NoError(t, app.Commit())
}
app := head.Appender(context.Background())
_, err := app.Append(0, ls, 2_000, 10.0)
require.Error(t, err)
require.ErrorIs(t, err, storage.NewDuplicateHistogramToFloatErr(2_000, 10.0))
}
func TestHeadAppender_AppendCTZeroSample(t *testing.T) { func TestHeadAppender_AppendCTZeroSample(t *testing.T) {
type appendableSamples struct { type appendableSamples struct {
ts int64 ts int64