Attempt for record type

This commit is contained in:
Carrie Edwards 2024-11-13 14:20:11 -08:00
parent cfcd51538d
commit 37df50adb9
14 changed files with 660 additions and 84 deletions

View file

@ -463,7 +463,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
return
}
decoded <- samples
case record.HistogramSamples:
case record.HistogramSamples, record.CustomBucketHistogramSamples:
histograms := histogramsPool.Get()[:0]
histograms, err = dec.HistogramSamples(rec, histograms)
if err != nil {
@ -475,7 +475,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
return
}
decoded <- histograms
case record.FloatHistogramSamples:
case record.FloatHistogramSamples, record.CustomBucketFloatHistogramSamples:
floatHistograms := floatHistogramsPool.Get()[:0]
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
if err != nil {
@ -1154,19 +1154,40 @@ func (a *appender) log() error {
}
if len(a.pendingHistograms) > 0 {
buf = encoder.HistogramSamples(a.pendingHistograms, buf)
if err := a.wal.Log(buf); err != nil {
return err
buf1, buf2 := encoder.HistogramSamples(a.pendingHistograms, buf)
//buf = append(buf1, buf2...)
//if err := a.wal.Log(buf); err != nil {
// return err
//}
if len(buf1) > 0 {
buf = buf1[:0]
if err := a.wal.Log(buf1); err != nil {
return err
}
}
if len(buf2) > 0 {
buf = buf2[:0]
if err := a.wal.Log(buf2); err != nil {
return err
}
}
buf = buf[:0]
}
if len(a.pendingFloatHistograms) > 0 {
buf = encoder.FloatHistogramSamples(a.pendingFloatHistograms, buf)
if err := a.wal.Log(buf); err != nil {
return err
buf1, buf2 := encoder.FloatHistogramSamples(a.pendingFloatHistograms, buf)
if len(buf1) > 0 {
buf = buf1[:0]
if err := a.wal.Log(buf1); err != nil {
return err
}
}
buf = buf[:0]
if len(buf2) > 0 {
buf = buf2[:0]
if err := a.wal.Log(buf2); err != nil {
return err
}
}
//buf = buf[:0]
}
if len(a.pendingExamplars) > 0 {

View file

@ -193,7 +193,8 @@ func TestCommit(t *testing.T) {
)
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
recType := dec.Type(rec)
switch recType {
case record.Series:
var series []record.RefSeries
series, err = dec.Series(rec, series)
@ -206,13 +207,13 @@ func TestCommit(t *testing.T) {
require.NoError(t, err)
walSamplesCount += len(samples)
case record.HistogramSamples:
case record.HistogramSamples, record.CustomBucketHistogramSamples:
var histograms []record.RefHistogramSample
histograms, err = dec.HistogramSamples(rec, histograms)
require.NoError(t, err)
walHistogramCount += len(histograms)
case record.FloatHistogramSamples:
case record.FloatHistogramSamples, record.CustomBucketFloatHistogramSamples:
var floatHistograms []record.RefFloatHistogramSample
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
require.NoError(t, err)

View file

@ -79,6 +79,32 @@ The first sample record begins at the second row.
└──────────────────────────────────────────────────────────────────┘
```
### Native histogram records
Native histogram records are encoded as
```
┌──────────────────────────────────────────────────────────────────┐
│ type = 7 <1b>
├──────────────────────────────────────────────────────────────────┤
│ ┌────────────────────┬───────────────────────────┐ │
│ │ id <8b> │ timestamp <8b> │ │
│ └────────────────────┴───────────────────────────┘ │
│ ┌────────────────────┬───────────────────────────┐ │
│ │ id_delta <uvarint> │ timestamp_delta <uvarint> │ │
│ ├────────────────────┴───────────────────────────┴─────────────┤ │
│ │ n = len(labels) <uvarint> │ │
│ ├──────────────────────┬───────────────────────────────────────┤ │
│ │ len(str_1) <uvarint> │ str_1 <bytes> │ │
│ ├──────────────────────┴───────────────────────────────────────┤ │
│ │ ... │ │
│ ├───────────────────────┬──────────────────────────────────────┤ │
│ │ len(str_2n) <uvarint> │ str_2n <bytes> │ │
│ └───────────────────────┴──────────────────────────────────────┘ │
│ . . . │
└──────────────────────────────────────────────────────────────────┘
```
### Tombstone records
Tombstone records encode tombstones as a list of triples `(series_id, min_time, max_time)`

View file

@ -104,14 +104,6 @@ func (e *Encbuf) PutHashSum(h hash.Hash) {
e.B = h.Sum(e.B)
}
// IsWholeWhenMultiplied checks to see if the number when multiplied by 1000 can
// be converted into an integer without losing precision.
func IsWholeWhenMultiplied(in float64) bool {
i := uint(math.Round(in * 1000))
out := float64(i) / 1000
return in == out
}
// Decbuf provides safe methods to extract data from a byte slice. It does all
// necessary bounds checking and advancing of the byte slice.
// Several datums can be extracted without checking for errors. However, before using

View file

@ -942,17 +942,47 @@ func (a *headAppender) log() error {
}
}
if len(a.histograms) > 0 {
rec = enc.HistogramSamples(a.histograms, buf)
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return fmt.Errorf("log histograms: %w", err)
rec1, rec2 := enc.HistogramSamples(a.histograms, buf)
//rec = append(rec1, rec2...)
//
//buf = rec[:0]
//
//if err := a.head.wal.Log(rec); err != nil {
// return fmt.Errorf("log samples: %w", err)
//}
if len(rec1) != 0 {
buf = rec1[:0]
if err := a.head.wal.Log(rec1); err != nil {
return fmt.Errorf("log histograms: %w", err)
}
}
if len(rec2) != 0 {
buf = rec2[:0]
if err := a.head.wal.Log(rec2); err != nil {
return fmt.Errorf("log custom bucket histograms: %w", err)
}
}
}
if len(a.floatHistograms) > 0 {
rec = enc.FloatHistogramSamples(a.floatHistograms, buf)
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return fmt.Errorf("log float histograms: %w", err)
rec1, rec2 := enc.FloatHistogramSamples(a.floatHistograms, buf)
//rec = append(rec1, rec2...)
//
//buf = rec[:0]
//
//if err := a.head.wal.Log(rec); err != nil {
// return fmt.Errorf("log samples: %w", err)
//}
if len(rec1) != 0 {
buf = rec1[:0]
if err := a.head.wal.Log(rec1); err != nil {
return fmt.Errorf("log float histograms: %w", err)
}
}
if len(rec2) != 0 {
buf = rec2[:0]
if err := a.head.wal.Log(rec2); err != nil {
return fmt.Errorf("log custom bucket float histograms: %w", err)
}
}
}
// Exemplars should be logged after samples (float/native histogram/etc),

View file

@ -740,6 +740,89 @@ func TestHead_ReadWAL(t *testing.T) {
}
}
// TestHead_ReadWAL2 verifies that WAL replay restores head series, chunks and
// exemplars when histogram sample records mix standard (exponential-bucket)
// and custom-bucket native histograms, across all supported WAL compressions.
func TestHead_ReadWAL2(t *testing.T) {
	for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} {
		t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
			// WAL entries are replayed in order: series, histogram samples
			// (mixed custom/standard buckets), more series, more samples,
			// tombstones, exemplars.
			entries := []interface{}{
				[]record.RefSeries{
					{Ref: 10, Labels: labels.FromStrings("a", "1")},
					{Ref: 11, Labels: labels.FromStrings("a", "2")},
					{Ref: 100, Labels: labels.FromStrings("a", "3")},
				},
				[]record.RefHistogramSample{
					// Ref 0 has no series record; its sample is covered by the
					// tombstone below and must not survive replay.
					{Ref: 0, T: 99, H: tsdbutil.GenerateTestHistogram(1)},
					{Ref: 10, T: 100, H: tsdbutil.GenerateTestCustomBucketsHistogram(2)},
					{Ref: 100, T: 100, H: tsdbutil.GenerateTestHistogram(3)},
				},
				[]record.RefSeries{
					{Ref: 50, Labels: labels.FromStrings("a", "4")},
					// This series has two refs pointing to it.
					{Ref: 101, Labels: labels.FromStrings("a", "3")},
				},
				[]record.RefHistogramSample{
					{Ref: 10, T: 101, H: tsdbutil.GenerateTestHistogram(5)},
					{Ref: 50, T: 101, H: tsdbutil.GenerateTestHistogram(6)},
					{Ref: 101, T: 101, H: tsdbutil.GenerateTestCustomBucketsHistogram(7)},
				},
				[]tombstones.Stone{
					{Ref: 0, Intervals: []tombstones.Interval{{Mint: 99, Maxt: 101}}},
				},
				[]record.RefExemplar{
					{Ref: 10, T: 100, V: 1, Labels: labels.FromStrings("trace_id", "asdf")},
				},
			}
			head, w := newTestHead(t, 1000, compress, false)
			defer func() {
				require.NoError(t, head.Close())
			}()
			populateTestWL(t, w, entries)
			require.NoError(t, head.Init(math.MinInt64))
			// 101 is the highest series ref written to the WAL above.
			require.Equal(t, uint64(101), head.lastSeriesID.Load())
			s10 := head.series.getByID(10)
			s11 := head.series.getByID(11)
			s50 := head.series.getByID(50)
			s100 := head.series.getByID(100)
			testutil.RequireEqual(t, labels.FromStrings("a", "1"), s10.lset)
			require.Nil(t, s11) // Series without samples should be garbage collected at head.Init().
			testutil.RequireEqual(t, labels.FromStrings("a", "4"), s50.lset)
			testutil.RequireEqual(t, labels.FromStrings("a", "3"), s100.lset)
			// expandChunk drains an iterator, asserting every sample is an
			// integer histogram.
			expandChunk := func(c chunkenc.Iterator) (x []sample) {
				for c.Next() == chunkenc.ValHistogram {
					t, v := c.AtHistogram(nil)
					x = append(x, sample{t: t, h: v})
				}
				require.NoError(t, c.Err())
				return x
			}
			c, _, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
			require.NoError(t, err)
			// NOTE(review): the sample appended for ref 10 at T=101 above is a
			// standard histogram (GenerateTestHistogram(5)), yet a
			// custom-buckets histogram is expected here — confirm this is the
			// intended replay conversion and not a copy/paste slip.
			require.Equal(t, []sample{{100, 0, tsdbutil.GenerateTestCustomBucketsHistogram(2), nil}, {101, 0, tsdbutil.GenerateTestCustomBucketsHistogram(5), nil}}, expandChunk(c.chunk.Iterator(nil)))
			c, _, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
			require.NoError(t, err)
			require.Equal(t, []sample{{101, 0, tsdbutil.GenerateTestHistogram(6), nil}}, expandChunk(c.chunk.Iterator(nil)))
			// The samples before the new series record should be discarded since a duplicate record
			// is only possible when old samples were compacted.
			c, _, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
			require.NoError(t, err)
			require.Equal(t, []sample{{101, 0, tsdbutil.GenerateTestCustomBucketsHistogram(7), nil}}, expandChunk(c.chunk.Iterator(nil)))
			q, err := head.ExemplarQuerier(context.Background())
			require.NoError(t, err)
			e, err := q.Select(0, 1000, []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")})
			require.NoError(t, err)
			require.True(t, exemplar.Exemplar{Ts: 100, Value: 1, Labels: labels.FromStrings("trace_id", "asdf")}.Equals(e[0].Exemplars[0]))
		})
	}
}
func TestHead_WALMultiRef(t *testing.T) {
head, w := newTestHead(t, 1000, wlog.CompressionNone, false)
@ -3953,6 +4036,194 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
testQuery()
}
// TestHistogramInWALAndMmapChunk2 exercises WAL replay and m-mapped chunk
// handling for series containing custom-bucket and standard native histograms
// (both integer and float), including a series that interleaves histograms
// with plain float samples. The head is restarted twice: once with m-mapped
// chunks intact and once after deleting them to force full WAL replay.
func TestHistogramInWALAndMmapChunk2(t *testing.T) {
	head, _ := newTestHead(t, 3000, wlog.CompressionNone, false)
	t.Cleanup(func() {
		require.NoError(t, head.Close())
	})
	require.NoError(t, head.Init(0))
	// Series with only histograms.
	s1 := labels.FromStrings("a", "b1")
	k1 := s1.String()
	numHistograms := 300
	exp := map[string][]chunks.Sample{}
	ts := int64(0)
	var app storage.Appender
	// First custom-bucket, then standard integer histograms on s1.
	for _, custom := range []bool{true, false} {
		app = head.Appender(context.Background())
		var hists []*histogram.Histogram
		if custom {
			hists = tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms)
		} else {
			hists = tsdbutil.GenerateTestHistograms(numHistograms)
		}
		for _, h := range hists {
			if !custom {
				// Mirror the positive buckets onto the negative side so
				// standard histograms exercise both bucket directions.
				h.NegativeSpans = h.PositiveSpans
				h.NegativeBuckets = h.PositiveBuckets
			}
			_, err := app.AppendHistogram(0, s1, ts, h, nil)
			require.NoError(t, err)
			exp[k1] = append(exp[k1], sample{t: ts, h: h.Copy()})
			ts++
			if ts%5 == 0 {
				// Commit in small batches so multiple WAL records are written.
				require.NoError(t, app.Commit())
				app = head.Appender(context.Background())
			}
		}
		require.NoError(t, app.Commit())
	}
	// Same again with float histograms, m-mapping head chunks after each pass.
	for _, custom := range []bool{true, false} {
		app = head.Appender(context.Background())
		var hists []*histogram.FloatHistogram
		if custom {
			hists = tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms)
		} else {
			hists = tsdbutil.GenerateTestFloatHistograms(numHistograms)
		}
		for _, h := range hists {
			if !custom {
				h.NegativeSpans = h.PositiveSpans
				h.NegativeBuckets = h.PositiveBuckets
			}
			_, err := app.AppendHistogram(0, s1, ts, nil, h)
			require.NoError(t, err)
			exp[k1] = append(exp[k1], sample{t: ts, fh: h.Copy()})
			ts++
			if ts%5 == 0 {
				require.NoError(t, app.Commit())
				app = head.Appender(context.Background())
			}
		}
		require.NoError(t, app.Commit())
		head.mmapHeadChunks()
	}
	// There should be 20 mmap chunks in s1.
	// NOTE(review): the comment above says 20 but the assertion checks 19 —
	// reconcile the two.
	ms := head.series.getByHash(s1.Hash(), s1)
	require.Len(t, ms.mmappedChunks, 19)
	expMmapChunks := make([]*mmappedChunk, 0, 20)
	for _, mmap := range ms.mmappedChunks {
		require.Positive(t, mmap.numSamples)
		cpy := *mmap
		expMmapChunks = append(expMmapChunks, &cpy)
	}
	expHeadChunkSamples := ms.headChunks.chunk.NumSamples()
	require.Positive(t, expHeadChunkSamples)
	// Series with mix of histograms and float.
	s2 := labels.FromStrings("a", "b2")
	k2 := s2.String()
	ts = 0
	for _, custom := range []bool{true, false} {
		app = head.Appender(context.Background())
		var hists []*histogram.Histogram
		if custom {
			hists = tsdbutil.GenerateTestCustomBucketsHistograms(100)
		} else {
			hists = tsdbutil.GenerateTestHistograms(100)
		}
		for _, h := range hists {
			ts++
			if !custom {
				h.NegativeSpans = h.PositiveSpans
				h.NegativeBuckets = h.PositiveBuckets
			}
			_, err := app.AppendHistogram(0, s2, ts, h, nil)
			require.NoError(t, err)
			eh := h.Copy()
			if ts > 30 && (ts-10)%20 == 1 {
				// Need "unknown" hint after float sample.
				eh.CounterResetHint = histogram.UnknownCounterReset
			}
			exp[k2] = append(exp[k2], sample{t: ts, h: eh})
			if ts%20 == 0 {
				require.NoError(t, app.Commit())
				app = head.Appender(context.Background())
				// Add some float.
				for i := 0; i < 10; i++ {
					ts++
					_, err := app.Append(0, s2, ts, float64(ts))
					require.NoError(t, err)
					exp[k2] = append(exp[k2], sample{t: ts, f: float64(ts)})
				}
				require.NoError(t, app.Commit())
				app = head.Appender(context.Background())
			}
		}
		require.NoError(t, app.Commit())
	}
	// Same mixed pattern with float histograms.
	for _, custom := range []bool{true, false} {
		app = head.Appender(context.Background())
		var hists []*histogram.FloatHistogram
		if custom {
			hists = tsdbutil.GenerateTestCustomBucketsFloatHistograms(100)
		} else {
			hists = tsdbutil.GenerateTestFloatHistograms(100)
		}
		for _, h := range hists {
			ts++
			if !custom {
				h.NegativeSpans = h.PositiveSpans
				h.NegativeBuckets = h.PositiveBuckets
			}
			_, err := app.AppendHistogram(0, s2, ts, nil, h)
			require.NoError(t, err)
			eh := h.Copy()
			if ts > 30 && (ts-10)%20 == 1 {
				// Need "unknown" hint after float sample.
				eh.CounterResetHint = histogram.UnknownCounterReset
			}
			exp[k2] = append(exp[k2], sample{t: ts, fh: eh})
			if ts%20 == 0 {
				require.NoError(t, app.Commit())
				app = head.Appender(context.Background())
				// Add some float.
				for i := 0; i < 10; i++ {
					ts++
					_, err := app.Append(0, s2, ts, float64(ts))
					require.NoError(t, err)
					exp[k2] = append(exp[k2], sample{t: ts, f: float64(ts)})
				}
				require.NoError(t, app.Commit())
				app = head.Appender(context.Background())
			}
		}
		require.NoError(t, app.Commit())
	}
	// Restart head.
	require.NoError(t, head.Close())
	startHead := func() {
		w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
		require.NoError(t, err)
		head, err = NewHead(nil, nil, w, nil, head.opts, nil)
		require.NoError(t, err)
		require.NoError(t, head.Init(0))
	}
	startHead()
	// Checking contents of s1.
	ms = head.series.getByHash(s1.Hash(), s1)
	require.Equal(t, expMmapChunks, ms.mmappedChunks)
	require.Equal(t, expHeadChunkSamples, ms.headChunks.chunk.NumSamples())
	testQuery := func() {
		q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
		require.NoError(t, err)
		act := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "a", "b.*"))
		compareSeries(t, exp, act)
	}
	testQuery()
	// Restart with no mmap chunks to test WAL replay.
	require.NoError(t, head.Close())
	require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot)))
	startHead()
	testQuery()
}
func TestChunkSnapshot(t *testing.T) {
head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false)
defer func() {
@ -5089,6 +5360,48 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
require.Positive(t, offset)
}
// TestHistogramWALANDWBLReplay verifies histogram sample replay from the WAL
// and the out-of-order WBL, covering both standard and custom-bucket
// histograms.
// NOTE(review): work in progress — only a single in-order sample is appended
// and expOOOSamples is collected but never asserted; extend before relying on
// this test for coverage.
func TestHistogramWALANDWBLReplay(t *testing.T) {
	dir := t.TempDir()
	wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
	require.NoError(t, err)
	oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy)
	require.NoError(t, err)

	opts := DefaultHeadOptions()
	opts.ChunkRange = 1000
	opts.ChunkDirRoot = dir
	// Accept out-of-order appends up to 30 minutes old.
	opts.OutOfOrderTimeWindow.Store(30 * time.Minute.Milliseconds())
	opts.EnableNativeHistograms.Store(true)
	opts.EnableOOONativeHistograms.Store(true)

	h, err := NewHead(nil, nil, wal, oooWlog, opts, nil)
	require.NoError(t, err)
	require.NoError(t, h.Init(0))
	// Release WAL/WBL resources even if an assertion below fails.
	defer func() {
		require.NoError(t, h.Close())
	}()

	var expOOOSamples []chunks.Sample
	l := labels.FromStrings("foo", "bar")
	// appendSample appends one histogram sample at the given minute, choosing
	// a custom-buckets or standard test histogram, and records it as an
	// expected OOO sample when isOOO is set.
	appendSample := func(mins int64, val float64, isOOO bool, isCustomBucketHistogram bool) {
		app := h.Appender(context.Background())
		var s sample
		if isCustomBucketHistogram {
			s = sample{t: mins * time.Minute.Milliseconds(), h: tsdbutil.GenerateTestCustomBucketsHistogram(int(val))}
		} else {
			s = sample{t: mins * time.Minute.Milliseconds(), h: tsdbutil.GenerateTestHistogram(int(val))}
		}
		_, err := app.AppendHistogram(0, l, mins*time.Minute.Milliseconds(), s.h, nil)
		require.NoError(t, err)
		require.NoError(t, app.Commit())
		if isOOO {
			expOOOSamples = append(expOOOSamples, s)
		}
	}

	// In-order histogram samples.
	appendSample(60, 60, false, false)
}
// TestWBLReplay checks the replay at a low level.
func TestWBLReplay(t *testing.T) {
for name, scenario := range sampleTypeScenarios {

View file

@ -139,7 +139,8 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
dec := record.NewDecoder(syms)
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
recType := dec.Type(rec)
switch recType {
case record.Series:
series := seriesPool.Get()[:0]
series, err = dec.Series(rec, series)
@ -188,7 +189,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
return
}
decoded <- exemplars
case record.HistogramSamples:
case record.HistogramSamples, record.CustomBucketHistogramSamples:
hists := histogramsPool.Get()[:0]
hists, err = dec.HistogramSamples(rec, hists)
if err != nil {
@ -200,7 +201,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
return
}
decoded <- hists
case record.FloatHistogramSamples:
case record.FloatHistogramSamples, record.CustomBucketFloatHistogramSamples:
hists := floatHistogramsPool.Get()[:0]
hists, err = dec.FloatHistogramSamples(rec, hists)
if err != nil {

View file

@ -52,6 +52,10 @@ const (
HistogramSamples Type = 7
// FloatHistogramSamples is used to match WAL records of type Float Histograms.
FloatHistogramSamples Type = 8
// CustomBucketHistogramSamples is used to match WAL records of type Histogram with custom buckets.
CustomBucketHistogramSamples Type = 9
// CustomBucketFloatHistogramSamples is used to match WAL records of type Float Histogram with custom buckets.
CustomBucketFloatHistogramSamples Type = 10
)
func (rt Type) String() string {
@ -68,6 +72,10 @@ func (rt Type) String() string {
return "histogram_samples"
case FloatHistogramSamples:
return "float_histogram_samples"
case CustomBucketHistogramSamples:
return "custom_bucket_histogram_samples"
case CustomBucketFloatHistogramSamples:
return "custom_bucket_float_histogram_samples"
case MmapMarkers:
return "mmapmarkers"
case Metadata:
@ -185,6 +193,10 @@ type RefFloatHistogramSample struct {
FH *histogram.FloatHistogram
}
// RefCustomBucketHistogramSample is a native histogram sample with custom
// bucket boundaries, together with the series reference it belongs to.
// NOTE(review): this type is not referenced by the encoder/decoder changes in
// this commit (custom-bucket samples reuse RefHistogramSample with a separate
// record type) — confirm it is still needed or remove it.
type RefCustomBucketHistogramSample struct {
	RefHistogramSample
}
// RefMmapMarker marks that the all the samples of the given series until now have been m-mapped to disk.
type RefMmapMarker struct {
Ref chunks.HeadSeriesRef
@ -207,7 +219,7 @@ func (d *Decoder) Type(rec []byte) Type {
return Unknown
}
switch t := Type(rec[0]); t {
case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamples, FloatHistogramSamples:
case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamples, FloatHistogramSamples, CustomBucketHistogramSamples, CustomBucketFloatHistogramSamples:
return t
}
return Unknown
@ -428,7 +440,7 @@ func (d *Decoder) MmapMarkers(rec []byte, markers []RefMmapMarker) ([]RefMmapMar
func (d *Decoder) HistogramSamples(rec []byte, histograms []RefHistogramSample) ([]RefHistogramSample, error) {
dec := encoding.Decbuf{B: rec}
t := Type(dec.Byte())
if t != HistogramSamples {
if t != HistogramSamples && t != CustomBucketHistogramSamples {
return nil, errors.New("invalid record type")
}
if dec.Len() == 0 {
@ -509,12 +521,10 @@ func DecodeHistogram(buf *encoding.Decbuf, h *histogram.Histogram) {
if histogram.IsCustomBucketsSchema(h.Schema) {
l = buf.Uvarint()
if l > 0 {
if l > 0 {
h.CustomValues = make([]float64, l)
}
for i := range h.CustomValues {
h.CustomValues[i] = buf.Be64Float64()
}
h.CustomValues = make([]float64, l)
}
for i := range h.CustomValues {
h.CustomValues[i] = buf.Be64Float64()
}
}
}
@ -522,7 +532,7 @@ func DecodeHistogram(buf *encoding.Decbuf, h *histogram.Histogram) {
func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogramSample) ([]RefFloatHistogramSample, error) {
dec := encoding.Decbuf{B: rec}
t := Type(dec.Byte())
if t != FloatHistogramSamples {
if t != FloatHistogramSamples && t != CustomBucketFloatHistogramSamples {
return nil, errors.New("invalid record type")
}
if dec.Len() == 0 {
@ -603,12 +613,10 @@ func DecodeFloatHistogram(buf *encoding.Decbuf, fh *histogram.FloatHistogram) {
if histogram.IsCustomBucketsSchema(fh.Schema) {
l = buf.Uvarint()
if l > 0 {
if l > 0 {
fh.CustomValues = make([]float64, l)
}
for i := range fh.CustomValues {
fh.CustomValues[i] = buf.Be64Float64()
}
fh.CustomValues = make([]float64, l)
}
for i := range fh.CustomValues {
fh.CustomValues[i] = buf.Be64Float64()
}
}
}
@ -740,12 +748,15 @@ func (e *Encoder) MmapMarkers(markers []RefMmapMarker, b []byte) []byte {
return buf.Get()
}
func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) []byte {
func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) ([]byte, []byte) {
buf := encoding.Encbuf{B: b}
buf.PutByte(byte(HistogramSamples))
customBucketHistBuf := encoding.Encbuf{B: b}
customBucketHistBuf.PutByte(byte(CustomBucketHistogramSamples))
if len(histograms) == 0 {
return buf.Get()
return buf.Get(), customBucketHistBuf.Get()
}
// Store base timestamp and base reference number of first histogram.
@ -754,14 +765,34 @@ func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) []
buf.PutBE64(uint64(first.Ref))
buf.PutBE64int64(first.T)
for _, h := range histograms {
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
buf.PutVarint64(h.T - first.T)
customBucketHistBuf.PutBE64(uint64(first.Ref))
customBucketHistBuf.PutBE64int64(first.T)
EncodeHistogram(&buf, h.H)
histsAdded := 0
customBucketHistsAdded := 0
for _, h := range histograms {
if h.H.UsesCustomBuckets() {
customBucketHistBuf.PutVarint64(int64(h.Ref) - int64(first.Ref))
customBucketHistBuf.PutVarint64(h.T - first.T)
EncodeHistogram(&customBucketHistBuf, h.H)
customBucketHistsAdded++
} else {
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
buf.PutVarint64(h.T - first.T)
EncodeHistogram(&buf, h.H)
histsAdded++
}
}
return buf.Get()
if customBucketHistsAdded == 0 {
customBucketHistBuf.Reset()
} else if histsAdded == 0 {
buf.Reset()
}
return buf.Get(), customBucketHistBuf.Get()
}
// EncodeHistogram encodes a Histogram into a byte slice.
@ -805,12 +836,15 @@ func EncodeHistogram(buf *encoding.Encbuf, h *histogram.Histogram) {
}
}
func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) []byte {
func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) ([]byte, []byte) {
buf := encoding.Encbuf{B: b}
buf.PutByte(byte(FloatHistogramSamples))
customBucketHistBuf := encoding.Encbuf{B: b}
customBucketHistBuf.PutByte(byte(CustomBucketFloatHistogramSamples))
if len(histograms) == 0 {
return buf.Get()
return buf.Get(), customBucketHistBuf.Get()
}
// Store base timestamp and base reference number of first histogram.
@ -819,14 +853,34 @@ func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b
buf.PutBE64(uint64(first.Ref))
buf.PutBE64int64(first.T)
for _, h := range histograms {
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
buf.PutVarint64(h.T - first.T)
customBucketHistBuf.PutBE64(uint64(first.Ref))
customBucketHistBuf.PutBE64int64(first.T)
EncodeFloatHistogram(&buf, h.FH)
histsAdded := 0
customBucketHistsAdded := 0
for _, h := range histograms {
if h.FH.UsesCustomBuckets() {
customBucketHistBuf.PutVarint64(int64(h.Ref) - int64(first.Ref))
customBucketHistBuf.PutVarint64(h.T - first.T)
EncodeFloatHistogram(&customBucketHistBuf, h.FH)
customBucketHistsAdded++
} else {
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
buf.PutVarint64(h.T - first.T)
EncodeFloatHistogram(&buf, h.FH)
histsAdded++
}
}
return buf.Get()
if customBucketHistsAdded == 0 {
customBucketHistBuf.Reset()
} else if histsAdded == 0 {
buf.Reset()
}
return buf.Get(), customBucketHistBuf.Get()
}
// EncodeFloatHistogram encodes the Float Histogram into a byte slice.

View file

@ -148,10 +148,30 @@ func TestRecord_EncodeDecode(t *testing.T) {
NegativeBuckets: []int64{1, 2, -1},
},
},
{
Ref: 67,
T: 5678,
H: &histogram.Histogram{
Count: 8,
ZeroThreshold: 0.001,
Sum: 35.5,
Schema: -53,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 2},
},
PositiveBuckets: []int64{2, -1, 2, 0},
CustomValues: []float64{0, 2, 4, 6, 8},
},
},
}
decHistograms, err := dec.HistogramSamples(enc.HistogramSamples(histograms, nil), nil)
histSamples, customBucketHistSamples := enc.HistogramSamples(histograms, nil)
decHistograms, err := dec.HistogramSamples(histSamples, nil)
require.NoError(t, err)
decCustomBucketHistograms, err := dec.HistogramSamples(customBucketHistSamples, nil)
require.NoError(t, err)
decHistograms = append(decHistograms, decCustomBucketHistograms...)
require.Equal(t, histograms, decHistograms)
floatHistograms := make([]RefFloatHistogramSample, len(histograms))
@ -162,24 +182,36 @@ func TestRecord_EncodeDecode(t *testing.T) {
FH: h.H.ToFloat(nil),
}
}
decFloatHistograms, err := dec.FloatHistogramSamples(enc.FloatHistogramSamples(floatHistograms, nil), nil)
histSamples, customBucketFloatHistSamples := enc.FloatHistogramSamples(floatHistograms, nil)
decFloatHistograms, err := dec.FloatHistogramSamples(histSamples, nil)
require.NoError(t, err)
decCustomBucketFloatHistograms, err := dec.FloatHistogramSamples(customBucketFloatHistSamples, nil)
decFloatHistograms = append(decFloatHistograms, decCustomBucketFloatHistograms...)
require.Equal(t, floatHistograms, decFloatHistograms)
// Gauge integer histograms.
for i := range histograms {
histograms[i].H.CounterResetHint = histogram.GaugeType
}
decHistograms, err = dec.HistogramSamples(enc.HistogramSamples(histograms, nil), nil)
gaugeHistSamples, customBucketGaugeHistSamples := enc.HistogramSamples(histograms, nil)
decGaugeHistograms, err := dec.HistogramSamples(gaugeHistSamples, nil)
require.NoError(t, err)
require.Equal(t, histograms, decHistograms)
decCustomBucketGaugeHistograms, err := dec.HistogramSamples(customBucketGaugeHistSamples, nil)
require.NoError(t, err)
decGaugeHistograms = append(decGaugeHistograms, decCustomBucketGaugeHistograms...)
require.Equal(t, histograms, decGaugeHistograms)
// Gauge float histograms.
for i := range floatHistograms {
floatHistograms[i].FH.CounterResetHint = histogram.GaugeType
}
decFloatHistograms, err = dec.FloatHistogramSamples(enc.FloatHistogramSamples(floatHistograms, nil), nil)
gaugeHistSamples, customBucketGaugeFloatHistSamples := enc.FloatHistogramSamples(floatHistograms, nil)
decGaugeFloatHistograms, err := dec.FloatHistogramSamples(gaugeHistSamples, nil)
require.NoError(t, err)
decCustomBucketGaugeFloatHistograms, err := dec.FloatHistogramSamples(customBucketGaugeFloatHistSamples, nil)
decFloatHistograms = append(decGaugeFloatHistograms, decCustomBucketGaugeFloatHistograms...)
require.Equal(t, floatHistograms, decFloatHistograms)
}
@ -265,8 +297,12 @@ func TestRecord_Corrupted(t *testing.T) {
},
}
corrupted := enc.HistogramSamples(histograms, nil)[:8]
_, err := dec.HistogramSamples(corrupted, nil)
corruptedHists, corruptedCustomBucketHists := enc.HistogramSamples(histograms, nil)
corruptedHists = corruptedHists[:8]
corruptedCustomBucketHists = corruptedCustomBucketHists[:8]
_, err := dec.HistogramSamples(corruptedHists, nil)
require.ErrorIs(t, err, encoding.ErrInvalidSize)
_, err = dec.HistogramSamples(corruptedCustomBucketHists, nil)
require.ErrorIs(t, err, encoding.ErrInvalidSize)
})
}
@ -308,9 +344,28 @@ func TestRecord_Type(t *testing.T) {
PositiveBuckets: []int64{1, 1, -1, 0},
},
},
{
Ref: 67,
T: 5678,
H: &histogram.Histogram{
Count: 8,
ZeroThreshold: 0.001,
Sum: 35.5,
Schema: -53,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 2},
},
PositiveBuckets: []int64{2, -1, 2, 0},
CustomValues: []float64{0, 2, 4, 6, 8},
},
},
}
recordType = dec.Type(enc.HistogramSamples(histograms, nil))
hists, customBucketHists := enc.HistogramSamples(histograms, nil)
recordType = dec.Type(hists)
require.Equal(t, HistogramSamples, recordType)
recordType = dec.Type(customBucketHists)
require.Equal(t, CustomBucketHistogramSamples, recordType)
recordType = dec.Type(nil)
require.Equal(t, Unknown, recordType)

View file

@ -29,11 +29,13 @@ import (
)
const (
float = "float"
intHistogram = "integer histogram"
floatHistogram = "float histogram"
gaugeIntHistogram = "gauge int histogram"
gaugeFloatHistogram = "gauge float histogram"
float = "float"
intHistogram = "integer histogram"
floatHistogram = "float histogram"
customBucketIntHistogram = "custom bucket int histogram"
customBucketFloatHistogram = "custom bucket float histogram"
gaugeIntHistogram = "gauge int histogram"
gaugeFloatHistogram = "gauge float histogram"
)
type testValue struct {
@ -82,6 +84,28 @@ var sampleTypeScenarios = map[string]sampleTypeScenario{
return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(value))}
},
},
customBucketIntHistogram: {
sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, h: tsdbutil.GenerateTestCustomBucketsHistogram(int(value))}
ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil)
return ref, s, err
},
sampleFunc: func(ts, value int64) sample {
return sample{t: ts, h: tsdbutil.GenerateTestCustomBucketsHistogram(int(value))}
},
},
customBucketFloatHistogram: {
sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, fh: tsdbutil.GenerateTestCustomBucketsFloatHistogram(int(value))}
ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh)
return ref, s, err
},
sampleFunc: func(ts, value int64) sample {
return sample{t: ts, fh: tsdbutil.GenerateTestCustomBucketsFloatHistogram(int(value))}
},
},
gaugeIntHistogram: {
sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {

View file

@ -57,6 +57,18 @@ func GenerateTestHistogram(i int) *histogram.Histogram {
}
}
// GenerateTestCustomBucketsHistograms returns n integer native histograms
// with custom bucket boundaries. Every histogram after the first carries the
// NotCounterReset hint, mimicking a contiguous counter series.
func GenerateTestCustomBucketsHistograms(n int) (r []*histogram.Histogram) {
	// Length is known up front; pre-size to avoid repeated growth copies.
	r = make([]*histogram.Histogram, 0, n)
	for i := 0; i < n; i++ {
		h := GenerateTestCustomBucketsHistogram(i)
		if i > 0 {
			// Only the first histogram may signal a counter reset.
			h.CounterResetHint = histogram.NotCounterReset
		}
		r = append(r, h)
	}
	return r
}
func GenerateTestCustomBucketsHistogram(i int) *histogram.Histogram {
return &histogram.Histogram{
Count: 5 + uint64(i*4),
@ -117,6 +129,17 @@ func GenerateTestFloatHistogram(i int) *histogram.FloatHistogram {
}
}
// GenerateTestCustomBucketsFloatHistograms returns n float native histograms
// with custom bucket boundaries. Every histogram after the first carries the
// NotCounterReset hint, mimicking a contiguous counter series.
func GenerateTestCustomBucketsFloatHistograms(n int) (r []*histogram.FloatHistogram) {
	// Length is known up front; pre-size to avoid repeated growth copies.
	r = make([]*histogram.FloatHistogram, 0, n)
	for i := 0; i < n; i++ {
		h := GenerateTestCustomBucketsFloatHistogram(i)
		if i > 0 {
			// Only the first histogram may signal a counter reset.
			h.CounterResetHint = histogram.NotCounterReset
		}
		r = append(r, h)
	}
	return r
}
func GenerateTestCustomBucketsFloatHistogram(i int) *histogram.FloatHistogram {
return &histogram.FloatHistogram{
Count: 5 + float64(i*4),

View file

@ -221,11 +221,27 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
}
}
if len(repl) > 0 {
buf = enc.HistogramSamples(repl, buf)
buf, _ = enc.HistogramSamples(repl, buf)
}
stats.TotalSamples += len(histogramSamples)
stats.DroppedSamples += len(histogramSamples) - len(repl)
case record.CustomBucketHistogramSamples:
histogramSamples, err = dec.HistogramSamples(rec, histogramSamples)
if err != nil {
return nil, fmt.Errorf("decode histogram samples: %w", err)
}
// Drop irrelevant histogramSamples in place.
repl := histogramSamples[:0]
for _, h := range histogramSamples {
if h.T >= mint {
repl = append(repl, h)
}
}
if len(repl) > 0 {
_, buf = enc.HistogramSamples(repl, buf)
}
stats.TotalSamples += len(histogramSamples)
stats.DroppedSamples += len(histogramSamples) - len(repl)
case record.FloatHistogramSamples:
floatHistogramSamples, err = dec.FloatHistogramSamples(rec, floatHistogramSamples)
if err != nil {
@ -239,11 +255,27 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
}
}
if len(repl) > 0 {
buf = enc.FloatHistogramSamples(repl, buf)
buf, _ = enc.FloatHistogramSamples(repl, buf)
}
stats.TotalSamples += len(floatHistogramSamples)
stats.DroppedSamples += len(floatHistogramSamples) - len(repl)
case record.CustomBucketFloatHistogramSamples:
floatHistogramSamples, err = dec.FloatHistogramSamples(rec, floatHistogramSamples)
if err != nil {
return nil, fmt.Errorf("decode float histogram samples: %w", err)
}
// Drop irrelevant floatHistogramSamples in place.
repl := floatHistogramSamples[:0]
for _, fh := range floatHistogramSamples {
if fh.T >= mint {
repl = append(repl, fh)
}
}
if len(repl) > 0 {
_, buf = enc.FloatHistogramSamples(repl, buf)
}
stats.TotalSamples += len(floatHistogramSamples)
stats.DroppedSamples += len(floatHistogramSamples) - len(repl)
case record.Tombstones:
tstones, err = dec.Tombstones(rec, tstones)
if err != nil {

View file

@ -208,22 +208,24 @@ func TestCheckpoint(t *testing.T) {
require.NoError(t, w.Log(b))
samplesInWAL += 4
h := makeHistogram(i)
b = enc.HistogramSamples([]record.RefHistogramSample{
b1, b2 := enc.HistogramSamples([]record.RefHistogramSample{
{Ref: 0, T: last, H: h},
{Ref: 1, T: last + 10000, H: h},
{Ref: 2, T: last + 20000, H: h},
{Ref: 3, T: last + 30000, H: h},
}, nil)
require.NoError(t, w.Log(b))
require.NoError(t, w.Log(b1))
require.NoError(t, w.Log(b2))
histogramsInWAL += 4
fh := makeFloatHistogram(i)
b = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{
b1, b2 = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{
{Ref: 0, T: last, FH: fh},
{Ref: 1, T: last + 10000, FH: fh},
{Ref: 2, T: last + 20000, FH: fh},
{Ref: 3, T: last + 30000, FH: fh},
}, nil)
require.NoError(t, w.Log(b))
require.NoError(t, w.Log(b1))
require.NoError(t, w.Log(b2))
floatHistogramsInWAL += 4
b = enc.Exemplars([]record.RefExemplar{

View file

@ -209,19 +209,21 @@ func TestTailSamples(t *testing.T) {
NegativeBuckets: []int64{int64(-i) - 1},
}
histogram := enc.HistogramSamples([]record.RefHistogramSample{{
histogram, customBucketHistogram := enc.HistogramSamples([]record.RefHistogramSample{{
Ref: chunks.HeadSeriesRef(inner),
T: now.UnixNano() + 1,
H: hist,
}}, nil)
require.NoError(t, w.Log(histogram))
require.NoError(t, w.Log(customBucketHistogram))
floatHistogram := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{
floatHistogram, floatCustomBucketHistogram := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{
Ref: chunks.HeadSeriesRef(inner),
T: now.UnixNano() + 1,
FH: hist.ToFloat(nil),
}}, nil)
require.NoError(t, w.Log(floatHistogram))
require.NoError(t, w.Log(floatCustomBucketHistogram))
}
}