From 9474610baf3a4e5aed8540813dd16c5afadf9751 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc=20Tudur=C3=AD?= Date: Wed, 28 Dec 2022 09:55:07 +0100 Subject: [PATCH] Support FloatHistogram in TSDB (#11522) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extends Appender.AppendHistogram function to accept the FloatHistogram. TSDB supports appending, querying, WAL replay, for this new type of histogram. Signed-off-by: Marc Tudurí Signed-off-by: Ganesh Vernekar Co-authored-by: Ganesh Vernekar --- cmd/prometheus/main.go | 2 +- model/histogram/float_histogram.go | 31 ++ model/histogram/generic.go | 20 +- model/histogram/histogram.go | 12 - model/histogram/histogram_test.go | 52 ++- promql/engine_test.go | 15 +- scrape/helpers_test.go | 13 +- scrape/scrape.go | 8 +- storage/fanout.go | 6 +- storage/interface.go | 2 +- storage/remote/write.go | 2 +- storage/remote/write_handler.go | 3 +- storage/remote/write_handler_test.go | 13 +- storage/series.go | 11 +- tsdb/agent/db.go | 2 +- tsdb/block_test.go | 18 +- tsdb/compact_test.go | 201 +++++----- tsdb/db_test.go | 297 +++++++++----- tsdb/head.go | 49 ++- tsdb/head_append.go | 285 ++++++++++++-- tsdb/head_test.go | 565 ++++++++++++++++----------- tsdb/head_wal.go | 98 ++++- tsdb/querier.go | 42 +- tsdb/record/record.go | 148 ++++++- tsdb/record/record_test.go | 12 + tsdb/tsdbblockutil.go | 5 +- 26 files changed, 1329 insertions(+), 583 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 4a5a70cadd..066b319e30 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -1393,7 +1393,7 @@ func (n notReadyAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, return 0, tsdb.ErrNotReady } -func (n notReadyAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram) (storage.SeriesRef, error) { +func (n notReadyAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { return 0, tsdb.ErrNotReady } diff --git a/model/histogram/float_histogram.go b/model/histogram/float_histogram.go index d75afd10ed..84d8a29129 100644 --- a/model/histogram/float_histogram.go +++ b/model/histogram/float_histogram.go @@ -244,6 +244,37 @@ func (h *FloatHistogram) Sub(other *FloatHistogram) *FloatHistogram { return h } +// Equals returns true if the given float histogram matches exactly. +// Exact match is when there are no new buckets (even empty) and no missing buckets, +// and all the bucket values match. Spans can have different empty length spans in between, +// but they must represent the same bucket layout to match. +func (h *FloatHistogram) Equals(h2 *FloatHistogram) bool { + if h2 == nil { + return false + } + + if h.Schema != h2.Schema || h.ZeroThreshold != h2.ZeroThreshold || + h.ZeroCount != h2.ZeroCount || h.Count != h2.Count || h.Sum != h2.Sum { + return false + } + + if !spansMatch(h.PositiveSpans, h2.PositiveSpans) { + return false + } + if !spansMatch(h.NegativeSpans, h2.NegativeSpans) { + return false + } + + if !bucketsMatch(h.PositiveBuckets, h2.PositiveBuckets) { + return false + } + if !bucketsMatch(h.NegativeBuckets, h2.NegativeBuckets) { + return false + } + + return true +} + // addBucket takes the "coordinates" of the last bucket that was handled and // adds the provided bucket after it. If a corresponding bucket exists, the // count is added. If not, the bucket is inserted. The updated slices and the diff --git a/model/histogram/generic.go b/model/histogram/generic.go index c62be0b08c..e1de5ffb52 100644 --- a/model/histogram/generic.go +++ b/model/histogram/generic.go @@ -25,14 +25,14 @@ type BucketCount interface { float64 | uint64 } -// internalBucketCount is used internally by Histogram and FloatHistogram. The +// InternalBucketCount is used internally by Histogram and FloatHistogram. The // difference to the BucketCount above is that Histogram internally uses deltas // between buckets rather than absolute counts (while FloatHistogram uses // absolute counts directly). Go type parameters don't allow type // specialization. Therefore, where special treatment of deltas between buckets // vs. absolute counts is important, this information has to be provided as a // separate boolean parameter "deltaBuckets" -type internalBucketCount interface { +type InternalBucketCount interface { float64 | int64 } @@ -86,7 +86,7 @@ type BucketIterator[BC BucketCount] interface { // implementations, together with an implementation of the At method. This // iterator can be embedded in full implementations of BucketIterator to save on // code replication. -type baseBucketIterator[BC BucketCount, IBC internalBucketCount] struct { +type baseBucketIterator[BC BucketCount, IBC InternalBucketCount] struct { schema int32 spans []Span buckets []IBC @@ -121,7 +121,7 @@ func (b baseBucketIterator[BC, IBC]) At() Bucket[BC] { // compactBuckets is a generic function used by both Histogram.Compact and // FloatHistogram.Compact. Set deltaBuckets to true if the provided buckets are // deltas. Set it to false if the buckets contain absolute counts. -func compactBuckets[IBC internalBucketCount](buckets []IBC, spans []Span, maxEmptyBuckets int, deltaBuckets bool) ([]IBC, []Span) { +func compactBuckets[IBC InternalBucketCount](buckets []IBC, spans []Span, maxEmptyBuckets int, deltaBuckets bool) ([]IBC, []Span) { // Fast path: If there are no empty buckets AND no offset in any span is // <= maxEmptyBuckets AND no span has length 0, there is nothing to do and we can return // immediately. We check that first because it's cheap and presumably @@ -327,6 +327,18 @@ func compactBuckets[IBC internalBucketCount](buckets []IBC, spans []Span, maxEmp return buckets, spans } +func bucketsMatch[IBC InternalBucketCount](b1, b2 []IBC) bool { + if len(b1) != len(b2) { + return false + } + for i, b := range b1 { + if b != b2[i] { + return false + } + } + return true +} + func getBound(idx, schema int32) float64 { // Here a bit of context about the behavior for the last bucket counting // regular numbers (called simply "last bucket" below) and the bucket diff --git a/model/histogram/histogram.go b/model/histogram/histogram.go index 934c4dde9c..4f63cc17d7 100644 --- a/model/histogram/histogram.go +++ b/model/histogram/histogram.go @@ -250,18 +250,6 @@ func allEmptySpans(s []Span) bool { return true } -func bucketsMatch(b1, b2 []int64) bool { - if len(b1) != len(b2) { - return false - } - for i, b := range b1 { - if b != b2[i] { - return false - } - } - return true -} - // Compact works like FloatHistogram.Compact. See there for detailed // explanations. func (h *Histogram) Compact(maxEmptyBuckets int) *Histogram { diff --git a/model/histogram/histogram_test.go b/model/histogram/histogram_test.go index f3b3c5dd0e..3975353d2a 100644 --- a/model/histogram/histogram_test.go +++ b/model/histogram/histogram_test.go @@ -411,6 +411,7 @@ func TestHistogramToFloat(t *testing.T) { require.Equal(t, h.String(), fh.String()) } +// TestHistogramMatches tests both Histogram and FloatHistogram. func TestHistogramMatches(t *testing.T) { h1 := Histogram{ Schema: 3, @@ -430,14 +431,28 @@ func TestHistogramMatches(t *testing.T) { NegativeBuckets: []int64{1, 2, -2, 1, -1, 0, 0}, } + equals := func(h1, h2 Histogram) { + require.True(t, h1.Equals(&h2)) + require.True(t, h2.Equals(&h1)) + h1f, h2f := h1.ToFloat(), h2.ToFloat() + require.True(t, h1f.Equals(h2f)) + require.True(t, h2f.Equals(h1f)) + } + notEquals := func(h1, h2 Histogram) { + require.False(t, h1.Equals(&h2)) + require.False(t, h2.Equals(&h1)) + h1f, h2f := h1.ToFloat(), h2.ToFloat() + require.False(t, h1f.Equals(h2f)) + require.False(t, h2f.Equals(h1f)) + } + h2 := h1.Copy() - require.True(t, h1.Equals(h2)) + equals(h1, *h2) // Changed spans but same layout. h2.PositiveSpans = append(h2.PositiveSpans, Span{Offset: 5}) h2.NegativeSpans = append(h2.NegativeSpans, Span{Offset: 2}) - require.True(t, h1.Equals(h2)) - require.True(t, h2.Equals(&h1)) + equals(h1, *h2) // Adding empty spans in between. h2.PositiveSpans[1].Offset = 6 h2.PositiveSpans = []Span{ @@ -455,58 +470,57 @@ func TestHistogramMatches(t *testing.T) { h2.NegativeSpans[1], h2.NegativeSpans[2], } - require.True(t, h1.Equals(h2)) - require.True(t, h2.Equals(&h1)) + equals(h1, *h2) // All mismatches. - require.False(t, h1.Equals(nil)) + notEquals(h1, Histogram{}) h2.Schema = 1 - require.False(t, h1.Equals(h2)) + notEquals(h1, *h2) h2 = h1.Copy() h2.Count++ - require.False(t, h1.Equals(h2)) + notEquals(h1, *h2) h2 = h1.Copy() h2.Sum++ - require.False(t, h1.Equals(h2)) + notEquals(h1, *h2) h2 = h1.Copy() h2.ZeroThreshold++ - require.False(t, h1.Equals(h2)) + notEquals(h1, *h2) h2 = h1.Copy() h2.ZeroCount++ - require.False(t, h1.Equals(h2)) + notEquals(h1, *h2) // Changing value of buckets. h2 = h1.Copy() h2.PositiveBuckets[len(h2.PositiveBuckets)-1]++ - require.False(t, h1.Equals(h2)) + notEquals(h1, *h2) h2 = h1.Copy() h2.NegativeBuckets[len(h2.NegativeBuckets)-1]++ - require.False(t, h1.Equals(h2)) + notEquals(h1, *h2) // Changing bucket layout. h2 = h1.Copy() h2.PositiveSpans[1].Offset++ - require.False(t, h1.Equals(h2)) + notEquals(h1, *h2) h2 = h1.Copy() h2.NegativeSpans[1].Offset++ - require.False(t, h1.Equals(h2)) + notEquals(h1, *h2) // Adding an empty bucket. h2 = h1.Copy() h2.PositiveSpans[0].Offset-- h2.PositiveSpans[0].Length++ h2.PositiveBuckets = append([]int64{0}, h2.PositiveBuckets...) - require.False(t, h1.Equals(h2)) + notEquals(h1, *h2) h2 = h1.Copy() h2.NegativeSpans[0].Offset-- h2.NegativeSpans[0].Length++ h2.NegativeBuckets = append([]int64{0}, h2.NegativeBuckets...) - require.False(t, h1.Equals(h2)) + notEquals(h1, *h2) // Adding new bucket. h2 = h1.Copy() @@ -515,14 +529,14 @@ func TestHistogramMatches(t *testing.T) { Length: 1, }) h2.PositiveBuckets = append(h2.PositiveBuckets, 1) - require.False(t, h1.Equals(h2)) + notEquals(h1, *h2) h2 = h1.Copy() h2.NegativeSpans = append(h2.NegativeSpans, Span{ Offset: 1, Length: 1, }) h2.NegativeBuckets = append(h2.NegativeBuckets, 1) - require.False(t, h1.Equals(h2)) + notEquals(h1, *h2) } func TestHistogramCompact(t *testing.T) { diff --git a/promql/engine_test.go b/promql/engine_test.go index 5094df2a29..1950b2cbb2 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -3128,6 +3128,7 @@ func TestRangeQuery(t *testing.T) { func TestSparseHistogramRate(t *testing.T) { // TODO(beorn7): Integrate histograms into the PromQL testing framework // and write more tests there. + // TODO(marctc): Add similar test for float histograms test, err := NewTest(t, "") require.NoError(t, err) defer test.Close() @@ -3137,7 +3138,7 @@ func TestSparseHistogramRate(t *testing.T) { app := test.Storage().Appender(context.TODO()) for i, h := range tsdb.GenerateTestHistograms(100) { - _, err := app.AppendHistogram(0, lbls, int64(i)*int64(15*time.Second/time.Millisecond), h) + _, err := app.AppendHistogram(0, lbls, int64(i)*int64(15*time.Second/time.Millisecond), h, nil) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -3169,6 +3170,7 @@ func TestSparseHistogramRate(t *testing.T) { func TestSparseHistogram_HistogramCountAndSum(t *testing.T) { // TODO(codesome): Integrate histograms into the PromQL testing framework // and write more tests there. + // TODO(marctc): Add similar test for float histograms h := &histogram.Histogram{ Count: 24, ZeroCount: 4, @@ -3197,7 +3199,7 @@ func TestSparseHistogram_HistogramCountAndSum(t *testing.T) { ts := int64(10 * time.Minute / time.Millisecond) app := test.Storage().Appender(context.TODO()) - _, err = app.AppendHistogram(0, lbls, ts, h) + _, err = app.AppendHistogram(0, lbls, ts, h, nil) require.NoError(t, err) require.NoError(t, app.Commit()) @@ -3233,6 +3235,7 @@ func TestSparseHistogram_HistogramCountAndSum(t *testing.T) { func TestSparseHistogram_HistogramQuantile(t *testing.T) { // TODO(codesome): Integrate histograms into the PromQL testing framework // and write more tests there. + // TODO(marctc): Add similar test for float histograms type subCase struct { quantile string value float64 @@ -3434,7 +3437,7 @@ func TestSparseHistogram_HistogramQuantile(t *testing.T) { ts := int64(i+1) * int64(10*time.Minute/time.Millisecond) app := test.Storage().Appender(context.TODO()) - _, err = app.AppendHistogram(0, lbls, ts, c.h) + _, err = app.AppendHistogram(0, lbls, ts, c.h, nil) require.NoError(t, err) require.NoError(t, app.Commit()) @@ -3462,6 +3465,7 @@ func TestSparseHistogram_HistogramQuantile(t *testing.T) { func TestSparseHistogram_HistogramFraction(t *testing.T) { // TODO(codesome): Integrate histograms into the PromQL testing framework // and write more tests there. + // TODO(marctc): Add similar test for float histograms type subCase struct { lower, upper string value float64 @@ -3858,7 +3862,7 @@ func TestSparseHistogram_HistogramFraction(t *testing.T) { ts := int64(i+1) * int64(10*time.Minute/time.Millisecond) app := test.Storage().Appender(context.TODO()) - _, err = app.AppendHistogram(0, lbls, ts, c.h) + _, err = app.AppendHistogram(0, lbls, ts, c.h, nil) require.NoError(t, err) require.NoError(t, app.Commit()) @@ -3890,6 +3894,7 @@ func TestSparseHistogram_HistogramFraction(t *testing.T) { func TestSparseHistogram_Sum_Count_AddOperator(t *testing.T) { // TODO(codesome): Integrate histograms into the PromQL testing framework // and write more tests there. + // TODO(marctc): Add similar test for float histograms cases := []struct { histograms []histogram.Histogram expected histogram.FloatHistogram @@ -3988,7 +3993,7 @@ func TestSparseHistogram_Sum_Count_AddOperator(t *testing.T) { for idx, h := range c.histograms { lbls := labels.FromStrings("__name__", seriesName, "idx", fmt.Sprintf("%d", idx)) // Since we mutate h later, we need to create a copy here. - _, err = app.AppendHistogram(0, lbls, ts, h.Copy()) + _, err = app.AppendHistogram(0, lbls, ts, h.Copy(), nil) require.NoError(t, err) } require.NoError(t, app.Commit()) diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index 8c7c8cab14..4862886e5e 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -42,7 +42,7 @@ func (a nopAppender) AppendExemplar(storage.SeriesRef, labels.Labels, exemplar.E return 0, nil } -func (a nopAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *histogram.Histogram) (storage.SeriesRef, error) { +func (a nopAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *histogram.Histogram, *histogram.FloatHistogram) (storage.SeriesRef, error) { return 0, nil } @@ -60,8 +60,9 @@ type sample struct { } type histogramSample struct { - t int64 - h *histogram.Histogram + t int64 + h *histogram.Histogram + fh *histogram.FloatHistogram } // collectResultAppender records all samples that were added through the appender. @@ -110,13 +111,13 @@ func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.L return a.next.AppendExemplar(ref, l, e) } -func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram) (storage.SeriesRef, error) { - a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, t: t}) +func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t}) if a.next == nil { return 0, nil } - return a.next.AppendHistogram(ref, l, t, h) + return a.next.AppendHistogram(ref, l, t, h, fh) } func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { diff --git a/scrape/scrape.go b/scrape/scrape.go index 10e9119a26..d3e10e6d2d 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -1541,6 +1541,7 @@ loop: parsedTimestamp *int64 val float64 h *histogram.Histogram + fh *histogram.FloatHistogram ) if et, err = p.Next(); err != nil { if err == io.EOF { @@ -1568,8 +1569,7 @@ loop: t := defTime if isHistogram { - met, parsedTimestamp, h, _ = p.Histogram() - // TODO: ingest float histograms in tsdb. + met, parsedTimestamp, h, fh = p.Histogram() } else { met, parsedTimestamp, val = p.Series() } @@ -1636,7 +1636,9 @@ loop: if isHistogram { if h != nil { - ref, err = app.AppendHistogram(ref, lset, t, h) + ref, err = app.AppendHistogram(ref, lset, t, h, nil) + } else { + ref, err = app.AppendHistogram(ref, lset, t, nil, fh) } } else { ref, err = app.Append(ref, lset, t, val) diff --git a/storage/fanout.go b/storage/fanout.go index cdaf8194c6..4f995afbac 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -174,14 +174,14 @@ func (f *fanoutAppender) AppendExemplar(ref SeriesRef, l labels.Labels, e exempl return ref, nil } -func (f *fanoutAppender) AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram) (SeriesRef, error) { - ref, err := f.primary.AppendHistogram(ref, l, t, h) +func (f *fanoutAppender) AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) { + ref, err := f.primary.AppendHistogram(ref, l, t, h, fh) if err != nil { return ref, err } for _, appender := range f.secondaries { - if _, err := appender.AppendHistogram(ref, l, t, h); err != nil { + if _, err := appender.AppendHistogram(ref, l, t, h, fh); err != nil { return 0, err } } diff --git a/storage/interface.go b/storage/interface.go index 3e8dd1086b..5cf70a351b 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -282,7 +282,7 @@ type HistogramAppender interface { // For efficiency reasons, the histogram is passed as a // pointer. AppendHistogram won't mutate the histogram, but in turn // depends on the caller to not mutate it either. - AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram) (SeriesRef, error) + AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) } // MetadataUpdater provides an interface for associating metadata to stored series. diff --git a/storage/remote/write.go b/storage/remote/write.go index 2e38fb8e64..6a33c0adf9 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -278,7 +278,7 @@ func (t *timestampTracker) AppendExemplar(_ storage.SeriesRef, _ labels.Labels, return 0, nil } -func (t *timestampTracker) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, ts int64, h *histogram.Histogram) (storage.SeriesRef, error) { +func (t *timestampTracker) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, ts int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { t.histograms++ if ts > t.highestTimestamp { t.highestTimestamp = ts diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 7a2a9951cf..4a8beeedcf 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -124,9 +124,10 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err } } + // TODO(codesome): support float histograms. for _, hp := range ts.Histograms { hs := HistogramProtoToHistogram(hp) - _, err = app.AppendHistogram(0, labels, hp.Timestamp, hs) + _, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil) if err != nil { unwrappedErr := errors.Unwrap(err) if unwrappedErr == nil { diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index ec77d5df41..731e0e3e5e 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -67,7 +67,7 @@ func TestRemoteWriteHandler(t *testing.T) { for _, hp := range ts.Histograms { h := HistogramProtoToHistogram(hp) - require.Equal(t, mockHistogram{labels, hp.Timestamp, h}, appendable.histograms[k]) + require.Equal(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k]) k++ } } @@ -189,9 +189,10 @@ type mockExemplar struct { } type mockHistogram struct { - l labels.Labels - t int64 - h *histogram.Histogram + l labels.Labels + t int64 + h *histogram.Histogram + fh *histogram.FloatHistogram } func (m *mockAppendable) Appender(_ context.Context) storage.Appender { @@ -226,13 +227,13 @@ func (m *mockAppendable) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e return 0, nil } -func (m *mockAppendable) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram) (storage.SeriesRef, error) { +func (m *mockAppendable) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { if t < m.latestHistogram { return 0, storage.ErrOutOfOrderSample } m.latestHistogram = t - m.histograms = append(m.histograms, mockHistogram{l, t, h}) + m.histograms = append(m.histograms, mockHistogram{l, t, h, fh}) return 0, nil } diff --git a/storage/series.go b/storage/series.go index 81f7b7baf7..dcb6dd82e6 100644 --- a/storage/series.go +++ b/storage/series.go @@ -321,9 +321,10 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { lastType = typ var ( - t int64 - v float64 - h *histogram.Histogram + t int64 + v float64 + h *histogram.Histogram + fh *histogram.FloatHistogram ) switch typ { case chunkenc.ValFloat: @@ -332,6 +333,9 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { case chunkenc.ValHistogram: t, h = seriesIter.AtHistogram() app.AppendHistogram(t, h) + case chunkenc.ValFloatHistogram: + t, fh = seriesIter.AtFloatHistogram() + app.AppendFloatHistogram(t, fh) default: return errChunksIterator{err: fmt.Errorf("unknown sample type %s", typ.String())} } @@ -397,7 +401,6 @@ func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(t int64, v float64, case chunkenc.ValFloatHistogram: t, fh := iter.AtFloatHistogram() result = append(result, newSampleFn(t, 0, nil, fh)) - } } } diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 7e959573bd..656725fe3d 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -820,7 +820,7 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem return storage.SeriesRef(s.ref), nil } -func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram) (storage.SeriesRef, error) { +func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { // TODO: Add histogram support. return 0, nil } diff --git a/tsdb/block_test.go b/tsdb/block_test.go index 380fa68bbb..fdbc128f4a 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -528,7 +528,10 @@ func createHead(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir str ref, err = app.Append(ref, lset, t, v) case chunkenc.ValHistogram: t, h := it.AtHistogram() - ref, err = app.AppendHistogram(ref, lset, t, h) + ref, err = app.AppendHistogram(ref, lset, t, h, nil) + case chunkenc.ValFloatHistogram: + t, fh := it.AtFloatHistogram() + ref, err = app.AppendHistogram(ref, lset, t, nil, fh) default: err = fmt.Errorf("unknown sample type %s", typ.String()) } @@ -615,7 +618,7 @@ func genSeries(totalSeries, labelCount int, mint, maxt int64) []storage.Series { } // genHistogramSeries generates series of histogram samples with a given number of labels and values. -func genHistogramSeries(totalSeries, labelCount int, mint, maxt, step int64) []storage.Series { +func genHistogramSeries(totalSeries, labelCount int, mint, maxt, step int64, floatHistogram bool) []storage.Series { return genSeriesFromSampleGenerator(totalSeries, labelCount, mint, maxt, step, func(ts int64) tsdbutil.Sample { h := &histogram.Histogram{ Count: 5 + uint64(ts*4), @@ -629,12 +632,15 @@ func genHistogramSeries(totalSeries, labelCount int, mint, maxt, step int64) []s }, PositiveBuckets: []int64{int64(ts + 1), 1, -1, 0}, } + if floatHistogram { + return sample{t: ts, fh: h.ToFloat()} + } return sample{t: ts, h: h} }) } // genHistogramAndFloatSeries generates series of mixed histogram and float64 samples with a given number of labels and values. -func genHistogramAndFloatSeries(totalSeries, labelCount int, mint, maxt, step int64) []storage.Series { +func genHistogramAndFloatSeries(totalSeries, labelCount int, mint, maxt, step int64, floatHistogram bool) []storage.Series { floatSample := false count := 0 return genSeriesFromSampleGenerator(totalSeries, labelCount, mint, maxt, step, func(ts int64) tsdbutil.Sample { @@ -655,7 +661,11 @@ func genHistogramAndFloatSeries(totalSeries, labelCount int, mint, maxt, step in }, PositiveBuckets: []int64{int64(ts + 1), 1, -1, 0}, } - s = sample{t: ts, h: h} + if floatHistogram { + s = sample{t: ts, fh: h.ToFloat()} + } else { + s = sample{t: ts, h: h} + } } if count%5 == 0 { diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index e7fe998ac3..58238bea51 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1298,105 +1298,114 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { } func TestHeadCompactionWithHistograms(t *testing.T) { - head, _ := newTestHead(t, DefaultBlockDuration, false, false) - require.NoError(t, head.Init(0)) - t.Cleanup(func() { - require.NoError(t, head.Close()) - }) + for _, floatTest := range []bool{true, false} { + t.Run(fmt.Sprintf("float=%t", floatTest), func(t *testing.T) { + head, _ := newTestHead(t, DefaultBlockDuration, false, false) + require.NoError(t, head.Init(0)) + t.Cleanup(func() { + require.NoError(t, head.Close()) + }) - minute := func(m int) int64 { return int64(m) * time.Minute.Milliseconds() } - ctx := context.Background() - appendHistogram := func(lbls labels.Labels, from, to int, h *histogram.Histogram, exp *[]tsdbutil.Sample) { - t.Helper() - app := head.Appender(ctx) - for tsMinute := from; tsMinute <= to; tsMinute++ { - _, err := app.AppendHistogram(0, lbls, minute(tsMinute), h) + minute := func(m int) int64 { return int64(m) * time.Minute.Milliseconds() } + ctx := context.Background() + appendHistogram := func(lbls labels.Labels, from, to int, h *histogram.Histogram, exp *[]tsdbutil.Sample) { + t.Helper() + app := head.Appender(ctx) + for tsMinute := from; tsMinute <= to; tsMinute++ { + var err error + if floatTest { + _, err = app.AppendHistogram(0, lbls, minute(tsMinute), nil, h.ToFloat()) + *exp = append(*exp, sample{t: minute(tsMinute), fh: h.ToFloat()}) + } else { + _, err = app.AppendHistogram(0, lbls, minute(tsMinute), h, nil) + *exp = append(*exp, sample{t: minute(tsMinute), h: h.Copy()}) + } + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + appendFloat := func(lbls labels.Labels, from, to int, exp *[]tsdbutil.Sample) { + t.Helper() + app := head.Appender(ctx) + for tsMinute := from; tsMinute <= to; tsMinute++ { + _, err := app.Append(0, lbls, minute(tsMinute), float64(tsMinute)) + require.NoError(t, err) + *exp = append(*exp, sample{t: minute(tsMinute), v: float64(tsMinute)}) + } + require.NoError(t, app.Commit()) + } + + var ( + series1 = labels.FromStrings("foo", "bar1") + series2 = labels.FromStrings("foo", "bar2") + series3 = labels.FromStrings("foo", "bar3") + series4 = labels.FromStrings("foo", "bar4") + exp1, exp2, exp3, exp4 []tsdbutil.Sample + ) + h := &histogram.Histogram{ + Count: 11, + ZeroCount: 4, + ZeroThreshold: 0.001, + Sum: 35.5, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 2}, + }, + PositiveBuckets: []int64{1, 1, -1, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 0, Length: 1}, + {Offset: 1, Length: 2}, + }, + NegativeBuckets: []int64{1, 2, -1}, + } + + // Series with only histograms. + appendHistogram(series1, 100, 105, h, &exp1) + + // Series starting with float and then getting histograms. + appendFloat(series2, 100, 102, &exp2) + appendHistogram(series2, 103, 105, h.Copy(), &exp2) + appendFloat(series2, 106, 107, &exp2) + appendHistogram(series2, 108, 109, h.Copy(), &exp2) + + // Series starting with histogram and then getting float. + appendHistogram(series3, 101, 103, h.Copy(), &exp3) + appendFloat(series3, 104, 106, &exp3) + appendHistogram(series3, 107, 108, h.Copy(), &exp3) + appendFloat(series3, 109, 110, &exp3) + + // A float only series. + appendFloat(series4, 100, 102, &exp4) + + // Compaction. + mint := head.MinTime() + maxt := head.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime). + compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil) require.NoError(t, err) - *exp = append(*exp, sample{t: minute(tsMinute), h: h.Copy()}) - } - - require.NoError(t, app.Commit()) - } - appendFloat := func(lbls labels.Labels, from, to int, exp *[]tsdbutil.Sample) { - t.Helper() - app := head.Appender(ctx) - for tsMinute := from; tsMinute <= to; tsMinute++ { - _, err := app.Append(0, lbls, minute(tsMinute), float64(tsMinute)) + id, err := compactor.Write(head.opts.ChunkDirRoot, head, mint, maxt, nil) require.NoError(t, err) - *exp = append(*exp, sample{t: minute(tsMinute), v: float64(tsMinute)}) - } - require.NoError(t, app.Commit()) + require.NotEqual(t, ulid.ULID{}, id) + + // Open the block and query it and check the histograms. + block, err := OpenBlock(nil, path.Join(head.opts.ChunkDirRoot, id.String()), nil) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, block.Close()) + }) + + q, err := NewBlockQuerier(block, block.MinTime(), block.MaxTime()) + require.NoError(t, err) + + actHists := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) + require.Equal(t, map[string][]tsdbutil.Sample{ + series1.String(): exp1, + series2.String(): exp2, + series3.String(): exp3, + series4.String(): exp4, + }, actHists) + }) } - - var ( - series1 = labels.FromStrings("foo", "bar1") - series2 = labels.FromStrings("foo", "bar2") - series3 = labels.FromStrings("foo", "bar3") - series4 = labels.FromStrings("foo", "bar4") - exp1, exp2, exp3, exp4 []tsdbutil.Sample - ) - h := &histogram.Histogram{ - Count: 11, - ZeroCount: 4, - ZeroThreshold: 0.001, - Sum: 35.5, - Schema: 1, - PositiveSpans: []histogram.Span{ - {Offset: 0, Length: 2}, - {Offset: 2, Length: 2}, - }, - PositiveBuckets: []int64{1, 1, -1, 0}, - NegativeSpans: []histogram.Span{ - {Offset: 0, Length: 1}, - {Offset: 1, Length: 2}, - }, - NegativeBuckets: []int64{1, 2, -1}, - } - - // Series with only histograms. - appendHistogram(series1, 100, 105, h, &exp1) - - // Series starting with float and then getting histograms. - appendFloat(series2, 100, 102, &exp2) - appendHistogram(series2, 103, 105, h.Copy(), &exp2) - appendFloat(series2, 106, 107, &exp2) - appendHistogram(series2, 108, 109, h.Copy(), &exp2) - - // Series starting with histogram and then getting float. - appendHistogram(series3, 101, 103, h.Copy(), &exp3) - appendFloat(series3, 104, 106, &exp3) - appendHistogram(series3, 107, 108, h.Copy(), &exp3) - appendFloat(series3, 109, 110, &exp3) - - // A float only series. - appendFloat(series4, 100, 102, &exp4) - - // Compaction. - mint := head.MinTime() - maxt := head.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime). - compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil) - require.NoError(t, err) - id, err := compactor.Write(head.opts.ChunkDirRoot, head, mint, maxt, nil) - require.NoError(t, err) - require.NotEqual(t, ulid.ULID{}, id) - - // Open the block and query it and check the histograms. - block, err := OpenBlock(nil, path.Join(head.opts.ChunkDirRoot, id.String()), nil) - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, block.Close()) - }) - - q, err := NewBlockQuerier(block, block.MinTime(), block.MaxTime()) - require.NoError(t, err) - - actHists := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) - require.Equal(t, map[string][]tsdbutil.Sample{ - series1.String(): exp1, - series2.String(): exp2, - series3.String(): exp3, - series4.String(): exp4, - }, actHists) } // Depending on numSeriesPerSchema, it can take few gigs of memory; @@ -1511,7 +1520,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) { ) for i := 0; i < numHistograms; i++ { ts := int64(i) * timeStep - ref, err = sparseApp.AppendHistogram(ref, ah.baseLabels, ts, ah.hists[i]) + ref, err = sparseApp.AppendHistogram(ref, ah.baseLabels, ts, ah.hists[i], nil) require.NoError(t, err) } } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index aeb5fa4bee..4a550a5fd7 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -108,6 +108,9 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str case chunkenc.ValHistogram: ts, h := it.AtHistogram() samples = append(samples, sample{t: ts, h: h}) + case chunkenc.ValFloatHistogram: + ts, fh := it.AtFloatHistogram() + samples = append(samples, sample{t: ts, fh: fh}) default: t.Fatalf("unknown sample type in query %s", typ.String()) } @@ -465,7 +468,7 @@ Outer: } } -func TestAmendDatapointCausesError(t *testing.T) { +func TestAmendHistogramDatapointCausesError(t *testing.T) { db := openTestDB(t, nil, nil) defer func() { require.NoError(t, db.Close()) @@ -496,17 +499,32 @@ func TestAmendDatapointCausesError(t *testing.T) { }, PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0}, } + fh := h.ToFloat() app = db.Appender(ctx) - _, err = app.AppendHistogram(0, labels.FromStrings("a", "c"), 0, h.Copy()) + _, err = app.AppendHistogram(0, labels.FromStrings("a", "c"), 0, h.Copy(), nil) require.NoError(t, err) require.NoError(t, app.Commit()) app = db.Appender(ctx) - _, err = app.AppendHistogram(0, labels.FromStrings("a", "c"), 0, h.Copy()) + _, err = app.AppendHistogram(0, labels.FromStrings("a", "c"), 0, h.Copy(), nil) require.NoError(t, err) h.Schema = 2 - _, err = app.AppendHistogram(0, labels.FromStrings("a", "c"), 0, h.Copy()) + _, err = app.AppendHistogram(0, labels.FromStrings("a", "c"), 0, h.Copy(), nil) + require.Equal(t, storage.ErrDuplicateSampleForTimestamp, err) + require.NoError(t, app.Rollback()) + + // Float histogram. + app = db.Appender(ctx) + _, err = app.AppendHistogram(0, labels.FromStrings("a", "d"), 0, nil, fh.Copy()) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + app = db.Appender(ctx) + _, err = app.AppendHistogram(0, labels.FromStrings("a", "d"), 0, nil, fh.Copy()) + require.NoError(t, err) + fh.Schema = 2 + _, err = app.AppendHistogram(0, labels.FromStrings("a", "d"), 0, nil, fh.Copy()) require.Equal(t, storage.ErrDuplicateSampleForTimestamp, err) require.NoError(t, app.Rollback()) } @@ -5805,6 +5823,16 @@ func TestDiskFillingUpAfterDisablingOOO(t *testing.T) { } func TestHistogramAppendAndQuery(t *testing.T) { + t.Run("integer histograms", func(t *testing.T) { + testHistogramAppendAndQueryHelper(t, false) + }) + t.Run("float histograms", func(t *testing.T) { + testHistogramAppendAndQueryHelper(t, true) + }) +} + +func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) { + t.Helper() db := openTestDB(t, nil, nil) minute := func(m int) int64 { return int64(m) * time.Minute.Milliseconds() } t.Cleanup(func() { @@ -5814,11 +5842,17 @@ func TestHistogramAppendAndQuery(t *testing.T) { ctx := context.Background() appendHistogram := func(lbls labels.Labels, tsMinute int, h *histogram.Histogram, exp *[]tsdbutil.Sample) { t.Helper() + var err error app := db.Appender(ctx) - _, err := app.AppendHistogram(0, lbls, minute(tsMinute), h) + if floatHistogram { + _, err = app.AppendHistogram(0, lbls, minute(tsMinute), nil, h.ToFloat()) + *exp = append(*exp, sample{t: minute(tsMinute), fh: h.ToFloat()}) + } else { + _, err = app.AppendHistogram(0, lbls, minute(tsMinute), h.Copy(), nil) + *exp = append(*exp, sample{t: minute(tsMinute), h: h.Copy()}) + } require.NoError(t, err) require.NoError(t, app.Commit()) - *exp = append(*exp, sample{t: minute(tsMinute), h: h.Copy()}) } appendFloat := func(lbls labels.Labels, tsMinute int, val float64, exp *[]tsdbutil.Sample) { t.Helper() @@ -5867,23 +5901,23 @@ func TestHistogramAppendAndQuery(t *testing.T) { t.Run("series with only histograms", func(t *testing.T) { h := baseH.Copy() // This is shared across all sub tests. - appendHistogram(series1, 100, h.Copy(), &exp1) + appendHistogram(series1, 100, h, &exp1) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) h.PositiveBuckets[0]++ h.NegativeBuckets[0] += 2 h.Count += 10 - appendHistogram(series1, 101, h.Copy(), &exp1) + appendHistogram(series1, 101, h, &exp1) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) t.Run("changing schema", func(t *testing.T) { h.Schema = 2 - appendHistogram(series1, 102, h.Copy(), &exp1) + appendHistogram(series1, 102, h, &exp1) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) // Schema back to old. h.Schema = 1 - appendHistogram(series1, 103, h.Copy(), &exp1) + appendHistogram(series1, 103, h, &exp1) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) }) @@ -5894,10 +5928,17 @@ func TestHistogramAppendAndQuery(t *testing.T) { // because the chunk will be re-encoded. So this forces us to modify // the last histogram in exp1 so when we query we get the expected // results. - lh := exp1[len(exp1)-1].H().Copy() - lh.PositiveSpans[1].Length++ - lh.PositiveBuckets = append(lh.PositiveBuckets, -2) // -2 makes the last bucket 0. - exp1[len(exp1)-1] = sample{t: exp1[len(exp1)-1].T(), h: lh} + if floatHistogram { + lh := exp1[len(exp1)-1].FH().Copy() + lh.PositiveSpans[1].Length++ + lh.PositiveBuckets = append(lh.PositiveBuckets, 0) + exp1[len(exp1)-1] = sample{t: exp1[len(exp1)-1].T(), fh: lh} + } else { + lh := exp1[len(exp1)-1].H().Copy() + lh.PositiveSpans[1].Length++ + lh.PositiveBuckets = append(lh.PositiveBuckets, -2) // -2 makes the last bucket 0. + exp1[len(exp1)-1] = sample{t: exp1[len(exp1)-1].T(), h: lh} + } // This histogram with new bucket at the end causes the re-encoding of the previous histogram. // Hence the previous histogram is recoded into this new layout. @@ -5905,23 +5946,37 @@ func TestHistogramAppendAndQuery(t *testing.T) { h.PositiveSpans[1].Length++ h.PositiveBuckets = append(h.PositiveBuckets, 1) h.Count += 3 - appendHistogram(series1, 104, h.Copy(), &exp1) + appendHistogram(series1, 104, h, &exp1) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) // Because of the previous two histograms being on the active chunk, // and the next append is only adding a new bucket, the active chunk // will be re-encoded to the new layout. - lh = exp1[len(exp1)-2].H().Copy() - lh.PositiveSpans[0].Length++ - lh.PositiveSpans[1].Offset-- - lh.PositiveBuckets = []int64{2, 1, -3, 2, 0, -2} - exp1[len(exp1)-2] = sample{t: exp1[len(exp1)-2].T(), h: lh} + if floatHistogram { + lh := exp1[len(exp1)-2].FH().Copy() + lh.PositiveSpans[0].Length++ + lh.PositiveSpans[1].Offset-- + lh.PositiveBuckets = []float64{2, 3, 0, 2, 2, 0} + exp1[len(exp1)-2] = sample{t: exp1[len(exp1)-2].T(), fh: lh} - lh = exp1[len(exp1)-1].H().Copy() - lh.PositiveSpans[0].Length++ - lh.PositiveSpans[1].Offset-- - lh.PositiveBuckets = []int64{2, 1, -3, 2, 0, 1} - exp1[len(exp1)-1] = sample{t: exp1[len(exp1)-1].T(), h: lh} + lh = exp1[len(exp1)-1].FH().Copy() + lh.PositiveSpans[0].Length++ + lh.PositiveSpans[1].Offset-- + lh.PositiveBuckets = []float64{2, 3, 0, 2, 2, 3} + exp1[len(exp1)-1] = sample{t: exp1[len(exp1)-1].T(), fh: lh} + } else { + lh := exp1[len(exp1)-2].H().Copy() + lh.PositiveSpans[0].Length++ + lh.PositiveSpans[1].Offset-- + lh.PositiveBuckets = []int64{2, 1, -3, 2, 0, -2} + exp1[len(exp1)-2] = sample{t: exp1[len(exp1)-2].T(), h: lh} + + lh = exp1[len(exp1)-1].H().Copy() + lh.PositiveSpans[0].Length++ + lh.PositiveSpans[1].Offset-- + lh.PositiveBuckets = []int64{2, 1, -3, 2, 0, 1} + exp1[len(exp1)-1] = sample{t: exp1[len(exp1)-1].T(), h: lh} + } // Now we add the new buckets in between. Empty bucket is again not present for the old histogram. h.PositiveSpans[0].Length++ @@ -5929,26 +5984,39 @@ func TestHistogramAppendAndQuery(t *testing.T) { h.Count += 3 // {2, 1, -1, 0, 1} -> {2, 1, 0, -1, 0, 1} h.PositiveBuckets = append(h.PositiveBuckets[:2], append([]int64{0}, h.PositiveBuckets[2:]...)...) - appendHistogram(series1, 105, h.Copy(), &exp1) + appendHistogram(series1, 105, h, &exp1) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) // We add 4 more histograms to clear out the buffer and see the re-encoded histograms. - appendHistogram(series1, 106, h.Copy(), &exp1) - appendHistogram(series1, 107, h.Copy(), &exp1) - appendHistogram(series1, 108, h.Copy(), &exp1) - appendHistogram(series1, 109, h.Copy(), &exp1) + appendHistogram(series1, 106, h, &exp1) + appendHistogram(series1, 107, h, &exp1) + appendHistogram(series1, 108, h, &exp1) + appendHistogram(series1, 109, h, &exp1) // Update the expected histograms to reflect the re-encoding. - l := len(exp1) - h7 := exp1[l-7].H() - h7.PositiveSpans = exp1[l-1].H().PositiveSpans - h7.PositiveBuckets = []int64{2, 1, -3, 2, 0, -2} // -3 and -2 are the empty buckets. - exp1[l-7] = sample{t: exp1[l-7].T(), h: h7} + if floatHistogram { + l := len(exp1) + h7 := exp1[l-7].FH() + h7.PositiveSpans = exp1[l-1].FH().PositiveSpans + h7.PositiveBuckets = []float64{2, 3, 0, 2, 2, 0} + exp1[l-7] = sample{t: exp1[l-7].T(), fh: h7} - h6 := exp1[l-6].H() - h6.PositiveSpans = exp1[l-1].H().PositiveSpans - h6.PositiveBuckets = []int64{2, 1, -3, 2, 0, 1} // -3 is the empty bucket. - exp1[l-6] = sample{t: exp1[l-6].T(), h: h6} + h6 := exp1[l-6].FH() + h6.PositiveSpans = exp1[l-1].FH().PositiveSpans + h6.PositiveBuckets = []float64{2, 3, 0, 2, 2, 3} + exp1[l-6] = sample{t: exp1[l-6].T(), fh: h6} + } else { + l := len(exp1) + h7 := exp1[l-7].H() + h7.PositiveSpans = exp1[l-1].H().PositiveSpans + h7.PositiveBuckets = []int64{2, 1, -3, 2, 0, -2} // -3 and -2 are the empty buckets. + exp1[l-7] = sample{t: exp1[l-7].T(), h: h7} + + h6 := exp1[l-6].H() + h6.PositiveSpans = exp1[l-1].H().PositiveSpans + h6.PositiveBuckets = []int64{2, 1, -3, 2, 0, 1} // -3 is the empty bucket. + exp1[l-6] = sample{t: exp1[l-6].T(), h: h6} + } testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) }) @@ -5956,7 +6024,7 @@ func TestHistogramAppendAndQuery(t *testing.T) { t.Run("buckets disappearing", func(t *testing.T) { h.PositiveSpans[1].Length-- h.PositiveBuckets = h.PositiveBuckets[:len(h.PositiveBuckets)-1] - appendHistogram(series1, 110, h.Copy(), &exp1) + appendHistogram(series1, 110, h, &exp1) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) }) }) @@ -5968,9 +6036,9 @@ func TestHistogramAppendAndQuery(t *testing.T) { testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2}) h := baseH.Copy() - appendHistogram(series2, 103, h.Copy(), &exp2) - appendHistogram(series2, 104, h.Copy(), &exp2) - appendHistogram(series2, 105, h.Copy(), &exp2) + appendHistogram(series2, 103, h, &exp2) + appendHistogram(series2, 104, h, &exp2) + appendHistogram(series2, 105, h, &exp2) testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2}) // Switching between float and histograms again. @@ -5978,16 +6046,16 @@ func TestHistogramAppendAndQuery(t *testing.T) { appendFloat(series2, 107, 107, &exp2) testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2}) - appendHistogram(series2, 108, h.Copy(), &exp2) - appendHistogram(series2, 109, h.Copy(), &exp2) + appendHistogram(series2, 108, h, &exp2) + appendHistogram(series2, 109, h, &exp2) testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2}) }) t.Run("series starting with histogram and then getting float", func(t *testing.T) { h := baseH.Copy() - appendHistogram(series3, 101, h.Copy(), &exp3) - appendHistogram(series3, 102, h.Copy(), &exp3) - appendHistogram(series3, 103, h.Copy(), &exp3) + appendHistogram(series3, 101, h, &exp3) + appendHistogram(series3, 102, h, &exp3) + appendHistogram(series3, 103, h, &exp3) testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3}) appendFloat(series3, 104, 100, &exp3) @@ -5996,8 +6064,8 @@ func TestHistogramAppendAndQuery(t *testing.T) { testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3}) // Switching between histogram and float again. - appendHistogram(series3, 107, h.Copy(), &exp3) - appendHistogram(series3, 108, h.Copy(), &exp3) + appendHistogram(series3, 107, h, &exp3) + appendHistogram(series3, 108, h, &exp3) testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3}) appendFloat(series3, 109, 106, &exp3) @@ -6052,6 +6120,11 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) { case chunkenc.ValHistogram: ts, h := it.AtHistogram() slice = append(slice, sample{t: ts, h: h}) + case chunkenc.ValFloatHistogram: + ts, h := it.AtFloatHistogram() + slice = append(slice, sample{t: ts, fh: h}) + default: + t.Fatalf("unexpected sample value type %d", typ) } } sort.Slice(slice, func(i, j int) bool { @@ -6088,63 +6161,67 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) { require.Equal(t, exp, res) } - t.Run("serial blocks with only histograms", func(t *testing.T) { - testBlockQuerying(t, - genHistogramSeries(10, 5, minute(0), minute(119), minute(1)), - genHistogramSeries(10, 5, minute(120), minute(239), minute(1)), - genHistogramSeries(10, 5, minute(240), minute(359), minute(1)), - ) - }) + for _, floatHistogram := range []bool{true} { + t.Run(fmt.Sprintf("floatHistogram=%t", floatHistogram), func(t *testing.T) { + t.Run("serial blocks with only histograms", func(t *testing.T) { + testBlockQuerying(t, + genHistogramSeries(10, 5, minute(0), minute(119), minute(1), floatHistogram), + genHistogramSeries(10, 5, minute(120), minute(239), minute(1), floatHistogram), + genHistogramSeries(10, 5, minute(240), minute(359), minute(1), floatHistogram), + ) + }) - t.Run("serial blocks with either histograms or floats in a block and not both", func(t *testing.T) { - testBlockQuerying(t, - genHistogramSeries(10, 5, minute(0), minute(119), minute(1)), - genSeriesFromSampleGenerator(10, 5, minute(120), minute(239), minute(1), func(ts int64) tsdbutil.Sample { - return sample{t: ts, v: rand.Float64()} - }), - genHistogramSeries(10, 5, minute(240), minute(359), minute(1)), - ) - }) + t.Run("serial blocks with either histograms or floats in a block and not both", func(t *testing.T) { + testBlockQuerying(t, + genHistogramSeries(10, 5, minute(0), minute(119), minute(1), floatHistogram), + genSeriesFromSampleGenerator(10, 5, minute(120), minute(239), minute(1), func(ts int64) tsdbutil.Sample { + return sample{t: ts, v: rand.Float64()} + }), + genHistogramSeries(10, 5, minute(240), minute(359), minute(1), floatHistogram), + ) + }) - t.Run("serial blocks with mix of histograms and float64", func(t *testing.T) { - testBlockQuerying(t, - genHistogramAndFloatSeries(10, 5, minute(0), minute(60), minute(1)), - genHistogramSeries(10, 5, minute(61), minute(120), minute(1)), - genHistogramAndFloatSeries(10, 5, minute(121), minute(180), minute(1)), - genSeriesFromSampleGenerator(10, 5, minute(181), minute(240), minute(1), func(ts int64) tsdbutil.Sample { - return sample{t: ts, v: rand.Float64()} - }), - ) - }) + t.Run("serial blocks with mix of histograms and float64", func(t *testing.T) { + testBlockQuerying(t, + genHistogramAndFloatSeries(10, 5, minute(0), minute(60), minute(1), floatHistogram), + genHistogramSeries(10, 5, minute(61), minute(120), minute(1), floatHistogram), + genHistogramAndFloatSeries(10, 5, minute(121), minute(180), minute(1), floatHistogram), + genSeriesFromSampleGenerator(10, 5, minute(181), minute(240), minute(1), func(ts int64) tsdbutil.Sample { + return sample{t: ts, v: rand.Float64()} + }), + ) + }) - t.Run("overlapping blocks with only histograms", func(t *testing.T) { - testBlockQuerying(t, - genHistogramSeries(10, 5, minute(0), minute(120), minute(3)), - genHistogramSeries(10, 5, minute(1), minute(120), minute(3)), - genHistogramSeries(10, 5, minute(2), minute(120), minute(3)), - ) - }) + t.Run("overlapping blocks with only histograms", func(t *testing.T) { + testBlockQuerying(t, + genHistogramSeries(10, 5, minute(0), minute(120), minute(3), floatHistogram), + genHistogramSeries(10, 5, minute(1), minute(120), minute(3), floatHistogram), + genHistogramSeries(10, 5, minute(2), minute(120), minute(3), floatHistogram), + ) + }) - t.Run("overlapping blocks with only histograms and only float in a series", func(t *testing.T) { - testBlockQuerying(t, - genHistogramSeries(10, 5, minute(0), minute(120), minute(3)), - genSeriesFromSampleGenerator(10, 5, minute(1), minute(120), minute(3), func(ts int64) tsdbutil.Sample { - return sample{t: ts, v: rand.Float64()} - }), - genHistogramSeries(10, 5, minute(2), minute(120), minute(3)), - ) - }) + t.Run("overlapping blocks with only histograms and only float in a series", func(t *testing.T) { + testBlockQuerying(t, + genHistogramSeries(10, 5, minute(0), minute(120), minute(3), floatHistogram), + genSeriesFromSampleGenerator(10, 5, minute(1), minute(120), minute(3), func(ts int64) tsdbutil.Sample { + return sample{t: ts, v: rand.Float64()} + }), + genHistogramSeries(10, 5, minute(2), minute(120), minute(3), floatHistogram), + ) + }) - t.Run("overlapping blocks with mix of histograms and float64", func(t *testing.T) { - testBlockQuerying(t, - genHistogramAndFloatSeries(10, 5, minute(0), minute(60), minute(3)), - genHistogramSeries(10, 5, minute(46), minute(100), minute(3)), - genHistogramAndFloatSeries(10, 5, minute(89), minute(140), minute(3)), - genSeriesFromSampleGenerator(10, 5, minute(126), minute(200), minute(3), func(ts int64) tsdbutil.Sample { - return sample{t: ts, v: rand.Float64()} - }), - ) - }) + t.Run("overlapping blocks with mix of histograms and float64", func(t *testing.T) { + testBlockQuerying(t, + genHistogramAndFloatSeries(10, 5, minute(0), minute(60), minute(3), floatHistogram), + genHistogramSeries(10, 5, minute(46), minute(100), minute(3), floatHistogram), + genHistogramAndFloatSeries(10, 5, minute(89), minute(140), minute(3), floatHistogram), + genSeriesFromSampleGenerator(10, 5, minute(126), minute(200), minute(3), func(ts int64) tsdbutil.Sample { + return sample{t: ts, v: rand.Float64()} + }), + ) + }) + }) + } } func TestNativeHistogramFlag(t *testing.T) { @@ -6172,16 +6249,22 @@ func TestNativeHistogramFlag(t *testing.T) { app := db.Appender(context.Background()) // Disabled by default. - _, err = app.AppendHistogram(0, l, 100, h) + _, err = app.AppendHistogram(0, l, 100, h, nil) + require.Equal(t, storage.ErrNativeHistogramsDisabled, err) + _, err = app.AppendHistogram(0, l, 105, nil, h.ToFloat()) require.Equal(t, storage.ErrNativeHistogramsDisabled, err) // Enable and append. db.EnableNativeHistograms() - _, err = app.AppendHistogram(0, l, 200, h) + _, err = app.AppendHistogram(0, l, 200, h, nil) + require.NoError(t, err) + _, err = app.AppendHistogram(0, l, 205, nil, h.ToFloat()) require.NoError(t, err) db.DisableNativeHistograms() - _, err = app.AppendHistogram(0, l, 300, h) + _, err = app.AppendHistogram(0, l, 300, h, nil) + require.Equal(t, storage.ErrNativeHistogramsDisabled, err) + _, err = app.AppendHistogram(0, l, 305, nil, h.ToFloat()) require.Equal(t, storage.ErrNativeHistogramsDisabled, err) require.NoError(t, app.Commit()) @@ -6189,5 +6272,7 @@ func TestNativeHistogramFlag(t *testing.T) { q, err := db.Querier(context.Background(), math.MinInt, math.MaxInt64) require.NoError(t, err) act := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) - require.Equal(t, map[string][]tsdbutil.Sample{l.String(): {sample{t: 200, h: h}}}, act) + require.Equal(t, map[string][]tsdbutil.Sample{ + l.String(): {sample{t: 200, h: h}, sample{t: 205, fh: h.ToFloat()}}, + }, act) } diff --git a/tsdb/head.go b/tsdb/head.go index be4b3b6a95..37e12d8a42 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -74,19 +74,20 @@ type Head struct { // This should be typecasted to chunks.ChunkDiskMapperRef after loading. minOOOMmapRef atomic.Uint64 - metrics *headMetrics - opts *HeadOptions - wal, wbl *wlog.WL - exemplarMetrics *ExemplarMetrics - exemplars ExemplarStorage - logger log.Logger - appendPool sync.Pool - exemplarsPool sync.Pool - histogramsPool sync.Pool - metadataPool sync.Pool - seriesPool sync.Pool - bytesPool sync.Pool - memChunkPool sync.Pool + metrics *headMetrics + opts *HeadOptions + wal, wbl *wlog.WL + exemplarMetrics *ExemplarMetrics + exemplars ExemplarStorage + logger log.Logger + appendPool sync.Pool + exemplarsPool sync.Pool + histogramsPool sync.Pool + floatHistogramsPool sync.Pool + metadataPool sync.Pool + seriesPool sync.Pool + bytesPool sync.Pool + memChunkPool sync.Pool // All series addressable by their ID or hash. series *stripeSeries @@ -1850,7 +1851,8 @@ type memSeries struct { lastValue float64 // We keep the last histogram value here (in addition to appending it to the chunk) so we can check for duplicates. - lastHistogramValue *histogram.Histogram + lastHistogramValue *histogram.Histogram + lastFloatHistogramValue *histogram.FloatHistogram // Current appender for the head chunk. Set when a new head chunk is cut. // It is nil only if headChunk is nil. E.g. if there was an appender that created a new series, but rolled back the commit @@ -2043,3 +2045,22 @@ func GenerateTestHistograms(n int) (r []*histogram.Histogram) { return r } + +func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) { + for i := 0; i < n; i++ { + r = append(r, &histogram.FloatHistogram{ + Count: 5 + float64(i*4), + ZeroCount: 2 + float64(i), + ZeroThreshold: 0.001, + Sum: 18.4 * float64(i+1), + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []float64{float64(i + 1), float64(i + 2), float64(i + 1), float64(i + 1)}, + }) + } + + return r +} diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 8228632763..e541f013fc 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -68,14 +68,14 @@ func (a *initAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e return a.app.AppendExemplar(ref, l, e) } -func (a *initAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram) (storage.SeriesRef, error) { +func (a *initAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { if a.app != nil { - return a.app.AppendHistogram(ref, l, t, h) + return a.app.AppendHistogram(ref, l, t, h, fh) } a.head.initTime(t) a.app = a.head.appender() - return a.app.AppendHistogram(ref, l, t, h) + return a.app.AppendHistogram(ref, l, t, h, fh) } func (a *initAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { @@ -156,6 +156,7 @@ func (h *Head) appender() *headAppender { sampleSeries: h.getSeriesBuffer(), exemplars: exemplarsBuf, histograms: h.getHistogramBuffer(), + floatHistograms: h.getFloatHistogramBuffer(), metadata: h.getMetadataBuffer(), appendID: appendID, cleanupAppendIDsBelow: cleanupAppendIDsBelow, @@ -236,6 +237,19 @@ func (h *Head) putHistogramBuffer(b []record.RefHistogramSample) { h.histogramsPool.Put(b[:0]) } +func (h *Head) getFloatHistogramBuffer() []record.RefFloatHistogramSample { + b := h.floatHistogramsPool.Get() + if b == nil { + return make([]record.RefFloatHistogramSample, 0, 512) + } + return b.([]record.RefFloatHistogramSample) +} + +func (h *Head) putFloatHistogramBuffer(b []record.RefFloatHistogramSample) { + //nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty. + h.floatHistogramsPool.Put(b[:0]) +} + func (h *Head) getMetadataBuffer() []record.RefMetadata { b := h.metadataPool.Get() if b == nil { @@ -287,14 +301,16 @@ type headAppender struct { headMaxt int64 // We track it here to not take the lock for every sample appended. oooTimeWindow int64 // Use the same for the entire append, and don't load the atomic for each sample. - series []record.RefSeries // New series held by this appender. - samples []record.RefSample // New float samples held by this appender. - exemplars []exemplarWithSeriesRef // New exemplars held by this appender. - sampleSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). - histograms []record.RefHistogramSample // New histogram samples held by this appender. - histogramSeries []*memSeries // HistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). - metadata []record.RefMetadata // New metadata held by this appender. - metadataSeries []*memSeries // Series corresponding to the metadata held by this appender. + series []record.RefSeries // New series held by this appender. + samples []record.RefSample // New float samples held by this appender. + sampleSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). + histograms []record.RefHistogramSample // New histogram samples held by this appender. + histogramSeries []*memSeries // HistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). + floatHistograms []record.RefFloatHistogramSample // New float histogram samples held by this appender. + floatHistogramSeries []*memSeries // FloatHistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). + metadata []record.RefMetadata // New metadata held by this appender. + metadataSeries []*memSeries // Series corresponding to the metadata held by this appender. + exemplars []exemplarWithSeriesRef // New exemplars held by this appender. appendID, cleanupAppendIDsBelow uint64 closed bool @@ -335,7 +351,8 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 } if value.IsStaleNaN(v) && s.isHistogramSeries { - return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}) + // TODO(marctc): do we have do to the same for float histograms? + return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}, nil) } s.Lock() @@ -439,6 +456,28 @@ func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram) error { return nil } +// appendableFloatHistogram checks whether the given sample is valid for appending to the series. +func (s *memSeries) appendableFloatHistogram(t int64, fh *histogram.FloatHistogram) error { + c := s.head() + if c == nil { + return nil + } + + if t > c.maxTime { + return nil + } + if t < c.maxTime { + return storage.ErrOutOfOrderSample + } + + // We are allowing exact duplicates as we can encounter them in valid cases + // like federation and erroring out at that time would be extremely noisy. + if !fh.Equals(s.lastFloatHistogramValue) { + return storage.ErrDuplicateSampleForTimestamp + } + return nil +} + // AppendExemplar for headAppender assumes the series ref already exists, and so it doesn't // use getOrCreate or make any of the lset validity checks that Append does. func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { @@ -476,7 +515,7 @@ func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels, return storage.SeriesRef(s.ref), nil } -func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram) (storage.SeriesRef, error) { +func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { if !a.head.opts.EnableNativeHistograms.Load() { return 0, storage.ErrNativeHistogramsDisabled } @@ -486,8 +525,16 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels return 0, storage.ErrOutOfBounds } - if err := ValidateHistogram(h); err != nil { - return 0, err + if h != nil { + if err := ValidateHistogram(h); err != nil { + return 0, err + } + } + + if fh != nil { + if err := ValidateFloatHistogram(fh); err != nil { + return 0, err + } } s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) @@ -517,16 +564,41 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels } } - s.Lock() - if err := s.appendableHistogram(t, h); err != nil { - s.Unlock() - if err == storage.ErrOutOfOrderSample { - a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() + if h != nil { + s.Lock() + if err := s.appendableHistogram(t, h); err != nil { + s.Unlock() + if err == storage.ErrOutOfOrderSample { + a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() + } + return 0, err } - return 0, err + s.pendingCommit = true + s.Unlock() + a.histograms = append(a.histograms, record.RefHistogramSample{ + Ref: s.ref, + T: t, + H: h, + }) + a.histogramSeries = append(a.histogramSeries, s) + } else if fh != nil { + s.Lock() + if err := s.appendableFloatHistogram(t, fh); err != nil { + s.Unlock() + if err == storage.ErrOutOfOrderSample { + a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() + } + return 0, err + } + s.pendingCommit = true + s.Unlock() + a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{ + Ref: s.ref, + T: t, + FH: fh, + }) + a.floatHistogramSeries = append(a.floatHistogramSeries, s) } - s.pendingCommit = true - s.Unlock() if t < a.mint { a.mint = t @@ -535,12 +607,6 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels a.maxt = t } - a.histograms = append(a.histograms, record.RefHistogramSample{ - Ref: s.ref, - T: t, - H: h, - }) - a.histogramSeries = append(a.histogramSeries, s) return storage.SeriesRef(s.ref), nil } @@ -582,17 +648,17 @@ func ValidateHistogram(h *histogram.Histogram) error { if err := checkHistogramSpans(h.PositiveSpans, len(h.PositiveBuckets)); err != nil { return errors.Wrap(err, "positive side") } - - negativeCount, err := checkHistogramBuckets(h.NegativeBuckets) + var nCount, pCount uint64 + err := checkHistogramBuckets(h.NegativeBuckets, &nCount, true) if err != nil { return errors.Wrap(err, "negative side") } - positiveCount, err := checkHistogramBuckets(h.PositiveBuckets) + err = checkHistogramBuckets(h.PositiveBuckets, &pCount, true) if err != nil { return errors.Wrap(err, "positive side") } - if c := negativeCount + positiveCount; c > h.Count { + if c := nCount + pCount; c > h.Count { return errors.Wrap( storage.ErrHistogramCountNotBigEnough, fmt.Sprintf("%d observations found in buckets, but the Count field is %d", c, h.Count), @@ -602,6 +668,33 @@ func ValidateHistogram(h *histogram.Histogram) error { return nil } +func ValidateFloatHistogram(h *histogram.FloatHistogram) error { + if err := checkHistogramSpans(h.NegativeSpans, len(h.NegativeBuckets)); err != nil { + return errors.Wrap(err, "negative side") + } + if err := checkHistogramSpans(h.PositiveSpans, len(h.PositiveBuckets)); err != nil { + return errors.Wrap(err, "positive side") + } + var nCount, pCount float64 + err := checkHistogramBuckets(h.NegativeBuckets, &nCount, false) + if err != nil { + return errors.Wrap(err, "negative side") + } + err = checkHistogramBuckets(h.PositiveBuckets, &pCount, false) + if err != nil { + return errors.Wrap(err, "positive side") + } + + if c := nCount + pCount; c > h.Count { + return errors.Wrap( + storage.ErrHistogramCountNotBigEnough, + fmt.Sprintf("%f observations found in buckets, but the Count field is %f", c, h.Count), + ) + } + + return nil +} + func checkHistogramSpans(spans []histogram.Span, numBuckets int) error { var spanBuckets int for n, span := range spans { @@ -622,27 +715,30 @@ func checkHistogramSpans(spans []histogram.Span, numBuckets int) error { return nil } -func checkHistogramBuckets(buckets []int64) (uint64, error) { +func checkHistogramBuckets[BC histogram.BucketCount, IBC histogram.InternalBucketCount](buckets []IBC, count *BC, deltas bool) error { if len(buckets) == 0 { - return 0, nil + return nil } - var count uint64 - var last int64 - + var last IBC for i := 0; i < len(buckets); i++ { - c := last + buckets[i] + var c IBC + if deltas { + c = last + buckets[i] + } else { + c = buckets[i] + } if c < 0 { - return 0, errors.Wrap( + return errors.Wrap( storage.ErrHistogramNegativeBucketCount, - fmt.Sprintf("bucket number %d has observation count of %d", i+1, c), + fmt.Sprintf("bucket number %d has observation count of %v", i+1, c), ) } last = c - count += uint64(c) + *count += BC(c) } - return count, nil + return nil } var _ storage.GetRef = &headAppender{} @@ -707,6 +803,13 @@ func (a *headAppender) log() error { return errors.Wrap(err, "log histograms") } } + if len(a.floatHistograms) > 0 { + rec = enc.FloatHistogramSamples(a.floatHistograms, buf) + buf = rec[:0] + if err := a.head.wal.Log(rec); err != nil { + return errors.Wrap(err, "log float histograms") + } + } return nil } @@ -753,6 +856,7 @@ func (a *headAppender) Commit() (err error) { defer a.head.putSeriesBuffer(a.sampleSeries) defer a.head.putExemplarBuffer(a.exemplars) defer a.head.putHistogramBuffer(a.histograms) + defer a.head.putFloatHistogramBuffer(a.floatHistograms) defer a.head.putMetadataBuffer(a.metadata) defer a.head.iso.closeAppend(a.appendID) @@ -924,6 +1028,32 @@ func (a *headAppender) Commit() (err error) { } } + histogramsTotal += len(a.floatHistograms) + for i, s := range a.floatHistograms { + series = a.floatHistogramSeries[i] + series.Lock() + ok, chunkCreated := series.appendFloatHistogram(s.T, s.FH, a.appendID, a.head.chunkDiskMapper, chunkRange) + series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) + series.pendingCommit = false + series.Unlock() + + if ok { + if s.T < inOrderMint { + inOrderMint = s.T + } + if s.T > inOrderMaxt { + inOrderMaxt = s.T + } + } else { + histogramsTotal-- + histoOOORejected++ + } + if chunkCreated { + a.head.metrics.chunks.Inc() + a.head.metrics.chunksCreated.Inc() + } + } + for i, m := range a.metadata { series = a.metadataSeries[i] series.Lock() @@ -1067,6 +1197,74 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui return true, chunkCreated } +// appendFloatHistogram adds the float histogram. +// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. +func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) { + // Head controls the execution of recoding, so that we own the proper + // chunk reference afterwards. We check for Appendable before + // appendPreprocessor because in case it ends up creating a new chunk, + // we need to know if there was also a counter reset or not to set the + // meta properly. + app, _ := s.app.(*chunkenc.FloatHistogramAppender) + var ( + positiveInterjections, negativeInterjections []chunkenc.Interjection + okToAppend, counterReset bool + ) + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange) + if !sampleInOrder { + return sampleInOrder, chunkCreated + } + if app != nil { + positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(fh) + } + + if !chunkCreated { + // We have 3 cases here + // - !okToAppend -> We need to cut a new chunk. + // - okToAppend but we have interjections → Existing chunk needs + // recoding before we can append our histogram. + // - okToAppend and no interjections → Chunk is ready to support our histogram. + if !okToAppend || counterReset { + c = s.cutNewHeadChunk(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange) + chunkCreated = true + } else if len(positiveInterjections) > 0 || len(negativeInterjections) > 0 { + // New buckets have appeared. We need to recode all + // prior histogram samples within the chunk before we + // can process this one. + chunk, app := app.Recode( + positiveInterjections, negativeInterjections, + fh.PositiveSpans, fh.NegativeSpans, + ) + c.chunk = chunk + s.app = app + } + } + + if chunkCreated { + hc := s.headChunk.chunk.(*chunkenc.FloatHistogramChunk) + header := chunkenc.UnknownCounterReset + if counterReset { + header = chunkenc.CounterReset + } else if okToAppend { + header = chunkenc.NotCounterReset + } + hc.SetCounterResetHeader(header) + } + + s.app.AppendFloatHistogram(t, fh) + s.isHistogramSeries = true + + c.maxTime = t + + s.lastFloatHistogramValue = fh + + if appendID > 0 { + s.txs.add(appendID) + } + + return true, chunkCreated +} + // appendPreprocessor takes care of cutting new chunks and m-mapping old chunks. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. // This should be called only when appending data. @@ -1254,6 +1452,7 @@ func (a *headAppender) Rollback() (err error) { a.head.putAppendBuffer(a.samples) a.head.putExemplarBuffer(a.exemplars) a.head.putHistogramBuffer(a.histograms) + a.head.putFloatHistogramBuffer(a.floatHistograms) a.head.putMetadataBuffer(a.metadata) a.samples = nil a.exemplars = nil diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 16800d75e2..b8c68eddc4 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2821,6 +2821,7 @@ func TestAppendHistogram(t *testing.T) { }) require.NoError(t, head.Init(0)) + ingestTs := int64(0) app := head.Appender(context.Background()) type timedHistogram struct { @@ -2828,10 +2829,31 @@ func TestAppendHistogram(t *testing.T) { h *histogram.Histogram } expHistograms := make([]timedHistogram, 0, numHistograms) - for i, h := range GenerateTestHistograms(numHistograms) { - _, err := app.AppendHistogram(0, l, int64(i), h) + for _, h := range GenerateTestHistograms(numHistograms) { + _, err := app.AppendHistogram(0, l, ingestTs, h, nil) require.NoError(t, err) - expHistograms = append(expHistograms, timedHistogram{int64(i), h}) + expHistograms = append(expHistograms, timedHistogram{ingestTs, h}) + ingestTs++ + if ingestTs%50 == 0 { + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + } + } + + type timedFloatHistogram struct { + t int64 + h *histogram.FloatHistogram + } + expFloatHistograms := make([]timedFloatHistogram, 0, numHistograms) + for _, fh := range GenerateTestFloatHistograms(numHistograms) { + _, err := app.AppendHistogram(0, l, ingestTs, nil, fh) + require.NoError(t, err) + expFloatHistograms = append(expFloatHistograms, timedFloatHistogram{ingestTs, fh}) + ingestTs++ + if ingestTs%50 == 0 { + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + } } require.NoError(t, app.Commit()) @@ -2849,18 +2871,25 @@ func TestAppendHistogram(t *testing.T) { it := s.Iterator(nil) actHistograms := make([]timedHistogram, 0, len(expHistograms)) - for it.Next() == chunkenc.ValHistogram { - t, h := it.AtHistogram() - actHistograms = append(actHistograms, timedHistogram{t, h}) + actFloatHistograms := make([]timedFloatHistogram, 0, len(expFloatHistograms)) + for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() { + if typ == chunkenc.ValHistogram { + ts, h := it.AtHistogram() + actHistograms = append(actHistograms, timedHistogram{ts, h}) + } else if typ == chunkenc.ValFloatHistogram { + ts, fh := it.AtFloatHistogram() + actFloatHistograms = append(actFloatHistograms, timedFloatHistogram{ts, fh}) + } } require.Equal(t, expHistograms, actHistograms) + require.Equal(t, expFloatHistograms, actFloatHistograms) }) } } func TestHistogramInWALAndMmapChunk(t *testing.T) { - head, _ := newTestHead(t, 1000, false, false) + head, _ := newTestHead(t, 2000, false, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -2872,24 +2901,41 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { numHistograms := 450 exp := map[string][]tsdbutil.Sample{} app := head.Appender(context.Background()) - for i, h := range GenerateTestHistograms(numHistograms) { + ts := int64(0) + for _, h := range GenerateTestHistograms(numHistograms) { h.Count = h.Count * 2 h.NegativeSpans = h.PositiveSpans h.NegativeBuckets = h.PositiveBuckets - _, err := app.AppendHistogram(0, s1, int64(i), h) + _, err := app.AppendHistogram(0, s1, ts, h, nil) require.NoError(t, err) - exp[k1] = append(exp[k1], sample{t: int64(i), h: h.Copy()}) - if i%5 == 0 { + exp[k1] = append(exp[k1], sample{t: ts, h: h.Copy()}) + ts++ + if ts%5 == 0 { + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + } + } + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + for _, h := range GenerateTestFloatHistograms(numHistograms) { + h.Count = h.Count * 2 + h.NegativeSpans = h.PositiveSpans + h.NegativeBuckets = h.PositiveBuckets + _, err := app.AppendHistogram(0, s1, ts, nil, h) + require.NoError(t, err) + exp[k1] = append(exp[k1], sample{t: ts, fh: h.Copy()}) + ts++ + if ts%5 == 0 { require.NoError(t, app.Commit()) app = head.Appender(context.Background()) } } require.NoError(t, app.Commit()) - // There should be 3 mmap chunks in s1. + // There should be 7 mmap chunks in s1. ms := head.series.getByHash(s1.Hash(), s1) - require.Len(t, ms.mmappedChunks, 3) - expMmapChunks := make([]*mmappedChunk, 0, 3) + require.Len(t, ms.mmappedChunks, 7) + expMmapChunks := make([]*mmappedChunk, 0, 7) for _, mmap := range ms.mmappedChunks { require.Greater(t, mmap.numSamples, uint16(0)) cpy := *mmap @@ -2902,13 +2948,13 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { s2 := labels.FromStrings("a", "b2") k2 := s2.String() app = head.Appender(context.Background()) - ts := 0 - for _, h := range GenerateTestHistograms(200) { + ts = 0 + for _, h := range GenerateTestHistograms(100) { ts++ h.Count = h.Count * 2 h.NegativeSpans = h.PositiveSpans h.NegativeBuckets = h.PositiveBuckets - _, err := app.AppendHistogram(0, s2, int64(ts), h) + _, err := app.AppendHistogram(0, s2, int64(ts), h, nil) require.NoError(t, err) exp[k2] = append(exp[k2], sample{t: int64(ts), h: h.Copy()}) if ts%20 == 0 { @@ -2926,6 +2972,30 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { } } require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + for _, h := range GenerateTestFloatHistograms(100) { + ts++ + h.Count = h.Count * 2 + h.NegativeSpans = h.PositiveSpans + h.NegativeBuckets = h.PositiveBuckets + _, err := app.AppendHistogram(0, s2, int64(ts), nil, h) + require.NoError(t, err) + exp[k2] = append(exp[k2], sample{t: int64(ts), fh: h.Copy()}) + if ts%20 == 0 { + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + // Add some float. + for i := 0; i < 10; i++ { + ts++ + _, err := app.Append(0, s2, int64(ts), float64(ts)) + require.NoError(t, err) + exp[k2] = append(exp[k2], sample{t: int64(ts), v: float64(ts)}) + } + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + } + } + require.NoError(t, app.Commit()) // Restart head. require.NoError(t, head.Close()) @@ -3250,6 +3320,7 @@ func TestSnapshotError(t *testing.T) { } func TestHistogramMetrics(t *testing.T) { + numHistograms := 10 head, _ := newTestHead(t, 1000, false, false) t.Cleanup(func() { require.NoError(t, head.Close()) @@ -3261,9 +3332,16 @@ func TestHistogramMetrics(t *testing.T) { for x := 0; x < 5; x++ { expHSeries++ l := labels.FromStrings("a", fmt.Sprintf("b%d", x)) - for i, h := range GenerateTestHistograms(10) { + for i, h := range GenerateTestHistograms(numHistograms) { app := head.Appender(context.Background()) - _, err := app.AppendHistogram(0, l, int64(i), h) + _, err := app.AppendHistogram(0, l, int64(i), h, nil) + require.NoError(t, err) + require.NoError(t, app.Commit()) + expHSamples++ + } + for i, fh := range GenerateTestFloatHistograms(numHistograms) { + app := head.Appender(context.Background()) + _, err := app.AppendHistogram(0, l, int64(numHistograms+i), nil, fh) require.NoError(t, err) require.NoError(t, app.Commit()) expHSamples++ @@ -3283,6 +3361,8 @@ func TestHistogramMetrics(t *testing.T) { } func TestHistogramStaleSample(t *testing.T) { + // TODO(marctc): Add similar test for float histograms + l := labels.FromStrings("a", "b") numHistograms := 20 head, _ := newTestHead(t, 100000, false, false) @@ -3338,7 +3418,7 @@ func TestHistogramStaleSample(t *testing.T) { // Adding stale in the same appender. app := head.Appender(context.Background()) for _, h := range GenerateTestHistograms(numHistograms) { - _, err := app.AppendHistogram(0, l, 100*int64(len(expHistograms)), h) + _, err := app.AppendHistogram(0, l, 100*int64(len(expHistograms)), h, nil) require.NoError(t, err) expHistograms = append(expHistograms, timedHistogram{100 * int64(len(expHistograms)), h}) } @@ -3357,7 +3437,7 @@ func TestHistogramStaleSample(t *testing.T) { // Adding stale in different appender and continuing series after a stale sample. app = head.Appender(context.Background()) for _, h := range GenerateTestHistograms(2 * numHistograms)[numHistograms:] { - _, err := app.AppendHistogram(0, l, 100*int64(len(expHistograms)), h) + _, err := app.AppendHistogram(0, l, 100*int64(len(expHistograms)), h, nil) require.NoError(t, err) expHistograms = append(expHistograms, timedHistogram{100 * int64(len(expHistograms)), h}) } @@ -3378,104 +3458,121 @@ func TestHistogramStaleSample(t *testing.T) { } func TestHistogramCounterResetHeader(t *testing.T) { - l := labels.FromStrings("a", "b") - head, _ := newTestHead(t, 1000, false, false) - t.Cleanup(func() { - require.NoError(t, head.Close()) - }) - require.NoError(t, head.Init(0)) + for _, floatHisto := range []bool{true, false} { + t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) { + l := labels.FromStrings("a", "b") + head, _ := newTestHead(t, 1000, false, false) + t.Cleanup(func() { + require.NoError(t, head.Close()) + }) + require.NoError(t, head.Init(0)) - ts := int64(0) - appendHistogram := func(h *histogram.Histogram) { - ts++ - app := head.Appender(context.Background()) - _, err := app.AppendHistogram(0, l, ts, h) - require.NoError(t, err) - require.NoError(t, app.Commit()) + ts := int64(0) + appendHistogram := func(h *histogram.Histogram) { + ts++ + app := head.Appender(context.Background()) + var err error + if floatHisto { + _, err = app.AppendHistogram(0, l, ts, nil, h.ToFloat()) + } else { + _, err = app.AppendHistogram(0, l, ts, h, nil) + } + require.NoError(t, err) + require.NoError(t, app.Commit()) + } + + var expHeaders []chunkenc.CounterResetHeader + checkExpCounterResetHeader := func(newHeaders ...chunkenc.CounterResetHeader) { + expHeaders = append(expHeaders, newHeaders...) + + ms, _, err := head.getOrCreate(l.Hash(), l) + require.NoError(t, err) + require.Len(t, ms.mmappedChunks, len(expHeaders)-1) // One is the head chunk. + + for i, mmapChunk := range ms.mmappedChunks { + chk, err := head.chunkDiskMapper.Chunk(mmapChunk.ref) + require.NoError(t, err) + if floatHisto { + require.Equal(t, expHeaders[i], chk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader()) + } else { + require.Equal(t, expHeaders[i], chk.(*chunkenc.HistogramChunk).GetCounterResetHeader()) + } + } + if floatHisto { + require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader()) + } else { + require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.HistogramChunk).GetCounterResetHeader()) + } + } + + h := GenerateTestHistograms(1)[0] + if len(h.NegativeBuckets) == 0 { + h.NegativeSpans = append([]histogram.Span{}, h.PositiveSpans...) + h.NegativeBuckets = append([]int64{}, h.PositiveBuckets...) + } + h.PositiveBuckets = []int64{100, 1, 1, 1} + h.NegativeBuckets = []int64{100, 1, 1, 1} + h.Count = 1000 + + // First histogram is UnknownCounterReset. + appendHistogram(h) + checkExpCounterResetHeader(chunkenc.UnknownCounterReset) + + // Another normal histogram. + h.Count++ + appendHistogram(h) + checkExpCounterResetHeader() + + // Counter reset via Count. + h.Count-- + appendHistogram(h) + checkExpCounterResetHeader(chunkenc.CounterReset) + + // Add 2 non-counter reset histograms. + for i := 0; i < 250; i++ { + appendHistogram(h) + } + checkExpCounterResetHeader(chunkenc.NotCounterReset, chunkenc.NotCounterReset) + + // Changing schema will cut a new chunk with unknown counter reset. + h.Schema++ + appendHistogram(h) + checkExpCounterResetHeader(chunkenc.UnknownCounterReset) + + // Changing schema will zero threshold a new chunk with unknown counter reset. + h.ZeroThreshold += 0.01 + appendHistogram(h) + checkExpCounterResetHeader(chunkenc.UnknownCounterReset) + + // Counter reset by removing a positive bucket. + h.PositiveSpans[1].Length-- + h.PositiveBuckets = h.PositiveBuckets[1:] + appendHistogram(h) + checkExpCounterResetHeader(chunkenc.CounterReset) + + // Counter reset by removing a negative bucket. + h.NegativeSpans[1].Length-- + h.NegativeBuckets = h.NegativeBuckets[1:] + appendHistogram(h) + checkExpCounterResetHeader(chunkenc.CounterReset) + + // Add 2 non-counter reset histograms. Just to have some non-counter reset chunks in between. + for i := 0; i < 250; i++ { + appendHistogram(h) + } + checkExpCounterResetHeader(chunkenc.NotCounterReset, chunkenc.NotCounterReset) + + // Counter reset with counter reset in a positive bucket. + h.PositiveBuckets[len(h.PositiveBuckets)-1]-- + appendHistogram(h) + checkExpCounterResetHeader(chunkenc.CounterReset) + + // Counter reset with counter reset in a negative bucket. + h.NegativeBuckets[len(h.NegativeBuckets)-1]-- + appendHistogram(h) + checkExpCounterResetHeader(chunkenc.CounterReset) + }) } - - var expHeaders []chunkenc.CounterResetHeader - checkExpCounterResetHeader := func(newHeaders ...chunkenc.CounterResetHeader) { - expHeaders = append(expHeaders, newHeaders...) - - ms, _, err := head.getOrCreate(l.Hash(), l) - require.NoError(t, err) - require.Len(t, ms.mmappedChunks, len(expHeaders)-1) // One is the head chunk. - - for i, mmapChunk := range ms.mmappedChunks { - chk, err := head.chunkDiskMapper.Chunk(mmapChunk.ref) - require.NoError(t, err) - require.Equal(t, expHeaders[i], chk.(*chunkenc.HistogramChunk).GetCounterResetHeader()) - } - require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.HistogramChunk).GetCounterResetHeader()) - } - - h := GenerateTestHistograms(1)[0] - if len(h.NegativeBuckets) == 0 { - h.NegativeSpans = append([]histogram.Span{}, h.PositiveSpans...) - h.NegativeBuckets = append([]int64{}, h.PositiveBuckets...) - } - h.PositiveBuckets = []int64{100, 1, 1, 1} - h.NegativeBuckets = []int64{100, 1, 1, 1} - h.Count = 1000 - - // First histogram is UnknownCounterReset. - appendHistogram(h) - checkExpCounterResetHeader(chunkenc.UnknownCounterReset) - - // Another normal histogram. - h.Count++ - appendHistogram(h) - checkExpCounterResetHeader() - - // Counter reset via Count. - h.Count-- - appendHistogram(h) - checkExpCounterResetHeader(chunkenc.CounterReset) - - // Add 2 non-counter reset histograms. - for i := 0; i < 250; i++ { - appendHistogram(h) - } - checkExpCounterResetHeader(chunkenc.NotCounterReset, chunkenc.NotCounterReset) - - // Changing schema will cut a new chunk with unknown counter reset. - h.Schema++ - appendHistogram(h) - checkExpCounterResetHeader(chunkenc.UnknownCounterReset) - - // Changing schema will zero threshold a new chunk with unknown counter reset. - h.ZeroThreshold += 0.01 - appendHistogram(h) - checkExpCounterResetHeader(chunkenc.UnknownCounterReset) - - // Counter reset by removing a positive bucket. - h.PositiveSpans[1].Length-- - h.PositiveBuckets = h.PositiveBuckets[1:] - appendHistogram(h) - checkExpCounterResetHeader(chunkenc.CounterReset) - - // Counter reset by removing a negative bucket. - h.NegativeSpans[1].Length-- - h.NegativeBuckets = h.NegativeBuckets[1:] - appendHistogram(h) - checkExpCounterResetHeader(chunkenc.CounterReset) - - // Add 2 non-counter reset histograms. Just to have some non-counter reset chunks in between. - for i := 0; i < 250; i++ { - appendHistogram(h) - } - checkExpCounterResetHeader(chunkenc.NotCounterReset, chunkenc.NotCounterReset) - - // Counter reset with counter reset in a positive bucket. - h.PositiveBuckets[len(h.PositiveBuckets)-1]-- - appendHistogram(h) - checkExpCounterResetHeader(chunkenc.CounterReset) - - // Counter reset with counter reset in a negative bucket. - h.NegativeBuckets[len(h.NegativeBuckets)-1]-- - appendHistogram(h) - checkExpCounterResetHeader(chunkenc.CounterReset) } func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { @@ -3490,34 +3587,10 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { db.DisableCompactions() hists := GenerateTestHistograms(10) + floatHists := GenerateTestFloatHistograms(10) lbls := labels.FromStrings("a", "b") - type result struct { - t int64 - v float64 - h *histogram.Histogram - vt chunkenc.ValueType - } - expResult := []result{} - ref := storage.SeriesRef(0) - addFloat64Sample := func(app storage.Appender, ts int64, v float64) { - ref, err = app.Append(ref, lbls, ts, v) - require.NoError(t, err) - expResult = append(expResult, result{ - t: ts, - v: v, - vt: chunkenc.ValFloat, - }) - } - addHistogramSample := func(app storage.Appender, ts int64, h *histogram.Histogram) { - ref, err = app.AppendHistogram(ref, lbls, ts, h) - require.NoError(t, err) - expResult = append(expResult, result{ - t: ts, - h: h, - vt: chunkenc.ValHistogram, - }) - } + var expResult []tsdbutil.Sample checkExpChunks := func(count int) { ms, created, err := db.Head().getOrCreate(lbls.Hash(), lbls) require.NoError(t, err) @@ -3526,94 +3599,120 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { require.Len(t, ms.mmappedChunks, count-1) // One will be the head chunk. } - // Only histograms in first commit. - app := db.Appender(context.Background()) - addHistogramSample(app, 1, hists[1]) - require.NoError(t, app.Commit()) - checkExpChunks(1) + appends := []struct { + samples []tsdbutil.Sample + expChunks int + err error + // If this is empty, samples above will be taken instead of this. + addToExp []tsdbutil.Sample + }{ + { + samples: []tsdbutil.Sample{sample{t: 100, h: hists[1]}}, + expChunks: 1, + }, + { + samples: []tsdbutil.Sample{sample{t: 200, v: 2}}, + expChunks: 2, + }, + { + samples: []tsdbutil.Sample{sample{t: 210, fh: floatHists[1]}}, + expChunks: 3, + }, + { + samples: []tsdbutil.Sample{sample{t: 220, h: hists[1]}}, + expChunks: 4, + }, + { + samples: []tsdbutil.Sample{sample{t: 230, fh: floatHists[3]}}, + expChunks: 5, + }, + { + samples: []tsdbutil.Sample{sample{t: 100, h: hists[2]}}, + err: storage.ErrOutOfOrderSample, + }, + { + samples: []tsdbutil.Sample{sample{t: 300, h: hists[3]}}, + expChunks: 6, + }, + { + samples: []tsdbutil.Sample{sample{t: 100, v: 2}}, + err: storage.ErrOutOfOrderSample, + }, + { + samples: []tsdbutil.Sample{sample{t: 100, fh: floatHists[4]}}, + err: storage.ErrOutOfOrderSample, + }, + { + // Combination of histograms and float64 in the same commit. The behaviour is undefined, but we want to also + // verify how TSDB would behave. Here the histogram is appended at the end, hence will be considered as out of order. + samples: []tsdbutil.Sample{ + sample{t: 400, v: 4}, + sample{t: 500, h: hists[5]}, // This won't be committed. + sample{t: 600, v: 6}, + }, + addToExp: []tsdbutil.Sample{ + sample{t: 400, v: 4}, + sample{t: 600, v: 6}, + }, + expChunks: 7, // Only 1 new chunk for float64. + }, + { + // Here the histogram is appended at the end, hence the first histogram is out of order. + samples: []tsdbutil.Sample{ + sample{t: 700, h: hists[7]}, // Out of order w.r.t. the next float64 sample that is appended first. + sample{t: 800, v: 8}, + sample{t: 900, h: hists[9]}, + }, + addToExp: []tsdbutil.Sample{ + sample{t: 800, v: 8}, + sample{t: 900, h: hists[9]}, + }, + expChunks: 8, // float64 added to old chunk, only 1 new for histograms. + }, + { + // Float histogram is appended at the end. + samples: []tsdbutil.Sample{ + sample{t: 1000, fh: floatHists[7]}, // Out of order w.r.t. the next histogram. + sample{t: 1100, h: hists[9]}, + }, + addToExp: []tsdbutil.Sample{ + sample{t: 1100, h: hists[9]}, + }, + expChunks: 8, + }, + } - // Only float64 in second commit, a new chunk should be cut. - app = db.Appender(context.Background()) - addFloat64Sample(app, 2, 2) - require.NoError(t, app.Commit()) - checkExpChunks(2) + for _, a := range appends { + app := db.Appender(context.Background()) + for _, s := range a.samples { + var err error + if s.H() != nil || s.FH() != nil { + _, err = app.AppendHistogram(0, lbls, s.T(), s.H(), s.FH()) + } else { + _, err = app.Append(0, lbls, s.T(), s.V()) + } + require.Equal(t, a.err, err) + } - // Out of order histogram is shown correctly for a float64 chunk. No new chunk. - app = db.Appender(context.Background()) - _, err = app.AppendHistogram(ref, lbls, 1, hists[2]) - require.Equal(t, storage.ErrOutOfOrderSample, err) - require.NoError(t, app.Commit()) - - // Only histograms in third commit to check float64 -> histogram transition. - app = db.Appender(context.Background()) - addHistogramSample(app, 3, hists[3]) - require.NoError(t, app.Commit()) - checkExpChunks(3) - - // Out of order float64 is shown correctly for a histogram chunk. No new chunk. - app = db.Appender(context.Background()) - _, err = app.Append(ref, lbls, 1, 2) - require.Equal(t, storage.ErrOutOfOrderSample, err) - require.NoError(t, app.Commit()) - - // Combination of histograms and float64 in the same commit. The behaviour is undefined, but we want to also - // verify how TSDB would behave. Here the histogram is appended at the end, hence will be considered as out of order. - app = db.Appender(context.Background()) - addFloat64Sample(app, 4, 4) - // This won't be committed. - addHistogramSample(app, 5, hists[5]) - expResult = expResult[0 : len(expResult)-1] - addFloat64Sample(app, 6, 6) - require.NoError(t, app.Commit()) - checkExpChunks(4) // Only 1 new chunk for float64. - - // Here the histogram is appended at the end, hence the first histogram is out of order. - app = db.Appender(context.Background()) - // Out of order w.r.t. the next float64 sample that is appended first. - addHistogramSample(app, 7, hists[7]) - expResult = expResult[0 : len(expResult)-1] - addFloat64Sample(app, 8, 9) - addHistogramSample(app, 9, hists[9]) - require.NoError(t, app.Commit()) - checkExpChunks(5) // float64 added to old chunk, only 1 new for histograms. + if a.err == nil { + require.NoError(t, app.Commit()) + if len(a.addToExp) > 0 { + expResult = append(expResult, a.addToExp...) + } else { + expResult = append(expResult, a.samples...) + } + checkExpChunks(a.expChunks) + } else { + require.NoError(t, app.Rollback()) + } + } // Query back and expect same order of samples. q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, q.Close()) - }) - ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) - require.True(t, ss.Next()) - s := ss.At() - it := s.Iterator(nil) - expIdx := 0 -loop: - for { - vt := it.Next() - switch vt { - case chunkenc.ValNone: - require.Equal(t, len(expResult), expIdx) - break loop - case chunkenc.ValFloat: - ts, v := it.At() - require.Equal(t, expResult[expIdx].t, ts) - require.Equal(t, expResult[expIdx].v, v) - case chunkenc.ValHistogram: - ts, h := it.AtHistogram() - require.Equal(t, expResult[expIdx].t, ts) - require.Equal(t, expResult[expIdx].h, h) - default: - require.Error(t, fmt.Errorf("unexpected ValueType %v", vt)) - } - require.Equal(t, expResult[expIdx].vt, vt) - expIdx++ - } - require.NoError(t, it.Err()) - require.NoError(t, ss.Err()) - require.Equal(t, len(expResult), expIdx) - require.False(t, ss.Next()) // Only 1 series. + series := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + require.Equal(t, map[string][]tsdbutil.Sample{lbls.String(): expResult}, series) } // Tests https://github.com/prometheus/prometheus/issues/9725. @@ -4101,8 +4200,9 @@ func TestReplayAfterMmapReplayError(t *testing.T) { func TestHistogramValidation(t *testing.T) { tests := map[string]struct { - h *histogram.Histogram - errMsg string + h *histogram.Histogram + errMsg string + errMsgFloat string // To be considered for float histogram only if it is non-empty. }{ "valid histogram": { h: GenerateTestHistograms(1)[0], @@ -4171,7 +4271,8 @@ func TestHistogramValidation(t *testing.T) { NegativeBuckets: []int64{1}, PositiveBuckets: []int64{1}, }, - errMsg: `2 observations found in buckets, but the Count field is 0`, + errMsg: `2 observations found in buckets, but the Count field is 0`, + errMsgFloat: `2.000000 observations found in buckets, but the Count field is 0.000000`, }, } @@ -4183,12 +4284,22 @@ func TestHistogramValidation(t *testing.T) { } else { require.NoError(t, err) } + + err = ValidateFloatHistogram(tc.h.ToFloat()) + if tc.errMsgFloat != "" { + require.ErrorContains(t, err, tc.errMsgFloat) + } else if tc.errMsg != "" { + require.ErrorContains(t, err, tc.errMsg) + } else { + require.NoError(t, err) + } }) } } func BenchmarkHistogramValidation(b *testing.B) { histograms := generateBigTestHistograms(b.N) + b.ResetTimer() for _, h := range histograms { require.NoError(b, ValidateHistogram(h)) } diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index b0fa7eb292..df1896b25e 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -29,6 +29,7 @@ import ( "go.uber.org/atomic" "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/storage" @@ -42,6 +43,15 @@ import ( "github.com/prometheus/prometheus/tsdb/wlog" ) +// histogramRecord combines both RefHistogramSample and RefFloatHistogramSample +// to simplify the WAL replay. +type histogramRecord struct { + ref chunks.HeadSeriesRef + t int64 + h *histogram.Histogram + fh *histogram.FloatHistogram +} + func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) { // Track number of samples that referenced a series we don't know about // for error reporting. @@ -61,7 +71,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks. dec record.Decoder shards = make([][]record.RefSample, n) - histogramShards = make([][]record.RefHistogramSample, n) + histogramShards = make([][]histogramRecord, n) decoded = make(chan interface{}, 10) decodeErr, seriesCreationErr error @@ -90,6 +100,11 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks. return []record.RefHistogramSample{} }, } + floatHistogramsPool = sync.Pool{ + New: func() interface{} { + return []record.RefFloatHistogramSample{} + }, + } metadataPool = sync.Pool{ New: func() interface{} { return []record.RefMetadata{} @@ -212,6 +227,18 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks. return } decoded <- hists + case record.FloatHistogramSamples: + hists := floatHistogramsPool.Get().([]record.RefFloatHistogramSample)[:0] + hists, err = dec.FloatHistogramSamples(rec, hists) + if err != nil { + decodeErr = &wlog.CorruptionErr{ + Err: errors.Wrap(err, "decode float histograms"), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decoded <- hists case record.Metadata: meta := metadataPool.Get().([]record.RefMetadata)[:0] meta, err := dec.Metadata(rec, meta) @@ -337,7 +364,7 @@ Outer: sam.Ref = r } mod := uint64(sam.Ref) % uint64(n) - histogramShards[mod] = append(histogramShards[mod], sam) + histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H}) } for i := 0; i < n; i++ { if len(histogramShards[i]) > 0 { @@ -349,6 +376,43 @@ Outer: } //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. histogramsPool.Put(v) + case []record.RefFloatHistogramSample: + samples := v + minValidTime := h.minValidTime.Load() + // We split up the samples into chunks of 5000 samples or less. + // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise + // cause thousands of very large in flight buffers occupying large amounts + // of unused memory. + for len(samples) > 0 { + m := 5000 + if len(samples) < m { + m = len(samples) + } + for i := 0; i < n; i++ { + if histogramShards[i] == nil { + histogramShards[i] = processors[i].reuseHistogramBuf() + } + } + for _, sam := range samples[:m] { + if sam.T < minValidTime { + continue // Before minValidTime: discard. + } + if r, ok := multiRef[sam.Ref]; ok { + sam.Ref = r + } + mod := uint64(sam.Ref) % uint64(n) + histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH}) + } + for i := 0; i < n; i++ { + if len(histogramShards[i]) > 0 { + processors[i].input <- walSubsetProcessorInputItem{histogramSamples: histogramShards[i]} + histogramShards[i] = nil + } + } + samples = samples[m:] + } + //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. + floatHistogramsPool.Put(v) case []record.RefMetadata: for _, m := range v { s := h.series.getByID(chunks.HeadSeriesRef(m.Ref)) @@ -467,12 +531,12 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*m type walSubsetProcessor struct { input chan walSubsetProcessorInputItem output chan []record.RefSample - histogramsOutput chan []record.RefHistogramSample + histogramsOutput chan []histogramRecord } type walSubsetProcessorInputItem struct { samples []record.RefSample - histogramSamples []record.RefHistogramSample + histogramSamples []histogramRecord existingSeries *memSeries walSeriesRef chunks.HeadSeriesRef } @@ -480,7 +544,7 @@ type walSubsetProcessorInputItem struct { func (wp *walSubsetProcessor) setup() { wp.input = make(chan walSubsetProcessorInputItem, 300) wp.output = make(chan []record.RefSample, 300) - wp.histogramsOutput = make(chan []record.RefHistogramSample, 300) + wp.histogramsOutput = make(chan []histogramRecord, 300) } func (wp *walSubsetProcessor) closeAndDrain() { @@ -502,7 +566,7 @@ func (wp *walSubsetProcessor) reuseBuf() []record.RefSample { } // If there is a buffer in the output chan, return it for reuse, otherwise return nil. -func (wp *walSubsetProcessor) reuseHistogramBuf() []record.RefHistogramSample { +func (wp *walSubsetProcessor) reuseHistogramBuf() []histogramRecord { select { case buf := <-wp.histogramsOutput: return buf[:0] @@ -562,27 +626,33 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp } for _, s := range in.histogramSamples { - if s.T < minValidTime { + if s.t < minValidTime { continue } - ms := h.series.getByID(s.Ref) + ms := h.series.getByID(s.ref) if ms == nil { unknownHistogramRefs++ continue } ms.isHistogramSeries = true - if s.T <= ms.mmMaxTime { + if s.t <= ms.mmMaxTime { continue } - if _, chunkCreated := ms.appendHistogram(s.T, s.H, 0, h.chunkDiskMapper, chunkRange); chunkCreated { + var chunkCreated bool + if s.h != nil { + _, chunkCreated = ms.appendHistogram(s.t, s.h, 0, h.chunkDiskMapper, chunkRange) + } else { + _, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, h.chunkDiskMapper, chunkRange) + } + if chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() } - if s.T > maxt { - maxt = s.T + if s.t > maxt { + maxt = s.t } - if s.T < mint { - mint = s.T + if s.t < mint { + mint = s.t } } diff --git a/tsdb/querier.go b/tsdb/querier.go index 16e774b0b5..b9966c2bab 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -531,7 +531,6 @@ func (b *blockBaseSeriesSet) Next() bool { b.curr.labels = b.builder.Labels() b.curr.chks = chks b.curr.intervals = intervals - return true } return false @@ -750,7 +749,6 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { if p.currDelIter == nil { return true } - valueType := p.currDelIter.Next() if valueType == chunkenc.ValNone { if err := p.currDelIter.Err(); err != nil { @@ -825,9 +823,47 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { t, v = p.currDelIter.At() app.Append(t, v) } + case chunkenc.ValFloatHistogram: + newChunk = chunkenc.NewFloatHistogramChunk() + if app, err = newChunk.Appender(); err != nil { + break + } + if hc, ok := p.currChkMeta.Chunk.(*chunkenc.FloatHistogramChunk); ok { + newChunk.(*chunkenc.FloatHistogramChunk).SetCounterResetHeader(hc.GetCounterResetHeader()) + } + var h *histogram.FloatHistogram + t, h = p.currDelIter.AtFloatHistogram() + p.curr.MinTime = t + app.AppendFloatHistogram(t, h) + for vt := p.currDelIter.Next(); vt != chunkenc.ValNone; vt = p.currDelIter.Next() { + if vt != chunkenc.ValFloatHistogram { + err = fmt.Errorf("found value type %v in histogram chunk", vt) + break + } + t, h = p.currDelIter.AtFloatHistogram() + + // Defend against corrupted chunks. + pI, nI, okToAppend, counterReset := app.(*chunkenc.FloatHistogramAppender).Appendable(h) + if len(pI)+len(nI) > 0 { + err = fmt.Errorf( + "bucket layout has changed unexpectedly: %d positive and %d negative bucket interjections required", + len(pI), len(nI), + ) + break + } + if counterReset { + err = errors.New("detected unexpected counter reset in histogram") + break + } + if !okToAppend { + err = errors.New("unable to append histogram due to unexpected schema change") + break + } + + app.AppendFloatHistogram(t, h) + } default: - // TODO(beorn7): Need FloatHistogram eventually. err = fmt.Errorf("populateWithDelChunkSeriesIterator: value type %v unsupported", valueType) } diff --git a/tsdb/record/record.go b/tsdb/record/record.go index a1f05425f7..98894bb429 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -49,6 +49,8 @@ const ( Metadata Type = 6 // HistogramSamples is used to match WAL records of type Histograms. HistogramSamples Type = 7 + // FloatHistogramSamples is used to match WAL records of type Float Histograms. + FloatHistogramSamples Type = 8 ) func (rt Type) String() string { @@ -63,6 +65,8 @@ func (rt Type) String() string { return "exemplars" case HistogramSamples: return "histogram_samples" + case FloatHistogramSamples: + return "float_histogram_samples" case MmapMarkers: return "mmapmarkers" case Metadata: @@ -173,6 +177,13 @@ type RefHistogramSample struct { H *histogram.Histogram } +// RefFloatHistogramSample is a float histogram. +type RefFloatHistogramSample struct { + Ref chunks.HeadSeriesRef + T int64 + FH *histogram.FloatHistogram +} + // RefMmapMarker marks that the all the samples of the given series until now have been m-mapped to disk. type RefMmapMarker struct { Ref chunks.HeadSeriesRef @@ -192,7 +203,7 @@ func (d *Decoder) Type(rec []byte) Type { return Unknown } switch t := Type(rec[0]); t { - case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamples: + case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamples, FloatHistogramSamples: return t } return Unknown @@ -427,13 +438,7 @@ func (d *Decoder) HistogramSamples(rec []byte, histograms []RefHistogramSample) rh := RefHistogramSample{ Ref: chunks.HeadSeriesRef(baseRef + uint64(dref)), T: baseTime + dtime, - H: &histogram.Histogram{ - Schema: 0, - ZeroThreshold: 0, - ZeroCount: 0, - Count: 0, - Sum: 0, - }, + H: &histogram.Histogram{}, } rh.H.Schema = int32(dec.Varint64()) @@ -489,6 +494,82 @@ func (d *Decoder) HistogramSamples(rec []byte, histograms []RefHistogramSample) return histograms, nil } +func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogramSample) ([]RefFloatHistogramSample, error) { + dec := encoding.Decbuf{B: rec} + t := Type(dec.Byte()) + if t != FloatHistogramSamples { + return nil, errors.New("invalid record type") + } + if dec.Len() == 0 { + return histograms, nil + } + var ( + baseRef = dec.Be64() + baseTime = dec.Be64int64() + ) + for len(dec.B) > 0 && dec.Err() == nil { + dref := dec.Varint64() + dtime := dec.Varint64() + + rh := RefFloatHistogramSample{ + Ref: chunks.HeadSeriesRef(baseRef + uint64(dref)), + T: baseTime + dtime, + FH: &histogram.FloatHistogram{}, + } + + rh.FH.Schema = int32(dec.Varint64()) + rh.FH.ZeroThreshold = dec.Be64Float64() + + rh.FH.ZeroCount = dec.Be64Float64() + rh.FH.Count = dec.Be64Float64() + rh.FH.Sum = dec.Be64Float64() + + l := dec.Uvarint() + if l > 0 { + rh.FH.PositiveSpans = make([]histogram.Span, l) + } + for i := range rh.FH.PositiveSpans { + rh.FH.PositiveSpans[i].Offset = int32(dec.Varint64()) + rh.FH.PositiveSpans[i].Length = dec.Uvarint32() + } + + l = dec.Uvarint() + if l > 0 { + rh.FH.NegativeSpans = make([]histogram.Span, l) + } + for i := range rh.FH.NegativeSpans { + rh.FH.NegativeSpans[i].Offset = int32(dec.Varint64()) + rh.FH.NegativeSpans[i].Length = dec.Uvarint32() + } + + l = dec.Uvarint() + if l > 0 { + rh.FH.PositiveBuckets = make([]float64, l) + } + for i := range rh.FH.PositiveBuckets { + rh.FH.PositiveBuckets[i] = dec.Be64Float64() + } + + l = dec.Uvarint() + if l > 0 { + rh.FH.NegativeBuckets = make([]float64, l) + } + for i := range rh.FH.NegativeBuckets { + rh.FH.NegativeBuckets[i] = dec.Be64Float64() + } + + histograms = append(histograms, rh) + } + + if dec.Err() != nil { + return nil, errors.Wrapf(dec.Err(), "decode error after %d histograms", len(histograms)) + } + if len(dec.B) > 0 { + return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B)) + } + return histograms, nil +} + // Encoder encodes series, sample, and tombstones records. // The zero value is ready to use. type Encoder struct{} @@ -666,3 +747,54 @@ func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) [] return buf.Get() } + +func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) []byte { + buf := encoding.Encbuf{B: b} + buf.PutByte(byte(FloatHistogramSamples)) + + if len(histograms) == 0 { + return buf.Get() + } + + // Store base timestamp and base reference number of first histogram. + // All histograms encode their timestamp and ref as delta to those. + first := histograms[0] + buf.PutBE64(uint64(first.Ref)) + buf.PutBE64int64(first.T) + + for _, h := range histograms { + buf.PutVarint64(int64(h.Ref) - int64(first.Ref)) + buf.PutVarint64(h.T - first.T) + + buf.PutVarint64(int64(h.FH.Schema)) + buf.PutBEFloat64(h.FH.ZeroThreshold) + + buf.PutBEFloat64(h.FH.ZeroCount) + buf.PutBEFloat64(h.FH.Count) + buf.PutBEFloat64(h.FH.Sum) + + buf.PutUvarint(len(h.FH.PositiveSpans)) + for _, s := range h.FH.PositiveSpans { + buf.PutVarint64(int64(s.Offset)) + buf.PutUvarint32(s.Length) + } + + buf.PutUvarint(len(h.FH.NegativeSpans)) + for _, s := range h.FH.NegativeSpans { + buf.PutVarint64(int64(s.Offset)) + buf.PutUvarint32(s.Length) + } + + buf.PutUvarint(len(h.FH.PositiveBuckets)) + for _, b := range h.FH.PositiveBuckets { + buf.PutBEFloat64(b) + } + + buf.PutUvarint(len(h.FH.NegativeBuckets)) + for _, b := range h.FH.NegativeBuckets { + buf.PutBEFloat64(b) + } + } + + return buf.Get() +} diff --git a/tsdb/record/record_test.go b/tsdb/record/record_test.go index fe5b0bac5d..4ad7685a06 100644 --- a/tsdb/record/record_test.go +++ b/tsdb/record/record_test.go @@ -153,6 +153,18 @@ func TestRecord_EncodeDecode(t *testing.T) { decHistograms, err := dec.HistogramSamples(enc.HistogramSamples(histograms, nil), nil) require.NoError(t, err) require.Equal(t, histograms, decHistograms) + + floatHistograms := make([]RefFloatHistogramSample, len(histograms)) + for i, h := range histograms { + floatHistograms[i] = RefFloatHistogramSample{ + Ref: h.Ref, + T: h.T, + FH: h.H.ToFloat(), + } + } + decFloatHistograms, err := dec.FloatHistogramSamples(enc.FloatHistogramSamples(floatHistograms, nil), nil) + require.NoError(t, err) + require.Equal(t, floatHistograms, decFloatHistograms) } // TestRecord_Corrupted ensures that corrupted records return the correct error. diff --git a/tsdb/tsdbblockutil.go b/tsdb/tsdbblockutil.go index 8117f431c5..d4e43b73cc 100644 --- a/tsdb/tsdbblockutil.go +++ b/tsdb/tsdbblockutil.go @@ -74,7 +74,10 @@ func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger l ref, err = app.Append(ref, lset, t, v) case chunkenc.ValHistogram: t, h := it.AtHistogram() - ref, err = app.AppendHistogram(ref, lset, t, h) + ref, err = app.AppendHistogram(ref, lset, t, h, nil) + case chunkenc.ValFloatHistogram: + t, fh := it.AtFloatHistogram() + ref, err = app.AppendHistogram(ref, lset, t, nil, fh) default: return "", fmt.Errorf("unknown sample type %s", typ.String()) }