Hide histogram chunk append and reset header internals (#12352)

tsdb: Hide histogram chunk append and reset header internals

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
Signed-off-by: George Krajcsovits <krajorama@users.noreply.github.com>
This commit is contained in:
George Krajcsovits 2023-07-26 15:08:16 +02:00 committed by GitHub
parent 03e549cc39
commit 6cd2d1621f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 818 additions and 698 deletions

View file

@ -292,9 +292,10 @@ func NewSeriesToChunkEncoder(series Series) ChunkSeries {
func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
var (
chk chunkenc.Chunk
app *RecodingAppender
err error
chk, newChk chunkenc.Chunk
app chunkenc.Appender
err error
recoded bool
)
mint := int64(math.MaxInt64)
maxt := int64(math.MinInt64)
@ -309,20 +310,17 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
seriesIter := s.Series.Iterator(nil)
lastType := chunkenc.ValNone
for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() {
chunkCreated := false
if typ != lastType || i >= seriesToChunkEncoderSplit {
// Create a new chunk if the sample type changed or too many samples in the current one.
chks = appendChunk(chks, mint, maxt, chk)
chunkCreated = true
chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding())
if err != nil {
return errChunksIterator{err: err}
}
chkAppender, err := chk.Appender()
app, err = chk.Appender()
if err != nil {
return errChunksIterator{err: err}
}
app = NewRecodingAppender(&chk, chkAppender)
mint = int64(math.MaxInt64)
// maxt is immediately overwritten below which is why setting it here won't make a difference.
i = 0
@ -341,52 +339,33 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
app.Append(t, v)
case chunkenc.ValHistogram:
t, h = seriesIter.AtHistogram()
if ok, counterReset := app.AppendHistogram(t, h); !ok {
chks = appendChunk(chks, mint, maxt, chk)
histChunk := chunkenc.NewHistogramChunk()
chunkCreated = true
if counterReset {
histChunk.SetCounterResetHeader(chunkenc.CounterReset)
}
chk = histChunk
chkAppender, err := chk.Appender()
if err != nil {
return errChunksIterator{err: err}
}
mint = int64(math.MaxInt64)
i = 0
app = NewRecodingAppender(&chk, chkAppender)
if ok, _ := app.AppendHistogram(t, h); !ok {
panic("unexpected error while appending histogram")
}
newChk, recoded, app, err = app.AppendHistogram(nil, t, h, false)
if err != nil {
return errChunksIterator{err: err}
}
if chunkCreated && h.CounterResetHint == histogram.GaugeType {
chk.(*chunkenc.HistogramChunk).SetCounterResetHeader(chunkenc.GaugeType)
if newChk != nil {
if !recoded {
chks = appendChunk(chks, mint, maxt, chk)
mint = int64(math.MaxInt64)
// maxt is immediately overwritten below which is why setting it here won't make a difference.
i = 0
}
chk = newChk
}
case chunkenc.ValFloatHistogram:
t, fh = seriesIter.AtFloatHistogram()
if ok, counterReset := app.AppendFloatHistogram(t, fh); !ok {
chks = appendChunk(chks, mint, maxt, chk)
floatHistChunk := chunkenc.NewFloatHistogramChunk()
chunkCreated = true
if counterReset {
floatHistChunk.SetCounterResetHeader(chunkenc.CounterReset)
}
chk = floatHistChunk
chkAppender, err := chk.Appender()
if err != nil {
return errChunksIterator{err: err}
}
mint = int64(math.MaxInt64)
i = 0
app = NewRecodingAppender(&chk, chkAppender)
if ok, _ := app.AppendFloatHistogram(t, fh); !ok {
panic("unexpected error while float appending histogram")
}
newChk, recoded, app, err = app.AppendFloatHistogram(nil, t, fh, false)
if err != nil {
return errChunksIterator{err: err}
}
if chunkCreated && fh.CounterResetHint == histogram.GaugeType {
chk.(*chunkenc.FloatHistogramChunk).SetCounterResetHeader(chunkenc.GaugeType)
if newChk != nil {
if !recoded {
chks = appendChunk(chks, mint, maxt, chk)
mint = int64(math.MaxInt64)
// maxt is immediately overwritten below which is why setting it here won't make a difference.
i = 0
}
chk = newChk
}
default:
return errChunksIterator{err: fmt.Errorf("unknown sample type %s", typ.String())}
@ -477,126 +456,3 @@ func ExpandChunks(iter chunks.Iterator) ([]chunks.Meta, error) {
}
return result, iter.Err()
}
// RecodingAppender is a tsdb.Appender that recodes histogram samples if needed during appends.
// It takes an existing appender and a chunk to which samples are appended.
type RecodingAppender struct {
chk *chunkenc.Chunk
app chunkenc.Appender
}
func NewRecodingAppender(chk *chunkenc.Chunk, app chunkenc.Appender) *RecodingAppender {
return &RecodingAppender{
chk: chk,
app: app,
}
}
// Append appends a float sample to the appender.
func (a *RecodingAppender) Append(t int64, v float64) {
a.app.Append(t, v)
}
// AppendHistogram appends a histogram sample to the underlying chunk.
// The method returns false if the sample cannot be appended and a boolean value set to true
// when it is not appendable because of a counter reset.
// If counterReset is true, okToAppend is always false.
func (a *RecodingAppender) AppendHistogram(t int64, h *histogram.Histogram) (okToAppend, counterReset bool) {
app, ok := a.app.(*chunkenc.HistogramAppender)
if !ok {
return false, false
}
if app.NumSamples() == 0 {
a.app.AppendHistogram(t, h)
return true, false
}
var (
pForwardInserts, nForwardInserts []chunkenc.Insert
pBackwardInserts, nBackwardInserts []chunkenc.Insert
pMergedSpans, nMergedSpans []histogram.Span
)
switch h.CounterResetHint {
case histogram.GaugeType:
pForwardInserts, nForwardInserts,
pBackwardInserts, nBackwardInserts,
pMergedSpans, nMergedSpans,
okToAppend = app.AppendableGauge(h)
default:
pForwardInserts, nForwardInserts, okToAppend, counterReset = app.Appendable(h)
}
if !okToAppend || counterReset {
return false, counterReset
}
if len(pBackwardInserts)+len(nBackwardInserts) > 0 {
h.PositiveSpans = pMergedSpans
h.NegativeSpans = nMergedSpans
app.RecodeHistogram(h, pBackwardInserts, nBackwardInserts)
}
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
chk, app := app.Recode(
pForwardInserts, nForwardInserts,
h.PositiveSpans, h.NegativeSpans,
)
*a.chk = chk
a.app = app
}
a.app.AppendHistogram(t, h)
return true, counterReset
}
// AppendFloatHistogram appends a float histogram sample to the underlying chunk.
// The method returns false if the sample cannot be appended and a boolean value set to true
// when it is not appendable because of a counter reset.
// If counterReset is true, okToAppend is always false.
func (a *RecodingAppender) AppendFloatHistogram(t int64, fh *histogram.FloatHistogram) (okToAppend, counterReset bool) {
app, ok := a.app.(*chunkenc.FloatHistogramAppender)
if !ok {
return false, false
}
if app.NumSamples() == 0 {
a.app.AppendFloatHistogram(t, fh)
return true, false
}
var (
pForwardInserts, nForwardInserts []chunkenc.Insert
pBackwardInserts, nBackwardInserts []chunkenc.Insert
pMergedSpans, nMergedSpans []histogram.Span
)
switch fh.CounterResetHint {
case histogram.GaugeType:
pForwardInserts, nForwardInserts,
pBackwardInserts, nBackwardInserts,
pMergedSpans, nMergedSpans,
okToAppend = app.AppendableGauge(fh)
default:
pForwardInserts, nForwardInserts, okToAppend, counterReset = app.Appendable(fh)
}
if !okToAppend || counterReset {
return false, counterReset
}
if len(pBackwardInserts)+len(nBackwardInserts) > 0 {
fh.PositiveSpans = pMergedSpans
fh.NegativeSpans = nMergedSpans
app.RecodeHistogramm(fh, pBackwardInserts, nBackwardInserts)
}
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
chunk, app := app.Recode(
pForwardInserts, nForwardInserts,
fh.PositiveSpans, fh.NegativeSpans,
)
*a.chk = chunk
a.app = app
}
a.app.AppendFloatHistogram(t, fh)
return true, counterReset
}

View file

@ -82,8 +82,20 @@ type Chunk interface {
// Appender adds sample pairs to a chunk.
type Appender interface {
Append(int64, float64)
AppendHistogram(t int64, h *histogram.Histogram)
AppendFloatHistogram(t int64, h *histogram.FloatHistogram)
// AppendHistogram and AppendFloatHistogram append a histogram sample to a histogram or float histogram chunk.
// Appending a histogram may require creating a completely new chunk or recoding (changing) the current chunk.
// The Appender prev is used to determine if there is a counter reset between the previous Appender and the current Appender.
// The Appender prev is optional and only taken into account when the first sample is being appended.
// The bool appendOnly governs what happens when a sample cannot be appended to the current chunk. If appendOnly is true, then
// in such case an error is returned without modifying the chunk. If appendOnly is false, then a new chunk is created or the
// current chunk is recoded to accommodate the sample.
// The returned Chunk c is nil if sample could be appended to the current Chunk, otherwise c is the new Chunk.
// The returned bool isRecoded can be used to distinguish between the new Chunk c being a completely new Chunk
// or the current Chunk recoded to a new Chunk.
// The Appender app that can be used for the next append is always returned.
AppendHistogram(prev *HistogramAppender, t int64, h *histogram.Histogram, appendOny bool) (c Chunk, isRecoded bool, app Appender, err error)
AppendFloatHistogram(prev *FloatHistogramAppender, t int64, h *histogram.FloatHistogram, appendOnly bool) (c Chunk, isRecoded bool, app Appender, err error)
}
// Iterator is a simple iterator that can only get the next value.

View file

@ -15,6 +15,7 @@ package chunkenc
import (
"encoding/binary"
"fmt"
"math"
"github.com/prometheus/prometheus/model/histogram"
@ -80,15 +81,10 @@ func (c *FloatHistogramChunk) Layout() (
return readHistogramChunkLayout(&b)
}
// SetCounterResetHeader sets the counter reset header.
func (c *FloatHistogramChunk) SetCounterResetHeader(h CounterResetHeader) {
setCounterResetHeader(h, c.Bytes())
}
// GetCounterResetHeader returns the info about the first 2 bits of the chunk
// header.
func (c *FloatHistogramChunk) GetCounterResetHeader() CounterResetHeader {
return CounterResetHeader(c.Bytes()[2] & 0b11000000)
return CounterResetHeader(c.Bytes()[2] & CounterResetHeaderMask)
}
// Compact implements the Chunk interface.
@ -174,7 +170,7 @@ func newFloatHistogramIterator(b []byte) *floatHistogramIterator {
// The first 3 bytes contain chunk headers.
// We skip that for actual samples.
_, _ = it.br.readBits(24)
it.counterResetHeader = CounterResetHeader(b[2] & 0b11000000)
it.counterResetHeader = CounterResetHeader(b[2] & CounterResetHeaderMask)
return it
}
@ -198,7 +194,11 @@ type FloatHistogramAppender struct {
}
func (a *FloatHistogramAppender) GetCounterResetHeader() CounterResetHeader {
return CounterResetHeader(a.b.bytes()[2] & 0b11000000)
return CounterResetHeader(a.b.bytes()[2] & CounterResetHeaderMask)
}
func (a *FloatHistogramAppender) setCounterResetHeader(cr CounterResetHeader) {
a.b.bytes()[2] = (a.b.bytes()[2] & (^CounterResetHeaderMask)) | (byte(cr) & CounterResetHeaderMask)
}
func (a *FloatHistogramAppender) NumSamples() int {
@ -211,13 +211,7 @@ func (a *FloatHistogramAppender) Append(int64, float64) {
panic("appended a float sample to a histogram chunk")
}
// AppendHistogram implements Appender. This implementation panics because integer
// histogram samples must never be appended to a float histogram chunk.
func (a *FloatHistogramAppender) AppendHistogram(int64, *histogram.Histogram) {
panic("appended an integer histogram to a float histogram chunk")
}
// Appendable returns whether the chunk can be appended to, and if so whether
// appendable returns whether the chunk can be appended to, and if so whether
// any recoding needs to happen using the provided inserts (in case of any new
// buckets, positive or negative range, respectively). If the sample is a gauge
// histogram, AppendableGauge must be used instead.
@ -232,7 +226,7 @@ func (a *FloatHistogramAppender) AppendHistogram(int64, *histogram.Histogram) {
// The method returns an additional boolean set to true if it is not appendable
// because of a counter reset. If the given sample is stale, it is always ok to
// append. If counterReset is true, okToAppend is always false.
func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) (
func (a *FloatHistogramAppender) appendable(h *histogram.FloatHistogram) (
positiveInserts, negativeInserts []Insert,
okToAppend, counterReset bool,
) {
@ -288,7 +282,7 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) (
return
}
// AppendableGauge returns whether the chunk can be appended to, and if so
// appendableGauge returns whether the chunk can be appended to, and if so
// whether:
// 1. Any recoding needs to happen to the chunk using the provided inserts
// (in case of any new buckets, positive or negative range, respectively).
@ -302,7 +296,7 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) (
// - The schema has changed.
// - The threshold for the zero bucket has changed.
// - The last sample in the chunk was stale while the current sample is not stale.
func (a *FloatHistogramAppender) AppendableGauge(h *histogram.FloatHistogram) (
func (a *FloatHistogramAppender) appendableGauge(h *histogram.FloatHistogram) (
positiveInserts, negativeInserts []Insert,
backwardPositiveInserts, backwardNegativeInserts []Insert,
positiveSpans, negativeSpans []histogram.Span,
@ -399,11 +393,11 @@ func counterResetInAnyFloatBucket(oldBuckets []xorValue, newBuckets []float64, o
return false
}
// AppendFloatHistogram appends a float histogram to the chunk. The caller must ensure that
// appendFloatHistogram appends a float histogram to the chunk. The caller must ensure that
// the histogram is properly structured, e.g. the number of buckets used
// corresponds to the number conveyed by the span structures. First call
// Appendable() and act accordingly!
func (a *FloatHistogramAppender) AppendFloatHistogram(t int64, h *histogram.FloatHistogram) {
func (a *FloatHistogramAppender) appendFloatHistogram(t int64, h *histogram.FloatHistogram) {
var tDelta int64
num := binary.BigEndian.Uint16(a.b.bytes())
@ -501,12 +495,12 @@ func (a *FloatHistogramAppender) writeXorValue(old *xorValue, v float64) {
old.value = v
}
// Recode converts the current chunk to accommodate an expansion of the set of
// recode converts the current chunk to accommodate an expansion of the set of
// (positive and/or negative) buckets used, according to the provided inserts,
// resulting in the honoring of the provided new positive and negative spans. To
// continue appending, use the returned Appender rather than the receiver of
// this method.
func (a *FloatHistogramAppender) Recode(
func (a *FloatHistogramAppender) recode(
positiveInserts, negativeInserts []Insert,
positiveSpans, negativeSpans []histogram.Span,
) (Chunk, Appender) {
@ -519,8 +513,9 @@ func (a *FloatHistogramAppender) Recode(
hc := NewFloatHistogramChunk()
app, err := hc.Appender()
if err != nil {
panic(err)
panic(err) // This should never happen for an empty float histogram chunk.
}
happ := app.(*FloatHistogramAppender)
numPositiveBuckets, numNegativeBuckets := countSpans(positiveSpans), countSpans(negativeSpans)
for it.Next() == ValFloatHistogram {
@ -546,16 +541,16 @@ func (a *FloatHistogramAppender) Recode(
if len(negativeInserts) > 0 {
hOld.NegativeBuckets = insert(hOld.NegativeBuckets, negativeBuckets, negativeInserts, false)
}
app.AppendFloatHistogram(tOld, hOld)
happ.appendFloatHistogram(tOld, hOld)
}
hc.SetCounterResetHeader(CounterResetHeader(byts[2] & 0b11000000))
happ.setCounterResetHeader(CounterResetHeader(byts[2] & CounterResetHeaderMask))
return hc, app
}
// RecodeHistogramm converts the current histogram (in-place) to accommodate an expansion of the set of
// recodeHistogram converts the current histogram (in-place) to accommodate an expansion of the set of
// (positive and/or negative) buckets used.
func (a *FloatHistogramAppender) RecodeHistogramm(
func (a *FloatHistogramAppender) recodeHistogram(
fh *histogram.FloatHistogram,
pBackwardInter, nBackwardInter []Insert,
) {
@ -569,6 +564,111 @@ func (a *FloatHistogramAppender) RecodeHistogramm(
}
}
func (a *FloatHistogramAppender) AppendHistogram(*HistogramAppender, int64, *histogram.Histogram, bool) (Chunk, bool, Appender, error) {
panic("appended a histogram sample to a float histogram chunk")
}
func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppender, t int64, h *histogram.FloatHistogram, appendOnly bool) (Chunk, bool, Appender, error) {
if a.NumSamples() == 0 {
a.appendFloatHistogram(t, h)
if h.CounterResetHint == histogram.GaugeType {
a.setCounterResetHeader(GaugeType)
return nil, false, a, nil
}
if prev != nil && h.CounterResetHint != histogram.CounterReset {
// This is a new chunk, but continued from a previous one. We need to calculate the reset header unless already set.
_, _, _, counterReset := prev.appendable(h)
if counterReset {
a.setCounterResetHeader(CounterReset)
} else {
a.setCounterResetHeader(NotCounterReset)
}
} else {
// Honor the explicit counter reset hint.
a.setCounterResetHeader(CounterResetHeader(h.CounterResetHint))
}
return nil, false, a, nil
}
// Adding counter-like histogram.
if h.CounterResetHint != histogram.GaugeType {
pForwardInserts, nForwardInserts, okToAppend, counterReset := a.appendable(h)
if !okToAppend || counterReset {
if appendOnly {
if !okToAppend {
return nil, false, a, fmt.Errorf("float histogram schema change")
}
return nil, false, a, fmt.Errorf("float histogram counter reset")
}
newChunk := NewFloatHistogramChunk()
app, err := newChunk.Appender()
if err != nil {
panic(err) // This should never happen for an empty float histogram chunk.
}
happ := app.(*FloatHistogramAppender)
if counterReset {
happ.setCounterResetHeader(CounterReset)
}
happ.appendFloatHistogram(t, h)
return newChunk, false, app, nil
}
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
if appendOnly {
return nil, false, a, fmt.Errorf("float histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts))
}
chk, app := a.recode(
pForwardInserts, nForwardInserts,
h.PositiveSpans, h.NegativeSpans,
)
app.(*FloatHistogramAppender).appendFloatHistogram(t, h)
return chk, true, app, nil
}
a.appendFloatHistogram(t, h)
return nil, false, a, nil
}
// Adding gauge histogram.
pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, pMergedSpans, nMergedSpans, okToAppend := a.appendableGauge(h)
if !okToAppend {
if appendOnly {
return nil, false, a, fmt.Errorf("float gauge histogram schema change")
}
newChunk := NewFloatHistogramChunk()
app, err := newChunk.Appender()
if err != nil {
panic(err) // This should never happen for an empty float histogram chunk.
}
happ := app.(*FloatHistogramAppender)
happ.setCounterResetHeader(GaugeType)
happ.appendFloatHistogram(t, h)
return newChunk, false, app, nil
}
if len(pBackwardInserts)+len(nBackwardInserts) > 0 {
if appendOnly {
return nil, false, a, fmt.Errorf("float gauge histogram layout change with %d positive and %d negative backwards inserts", len(pBackwardInserts), len(nBackwardInserts))
}
h.PositiveSpans = pMergedSpans
h.NegativeSpans = nMergedSpans
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
}
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
if appendOnly {
return nil, false, a, fmt.Errorf("float gauge histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts))
}
chk, app := a.recode(
pForwardInserts, nForwardInserts,
h.PositiveSpans, h.NegativeSpans,
)
app.(*FloatHistogramAppender).appendFloatHistogram(t, h)
return chk, true, app, nil
}
a.appendFloatHistogram(t, h)
return nil, false, a, nil
}
type floatHistogramIterator struct {
br bstreamReader
numTotal uint16
@ -657,7 +757,7 @@ func (it *floatHistogramIterator) Reset(b []byte) {
it.numTotal = binary.BigEndian.Uint16(b)
it.numRead = 0
it.counterResetHeader = CounterResetHeader(b[2] & 0b11000000)
it.counterResetHeader = CounterResetHeader(b[2] & CounterResetHeaderMask)
it.t, it.tDelta = 0, 0
it.cnt, it.zCnt, it.sum = xorValue{}, xorValue{}, xorValue{}

View file

@ -53,7 +53,9 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) {
},
NegativeBuckets: []int64{2, 1, -1, -1}, // counts: 2, 3, 2, 1 (total 8)
}
app.AppendFloatHistogram(ts, h.ToFloat())
chk, _, app, err := app.AppendFloatHistogram(nil, ts, h.ToFloat(), false)
require.NoError(t, err)
require.Nil(t, chk)
exp = append(exp, floatResult{t: ts, h: h.ToFloat()})
require.Equal(t, 1, c.NumSamples())
@ -65,7 +67,9 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) {
h.Sum = 24.4
h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14)
h.NegativeBuckets = []int64{4, -1, 1, -1} // counts: 4, 3, 4, 4 (total 15)
app.AppendFloatHistogram(ts, h.ToFloat())
chk, _, _, err = app.AppendFloatHistogram(nil, ts, h.ToFloat(), false)
require.NoError(t, err)
require.Nil(t, chk)
expH := h.ToFloat()
expH.CounterResetHint = histogram.NotCounterReset
exp = append(exp, floatResult{t: ts, h: expH})
@ -82,7 +86,9 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) {
h.Sum = 24.4
h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27)
h.NegativeBuckets = []int64{5, 1, -2, 3} // counts: 5, 6, 4, 7 (total 22)
app.AppendFloatHistogram(ts, h.ToFloat())
chk, _, _, err = app.AppendFloatHistogram(nil, ts, h.ToFloat(), false)
require.NoError(t, err)
require.Nil(t, chk)
expH = h.ToFloat()
expH.CounterResetHint = histogram.NotCounterReset
exp = append(exp, floatResult{t: ts, h: expH})
@ -170,7 +176,9 @@ func TestFloatHistogramChunkBucketChanges(t *testing.T) {
NegativeBuckets: []int64{1},
}
app.AppendFloatHistogram(ts1, h1.ToFloat())
chk, _, app, err := app.AppendFloatHistogram(nil, ts1, h1.ToFloat(), false)
require.NoError(t, err)
require.Nil(t, chk)
require.Equal(t, 1, c.NumSamples())
// Add a new histogram that has expanded buckets.
@ -196,14 +204,15 @@ func TestFloatHistogramChunkBucketChanges(t *testing.T) {
h2.NegativeBuckets = []int64{2, -1} // 2 1 (total 3)
// This is how span changes will be handled.
hApp, _ := app.(*FloatHistogramAppender)
posInterjections, negInterjections, ok, cr := hApp.Appendable(h2.ToFloat())
posInterjections, negInterjections, ok, cr := hApp.appendable(h2.ToFloat())
require.Greater(t, len(posInterjections), 0)
require.Greater(t, len(negInterjections), 0)
require.True(t, ok) // Only new buckets came in.
require.False(t, cr)
c, app = hApp.Recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans)
app.AppendFloatHistogram(ts2, h2.ToFloat())
c, app = hApp.recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans)
chk, _, _, err = app.AppendFloatHistogram(nil, ts2, h2.ToFloat(), false)
require.NoError(t, err)
require.Nil(t, chk)
require.Equal(t, 2, c.NumSamples())
// Because the 2nd histogram has expanded buckets, we should expect all
@ -230,49 +239,61 @@ func TestFloatHistogramChunkBucketChanges(t *testing.T) {
}
func TestFloatHistogramChunkAppendable(t *testing.T) {
c := Chunk(NewFloatHistogramChunk())
setup := func() (Chunk, *FloatHistogramAppender, int64, *histogram.FloatHistogram) {
c := Chunk(NewFloatHistogramChunk())
// Create fresh appender and add the first histogram.
app, err := c.Appender()
require.NoError(t, err)
require.Equal(t, 0, c.NumSamples())
// Create fresh appender and add the first histogram.
app, err := c.Appender()
require.NoError(t, err)
require.Equal(t, 0, c.NumSamples())
ts := int64(1234567890)
h1 := &histogram.FloatHistogram{
Count: 5,
ZeroCount: 2,
Sum: 18.4,
ZeroThreshold: 1e-125,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 1},
{Offset: 3, Length: 2},
{Offset: 3, Length: 1},
{Offset: 1, Length: 1},
},
PositiveBuckets: []float64{6, 3, 3, 2, 4, 5, 1},
ts := int64(1234567890)
h1 := &histogram.FloatHistogram{
Count: 5,
ZeroCount: 2,
Sum: 18.4,
ZeroThreshold: 1e-125,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 1},
{Offset: 3, Length: 2},
{Offset: 3, Length: 1},
{Offset: 1, Length: 1},
},
PositiveBuckets: []float64{6, 3, 3, 2, 4, 5, 1},
}
chk, _, app, err := app.AppendFloatHistogram(nil, ts, h1.Copy(), false)
require.NoError(t, err)
require.Nil(t, chk)
require.Equal(t, 1, c.NumSamples())
require.Equal(t, UnknownCounterReset, c.(*FloatHistogramChunk).GetCounterResetHeader())
return c, app.(*FloatHistogramAppender), ts, h1
}
app.AppendFloatHistogram(ts, h1.Copy())
require.Equal(t, 1, c.NumSamples())
hApp, _ := app.(*FloatHistogramAppender)
{ // Schema change.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.Schema++
_, _, ok, _ := hApp.Appendable(h2)
_, _, ok, _ := hApp.appendable(h2)
require.False(t, ok)
assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
}
{ // Zero threshold change.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.ZeroThreshold += 0.1
_, _, ok, _ := hApp.Appendable(h2)
_, _, ok, _ := hApp.appendable(h2)
require.False(t, ok)
assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
}
{ // New histogram that has more buckets.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 3},
@ -285,14 +306,18 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
h2.Sum = 30
h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 1}
posInterjections, negInterjections, ok, cr := hApp.Appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.Greater(t, len(posInterjections), 0)
require.Equal(t, 0, len(negInterjections))
require.True(t, ok) // Only new buckets came in.
require.False(t, cr)
assertRecodedFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
}
{ // New histogram that has a bucket missing.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 2},
@ -303,26 +328,32 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
h2.Sum = 21
h2.PositiveBuckets = []float64{6, 3, 2, 4, 5, 1}
posInterjections, negInterjections, ok, cr := hApp.Appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.Equal(t, 0, len(posInterjections))
require.Equal(t, 0, len(negInterjections))
require.False(t, ok) // Need to cut a new chunk.
require.True(t, cr)
assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset)
}
{ // New histogram that has a counter reset while buckets are same.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.Sum = 23
h2.PositiveBuckets = []float64{6, 2, 3, 2, 4, 5, 1}
posInterjections, negInterjections, ok, cr := hApp.Appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.Equal(t, 0, len(posInterjections))
require.Equal(t, 0, len(negInterjections))
require.False(t, ok) // Need to cut a new chunk.
require.True(t, cr)
assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset)
}
{ // New histogram that has a counter reset while new buckets were added.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 3},
@ -333,14 +364,17 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
h2.Sum = 29
h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 0}
posInterjections, negInterjections, ok, cr := hApp.Appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.Equal(t, 0, len(posInterjections))
require.Equal(t, 0, len(negInterjections))
require.False(t, ok) // Need to cut a new chunk.
require.True(t, cr)
assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset)
}
{
c, hApp, ts, h1 := setup()
// New histogram that has a counter reset while new buckets were
// added before the first bucket and reset on first bucket. (to
// catch the edge case where the new bucket should be forwarded
@ -357,14 +391,57 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
h2.Sum = 26
h2.PositiveBuckets = []float64{1, 2, 5, 3, 3, 2, 4, 5, 1}
posInterjections, negInterjections, ok, cr := hApp.Appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.Equal(t, 0, len(posInterjections))
require.Equal(t, 0, len(negInterjections))
require.False(t, ok) // Need to cut a new chunk.
require.True(t, cr)
assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset)
}
}
func assertNewFloatHistogramChunkOnAppend(t *testing.T, oldChunk Chunk, hApp *FloatHistogramAppender, ts int64, h *histogram.FloatHistogram, expectHeader CounterResetHeader) {
oldChunkBytes := oldChunk.Bytes()
newChunk, recoded, newAppender, err := hApp.AppendFloatHistogram(nil, ts, h, false)
require.Equal(t, oldChunkBytes, oldChunk.Bytes()) // Sanity check that previous chunk is untouched.
require.NoError(t, err)
require.NotNil(t, newChunk)
require.False(t, recoded)
require.NotEqual(t, oldChunk, newChunk)
require.Equal(t, expectHeader, newChunk.(*FloatHistogramChunk).GetCounterResetHeader())
require.NotNil(t, newAppender)
require.NotEqual(t, hApp, newAppender)
assertSampleCount(t, newChunk, 1, ValFloatHistogram)
}
func assertNoNewFloatHistogramChunkOnAppend(t *testing.T, oldChunk Chunk, hApp *FloatHistogramAppender, ts int64, h *histogram.FloatHistogram, expectHeader CounterResetHeader) {
oldChunkBytes := oldChunk.Bytes()
newChunk, recoded, newAppender, err := hApp.AppendFloatHistogram(nil, ts, h, false)
require.NotEqual(t, oldChunkBytes, oldChunk.Bytes()) // Sanity check that previous chunk is untouched.
require.NoError(t, err)
require.Nil(t, newChunk)
require.False(t, recoded)
require.Equal(t, expectHeader, oldChunk.(*FloatHistogramChunk).GetCounterResetHeader())
require.NotNil(t, newAppender)
require.Equal(t, hApp, newAppender)
assertSampleCount(t, oldChunk, 2, ValFloatHistogram)
}
func assertRecodedFloatHistogramChunkOnAppend(t *testing.T, prevChunk Chunk, hApp *FloatHistogramAppender, ts int64, h *histogram.FloatHistogram, expectHeader CounterResetHeader) {
prevChunkBytes := prevChunk.Bytes()
newChunk, recoded, newAppender, err := hApp.AppendFloatHistogram(nil, ts, h, false)
require.Equal(t, prevChunkBytes, prevChunk.Bytes()) // Sanity check that previous chunk is untouched. This may change in the future if we implement in-place recoding.
require.NoError(t, err)
require.NotNil(t, newChunk)
require.True(t, recoded)
require.NotEqual(t, prevChunk, newChunk)
require.Equal(t, expectHeader, newChunk.(*FloatHistogramChunk).GetCounterResetHeader())
require.NotNil(t, newAppender)
require.NotEqual(t, hApp, newAppender)
assertSampleCount(t, newChunk, 2, ValFloatHistogram)
}
func TestFloatHistogramChunkAppendableWithEmptySpan(t *testing.T) {
h1 := &histogram.FloatHistogram{
Schema: 0,
@ -412,11 +489,12 @@ func TestFloatHistogramChunkAppendableWithEmptySpan(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 0, c.NumSamples())
app.AppendFloatHistogram(1, h1)
_, _, _, err = app.AppendFloatHistogram(nil, 1, h1, true)
require.NoError(t, err)
require.Equal(t, 1, c.NumSamples())
hApp, _ := app.(*FloatHistogramAppender)
pI, nI, okToAppend, counterReset := hApp.Appendable(h2)
pI, nI, okToAppend, counterReset := hApp.appendable(h2)
require.Empty(t, pI)
require.Empty(t, nI)
require.True(t, okToAppend)
@ -424,51 +502,62 @@ func TestFloatHistogramChunkAppendableWithEmptySpan(t *testing.T) {
}
func TestFloatHistogramChunkAppendableGauge(t *testing.T) {
c := Chunk(NewFloatHistogramChunk())
setup := func() (Chunk, *FloatHistogramAppender, int64, *histogram.FloatHistogram) {
c := Chunk(NewFloatHistogramChunk())
// Create fresh appender and add the first histogram.
app, err := c.Appender()
require.NoError(t, err)
require.Equal(t, 0, c.NumSamples())
// Create fresh appender and add the first histogram.
app, err := c.Appender()
require.NoError(t, err)
require.Equal(t, 0, c.NumSamples())
ts := int64(1234567890)
h1 := &histogram.FloatHistogram{
Count: 5,
ZeroCount: 2,
Sum: 18.4,
ZeroThreshold: 1e-125,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 1},
{Offset: 3, Length: 2},
{Offset: 3, Length: 1},
{Offset: 1, Length: 1},
},
PositiveBuckets: []float64{6, 3, 3, 2, 4, 5, 1},
ts := int64(1234567890)
h1 := &histogram.FloatHistogram{
CounterResetHint: histogram.GaugeType,
Count: 5,
ZeroCount: 2,
Sum: 18.4,
ZeroThreshold: 1e-125,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 1},
{Offset: 3, Length: 2},
{Offset: 3, Length: 1},
{Offset: 1, Length: 1},
},
PositiveBuckets: []float64{6, 3, 3, 2, 4, 5, 1},
}
chk, _, app, err := app.AppendFloatHistogram(nil, ts, h1.Copy(), false)
require.NoError(t, err)
require.Nil(t, chk)
require.Equal(t, 1, c.NumSamples())
require.Equal(t, GaugeType, c.(*FloatHistogramChunk).GetCounterResetHeader())
return c, app.(*FloatHistogramAppender), ts, h1
}
app.AppendFloatHistogram(ts, h1.Copy())
require.Equal(t, 1, c.NumSamples())
c.(*FloatHistogramChunk).SetCounterResetHeader(GaugeType)
{ // Schema change.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.Schema++
hApp, _ := app.(*FloatHistogramAppender)
_, _, _, _, _, _, ok := hApp.AppendableGauge(h2)
_, _, _, _, _, _, ok := hApp.appendableGauge(h2)
require.False(t, ok)
assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, GaugeType)
}
{ // Zero threshold change.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.ZeroThreshold += 0.1
hApp, _ := app.(*FloatHistogramAppender)
_, _, _, _, _, _, ok := hApp.AppendableGauge(h2)
_, _, _, _, _, _, ok := hApp.appendableGauge(h2)
require.False(t, ok)
assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, GaugeType)
}
{ // New histogram that has more buckets.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 3},
@ -481,16 +570,18 @@ func TestFloatHistogramChunkAppendableGauge(t *testing.T) {
h2.Sum = 30
h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 1}
hApp, _ := app.(*FloatHistogramAppender)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.appendableGauge(h2)
require.Greater(t, len(pI), 0)
require.Len(t, nI, 0)
require.Len(t, pBackwardI, 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
assertRecodedFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, GaugeType)
}
{ // New histogram that has buckets missing.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 2},
@ -503,16 +594,18 @@ func TestFloatHistogramChunkAppendableGauge(t *testing.T) {
h2.Sum--
h2.PositiveBuckets = []float64{6, 3, 3, 2, 5, 1}
hApp, _ := app.(*FloatHistogramAppender)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.appendableGauge(h2)
require.Len(t, pI, 0)
require.Len(t, nI, 0)
require.Greater(t, len(pBackwardI), 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
assertNoNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, GaugeType)
}
{ // New histogram that has a bucket missing and new buckets.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 2},
@ -523,30 +616,34 @@ func TestFloatHistogramChunkAppendableGauge(t *testing.T) {
h2.Sum = 21
h2.PositiveBuckets = []float64{6, 3, 2, 4, 5, 1}
hApp, _ := app.(*FloatHistogramAppender)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.appendableGauge(h2)
require.Greater(t, len(pI), 0)
require.Greater(t, len(pBackwardI), 0)
require.Len(t, nI, 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
assertRecodedFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, GaugeType)
}
{ // New histogram that has a counter reset while buckets are same.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.Sum = 23
h2.PositiveBuckets = []float64{6, 2, 3, 2, 4, 5, 1}
hApp, _ := app.(*FloatHistogramAppender)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.appendableGauge(h2)
require.Len(t, pI, 0)
require.Len(t, nI, 0)
require.Len(t, pBackwardI, 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
assertNoNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, GaugeType)
}
{ // New histogram that has a counter reset while new buckets were added.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 3},
@ -557,18 +654,20 @@ func TestFloatHistogramChunkAppendableGauge(t *testing.T) {
h2.Sum = 29
h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 0}
hApp, _ := app.(*FloatHistogramAppender)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.appendableGauge(h2)
require.Greater(t, len(pI), 0)
require.Len(t, nI, 0)
require.Len(t, pBackwardI, 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
assertRecodedFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, GaugeType)
}
{
// New histogram that has a counter reset while new buckets were
// added before the first bucket and reset on first bucket.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: -3, Length: 2},
@ -581,12 +680,13 @@ func TestFloatHistogramChunkAppendableGauge(t *testing.T) {
h2.Sum = 26
h2.PositiveBuckets = []float64{1, 2, 5, 3, 3, 2, 4, 5, 1}
hApp, _ := app.(*FloatHistogramAppender)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.appendableGauge(h2)
require.Greater(t, len(pI), 0)
require.Len(t, nI, 0)
require.Len(t, pBackwardI, 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
assertRecodedFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, GaugeType)
}
}

View file

@ -15,6 +15,7 @@ package chunkenc
import (
"encoding/binary"
"fmt"
"math"
"github.com/prometheus/prometheus/model/histogram"
@ -88,26 +89,13 @@ const (
UnknownCounterReset CounterResetHeader = 0b00000000
)
// setCounterResetHeader sets the counter reset header of the chunk
// The third byte of the chunk is the counter reset header.
func setCounterResetHeader(h CounterResetHeader, bytes []byte) {
switch h {
case CounterReset, NotCounterReset, GaugeType, UnknownCounterReset:
bytes[2] = (bytes[2] & 0b00111111) | byte(h)
default:
panic("invalid CounterResetHeader type")
}
}
// SetCounterResetHeader sets the counter reset header.
func (c *HistogramChunk) SetCounterResetHeader(h CounterResetHeader) {
setCounterResetHeader(h, c.Bytes())
}
// CounterResetHeaderMask is the mask to get the counter reset header bits.
const CounterResetHeaderMask byte = 0b11000000
// GetCounterResetHeader returns the info about the first 2 bits of the chunk
// header.
func (c *HistogramChunk) GetCounterResetHeader() CounterResetHeader {
return CounterResetHeader(c.Bytes()[2] & 0b11000000)
return CounterResetHeader(c.Bytes()[2] & CounterResetHeaderMask)
}
// Compact implements the Chunk interface.
@ -177,7 +165,7 @@ func newHistogramIterator(b []byte) *histogramIterator {
// The first 3 bytes contain chunk headers.
// We skip that for actual samples.
_, _ = it.br.readBits(24)
it.counterResetHeader = CounterResetHeader(b[2] & 0b11000000)
it.counterResetHeader = CounterResetHeader(b[2] & CounterResetHeaderMask)
return it
}
@ -224,7 +212,11 @@ type HistogramAppender struct {
}
func (a *HistogramAppender) GetCounterResetHeader() CounterResetHeader {
return CounterResetHeader(a.b.bytes()[2] & 0b11000000)
return CounterResetHeader(a.b.bytes()[2] & CounterResetHeaderMask)
}
func (a *HistogramAppender) setCounterResetHeader(cr CounterResetHeader) {
a.b.bytes()[2] = (a.b.bytes()[2] & (^CounterResetHeaderMask)) | (byte(cr) & CounterResetHeaderMask)
}
func (a *HistogramAppender) NumSamples() int {
@ -237,13 +229,7 @@ func (a *HistogramAppender) Append(int64, float64) {
panic("appended a float sample to a histogram chunk")
}
// AppendFloatHistogram implements Appender. This implementation panics because float
// histogram samples must never be appended to a histogram chunk.
func (a *HistogramAppender) AppendFloatHistogram(int64, *histogram.FloatHistogram) {
panic("appended a float histogram to a histogram chunk")
}
// Appendable returns whether the chunk can be appended to, and if so whether
// appendable returns whether the chunk can be appended to, and if so whether
// any recoding needs to happen using the provided inserts (in case of any new
// buckets, positive or negative range, respectively). If the sample is a gauge
// histogram, AppendableGauge must be used instead.
@ -260,7 +246,7 @@ func (a *HistogramAppender) AppendFloatHistogram(int64, *histogram.FloatHistogra
// The method returns an additional boolean set to true if it is not appendable
// because of a counter reset. If the given sample is stale, it is always ok to
// append. If counterReset is true, okToAppend is always false.
func (a *HistogramAppender) Appendable(h *histogram.Histogram) (
func (a *HistogramAppender) appendable(h *histogram.Histogram) (
positiveInserts, negativeInserts []Insert,
okToAppend, counterReset bool,
) {
@ -316,7 +302,7 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) (
return
}
// AppendableGauge returns whether the chunk can be appended to, and if so
// appendableGauge returns whether the chunk can be appended to, and if so
// whether:
// 1. Any recoding needs to happen to the chunk using the provided inserts
// (in case of any new buckets, positive or negative range, respectively).
@ -330,7 +316,7 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) (
// - The schema has changed.
// - The threshold for the zero bucket has changed.
// - The last sample in the chunk was stale while the current sample is not stale.
func (a *HistogramAppender) AppendableGauge(h *histogram.Histogram) (
func (a *HistogramAppender) appendableGauge(h *histogram.Histogram) (
positiveInserts, negativeInserts []Insert,
backwardPositiveInserts, backwardNegativeInserts []Insert,
positiveSpans, negativeSpans []histogram.Span,
@ -427,11 +413,11 @@ func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans
return false
}
// AppendHistogram appends a histogram to the chunk. The caller must ensure that
// appendHistogram appends a histogram to the chunk. The caller must ensure that
// the histogram is properly structured, e.g. the number of buckets used
// corresponds to the number conveyed by the span structures. First call
// Appendable() and act accordingly!
func (a *HistogramAppender) AppendHistogram(t int64, h *histogram.Histogram) {
func (a *HistogramAppender) appendHistogram(t int64, h *histogram.Histogram) {
var tDelta, cntDelta, zCntDelta int64
num := binary.BigEndian.Uint16(a.b.bytes())
@ -540,12 +526,12 @@ func (a *HistogramAppender) AppendHistogram(t int64, h *histogram.Histogram) {
a.sum = h.Sum
}
// Recode converts the current chunk to accommodate an expansion of the set of
// recode converts the current chunk to accommodate an expansion of the set of
// (positive and/or negative) buckets used, according to the provided inserts,
// resulting in the honoring of the provided new positive and negative spans. To
// continue appending, use the returned Appender rather than the receiver of
// this method.
func (a *HistogramAppender) Recode(
func (a *HistogramAppender) recode(
positiveInserts, negativeInserts []Insert,
positiveSpans, negativeSpans []histogram.Span,
) (Chunk, Appender) {
@ -558,8 +544,9 @@ func (a *HistogramAppender) Recode(
hc := NewHistogramChunk()
app, err := hc.Appender()
if err != nil {
panic(err)
panic(err) // This should never happen for an empty histogram chunk.
}
happ := app.(*HistogramAppender)
numPositiveBuckets, numNegativeBuckets := countSpans(positiveSpans), countSpans(negativeSpans)
for it.Next() == ValHistogram {
@ -585,16 +572,16 @@ func (a *HistogramAppender) Recode(
if len(negativeInserts) > 0 {
hOld.NegativeBuckets = insert(hOld.NegativeBuckets, negativeBuckets, negativeInserts, true)
}
app.AppendHistogram(tOld, hOld)
happ.appendHistogram(tOld, hOld)
}
hc.SetCounterResetHeader(CounterResetHeader(byts[2] & 0b11000000))
happ.setCounterResetHeader(CounterResetHeader(byts[2] & CounterResetHeaderMask))
return hc, app
}
// RecodeHistogram converts the current histogram (in-place) to accommodate an
// recodeHistogram converts the current histogram (in-place) to accommodate an
// expansion of the set of (positive and/or negative) buckets used.
func (a *HistogramAppender) RecodeHistogram(
func (a *HistogramAppender) recodeHistogram(
h *histogram.Histogram,
pBackwardInserts, nBackwardInserts []Insert,
) {
@ -612,6 +599,124 @@ func (a *HistogramAppender) writeSumDelta(v float64) {
xorWrite(a.b, v, a.sum, &a.leading, &a.trailing)
}
func (a *HistogramAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) {
panic("appended a float histogram sample to a histogram chunk")
}
func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h *histogram.Histogram, appendOnly bool) (Chunk, bool, Appender, error) {
if a.NumSamples() == 0 {
a.appendHistogram(t, h)
if h.CounterResetHint == histogram.GaugeType {
a.setCounterResetHeader(GaugeType)
return nil, false, a, nil
}
if prev != nil && h.CounterResetHint != histogram.CounterReset {
// This is a new chunk, but continued from a previous one. We need to calculate the reset header unless already set.
_, _, _, counterReset := prev.appendable(h)
if counterReset {
a.setCounterResetHeader(CounterReset)
} else {
a.setCounterResetHeader(NotCounterReset)
}
} else {
// Honor the explicit counter reset hint.
a.setCounterResetHeader(CounterResetHeader(h.CounterResetHint))
}
return nil, false, a, nil
}
// Adding counter-like histogram.
if h.CounterResetHint != histogram.GaugeType {
pForwardInserts, nForwardInserts, okToAppend, counterReset := a.appendable(h)
if !okToAppend || counterReset {
if appendOnly {
if !okToAppend {
return nil, false, a, fmt.Errorf("histogram schema change")
}
return nil, false, a, fmt.Errorf("histogram counter reset")
}
newChunk := NewHistogramChunk()
app, err := newChunk.Appender()
if err != nil {
panic(err) // This should never happen for an empty histogram chunk.
}
happ := app.(*HistogramAppender)
if counterReset {
happ.setCounterResetHeader(CounterReset)
}
happ.appendHistogram(t, h)
return newChunk, false, app, nil
}
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
if appendOnly {
return nil, false, a, fmt.Errorf("histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts))
}
chk, app := a.recode(
pForwardInserts, nForwardInserts,
h.PositiveSpans, h.NegativeSpans,
)
app.(*HistogramAppender).appendHistogram(t, h)
return chk, true, app, nil
}
a.appendHistogram(t, h)
return nil, false, a, nil
}
// Adding gauge histogram.
pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, pMergedSpans, nMergedSpans, okToAppend := a.appendableGauge(h)
if !okToAppend {
if appendOnly {
return nil, false, a, fmt.Errorf("gauge histogram schema change")
}
newChunk := NewHistogramChunk()
app, err := newChunk.Appender()
if err != nil {
panic(err) // This should never happen for an empty histogram chunk.
}
happ := app.(*HistogramAppender)
happ.setCounterResetHeader(GaugeType)
happ.appendHistogram(t, h)
return newChunk, false, app, nil
}
if len(pBackwardInserts)+len(nBackwardInserts) > 0 {
if appendOnly {
return nil, false, a, fmt.Errorf("gauge histogram layout change with %d positive and %d negative backwards inserts", len(pBackwardInserts), len(nBackwardInserts))
}
h.PositiveSpans = pMergedSpans
h.NegativeSpans = nMergedSpans
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
}
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
if appendOnly {
return nil, false, a, fmt.Errorf("gauge histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts))
}
chk, app := a.recode(
pForwardInserts, nForwardInserts,
h.PositiveSpans, h.NegativeSpans,
)
app.(*HistogramAppender).appendHistogram(t, h)
return chk, true, app, nil
}
a.appendHistogram(t, h)
return nil, false, a, nil
}
func CounterResetHintToHeader(hint histogram.CounterResetHint) CounterResetHeader {
switch hint {
case histogram.CounterReset:
return CounterReset
case histogram.NotCounterReset:
return NotCounterReset
case histogram.GaugeType:
return GaugeType
default:
return UnknownCounterReset
}
}
type histogramIterator struct {
br bstreamReader
numTotal uint16
@ -715,7 +820,7 @@ func (it *histogramIterator) Reset(b []byte) {
it.numTotal = binary.BigEndian.Uint16(b)
it.numRead = 0
it.counterResetHeader = CounterResetHeader(b[2] & 0b11000000)
it.counterResetHeader = CounterResetHeader(b[2] & CounterResetHeaderMask)
it.t, it.cnt, it.zCnt = 0, 0, 0
it.tDelta, it.cntDelta, it.zCntDelta = 0, 0, 0

View file

@ -14,7 +14,6 @@
package chunkenc
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
@ -55,7 +54,9 @@ func TestHistogramChunkSameBuckets(t *testing.T) {
},
NegativeBuckets: []int64{2, 1, -1, -1}, // counts: 2, 3, 2, 1 (total 8)
}
app.AppendHistogram(ts, h)
chk, _, app, err := app.AppendHistogram(nil, ts, h, false)
require.NoError(t, err)
require.Nil(t, chk)
exp = append(exp, result{t: ts, h: h, fh: h.ToFloat()})
require.Equal(t, 1, c.NumSamples())
@ -67,7 +68,9 @@ func TestHistogramChunkSameBuckets(t *testing.T) {
h.Sum = 24.4
h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14)
h.NegativeBuckets = []int64{4, -1, 1, -1} // counts: 4, 3, 4, 4 (total 15)
app.AppendHistogram(ts, h)
chk, _, _, err = app.AppendHistogram(nil, ts, h, false)
require.NoError(t, err)
require.Nil(t, chk)
hExp := h.Copy()
hExp.CounterResetHint = histogram.NotCounterReset
exp = append(exp, result{t: ts, h: hExp, fh: hExp.ToFloat()})
@ -84,7 +87,9 @@ func TestHistogramChunkSameBuckets(t *testing.T) {
h.Sum = 24.4
h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27)
h.NegativeBuckets = []int64{5, 1, -2, 3} // counts: 5, 6, 4, 7 (total 22)
app.AppendHistogram(ts, h)
chk, _, _, err = app.AppendHistogram(nil, ts, h, false)
require.NoError(t, err)
require.Nil(t, chk)
hExp = h.Copy()
hExp.CounterResetHint = histogram.NotCounterReset
exp = append(exp, result{t: ts, h: hExp, fh: hExp.ToFloat()})
@ -182,7 +187,9 @@ func TestHistogramChunkBucketChanges(t *testing.T) {
NegativeBuckets: []int64{1},
}
app.AppendHistogram(ts1, h1)
chk, _, app, err := app.AppendHistogram(nil, ts1, h1, false)
require.NoError(t, err)
require.Nil(t, chk)
require.Equal(t, 1, c.NumSamples())
// Add a new histogram that has expanded buckets.
@ -208,13 +215,15 @@ func TestHistogramChunkBucketChanges(t *testing.T) {
h2.NegativeBuckets = []int64{2, -1} // 2 1 (total 3)
// This is how span changes will be handled.
hApp, _ := app.(*HistogramAppender)
posInterjections, negInterjections, ok, cr := hApp.Appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.Greater(t, len(posInterjections), 0)
require.Greater(t, len(negInterjections), 0)
require.True(t, ok) // Only new buckets came in.
require.False(t, cr)
c, app = hApp.Recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans)
app.AppendHistogram(ts2, h2)
c, app = hApp.recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans)
chk, _, _, err = app.AppendHistogram(nil, ts2, h2, false)
require.NoError(t, err)
require.Nil(t, chk)
require.Equal(t, 2, c.NumSamples())
@ -244,49 +253,61 @@ func TestHistogramChunkBucketChanges(t *testing.T) {
}
func TestHistogramChunkAppendable(t *testing.T) {
c := Chunk(NewHistogramChunk())
setup := func() (Chunk, *HistogramAppender, int64, *histogram.Histogram) {
c := Chunk(NewHistogramChunk())
// Create fresh appender and add the first histogram.
app, err := c.Appender()
require.NoError(t, err)
require.Equal(t, 0, c.NumSamples())
// Create fresh appender and add the first histogram.
app, err := c.Appender()
require.NoError(t, err)
require.Equal(t, 0, c.NumSamples())
ts := int64(1234567890)
h1 := &histogram.Histogram{
Count: 5,
ZeroCount: 2,
Sum: 18.4,
ZeroThreshold: 1e-125,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 1},
{Offset: 3, Length: 2},
{Offset: 3, Length: 1},
{Offset: 1, Length: 1},
},
PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24)
ts := int64(1234567890)
h1 := &histogram.Histogram{
Count: 5,
ZeroCount: 2,
Sum: 18.4,
ZeroThreshold: 1e-125,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 1},
{Offset: 3, Length: 2},
{Offset: 3, Length: 1},
{Offset: 1, Length: 1},
},
PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24)
}
chk, _, app, err := app.AppendHistogram(nil, ts, h1.Copy(), false)
require.NoError(t, err)
require.Nil(t, chk)
require.Equal(t, 1, c.NumSamples())
require.Equal(t, UnknownCounterReset, c.(*HistogramChunk).GetCounterResetHeader())
return c, app.(*HistogramAppender), ts, h1
}
app.AppendHistogram(ts, h1.Copy())
require.Equal(t, 1, c.NumSamples())
hApp, _ := app.(*HistogramAppender)
{ // Schema change.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.Schema++
_, _, ok, _ := hApp.Appendable(h2)
_, _, ok, _ := hApp.appendable(h2)
require.False(t, ok)
assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
}
{ // Zero threshold change.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.ZeroThreshold += 0.1
_, _, ok, _ := hApp.Appendable(h2)
_, _, ok, _ := hApp.appendable(h2)
require.False(t, ok)
assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
}
{ // New histogram that has more buckets.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 3},
@ -302,14 +323,17 @@ func TestHistogramChunkAppendable(t *testing.T) {
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30)
posInterjections, negInterjections, ok, cr := hApp.Appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.Greater(t, len(posInterjections), 0)
require.Equal(t, 0, len(negInterjections))
require.True(t, ok) // Only new buckets came in.
require.False(t, cr)
assertRecodedHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
}
{ // New histogram that has a bucket missing.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 2},
@ -320,26 +344,32 @@ func TestHistogramChunkAppendable(t *testing.T) {
h2.Sum = 21
h2.PositiveBuckets = []int64{6, -3, -1, 2, 1, -4} // counts: 6, 3, 2, 4, 5, 1 (total 21)
posInterjections, negInterjections, ok, cr := hApp.Appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.Equal(t, 0, len(posInterjections))
require.Equal(t, 0, len(negInterjections))
require.False(t, ok) // Need to cut a new chunk.
require.True(t, cr)
assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset)
}
{ // New histogram that has a counter reset while buckets are same.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.Sum = 23
h2.PositiveBuckets = []int64{6, -4, 1, -1, 2, 1, -4} // counts: 6, 2, 3, 2, 4, 5, 1 (total 23)
posInterjections, negInterjections, ok, cr := hApp.Appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.Equal(t, 0, len(posInterjections))
require.Equal(t, 0, len(negInterjections))
require.False(t, ok) // Need to cut a new chunk.
require.True(t, cr)
assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset)
}
{ // New histogram that has a counter reset while new buckets were added.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 3},
@ -353,11 +383,13 @@ func TestHistogramChunkAppendable(t *testing.T) {
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 0} // 7 5 1 3 1 0 2 5 5 0 0 (total 29)
posInterjections, negInterjections, ok, cr := hApp.Appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.Equal(t, 0, len(posInterjections))
require.Equal(t, 0, len(negInterjections))
require.False(t, ok) // Need to cut a new chunk.
require.True(t, cr)
assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset)
}
{
@ -365,6 +397,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
// added before the first bucket and reset on first bucket. (to
// catch the edge case where the new bucket should be forwarded
// ahead until first old bucket at start)
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: -3, Length: 2},
@ -380,14 +413,55 @@ func TestHistogramChunkAppendable(t *testing.T) {
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
h2.PositiveBuckets = []int64{1, 1, 3, -2, 0, -1, 2, 1, -4} // counts: 1, 2, 5, 3, 3, 2, 4, 5, 1 (total 26)
posInterjections, negInterjections, ok, cr := hApp.Appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.Equal(t, 0, len(posInterjections))
require.Equal(t, 0, len(negInterjections))
require.False(t, ok) // Need to cut a new chunk.
require.True(t, cr)
assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset)
}
}
func assertNewHistogramChunkOnAppend(t *testing.T, oldChunk Chunk, hApp *HistogramAppender, ts int64, h *histogram.Histogram, expectHeader CounterResetHeader) {
oldChunkBytes := oldChunk.Bytes()
newChunk, recoded, newAppender, err := hApp.AppendHistogram(nil, ts, h, false)
require.Equal(t, oldChunkBytes, oldChunk.Bytes()) // Sanity check that previous chunk is untouched.
require.NoError(t, err)
require.NotNil(t, newChunk)
require.False(t, recoded)
require.NotEqual(t, oldChunk, newChunk)
require.Equal(t, expectHeader, newChunk.(*HistogramChunk).GetCounterResetHeader())
require.NotNil(t, newAppender)
require.NotEqual(t, hApp, newAppender)
assertSampleCount(t, newChunk, 1, ValHistogram)
}
func assertRecodedHistogramChunkOnAppend(t *testing.T, prevChunk Chunk, hApp *HistogramAppender, ts int64, h *histogram.Histogram, expectHeader CounterResetHeader) {
prevChunkBytes := prevChunk.Bytes()
newChunk, recoded, newAppender, err := hApp.AppendHistogram(nil, ts, h, false)
require.Equal(t, prevChunkBytes, prevChunk.Bytes()) // Sanity check that previous chunk is untouched. This may change in the future if we implement in-place recoding.
require.NoError(t, err)
require.NotNil(t, newChunk)
require.True(t, recoded)
require.NotEqual(t, prevChunk, newChunk)
require.Equal(t, expectHeader, newChunk.(*HistogramChunk).GetCounterResetHeader())
require.NotNil(t, newAppender)
require.NotEqual(t, hApp, newAppender)
assertSampleCount(t, newChunk, 2, ValHistogram)
}
func assertSampleCount(t *testing.T, c Chunk, exp int64, vtype ValueType) {
count := int64(0)
it := c.Iterator(nil)
require.NoError(t, it.Err())
for it.Next() == vtype {
count++
}
require.NoError(t, it.Err())
require.Equal(t, exp, count)
}
func TestHistogramChunkAppendableWithEmptySpan(t *testing.T) {
h1 := &histogram.Histogram{
Schema: 0,
@ -435,11 +509,12 @@ func TestHistogramChunkAppendableWithEmptySpan(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 0, c.NumSamples())
app.AppendHistogram(1, h1)
_, _, _, err = app.AppendHistogram(nil, 1, h1, true)
require.NoError(t, err)
require.Equal(t, 1, c.NumSamples())
hApp, _ := app.(*HistogramAppender)
pI, nI, okToAppend, counterReset := hApp.Appendable(h2)
pI, nI, okToAppend, counterReset := hApp.appendable(h2)
require.Empty(t, pI)
require.Empty(t, nI)
require.True(t, okToAppend)
@ -573,11 +648,9 @@ func TestAtFloatHistogram(t *testing.T) {
app, err := chk.Appender()
require.NoError(t, err)
for i := range input {
if i > 0 {
_, _, okToAppend, _ := app.(*HistogramAppender).Appendable(&input[i])
require.True(t, okToAppend, fmt.Sprintf("idx: %d", i))
}
app.AppendHistogram(int64(i), &input[i])
newc, _, _, err := app.AppendHistogram(nil, int64(i), &input[i], false)
require.NoError(t, err)
require.Nil(t, newc)
}
it := chk.Iterator(nil)
i := int64(0)
@ -590,51 +663,75 @@ func TestAtFloatHistogram(t *testing.T) {
}
func TestHistogramChunkAppendableGauge(t *testing.T) {
c := Chunk(NewHistogramChunk())
setup := func() (Chunk, *HistogramAppender, int64, *histogram.Histogram) {
c := Chunk(NewHistogramChunk())
// Create fresh appender and add the first histogram.
app, err := c.Appender()
require.NoError(t, err)
require.Equal(t, 0, c.NumSamples())
// Create fresh appender and add the first histogram.
app, err := c.Appender()
require.NoError(t, err)
require.Equal(t, 0, c.NumSamples())
ts := int64(1234567890)
h1 := &histogram.Histogram{
Count: 5,
ZeroCount: 2,
Sum: 18.4,
ZeroThreshold: 1e-125,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 1},
{Offset: 3, Length: 2},
{Offset: 3, Length: 1},
{Offset: 1, Length: 1},
},
PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // {6, 3, 3, 2, 4, 5, 1}
ts := int64(1234567890)
h1 := &histogram.Histogram{
CounterResetHint: histogram.GaugeType,
Count: 5,
ZeroCount: 2,
Sum: 18.4,
ZeroThreshold: 1e-125,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 1},
{Offset: 3, Length: 2},
{Offset: 3, Length: 1},
{Offset: 1, Length: 1},
},
PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // {6, 3, 3, 2, 4, 5, 1}
}
chk, _, app, err := app.AppendHistogram(nil, ts, h1.Copy(), false)
require.NoError(t, err)
require.Nil(t, chk)
require.Equal(t, 1, c.NumSamples())
require.Equal(t, GaugeType, c.(*HistogramChunk).GetCounterResetHeader())
return c, app.(*HistogramAppender), ts, h1
}
app.AppendHistogram(ts, h1.Copy())
require.Equal(t, 1, c.NumSamples())
c.(*HistogramChunk).SetCounterResetHeader(GaugeType)
{ // Schema change.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.Schema++
hApp, _ := app.(*HistogramAppender)
_, _, _, _, _, _, ok := hApp.AppendableGauge(h2)
_, _, _, _, _, _, ok := hApp.appendableGauge(h2)
require.False(t, ok)
newc, recoded, _, err := hApp.AppendHistogram(nil, ts+1, h2, false)
require.NoError(t, err)
require.NotNil(t, newc)
require.False(t, recoded)
require.NotEqual(t, c, newc)
require.Equal(t, GaugeType, c.(*HistogramChunk).GetCounterResetHeader())
require.Equal(t, GaugeType, newc.(*HistogramChunk).GetCounterResetHeader())
}
{ // Zero threshold change.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.ZeroThreshold += 0.1
hApp, _ := app.(*HistogramAppender)
_, _, _, _, _, _, ok := hApp.AppendableGauge(h2)
_, _, _, _, _, _, ok := hApp.appendableGauge(h2)
require.False(t, ok)
newc, recoded, _, err := hApp.AppendHistogram(nil, ts+1, h2, false)
require.NoError(t, err)
require.NotNil(t, newc)
require.False(t, recoded)
require.NotEqual(t, c, newc)
require.Equal(t, GaugeType, c.(*HistogramChunk).GetCounterResetHeader())
require.Equal(t, GaugeType, newc.(*HistogramChunk).GetCounterResetHeader())
}
{ // New histogram that has more buckets.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 3},
@ -647,16 +744,22 @@ func TestHistogramChunkAppendableGauge(t *testing.T) {
h2.Sum = 30
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // {7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 1}
hApp, _ := app.(*HistogramAppender)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.appendableGauge(h2)
require.Greater(t, len(pI), 0)
require.Len(t, nI, 0)
require.Len(t, pBackwardI, 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
newc, recoded, _, err := hApp.AppendHistogram(nil, ts+1, h2, false)
require.NoError(t, err)
require.NotNil(t, newc)
require.True(t, recoded)
require.Equal(t, GaugeType, c.(*HistogramChunk).GetCounterResetHeader())
}
{ // New histogram that has buckets missing.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 2},
@ -669,16 +772,22 @@ func TestHistogramChunkAppendableGauge(t *testing.T) {
h2.Sum--
h2.PositiveBuckets = []int64{6, -3, 0, -1, 3, -4} // {6, 3, 3, 2, 5, 1}
hApp, _ := app.(*HistogramAppender)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.appendableGauge(h2)
require.Len(t, pI, 0)
require.Len(t, nI, 0)
require.Greater(t, len(pBackwardI), 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
newc, recoded, _, err := hApp.AppendHistogram(nil, ts+1, h2, false)
require.NoError(t, err)
require.Nil(t, newc)
require.False(t, recoded)
require.Equal(t, GaugeType, c.(*HistogramChunk).GetCounterResetHeader())
}
{ // New histogram that has a bucket missing and new buckets.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 2},
@ -689,30 +798,42 @@ func TestHistogramChunkAppendableGauge(t *testing.T) {
h2.Sum = 21
h2.PositiveBuckets = []int64{6, -3, -1, 2, 1, -4} // {6, 3, 2, 4, 5, 1}
hApp, _ := app.(*HistogramAppender)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.appendableGauge(h2)
require.Greater(t, len(pI), 0)
require.Greater(t, len(pBackwardI), 0)
require.Len(t, nI, 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
newc, recoded, _, err := hApp.AppendHistogram(nil, ts+1, h2, false)
require.NoError(t, err)
require.NotNil(t, newc)
require.True(t, recoded)
require.Equal(t, GaugeType, c.(*HistogramChunk).GetCounterResetHeader())
}
{ // New histogram that has a counter reset while buckets are same.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.Sum = 23
h2.PositiveBuckets = []int64{6, -4, 1, -1, 2, 1, -4} // {6, 2, 3, 2, 4, 5, 1}
hApp, _ := app.(*HistogramAppender)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.appendableGauge(h2)
require.Len(t, pI, 0)
require.Len(t, nI, 0)
require.Len(t, pBackwardI, 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
newc, recoded, _, err := hApp.AppendHistogram(nil, ts+1, h2, false)
require.NoError(t, err)
require.Nil(t, newc)
require.False(t, recoded)
require.Equal(t, GaugeType, c.(*HistogramChunk).GetCounterResetHeader())
}
{ // New histogram that has a counter reset while new buckets were added.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 3},
@ -723,18 +844,24 @@ func TestHistogramChunkAppendableGauge(t *testing.T) {
h2.Sum = 29
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 0} // {7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 0}
hApp, _ := app.(*HistogramAppender)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.appendableGauge(h2)
require.Greater(t, len(pI), 0)
require.Len(t, nI, 0)
require.Len(t, pBackwardI, 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
newc, recoded, _, err := hApp.AppendHistogram(nil, ts+1, h2, false)
require.NoError(t, err)
require.NotNil(t, newc)
require.True(t, recoded)
require.Equal(t, GaugeType, c.(*HistogramChunk).GetCounterResetHeader())
}
{
// New histogram that has a counter reset while new buckets were
// added before the first bucket and reset on first bucket.
c, hApp, ts, h1 := setup()
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: -3, Length: 2},
@ -747,12 +874,17 @@ func TestHistogramChunkAppendableGauge(t *testing.T) {
h2.Sum = 26
h2.PositiveBuckets = []int64{1, 1, 3, -2, 0, -1, 2, 1, -4} // {1, 2, 5, 3, 3, 2, 4, 5, 1}
hApp, _ := app.(*HistogramAppender)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.appendableGauge(h2)
require.Greater(t, len(pI), 0)
require.Len(t, nI, 0)
require.Len(t, pBackwardI, 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
newc, recoded, _, err := hApp.AppendHistogram(nil, ts+1, h2, false)
require.NoError(t, err)
require.NotNil(t, newc)
require.True(t, recoded)
require.Equal(t, GaugeType, c.(*HistogramChunk).GetCounterResetHeader())
}
}

View file

@ -152,14 +152,6 @@ type xorAppender struct {
trailing uint8
}
func (a *xorAppender) AppendHistogram(int64, *histogram.Histogram) {
panic("appended a histogram to an xor chunk")
}
func (a *xorAppender) AppendFloatHistogram(int64, *histogram.FloatHistogram) {
panic("appended a float histogram to an xor chunk")
}
func (a *xorAppender) Append(t int64, v float64) {
var tDelta uint64
num := binary.BigEndian.Uint16(a.b.bytes())
@ -228,6 +220,14 @@ func (a *xorAppender) writeVDelta(v float64) {
xorWrite(a.b, v, a.v, &a.leading, &a.trailing)
}
func (a *xorAppender) AppendHistogram(*HistogramAppender, int64, *histogram.Histogram, bool) (Chunk, bool, Appender, error) {
panic("appended a histogram sample to a float chunk")
}
func (a *xorAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) {
panic("appended a float histogram sample to a float chunk")
}
type xorIterator struct {
br bstreamReader
numTotal uint16

View file

@ -1155,87 +1155,32 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sa
// appendHistogram adds the histogram.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
// In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped.
// To keep the meaning of prometheus_tsdb_head_chunks and prometheus_tsdb_head_chunks_created_total
// consistent, we return chunkCreated=false in this case.
func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
// Head controls the execution of recoding, so that we own the proper
// chunk reference afterwards. We check for Appendable from appender before
// appendPreprocessor because in case it ends up creating a new chunk,
// we need to know if there was also a counter reset or not to set the
// meta properly.
app, _ := s.app.(*chunkenc.HistogramAppender)
var (
pForwardInserts, nForwardInserts []chunkenc.Insert
pBackwardInserts, nBackwardInserts []chunkenc.Insert
pMergedSpans, nMergedSpans []histogram.Span
okToAppend, counterReset, gauge bool
)
// chunk reference afterwards and mmap used up chunks.
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevApp, _ := s.app.(*chunkenc.HistogramAppender)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
switch h.CounterResetHint {
case histogram.GaugeType:
gauge = true
if app != nil {
pForwardInserts, nForwardInserts,
pBackwardInserts, nBackwardInserts,
pMergedSpans, nMergedSpans,
okToAppend = app.AppendableGauge(h)
}
case histogram.CounterReset:
// The caller tells us this is a counter reset, even if it
// doesn't look like one.
counterReset = true
default:
if app != nil {
pForwardInserts, nForwardInserts, okToAppend, counterReset = app.Appendable(h)
}
}
var (
newChunk chunkenc.Chunk
recoded bool
)
if !chunkCreated {
if len(pBackwardInserts)+len(nBackwardInserts) > 0 {
h.PositiveSpans = pMergedSpans
h.NegativeSpans = nMergedSpans
app.RecodeHistogram(h, pBackwardInserts, nBackwardInserts)
}
// We have 3 cases here
// - !okToAppend or counterReset -> We need to cut a new chunk.
// - okToAppend but we have inserts → Existing chunk needs
// recoding before we can append our histogram.
// - okToAppend and no inserts → Chunk is ready to support our histogram.
switch {
case !okToAppend || counterReset:
c = s.cutNewHeadChunk(t, chunkenc.EncHistogram, o.chunkDiskMapper, o.chunkRange)
chunkCreated = true
case len(pForwardInserts) > 0 || len(nForwardInserts) > 0:
// New buckets have appeared. We need to recode all
// prior histogram samples within the chunk before we
// can process this one.
chunk, app := app.Recode(
pForwardInserts, nForwardInserts,
h.PositiveSpans, h.NegativeSpans,
)
c.chunk = chunk
s.app = app
}
// Ignore the previous appender if we continue the current chunk.
prevApp = nil
}
if chunkCreated {
hc := s.headChunk.chunk.(*chunkenc.HistogramChunk)
header := chunkenc.UnknownCounterReset
switch {
case gauge:
header = chunkenc.GaugeType
case counterReset:
header = chunkenc.CounterReset
case okToAppend:
header = chunkenc.NotCounterReset
}
hc.SetCounterResetHeader(header)
}
s.app.AppendHistogram(t, h)
c.maxTime = t
newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, t, h, false) // false=request a new chunk if needed
s.lastHistogramValue = h
s.lastFloatHistogramValue = nil
@ -1244,101 +1189,84 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
s.txs.add(appendID)
}
return true, chunkCreated
if newChunk == nil { // Sample was appended to existing chunk or is the first sample in a new chunk.
c.maxTime = t
return true, chunkCreated
}
if recoded { // The appender needed to recode the chunk.
c.maxTime = t
c.chunk = newChunk
return true, false
}
// This is a brand new chunk, switch out the head chunk (based on cutNewHeadChunk).
s.mmapCurrentHeadChunk(o.chunkDiskMapper)
s.headChunk = &memChunk{
chunk: newChunk,
minTime: t,
maxTime: t,
}
s.nextAt = rangeForTimestamp(t, o.chunkRange)
return true, true
}
// appendFloatHistogram adds the float histogram.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
// In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped.
// To keep the meaning of prometheus_tsdb_head_chunks and prometheus_tsdb_head_chunks_created_total
// consistent, we return chunkCreated=false in this case.
func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
// Head controls the execution of recoding, so that we own the proper
// chunk reference afterwards. We check for Appendable from appender before
// appendPreprocessor because in case it ends up creating a new chunk,
// we need to know if there was also a counter reset or not to set the
// meta properly.
app, _ := s.app.(*chunkenc.FloatHistogramAppender)
var (
pForwardInserts, nForwardInserts []chunkenc.Insert
pBackwardInserts, nBackwardInserts []chunkenc.Insert
pMergedSpans, nMergedSpans []histogram.Span
okToAppend, counterReset, gauge bool
)
// chunk reference afterwards and mmap used up chunks.
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevApp, _ := s.app.(*chunkenc.FloatHistogramAppender)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
switch fh.CounterResetHint {
case histogram.GaugeType:
gauge = true
if app != nil {
pForwardInserts, nForwardInserts,
pBackwardInserts, nBackwardInserts,
pMergedSpans, nMergedSpans,
okToAppend = app.AppendableGauge(fh)
}
case histogram.CounterReset:
// The caller tells us this is a counter reset, even if it
// doesn't look like one.
counterReset = true
default:
if app != nil {
pForwardInserts, nForwardInserts, okToAppend, counterReset = app.Appendable(fh)
}
}
var (
newChunk chunkenc.Chunk
recoded bool
)
if !chunkCreated {
if len(pBackwardInserts)+len(nBackwardInserts) > 0 {
fh.PositiveSpans = pMergedSpans
fh.NegativeSpans = nMergedSpans
app.RecodeHistogramm(fh, pBackwardInserts, nBackwardInserts)
}
// We have 3 cases here
// - !okToAppend or counterReset -> We need to cut a new chunk.
// - okToAppend but we have inserts → Existing chunk needs
// recoding before we can append our histogram.
// - okToAppend and no inserts → Chunk is ready to support our histogram.
switch {
case !okToAppend || counterReset:
c = s.cutNewHeadChunk(t, chunkenc.EncFloatHistogram, o.chunkDiskMapper, o.chunkRange)
chunkCreated = true
case len(pForwardInserts) > 0 || len(nForwardInserts) > 0:
// New buckets have appeared. We need to recode all
// prior histogram samples within the chunk before we
// can process this one.
chunk, app := app.Recode(
pForwardInserts, nForwardInserts,
fh.PositiveSpans, fh.NegativeSpans,
)
c.chunk = chunk
s.app = app
}
// Ignore the previous appender if we continue the current chunk.
prevApp = nil
}
if chunkCreated {
hc := s.headChunk.chunk.(*chunkenc.FloatHistogramChunk)
header := chunkenc.UnknownCounterReset
switch {
case gauge:
header = chunkenc.GaugeType
case counterReset:
header = chunkenc.CounterReset
case okToAppend:
header = chunkenc.NotCounterReset
}
hc.SetCounterResetHeader(header)
}
newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, t, fh, false) // False means request a new chunk if needed.
s.app.AppendFloatHistogram(t, fh)
c.maxTime = t
s.lastFloatHistogramValue = fh
s.lastHistogramValue = nil
s.lastFloatHistogramValue = fh
if appendID > 0 {
s.txs.add(appendID)
}
return true, chunkCreated
if newChunk == nil { // Sample was appended to existing chunk or is the first sample in a new chunk.
c.maxTime = t
return true, chunkCreated
}
if recoded { // The appender needed to recode the chunk.
c.maxTime = t
c.chunk = newChunk
return true, false
}
// This is a brand new chunk, switch out the head chunk (based on cutNewHeadChunk).
s.mmapCurrentHeadChunk(o.chunkDiskMapper)
s.headChunk = &memChunk{
chunk: newChunk,
minTime: t,
maxTime: t,
}
s.nextAt = rangeForTimestamp(t, o.chunkRange)
return true, true
}
// appendPreprocessor takes care of cutting new chunks and m-mapping old chunks.

View file

@ -3841,7 +3841,7 @@ func TestHistogramCounterResetHeader(t *testing.T) {
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.CounterReset)
// Add 2 non-counter reset histograms.
// Add 2 non-counter reset histogram chunks.
for i := 0; i < 250; i++ {
appendHistogram(h)
}

View file

@ -821,74 +821,17 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
break
}
switch hc := p.currChkMeta.Chunk.(type) {
case *chunkenc.HistogramChunk:
newChunk.(*chunkenc.HistogramChunk).SetCounterResetHeader(hc.GetCounterResetHeader())
case *safeHeadChunk:
if unwrapped, ok := hc.Chunk.(*chunkenc.HistogramChunk); ok {
newChunk.(*chunkenc.HistogramChunk).SetCounterResetHeader(unwrapped.GetCounterResetHeader())
} else {
err = fmt.Errorf("internal error, could not unwrap safeHeadChunk to histogram chunk: %T", hc.Chunk)
}
default:
err = fmt.Errorf("internal error, unknown chunk type %T when expecting histogram", p.currChkMeta.Chunk)
}
if err != nil {
break
}
var h *histogram.Histogram
t, h = p.currDelIter.AtHistogram()
p.curr.MinTime = t
// Detect missing gauge reset hint.
if h.CounterResetHint == histogram.GaugeType && newChunk.(*chunkenc.HistogramChunk).GetCounterResetHeader() != chunkenc.GaugeType {
err = fmt.Errorf("found gauge histogram in non gauge chunk")
break
}
app.AppendHistogram(t, h)
for vt := p.currDelIter.Next(); vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
for vt := valueType; vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
if vt != chunkenc.ValHistogram {
err = fmt.Errorf("found value type %v in histogram chunk", vt)
break
}
var h *histogram.Histogram
t, h = p.currDelIter.AtHistogram()
// Defend against corrupted chunks.
if h.CounterResetHint == histogram.GaugeType {
pI, nI, bpI, bnI, _, _, okToAppend := app.(*chunkenc.HistogramAppender).AppendableGauge(h)
if !okToAppend {
err = errors.New("unable to append histogram due to unexpected schema change")
break
}
if len(pI)+len(nI)+len(bpI)+len(bnI) > 0 {
err = fmt.Errorf(
"bucket layout has changed unexpectedly: forward %d positive, %d negative, backward %d positive %d negative bucket interjections required",
len(pI), len(nI), len(bpI), len(bnI),
)
break
}
} else {
pI, nI, okToAppend, counterReset := app.(*chunkenc.HistogramAppender).Appendable(h)
if len(pI)+len(nI) > 0 {
err = fmt.Errorf(
"bucket layout has changed unexpectedly: %d positive and %d negative bucket interjections required",
len(pI), len(nI),
)
break
}
if counterReset {
err = errors.New("detected unexpected counter reset in histogram")
break
}
if !okToAppend {
err = errors.New("unable to append histogram due to unexpected schema change")
break
}
_, _, app, err = app.AppendHistogram(nil, t, h, true)
if err != nil {
break
}
app.AppendHistogram(t, h)
}
case chunkenc.ValFloat:
newChunk = chunkenc.NewXORChunk()
@ -913,75 +856,17 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
break
}
switch hc := p.currChkMeta.Chunk.(type) {
case *chunkenc.FloatHistogramChunk:
newChunk.(*chunkenc.FloatHistogramChunk).SetCounterResetHeader(hc.GetCounterResetHeader())
case *safeHeadChunk:
if unwrapped, ok := hc.Chunk.(*chunkenc.FloatHistogramChunk); ok {
newChunk.(*chunkenc.FloatHistogramChunk).SetCounterResetHeader(unwrapped.GetCounterResetHeader())
} else {
err = fmt.Errorf("internal error, could not unwrap safeHeadChunk to float histogram chunk: %T", hc.Chunk)
}
default:
err = fmt.Errorf("internal error, unknown chunk type %T when expecting float histogram", p.currChkMeta.Chunk)
}
if err != nil {
break
}
var h *histogram.FloatHistogram
t, h = p.currDelIter.AtFloatHistogram()
p.curr.MinTime = t
// Detect missing gauge reset hint.
if h.CounterResetHint == histogram.GaugeType && newChunk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader() != chunkenc.GaugeType {
err = fmt.Errorf("found float gauge histogram in non gauge chunk")
break
}
app.AppendFloatHistogram(t, h)
for vt := p.currDelIter.Next(); vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
for vt := valueType; vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
if vt != chunkenc.ValFloatHistogram {
err = fmt.Errorf("found value type %v in histogram chunk", vt)
break
}
var h *histogram.FloatHistogram
t, h = p.currDelIter.AtFloatHistogram()
// Defend against corrupted chunks.
if h.CounterResetHint == histogram.GaugeType {
pI, nI, bpI, bnI, _, _, okToAppend := app.(*chunkenc.FloatHistogramAppender).AppendableGauge(h)
if !okToAppend {
err = errors.New("unable to append histogram due to unexpected schema change")
break
}
if len(pI)+len(nI)+len(bpI)+len(bnI) > 0 {
err = fmt.Errorf(
"bucket layout has changed unexpectedly: forward %d positive, %d negative, backward %d positive %d negative bucket interjections required",
len(pI), len(nI), len(bpI), len(bnI),
)
break
}
} else {
pI, nI, okToAppend, counterReset := app.(*chunkenc.FloatHistogramAppender).Appendable(h)
if len(pI)+len(nI) > 0 {
err = fmt.Errorf(
"bucket layout has changed unexpectedly: %d positive and %d negative bucket interjections required",
len(pI), len(nI),
)
break
}
if counterReset {
err = errors.New("detected unexpected counter reset in histogram")
break
}
if !okToAppend {
err = errors.New("unable to append histogram due to unexpected schema change")
break
}
_, _, app, err = app.AppendFloatHistogram(nil, t, h, true)
if err != nil {
break
}
app.AppendFloatHistogram(t, h)
}
default:
err = fmt.Errorf("populateWithDelChunkSeriesIterator: value type %v unsupported", valueType)

View file

@ -46,6 +46,7 @@ func ChunkFromSamples(s []Sample) (chunks.Meta, error) {
// ChunkFromSamplesGeneric requires all samples to have the same type.
func ChunkFromSamplesGeneric(s Samples) (chunks.Meta, error) {
emptyChunk := chunks.Meta{Chunk: chunkenc.NewXORChunk()}
mint, maxt := int64(0), int64(0)
if s.Len() > 0 {
@ -53,9 +54,7 @@ func ChunkFromSamplesGeneric(s Samples) (chunks.Meta, error) {
}
if s.Len() == 0 {
return chunks.Meta{
Chunk: chunkenc.NewXORChunk(),
}, nil
return emptyChunk, nil
}
sampleType := s.Get(0).Type()
@ -65,24 +64,27 @@ func ChunkFromSamplesGeneric(s Samples) (chunks.Meta, error) {
}
ca, _ := c.Appender()
var newChunk chunkenc.Chunk
for i := 0; i < s.Len(); i++ {
switch sampleType {
case chunkenc.ValFloat:
ca.Append(s.Get(i).T(), s.Get(i).F())
case chunkenc.ValHistogram:
h := s.Get(i).H()
ca.AppendHistogram(s.Get(i).T(), h)
if i == 0 && h.CounterResetHint == histogram.GaugeType {
hc := c.(*chunkenc.HistogramChunk)
hc.SetCounterResetHeader(chunkenc.GaugeType)
newChunk, _, ca, err = ca.AppendHistogram(nil, s.Get(i).T(), s.Get(i).H(), false)
if err != nil {
return emptyChunk, err
}
if newChunk != nil {
return emptyChunk, fmt.Errorf("did not expect to start a second chunk")
}
case chunkenc.ValFloatHistogram:
fh := s.Get(i).FH()
ca.AppendFloatHistogram(s.Get(i).T(), fh)
if i == 0 && fh.CounterResetHint == histogram.GaugeType {
hc := c.(*chunkenc.FloatHistogramChunk)
hc.SetCounterResetHeader(chunkenc.GaugeType)
newChunk, _, ca, err = ca.AppendFloatHistogram(nil, s.Get(i).T(), s.Get(i).FH(), false)
if err != nil {
return emptyChunk, err
}
if newChunk != nil {
return emptyChunk, fmt.Errorf("did not expect to start a second chunk")
}
default:
panic(fmt.Sprintf("unknown sample type %s", sampleType.String()))