diff --git a/storage/series.go b/storage/series.go index f609df3f02..5daa6255da 100644 --- a/storage/series.go +++ b/storage/series.go @@ -281,7 +281,7 @@ func NewSeriesToChunkEncoder(series Series) ChunkSeries { func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { var ( chk chunkenc.Chunk - app chunkenc.Appender + app *RecodingAppender err error ) mint := int64(math.MaxInt64) @@ -299,21 +299,16 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() { if typ != lastType || i >= seriesToChunkEncoderSplit { // Create a new chunk if the sample type changed or too many samples in the current one. - if chk != nil { - chks = append(chks, chunks.Meta{ - MinTime: mint, - MaxTime: maxt, - Chunk: chk, - }) - } + chks = appendChunk(chks, mint, maxt, chk) chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding()) if err != nil { return errChunksIterator{err: err} } - app, err = chk.Appender() + chkAppender, err := chk.Appender() if err != nil { return errChunksIterator{err: err} } + app = NewRecodingAppender(&chk, chkAppender) mint = int64(math.MaxInt64) // maxt is immediately overwritten below which is why setting it here won't make a difference. 
i = 0 @@ -332,10 +327,45 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { app.Append(t, v) case chunkenc.ValHistogram: t, h = seriesIter.AtHistogram() - app.AppendHistogram(t, h) + if ok, counterReset := app.AppendHistogram(t, h); !ok { + chks = appendChunk(chks, mint, maxt, chk) + histChunk := chunkenc.NewHistogramChunk() + if counterReset { + histChunk.SetCounterResetHeader(chunkenc.CounterReset) + } + chk = histChunk + + chkAppender, err := chk.Appender() + if err != nil { + return errChunksIterator{err: err} + } + mint = int64(math.MaxInt64) + i = 0 + app = NewRecodingAppender(&chk, chkAppender) + if ok, _ := app.AppendHistogram(t, h); !ok { + panic("unexpected error while appending histogram") + } + } case chunkenc.ValFloatHistogram: t, fh = seriesIter.AtFloatHistogram() - app.AppendFloatHistogram(t, fh) + if ok, counterReset := app.AppendFloatHistogram(t, fh); !ok { + chks = appendChunk(chks, mint, maxt, chk) + floatHistChunk := chunkenc.NewFloatHistogramChunk() + if counterReset { + floatHistChunk.SetCounterResetHeader(chunkenc.CounterReset) + } + chk = floatHistChunk + chkAppender, err := chk.Appender() + if err != nil { + return errChunksIterator{err: err} + } + mint = int64(math.MaxInt64) + i = 0 + app = NewRecodingAppender(&chk, chkAppender) + if ok, _ := app.AppendFloatHistogram(t, fh); !ok { + panic("unexpected error while float appending histogram") + } + } default: return errChunksIterator{err: fmt.Errorf("unknown sample type %s", typ.String())} } @@ -350,6 +380,16 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { return errChunksIterator{err: err} } + chks = appendChunk(chks, mint, maxt, chk) + + if existing { + lcsi.Reset(chks...) + return lcsi + } + return NewListChunkSeriesIterator(chks...) 
+} + +func appendChunk(chks []chunks.Meta, mint, maxt int64, chk chunkenc.Chunk) []chunks.Meta { if chk != nil { chks = append(chks, chunks.Meta{ MinTime: mint, @@ -357,12 +397,7 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { Chunk: chk, }) } - - if existing { - lcsi.Reset(chks...) - return lcsi - } - return NewListChunkSeriesIterator(chks...) + return chks } type errChunksIterator struct { @@ -420,3 +455,126 @@ func ExpandChunks(iter chunks.Iterator) ([]chunks.Meta, error) { } return result, iter.Err() } + +// RecodingAppender wraps a chunkenc.Appender and recodes histogram samples if needed during appends. +// It holds the wrapped appender and a pointer to the chunk to which samples are appended. +type RecodingAppender struct { + chk *chunkenc.Chunk + app chunkenc.Appender +} + +func NewRecodingAppender(chk *chunkenc.Chunk, app chunkenc.Appender) *RecodingAppender { + return &RecodingAppender{ + chk: chk, + app: app, + } +} + +// Append appends a float sample via the wrapped appender. +func (a *RecodingAppender) Append(t int64, v float64) { + a.app.Append(t, v) +} + +// AppendHistogram appends a histogram sample to the underlying chunk. +// It returns okToAppend=false if the sample cannot be appended to the current chunk, and +// counterReset=true when the reason is a counter reset. +// If counterReset is true, okToAppend is always false. 
+func (a *RecodingAppender) AppendHistogram(t int64, h *histogram.Histogram) (okToAppend, counterReset bool) {
+	// Anything but a *chunkenc.HistogramAppender is reported as not appendable.
+	histApp, isHistApp := a.app.(*chunkenc.HistogramAppender)
+	if !isHistApp {
+		return false, false
+	}
+
+	// No samples yet: append directly, skipping the appendability checks.
+	if histApp.NumSamples() == 0 {
+		a.app.AppendHistogram(t, h)
+		return true, false
+	}
+
+	var (
+		posFwd, negFwd       []chunkenc.Insert
+		posBwd, negBwd       []chunkenc.Insert
+		posMerged, negMerged []histogram.Span
+	)
+	// Gauge histograms have their own appendability check, which is the only
+	// one here that yields backward inserts and merged spans.
+	if h.CounterResetHint == histogram.GaugeType {
+		posFwd, negFwd, posBwd, negBwd, posMerged, negMerged, okToAppend = histApp.AppendableGauge(h)
+	} else {
+		posFwd, negFwd, okToAppend, counterReset = histApp.Appendable(h)
+	}
+	if counterReset || !okToAppend {
+		return false, counterReset
+	}
+
+	// Backward inserts: switch h (in place) to the merged span layout and recode it.
+	if len(posBwd) > 0 || len(negBwd) > 0 {
+		h.PositiveSpans, h.NegativeSpans = posMerged, negMerged
+		histApp.RecodeHistogram(h, posBwd, negBwd)
+	}
+	// Forward inserts: recode the chunk, then continue with the recoded chunk
+	// and the appender that belongs to it.
+	if len(posFwd)+len(negFwd) > 0 {
+		recodedChk, recodedApp := histApp.Recode(posFwd, negFwd, h.PositiveSpans, h.NegativeSpans)
+		*a.chk = recodedChk
+		a.app = recodedApp
+	}
+
+	// counterReset is always false here; a detected reset has returned above.
+	a.app.AppendHistogram(t, h)
+	return true, counterReset
+}
+
+// AppendFloatHistogram appends a float histogram sample to the underlying chunk.
+// It returns okToAppend=false if the sample cannot be appended to the current chunk, and
+// counterReset=true when the reason is a counter reset.
+// If counterReset is true, okToAppend is always false.
+func (a *RecodingAppender) AppendFloatHistogram(t int64, fh *histogram.FloatHistogram) (okToAppend, counterReset bool) { + app, ok := a.app.(*chunkenc.FloatHistogramAppender) + if !ok { + return false, false + } + + if app.NumSamples() == 0 { + a.app.AppendFloatHistogram(t, fh) + return true, false + } + + var ( + pForwardInserts, nForwardInserts []chunkenc.Insert + pBackwardInserts, nBackwardInserts []chunkenc.Insert + pMergedSpans, nMergedSpans []histogram.Span + ) + switch fh.CounterResetHint { + case histogram.GaugeType: + pForwardInserts, nForwardInserts, + pBackwardInserts, nBackwardInserts, + pMergedSpans, nMergedSpans, + okToAppend = app.AppendableGauge(fh) + default: + pForwardInserts, nForwardInserts, okToAppend, counterReset = app.Appendable(fh) + } + + if !okToAppend || counterReset { + return false, counterReset + } + + if len(pBackwardInserts)+len(nBackwardInserts) > 0 { + fh.PositiveSpans = pMergedSpans + fh.NegativeSpans = nMergedSpans + app.RecodeHistogramm(fh, pBackwardInserts, nBackwardInserts) + } + + if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 { + chunk, app := app.Recode( + pForwardInserts, nForwardInserts, + fh.PositiveSpans, fh.NegativeSpans, + ) + *a.chk = chunk + a.app = app + } + + a.app.AppendFloatHistogram(t, fh) + return true, counterReset +} diff --git a/storage/series_test.go b/storage/series_test.go index 210a68e283..4c318f1a0e 100644 --- a/storage/series_test.go +++ b/storage/series_test.go @@ -14,12 +14,17 @@ package storage import ( + "fmt" + "math" "testing" "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/tsdbutil" ) @@ -119,3 +124,284 @@ func TestChunkSeriesSetToSeriesSet(t *testing.T) { } } } + +type histogramTest struct { + samples 
[]tsdbutil.Sample + expectedChunks int + expectedCounterReset bool +} + +func TestHistogramSeriesToChunks(t *testing.T) { + h1 := &histogram.Histogram{ + Count: 3, + ZeroCount: 2, + ZeroThreshold: 0.001, + Sum: 100, + Schema: 0, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + }, + PositiveBuckets: []int64{2, 1}, // Deltas; absolute counts: 2, 3 + } + // h2 is appendable to h1. + h2 := &histogram.Histogram{ + Count: 12, + ZeroCount: 2, + ZeroThreshold: 0.001, + Sum: 100, + Schema: 0, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []int64{2, 1, -2, 3}, // Deltas; absolute counts: 2, 3, 1, 4 + } + // h2down has an implicit counter reset (bucket counts lower than in h1), so it is not appendable to h1. + h2down := &histogram.Histogram{ + Count: 8, + ZeroCount: 2, + ZeroThreshold: 0.001, + Sum: 100, + Schema: 0, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []int64{1, 1, -1, 3}, // Deltas; absolute counts: 1, 2, 1, 4 + } + + fh1 := &histogram.FloatHistogram{ + Count: 4, + ZeroCount: 2, + ZeroThreshold: 0.001, + Sum: 100, + Schema: 0, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + }, + PositiveBuckets: []float64{3, 1}, + } + // fh2 is appendable to fh1. + fh2 := &histogram.FloatHistogram{ + Count: 15, + ZeroCount: 2, + ZeroThreshold: 0.001, + Sum: 100, + Schema: 0, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []float64{4, 2, 7, 2}, + } + // fh2down has an implicit counter reset (first bucket lower than in fh1), so it is not appendable to fh1. 
+ fh2down := &histogram.FloatHistogram{ + Count: 13, + ZeroCount: 2, + ZeroThreshold: 0.001, + Sum: 100, + Schema: 0, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []float64{2, 2, 7, 2}, + } + + staleHistogram := &histogram.Histogram{ + Sum: math.Float64frombits(value.StaleNaN), + } + staleFloatHistogram := &histogram.FloatHistogram{ + Sum: math.Float64frombits(value.StaleNaN), + } + + tests := map[string]histogramTest{ + "single histogram to single chunk": { + samples: []tsdbutil.Sample{ + hSample{t: 1, h: h1}, + }, + expectedChunks: 1, + }, + "two histograms encoded to a single chunk": { + samples: []tsdbutil.Sample{ + hSample{t: 1, h: h1}, + hSample{t: 2, h: h2}, + }, + expectedChunks: 1, + }, + "two histograms encoded to two chunks": { + samples: []tsdbutil.Sample{ + hSample{t: 1, h: h2}, + hSample{t: 2, h: h1}, + }, + expectedChunks: 2, + expectedCounterReset: true, + }, + "histogram and stale sample encoded to two chunks": { + samples: []tsdbutil.Sample{ + hSample{t: 1, h: staleHistogram}, + hSample{t: 2, h: h1}, + }, + expectedChunks: 2, + }, + "histogram and reduction in bucket encoded to two chunks": { + samples: []tsdbutil.Sample{ + hSample{t: 1, h: h1}, + hSample{t: 2, h: h2down}, + }, + expectedChunks: 2, + expectedCounterReset: true, + }, + // Float histograms. 
+ "single float histogram to single chunk": { + samples: []tsdbutil.Sample{ + fhSample{t: 1, fh: fh1}, + }, + expectedChunks: 1, + }, + "two float histograms encoded to a single chunk": { + samples: []tsdbutil.Sample{ + fhSample{t: 1, fh: fh1}, + fhSample{t: 2, fh: fh2}, + }, + expectedChunks: 1, + }, + "two float histograms encoded to two chunks": { + samples: []tsdbutil.Sample{ + fhSample{t: 1, fh: fh2}, + fhSample{t: 2, fh: fh1}, + }, + expectedChunks: 2, + expectedCounterReset: true, + }, + "float histogram and stale sample encoded to two chunks": { + samples: []tsdbutil.Sample{ + fhSample{t: 1, fh: staleFloatHistogram}, + fhSample{t: 2, fh: fh1}, + }, + expectedChunks: 2, + }, + "float histogram and reduction in bucket encoded to two chunks": { + samples: []tsdbutil.Sample{ + fhSample{t: 1, fh: fh1}, + fhSample{t: 2, fh: fh2down}, + }, + expectedChunks: 2, + expectedCounterReset: true, + }, + // Mixed integer and float histograms. + "histogram and float histogram encoded to two chunks": { + samples: []tsdbutil.Sample{ + hSample{t: 1, h: h1}, + fhSample{t: 2, fh: fh2}, + }, + expectedChunks: 2, + }, + "float histogram and histogram encoded to two chunks": { + samples: []tsdbutil.Sample{ + fhSample{t: 1, fh: fh1}, + hSample{t: 2, h: h2}, + }, + expectedChunks: 2, + }, + "histogram and stale float histogram encoded to two chunks": { + samples: []tsdbutil.Sample{ + hSample{t: 1, h: h1}, + fhSample{t: 2, fh: staleFloatHistogram}, + }, + expectedChunks: 2, + }, + } + + for testName, test := range tests { + t.Run(testName, func(t *testing.T) { + testHistogramsSeriesToChunks(t, test) + }) + } +} + +func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) { + lbs := labels.FromStrings("__name__", "up", "instance", "localhost:8080") + series := NewListSeries(lbs, test.samples) + encoder := NewSeriesToChunkEncoder(series) + require.EqualValues(t, lbs, encoder.Labels()) + + chks, err := ExpandChunks(encoder.Iterator(nil)) + require.NoError(t, err) + require.Equal(t, test.expectedChunks, 
len(chks)) + + // Decode all encoded samples and assert they are equal to the original ones. + encodedSamples := expandHistogramSamples(chks) + require.Equal(t, len(test.samples), len(encodedSamples)) + + for i, s := range test.samples { + switch expectedSample := s.(type) { + case hSample: + encodedSample, ok := encodedSamples[i].(hSample) + require.True(t, ok, "expect histogram", fmt.Sprintf("at idx %d", i)) + // Ignore the per-sample counter reset hint here; the chunk-level header is checked below. + encodedSample.h.CounterResetHint = histogram.UnknownCounterReset + if value.IsStaleNaN(expectedSample.h.Sum) { + require.True(t, value.IsStaleNaN(encodedSample.h.Sum), fmt.Sprintf("at idx %d", i)) + continue + } + require.Equal(t, *expectedSample.h, *encodedSample.h.Compact(0), fmt.Sprintf("at idx %d", i)) + case fhSample: + encodedSample, ok := encodedSamples[i].(fhSample) + require.True(t, ok, "expect float histogram", fmt.Sprintf("at idx %d", i)) + // Ignore the per-sample counter reset hint here; the chunk-level header is checked below. + encodedSample.fh.CounterResetHint = histogram.UnknownCounterReset + if value.IsStaleNaN(expectedSample.fh.Sum) { + require.True(t, value.IsStaleNaN(encodedSample.fh.Sum), fmt.Sprintf("at idx %d", i)) + continue + } + require.Equal(t, *expectedSample.fh, *encodedSample.fh.Compact(0), fmt.Sprintf("at idx %d", i)) + default: + t.Error("internal error, unexpected type") + } + } + + // If a counter reset is expected, the first chunk must have an unknown hint and the second one a counter reset hint. + // Otherwise, we assert an unknown counter reset hint in all chunks. 
+ if test.expectedCounterReset { + require.Equal(t, chunkenc.UnknownCounterReset, getCounterResetHint(chks[0])) + require.Equal(t, chunkenc.CounterReset, getCounterResetHint(chks[1])) + } else { + for _, chk := range chks { + require.Equal(t, chunkenc.UnknownCounterReset, getCounterResetHint(chk)) + } + } +} + +func expandHistogramSamples(chunks []chunks.Meta) (result []tsdbutil.Sample) { + if len(chunks) == 0 { + return + } + + for _, chunk := range chunks { + it := chunk.Chunk.Iterator(nil) + for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() { + switch vt { + case chunkenc.ValHistogram: + t, h := it.AtHistogram() + result = append(result, hSample{t: t, h: h}) + case chunkenc.ValFloatHistogram: + t, fh := it.AtFloatHistogram() + result = append(result, fhSample{t: t, fh: fh}) + default: + panic("unexpected value type") + } + } + } + return +} + +func getCounterResetHint(chunk chunks.Meta) chunkenc.CounterResetHeader { + switch chk := chunk.Chunk.(type) { + case *chunkenc.HistogramChunk: + return chk.GetCounterResetHeader() + case *chunkenc.FloatHistogramChunk: + return chk.GetCounterResetHeader() + } + return chunkenc.UnknownCounterReset +}