mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Fix encoding samples in ChunkSeries (#12185)
The storage.ChunkSeries iterator assumes that a histogram sample can always be appended to the currently open chunk. This is not the case when there is a counter reset, or when appending a stale sample to a chunk with non-stale samples. In addition, the open chunk sometimes needs to be recoded before a sample can be appended. This commit addresses the issue by implementing a RecodingAppender which can recode incoming samples in a transparent way. It also detects cases when a sample cannot be appended at all and returns `false` so that the caller can open a new chunk. Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com> Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com> Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
parent
24d7e5bd49
commit
0d049feac7
|
@ -281,7 +281,7 @@ func NewSeriesToChunkEncoder(series Series) ChunkSeries {
|
||||||
func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
|
func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
|
||||||
var (
|
var (
|
||||||
chk chunkenc.Chunk
|
chk chunkenc.Chunk
|
||||||
app chunkenc.Appender
|
app *RecodingAppender
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
mint := int64(math.MaxInt64)
|
mint := int64(math.MaxInt64)
|
||||||
|
@ -299,21 +299,16 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
|
||||||
for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() {
|
for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() {
|
||||||
if typ != lastType || i >= seriesToChunkEncoderSplit {
|
if typ != lastType || i >= seriesToChunkEncoderSplit {
|
||||||
// Create a new chunk if the sample type changed or too many samples in the current one.
|
// Create a new chunk if the sample type changed or too many samples in the current one.
|
||||||
if chk != nil {
|
chks = appendChunk(chks, mint, maxt, chk)
|
||||||
chks = append(chks, chunks.Meta{
|
|
||||||
MinTime: mint,
|
|
||||||
MaxTime: maxt,
|
|
||||||
Chunk: chk,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding())
|
chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errChunksIterator{err: err}
|
return errChunksIterator{err: err}
|
||||||
}
|
}
|
||||||
app, err = chk.Appender()
|
chkAppender, err := chk.Appender()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errChunksIterator{err: err}
|
return errChunksIterator{err: err}
|
||||||
}
|
}
|
||||||
|
app = NewRecodingAppender(&chk, chkAppender)
|
||||||
mint = int64(math.MaxInt64)
|
mint = int64(math.MaxInt64)
|
||||||
// maxt is immediately overwritten below which is why setting it here won't make a difference.
|
// maxt is immediately overwritten below which is why setting it here won't make a difference.
|
||||||
i = 0
|
i = 0
|
||||||
|
@ -332,10 +327,45 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
|
||||||
app.Append(t, v)
|
app.Append(t, v)
|
||||||
case chunkenc.ValHistogram:
|
case chunkenc.ValHistogram:
|
||||||
t, h = seriesIter.AtHistogram()
|
t, h = seriesIter.AtHistogram()
|
||||||
app.AppendHistogram(t, h)
|
if ok, counterReset := app.AppendHistogram(t, h); !ok {
|
||||||
|
chks = appendChunk(chks, mint, maxt, chk)
|
||||||
|
histChunk := chunkenc.NewHistogramChunk()
|
||||||
|
if counterReset {
|
||||||
|
histChunk.SetCounterResetHeader(chunkenc.CounterReset)
|
||||||
|
}
|
||||||
|
chk = histChunk
|
||||||
|
|
||||||
|
chkAppender, err := chk.Appender()
|
||||||
|
if err != nil {
|
||||||
|
return errChunksIterator{err: err}
|
||||||
|
}
|
||||||
|
mint = int64(math.MaxInt64)
|
||||||
|
i = 0
|
||||||
|
app = NewRecodingAppender(&chk, chkAppender)
|
||||||
|
if ok, _ := app.AppendHistogram(t, h); !ok {
|
||||||
|
panic("unexpected error while appending histogram")
|
||||||
|
}
|
||||||
|
}
|
||||||
case chunkenc.ValFloatHistogram:
|
case chunkenc.ValFloatHistogram:
|
||||||
t, fh = seriesIter.AtFloatHistogram()
|
t, fh = seriesIter.AtFloatHistogram()
|
||||||
app.AppendFloatHistogram(t, fh)
|
if ok, counterReset := app.AppendFloatHistogram(t, fh); !ok {
|
||||||
|
chks = appendChunk(chks, mint, maxt, chk)
|
||||||
|
floatHistChunk := chunkenc.NewFloatHistogramChunk()
|
||||||
|
if counterReset {
|
||||||
|
floatHistChunk.SetCounterResetHeader(chunkenc.CounterReset)
|
||||||
|
}
|
||||||
|
chk = floatHistChunk
|
||||||
|
chkAppender, err := chk.Appender()
|
||||||
|
if err != nil {
|
||||||
|
return errChunksIterator{err: err}
|
||||||
|
}
|
||||||
|
mint = int64(math.MaxInt64)
|
||||||
|
i = 0
|
||||||
|
app = NewRecodingAppender(&chk, chkAppender)
|
||||||
|
if ok, _ := app.AppendFloatHistogram(t, fh); !ok {
|
||||||
|
panic("unexpected error while float appending histogram")
|
||||||
|
}
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
return errChunksIterator{err: fmt.Errorf("unknown sample type %s", typ.String())}
|
return errChunksIterator{err: fmt.Errorf("unknown sample type %s", typ.String())}
|
||||||
}
|
}
|
||||||
|
@ -350,6 +380,16 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
|
||||||
return errChunksIterator{err: err}
|
return errChunksIterator{err: err}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
chks = appendChunk(chks, mint, maxt, chk)
|
||||||
|
|
||||||
|
if existing {
|
||||||
|
lcsi.Reset(chks...)
|
||||||
|
return lcsi
|
||||||
|
}
|
||||||
|
return NewListChunkSeriesIterator(chks...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func appendChunk(chks []chunks.Meta, mint, maxt int64, chk chunkenc.Chunk) []chunks.Meta {
|
||||||
if chk != nil {
|
if chk != nil {
|
||||||
chks = append(chks, chunks.Meta{
|
chks = append(chks, chunks.Meta{
|
||||||
MinTime: mint,
|
MinTime: mint,
|
||||||
|
@ -357,12 +397,7 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
|
||||||
Chunk: chk,
|
Chunk: chk,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
return chks
|
||||||
if existing {
|
|
||||||
lcsi.Reset(chks...)
|
|
||||||
return lcsi
|
|
||||||
}
|
|
||||||
return NewListChunkSeriesIterator(chks...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type errChunksIterator struct {
|
type errChunksIterator struct {
|
||||||
|
@ -420,3 +455,126 @@ func ExpandChunks(iter chunks.Iterator) ([]chunks.Meta, error) {
|
||||||
}
|
}
|
||||||
return result, iter.Err()
|
return result, iter.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RecodingAppender is a tsdb.Appender that recodes histogram samples if needed during appends.
|
||||||
|
// It takes an existing appender and a chunk to which samples are appended.
|
||||||
|
type RecodingAppender struct {
|
||||||
|
chk *chunkenc.Chunk
|
||||||
|
app chunkenc.Appender
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRecodingAppender(chk *chunkenc.Chunk, app chunkenc.Appender) *RecodingAppender {
|
||||||
|
return &RecodingAppender{
|
||||||
|
chk: chk,
|
||||||
|
app: app,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Append appends a float sample to the appender.
|
||||||
|
func (a *RecodingAppender) Append(t int64, v float64) {
|
||||||
|
a.app.Append(t, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendHistogram appends a histogram sample to the underlying chunk.
|
||||||
|
// The method returns false if the sample cannot be appended and a boolean value set to true
|
||||||
|
// when it is not appendable because of a counter reset.
|
||||||
|
// If counterReset is true, okToAppend is always false.
|
||||||
|
func (a *RecodingAppender) AppendHistogram(t int64, h *histogram.Histogram) (okToAppend, counterReset bool) {
|
||||||
|
app, ok := a.app.(*chunkenc.HistogramAppender)
|
||||||
|
if !ok {
|
||||||
|
return false, false
|
||||||
|
}
|
||||||
|
|
||||||
|
if app.NumSamples() == 0 {
|
||||||
|
a.app.AppendHistogram(t, h)
|
||||||
|
return true, false
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
pForwardInserts, nForwardInserts []chunkenc.Insert
|
||||||
|
pBackwardInserts, nBackwardInserts []chunkenc.Insert
|
||||||
|
pMergedSpans, nMergedSpans []histogram.Span
|
||||||
|
)
|
||||||
|
switch h.CounterResetHint {
|
||||||
|
case histogram.GaugeType:
|
||||||
|
pForwardInserts, nForwardInserts,
|
||||||
|
pBackwardInserts, nBackwardInserts,
|
||||||
|
pMergedSpans, nMergedSpans,
|
||||||
|
okToAppend = app.AppendableGauge(h)
|
||||||
|
default:
|
||||||
|
pForwardInserts, nForwardInserts, okToAppend, counterReset = app.Appendable(h)
|
||||||
|
}
|
||||||
|
if !okToAppend || counterReset {
|
||||||
|
return false, counterReset
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(pBackwardInserts)+len(nBackwardInserts) > 0 {
|
||||||
|
h.PositiveSpans = pMergedSpans
|
||||||
|
h.NegativeSpans = nMergedSpans
|
||||||
|
app.RecodeHistogram(h, pBackwardInserts, nBackwardInserts)
|
||||||
|
}
|
||||||
|
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
|
||||||
|
chk, app := app.Recode(
|
||||||
|
pForwardInserts, nForwardInserts,
|
||||||
|
h.PositiveSpans, h.NegativeSpans,
|
||||||
|
)
|
||||||
|
*a.chk = chk
|
||||||
|
a.app = app
|
||||||
|
}
|
||||||
|
|
||||||
|
a.app.AppendHistogram(t, h)
|
||||||
|
return true, counterReset
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendFloatHistogram appends a float histogram sample to the underlying chunk.
|
||||||
|
// The method returns false if the sample cannot be appended and a boolean value set to true
|
||||||
|
// when it is not appendable because of a counter reset.
|
||||||
|
// If counterReset is true, okToAppend is always false.
|
||||||
|
func (a *RecodingAppender) AppendFloatHistogram(t int64, fh *histogram.FloatHistogram) (okToAppend, counterReset bool) {
|
||||||
|
app, ok := a.app.(*chunkenc.FloatHistogramAppender)
|
||||||
|
if !ok {
|
||||||
|
return false, false
|
||||||
|
}
|
||||||
|
|
||||||
|
if app.NumSamples() == 0 {
|
||||||
|
a.app.AppendFloatHistogram(t, fh)
|
||||||
|
return true, false
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
pForwardInserts, nForwardInserts []chunkenc.Insert
|
||||||
|
pBackwardInserts, nBackwardInserts []chunkenc.Insert
|
||||||
|
pMergedSpans, nMergedSpans []histogram.Span
|
||||||
|
)
|
||||||
|
switch fh.CounterResetHint {
|
||||||
|
case histogram.GaugeType:
|
||||||
|
pForwardInserts, nForwardInserts,
|
||||||
|
pBackwardInserts, nBackwardInserts,
|
||||||
|
pMergedSpans, nMergedSpans,
|
||||||
|
okToAppend = app.AppendableGauge(fh)
|
||||||
|
default:
|
||||||
|
pForwardInserts, nForwardInserts, okToAppend, counterReset = app.Appendable(fh)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !okToAppend || counterReset {
|
||||||
|
return false, counterReset
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(pBackwardInserts)+len(nBackwardInserts) > 0 {
|
||||||
|
fh.PositiveSpans = pMergedSpans
|
||||||
|
fh.NegativeSpans = nMergedSpans
|
||||||
|
app.RecodeHistogramm(fh, pBackwardInserts, nBackwardInserts)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
|
||||||
|
chunk, app := app.Recode(
|
||||||
|
pForwardInserts, nForwardInserts,
|
||||||
|
fh.PositiveSpans, fh.NegativeSpans,
|
||||||
|
)
|
||||||
|
*a.chk = chunk
|
||||||
|
a.app = app
|
||||||
|
}
|
||||||
|
|
||||||
|
a.app.AppendFloatHistogram(t, fh)
|
||||||
|
return true, counterReset
|
||||||
|
}
|
||||||
|
|
|
@ -14,12 +14,17 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
"github.com/prometheus/prometheus/model/labels"
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
|
"github.com/prometheus/prometheus/model/value"
|
||||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||||
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -119,3 +124,284 @@ func TestChunkSeriesSetToSeriesSet(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type histogramTest struct {
|
||||||
|
samples []tsdbutil.Sample
|
||||||
|
expectedChunks int
|
||||||
|
expectedCounterReset bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHistogramSeriesToChunks(t *testing.T) {
|
||||||
|
h1 := &histogram.Histogram{
|
||||||
|
Count: 3,
|
||||||
|
ZeroCount: 2,
|
||||||
|
ZeroThreshold: 0.001,
|
||||||
|
Sum: 100,
|
||||||
|
Schema: 0,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []int64{2, 1}, // Abs: 2, 3
|
||||||
|
}
|
||||||
|
// Appendable to h1.
|
||||||
|
h2 := &histogram.Histogram{
|
||||||
|
Count: 12,
|
||||||
|
ZeroCount: 2,
|
||||||
|
ZeroThreshold: 0.001,
|
||||||
|
Sum: 100,
|
||||||
|
Schema: 0,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2},
|
||||||
|
{Offset: 1, Length: 2},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []int64{2, 1, -2, 3}, // Abs: 2, 3, 1, 4
|
||||||
|
}
|
||||||
|
// Implicit counter reset by reduction in buckets, not appendable.
|
||||||
|
h2down := &histogram.Histogram{
|
||||||
|
Count: 8,
|
||||||
|
ZeroCount: 2,
|
||||||
|
ZeroThreshold: 0.001,
|
||||||
|
Sum: 100,
|
||||||
|
Schema: 0,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2},
|
||||||
|
{Offset: 1, Length: 2},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []int64{1, 1, -1, 3}, // Abs: 1, 2, 1, 4
|
||||||
|
}
|
||||||
|
|
||||||
|
fh1 := &histogram.FloatHistogram{
|
||||||
|
Count: 4,
|
||||||
|
ZeroCount: 2,
|
||||||
|
ZeroThreshold: 0.001,
|
||||||
|
Sum: 100,
|
||||||
|
Schema: 0,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []float64{3, 1},
|
||||||
|
}
|
||||||
|
// Appendable to fh1.
|
||||||
|
fh2 := &histogram.FloatHistogram{
|
||||||
|
Count: 15,
|
||||||
|
ZeroCount: 2,
|
||||||
|
ZeroThreshold: 0.001,
|
||||||
|
Sum: 100,
|
||||||
|
Schema: 0,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2},
|
||||||
|
{Offset: 1, Length: 2},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []float64{4, 2, 7, 2},
|
||||||
|
}
|
||||||
|
// Implicit counter reset by reduction in buckets, not appendable.
|
||||||
|
fh2down := &histogram.FloatHistogram{
|
||||||
|
Count: 13,
|
||||||
|
ZeroCount: 2,
|
||||||
|
ZeroThreshold: 0.001,
|
||||||
|
Sum: 100,
|
||||||
|
Schema: 0,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2},
|
||||||
|
{Offset: 1, Length: 2},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []float64{2, 2, 7, 2},
|
||||||
|
}
|
||||||
|
|
||||||
|
staleHistogram := &histogram.Histogram{
|
||||||
|
Sum: math.Float64frombits(value.StaleNaN),
|
||||||
|
}
|
||||||
|
staleFloatHistogram := &histogram.FloatHistogram{
|
||||||
|
Sum: math.Float64frombits(value.StaleNaN),
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := map[string]histogramTest{
|
||||||
|
"single histogram to single chunk": {
|
||||||
|
samples: []tsdbutil.Sample{
|
||||||
|
hSample{t: 1, h: h1},
|
||||||
|
},
|
||||||
|
expectedChunks: 1,
|
||||||
|
},
|
||||||
|
"two histograms encoded to a single chunk": {
|
||||||
|
samples: []tsdbutil.Sample{
|
||||||
|
hSample{t: 1, h: h1},
|
||||||
|
hSample{t: 2, h: h2},
|
||||||
|
},
|
||||||
|
expectedChunks: 1,
|
||||||
|
},
|
||||||
|
"two histograms encoded to two chunks": {
|
||||||
|
samples: []tsdbutil.Sample{
|
||||||
|
hSample{t: 1, h: h2},
|
||||||
|
hSample{t: 2, h: h1},
|
||||||
|
},
|
||||||
|
expectedChunks: 2,
|
||||||
|
expectedCounterReset: true,
|
||||||
|
},
|
||||||
|
"histogram and stale sample encoded to two chunks": {
|
||||||
|
samples: []tsdbutil.Sample{
|
||||||
|
hSample{t: 1, h: staleHistogram},
|
||||||
|
hSample{t: 2, h: h1},
|
||||||
|
},
|
||||||
|
expectedChunks: 2,
|
||||||
|
},
|
||||||
|
"histogram and reduction in bucket encoded to two chunks": {
|
||||||
|
samples: []tsdbutil.Sample{
|
||||||
|
hSample{t: 1, h: h1},
|
||||||
|
hSample{t: 2, h: h2down},
|
||||||
|
},
|
||||||
|
expectedChunks: 2,
|
||||||
|
expectedCounterReset: true,
|
||||||
|
},
|
||||||
|
// Float histograms.
|
||||||
|
"single float histogram to single chunk": {
|
||||||
|
samples: []tsdbutil.Sample{
|
||||||
|
fhSample{t: 1, fh: fh1},
|
||||||
|
},
|
||||||
|
expectedChunks: 1,
|
||||||
|
},
|
||||||
|
"two float histograms encoded to a single chunk": {
|
||||||
|
samples: []tsdbutil.Sample{
|
||||||
|
fhSample{t: 1, fh: fh1},
|
||||||
|
fhSample{t: 2, fh: fh2},
|
||||||
|
},
|
||||||
|
expectedChunks: 1,
|
||||||
|
},
|
||||||
|
"two float histograms encoded to two chunks": {
|
||||||
|
samples: []tsdbutil.Sample{
|
||||||
|
fhSample{t: 1, fh: fh2},
|
||||||
|
fhSample{t: 2, fh: fh1},
|
||||||
|
},
|
||||||
|
expectedChunks: 2,
|
||||||
|
expectedCounterReset: true,
|
||||||
|
},
|
||||||
|
"float histogram and stale sample encoded to two chunks": {
|
||||||
|
samples: []tsdbutil.Sample{
|
||||||
|
fhSample{t: 1, fh: staleFloatHistogram},
|
||||||
|
fhSample{t: 2, fh: fh1},
|
||||||
|
},
|
||||||
|
expectedChunks: 2,
|
||||||
|
},
|
||||||
|
"float histogram and reduction in bucket encoded to two chunks": {
|
||||||
|
samples: []tsdbutil.Sample{
|
||||||
|
fhSample{t: 1, fh: fh1},
|
||||||
|
fhSample{t: 2, fh: fh2down},
|
||||||
|
},
|
||||||
|
expectedChunks: 2,
|
||||||
|
expectedCounterReset: true,
|
||||||
|
},
|
||||||
|
// Mixed.
|
||||||
|
"histogram and float histogram encoded to two chunks": {
|
||||||
|
samples: []tsdbutil.Sample{
|
||||||
|
hSample{t: 1, h: h1},
|
||||||
|
fhSample{t: 2, fh: fh2},
|
||||||
|
},
|
||||||
|
expectedChunks: 2,
|
||||||
|
},
|
||||||
|
"float histogram and histogram encoded to two chunks": {
|
||||||
|
samples: []tsdbutil.Sample{
|
||||||
|
fhSample{t: 1, fh: fh1},
|
||||||
|
hSample{t: 2, h: h2},
|
||||||
|
},
|
||||||
|
expectedChunks: 2,
|
||||||
|
},
|
||||||
|
"histogram and stale float histogram encoded to two chunks": {
|
||||||
|
samples: []tsdbutil.Sample{
|
||||||
|
hSample{t: 1, h: h1},
|
||||||
|
fhSample{t: 2, fh: staleFloatHistogram},
|
||||||
|
},
|
||||||
|
expectedChunks: 2,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for testName, test := range tests {
|
||||||
|
t.Run(testName, func(t *testing.T) {
|
||||||
|
testHistogramsSeriesToChunks(t, test)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
|
||||||
|
lbs := labels.FromStrings("__name__", "up", "instance", "localhost:8080")
|
||||||
|
series := NewListSeries(lbs, test.samples)
|
||||||
|
encoder := NewSeriesToChunkEncoder(series)
|
||||||
|
require.EqualValues(t, lbs, encoder.Labels())
|
||||||
|
|
||||||
|
chks, err := ExpandChunks(encoder.Iterator(nil))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, test.expectedChunks, len(chks))
|
||||||
|
|
||||||
|
// Decode all encoded samples and assert they are equal to the original ones.
|
||||||
|
encodedSamples := expandHistogramSamples(chks)
|
||||||
|
require.Equal(t, len(test.samples), len(encodedSamples))
|
||||||
|
|
||||||
|
for i, s := range test.samples {
|
||||||
|
switch expectedSample := s.(type) {
|
||||||
|
case hSample:
|
||||||
|
encodedSample, ok := encodedSamples[i].(hSample)
|
||||||
|
require.True(t, ok, "expect histogram", fmt.Sprintf("at idx %d", i))
|
||||||
|
// Ignore counter reset here, will check on chunk level.
|
||||||
|
encodedSample.h.CounterResetHint = histogram.UnknownCounterReset
|
||||||
|
if value.IsStaleNaN(expectedSample.h.Sum) {
|
||||||
|
require.True(t, value.IsStaleNaN(encodedSample.h.Sum), fmt.Sprintf("at idx %d", i))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
require.Equal(t, *expectedSample.h, *encodedSample.h.Compact(0), fmt.Sprintf("at idx %d", i))
|
||||||
|
case fhSample:
|
||||||
|
encodedSample, ok := encodedSamples[i].(fhSample)
|
||||||
|
require.True(t, ok, "expect float histogram", fmt.Sprintf("at idx %d", i))
|
||||||
|
// Ignore counter reset here, will check on chunk level.
|
||||||
|
encodedSample.fh.CounterResetHint = histogram.UnknownCounterReset
|
||||||
|
if value.IsStaleNaN(expectedSample.fh.Sum) {
|
||||||
|
require.True(t, value.IsStaleNaN(encodedSample.fh.Sum), fmt.Sprintf("at idx %d", i))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
require.Equal(t, *expectedSample.fh, *encodedSample.fh.Compact(0), fmt.Sprintf("at idx %d", i))
|
||||||
|
default:
|
||||||
|
t.Error("internal error, unexpected type")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If a counter reset hint is expected, it can only be found in the second chunk.
|
||||||
|
// Otherwise, we assert an unknown counter reset hint in all chunks.
|
||||||
|
if test.expectedCounterReset {
|
||||||
|
require.Equal(t, chunkenc.UnknownCounterReset, getCounterResetHint(chks[0]))
|
||||||
|
require.Equal(t, chunkenc.CounterReset, getCounterResetHint(chks[1]))
|
||||||
|
} else {
|
||||||
|
for _, chk := range chks {
|
||||||
|
require.Equal(t, chunkenc.UnknownCounterReset, getCounterResetHint(chk))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func expandHistogramSamples(chunks []chunks.Meta) (result []tsdbutil.Sample) {
|
||||||
|
if len(chunks) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, chunk := range chunks {
|
||||||
|
it := chunk.Chunk.Iterator(nil)
|
||||||
|
for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() {
|
||||||
|
switch vt {
|
||||||
|
case chunkenc.ValHistogram:
|
||||||
|
t, h := it.AtHistogram()
|
||||||
|
result = append(result, hSample{t: t, h: h})
|
||||||
|
case chunkenc.ValFloatHistogram:
|
||||||
|
t, fh := it.AtFloatHistogram()
|
||||||
|
result = append(result, fhSample{t: t, fh: fh})
|
||||||
|
default:
|
||||||
|
panic("unexpected value type")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func getCounterResetHint(chunk chunks.Meta) chunkenc.CounterResetHeader {
|
||||||
|
switch chk := chunk.Chunk.(type) {
|
||||||
|
case *chunkenc.HistogramChunk:
|
||||||
|
return chk.GetCounterResetHeader()
|
||||||
|
case *chunkenc.FloatHistogramChunk:
|
||||||
|
return chk.GetCounterResetHeader()
|
||||||
|
}
|
||||||
|
return chunkenc.UnknownCounterReset
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue