mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
* Append metadata to the WAL Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Remove extra whitespace; Reword some docstrings and comments Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Use RLock() for hasNewMetadata check Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Use single byte for metric type in RefMetadata Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Update proposed WAL format for single-byte type metadata Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Implement MetadataAppender interface for the Agent Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Address first round of review comments Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Amend description of metadata in wal.md Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Correct key used to retrieve metadata from cache When we're setting metadata entries in the scrapeCache, we're using the p.Help(), p.Unit(), p.Type() helpers, which retrieve the series name and use it as the cache key. When checking for cache entries though, we used p.Series() as the key, which included the metric name _with_ its labels. That meant that we were never actually hitting the cache. We're fixing this by utilizing the __name__ internal label for correctly getting the cache entries after they've been set by setHelp(), setType() or setUnit(). 
Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Put feature behind a feature flag Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Fix AppendMetadata docstring Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Reorder WAL format document Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Change error message of AppendMetadata; Fix access of s.meta in AppendMetadata Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Reuse temporary buffer in Metadata encoder Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Only keep latest metadata for each refID during checkpointing Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Fix test that's referencing decoding metadata Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Avoid creating metadata block if no new metadata are present Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Add tests for corrupt metadata block and relevant record type Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Fix CR comments Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Extract logic about changing metadata in an anonymous function Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Implement new proposed WAL format and amend relevant tests Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Use 'const' for metadata field names Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Apply metadata to head memSeries in Commit, not in AppendMetadata Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Add docstring and rename extracted helper in scrape.go Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Add tests for tsdb-related cases Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Fix linter issues vol1 Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Fix linter issues vol2 Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Fix Windows test by closing WAL reader files Signed-off-by: 
Paschalis Tsilias <paschalist0@gmail.com> * Use switch instead of two if statements in metadata decoding Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Fix review comments around TestMetadata* tests Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Add code for replaying WAL; test correctness of in-memory data after a replay Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Remove scrape-loop related code from PR Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Address first round of comments Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Simplify tests by sorting slices before comparison Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Fix test to use separate transactions Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Empty out buffer and record slices after encoding latest metadata Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Fix linting issue Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Update calculation for DroppedMetadata metric Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Rename MetadataAppender interface and AppendMetadata method to MetadataUpdater/UpdateMetadata Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Reuse buffer when encoding latest metadata for each series Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Fix review comments; Check all returned error values using two helpers Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Simplify use of helpers Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Satisfy linter Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>
471 lines
12 KiB
Go
471 lines
12 KiB
Go
// Copyright 2018 The Prometheus Authors
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
// Package record contains the various record types used for encoding various Head block data in the WAL and in-memory snapshot.
|
|
package record
|
|
|
|
import (
|
|
"math"
|
|
"sort"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/model/textparse"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
|
"github.com/prometheus/prometheus/tsdb/encoding"
|
|
"github.com/prometheus/prometheus/tsdb/tombstones"
|
|
)
|
|
|
|
// Type represents the data type of a record.
// The Decoder reads it from the first byte of an encoded record.
type Type uint8

// Record types, listed in order of their encoded value.
// The value 5 is not assigned to any constant in this list.
const (
	// Series is used to match WAL records of type Series.
	Series Type = 1
	// Samples is used to match WAL records of type Samples.
	Samples Type = 2
	// Tombstones is used to match WAL records of type Tombstones.
	Tombstones Type = 3
	// Exemplars is used to match WAL records of type Exemplars.
	Exemplars Type = 4
	// Metadata is used to match WAL records of type Metadata.
	Metadata Type = 6
	// Unknown is returned for unrecognised WAL record types.
	Unknown Type = 255
)
|
|
|
|
func (rt Type) String() string {
|
|
switch rt {
|
|
case Series:
|
|
return "series"
|
|
case Samples:
|
|
return "samples"
|
|
case Exemplars:
|
|
return "exemplars"
|
|
case Tombstones:
|
|
return "tombstones"
|
|
case Metadata:
|
|
return "metadata"
|
|
default:
|
|
return "unknown"
|
|
}
|
|
}
|
|
|
|
// MetricType represents the type of a series.
type MetricType uint8

// Metric type codes. GetMetricType and ToTextparseMetricType translate
// between these codes and textparse.MetricType values.
const (
	// UnknownMT is the code used for any metric type without a dedicated code.
	UnknownMT MetricType = 0
	// Counter is the code for counter metrics.
	Counter MetricType = 1
	// Gauge is the code for gauge metrics.
	Gauge MetricType = 2
	// Histogram is the code for histogram metrics.
	Histogram MetricType = 3
	// GaugeHistogram is the code for gauge histogram metrics.
	GaugeHistogram MetricType = 4
	// Summary is the code for summary metrics.
	Summary MetricType = 5
	// Info is the code for info metrics.
	Info MetricType = 6
	// Stateset is the code for stateset metrics.
	Stateset MetricType = 7
)
|
|
|
|
func GetMetricType(t textparse.MetricType) uint8 {
|
|
switch t {
|
|
case textparse.MetricTypeCounter:
|
|
return uint8(Counter)
|
|
case textparse.MetricTypeGauge:
|
|
return uint8(Gauge)
|
|
case textparse.MetricTypeHistogram:
|
|
return uint8(Histogram)
|
|
case textparse.MetricTypeGaugeHistogram:
|
|
return uint8(GaugeHistogram)
|
|
case textparse.MetricTypeSummary:
|
|
return uint8(Summary)
|
|
case textparse.MetricTypeInfo:
|
|
return uint8(Info)
|
|
case textparse.MetricTypeStateset:
|
|
return uint8(Stateset)
|
|
default:
|
|
return uint8(UnknownMT)
|
|
}
|
|
}
|
|
|
|
func ToTextparseMetricType(m uint8) textparse.MetricType {
|
|
switch m {
|
|
case uint8(Counter):
|
|
return textparse.MetricTypeCounter
|
|
case uint8(Gauge):
|
|
return textparse.MetricTypeGauge
|
|
case uint8(Histogram):
|
|
return textparse.MetricTypeHistogram
|
|
case uint8(GaugeHistogram):
|
|
return textparse.MetricTypeGaugeHistogram
|
|
case uint8(Summary):
|
|
return textparse.MetricTypeSummary
|
|
case uint8(Info):
|
|
return textparse.MetricTypeInfo
|
|
case uint8(Stateset):
|
|
return textparse.MetricTypeStateset
|
|
default:
|
|
return textparse.MetricTypeUnknown
|
|
}
|
|
}
|
|
|
|
// Names of the metadata fields written next to the metric type in a
// Metadata record.
const (
	// unitMetaName is the field name under which a series' unit is stored.
	unitMetaName = "UNIT"
	// helpMetaName is the field name under which a series' help text is stored.
	helpMetaName = "HELP"
)
|
|
|
|
// ErrNotFound is returned if a looked up resource was not found.
// It duplicates the ErrNotFound declared in head.go.
var ErrNotFound = errors.New("not found")
|
|
|
|
// RefSeries is the series labels with the series ID.
|
|
type RefSeries struct {
|
|
Ref chunks.HeadSeriesRef
|
|
Labels labels.Labels
|
|
}
|
|
|
|
// RefSample is a timestamp/value pair associated with a reference to a series.
|
|
type RefSample struct {
|
|
Ref chunks.HeadSeriesRef
|
|
T int64
|
|
V float64
|
|
}
|
|
|
|
// RefMetadata is the metadata associated with a series ID.
|
|
type RefMetadata struct {
|
|
Ref chunks.HeadSeriesRef
|
|
Type uint8
|
|
Unit string
|
|
Help string
|
|
}
|
|
|
|
// RefExemplar is an exemplar with it's labels, timestamp, value the exemplar was collected/observed with, and a reference to a series.
|
|
type RefExemplar struct {
|
|
Ref chunks.HeadSeriesRef
|
|
T int64
|
|
V float64
|
|
Labels labels.Labels
|
|
}
|
|
|
|
// Decoder decodes series, sample, metadata, tombstone and exemplar records.
// It holds no state, so the zero value is ready to use.
type Decoder struct{}
|
|
|
|
// Type returns the type of the record.
|
|
// Returns RecordUnknown if no valid record type is found.
|
|
func (d *Decoder) Type(rec []byte) Type {
|
|
if len(rec) < 1 {
|
|
return Unknown
|
|
}
|
|
switch t := Type(rec[0]); t {
|
|
case Series, Samples, Tombstones, Exemplars, Metadata:
|
|
return t
|
|
}
|
|
return Unknown
|
|
}
|
|
|
|
// Series appends series in rec to the given slice.
|
|
func (d *Decoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) {
|
|
dec := encoding.Decbuf{B: rec}
|
|
|
|
if Type(dec.Byte()) != Series {
|
|
return nil, errors.New("invalid record type")
|
|
}
|
|
for len(dec.B) > 0 && dec.Err() == nil {
|
|
ref := storage.SeriesRef(dec.Be64())
|
|
|
|
lset := make(labels.Labels, dec.Uvarint())
|
|
|
|
for i := range lset {
|
|
lset[i].Name = dec.UvarintStr()
|
|
lset[i].Value = dec.UvarintStr()
|
|
}
|
|
sort.Sort(lset)
|
|
|
|
series = append(series, RefSeries{
|
|
Ref: chunks.HeadSeriesRef(ref),
|
|
Labels: lset,
|
|
})
|
|
}
|
|
if dec.Err() != nil {
|
|
return nil, dec.Err()
|
|
}
|
|
if len(dec.B) > 0 {
|
|
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
|
|
}
|
|
return series, nil
|
|
}
|
|
|
|
// Metadata appends metadata in rec to the given slice.
|
|
func (d *Decoder) Metadata(rec []byte, metadata []RefMetadata) ([]RefMetadata, error) {
|
|
dec := encoding.Decbuf{B: rec}
|
|
|
|
if Type(dec.Byte()) != Metadata {
|
|
return nil, errors.New("invalid record type")
|
|
}
|
|
for len(dec.B) > 0 && dec.Err() == nil {
|
|
ref := dec.Uvarint64()
|
|
typ := dec.Byte()
|
|
numFields := dec.Uvarint()
|
|
|
|
// We're currently aware of two more metadata fields other than TYPE; that is UNIT and HELP.
|
|
// We can skip the rest of the fields (if we encounter any), but we must decode them anyway
|
|
// so we can correctly align with the start with the next metadata record.
|
|
var unit, help string
|
|
for i := 0; i < numFields; i++ {
|
|
fieldName := dec.UvarintStr()
|
|
fieldValue := dec.UvarintStr()
|
|
switch fieldName {
|
|
case unitMetaName:
|
|
unit = fieldValue
|
|
case helpMetaName:
|
|
help = fieldValue
|
|
}
|
|
}
|
|
|
|
metadata = append(metadata, RefMetadata{
|
|
Ref: chunks.HeadSeriesRef(ref),
|
|
Type: typ,
|
|
Unit: unit,
|
|
Help: help,
|
|
})
|
|
}
|
|
if dec.Err() != nil {
|
|
return nil, dec.Err()
|
|
}
|
|
if len(dec.B) > 0 {
|
|
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
|
|
}
|
|
return metadata, nil
|
|
}
|
|
|
|
// Samples appends samples in rec to the given slice.
|
|
func (d *Decoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) {
|
|
dec := encoding.Decbuf{B: rec}
|
|
|
|
if Type(dec.Byte()) != Samples {
|
|
return nil, errors.New("invalid record type")
|
|
}
|
|
if dec.Len() == 0 {
|
|
return samples, nil
|
|
}
|
|
var (
|
|
baseRef = dec.Be64()
|
|
baseTime = dec.Be64int64()
|
|
)
|
|
for len(dec.B) > 0 && dec.Err() == nil {
|
|
dref := dec.Varint64()
|
|
dtime := dec.Varint64()
|
|
val := dec.Be64()
|
|
|
|
samples = append(samples, RefSample{
|
|
Ref: chunks.HeadSeriesRef(int64(baseRef) + dref),
|
|
T: baseTime + dtime,
|
|
V: math.Float64frombits(val),
|
|
})
|
|
}
|
|
|
|
if dec.Err() != nil {
|
|
return nil, errors.Wrapf(dec.Err(), "decode error after %d samples", len(samples))
|
|
}
|
|
if len(dec.B) > 0 {
|
|
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
|
|
}
|
|
return samples, nil
|
|
}
|
|
|
|
// Tombstones appends tombstones in rec to the given slice.
|
|
func (d *Decoder) Tombstones(rec []byte, tstones []tombstones.Stone) ([]tombstones.Stone, error) {
|
|
dec := encoding.Decbuf{B: rec}
|
|
|
|
if Type(dec.Byte()) != Tombstones {
|
|
return nil, errors.New("invalid record type")
|
|
}
|
|
for dec.Len() > 0 && dec.Err() == nil {
|
|
tstones = append(tstones, tombstones.Stone{
|
|
Ref: storage.SeriesRef(dec.Be64()),
|
|
Intervals: tombstones.Intervals{
|
|
{Mint: dec.Varint64(), Maxt: dec.Varint64()},
|
|
},
|
|
})
|
|
}
|
|
if dec.Err() != nil {
|
|
return nil, dec.Err()
|
|
}
|
|
if len(dec.B) > 0 {
|
|
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
|
|
}
|
|
return tstones, nil
|
|
}
|
|
|
|
func (d *Decoder) Exemplars(rec []byte, exemplars []RefExemplar) ([]RefExemplar, error) {
|
|
dec := encoding.Decbuf{B: rec}
|
|
t := Type(dec.Byte())
|
|
if t != Exemplars {
|
|
return nil, errors.New("invalid record type")
|
|
}
|
|
|
|
return d.ExemplarsFromBuffer(&dec, exemplars)
|
|
}
|
|
|
|
func (d *Decoder) ExemplarsFromBuffer(dec *encoding.Decbuf, exemplars []RefExemplar) ([]RefExemplar, error) {
|
|
if dec.Len() == 0 {
|
|
return exemplars, nil
|
|
}
|
|
var (
|
|
baseRef = dec.Be64()
|
|
baseTime = dec.Be64int64()
|
|
)
|
|
for len(dec.B) > 0 && dec.Err() == nil {
|
|
dref := dec.Varint64()
|
|
dtime := dec.Varint64()
|
|
val := dec.Be64()
|
|
|
|
lset := make(labels.Labels, dec.Uvarint())
|
|
for i := range lset {
|
|
lset[i].Name = dec.UvarintStr()
|
|
lset[i].Value = dec.UvarintStr()
|
|
}
|
|
sort.Sort(lset)
|
|
|
|
exemplars = append(exemplars, RefExemplar{
|
|
Ref: chunks.HeadSeriesRef(baseRef + uint64(dref)),
|
|
T: baseTime + dtime,
|
|
V: math.Float64frombits(val),
|
|
Labels: lset,
|
|
})
|
|
}
|
|
|
|
if dec.Err() != nil {
|
|
return nil, errors.Wrapf(dec.Err(), "decode error after %d exemplars", len(exemplars))
|
|
}
|
|
if len(dec.B) > 0 {
|
|
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
|
|
}
|
|
return exemplars, nil
|
|
}
|
|
|
|
// Encoder encodes series, sample, metadata, tombstone and exemplar records.
// It holds no state, so the zero value is ready to use.
type Encoder struct{}
|
|
|
|
// Series appends the encoded series to b and returns the resulting slice.
|
|
func (e *Encoder) Series(series []RefSeries, b []byte) []byte {
|
|
buf := encoding.Encbuf{B: b}
|
|
buf.PutByte(byte(Series))
|
|
|
|
for _, s := range series {
|
|
buf.PutBE64(uint64(s.Ref))
|
|
buf.PutUvarint(len(s.Labels))
|
|
|
|
for _, l := range s.Labels {
|
|
buf.PutUvarintStr(l.Name)
|
|
buf.PutUvarintStr(l.Value)
|
|
}
|
|
}
|
|
return buf.Get()
|
|
}
|
|
|
|
// Metadata appends the encoded metadata to b and returns the resulting slice.
|
|
func (e *Encoder) Metadata(metadata []RefMetadata, b []byte) []byte {
|
|
buf := encoding.Encbuf{B: b}
|
|
buf.PutByte(byte(Metadata))
|
|
|
|
for _, m := range metadata {
|
|
buf.PutUvarint64(uint64(m.Ref))
|
|
|
|
buf.PutByte(m.Type)
|
|
|
|
buf.PutUvarint(2) // num_fields: We currently have two more metadata fields, UNIT and HELP.
|
|
buf.PutUvarintStr(unitMetaName)
|
|
buf.PutUvarintStr(m.Unit)
|
|
buf.PutUvarintStr(helpMetaName)
|
|
buf.PutUvarintStr(m.Help)
|
|
}
|
|
|
|
return buf.Get()
|
|
}
|
|
|
|
// Samples appends the encoded samples to b and returns the resulting slice.
|
|
func (e *Encoder) Samples(samples []RefSample, b []byte) []byte {
|
|
buf := encoding.Encbuf{B: b}
|
|
buf.PutByte(byte(Samples))
|
|
|
|
if len(samples) == 0 {
|
|
return buf.Get()
|
|
}
|
|
|
|
// Store base timestamp and base reference number of first sample.
|
|
// All samples encode their timestamp and ref as delta to those.
|
|
first := samples[0]
|
|
|
|
buf.PutBE64(uint64(first.Ref))
|
|
buf.PutBE64int64(first.T)
|
|
|
|
for _, s := range samples {
|
|
buf.PutVarint64(int64(s.Ref) - int64(first.Ref))
|
|
buf.PutVarint64(s.T - first.T)
|
|
buf.PutBE64(math.Float64bits(s.V))
|
|
}
|
|
return buf.Get()
|
|
}
|
|
|
|
// Tombstones appends the encoded tombstones to b and returns the resulting slice.
|
|
func (e *Encoder) Tombstones(tstones []tombstones.Stone, b []byte) []byte {
|
|
buf := encoding.Encbuf{B: b}
|
|
buf.PutByte(byte(Tombstones))
|
|
|
|
for _, s := range tstones {
|
|
for _, iv := range s.Intervals {
|
|
buf.PutBE64(uint64(s.Ref))
|
|
buf.PutVarint64(iv.Mint)
|
|
buf.PutVarint64(iv.Maxt)
|
|
}
|
|
}
|
|
return buf.Get()
|
|
}
|
|
|
|
func (e *Encoder) Exemplars(exemplars []RefExemplar, b []byte) []byte {
|
|
buf := encoding.Encbuf{B: b}
|
|
buf.PutByte(byte(Exemplars))
|
|
|
|
if len(exemplars) == 0 {
|
|
return buf.Get()
|
|
}
|
|
|
|
e.EncodeExemplarsIntoBuffer(exemplars, &buf)
|
|
|
|
return buf.Get()
|
|
}
|
|
|
|
func (e *Encoder) EncodeExemplarsIntoBuffer(exemplars []RefExemplar, buf *encoding.Encbuf) {
|
|
// Store base timestamp and base reference number of first sample.
|
|
// All samples encode their timestamp and ref as delta to those.
|
|
first := exemplars[0]
|
|
|
|
buf.PutBE64(uint64(first.Ref))
|
|
buf.PutBE64int64(first.T)
|
|
|
|
for _, ex := range exemplars {
|
|
buf.PutVarint64(int64(ex.Ref) - int64(first.Ref))
|
|
buf.PutVarint64(ex.T - first.T)
|
|
buf.PutBE64(math.Float64bits(ex.V))
|
|
|
|
buf.PutUvarint(len(ex.Labels))
|
|
for _, l := range ex.Labels {
|
|
buf.PutUvarintStr(l.Name)
|
|
buf.PutUvarintStr(l.Value)
|
|
}
|
|
}
|
|
}
|