diff --git a/storage/errors.go b/storage/errors.go index eff70f678..dd48066db 100644 --- a/storage/errors.go +++ b/storage/errors.go @@ -16,9 +16,10 @@ package storage import "fmt" type errDuplicateSampleForTimestamp struct { - timestamp int64 - existing float64 - newValue float64 + timestamp int64 + existing float64 + existingIsHistogram bool + newValue float64 } func NewDuplicateFloatErr(t int64, existing, newValue float64) error { @@ -29,13 +30,26 @@ func NewDuplicateFloatErr(t int64, existing, newValue float64) error { } } +// NewDuplicateHistogramToFloatErr describes an error where a new float sample is sent for same timestamp as previous histogram. +func NewDuplicateHistogramToFloatErr(t int64, newValue float64) error { + return errDuplicateSampleForTimestamp{ + timestamp: t, + existingIsHistogram: true, + newValue: newValue, + } +} + func (e errDuplicateSampleForTimestamp) Error() string { if e.timestamp == 0 { return "duplicate sample for timestamp" } + if e.existingIsHistogram { + return fmt.Sprintf("duplicate sample for timestamp %d; overrides not allowed: existing is a histogram, new value %g", e.timestamp, e.newValue) + } return fmt.Sprintf("duplicate sample for timestamp %d; overrides not allowed: existing %g, new value %g", e.timestamp, e.existing, e.newValue) } +// Is implements the anonymous interface checked by errors.Is. // Every errDuplicateSampleForTimestamp compares equal to the global ErrDuplicateSampleForTimestamp. func (e errDuplicateSampleForTimestamp) Is(t error) bool { if t == ErrDuplicateSampleForTimestamp { diff --git a/storage/errors_test.go b/storage/errors_test.go new file mode 100644 index 000000000..b3e202b49 --- /dev/null +++ b/storage/errors_test.go @@ -0,0 +1,38 @@ +// Copyright 2014 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestErrDuplicateSampleForTimestamp(t *testing.T) { + // All errDuplicateSampleForTimestamp are ErrDuplicateSampleForTimestamp + require.ErrorIs(t, ErrDuplicateSampleForTimestamp, errDuplicateSampleForTimestamp{}) + + // Same type only is if it has same properties. + err := NewDuplicateFloatErr(1_000, 10, 20) + sameErr := NewDuplicateFloatErr(1_000, 10, 20) + differentErr := NewDuplicateFloatErr(1_001, 30, 40) + + require.ErrorIs(t, err, sameErr) + require.NotErrorIs(t, err, differentErr) + + // Also works when err is wrapped. + require.ErrorIs(t, fmt.Errorf("failed: %w", err), sameErr) + require.NotErrorIs(t, fmt.Errorf("failed: %w", err), differentErr) +} diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 8d66d1e81..bdde0d7f8 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -466,6 +466,9 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTi // like federation and erroring out at that time would be extremely noisy. // This only checks against the latest in-order sample. // The OOO headchunk has its own method to detect these duplicates. + if s.lastHistogramValue != nil || s.lastFloatHistogramValue != nil { + return false, 0, storage.NewDuplicateHistogramToFloatErr(t, v) + } if math.Float64bits(s.lastValue) != math.Float64bits(v) { return false, 0, storage.NewDuplicateFloatErr(t, s.lastValue, v) } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 09927c23c..fb73a3638 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -5919,6 +5919,35 @@ func TestPostingsCardinalityStats(t *testing.T) { require.Equal(t, statsForSomeLabel1, head.PostingsCardinalityStats("n", 1)) } +func TestHeadAppender_AppendFloatWithSameTimestampAsPreviousHistogram(t *testing.T) { + head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + t.Cleanup(func() { head.Close() }) + + ls := labels.FromStrings(labels.MetricName, "test") + + { + // Append a float 10.0 @ 1_000 + app := head.Appender(context.Background()) + _, err := app.Append(0, ls, 1_000, 10.0) + require.NoError(t, err) + require.NoError(t, app.Commit()) + } + + { + // Append a float histogram @ 2_000 + app := head.Appender(context.Background()) + h := tsdbutil.GenerateTestHistogram(1) + _, err := app.AppendHistogram(0, ls, 2_000, h, nil) + require.NoError(t, err) + require.NoError(t, app.Commit()) + } + + app := head.Appender(context.Background()) + _, err := app.Append(0, ls, 2_000, 10.0) + require.Error(t, err) + require.ErrorIs(t, err, storage.NewDuplicateHistogramToFloatErr(2_000, 10.0)) +} + func TestHeadAppender_AppendCTZeroSample(t *testing.T) { type appendableSamples struct { ts int64