From 00ec720c29bce637c1580a2ce43b32122d133f83 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 26 Jul 2022 15:42:00 +0100 Subject: [PATCH] tsdb: extract functions to encode and decode labels (#11045) * tsdb/record: Extract functions to encode and decode labels Signed-off-by: Bryan Boreham * tsdb: make use of Encode/Decode Labels Simplify the code by re-using routines from tsdb/record. Signed-off-by: Bryan Boreham --- tsdb/head_wal.go | 18 ++++----------- tsdb/record/record.go | 53 +++++++++++++++++++++---------------------- tsdb/wal.go | 19 +++------------- 3 files changed, 34 insertions(+), 56 deletions(-) diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 32e2b93fb..f6990d8bf 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -498,11 +498,7 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte { buf.PutByte(chunkSnapshotRecordTypeSeries) buf.PutBE64(uint64(s.ref)) - buf.PutUvarint(len(s.lset)) - for _, l := range s.lset { - buf.PutUvarintStr(l.Name) - buf.PutUvarintStr(l.Value) - } + record.EncodeLabels(&buf, s.lset) buf.PutBE64int64(s.chunkRange) s.Lock() @@ -525,7 +521,7 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte { return buf.Get() } -func decodeSeriesFromChunkSnapshot(b []byte) (csr chunkSnapshotRecord, err error) { +func decodeSeriesFromChunkSnapshot(d *record.Decoder, b []byte) (csr chunkSnapshotRecord, err error) { dec := encoding.Decbuf{B: b} if flag := dec.Byte(); flag != chunkSnapshotRecordTypeSeries { @@ -533,13 +529,9 @@ func decodeSeriesFromChunkSnapshot(b []byte) (csr chunkSnapshotRecord, err error } csr.ref = chunks.HeadSeriesRef(dec.Be64()) - // The label set written to the disk is already sorted. - csr.lset = make(labels.Labels, dec.Uvarint()) - for i := range csr.lset { - csr.lset[i].Name = dec.UvarintStr() - csr.lset[i].Value = dec.UvarintStr() - } + // TODO: figure out why DecodeLabels calls Sort(), and perhaps remove it. + csr.lset = d.DecodeLabels(&dec) csr.chunkRange = dec.Be64int64() if dec.Uvarint() == 0 { @@ -971,7 +963,7 @@ Outer: switch rec[0] { case chunkSnapshotRecordTypeSeries: numSeries++ - csr, err := decodeSeriesFromChunkSnapshot(rec) + csr, err := decodeSeriesFromChunkSnapshot(&dec, rec) if err != nil { loopErr = errors.Wrap(err, "decode series record") break Outer diff --git a/tsdb/record/record.go b/tsdb/record/record.go index 9e8de9169..ee7169a45 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -183,14 +183,7 @@ func (d *Decoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) { } for len(dec.B) > 0 && dec.Err() == nil { ref := storage.SeriesRef(dec.Be64()) - - lset := make(labels.Labels, dec.Uvarint()) - - for i := range lset { - lset[i].Name = dec.UvarintStr() - lset[i].Value = dec.UvarintStr() - } - sort.Sort(lset) + lset := d.DecodeLabels(&dec) series = append(series, RefSeries{ Ref: chunks.HeadSeriesRef(ref), @@ -249,6 +242,18 @@ func (d *Decoder) Metadata(rec []byte, metadata []RefMetadata) ([]RefMetadata, e return metadata, nil } +// DecodeLabels decodes one set of labels from buf. +func (d *Decoder) DecodeLabels(dec *encoding.Decbuf) labels.Labels { + lset := make(labels.Labels, dec.Uvarint()) + + for i := range lset { + lset[i].Name = dec.UvarintStr() + lset[i].Value = dec.UvarintStr() + } + sort.Sort(lset) + return lset +} + // Samples appends samples in rec to the given slice. func (d *Decoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) { dec := encoding.Decbuf{B: rec} @@ -330,13 +335,7 @@ func (d *Decoder) ExemplarsFromBuffer(dec *encoding.Decbuf, exemplars []RefExemp dref := dec.Varint64() dtime := dec.Varint64() val := dec.Be64() - - lset := make(labels.Labels, dec.Uvarint()) - for i := range lset { - lset[i].Name = dec.UvarintStr() - lset[i].Value = dec.UvarintStr() - } - sort.Sort(lset) + lset := d.DecodeLabels(dec) exemplars = append(exemplars, RefExemplar{ Ref: chunks.HeadSeriesRef(baseRef + uint64(dref)), @@ -366,12 +365,7 @@ func (e *Encoder) Series(series []RefSeries, b []byte) []byte { for _, s := range series { buf.PutBE64(uint64(s.Ref)) - buf.PutUvarint(len(s.Labels)) - - for _, l := range s.Labels { - buf.PutUvarintStr(l.Name) - buf.PutUvarintStr(l.Value) - } + EncodeLabels(&buf, s.Labels) } return buf.Get() } @@ -396,6 +390,16 @@ func (e *Encoder) Metadata(metadata []RefMetadata, b []byte) []byte { return buf.Get() } +// EncodeLabels encodes the contents of labels into buf. +func EncodeLabels(buf *encoding.Encbuf, lbls labels.Labels) { + buf.PutUvarint(len(lbls)) + + for _, l := range lbls { + buf.PutUvarintStr(l.Name) + buf.PutUvarintStr(l.Value) + } +} + // Samples appends the encoded samples to b and returns the resulting slice. func (e *Encoder) Samples(samples []RefSample, b []byte) []byte { buf := encoding.Encbuf{B: b} @@ -460,11 +464,6 @@ func (e *Encoder) EncodeExemplarsIntoBuffer(exemplars []RefExemplar, buf *encodi buf.PutVarint64(int64(ex.Ref) - int64(first.Ref)) buf.PutVarint64(ex.T - first.T) buf.PutBE64(math.Float64bits(ex.V)) - - buf.PutUvarint(len(ex.Labels)) - for _, l := range ex.Labels { - buf.PutUvarintStr(l.Name) - buf.PutUvarintStr(l.Value) - } + EncodeLabels(buf, ex.Labels) } } diff --git a/tsdb/wal.go b/tsdb/wal.go index 2b9f117aa..615903c63 100644 --- a/tsdb/wal.go +++ b/tsdb/wal.go @@ -23,7 +23,6 @@ import ( "math" "os" "path/filepath" - "sort" "sync" "time" @@ -32,7 +31,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/encoding" @@ -790,12 +788,7 @@ const ( func (w *SegmentWAL) encodeSeries(buf *encoding.Encbuf, series []record.RefSeries) uint8 { for _, s := range series { buf.PutBE64(uint64(s.Ref)) - buf.PutUvarint(len(s.Labels)) - - for _, l := range s.Labels { - buf.PutUvarintStr(l.Name) - buf.PutUvarintStr(l.Value) - } + record.EncodeLabels(buf, s.Labels) } return walSeriesSimple } @@ -840,6 +833,7 @@ type walReader struct { cur int buf []byte crc32 hash.Hash32 + dec record.Decoder curType WALEntryType curFlag byte @@ -1123,14 +1117,7 @@ func (r *walReader) decodeSeries(flag byte, b []byte, res *[]record.RefSeries) e for len(dec.B) > 0 && dec.Err() == nil { ref := chunks.HeadSeriesRef(dec.Be64()) - - lset := make(labels.Labels, dec.Uvarint()) - - for i := range lset { - lset[i].Name = dec.UvarintStr() - lset[i].Value = dec.UvarintStr() - } - sort.Sort(lset) + lset := r.dec.DecodeLabels(&dec) *res = append(*res, record.RefSeries{ Ref: ref,