Support FloatHistogram in TSDB (#11522)

Extends the Appender.AppendHistogram function to accept a FloatHistogram. TSDB supports appending, querying, and WAL replay for this new type of histogram.

Signed-off-by: Marc Tudurí <marctc@protonmail.com>
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Marc Tudurí 2022-12-28 09:55:07 +01:00 committed by GitHub
parent ae72c752a1
commit 9474610baf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 1329 additions and 583 deletions

View file

@ -1393,7 +1393,7 @@ func (n notReadyAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels,
return 0, tsdb.ErrNotReady return 0, tsdb.ErrNotReady
} }
func (n notReadyAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram) (storage.SeriesRef, error) { func (n notReadyAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
return 0, tsdb.ErrNotReady return 0, tsdb.ErrNotReady
} }

View file

@ -244,6 +244,37 @@ func (h *FloatHistogram) Sub(other *FloatHistogram) *FloatHistogram {
return h return h
} }
// Equals returns true if the given float histogram matches exactly.
// Exact match is when there are no new buckets (even empty) and no missing buckets,
// and all the bucket values match. Spans can have different empty length spans in between,
// but they must represent the same bucket layout to match.
func (h *FloatHistogram) Equals(h2 *FloatHistogram) bool {
	// A nil argument can never match a non-nil receiver.
	if h2 == nil {
		return false
	}
	// Scalar fields must agree exactly.
	if h.Schema != h2.Schema ||
		h.ZeroThreshold != h2.ZeroThreshold ||
		h.ZeroCount != h2.ZeroCount ||
		h.Count != h2.Count ||
		h.Sum != h2.Sum {
		return false
	}
	// Both the span layouts and the absolute bucket counts must match on
	// the positive and the negative side.
	return spansMatch(h.PositiveSpans, h2.PositiveSpans) &&
		spansMatch(h.NegativeSpans, h2.NegativeSpans) &&
		bucketsMatch(h.PositiveBuckets, h2.PositiveBuckets) &&
		bucketsMatch(h.NegativeBuckets, h2.NegativeBuckets)
}
// addBucket takes the "coordinates" of the last bucket that was handled and // addBucket takes the "coordinates" of the last bucket that was handled and
// adds the provided bucket after it. If a corresponding bucket exists, the // adds the provided bucket after it. If a corresponding bucket exists, the
// count is added. If not, the bucket is inserted. The updated slices and the // count is added. If not, the bucket is inserted. The updated slices and the

View file

@ -25,14 +25,14 @@ type BucketCount interface {
float64 | uint64 float64 | uint64
} }
// internalBucketCount is used internally by Histogram and FloatHistogram. The // InternalBucketCount is used internally by Histogram and FloatHistogram. The
// difference to the BucketCount above is that Histogram internally uses deltas // difference to the BucketCount above is that Histogram internally uses deltas
// between buckets rather than absolute counts (while FloatHistogram uses // between buckets rather than absolute counts (while FloatHistogram uses
// absolute counts directly). Go type parameters don't allow type // absolute counts directly). Go type parameters don't allow type
// specialization. Therefore, where special treatment of deltas between buckets // specialization. Therefore, where special treatment of deltas between buckets
// vs. absolute counts is important, this information has to be provided as a // vs. absolute counts is important, this information has to be provided as a
// separate boolean parameter "deltaBuckets" // separate boolean parameter "deltaBuckets"
type internalBucketCount interface { type InternalBucketCount interface {
float64 | int64 float64 | int64
} }
@ -86,7 +86,7 @@ type BucketIterator[BC BucketCount] interface {
// implementations, together with an implementation of the At method. This // implementations, together with an implementation of the At method. This
// iterator can be embedded in full implementations of BucketIterator to save on // iterator can be embedded in full implementations of BucketIterator to save on
// code replication. // code replication.
type baseBucketIterator[BC BucketCount, IBC internalBucketCount] struct { type baseBucketIterator[BC BucketCount, IBC InternalBucketCount] struct {
schema int32 schema int32
spans []Span spans []Span
buckets []IBC buckets []IBC
@ -121,7 +121,7 @@ func (b baseBucketIterator[BC, IBC]) At() Bucket[BC] {
// compactBuckets is a generic function used by both Histogram.Compact and // compactBuckets is a generic function used by both Histogram.Compact and
// FloatHistogram.Compact. Set deltaBuckets to true if the provided buckets are // FloatHistogram.Compact. Set deltaBuckets to true if the provided buckets are
// deltas. Set it to false if the buckets contain absolute counts. // deltas. Set it to false if the buckets contain absolute counts.
func compactBuckets[IBC internalBucketCount](buckets []IBC, spans []Span, maxEmptyBuckets int, deltaBuckets bool) ([]IBC, []Span) { func compactBuckets[IBC InternalBucketCount](buckets []IBC, spans []Span, maxEmptyBuckets int, deltaBuckets bool) ([]IBC, []Span) {
// Fast path: If there are no empty buckets AND no offset in any span is // Fast path: If there are no empty buckets AND no offset in any span is
// <= maxEmptyBuckets AND no span has length 0, there is nothing to do and we can return // <= maxEmptyBuckets AND no span has length 0, there is nothing to do and we can return
// immediately. We check that first because it's cheap and presumably // immediately. We check that first because it's cheap and presumably
@ -327,6 +327,18 @@ func compactBuckets[IBC internalBucketCount](buckets []IBC, spans []Span, maxEmp
return buckets, spans return buckets, spans
} }
// bucketsMatch reports whether the two bucket slices have the same length
// and identical counts at every position. It is generic so that it works
// for both Histogram (delta-encoded int64 buckets) and FloatHistogram
// (absolute float64 buckets).
func bucketsMatch[IBC InternalBucketCount](b1, b2 []IBC) bool {
	if len(b1) != len(b2) {
		return false
	}
	for i := range b1 {
		if b1[i] != b2[i] {
			return false
		}
	}
	return true
}
func getBound(idx, schema int32) float64 { func getBound(idx, schema int32) float64 {
// Here a bit of context about the behavior for the last bucket counting // Here a bit of context about the behavior for the last bucket counting
// regular numbers (called simply "last bucket" below) and the bucket // regular numbers (called simply "last bucket" below) and the bucket

View file

@ -250,18 +250,6 @@ func allEmptySpans(s []Span) bool {
return true return true
} }
// bucketsMatch reports whether b1 and b2 contain exactly the same
// delta-encoded bucket values in the same order. Two empty (or nil)
// slices are considered matching.
func bucketsMatch(b1, b2 []int64) bool {
	if len(b1) != len(b2) {
		return false
	}
	for i := range b1 {
		if b1[i] != b2[i] {
			return false
		}
	}
	return true
}
// Compact works like FloatHistogram.Compact. See there for detailed // Compact works like FloatHistogram.Compact. See there for detailed
// explanations. // explanations.
func (h *Histogram) Compact(maxEmptyBuckets int) *Histogram { func (h *Histogram) Compact(maxEmptyBuckets int) *Histogram {

View file

@ -411,6 +411,7 @@ func TestHistogramToFloat(t *testing.T) {
require.Equal(t, h.String(), fh.String()) require.Equal(t, h.String(), fh.String())
} }
// TestHistogramMatches tests both Histogram and FloatHistogram.
func TestHistogramMatches(t *testing.T) { func TestHistogramMatches(t *testing.T) {
h1 := Histogram{ h1 := Histogram{
Schema: 3, Schema: 3,
@ -430,14 +431,28 @@ func TestHistogramMatches(t *testing.T) {
NegativeBuckets: []int64{1, 2, -2, 1, -1, 0, 0}, NegativeBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
} }
equals := func(h1, h2 Histogram) {
require.True(t, h1.Equals(&h2))
require.True(t, h2.Equals(&h1))
h1f, h2f := h1.ToFloat(), h2.ToFloat()
require.True(t, h1f.Equals(h2f))
require.True(t, h2f.Equals(h1f))
}
notEquals := func(h1, h2 Histogram) {
require.False(t, h1.Equals(&h2))
require.False(t, h2.Equals(&h1))
h1f, h2f := h1.ToFloat(), h2.ToFloat()
require.False(t, h1f.Equals(h2f))
require.False(t, h2f.Equals(h1f))
}
h2 := h1.Copy() h2 := h1.Copy()
require.True(t, h1.Equals(h2)) equals(h1, *h2)
// Changed spans but same layout. // Changed spans but same layout.
h2.PositiveSpans = append(h2.PositiveSpans, Span{Offset: 5}) h2.PositiveSpans = append(h2.PositiveSpans, Span{Offset: 5})
h2.NegativeSpans = append(h2.NegativeSpans, Span{Offset: 2}) h2.NegativeSpans = append(h2.NegativeSpans, Span{Offset: 2})
require.True(t, h1.Equals(h2)) equals(h1, *h2)
require.True(t, h2.Equals(&h1))
// Adding empty spans in between. // Adding empty spans in between.
h2.PositiveSpans[1].Offset = 6 h2.PositiveSpans[1].Offset = 6
h2.PositiveSpans = []Span{ h2.PositiveSpans = []Span{
@ -455,58 +470,57 @@ func TestHistogramMatches(t *testing.T) {
h2.NegativeSpans[1], h2.NegativeSpans[1],
h2.NegativeSpans[2], h2.NegativeSpans[2],
} }
require.True(t, h1.Equals(h2)) equals(h1, *h2)
require.True(t, h2.Equals(&h1))
// All mismatches. // All mismatches.
require.False(t, h1.Equals(nil)) notEquals(h1, Histogram{})
h2.Schema = 1 h2.Schema = 1
require.False(t, h1.Equals(h2)) notEquals(h1, *h2)
h2 = h1.Copy() h2 = h1.Copy()
h2.Count++ h2.Count++
require.False(t, h1.Equals(h2)) notEquals(h1, *h2)
h2 = h1.Copy() h2 = h1.Copy()
h2.Sum++ h2.Sum++
require.False(t, h1.Equals(h2)) notEquals(h1, *h2)
h2 = h1.Copy() h2 = h1.Copy()
h2.ZeroThreshold++ h2.ZeroThreshold++
require.False(t, h1.Equals(h2)) notEquals(h1, *h2)
h2 = h1.Copy() h2 = h1.Copy()
h2.ZeroCount++ h2.ZeroCount++
require.False(t, h1.Equals(h2)) notEquals(h1, *h2)
// Changing value of buckets. // Changing value of buckets.
h2 = h1.Copy() h2 = h1.Copy()
h2.PositiveBuckets[len(h2.PositiveBuckets)-1]++ h2.PositiveBuckets[len(h2.PositiveBuckets)-1]++
require.False(t, h1.Equals(h2)) notEquals(h1, *h2)
h2 = h1.Copy() h2 = h1.Copy()
h2.NegativeBuckets[len(h2.NegativeBuckets)-1]++ h2.NegativeBuckets[len(h2.NegativeBuckets)-1]++
require.False(t, h1.Equals(h2)) notEquals(h1, *h2)
// Changing bucket layout. // Changing bucket layout.
h2 = h1.Copy() h2 = h1.Copy()
h2.PositiveSpans[1].Offset++ h2.PositiveSpans[1].Offset++
require.False(t, h1.Equals(h2)) notEquals(h1, *h2)
h2 = h1.Copy() h2 = h1.Copy()
h2.NegativeSpans[1].Offset++ h2.NegativeSpans[1].Offset++
require.False(t, h1.Equals(h2)) notEquals(h1, *h2)
// Adding an empty bucket. // Adding an empty bucket.
h2 = h1.Copy() h2 = h1.Copy()
h2.PositiveSpans[0].Offset-- h2.PositiveSpans[0].Offset--
h2.PositiveSpans[0].Length++ h2.PositiveSpans[0].Length++
h2.PositiveBuckets = append([]int64{0}, h2.PositiveBuckets...) h2.PositiveBuckets = append([]int64{0}, h2.PositiveBuckets...)
require.False(t, h1.Equals(h2)) notEquals(h1, *h2)
h2 = h1.Copy() h2 = h1.Copy()
h2.NegativeSpans[0].Offset-- h2.NegativeSpans[0].Offset--
h2.NegativeSpans[0].Length++ h2.NegativeSpans[0].Length++
h2.NegativeBuckets = append([]int64{0}, h2.NegativeBuckets...) h2.NegativeBuckets = append([]int64{0}, h2.NegativeBuckets...)
require.False(t, h1.Equals(h2)) notEquals(h1, *h2)
// Adding new bucket. // Adding new bucket.
h2 = h1.Copy() h2 = h1.Copy()
@ -515,14 +529,14 @@ func TestHistogramMatches(t *testing.T) {
Length: 1, Length: 1,
}) })
h2.PositiveBuckets = append(h2.PositiveBuckets, 1) h2.PositiveBuckets = append(h2.PositiveBuckets, 1)
require.False(t, h1.Equals(h2)) notEquals(h1, *h2)
h2 = h1.Copy() h2 = h1.Copy()
h2.NegativeSpans = append(h2.NegativeSpans, Span{ h2.NegativeSpans = append(h2.NegativeSpans, Span{
Offset: 1, Offset: 1,
Length: 1, Length: 1,
}) })
h2.NegativeBuckets = append(h2.NegativeBuckets, 1) h2.NegativeBuckets = append(h2.NegativeBuckets, 1)
require.False(t, h1.Equals(h2)) notEquals(h1, *h2)
} }
func TestHistogramCompact(t *testing.T) { func TestHistogramCompact(t *testing.T) {

View file

@ -3128,6 +3128,7 @@ func TestRangeQuery(t *testing.T) {
func TestSparseHistogramRate(t *testing.T) { func TestSparseHistogramRate(t *testing.T) {
// TODO(beorn7): Integrate histograms into the PromQL testing framework // TODO(beorn7): Integrate histograms into the PromQL testing framework
// and write more tests there. // and write more tests there.
// TODO(marctc): Add similar test for float histograms
test, err := NewTest(t, "") test, err := NewTest(t, "")
require.NoError(t, err) require.NoError(t, err)
defer test.Close() defer test.Close()
@ -3137,7 +3138,7 @@ func TestSparseHistogramRate(t *testing.T) {
app := test.Storage().Appender(context.TODO()) app := test.Storage().Appender(context.TODO())
for i, h := range tsdb.GenerateTestHistograms(100) { for i, h := range tsdb.GenerateTestHistograms(100) {
_, err := app.AppendHistogram(0, lbls, int64(i)*int64(15*time.Second/time.Millisecond), h) _, err := app.AppendHistogram(0, lbls, int64(i)*int64(15*time.Second/time.Millisecond), h, nil)
require.NoError(t, err) require.NoError(t, err)
} }
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
@ -3169,6 +3170,7 @@ func TestSparseHistogramRate(t *testing.T) {
func TestSparseHistogram_HistogramCountAndSum(t *testing.T) { func TestSparseHistogram_HistogramCountAndSum(t *testing.T) {
// TODO(codesome): Integrate histograms into the PromQL testing framework // TODO(codesome): Integrate histograms into the PromQL testing framework
// and write more tests there. // and write more tests there.
// TODO(marctc): Add similar test for float histograms
h := &histogram.Histogram{ h := &histogram.Histogram{
Count: 24, Count: 24,
ZeroCount: 4, ZeroCount: 4,
@ -3197,7 +3199,7 @@ func TestSparseHistogram_HistogramCountAndSum(t *testing.T) {
ts := int64(10 * time.Minute / time.Millisecond) ts := int64(10 * time.Minute / time.Millisecond)
app := test.Storage().Appender(context.TODO()) app := test.Storage().Appender(context.TODO())
_, err = app.AppendHistogram(0, lbls, ts, h) _, err = app.AppendHistogram(0, lbls, ts, h, nil)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
@ -3233,6 +3235,7 @@ func TestSparseHistogram_HistogramCountAndSum(t *testing.T) {
func TestSparseHistogram_HistogramQuantile(t *testing.T) { func TestSparseHistogram_HistogramQuantile(t *testing.T) {
// TODO(codesome): Integrate histograms into the PromQL testing framework // TODO(codesome): Integrate histograms into the PromQL testing framework
// and write more tests there. // and write more tests there.
// TODO(marctc): Add similar test for float histograms
type subCase struct { type subCase struct {
quantile string quantile string
value float64 value float64
@ -3434,7 +3437,7 @@ func TestSparseHistogram_HistogramQuantile(t *testing.T) {
ts := int64(i+1) * int64(10*time.Minute/time.Millisecond) ts := int64(i+1) * int64(10*time.Minute/time.Millisecond)
app := test.Storage().Appender(context.TODO()) app := test.Storage().Appender(context.TODO())
_, err = app.AppendHistogram(0, lbls, ts, c.h) _, err = app.AppendHistogram(0, lbls, ts, c.h, nil)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
@ -3462,6 +3465,7 @@ func TestSparseHistogram_HistogramQuantile(t *testing.T) {
func TestSparseHistogram_HistogramFraction(t *testing.T) { func TestSparseHistogram_HistogramFraction(t *testing.T) {
// TODO(codesome): Integrate histograms into the PromQL testing framework // TODO(codesome): Integrate histograms into the PromQL testing framework
// and write more tests there. // and write more tests there.
// TODO(marctc): Add similar test for float histograms
type subCase struct { type subCase struct {
lower, upper string lower, upper string
value float64 value float64
@ -3858,7 +3862,7 @@ func TestSparseHistogram_HistogramFraction(t *testing.T) {
ts := int64(i+1) * int64(10*time.Minute/time.Millisecond) ts := int64(i+1) * int64(10*time.Minute/time.Millisecond)
app := test.Storage().Appender(context.TODO()) app := test.Storage().Appender(context.TODO())
_, err = app.AppendHistogram(0, lbls, ts, c.h) _, err = app.AppendHistogram(0, lbls, ts, c.h, nil)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
@ -3890,6 +3894,7 @@ func TestSparseHistogram_HistogramFraction(t *testing.T) {
func TestSparseHistogram_Sum_Count_AddOperator(t *testing.T) { func TestSparseHistogram_Sum_Count_AddOperator(t *testing.T) {
// TODO(codesome): Integrate histograms into the PromQL testing framework // TODO(codesome): Integrate histograms into the PromQL testing framework
// and write more tests there. // and write more tests there.
// TODO(marctc): Add similar test for float histograms
cases := []struct { cases := []struct {
histograms []histogram.Histogram histograms []histogram.Histogram
expected histogram.FloatHistogram expected histogram.FloatHistogram
@ -3988,7 +3993,7 @@ func TestSparseHistogram_Sum_Count_AddOperator(t *testing.T) {
for idx, h := range c.histograms { for idx, h := range c.histograms {
lbls := labels.FromStrings("__name__", seriesName, "idx", fmt.Sprintf("%d", idx)) lbls := labels.FromStrings("__name__", seriesName, "idx", fmt.Sprintf("%d", idx))
// Since we mutate h later, we need to create a copy here. // Since we mutate h later, we need to create a copy here.
_, err = app.AppendHistogram(0, lbls, ts, h.Copy()) _, err = app.AppendHistogram(0, lbls, ts, h.Copy(), nil)
require.NoError(t, err) require.NoError(t, err)
} }
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())

View file

@ -42,7 +42,7 @@ func (a nopAppender) AppendExemplar(storage.SeriesRef, labels.Labels, exemplar.E
return 0, nil return 0, nil
} }
func (a nopAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *histogram.Histogram) (storage.SeriesRef, error) { func (a nopAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *histogram.Histogram, *histogram.FloatHistogram) (storage.SeriesRef, error) {
return 0, nil return 0, nil
} }
@ -60,8 +60,9 @@ type sample struct {
} }
type histogramSample struct { type histogramSample struct {
t int64 t int64
h *histogram.Histogram h *histogram.Histogram
fh *histogram.FloatHistogram
} }
// collectResultAppender records all samples that were added through the appender. // collectResultAppender records all samples that were added through the appender.
@ -110,13 +111,13 @@ func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.L
return a.next.AppendExemplar(ref, l, e) return a.next.AppendExemplar(ref, l, e)
} }
func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram) (storage.SeriesRef, error) { func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, t: t}) a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t})
if a.next == nil { if a.next == nil {
return 0, nil return 0, nil
} }
return a.next.AppendHistogram(ref, l, t, h) return a.next.AppendHistogram(ref, l, t, h, fh)
} }
func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {

View file

@ -1541,6 +1541,7 @@ loop:
parsedTimestamp *int64 parsedTimestamp *int64
val float64 val float64
h *histogram.Histogram h *histogram.Histogram
fh *histogram.FloatHistogram
) )
if et, err = p.Next(); err != nil { if et, err = p.Next(); err != nil {
if err == io.EOF { if err == io.EOF {
@ -1568,8 +1569,7 @@ loop:
t := defTime t := defTime
if isHistogram { if isHistogram {
met, parsedTimestamp, h, _ = p.Histogram() met, parsedTimestamp, h, fh = p.Histogram()
// TODO: ingest float histograms in tsdb.
} else { } else {
met, parsedTimestamp, val = p.Series() met, parsedTimestamp, val = p.Series()
} }
@ -1636,7 +1636,9 @@ loop:
if isHistogram { if isHistogram {
if h != nil { if h != nil {
ref, err = app.AppendHistogram(ref, lset, t, h) ref, err = app.AppendHistogram(ref, lset, t, h, nil)
} else {
ref, err = app.AppendHistogram(ref, lset, t, nil, fh)
} }
} else { } else {
ref, err = app.Append(ref, lset, t, val) ref, err = app.Append(ref, lset, t, val)

View file

@ -174,14 +174,14 @@ func (f *fanoutAppender) AppendExemplar(ref SeriesRef, l labels.Labels, e exempl
return ref, nil return ref, nil
} }
func (f *fanoutAppender) AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram) (SeriesRef, error) { func (f *fanoutAppender) AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) {
ref, err := f.primary.AppendHistogram(ref, l, t, h) ref, err := f.primary.AppendHistogram(ref, l, t, h, fh)
if err != nil { if err != nil {
return ref, err return ref, err
} }
for _, appender := range f.secondaries { for _, appender := range f.secondaries {
if _, err := appender.AppendHistogram(ref, l, t, h); err != nil { if _, err := appender.AppendHistogram(ref, l, t, h, fh); err != nil {
return 0, err return 0, err
} }
} }

View file

@ -282,7 +282,7 @@ type HistogramAppender interface {
// For efficiency reasons, the histogram is passed as a // For efficiency reasons, the histogram is passed as a
// pointer. AppendHistogram won't mutate the histogram, but in turn // pointer. AppendHistogram won't mutate the histogram, but in turn
// depends on the caller to not mutate it either. // depends on the caller to not mutate it either.
AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram) (SeriesRef, error) AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error)
} }
// MetadataUpdater provides an interface for associating metadata to stored series. // MetadataUpdater provides an interface for associating metadata to stored series.

View file

@ -278,7 +278,7 @@ func (t *timestampTracker) AppendExemplar(_ storage.SeriesRef, _ labels.Labels,
return 0, nil return 0, nil
} }
func (t *timestampTracker) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, ts int64, h *histogram.Histogram) (storage.SeriesRef, error) { func (t *timestampTracker) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, ts int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {
t.histograms++ t.histograms++
if ts > t.highestTimestamp { if ts > t.highestTimestamp {
t.highestTimestamp = ts t.highestTimestamp = ts

View file

@ -124,9 +124,10 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
} }
} }
// TODO(codesome): support float histograms.
for _, hp := range ts.Histograms { for _, hp := range ts.Histograms {
hs := HistogramProtoToHistogram(hp) hs := HistogramProtoToHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, hs) _, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil)
if err != nil { if err != nil {
unwrappedErr := errors.Unwrap(err) unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil { if unwrappedErr == nil {

View file

@ -67,7 +67,7 @@ func TestRemoteWriteHandler(t *testing.T) {
for _, hp := range ts.Histograms { for _, hp := range ts.Histograms {
h := HistogramProtoToHistogram(hp) h := HistogramProtoToHistogram(hp)
require.Equal(t, mockHistogram{labels, hp.Timestamp, h}, appendable.histograms[k]) require.Equal(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k])
k++ k++
} }
} }
@ -189,9 +189,10 @@ type mockExemplar struct {
} }
type mockHistogram struct { type mockHistogram struct {
l labels.Labels l labels.Labels
t int64 t int64
h *histogram.Histogram h *histogram.Histogram
fh *histogram.FloatHistogram
} }
func (m *mockAppendable) Appender(_ context.Context) storage.Appender { func (m *mockAppendable) Appender(_ context.Context) storage.Appender {
@ -226,13 +227,13 @@ func (m *mockAppendable) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e
return 0, nil return 0, nil
} }
func (m *mockAppendable) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram) (storage.SeriesRef, error) { func (m *mockAppendable) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if t < m.latestHistogram { if t < m.latestHistogram {
return 0, storage.ErrOutOfOrderSample return 0, storage.ErrOutOfOrderSample
} }
m.latestHistogram = t m.latestHistogram = t
m.histograms = append(m.histograms, mockHistogram{l, t, h}) m.histograms = append(m.histograms, mockHistogram{l, t, h, fh})
return 0, nil return 0, nil
} }

View file

@ -321,9 +321,10 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
lastType = typ lastType = typ
var ( var (
t int64 t int64
v float64 v float64
h *histogram.Histogram h *histogram.Histogram
fh *histogram.FloatHistogram
) )
switch typ { switch typ {
case chunkenc.ValFloat: case chunkenc.ValFloat:
@ -332,6 +333,9 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
case chunkenc.ValHistogram: case chunkenc.ValHistogram:
t, h = seriesIter.AtHistogram() t, h = seriesIter.AtHistogram()
app.AppendHistogram(t, h) app.AppendHistogram(t, h)
case chunkenc.ValFloatHistogram:
t, fh = seriesIter.AtFloatHistogram()
app.AppendFloatHistogram(t, fh)
default: default:
return errChunksIterator{err: fmt.Errorf("unknown sample type %s", typ.String())} return errChunksIterator{err: fmt.Errorf("unknown sample type %s", typ.String())}
} }
@ -397,7 +401,6 @@ func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(t int64, v float64,
case chunkenc.ValFloatHistogram: case chunkenc.ValFloatHistogram:
t, fh := iter.AtFloatHistogram() t, fh := iter.AtFloatHistogram()
result = append(result, newSampleFn(t, 0, nil, fh)) result = append(result, newSampleFn(t, 0, nil, fh))
} }
} }
} }

View file

@ -820,7 +820,7 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem
return storage.SeriesRef(s.ref), nil return storage.SeriesRef(s.ref), nil
} }
func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram) (storage.SeriesRef, error) { func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
// TODO: Add histogram support. // TODO: Add histogram support.
return 0, nil return 0, nil
} }

View file

@ -528,7 +528,10 @@ func createHead(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir str
ref, err = app.Append(ref, lset, t, v) ref, err = app.Append(ref, lset, t, v)
case chunkenc.ValHistogram: case chunkenc.ValHistogram:
t, h := it.AtHistogram() t, h := it.AtHistogram()
ref, err = app.AppendHistogram(ref, lset, t, h) ref, err = app.AppendHistogram(ref, lset, t, h, nil)
case chunkenc.ValFloatHistogram:
t, fh := it.AtFloatHistogram()
ref, err = app.AppendHistogram(ref, lset, t, nil, fh)
default: default:
err = fmt.Errorf("unknown sample type %s", typ.String()) err = fmt.Errorf("unknown sample type %s", typ.String())
} }
@ -615,7 +618,7 @@ func genSeries(totalSeries, labelCount int, mint, maxt int64) []storage.Series {
} }
// genHistogramSeries generates series of histogram samples with a given number of labels and values. // genHistogramSeries generates series of histogram samples with a given number of labels and values.
func genHistogramSeries(totalSeries, labelCount int, mint, maxt, step int64) []storage.Series { func genHistogramSeries(totalSeries, labelCount int, mint, maxt, step int64, floatHistogram bool) []storage.Series {
return genSeriesFromSampleGenerator(totalSeries, labelCount, mint, maxt, step, func(ts int64) tsdbutil.Sample { return genSeriesFromSampleGenerator(totalSeries, labelCount, mint, maxt, step, func(ts int64) tsdbutil.Sample {
h := &histogram.Histogram{ h := &histogram.Histogram{
Count: 5 + uint64(ts*4), Count: 5 + uint64(ts*4),
@ -629,12 +632,15 @@ func genHistogramSeries(totalSeries, labelCount int, mint, maxt, step int64) []s
}, },
PositiveBuckets: []int64{int64(ts + 1), 1, -1, 0}, PositiveBuckets: []int64{int64(ts + 1), 1, -1, 0},
} }
if floatHistogram {
return sample{t: ts, fh: h.ToFloat()}
}
return sample{t: ts, h: h} return sample{t: ts, h: h}
}) })
} }
// genHistogramAndFloatSeries generates series of mixed histogram and float64 samples with a given number of labels and values. // genHistogramAndFloatSeries generates series of mixed histogram and float64 samples with a given number of labels and values.
func genHistogramAndFloatSeries(totalSeries, labelCount int, mint, maxt, step int64) []storage.Series { func genHistogramAndFloatSeries(totalSeries, labelCount int, mint, maxt, step int64, floatHistogram bool) []storage.Series {
floatSample := false floatSample := false
count := 0 count := 0
return genSeriesFromSampleGenerator(totalSeries, labelCount, mint, maxt, step, func(ts int64) tsdbutil.Sample { return genSeriesFromSampleGenerator(totalSeries, labelCount, mint, maxt, step, func(ts int64) tsdbutil.Sample {
@ -655,7 +661,11 @@ func genHistogramAndFloatSeries(totalSeries, labelCount int, mint, maxt, step in
}, },
PositiveBuckets: []int64{int64(ts + 1), 1, -1, 0}, PositiveBuckets: []int64{int64(ts + 1), 1, -1, 0},
} }
s = sample{t: ts, h: h} if floatHistogram {
s = sample{t: ts, fh: h.ToFloat()}
} else {
s = sample{t: ts, h: h}
}
} }
if count%5 == 0 { if count%5 == 0 {

View file

@ -1298,105 +1298,114 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
} }
func TestHeadCompactionWithHistograms(t *testing.T) { func TestHeadCompactionWithHistograms(t *testing.T) {
head, _ := newTestHead(t, DefaultBlockDuration, false, false) for _, floatTest := range []bool{true, false} {
require.NoError(t, head.Init(0)) t.Run(fmt.Sprintf("float=%t", floatTest), func(t *testing.T) {
t.Cleanup(func() { head, _ := newTestHead(t, DefaultBlockDuration, false, false)
require.NoError(t, head.Close()) require.NoError(t, head.Init(0))
}) t.Cleanup(func() {
require.NoError(t, head.Close())
})
minute := func(m int) int64 { return int64(m) * time.Minute.Milliseconds() } minute := func(m int) int64 { return int64(m) * time.Minute.Milliseconds() }
ctx := context.Background() ctx := context.Background()
appendHistogram := func(lbls labels.Labels, from, to int, h *histogram.Histogram, exp *[]tsdbutil.Sample) { appendHistogram := func(lbls labels.Labels, from, to int, h *histogram.Histogram, exp *[]tsdbutil.Sample) {
t.Helper() t.Helper()
app := head.Appender(ctx) app := head.Appender(ctx)
for tsMinute := from; tsMinute <= to; tsMinute++ { for tsMinute := from; tsMinute <= to; tsMinute++ {
_, err := app.AppendHistogram(0, lbls, minute(tsMinute), h) var err error
if floatTest {
_, err = app.AppendHistogram(0, lbls, minute(tsMinute), nil, h.ToFloat())
*exp = append(*exp, sample{t: minute(tsMinute), fh: h.ToFloat()})
} else {
_, err = app.AppendHistogram(0, lbls, minute(tsMinute), h, nil)
*exp = append(*exp, sample{t: minute(tsMinute), h: h.Copy()})
}
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
appendFloat := func(lbls labels.Labels, from, to int, exp *[]tsdbutil.Sample) {
t.Helper()
app := head.Appender(ctx)
for tsMinute := from; tsMinute <= to; tsMinute++ {
_, err := app.Append(0, lbls, minute(tsMinute), float64(tsMinute))
require.NoError(t, err)
*exp = append(*exp, sample{t: minute(tsMinute), v: float64(tsMinute)})
}
require.NoError(t, app.Commit())
}
var (
series1 = labels.FromStrings("foo", "bar1")
series2 = labels.FromStrings("foo", "bar2")
series3 = labels.FromStrings("foo", "bar3")
series4 = labels.FromStrings("foo", "bar4")
exp1, exp2, exp3, exp4 []tsdbutil.Sample
)
h := &histogram.Histogram{
Count: 11,
ZeroCount: 4,
ZeroThreshold: 0.001,
Sum: 35.5,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 2},
},
PositiveBuckets: []int64{1, 1, -1, 0},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 1},
{Offset: 1, Length: 2},
},
NegativeBuckets: []int64{1, 2, -1},
}
// Series with only histograms.
appendHistogram(series1, 100, 105, h, &exp1)
// Series starting with float and then getting histograms.
appendFloat(series2, 100, 102, &exp2)
appendHistogram(series2, 103, 105, h.Copy(), &exp2)
appendFloat(series2, 106, 107, &exp2)
appendHistogram(series2, 108, 109, h.Copy(), &exp2)
// Series starting with histogram and then getting float.
appendHistogram(series3, 101, 103, h.Copy(), &exp3)
appendFloat(series3, 104, 106, &exp3)
appendHistogram(series3, 107, 108, h.Copy(), &exp3)
appendFloat(series3, 109, 110, &exp3)
// A float only series.
appendFloat(series4, 100, 102, &exp4)
// Compaction.
mint := head.MinTime()
maxt := head.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
require.NoError(t, err) require.NoError(t, err)
*exp = append(*exp, sample{t: minute(tsMinute), h: h.Copy()}) id, err := compactor.Write(head.opts.ChunkDirRoot, head, mint, maxt, nil)
}
require.NoError(t, app.Commit())
}
appendFloat := func(lbls labels.Labels, from, to int, exp *[]tsdbutil.Sample) {
t.Helper()
app := head.Appender(ctx)
for tsMinute := from; tsMinute <= to; tsMinute++ {
_, err := app.Append(0, lbls, minute(tsMinute), float64(tsMinute))
require.NoError(t, err) require.NoError(t, err)
*exp = append(*exp, sample{t: minute(tsMinute), v: float64(tsMinute)}) require.NotEqual(t, ulid.ULID{}, id)
}
require.NoError(t, app.Commit()) // Open the block and query it and check the histograms.
block, err := OpenBlock(nil, path.Join(head.opts.ChunkDirRoot, id.String()), nil)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, block.Close())
})
q, err := NewBlockQuerier(block, block.MinTime(), block.MaxTime())
require.NoError(t, err)
actHists := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
require.Equal(t, map[string][]tsdbutil.Sample{
series1.String(): exp1,
series2.String(): exp2,
series3.String(): exp3,
series4.String(): exp4,
}, actHists)
})
} }
var (
series1 = labels.FromStrings("foo", "bar1")
series2 = labels.FromStrings("foo", "bar2")
series3 = labels.FromStrings("foo", "bar3")
series4 = labels.FromStrings("foo", "bar4")
exp1, exp2, exp3, exp4 []tsdbutil.Sample
)
h := &histogram.Histogram{
Count: 11,
ZeroCount: 4,
ZeroThreshold: 0.001,
Sum: 35.5,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 2},
},
PositiveBuckets: []int64{1, 1, -1, 0},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 1},
{Offset: 1, Length: 2},
},
NegativeBuckets: []int64{1, 2, -1},
}
// Series with only histograms.
appendHistogram(series1, 100, 105, h, &exp1)
// Series starting with float and then getting histograms.
appendFloat(series2, 100, 102, &exp2)
appendHistogram(series2, 103, 105, h.Copy(), &exp2)
appendFloat(series2, 106, 107, &exp2)
appendHistogram(series2, 108, 109, h.Copy(), &exp2)
// Series starting with histogram and then getting float.
appendHistogram(series3, 101, 103, h.Copy(), &exp3)
appendFloat(series3, 104, 106, &exp3)
appendHistogram(series3, 107, 108, h.Copy(), &exp3)
appendFloat(series3, 109, 110, &exp3)
// A float only series.
appendFloat(series4, 100, 102, &exp4)
// Compaction.
mint := head.MinTime()
maxt := head.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
require.NoError(t, err)
id, err := compactor.Write(head.opts.ChunkDirRoot, head, mint, maxt, nil)
require.NoError(t, err)
require.NotEqual(t, ulid.ULID{}, id)
// Open the block and query it and check the histograms.
block, err := OpenBlock(nil, path.Join(head.opts.ChunkDirRoot, id.String()), nil)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, block.Close())
})
q, err := NewBlockQuerier(block, block.MinTime(), block.MaxTime())
require.NoError(t, err)
actHists := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
require.Equal(t, map[string][]tsdbutil.Sample{
series1.String(): exp1,
series2.String(): exp2,
series3.String(): exp3,
series4.String(): exp4,
}, actHists)
} }
// Depending on numSeriesPerSchema, it can take few gigs of memory; // Depending on numSeriesPerSchema, it can take few gigs of memory;
@ -1511,7 +1520,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
) )
for i := 0; i < numHistograms; i++ { for i := 0; i < numHistograms; i++ {
ts := int64(i) * timeStep ts := int64(i) * timeStep
ref, err = sparseApp.AppendHistogram(ref, ah.baseLabels, ts, ah.hists[i]) ref, err = sparseApp.AppendHistogram(ref, ah.baseLabels, ts, ah.hists[i], nil)
require.NoError(t, err) require.NoError(t, err)
} }
} }

View file

@ -108,6 +108,9 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str
case chunkenc.ValHistogram: case chunkenc.ValHistogram:
ts, h := it.AtHistogram() ts, h := it.AtHistogram()
samples = append(samples, sample{t: ts, h: h}) samples = append(samples, sample{t: ts, h: h})
case chunkenc.ValFloatHistogram:
ts, fh := it.AtFloatHistogram()
samples = append(samples, sample{t: ts, fh: fh})
default: default:
t.Fatalf("unknown sample type in query %s", typ.String()) t.Fatalf("unknown sample type in query %s", typ.String())
} }
@ -465,7 +468,7 @@ Outer:
} }
} }
func TestAmendDatapointCausesError(t *testing.T) { func TestAmendHistogramDatapointCausesError(t *testing.T) {
db := openTestDB(t, nil, nil) db := openTestDB(t, nil, nil)
defer func() { defer func() {
require.NoError(t, db.Close()) require.NoError(t, db.Close())
@ -496,17 +499,32 @@ func TestAmendDatapointCausesError(t *testing.T) {
}, },
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0}, PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
} }
fh := h.ToFloat()
app = db.Appender(ctx) app = db.Appender(ctx)
_, err = app.AppendHistogram(0, labels.FromStrings("a", "c"), 0, h.Copy()) _, err = app.AppendHistogram(0, labels.FromStrings("a", "c"), 0, h.Copy(), nil)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
app = db.Appender(ctx) app = db.Appender(ctx)
_, err = app.AppendHistogram(0, labels.FromStrings("a", "c"), 0, h.Copy()) _, err = app.AppendHistogram(0, labels.FromStrings("a", "c"), 0, h.Copy(), nil)
require.NoError(t, err) require.NoError(t, err)
h.Schema = 2 h.Schema = 2
_, err = app.AppendHistogram(0, labels.FromStrings("a", "c"), 0, h.Copy()) _, err = app.AppendHistogram(0, labels.FromStrings("a", "c"), 0, h.Copy(), nil)
require.Equal(t, storage.ErrDuplicateSampleForTimestamp, err)
require.NoError(t, app.Rollback())
// Float histogram.
app = db.Appender(ctx)
_, err = app.AppendHistogram(0, labels.FromStrings("a", "d"), 0, nil, fh.Copy())
require.NoError(t, err)
require.NoError(t, app.Commit())
app = db.Appender(ctx)
_, err = app.AppendHistogram(0, labels.FromStrings("a", "d"), 0, nil, fh.Copy())
require.NoError(t, err)
fh.Schema = 2
_, err = app.AppendHistogram(0, labels.FromStrings("a", "d"), 0, nil, fh.Copy())
require.Equal(t, storage.ErrDuplicateSampleForTimestamp, err) require.Equal(t, storage.ErrDuplicateSampleForTimestamp, err)
require.NoError(t, app.Rollback()) require.NoError(t, app.Rollback())
} }
@ -5805,6 +5823,16 @@ func TestDiskFillingUpAfterDisablingOOO(t *testing.T) {
} }
func TestHistogramAppendAndQuery(t *testing.T) { func TestHistogramAppendAndQuery(t *testing.T) {
t.Run("integer histograms", func(t *testing.T) {
testHistogramAppendAndQueryHelper(t, false)
})
t.Run("float histograms", func(t *testing.T) {
testHistogramAppendAndQueryHelper(t, true)
})
}
func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) {
t.Helper()
db := openTestDB(t, nil, nil) db := openTestDB(t, nil, nil)
minute := func(m int) int64 { return int64(m) * time.Minute.Milliseconds() } minute := func(m int) int64 { return int64(m) * time.Minute.Milliseconds() }
t.Cleanup(func() { t.Cleanup(func() {
@ -5814,11 +5842,17 @@ func TestHistogramAppendAndQuery(t *testing.T) {
ctx := context.Background() ctx := context.Background()
appendHistogram := func(lbls labels.Labels, tsMinute int, h *histogram.Histogram, exp *[]tsdbutil.Sample) { appendHistogram := func(lbls labels.Labels, tsMinute int, h *histogram.Histogram, exp *[]tsdbutil.Sample) {
t.Helper() t.Helper()
var err error
app := db.Appender(ctx) app := db.Appender(ctx)
_, err := app.AppendHistogram(0, lbls, minute(tsMinute), h) if floatHistogram {
_, err = app.AppendHistogram(0, lbls, minute(tsMinute), nil, h.ToFloat())
*exp = append(*exp, sample{t: minute(tsMinute), fh: h.ToFloat()})
} else {
_, err = app.AppendHistogram(0, lbls, minute(tsMinute), h.Copy(), nil)
*exp = append(*exp, sample{t: minute(tsMinute), h: h.Copy()})
}
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
*exp = append(*exp, sample{t: minute(tsMinute), h: h.Copy()})
} }
appendFloat := func(lbls labels.Labels, tsMinute int, val float64, exp *[]tsdbutil.Sample) { appendFloat := func(lbls labels.Labels, tsMinute int, val float64, exp *[]tsdbutil.Sample) {
t.Helper() t.Helper()
@ -5867,23 +5901,23 @@ func TestHistogramAppendAndQuery(t *testing.T) {
t.Run("series with only histograms", func(t *testing.T) { t.Run("series with only histograms", func(t *testing.T) {
h := baseH.Copy() // This is shared across all sub tests. h := baseH.Copy() // This is shared across all sub tests.
appendHistogram(series1, 100, h.Copy(), &exp1) appendHistogram(series1, 100, h, &exp1)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
h.PositiveBuckets[0]++ h.PositiveBuckets[0]++
h.NegativeBuckets[0] += 2 h.NegativeBuckets[0] += 2
h.Count += 10 h.Count += 10
appendHistogram(series1, 101, h.Copy(), &exp1) appendHistogram(series1, 101, h, &exp1)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
t.Run("changing schema", func(t *testing.T) { t.Run("changing schema", func(t *testing.T) {
h.Schema = 2 h.Schema = 2
appendHistogram(series1, 102, h.Copy(), &exp1) appendHistogram(series1, 102, h, &exp1)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
// Schema back to old. // Schema back to old.
h.Schema = 1 h.Schema = 1
appendHistogram(series1, 103, h.Copy(), &exp1) appendHistogram(series1, 103, h, &exp1)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
}) })
@ -5894,10 +5928,17 @@ func TestHistogramAppendAndQuery(t *testing.T) {
// because the chunk will be re-encoded. So this forces us to modify // because the chunk will be re-encoded. So this forces us to modify
// the last histogram in exp1 so when we query we get the expected // the last histogram in exp1 so when we query we get the expected
// results. // results.
lh := exp1[len(exp1)-1].H().Copy() if floatHistogram {
lh.PositiveSpans[1].Length++ lh := exp1[len(exp1)-1].FH().Copy()
lh.PositiveBuckets = append(lh.PositiveBuckets, -2) // -2 makes the last bucket 0. lh.PositiveSpans[1].Length++
exp1[len(exp1)-1] = sample{t: exp1[len(exp1)-1].T(), h: lh} lh.PositiveBuckets = append(lh.PositiveBuckets, 0)
exp1[len(exp1)-1] = sample{t: exp1[len(exp1)-1].T(), fh: lh}
} else {
lh := exp1[len(exp1)-1].H().Copy()
lh.PositiveSpans[1].Length++
lh.PositiveBuckets = append(lh.PositiveBuckets, -2) // -2 makes the last bucket 0.
exp1[len(exp1)-1] = sample{t: exp1[len(exp1)-1].T(), h: lh}
}
// This histogram with new bucket at the end causes the re-encoding of the previous histogram. // This histogram with new bucket at the end causes the re-encoding of the previous histogram.
// Hence the previous histogram is recoded into this new layout. // Hence the previous histogram is recoded into this new layout.
@ -5905,23 +5946,37 @@ func TestHistogramAppendAndQuery(t *testing.T) {
h.PositiveSpans[1].Length++ h.PositiveSpans[1].Length++
h.PositiveBuckets = append(h.PositiveBuckets, 1) h.PositiveBuckets = append(h.PositiveBuckets, 1)
h.Count += 3 h.Count += 3
appendHistogram(series1, 104, h.Copy(), &exp1) appendHistogram(series1, 104, h, &exp1)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
// Because of the previous two histograms being on the active chunk, // Because of the previous two histograms being on the active chunk,
// and the next append is only adding a new bucket, the active chunk // and the next append is only adding a new bucket, the active chunk
// will be re-encoded to the new layout. // will be re-encoded to the new layout.
lh = exp1[len(exp1)-2].H().Copy() if floatHistogram {
lh.PositiveSpans[0].Length++ lh := exp1[len(exp1)-2].FH().Copy()
lh.PositiveSpans[1].Offset-- lh.PositiveSpans[0].Length++
lh.PositiveBuckets = []int64{2, 1, -3, 2, 0, -2} lh.PositiveSpans[1].Offset--
exp1[len(exp1)-2] = sample{t: exp1[len(exp1)-2].T(), h: lh} lh.PositiveBuckets = []float64{2, 3, 0, 2, 2, 0}
exp1[len(exp1)-2] = sample{t: exp1[len(exp1)-2].T(), fh: lh}
lh = exp1[len(exp1)-1].H().Copy() lh = exp1[len(exp1)-1].FH().Copy()
lh.PositiveSpans[0].Length++ lh.PositiveSpans[0].Length++
lh.PositiveSpans[1].Offset-- lh.PositiveSpans[1].Offset--
lh.PositiveBuckets = []int64{2, 1, -3, 2, 0, 1} lh.PositiveBuckets = []float64{2, 3, 0, 2, 2, 3}
exp1[len(exp1)-1] = sample{t: exp1[len(exp1)-1].T(), h: lh} exp1[len(exp1)-1] = sample{t: exp1[len(exp1)-1].T(), fh: lh}
} else {
lh := exp1[len(exp1)-2].H().Copy()
lh.PositiveSpans[0].Length++
lh.PositiveSpans[1].Offset--
lh.PositiveBuckets = []int64{2, 1, -3, 2, 0, -2}
exp1[len(exp1)-2] = sample{t: exp1[len(exp1)-2].T(), h: lh}
lh = exp1[len(exp1)-1].H().Copy()
lh.PositiveSpans[0].Length++
lh.PositiveSpans[1].Offset--
lh.PositiveBuckets = []int64{2, 1, -3, 2, 0, 1}
exp1[len(exp1)-1] = sample{t: exp1[len(exp1)-1].T(), h: lh}
}
// Now we add the new buckets in between. Empty bucket is again not present for the old histogram. // Now we add the new buckets in between. Empty bucket is again not present for the old histogram.
h.PositiveSpans[0].Length++ h.PositiveSpans[0].Length++
@ -5929,26 +5984,39 @@ func TestHistogramAppendAndQuery(t *testing.T) {
h.Count += 3 h.Count += 3
// {2, 1, -1, 0, 1} -> {2, 1, 0, -1, 0, 1} // {2, 1, -1, 0, 1} -> {2, 1, 0, -1, 0, 1}
h.PositiveBuckets = append(h.PositiveBuckets[:2], append([]int64{0}, h.PositiveBuckets[2:]...)...) h.PositiveBuckets = append(h.PositiveBuckets[:2], append([]int64{0}, h.PositiveBuckets[2:]...)...)
appendHistogram(series1, 105, h.Copy(), &exp1) appendHistogram(series1, 105, h, &exp1)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
// We add 4 more histograms to clear out the buffer and see the re-encoded histograms. // We add 4 more histograms to clear out the buffer and see the re-encoded histograms.
appendHistogram(series1, 106, h.Copy(), &exp1) appendHistogram(series1, 106, h, &exp1)
appendHistogram(series1, 107, h.Copy(), &exp1) appendHistogram(series1, 107, h, &exp1)
appendHistogram(series1, 108, h.Copy(), &exp1) appendHistogram(series1, 108, h, &exp1)
appendHistogram(series1, 109, h.Copy(), &exp1) appendHistogram(series1, 109, h, &exp1)
// Update the expected histograms to reflect the re-encoding. // Update the expected histograms to reflect the re-encoding.
l := len(exp1) if floatHistogram {
h7 := exp1[l-7].H() l := len(exp1)
h7.PositiveSpans = exp1[l-1].H().PositiveSpans h7 := exp1[l-7].FH()
h7.PositiveBuckets = []int64{2, 1, -3, 2, 0, -2} // -3 and -2 are the empty buckets. h7.PositiveSpans = exp1[l-1].FH().PositiveSpans
exp1[l-7] = sample{t: exp1[l-7].T(), h: h7} h7.PositiveBuckets = []float64{2, 3, 0, 2, 2, 0}
exp1[l-7] = sample{t: exp1[l-7].T(), fh: h7}
h6 := exp1[l-6].H() h6 := exp1[l-6].FH()
h6.PositiveSpans = exp1[l-1].H().PositiveSpans h6.PositiveSpans = exp1[l-1].FH().PositiveSpans
h6.PositiveBuckets = []int64{2, 1, -3, 2, 0, 1} // -3 is the empty bucket. h6.PositiveBuckets = []float64{2, 3, 0, 2, 2, 3}
exp1[l-6] = sample{t: exp1[l-6].T(), h: h6} exp1[l-6] = sample{t: exp1[l-6].T(), fh: h6}
} else {
l := len(exp1)
h7 := exp1[l-7].H()
h7.PositiveSpans = exp1[l-1].H().PositiveSpans
h7.PositiveBuckets = []int64{2, 1, -3, 2, 0, -2} // -3 and -2 are the empty buckets.
exp1[l-7] = sample{t: exp1[l-7].T(), h: h7}
h6 := exp1[l-6].H()
h6.PositiveSpans = exp1[l-1].H().PositiveSpans
h6.PositiveBuckets = []int64{2, 1, -3, 2, 0, 1} // -3 is the empty bucket.
exp1[l-6] = sample{t: exp1[l-6].T(), h: h6}
}
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
}) })
@ -5956,7 +6024,7 @@ func TestHistogramAppendAndQuery(t *testing.T) {
t.Run("buckets disappearing", func(t *testing.T) { t.Run("buckets disappearing", func(t *testing.T) {
h.PositiveSpans[1].Length-- h.PositiveSpans[1].Length--
h.PositiveBuckets = h.PositiveBuckets[:len(h.PositiveBuckets)-1] h.PositiveBuckets = h.PositiveBuckets[:len(h.PositiveBuckets)-1]
appendHistogram(series1, 110, h.Copy(), &exp1) appendHistogram(series1, 110, h, &exp1)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
}) })
}) })
@ -5968,9 +6036,9 @@ func TestHistogramAppendAndQuery(t *testing.T) {
testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2}) testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2})
h := baseH.Copy() h := baseH.Copy()
appendHistogram(series2, 103, h.Copy(), &exp2) appendHistogram(series2, 103, h, &exp2)
appendHistogram(series2, 104, h.Copy(), &exp2) appendHistogram(series2, 104, h, &exp2)
appendHistogram(series2, 105, h.Copy(), &exp2) appendHistogram(series2, 105, h, &exp2)
testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2}) testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2})
// Switching between float and histograms again. // Switching between float and histograms again.
@ -5978,16 +6046,16 @@ func TestHistogramAppendAndQuery(t *testing.T) {
appendFloat(series2, 107, 107, &exp2) appendFloat(series2, 107, 107, &exp2)
testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2}) testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2})
appendHistogram(series2, 108, h.Copy(), &exp2) appendHistogram(series2, 108, h, &exp2)
appendHistogram(series2, 109, h.Copy(), &exp2) appendHistogram(series2, 109, h, &exp2)
testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2}) testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2})
}) })
t.Run("series starting with histogram and then getting float", func(t *testing.T) { t.Run("series starting with histogram and then getting float", func(t *testing.T) {
h := baseH.Copy() h := baseH.Copy()
appendHistogram(series3, 101, h.Copy(), &exp3) appendHistogram(series3, 101, h, &exp3)
appendHistogram(series3, 102, h.Copy(), &exp3) appendHistogram(series3, 102, h, &exp3)
appendHistogram(series3, 103, h.Copy(), &exp3) appendHistogram(series3, 103, h, &exp3)
testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3}) testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3})
appendFloat(series3, 104, 100, &exp3) appendFloat(series3, 104, 100, &exp3)
@ -5996,8 +6064,8 @@ func TestHistogramAppendAndQuery(t *testing.T) {
testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3}) testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3})
// Switching between histogram and float again. // Switching between histogram and float again.
appendHistogram(series3, 107, h.Copy(), &exp3) appendHistogram(series3, 107, h, &exp3)
appendHistogram(series3, 108, h.Copy(), &exp3) appendHistogram(series3, 108, h, &exp3)
testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3}) testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3})
appendFloat(series3, 109, 106, &exp3) appendFloat(series3, 109, 106, &exp3)
@ -6052,6 +6120,11 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) {
case chunkenc.ValHistogram: case chunkenc.ValHistogram:
ts, h := it.AtHistogram() ts, h := it.AtHistogram()
slice = append(slice, sample{t: ts, h: h}) slice = append(slice, sample{t: ts, h: h})
case chunkenc.ValFloatHistogram:
ts, h := it.AtFloatHistogram()
slice = append(slice, sample{t: ts, fh: h})
default:
t.Fatalf("unexpected sample value type %d", typ)
} }
} }
sort.Slice(slice, func(i, j int) bool { sort.Slice(slice, func(i, j int) bool {
@ -6088,63 +6161,67 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) {
require.Equal(t, exp, res) require.Equal(t, exp, res)
} }
t.Run("serial blocks with only histograms", func(t *testing.T) { for _, floatHistogram := range []bool{true} {
testBlockQuerying(t, t.Run(fmt.Sprintf("floatHistogram=%t", floatHistogram), func(t *testing.T) {
genHistogramSeries(10, 5, minute(0), minute(119), minute(1)), t.Run("serial blocks with only histograms", func(t *testing.T) {
genHistogramSeries(10, 5, minute(120), minute(239), minute(1)), testBlockQuerying(t,
genHistogramSeries(10, 5, minute(240), minute(359), minute(1)), genHistogramSeries(10, 5, minute(0), minute(119), minute(1), floatHistogram),
) genHistogramSeries(10, 5, minute(120), minute(239), minute(1), floatHistogram),
}) genHistogramSeries(10, 5, minute(240), minute(359), minute(1), floatHistogram),
)
})
t.Run("serial blocks with either histograms or floats in a block and not both", func(t *testing.T) { t.Run("serial blocks with either histograms or floats in a block and not both", func(t *testing.T) {
testBlockQuerying(t, testBlockQuerying(t,
genHistogramSeries(10, 5, minute(0), minute(119), minute(1)), genHistogramSeries(10, 5, minute(0), minute(119), minute(1), floatHistogram),
genSeriesFromSampleGenerator(10, 5, minute(120), minute(239), minute(1), func(ts int64) tsdbutil.Sample { genSeriesFromSampleGenerator(10, 5, minute(120), minute(239), minute(1), func(ts int64) tsdbutil.Sample {
return sample{t: ts, v: rand.Float64()} return sample{t: ts, v: rand.Float64()}
}), }),
genHistogramSeries(10, 5, minute(240), minute(359), minute(1)), genHistogramSeries(10, 5, minute(240), minute(359), minute(1), floatHistogram),
) )
}) })
t.Run("serial blocks with mix of histograms and float64", func(t *testing.T) { t.Run("serial blocks with mix of histograms and float64", func(t *testing.T) {
testBlockQuerying(t, testBlockQuerying(t,
genHistogramAndFloatSeries(10, 5, minute(0), minute(60), minute(1)), genHistogramAndFloatSeries(10, 5, minute(0), minute(60), minute(1), floatHistogram),
genHistogramSeries(10, 5, minute(61), minute(120), minute(1)), genHistogramSeries(10, 5, minute(61), minute(120), minute(1), floatHistogram),
genHistogramAndFloatSeries(10, 5, minute(121), minute(180), minute(1)), genHistogramAndFloatSeries(10, 5, minute(121), minute(180), minute(1), floatHistogram),
genSeriesFromSampleGenerator(10, 5, minute(181), minute(240), minute(1), func(ts int64) tsdbutil.Sample { genSeriesFromSampleGenerator(10, 5, minute(181), minute(240), minute(1), func(ts int64) tsdbutil.Sample {
return sample{t: ts, v: rand.Float64()} return sample{t: ts, v: rand.Float64()}
}), }),
) )
}) })
t.Run("overlapping blocks with only histograms", func(t *testing.T) { t.Run("overlapping blocks with only histograms", func(t *testing.T) {
testBlockQuerying(t, testBlockQuerying(t,
genHistogramSeries(10, 5, minute(0), minute(120), minute(3)), genHistogramSeries(10, 5, minute(0), minute(120), minute(3), floatHistogram),
genHistogramSeries(10, 5, minute(1), minute(120), minute(3)), genHistogramSeries(10, 5, minute(1), minute(120), minute(3), floatHistogram),
genHistogramSeries(10, 5, minute(2), minute(120), minute(3)), genHistogramSeries(10, 5, minute(2), minute(120), minute(3), floatHistogram),
) )
}) })
t.Run("overlapping blocks with only histograms and only float in a series", func(t *testing.T) { t.Run("overlapping blocks with only histograms and only float in a series", func(t *testing.T) {
testBlockQuerying(t, testBlockQuerying(t,
genHistogramSeries(10, 5, minute(0), minute(120), minute(3)), genHistogramSeries(10, 5, minute(0), minute(120), minute(3), floatHistogram),
genSeriesFromSampleGenerator(10, 5, minute(1), minute(120), minute(3), func(ts int64) tsdbutil.Sample { genSeriesFromSampleGenerator(10, 5, minute(1), minute(120), minute(3), func(ts int64) tsdbutil.Sample {
return sample{t: ts, v: rand.Float64()} return sample{t: ts, v: rand.Float64()}
}), }),
genHistogramSeries(10, 5, minute(2), minute(120), minute(3)), genHistogramSeries(10, 5, minute(2), minute(120), minute(3), floatHistogram),
) )
}) })
t.Run("overlapping blocks with mix of histograms and float64", func(t *testing.T) { t.Run("overlapping blocks with mix of histograms and float64", func(t *testing.T) {
testBlockQuerying(t, testBlockQuerying(t,
genHistogramAndFloatSeries(10, 5, minute(0), minute(60), minute(3)), genHistogramAndFloatSeries(10, 5, minute(0), minute(60), minute(3), floatHistogram),
genHistogramSeries(10, 5, minute(46), minute(100), minute(3)), genHistogramSeries(10, 5, minute(46), minute(100), minute(3), floatHistogram),
genHistogramAndFloatSeries(10, 5, minute(89), minute(140), minute(3)), genHistogramAndFloatSeries(10, 5, minute(89), minute(140), minute(3), floatHistogram),
genSeriesFromSampleGenerator(10, 5, minute(126), minute(200), minute(3), func(ts int64) tsdbutil.Sample { genSeriesFromSampleGenerator(10, 5, minute(126), minute(200), minute(3), func(ts int64) tsdbutil.Sample {
return sample{t: ts, v: rand.Float64()} return sample{t: ts, v: rand.Float64()}
}), }),
) )
}) })
})
}
} }
func TestNativeHistogramFlag(t *testing.T) { func TestNativeHistogramFlag(t *testing.T) {
@ -6172,16 +6249,22 @@ func TestNativeHistogramFlag(t *testing.T) {
app := db.Appender(context.Background()) app := db.Appender(context.Background())
// Disabled by default. // Disabled by default.
_, err = app.AppendHistogram(0, l, 100, h) _, err = app.AppendHistogram(0, l, 100, h, nil)
require.Equal(t, storage.ErrNativeHistogramsDisabled, err)
_, err = app.AppendHistogram(0, l, 105, nil, h.ToFloat())
require.Equal(t, storage.ErrNativeHistogramsDisabled, err) require.Equal(t, storage.ErrNativeHistogramsDisabled, err)
// Enable and append. // Enable and append.
db.EnableNativeHistograms() db.EnableNativeHistograms()
_, err = app.AppendHistogram(0, l, 200, h) _, err = app.AppendHistogram(0, l, 200, h, nil)
require.NoError(t, err)
_, err = app.AppendHistogram(0, l, 205, nil, h.ToFloat())
require.NoError(t, err) require.NoError(t, err)
db.DisableNativeHistograms() db.DisableNativeHistograms()
_, err = app.AppendHistogram(0, l, 300, h) _, err = app.AppendHistogram(0, l, 300, h, nil)
require.Equal(t, storage.ErrNativeHistogramsDisabled, err)
_, err = app.AppendHistogram(0, l, 305, nil, h.ToFloat())
require.Equal(t, storage.ErrNativeHistogramsDisabled, err) require.Equal(t, storage.ErrNativeHistogramsDisabled, err)
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
@ -6189,5 +6272,7 @@ func TestNativeHistogramFlag(t *testing.T) {
q, err := db.Querier(context.Background(), math.MinInt, math.MaxInt64) q, err := db.Querier(context.Background(), math.MinInt, math.MaxInt64)
require.NoError(t, err) require.NoError(t, err)
act := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) act := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.Equal(t, map[string][]tsdbutil.Sample{l.String(): {sample{t: 200, h: h}}}, act) require.Equal(t, map[string][]tsdbutil.Sample{
l.String(): {sample{t: 200, h: h}, sample{t: 205, fh: h.ToFloat()}},
}, act)
} }

View file

@ -74,19 +74,20 @@ type Head struct {
// This should be typecasted to chunks.ChunkDiskMapperRef after loading. // This should be typecasted to chunks.ChunkDiskMapperRef after loading.
minOOOMmapRef atomic.Uint64 minOOOMmapRef atomic.Uint64
metrics *headMetrics metrics *headMetrics
opts *HeadOptions opts *HeadOptions
wal, wbl *wlog.WL wal, wbl *wlog.WL
exemplarMetrics *ExemplarMetrics exemplarMetrics *ExemplarMetrics
exemplars ExemplarStorage exemplars ExemplarStorage
logger log.Logger logger log.Logger
appendPool sync.Pool appendPool sync.Pool
exemplarsPool sync.Pool exemplarsPool sync.Pool
histogramsPool sync.Pool histogramsPool sync.Pool
metadataPool sync.Pool floatHistogramsPool sync.Pool
seriesPool sync.Pool metadataPool sync.Pool
bytesPool sync.Pool seriesPool sync.Pool
memChunkPool sync.Pool bytesPool sync.Pool
memChunkPool sync.Pool
// All series addressable by their ID or hash. // All series addressable by their ID or hash.
series *stripeSeries series *stripeSeries
@ -1850,7 +1851,8 @@ type memSeries struct {
lastValue float64 lastValue float64
// We keep the last histogram value here (in addition to appending it to the chunk) so we can check for duplicates. // We keep the last histogram value here (in addition to appending it to the chunk) so we can check for duplicates.
lastHistogramValue *histogram.Histogram lastHistogramValue *histogram.Histogram
lastFloatHistogramValue *histogram.FloatHistogram
// Current appender for the head chunk. Set when a new head chunk is cut. // Current appender for the head chunk. Set when a new head chunk is cut.
// It is nil only if headChunk is nil. E.g. if there was an appender that created a new series, but rolled back the commit // It is nil only if headChunk is nil. E.g. if there was an appender that created a new series, but rolled back the commit
@ -2043,3 +2045,22 @@ func GenerateTestHistograms(n int) (r []*histogram.Histogram) {
return r return r
} }
func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) {
for i := 0; i < n; i++ {
r = append(r, &histogram.FloatHistogram{
Count: 5 + float64(i*4),
ZeroCount: 2 + float64(i),
ZeroThreshold: 0.001,
Sum: 18.4 * float64(i+1),
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []float64{float64(i + 1), float64(i + 2), float64(i + 1), float64(i + 1)},
})
}
return r
}

View file

@ -68,14 +68,14 @@ func (a *initAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e
return a.app.AppendExemplar(ref, l, e) return a.app.AppendExemplar(ref, l, e)
} }
func (a *initAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram) (storage.SeriesRef, error) { func (a *initAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if a.app != nil { if a.app != nil {
return a.app.AppendHistogram(ref, l, t, h) return a.app.AppendHistogram(ref, l, t, h, fh)
} }
a.head.initTime(t) a.head.initTime(t)
a.app = a.head.appender() a.app = a.head.appender()
return a.app.AppendHistogram(ref, l, t, h) return a.app.AppendHistogram(ref, l, t, h, fh)
} }
func (a *initAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { func (a *initAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
@ -156,6 +156,7 @@ func (h *Head) appender() *headAppender {
sampleSeries: h.getSeriesBuffer(), sampleSeries: h.getSeriesBuffer(),
exemplars: exemplarsBuf, exemplars: exemplarsBuf,
histograms: h.getHistogramBuffer(), histograms: h.getHistogramBuffer(),
floatHistograms: h.getFloatHistogramBuffer(),
metadata: h.getMetadataBuffer(), metadata: h.getMetadataBuffer(),
appendID: appendID, appendID: appendID,
cleanupAppendIDsBelow: cleanupAppendIDsBelow, cleanupAppendIDsBelow: cleanupAppendIDsBelow,
@ -236,6 +237,19 @@ func (h *Head) putHistogramBuffer(b []record.RefHistogramSample) {
h.histogramsPool.Put(b[:0]) h.histogramsPool.Put(b[:0])
} }
// getFloatHistogramBuffer returns a reusable buffer for float histogram
// samples from the head's pool, allocating a fresh one (capacity 512) when
// the pool is empty. Return it via putFloatHistogramBuffer when done.
func (h *Head) getFloatHistogramBuffer() []record.RefFloatHistogramSample {
	if b := h.floatHistogramsPool.Get(); b != nil {
		return b.([]record.RefFloatHistogramSample)
	}
	return make([]record.RefFloatHistogramSample, 0, 512)
}
// putFloatHistogramBuffer returns a float histogram sample buffer to the
// pool, truncating it to zero length so its capacity can be reused by the
// next getFloatHistogramBuffer call.
func (h *Head) putFloatHistogramBuffer(b []record.RefFloatHistogramSample) {
	//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
	h.floatHistogramsPool.Put(b[:0])
}
func (h *Head) getMetadataBuffer() []record.RefMetadata { func (h *Head) getMetadataBuffer() []record.RefMetadata {
b := h.metadataPool.Get() b := h.metadataPool.Get()
if b == nil { if b == nil {
@ -287,14 +301,16 @@ type headAppender struct {
headMaxt int64 // We track it here to not take the lock for every sample appended. headMaxt int64 // We track it here to not take the lock for every sample appended.
oooTimeWindow int64 // Use the same for the entire append, and don't load the atomic for each sample. oooTimeWindow int64 // Use the same for the entire append, and don't load the atomic for each sample.
series []record.RefSeries // New series held by this appender. series []record.RefSeries // New series held by this appender.
samples []record.RefSample // New float samples held by this appender. samples []record.RefSample // New float samples held by this appender.
exemplars []exemplarWithSeriesRef // New exemplars held by this appender. sampleSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
sampleSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). histograms []record.RefHistogramSample // New histogram samples held by this appender.
histograms []record.RefHistogramSample // New histogram samples held by this appender. histogramSeries []*memSeries // HistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
histogramSeries []*memSeries // HistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). floatHistograms []record.RefFloatHistogramSample // New float histogram samples held by this appender.
metadata []record.RefMetadata // New metadata held by this appender. floatHistogramSeries []*memSeries // FloatHistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
metadataSeries []*memSeries // Series corresponding to the metadata held by this appender. metadata []record.RefMetadata // New metadata held by this appender.
metadataSeries []*memSeries // Series corresponding to the metadata held by this appender.
exemplars []exemplarWithSeriesRef // New exemplars held by this appender.
appendID, cleanupAppendIDsBelow uint64 appendID, cleanupAppendIDsBelow uint64
closed bool closed bool
@ -335,7 +351,8 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
} }
if value.IsStaleNaN(v) && s.isHistogramSeries { if value.IsStaleNaN(v) && s.isHistogramSeries {
return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}) // TODO(marctc): do we have do to the same for float histograms?
return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}, nil)
} }
s.Lock() s.Lock()
@ -439,6 +456,28 @@ func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram) error {
return nil return nil
} }
// appendableFloatHistogram checks whether the given sample is valid for appending to the series.
// It returns nil when the sample may be appended, storage.ErrOutOfOrderSample
// when it is older than the head chunk's max time, and
// storage.ErrDuplicateSampleForTimestamp when a different histogram already
// exists at the same timestamp.
func (s *memSeries) appendableFloatHistogram(t int64, fh *histogram.FloatHistogram) error {
	c := s.head()
	if c == nil {
		// No head chunk yet, any sample is appendable.
		return nil
	}
	switch {
	case t > c.maxTime:
		// Strictly newer than everything seen so far.
		return nil
	case t < c.maxTime:
		return storage.ErrOutOfOrderSample
	case fh.Equals(s.lastFloatHistogramValue):
		// We are allowing exact duplicates as we can encounter them in valid cases
		// like federation and erroring out at that time would be extremely noisy.
		return nil
	default:
		return storage.ErrDuplicateSampleForTimestamp
	}
}
// AppendExemplar for headAppender assumes the series ref already exists, and so it doesn't // AppendExemplar for headAppender assumes the series ref already exists, and so it doesn't
// use getOrCreate or make any of the lset validity checks that Append does. // use getOrCreate or make any of the lset validity checks that Append does.
func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
@ -476,7 +515,7 @@ func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels,
return storage.SeriesRef(s.ref), nil return storage.SeriesRef(s.ref), nil
} }
func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram) (storage.SeriesRef, error) { func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if !a.head.opts.EnableNativeHistograms.Load() { if !a.head.opts.EnableNativeHistograms.Load() {
return 0, storage.ErrNativeHistogramsDisabled return 0, storage.ErrNativeHistogramsDisabled
} }
@ -486,8 +525,16 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
return 0, storage.ErrOutOfBounds return 0, storage.ErrOutOfBounds
} }
if err := ValidateHistogram(h); err != nil { if h != nil {
return 0, err if err := ValidateHistogram(h); err != nil {
return 0, err
}
}
if fh != nil {
if err := ValidateFloatHistogram(fh); err != nil {
return 0, err
}
} }
s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
@ -517,16 +564,41 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
} }
} }
s.Lock() if h != nil {
if err := s.appendableHistogram(t, h); err != nil { s.Lock()
s.Unlock() if err := s.appendableHistogram(t, h); err != nil {
if err == storage.ErrOutOfOrderSample { s.Unlock()
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() if err == storage.ErrOutOfOrderSample {
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
}
return 0, err
} }
return 0, err s.pendingCommit = true
s.Unlock()
a.histograms = append(a.histograms, record.RefHistogramSample{
Ref: s.ref,
T: t,
H: h,
})
a.histogramSeries = append(a.histogramSeries, s)
} else if fh != nil {
s.Lock()
if err := s.appendableFloatHistogram(t, fh); err != nil {
s.Unlock()
if err == storage.ErrOutOfOrderSample {
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
}
return 0, err
}
s.pendingCommit = true
s.Unlock()
a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{
Ref: s.ref,
T: t,
FH: fh,
})
a.floatHistogramSeries = append(a.floatHistogramSeries, s)
} }
s.pendingCommit = true
s.Unlock()
if t < a.mint { if t < a.mint {
a.mint = t a.mint = t
@ -535,12 +607,6 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
a.maxt = t a.maxt = t
} }
a.histograms = append(a.histograms, record.RefHistogramSample{
Ref: s.ref,
T: t,
H: h,
})
a.histogramSeries = append(a.histogramSeries, s)
return storage.SeriesRef(s.ref), nil return storage.SeriesRef(s.ref), nil
} }
@ -582,17 +648,17 @@ func ValidateHistogram(h *histogram.Histogram) error {
if err := checkHistogramSpans(h.PositiveSpans, len(h.PositiveBuckets)); err != nil { if err := checkHistogramSpans(h.PositiveSpans, len(h.PositiveBuckets)); err != nil {
return errors.Wrap(err, "positive side") return errors.Wrap(err, "positive side")
} }
var nCount, pCount uint64
negativeCount, err := checkHistogramBuckets(h.NegativeBuckets) err := checkHistogramBuckets(h.NegativeBuckets, &nCount, true)
if err != nil { if err != nil {
return errors.Wrap(err, "negative side") return errors.Wrap(err, "negative side")
} }
positiveCount, err := checkHistogramBuckets(h.PositiveBuckets) err = checkHistogramBuckets(h.PositiveBuckets, &pCount, true)
if err != nil { if err != nil {
return errors.Wrap(err, "positive side") return errors.Wrap(err, "positive side")
} }
if c := negativeCount + positiveCount; c > h.Count { if c := nCount + pCount; c > h.Count {
return errors.Wrap( return errors.Wrap(
storage.ErrHistogramCountNotBigEnough, storage.ErrHistogramCountNotBigEnough,
fmt.Sprintf("%d observations found in buckets, but the Count field is %d", c, h.Count), fmt.Sprintf("%d observations found in buckets, but the Count field is %d", c, h.Count),
@ -602,6 +668,33 @@ func ValidateHistogram(h *histogram.Histogram) error {
return nil return nil
} }
// ValidateFloatHistogram sanity-checks the given float histogram: both span
// slices must be consistent with their bucket slices, and the total
// observations found in the buckets must not exceed the histogram's Count
// field. It returns a wrapped error naming the offending side, or nil when
// the histogram is valid.
func ValidateFloatHistogram(h *histogram.FloatHistogram) error {
	if err := checkHistogramSpans(h.NegativeSpans, len(h.NegativeBuckets)); err != nil {
		return errors.Wrap(err, "negative side")
	}
	if err := checkHistogramSpans(h.PositiveSpans, len(h.PositiveBuckets)); err != nil {
		return errors.Wrap(err, "positive side")
	}
	var nCount, pCount float64
	// Float histogram buckets hold absolute values, not deltas.
	if err := checkHistogramBuckets(h.NegativeBuckets, &nCount, false); err != nil {
		return errors.Wrap(err, "negative side")
	}
	if err := checkHistogramBuckets(h.PositiveBuckets, &pCount, false); err != nil {
		return errors.Wrap(err, "positive side")
	}
	if c := nCount + pCount; c > h.Count {
		return errors.Wrap(
			storage.ErrHistogramCountNotBigEnough,
			fmt.Sprintf("%f observations found in buckets, but the Count field is %f", c, h.Count),
		)
	}
	return nil
}
func checkHistogramSpans(spans []histogram.Span, numBuckets int) error { func checkHistogramSpans(spans []histogram.Span, numBuckets int) error {
var spanBuckets int var spanBuckets int
for n, span := range spans { for n, span := range spans {
@ -622,27 +715,30 @@ func checkHistogramSpans(spans []histogram.Span, numBuckets int) error {
return nil return nil
} }
func checkHistogramBuckets(buckets []int64) (uint64, error) { func checkHistogramBuckets[BC histogram.BucketCount, IBC histogram.InternalBucketCount](buckets []IBC, count *BC, deltas bool) error {
if len(buckets) == 0 { if len(buckets) == 0 {
return 0, nil return nil
} }
var count uint64 var last IBC
var last int64
for i := 0; i < len(buckets); i++ { for i := 0; i < len(buckets); i++ {
c := last + buckets[i] var c IBC
if deltas {
c = last + buckets[i]
} else {
c = buckets[i]
}
if c < 0 { if c < 0 {
return 0, errors.Wrap( return errors.Wrap(
storage.ErrHistogramNegativeBucketCount, storage.ErrHistogramNegativeBucketCount,
fmt.Sprintf("bucket number %d has observation count of %d", i+1, c), fmt.Sprintf("bucket number %d has observation count of %v", i+1, c),
) )
} }
last = c last = c
count += uint64(c) *count += BC(c)
} }
return count, nil return nil
} }
var _ storage.GetRef = &headAppender{} var _ storage.GetRef = &headAppender{}
@ -707,6 +803,13 @@ func (a *headAppender) log() error {
return errors.Wrap(err, "log histograms") return errors.Wrap(err, "log histograms")
} }
} }
if len(a.floatHistograms) > 0 {
rec = enc.FloatHistogramSamples(a.floatHistograms, buf)
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return errors.Wrap(err, "log float histograms")
}
}
return nil return nil
} }
@ -753,6 +856,7 @@ func (a *headAppender) Commit() (err error) {
defer a.head.putSeriesBuffer(a.sampleSeries) defer a.head.putSeriesBuffer(a.sampleSeries)
defer a.head.putExemplarBuffer(a.exemplars) defer a.head.putExemplarBuffer(a.exemplars)
defer a.head.putHistogramBuffer(a.histograms) defer a.head.putHistogramBuffer(a.histograms)
defer a.head.putFloatHistogramBuffer(a.floatHistograms)
defer a.head.putMetadataBuffer(a.metadata) defer a.head.putMetadataBuffer(a.metadata)
defer a.head.iso.closeAppend(a.appendID) defer a.head.iso.closeAppend(a.appendID)
@ -924,6 +1028,32 @@ func (a *headAppender) Commit() (err error) {
} }
} }
histogramsTotal += len(a.floatHistograms)
for i, s := range a.floatHistograms {
series = a.floatHistogramSeries[i]
series.Lock()
ok, chunkCreated := series.appendFloatHistogram(s.T, s.FH, a.appendID, a.head.chunkDiskMapper, chunkRange)
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
if ok {
if s.T < inOrderMint {
inOrderMint = s.T
}
if s.T > inOrderMaxt {
inOrderMaxt = s.T
}
} else {
histogramsTotal--
histoOOORejected++
}
if chunkCreated {
a.head.metrics.chunks.Inc()
a.head.metrics.chunksCreated.Inc()
}
}
for i, m := range a.metadata { for i, m := range a.metadata {
series = a.metadataSeries[i] series = a.metadataSeries[i]
series.Lock() series.Lock()
@ -1067,6 +1197,74 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
return true, chunkCreated return true, chunkCreated
} }
// appendFloatHistogram adds the float histogram.
// It returns whether the sample was appended in order and whether a new
// chunk was created in the process.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) {
	// Head controls the execution of recoding, so that we own the proper
	// chunk reference afterwards. We check for Appendable before
	// appendPreprocessor because in case it ends up creating a new chunk,
	// we need to know if there was also a counter reset or not to set the
	// meta properly.
	// app is nil when the current head chunk is not a float histogram chunk
	// (or there is no head chunk yet).
	app, _ := s.app.(*chunkenc.FloatHistogramAppender)
	var (
		positiveInterjections, negativeInterjections []chunkenc.Interjection
		okToAppend, counterReset                     bool
	)
	// appendPreprocessor may cut a new head chunk, e.g. on encoding change
	// or when the current chunk is full.
	c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange)
	if !sampleInOrder {
		return sampleInOrder, chunkCreated
	}
	if app != nil {
		positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(fh)
	}

	if !chunkCreated {
		// We have 3 cases here
		// - !okToAppend -> We need to cut a new chunk.
		// - okToAppend but we have interjections → Existing chunk needs
		//   recoding before we can append our histogram.
		// - okToAppend and no interjections → Chunk is ready to support our histogram.
		if !okToAppend || counterReset {
			c = s.cutNewHeadChunk(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange)
			chunkCreated = true
		} else if len(positiveInterjections) > 0 || len(negativeInterjections) > 0 {
			// New buckets have appeared. We need to recode all
			// prior histogram samples within the chunk before we
			// can process this one.
			chunk, app := app.Recode(
				positiveInterjections, negativeInterjections,
				fh.PositiveSpans, fh.NegativeSpans,
			)
			c.chunk = chunk
			s.app = app
		}
	}

	if chunkCreated {
		// A fresh chunk needs its counter reset header set based on what
		// Appendable told us about the relation to the previous sample.
		hc := s.headChunk.chunk.(*chunkenc.FloatHistogramChunk)
		header := chunkenc.UnknownCounterReset
		if counterReset {
			header = chunkenc.CounterReset
		} else if okToAppend {
			header = chunkenc.NotCounterReset
		}
		hc.SetCounterResetHeader(header)
	}

	// At this point the chunk (possibly recoded or freshly cut) is
	// guaranteed to accept the sample.
	s.app.AppendFloatHistogram(t, fh)
	s.isHistogramSeries = true

	c.maxTime = t

	// Remember the last value so appendableFloatHistogram can detect exact
	// duplicates at the same timestamp.
	s.lastFloatHistogramValue = fh

	if appendID > 0 {
		s.txs.add(appendID)
	}

	return true, chunkCreated
}
// appendPreprocessor takes care of cutting new chunks and m-mapping old chunks. // appendPreprocessor takes care of cutting new chunks and m-mapping old chunks.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
// This should be called only when appending data. // This should be called only when appending data.
@ -1254,6 +1452,7 @@ func (a *headAppender) Rollback() (err error) {
a.head.putAppendBuffer(a.samples) a.head.putAppendBuffer(a.samples)
a.head.putExemplarBuffer(a.exemplars) a.head.putExemplarBuffer(a.exemplars)
a.head.putHistogramBuffer(a.histograms) a.head.putHistogramBuffer(a.histograms)
a.head.putFloatHistogramBuffer(a.floatHistograms)
a.head.putMetadataBuffer(a.metadata) a.head.putMetadataBuffer(a.metadata)
a.samples = nil a.samples = nil
a.exemplars = nil a.exemplars = nil

View file

@ -2821,6 +2821,7 @@ func TestAppendHistogram(t *testing.T) {
}) })
require.NoError(t, head.Init(0)) require.NoError(t, head.Init(0))
ingestTs := int64(0)
app := head.Appender(context.Background()) app := head.Appender(context.Background())
type timedHistogram struct { type timedHistogram struct {
@ -2828,10 +2829,31 @@ func TestAppendHistogram(t *testing.T) {
h *histogram.Histogram h *histogram.Histogram
} }
expHistograms := make([]timedHistogram, 0, numHistograms) expHistograms := make([]timedHistogram, 0, numHistograms)
for i, h := range GenerateTestHistograms(numHistograms) { for _, h := range GenerateTestHistograms(numHistograms) {
_, err := app.AppendHistogram(0, l, int64(i), h) _, err := app.AppendHistogram(0, l, ingestTs, h, nil)
require.NoError(t, err) require.NoError(t, err)
expHistograms = append(expHistograms, timedHistogram{int64(i), h}) expHistograms = append(expHistograms, timedHistogram{ingestTs, h})
ingestTs++
if ingestTs%50 == 0 {
require.NoError(t, app.Commit())
app = head.Appender(context.Background())
}
}
type timedFloatHistogram struct {
t int64
h *histogram.FloatHistogram
}
expFloatHistograms := make([]timedFloatHistogram, 0, numHistograms)
for _, fh := range GenerateTestFloatHistograms(numHistograms) {
_, err := app.AppendHistogram(0, l, ingestTs, nil, fh)
require.NoError(t, err)
expFloatHistograms = append(expFloatHistograms, timedFloatHistogram{ingestTs, fh})
ingestTs++
if ingestTs%50 == 0 {
require.NoError(t, app.Commit())
app = head.Appender(context.Background())
}
} }
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
@ -2849,18 +2871,25 @@ func TestAppendHistogram(t *testing.T) {
it := s.Iterator(nil) it := s.Iterator(nil)
actHistograms := make([]timedHistogram, 0, len(expHistograms)) actHistograms := make([]timedHistogram, 0, len(expHistograms))
for it.Next() == chunkenc.ValHistogram { actFloatHistograms := make([]timedFloatHistogram, 0, len(expFloatHistograms))
t, h := it.AtHistogram() for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() {
actHistograms = append(actHistograms, timedHistogram{t, h}) if typ == chunkenc.ValHistogram {
ts, h := it.AtHistogram()
actHistograms = append(actHistograms, timedHistogram{ts, h})
} else if typ == chunkenc.ValFloatHistogram {
ts, fh := it.AtFloatHistogram()
actFloatHistograms = append(actFloatHistograms, timedFloatHistogram{ts, fh})
}
} }
require.Equal(t, expHistograms, actHistograms) require.Equal(t, expHistograms, actHistograms)
require.Equal(t, expFloatHistograms, actFloatHistograms)
}) })
} }
} }
func TestHistogramInWALAndMmapChunk(t *testing.T) { func TestHistogramInWALAndMmapChunk(t *testing.T) {
head, _ := newTestHead(t, 1000, false, false) head, _ := newTestHead(t, 2000, false, false)
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, head.Close()) require.NoError(t, head.Close())
}) })
@ -2872,24 +2901,41 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
numHistograms := 450 numHistograms := 450
exp := map[string][]tsdbutil.Sample{} exp := map[string][]tsdbutil.Sample{}
app := head.Appender(context.Background()) app := head.Appender(context.Background())
for i, h := range GenerateTestHistograms(numHistograms) { ts := int64(0)
for _, h := range GenerateTestHistograms(numHistograms) {
h.Count = h.Count * 2 h.Count = h.Count * 2
h.NegativeSpans = h.PositiveSpans h.NegativeSpans = h.PositiveSpans
h.NegativeBuckets = h.PositiveBuckets h.NegativeBuckets = h.PositiveBuckets
_, err := app.AppendHistogram(0, s1, int64(i), h) _, err := app.AppendHistogram(0, s1, ts, h, nil)
require.NoError(t, err) require.NoError(t, err)
exp[k1] = append(exp[k1], sample{t: int64(i), h: h.Copy()}) exp[k1] = append(exp[k1], sample{t: ts, h: h.Copy()})
if i%5 == 0 { ts++
if ts%5 == 0 {
require.NoError(t, app.Commit())
app = head.Appender(context.Background())
}
}
require.NoError(t, app.Commit())
app = head.Appender(context.Background())
for _, h := range GenerateTestFloatHistograms(numHistograms) {
h.Count = h.Count * 2
h.NegativeSpans = h.PositiveSpans
h.NegativeBuckets = h.PositiveBuckets
_, err := app.AppendHistogram(0, s1, ts, nil, h)
require.NoError(t, err)
exp[k1] = append(exp[k1], sample{t: ts, fh: h.Copy()})
ts++
if ts%5 == 0 {
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
app = head.Appender(context.Background()) app = head.Appender(context.Background())
} }
} }
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
// There should be 3 mmap chunks in s1. // There should be 7 mmap chunks in s1.
ms := head.series.getByHash(s1.Hash(), s1) ms := head.series.getByHash(s1.Hash(), s1)
require.Len(t, ms.mmappedChunks, 3) require.Len(t, ms.mmappedChunks, 7)
expMmapChunks := make([]*mmappedChunk, 0, 3) expMmapChunks := make([]*mmappedChunk, 0, 7)
for _, mmap := range ms.mmappedChunks { for _, mmap := range ms.mmappedChunks {
require.Greater(t, mmap.numSamples, uint16(0)) require.Greater(t, mmap.numSamples, uint16(0))
cpy := *mmap cpy := *mmap
@ -2902,13 +2948,13 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
s2 := labels.FromStrings("a", "b2") s2 := labels.FromStrings("a", "b2")
k2 := s2.String() k2 := s2.String()
app = head.Appender(context.Background()) app = head.Appender(context.Background())
ts := 0 ts = 0
for _, h := range GenerateTestHistograms(200) { for _, h := range GenerateTestHistograms(100) {
ts++ ts++
h.Count = h.Count * 2 h.Count = h.Count * 2
h.NegativeSpans = h.PositiveSpans h.NegativeSpans = h.PositiveSpans
h.NegativeBuckets = h.PositiveBuckets h.NegativeBuckets = h.PositiveBuckets
_, err := app.AppendHistogram(0, s2, int64(ts), h) _, err := app.AppendHistogram(0, s2, int64(ts), h, nil)
require.NoError(t, err) require.NoError(t, err)
exp[k2] = append(exp[k2], sample{t: int64(ts), h: h.Copy()}) exp[k2] = append(exp[k2], sample{t: int64(ts), h: h.Copy()})
if ts%20 == 0 { if ts%20 == 0 {
@ -2926,6 +2972,30 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
} }
} }
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
app = head.Appender(context.Background())
for _, h := range GenerateTestFloatHistograms(100) {
ts++
h.Count = h.Count * 2
h.NegativeSpans = h.PositiveSpans
h.NegativeBuckets = h.PositiveBuckets
_, err := app.AppendHistogram(0, s2, int64(ts), nil, h)
require.NoError(t, err)
exp[k2] = append(exp[k2], sample{t: int64(ts), fh: h.Copy()})
if ts%20 == 0 {
require.NoError(t, app.Commit())
app = head.Appender(context.Background())
// Add some float.
for i := 0; i < 10; i++ {
ts++
_, err := app.Append(0, s2, int64(ts), float64(ts))
require.NoError(t, err)
exp[k2] = append(exp[k2], sample{t: int64(ts), v: float64(ts)})
}
require.NoError(t, app.Commit())
app = head.Appender(context.Background())
}
}
require.NoError(t, app.Commit())
// Restart head. // Restart head.
require.NoError(t, head.Close()) require.NoError(t, head.Close())
@ -3250,6 +3320,7 @@ func TestSnapshotError(t *testing.T) {
} }
func TestHistogramMetrics(t *testing.T) { func TestHistogramMetrics(t *testing.T) {
numHistograms := 10
head, _ := newTestHead(t, 1000, false, false) head, _ := newTestHead(t, 1000, false, false)
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, head.Close()) require.NoError(t, head.Close())
@ -3261,9 +3332,16 @@ func TestHistogramMetrics(t *testing.T) {
for x := 0; x < 5; x++ { for x := 0; x < 5; x++ {
expHSeries++ expHSeries++
l := labels.FromStrings("a", fmt.Sprintf("b%d", x)) l := labels.FromStrings("a", fmt.Sprintf("b%d", x))
for i, h := range GenerateTestHistograms(10) { for i, h := range GenerateTestHistograms(numHistograms) {
app := head.Appender(context.Background()) app := head.Appender(context.Background())
_, err := app.AppendHistogram(0, l, int64(i), h) _, err := app.AppendHistogram(0, l, int64(i), h, nil)
require.NoError(t, err)
require.NoError(t, app.Commit())
expHSamples++
}
for i, fh := range GenerateTestFloatHistograms(numHistograms) {
app := head.Appender(context.Background())
_, err := app.AppendHistogram(0, l, int64(numHistograms+i), nil, fh)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
expHSamples++ expHSamples++
@ -3283,6 +3361,8 @@ func TestHistogramMetrics(t *testing.T) {
} }
func TestHistogramStaleSample(t *testing.T) { func TestHistogramStaleSample(t *testing.T) {
// TODO(marctc): Add similar test for float histograms
l := labels.FromStrings("a", "b") l := labels.FromStrings("a", "b")
numHistograms := 20 numHistograms := 20
head, _ := newTestHead(t, 100000, false, false) head, _ := newTestHead(t, 100000, false, false)
@ -3338,7 +3418,7 @@ func TestHistogramStaleSample(t *testing.T) {
// Adding stale in the same appender. // Adding stale in the same appender.
app := head.Appender(context.Background()) app := head.Appender(context.Background())
for _, h := range GenerateTestHistograms(numHistograms) { for _, h := range GenerateTestHistograms(numHistograms) {
_, err := app.AppendHistogram(0, l, 100*int64(len(expHistograms)), h) _, err := app.AppendHistogram(0, l, 100*int64(len(expHistograms)), h, nil)
require.NoError(t, err) require.NoError(t, err)
expHistograms = append(expHistograms, timedHistogram{100 * int64(len(expHistograms)), h}) expHistograms = append(expHistograms, timedHistogram{100 * int64(len(expHistograms)), h})
} }
@ -3357,7 +3437,7 @@ func TestHistogramStaleSample(t *testing.T) {
// Adding stale in different appender and continuing series after a stale sample. // Adding stale in different appender and continuing series after a stale sample.
app = head.Appender(context.Background()) app = head.Appender(context.Background())
for _, h := range GenerateTestHistograms(2 * numHistograms)[numHistograms:] { for _, h := range GenerateTestHistograms(2 * numHistograms)[numHistograms:] {
_, err := app.AppendHistogram(0, l, 100*int64(len(expHistograms)), h) _, err := app.AppendHistogram(0, l, 100*int64(len(expHistograms)), h, nil)
require.NoError(t, err) require.NoError(t, err)
expHistograms = append(expHistograms, timedHistogram{100 * int64(len(expHistograms)), h}) expHistograms = append(expHistograms, timedHistogram{100 * int64(len(expHistograms)), h})
} }
@ -3378,104 +3458,121 @@ func TestHistogramStaleSample(t *testing.T) {
} }
func TestHistogramCounterResetHeader(t *testing.T) { func TestHistogramCounterResetHeader(t *testing.T) {
l := labels.FromStrings("a", "b") for _, floatHisto := range []bool{true, false} {
head, _ := newTestHead(t, 1000, false, false) t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
t.Cleanup(func() { l := labels.FromStrings("a", "b")
require.NoError(t, head.Close()) head, _ := newTestHead(t, 1000, false, false)
}) t.Cleanup(func() {
require.NoError(t, head.Init(0)) require.NoError(t, head.Close())
})
require.NoError(t, head.Init(0))
ts := int64(0) ts := int64(0)
appendHistogram := func(h *histogram.Histogram) { appendHistogram := func(h *histogram.Histogram) {
ts++ ts++
app := head.Appender(context.Background()) app := head.Appender(context.Background())
_, err := app.AppendHistogram(0, l, ts, h) var err error
require.NoError(t, err) if floatHisto {
require.NoError(t, app.Commit()) _, err = app.AppendHistogram(0, l, ts, nil, h.ToFloat())
} else {
_, err = app.AppendHistogram(0, l, ts, h, nil)
}
require.NoError(t, err)
require.NoError(t, app.Commit())
}
var expHeaders []chunkenc.CounterResetHeader
checkExpCounterResetHeader := func(newHeaders ...chunkenc.CounterResetHeader) {
expHeaders = append(expHeaders, newHeaders...)
ms, _, err := head.getOrCreate(l.Hash(), l)
require.NoError(t, err)
require.Len(t, ms.mmappedChunks, len(expHeaders)-1) // One is the head chunk.
for i, mmapChunk := range ms.mmappedChunks {
chk, err := head.chunkDiskMapper.Chunk(mmapChunk.ref)
require.NoError(t, err)
if floatHisto {
require.Equal(t, expHeaders[i], chk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader())
} else {
require.Equal(t, expHeaders[i], chk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
}
}
if floatHisto {
require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader())
} else {
require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
}
}
h := GenerateTestHistograms(1)[0]
if len(h.NegativeBuckets) == 0 {
h.NegativeSpans = append([]histogram.Span{}, h.PositiveSpans...)
h.NegativeBuckets = append([]int64{}, h.PositiveBuckets...)
}
h.PositiveBuckets = []int64{100, 1, 1, 1}
h.NegativeBuckets = []int64{100, 1, 1, 1}
h.Count = 1000
// First histogram is UnknownCounterReset.
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.UnknownCounterReset)
// Another normal histogram.
h.Count++
appendHistogram(h)
checkExpCounterResetHeader()
// Counter reset via Count.
h.Count--
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.CounterReset)
// Add 2 non-counter reset histograms.
for i := 0; i < 250; i++ {
appendHistogram(h)
}
checkExpCounterResetHeader(chunkenc.NotCounterReset, chunkenc.NotCounterReset)
// Changing schema will cut a new chunk with unknown counter reset.
h.Schema++
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.UnknownCounterReset)
// Changing schema will zero threshold a new chunk with unknown counter reset.
h.ZeroThreshold += 0.01
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.UnknownCounterReset)
// Counter reset by removing a positive bucket.
h.PositiveSpans[1].Length--
h.PositiveBuckets = h.PositiveBuckets[1:]
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.CounterReset)
// Counter reset by removing a negative bucket.
h.NegativeSpans[1].Length--
h.NegativeBuckets = h.NegativeBuckets[1:]
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.CounterReset)
// Add 2 non-counter reset histograms. Just to have some non-counter reset chunks in between.
for i := 0; i < 250; i++ {
appendHistogram(h)
}
checkExpCounterResetHeader(chunkenc.NotCounterReset, chunkenc.NotCounterReset)
// Counter reset with counter reset in a positive bucket.
h.PositiveBuckets[len(h.PositiveBuckets)-1]--
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.CounterReset)
// Counter reset with counter reset in a negative bucket.
h.NegativeBuckets[len(h.NegativeBuckets)-1]--
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.CounterReset)
})
} }
var expHeaders []chunkenc.CounterResetHeader
checkExpCounterResetHeader := func(newHeaders ...chunkenc.CounterResetHeader) {
expHeaders = append(expHeaders, newHeaders...)
ms, _, err := head.getOrCreate(l.Hash(), l)
require.NoError(t, err)
require.Len(t, ms.mmappedChunks, len(expHeaders)-1) // One is the head chunk.
for i, mmapChunk := range ms.mmappedChunks {
chk, err := head.chunkDiskMapper.Chunk(mmapChunk.ref)
require.NoError(t, err)
require.Equal(t, expHeaders[i], chk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
}
require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
}
h := GenerateTestHistograms(1)[0]
if len(h.NegativeBuckets) == 0 {
h.NegativeSpans = append([]histogram.Span{}, h.PositiveSpans...)
h.NegativeBuckets = append([]int64{}, h.PositiveBuckets...)
}
h.PositiveBuckets = []int64{100, 1, 1, 1}
h.NegativeBuckets = []int64{100, 1, 1, 1}
h.Count = 1000
// First histogram is UnknownCounterReset.
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.UnknownCounterReset)
// Another normal histogram.
h.Count++
appendHistogram(h)
checkExpCounterResetHeader()
// Counter reset via Count.
h.Count--
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.CounterReset)
// Add 2 non-counter reset histograms.
for i := 0; i < 250; i++ {
appendHistogram(h)
}
checkExpCounterResetHeader(chunkenc.NotCounterReset, chunkenc.NotCounterReset)
// Changing schema will cut a new chunk with unknown counter reset.
h.Schema++
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.UnknownCounterReset)
// Changing schema will zero threshold a new chunk with unknown counter reset.
h.ZeroThreshold += 0.01
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.UnknownCounterReset)
// Counter reset by removing a positive bucket.
h.PositiveSpans[1].Length--
h.PositiveBuckets = h.PositiveBuckets[1:]
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.CounterReset)
// Counter reset by removing a negative bucket.
h.NegativeSpans[1].Length--
h.NegativeBuckets = h.NegativeBuckets[1:]
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.CounterReset)
// Add 2 non-counter reset histograms. Just to have some non-counter reset chunks in between.
for i := 0; i < 250; i++ {
appendHistogram(h)
}
checkExpCounterResetHeader(chunkenc.NotCounterReset, chunkenc.NotCounterReset)
// Counter reset with counter reset in a positive bucket.
h.PositiveBuckets[len(h.PositiveBuckets)-1]--
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.CounterReset)
// Counter reset with counter reset in a negative bucket.
h.NegativeBuckets[len(h.NegativeBuckets)-1]--
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.CounterReset)
} }
func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
@ -3490,34 +3587,10 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
db.DisableCompactions() db.DisableCompactions()
hists := GenerateTestHistograms(10) hists := GenerateTestHistograms(10)
floatHists := GenerateTestFloatHistograms(10)
lbls := labels.FromStrings("a", "b") lbls := labels.FromStrings("a", "b")
type result struct { var expResult []tsdbutil.Sample
t int64
v float64
h *histogram.Histogram
vt chunkenc.ValueType
}
expResult := []result{}
ref := storage.SeriesRef(0)
addFloat64Sample := func(app storage.Appender, ts int64, v float64) {
ref, err = app.Append(ref, lbls, ts, v)
require.NoError(t, err)
expResult = append(expResult, result{
t: ts,
v: v,
vt: chunkenc.ValFloat,
})
}
addHistogramSample := func(app storage.Appender, ts int64, h *histogram.Histogram) {
ref, err = app.AppendHistogram(ref, lbls, ts, h)
require.NoError(t, err)
expResult = append(expResult, result{
t: ts,
h: h,
vt: chunkenc.ValHistogram,
})
}
checkExpChunks := func(count int) { checkExpChunks := func(count int) {
ms, created, err := db.Head().getOrCreate(lbls.Hash(), lbls) ms, created, err := db.Head().getOrCreate(lbls.Hash(), lbls)
require.NoError(t, err) require.NoError(t, err)
@ -3526,94 +3599,120 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
require.Len(t, ms.mmappedChunks, count-1) // One will be the head chunk. require.Len(t, ms.mmappedChunks, count-1) // One will be the head chunk.
} }
// Only histograms in first commit. appends := []struct {
app := db.Appender(context.Background()) samples []tsdbutil.Sample
addHistogramSample(app, 1, hists[1]) expChunks int
require.NoError(t, app.Commit()) err error
checkExpChunks(1) // If this is empty, samples above will be taken instead of this.
addToExp []tsdbutil.Sample
}{
{
samples: []tsdbutil.Sample{sample{t: 100, h: hists[1]}},
expChunks: 1,
},
{
samples: []tsdbutil.Sample{sample{t: 200, v: 2}},
expChunks: 2,
},
{
samples: []tsdbutil.Sample{sample{t: 210, fh: floatHists[1]}},
expChunks: 3,
},
{
samples: []tsdbutil.Sample{sample{t: 220, h: hists[1]}},
expChunks: 4,
},
{
samples: []tsdbutil.Sample{sample{t: 230, fh: floatHists[3]}},
expChunks: 5,
},
{
samples: []tsdbutil.Sample{sample{t: 100, h: hists[2]}},
err: storage.ErrOutOfOrderSample,
},
{
samples: []tsdbutil.Sample{sample{t: 300, h: hists[3]}},
expChunks: 6,
},
{
samples: []tsdbutil.Sample{sample{t: 100, v: 2}},
err: storage.ErrOutOfOrderSample,
},
{
samples: []tsdbutil.Sample{sample{t: 100, fh: floatHists[4]}},
err: storage.ErrOutOfOrderSample,
},
{
// Combination of histograms and float64 in the same commit. The behaviour is undefined, but we want to also
// verify how TSDB would behave. Here the histogram is appended at the end, hence will be considered as out of order.
samples: []tsdbutil.Sample{
sample{t: 400, v: 4},
sample{t: 500, h: hists[5]}, // This won't be committed.
sample{t: 600, v: 6},
},
addToExp: []tsdbutil.Sample{
sample{t: 400, v: 4},
sample{t: 600, v: 6},
},
expChunks: 7, // Only 1 new chunk for float64.
},
{
// Here the histogram is appended at the end, hence the first histogram is out of order.
samples: []tsdbutil.Sample{
sample{t: 700, h: hists[7]}, // Out of order w.r.t. the next float64 sample that is appended first.
sample{t: 800, v: 8},
sample{t: 900, h: hists[9]},
},
addToExp: []tsdbutil.Sample{
sample{t: 800, v: 8},
sample{t: 900, h: hists[9]},
},
expChunks: 8, // float64 added to old chunk, only 1 new for histograms.
},
{
// Float histogram is appended at the end.
samples: []tsdbutil.Sample{
sample{t: 1000, fh: floatHists[7]}, // Out of order w.r.t. the next histogram.
sample{t: 1100, h: hists[9]},
},
addToExp: []tsdbutil.Sample{
sample{t: 1100, h: hists[9]},
},
expChunks: 8,
},
}
// Only float64 in second commit, a new chunk should be cut. for _, a := range appends {
app = db.Appender(context.Background()) app := db.Appender(context.Background())
addFloat64Sample(app, 2, 2) for _, s := range a.samples {
require.NoError(t, app.Commit()) var err error
checkExpChunks(2) if s.H() != nil || s.FH() != nil {
_, err = app.AppendHistogram(0, lbls, s.T(), s.H(), s.FH())
} else {
_, err = app.Append(0, lbls, s.T(), s.V())
}
require.Equal(t, a.err, err)
}
// Out of order histogram is shown correctly for a float64 chunk. No new chunk. if a.err == nil {
app = db.Appender(context.Background()) require.NoError(t, app.Commit())
_, err = app.AppendHistogram(ref, lbls, 1, hists[2]) if len(a.addToExp) > 0 {
require.Equal(t, storage.ErrOutOfOrderSample, err) expResult = append(expResult, a.addToExp...)
require.NoError(t, app.Commit()) } else {
expResult = append(expResult, a.samples...)
// Only histograms in third commit to check float64 -> histogram transition. }
app = db.Appender(context.Background()) checkExpChunks(a.expChunks)
addHistogramSample(app, 3, hists[3]) } else {
require.NoError(t, app.Commit()) require.NoError(t, app.Rollback())
checkExpChunks(3) }
}
// Out of order float64 is shown correctly for a histogram chunk. No new chunk.
app = db.Appender(context.Background())
_, err = app.Append(ref, lbls, 1, 2)
require.Equal(t, storage.ErrOutOfOrderSample, err)
require.NoError(t, app.Commit())
// Combination of histograms and float64 in the same commit. The behaviour is undefined, but we want to also
// verify how TSDB would behave. Here the histogram is appended at the end, hence will be considered as out of order.
app = db.Appender(context.Background())
addFloat64Sample(app, 4, 4)
// This won't be committed.
addHistogramSample(app, 5, hists[5])
expResult = expResult[0 : len(expResult)-1]
addFloat64Sample(app, 6, 6)
require.NoError(t, app.Commit())
checkExpChunks(4) // Only 1 new chunk for float64.
// Here the histogram is appended at the end, hence the first histogram is out of order.
app = db.Appender(context.Background())
// Out of order w.r.t. the next float64 sample that is appended first.
addHistogramSample(app, 7, hists[7])
expResult = expResult[0 : len(expResult)-1]
addFloat64Sample(app, 8, 9)
addHistogramSample(app, 9, hists[9])
require.NoError(t, app.Commit())
checkExpChunks(5) // float64 added to old chunk, only 1 new for histograms.
// Query back and expect same order of samples. // Query back and expect same order of samples.
q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, q.Close())
})
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) series := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
require.True(t, ss.Next()) require.Equal(t, map[string][]tsdbutil.Sample{lbls.String(): expResult}, series)
s := ss.At()
it := s.Iterator(nil)
expIdx := 0
loop:
for {
vt := it.Next()
switch vt {
case chunkenc.ValNone:
require.Equal(t, len(expResult), expIdx)
break loop
case chunkenc.ValFloat:
ts, v := it.At()
require.Equal(t, expResult[expIdx].t, ts)
require.Equal(t, expResult[expIdx].v, v)
case chunkenc.ValHistogram:
ts, h := it.AtHistogram()
require.Equal(t, expResult[expIdx].t, ts)
require.Equal(t, expResult[expIdx].h, h)
default:
require.Error(t, fmt.Errorf("unexpected ValueType %v", vt))
}
require.Equal(t, expResult[expIdx].vt, vt)
expIdx++
}
require.NoError(t, it.Err())
require.NoError(t, ss.Err())
require.Equal(t, len(expResult), expIdx)
require.False(t, ss.Next()) // Only 1 series.
} }
// Tests https://github.com/prometheus/prometheus/issues/9725. // Tests https://github.com/prometheus/prometheus/issues/9725.
@ -4101,8 +4200,9 @@ func TestReplayAfterMmapReplayError(t *testing.T) {
func TestHistogramValidation(t *testing.T) { func TestHistogramValidation(t *testing.T) {
tests := map[string]struct { tests := map[string]struct {
h *histogram.Histogram h *histogram.Histogram
errMsg string errMsg string
errMsgFloat string // To be considered for float histogram only if it is non-empty.
}{ }{
"valid histogram": { "valid histogram": {
h: GenerateTestHistograms(1)[0], h: GenerateTestHistograms(1)[0],
@ -4171,7 +4271,8 @@ func TestHistogramValidation(t *testing.T) {
NegativeBuckets: []int64{1}, NegativeBuckets: []int64{1},
PositiveBuckets: []int64{1}, PositiveBuckets: []int64{1},
}, },
errMsg: `2 observations found in buckets, but the Count field is 0`, errMsg: `2 observations found in buckets, but the Count field is 0`,
errMsgFloat: `2.000000 observations found in buckets, but the Count field is 0.000000`,
}, },
} }
@ -4183,12 +4284,22 @@ func TestHistogramValidation(t *testing.T) {
} else { } else {
require.NoError(t, err) require.NoError(t, err)
} }
err = ValidateFloatHistogram(tc.h.ToFloat())
if tc.errMsgFloat != "" {
require.ErrorContains(t, err, tc.errMsgFloat)
} else if tc.errMsg != "" {
require.ErrorContains(t, err, tc.errMsg)
} else {
require.NoError(t, err)
}
}) })
} }
} }
func BenchmarkHistogramValidation(b *testing.B) { func BenchmarkHistogramValidation(b *testing.B) {
histograms := generateBigTestHistograms(b.N) histograms := generateBigTestHistograms(b.N)
b.ResetTimer()
for _, h := range histograms { for _, h := range histograms {
require.NoError(b, ValidateHistogram(h)) require.NoError(b, ValidateHistogram(h))
} }

View file

@ -29,6 +29,7 @@ import (
"go.uber.org/atomic" "go.uber.org/atomic"
"github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
@ -42,6 +43,15 @@ import (
"github.com/prometheus/prometheus/tsdb/wlog" "github.com/prometheus/prometheus/tsdb/wlog"
) )
// histogramRecord combines both RefHistogramSample and RefFloatHistogramSample
// to simplify the WAL replay.
type histogramRecord struct {
ref chunks.HeadSeriesRef
t int64
h *histogram.Histogram
fh *histogram.FloatHistogram
}
func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) { func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) {
// Track number of samples that referenced a series we don't know about // Track number of samples that referenced a series we don't know about
// for error reporting. // for error reporting.
@ -61,7 +71,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
dec record.Decoder dec record.Decoder
shards = make([][]record.RefSample, n) shards = make([][]record.RefSample, n)
histogramShards = make([][]record.RefHistogramSample, n) histogramShards = make([][]histogramRecord, n)
decoded = make(chan interface{}, 10) decoded = make(chan interface{}, 10)
decodeErr, seriesCreationErr error decodeErr, seriesCreationErr error
@ -90,6 +100,11 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
return []record.RefHistogramSample{} return []record.RefHistogramSample{}
}, },
} }
floatHistogramsPool = sync.Pool{
New: func() interface{} {
return []record.RefFloatHistogramSample{}
},
}
metadataPool = sync.Pool{ metadataPool = sync.Pool{
New: func() interface{} { New: func() interface{} {
return []record.RefMetadata{} return []record.RefMetadata{}
@ -212,6 +227,18 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
return return
} }
decoded <- hists decoded <- hists
case record.FloatHistogramSamples:
hists := floatHistogramsPool.Get().([]record.RefFloatHistogramSample)[:0]
hists, err = dec.FloatHistogramSamples(rec, hists)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode float histograms"),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
}
decoded <- hists
case record.Metadata: case record.Metadata:
meta := metadataPool.Get().([]record.RefMetadata)[:0] meta := metadataPool.Get().([]record.RefMetadata)[:0]
meta, err := dec.Metadata(rec, meta) meta, err := dec.Metadata(rec, meta)
@ -337,7 +364,7 @@ Outer:
sam.Ref = r sam.Ref = r
} }
mod := uint64(sam.Ref) % uint64(n) mod := uint64(sam.Ref) % uint64(n)
histogramShards[mod] = append(histogramShards[mod], sam) histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H})
} }
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
if len(histogramShards[i]) > 0 { if len(histogramShards[i]) > 0 {
@ -349,6 +376,43 @@ Outer:
} }
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification. //nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
histogramsPool.Put(v) histogramsPool.Put(v)
case []record.RefFloatHistogramSample:
samples := v
minValidTime := h.minValidTime.Load()
// We split up the samples into chunks of 5000 samples or less.
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
// cause thousands of very large in flight buffers occupying large amounts
// of unused memory.
for len(samples) > 0 {
m := 5000
if len(samples) < m {
m = len(samples)
}
for i := 0; i < n; i++ {
if histogramShards[i] == nil {
histogramShards[i] = processors[i].reuseHistogramBuf()
}
}
for _, sam := range samples[:m] {
if sam.T < minValidTime {
continue // Before minValidTime: discard.
}
if r, ok := multiRef[sam.Ref]; ok {
sam.Ref = r
}
mod := uint64(sam.Ref) % uint64(n)
histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH})
}
for i := 0; i < n; i++ {
if len(histogramShards[i]) > 0 {
processors[i].input <- walSubsetProcessorInputItem{histogramSamples: histogramShards[i]}
histogramShards[i] = nil
}
}
samples = samples[m:]
}
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
floatHistogramsPool.Put(v)
case []record.RefMetadata: case []record.RefMetadata:
for _, m := range v { for _, m := range v {
s := h.series.getByID(chunks.HeadSeriesRef(m.Ref)) s := h.series.getByID(chunks.HeadSeriesRef(m.Ref))
@ -467,12 +531,12 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*m
type walSubsetProcessor struct { type walSubsetProcessor struct {
input chan walSubsetProcessorInputItem input chan walSubsetProcessorInputItem
output chan []record.RefSample output chan []record.RefSample
histogramsOutput chan []record.RefHistogramSample histogramsOutput chan []histogramRecord
} }
type walSubsetProcessorInputItem struct { type walSubsetProcessorInputItem struct {
samples []record.RefSample samples []record.RefSample
histogramSamples []record.RefHistogramSample histogramSamples []histogramRecord
existingSeries *memSeries existingSeries *memSeries
walSeriesRef chunks.HeadSeriesRef walSeriesRef chunks.HeadSeriesRef
} }
@ -480,7 +544,7 @@ type walSubsetProcessorInputItem struct {
func (wp *walSubsetProcessor) setup() { func (wp *walSubsetProcessor) setup() {
wp.input = make(chan walSubsetProcessorInputItem, 300) wp.input = make(chan walSubsetProcessorInputItem, 300)
wp.output = make(chan []record.RefSample, 300) wp.output = make(chan []record.RefSample, 300)
wp.histogramsOutput = make(chan []record.RefHistogramSample, 300) wp.histogramsOutput = make(chan []histogramRecord, 300)
} }
func (wp *walSubsetProcessor) closeAndDrain() { func (wp *walSubsetProcessor) closeAndDrain() {
@ -502,7 +566,7 @@ func (wp *walSubsetProcessor) reuseBuf() []record.RefSample {
} }
// If there is a buffer in the output chan, return it for reuse, otherwise return nil. // If there is a buffer in the output chan, return it for reuse, otherwise return nil.
func (wp *walSubsetProcessor) reuseHistogramBuf() []record.RefHistogramSample { func (wp *walSubsetProcessor) reuseHistogramBuf() []histogramRecord {
select { select {
case buf := <-wp.histogramsOutput: case buf := <-wp.histogramsOutput:
return buf[:0] return buf[:0]
@ -562,27 +626,33 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
} }
for _, s := range in.histogramSamples { for _, s := range in.histogramSamples {
if s.T < minValidTime { if s.t < minValidTime {
continue continue
} }
ms := h.series.getByID(s.Ref) ms := h.series.getByID(s.ref)
if ms == nil { if ms == nil {
unknownHistogramRefs++ unknownHistogramRefs++
continue continue
} }
ms.isHistogramSeries = true ms.isHistogramSeries = true
if s.T <= ms.mmMaxTime { if s.t <= ms.mmMaxTime {
continue continue
} }
if _, chunkCreated := ms.appendHistogram(s.T, s.H, 0, h.chunkDiskMapper, chunkRange); chunkCreated { var chunkCreated bool
if s.h != nil {
_, chunkCreated = ms.appendHistogram(s.t, s.h, 0, h.chunkDiskMapper, chunkRange)
} else {
_, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, h.chunkDiskMapper, chunkRange)
}
if chunkCreated {
h.metrics.chunksCreated.Inc() h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc() h.metrics.chunks.Inc()
} }
if s.T > maxt { if s.t > maxt {
maxt = s.T maxt = s.t
} }
if s.T < mint { if s.t < mint {
mint = s.T mint = s.t
} }
} }

View file

@ -531,7 +531,6 @@ func (b *blockBaseSeriesSet) Next() bool {
b.curr.labels = b.builder.Labels() b.curr.labels = b.builder.Labels()
b.curr.chks = chks b.curr.chks = chks
b.curr.intervals = intervals b.curr.intervals = intervals
return true return true
} }
return false return false
@ -750,7 +749,6 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
if p.currDelIter == nil { if p.currDelIter == nil {
return true return true
} }
valueType := p.currDelIter.Next() valueType := p.currDelIter.Next()
if valueType == chunkenc.ValNone { if valueType == chunkenc.ValNone {
if err := p.currDelIter.Err(); err != nil { if err := p.currDelIter.Err(); err != nil {
@ -825,9 +823,47 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
t, v = p.currDelIter.At() t, v = p.currDelIter.At()
app.Append(t, v) app.Append(t, v)
} }
case chunkenc.ValFloatHistogram:
newChunk = chunkenc.NewFloatHistogramChunk()
if app, err = newChunk.Appender(); err != nil {
break
}
if hc, ok := p.currChkMeta.Chunk.(*chunkenc.FloatHistogramChunk); ok {
newChunk.(*chunkenc.FloatHistogramChunk).SetCounterResetHeader(hc.GetCounterResetHeader())
}
var h *histogram.FloatHistogram
t, h = p.currDelIter.AtFloatHistogram()
p.curr.MinTime = t
app.AppendFloatHistogram(t, h)
for vt := p.currDelIter.Next(); vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
if vt != chunkenc.ValFloatHistogram {
err = fmt.Errorf("found value type %v in histogram chunk", vt)
break
}
t, h = p.currDelIter.AtFloatHistogram()
// Defend against corrupted chunks.
pI, nI, okToAppend, counterReset := app.(*chunkenc.FloatHistogramAppender).Appendable(h)
if len(pI)+len(nI) > 0 {
err = fmt.Errorf(
"bucket layout has changed unexpectedly: %d positive and %d negative bucket interjections required",
len(pI), len(nI),
)
break
}
if counterReset {
err = errors.New("detected unexpected counter reset in histogram")
break
}
if !okToAppend {
err = errors.New("unable to append histogram due to unexpected schema change")
break
}
app.AppendFloatHistogram(t, h)
}
default: default:
// TODO(beorn7): Need FloatHistogram eventually.
err = fmt.Errorf("populateWithDelChunkSeriesIterator: value type %v unsupported", valueType) err = fmt.Errorf("populateWithDelChunkSeriesIterator: value type %v unsupported", valueType)
} }

View file

@ -49,6 +49,8 @@ const (
Metadata Type = 6 Metadata Type = 6
// HistogramSamples is used to match WAL records of type Histograms. // HistogramSamples is used to match WAL records of type Histograms.
HistogramSamples Type = 7 HistogramSamples Type = 7
// FloatHistogramSamples is used to match WAL records of type Float Histograms.
FloatHistogramSamples Type = 8
) )
func (rt Type) String() string { func (rt Type) String() string {
@ -63,6 +65,8 @@ func (rt Type) String() string {
return "exemplars" return "exemplars"
case HistogramSamples: case HistogramSamples:
return "histogram_samples" return "histogram_samples"
case FloatHistogramSamples:
return "float_histogram_samples"
case MmapMarkers: case MmapMarkers:
return "mmapmarkers" return "mmapmarkers"
case Metadata: case Metadata:
@ -173,6 +177,13 @@ type RefHistogramSample struct {
H *histogram.Histogram H *histogram.Histogram
} }
// RefFloatHistogramSample is a float histogram.
type RefFloatHistogramSample struct {
Ref chunks.HeadSeriesRef
T int64
FH *histogram.FloatHistogram
}
// RefMmapMarker marks that the all the samples of the given series until now have been m-mapped to disk. // RefMmapMarker marks that the all the samples of the given series until now have been m-mapped to disk.
type RefMmapMarker struct { type RefMmapMarker struct {
Ref chunks.HeadSeriesRef Ref chunks.HeadSeriesRef
@ -192,7 +203,7 @@ func (d *Decoder) Type(rec []byte) Type {
return Unknown return Unknown
} }
switch t := Type(rec[0]); t { switch t := Type(rec[0]); t {
case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamples: case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamples, FloatHistogramSamples:
return t return t
} }
return Unknown return Unknown
@ -427,13 +438,7 @@ func (d *Decoder) HistogramSamples(rec []byte, histograms []RefHistogramSample)
rh := RefHistogramSample{ rh := RefHistogramSample{
Ref: chunks.HeadSeriesRef(baseRef + uint64(dref)), Ref: chunks.HeadSeriesRef(baseRef + uint64(dref)),
T: baseTime + dtime, T: baseTime + dtime,
H: &histogram.Histogram{ H: &histogram.Histogram{},
Schema: 0,
ZeroThreshold: 0,
ZeroCount: 0,
Count: 0,
Sum: 0,
},
} }
rh.H.Schema = int32(dec.Varint64()) rh.H.Schema = int32(dec.Varint64())
@ -489,6 +494,82 @@ func (d *Decoder) HistogramSamples(rec []byte, histograms []RefHistogramSample)
return histograms, nil return histograms, nil
} }
func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogramSample) ([]RefFloatHistogramSample, error) {
dec := encoding.Decbuf{B: rec}
t := Type(dec.Byte())
if t != FloatHistogramSamples {
return nil, errors.New("invalid record type")
}
if dec.Len() == 0 {
return histograms, nil
}
var (
baseRef = dec.Be64()
baseTime = dec.Be64int64()
)
for len(dec.B) > 0 && dec.Err() == nil {
dref := dec.Varint64()
dtime := dec.Varint64()
rh := RefFloatHistogramSample{
Ref: chunks.HeadSeriesRef(baseRef + uint64(dref)),
T: baseTime + dtime,
FH: &histogram.FloatHistogram{},
}
rh.FH.Schema = int32(dec.Varint64())
rh.FH.ZeroThreshold = dec.Be64Float64()
rh.FH.ZeroCount = dec.Be64Float64()
rh.FH.Count = dec.Be64Float64()
rh.FH.Sum = dec.Be64Float64()
l := dec.Uvarint()
if l > 0 {
rh.FH.PositiveSpans = make([]histogram.Span, l)
}
for i := range rh.FH.PositiveSpans {
rh.FH.PositiveSpans[i].Offset = int32(dec.Varint64())
rh.FH.PositiveSpans[i].Length = dec.Uvarint32()
}
l = dec.Uvarint()
if l > 0 {
rh.FH.NegativeSpans = make([]histogram.Span, l)
}
for i := range rh.FH.NegativeSpans {
rh.FH.NegativeSpans[i].Offset = int32(dec.Varint64())
rh.FH.NegativeSpans[i].Length = dec.Uvarint32()
}
l = dec.Uvarint()
if l > 0 {
rh.FH.PositiveBuckets = make([]float64, l)
}
for i := range rh.FH.PositiveBuckets {
rh.FH.PositiveBuckets[i] = dec.Be64Float64()
}
l = dec.Uvarint()
if l > 0 {
rh.FH.NegativeBuckets = make([]float64, l)
}
for i := range rh.FH.NegativeBuckets {
rh.FH.NegativeBuckets[i] = dec.Be64Float64()
}
histograms = append(histograms, rh)
}
if dec.Err() != nil {
return nil, errors.Wrapf(dec.Err(), "decode error after %d histograms", len(histograms))
}
if len(dec.B) > 0 {
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
}
return histograms, nil
}
// Encoder encodes series, sample, and tombstones records. // Encoder encodes series, sample, and tombstones records.
// The zero value is ready to use. // The zero value is ready to use.
type Encoder struct{} type Encoder struct{}
@ -666,3 +747,54 @@ func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) []
return buf.Get() return buf.Get()
} }
func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) []byte {
buf := encoding.Encbuf{B: b}
buf.PutByte(byte(FloatHistogramSamples))
if len(histograms) == 0 {
return buf.Get()
}
// Store base timestamp and base reference number of first histogram.
// All histograms encode their timestamp and ref as delta to those.
first := histograms[0]
buf.PutBE64(uint64(first.Ref))
buf.PutBE64int64(first.T)
for _, h := range histograms {
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
buf.PutVarint64(h.T - first.T)
buf.PutVarint64(int64(h.FH.Schema))
buf.PutBEFloat64(h.FH.ZeroThreshold)
buf.PutBEFloat64(h.FH.ZeroCount)
buf.PutBEFloat64(h.FH.Count)
buf.PutBEFloat64(h.FH.Sum)
buf.PutUvarint(len(h.FH.PositiveSpans))
for _, s := range h.FH.PositiveSpans {
buf.PutVarint64(int64(s.Offset))
buf.PutUvarint32(s.Length)
}
buf.PutUvarint(len(h.FH.NegativeSpans))
for _, s := range h.FH.NegativeSpans {
buf.PutVarint64(int64(s.Offset))
buf.PutUvarint32(s.Length)
}
buf.PutUvarint(len(h.FH.PositiveBuckets))
for _, b := range h.FH.PositiveBuckets {
buf.PutBEFloat64(b)
}
buf.PutUvarint(len(h.FH.NegativeBuckets))
for _, b := range h.FH.NegativeBuckets {
buf.PutBEFloat64(b)
}
}
return buf.Get()
}

View file

@ -153,6 +153,18 @@ func TestRecord_EncodeDecode(t *testing.T) {
decHistograms, err := dec.HistogramSamples(enc.HistogramSamples(histograms, nil), nil) decHistograms, err := dec.HistogramSamples(enc.HistogramSamples(histograms, nil), nil)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, histograms, decHistograms) require.Equal(t, histograms, decHistograms)
floatHistograms := make([]RefFloatHistogramSample, len(histograms))
for i, h := range histograms {
floatHistograms[i] = RefFloatHistogramSample{
Ref: h.Ref,
T: h.T,
FH: h.H.ToFloat(),
}
}
decFloatHistograms, err := dec.FloatHistogramSamples(enc.FloatHistogramSamples(floatHistograms, nil), nil)
require.NoError(t, err)
require.Equal(t, floatHistograms, decFloatHistograms)
} }
// TestRecord_Corrupted ensures that corrupted records return the correct error. // TestRecord_Corrupted ensures that corrupted records return the correct error.

View file

@ -74,7 +74,10 @@ func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger l
ref, err = app.Append(ref, lset, t, v) ref, err = app.Append(ref, lset, t, v)
case chunkenc.ValHistogram: case chunkenc.ValHistogram:
t, h := it.AtHistogram() t, h := it.AtHistogram()
ref, err = app.AppendHistogram(ref, lset, t, h) ref, err = app.AppendHistogram(ref, lset, t, h, nil)
case chunkenc.ValFloatHistogram:
t, fh := it.AtFloatHistogram()
ref, err = app.AppendHistogram(ref, lset, t, nil, fh)
default: default:
return "", fmt.Errorf("unknown sample type %s", typ.String()) return "", fmt.Errorf("unknown sample type %s", typ.String())
} }