Rework with state functions and curr+next
Some checks failed
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (0) (push) Has been cancelled
CI / Build Prometheus for common architectures (1) (push) Has been cancelled
CI / Build Prometheus for common architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (0) (push) Has been cancelled
CI / Build Prometheus for all architectures (1) (push) Has been cancelled
CI / Build Prometheus for all architectures (10) (push) Has been cancelled
CI / Build Prometheus for all architectures (11) (push) Has been cancelled
CI / Build Prometheus for all architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (3) (push) Has been cancelled
CI / Build Prometheus for all architectures (4) (push) Has been cancelled
CI / Build Prometheus for all architectures (5) (push) Has been cancelled
CI / Build Prometheus for all architectures (6) (push) Has been cancelled
CI / Build Prometheus for all architectures (7) (push) Has been cancelled
CI / Build Prometheus for all architectures (8) (push) Has been cancelled
CI / Build Prometheus for all architectures (9) (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled

The idea is to load metrics into curr, but if we see a change in metric
name or labels we load them into next , return curr as detected and
continue from next.

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
György Krajcsovits 2024-10-07 08:02:54 +02:00
parent 61b8ed3f45
commit b3edeaad7d

View file

@ -23,12 +23,12 @@ import (
"io" "io"
// "math" // "math"
// "strconv" "strconv"
// "strings" "strings"
// "unicode/utf8" // "unicode/utf8"
"unsafe" "unsafe"
"github.com/prometheus/common/model" //"github.com/prometheus/common/model"
// "github.com/prometheus/prometheus/model/exemplar" // "github.com/prometheus/prometheus/model/exemplar"
// "github.com/prometheus/prometheus/model/histogram" // "github.com/prometheus/prometheus/model/histogram"
@ -146,41 +146,62 @@ func (l *promlexer) Error(es string) {
l.err = errors.New(es) l.err = errors.New(es)
} }
// A stateFn is a function that represents a state in a state machine. By
// executing it, the state is progressed to the next state. The stateFn returns
// another stateFn, which represents the new state. The returned bool indicates
// that a metric is found and the Parser.Next() can return. The error is set if
// an error occurred.
type stateFn func() (stateFn, bool, error)
type exposedValue struct {
// Meta.
nameStart int
nameEnd int
detectedType detectedType
helpStart int
helpEnd int
// Values.
hasTimestamp bool
timestamp int64
hasSumValue bool
sumValue float64
hasCountValue bool
countValue float64
//hasInfValue bool
}
// PromParser parses samples from a byte slice of samples in the official // PromParser parses samples from a byte slice of samples in the official
// Prometheus text exposition format. // Prometheus text exposition format.
type PromParser struct { type PromParser struct {
l *promlexer l *promlexer
builder labels.ScratchBuilder // builder labels.ScratchBuilder
series []byte // series []byte
text []byte // text []byte
mtype model.MetricType // mtype model.MetricType
val float64 // val float64
ts int64 // ts int64
hasTS bool // hasTS bool
start int // start int
// offsets is a list of offsets into series that describe the positions // offsets is a list of offsets into series that describe the positions
// of the metric name and label names and values for this series. // of the metric name and label names and values for this series.
// p.offsets[0] is the start character of the metric name. // p.offsets[0] is the start character of the metric name.
// p.offsets[1] is the end of the metric name. // p.offsets[1] is the end of the metric name.
// Subsequently, p.offsets is a pair of pair of offsets for the positions // Subsequently, p.offsets is a pair of pair of offsets for the positions
// of the label name and value start and end characters. // of the label name and value start and end characters.
offsets []int //offsets []int
state stateFn
err error // Use these two struct to store the detected metric values.
state int // One is always the current metric, the other will be used to store the next metric.
detectedNameStart int ev1 exposedValue
detectedNameEnd int ev2 exposedValue
detectedType detectedType curr *exposedValue
detectedHelpStart int next *exposedValue
detectedHelpEnd int
// Values for detected metrics.
hasSumValue bool
sumValue float64
// Cached interface objects. // Cached interface objects.
exposedCounterMetric FloatCounterMetric exposedCounterMetric promFloat
} }
const ( const (
@ -199,18 +220,19 @@ const (
) )
type promBase struct { type promBase struct {
p *PromParser buffer []byte
ev *exposedValue
} }
func (p promBase) Name() string { func (p promBase) Name() string {
return yoloString(p.p.l.b[p.p.detectedNameStart:p.p.detectedNameEnd]) return yoloString(p.buffer[p.ev.nameStart:p.ev.nameEnd])
} }
func (p promBase) Help() (string, bool) { func (p promBase) Help() (string, bool) {
if p.p.detectedHelpStart >= p.p.detectedHelpEnd { if p.ev.helpStart >= p.ev.helpEnd {
return "", false return "", false
} }
return yoloString(p.p.l.b[p.p.detectedHelpStart:p.p.detectedHelpEnd]), true return yoloString(p.buffer[p.ev.helpStart:p.ev.helpEnd]), true
} }
type promFloat struct { type promFloat struct {
@ -218,174 +240,241 @@ type promFloat struct {
} }
func (p promFloat) Value() float64 { func (p promFloat) Value() float64 {
return p.p.sumValue return p.ev.sumValue
} }
// NewPromParser returns a new parser of the byte slice. // NewPromParser returns a new parser of the byte slice.
func NewPromParser(b []byte, st *labels.SymbolTable) Parser { func NewPromParser(b []byte, st *labels.SymbolTable) Parser {
p := &PromParser{ p := &PromParser{
l: &promlexer{b: append(b, '\n')}, l: &promlexer{b: append(b, '\n')},
builder: labels.NewScratchBuilderWithSymbolTable(st, 16), //builder: labels.NewScratchBuilderWithSymbolTable(st, 16),
} }
p.exposedCounterMetric = promFloat{promBase{p: p}} p.state = p.startState
p.curr = &p.ev1
p.next = &p.ev2
p.exposedCounterMetric = promFloat{promBase{buffer: p.l.b}}
return p return p
} }
func (p *PromParser) Next(d DropperCache, keepClassicHistogramSeries bool) (interface{}, error) { func (p *PromParser) Next(d DropperCache, keepClassicHistogramSeries bool) (interface{}, error) {
for { for {
p.state = p.evalState() var err error
switch p.state { var found bool
case stateError: p.state, found, err = p.state()
return nil, p.err if err != nil {
case stateFoundMetric: return nil, err
switch p.detectedType { }
case detectedGauge: if found {
return p.exposedCounterMetric, nil p.exposedCounterMetric.ev = p.curr
} return p.exposedCounterMetric, nil
} }
} }
} }
func (p *PromParser) evalState() int { func (p* PromParser) startState() (stateFn, bool, error) {
var t token // Get the next token.
t := p.nextToken()
// Shorthand to the lexer. // Shorthand to the lexer.
l := p.l l := p.l
switch p.state { // Shorthand for the current and next metric.
case stateFoundMetric: curr := p.curr
// Reset state to start. next := p.next
p.detectedNameStart = p.detectedNameEnd
p.detectedType = detectedUntyped switch t {
p.detectedHelpStart = p.detectedHelpEnd case tInvalid:
p.hasSumValue = false return nil, false, fmt.Errorf("invalid token")
return stateStart case tEOF:
case stateStart: return p.eofState, p.isValid(), nil
case tLinebreak:
// Allow full blank lines.
return p.startState, false, nil
case tWhitespace:
// Skip whitespace.
return p.startState, false, nil
case tHelp:
// Try to store the help text.
t = p.nextToken() t = p.nextToken()
switch t { if t != tMName {
case tInvalid: // Next token wasn't a metric name as expected. Skip.
p.err = fmt.Errorf("invalid token") t = l.consumeComment()
return stateError
case tEOF:
return p.setEOFState()
case tLinebreak:
// Allow full blank lines.
return stateStart
case tWhitespace:
// Skip whitespace.
return stateStart
case tHelp:
// Try to store the help text.
t = p.nextToken()
if t != tMName {
// Next token wasn't a metric name as expected. Skip.
t = l.consumeComment()
if t == tEOF {
return p.setEOFState()
}
return stateStart
}
nameStart := p.l.start
nameEnd := p.l.i
t = p.nextToken()
if t == tEOF { if t == tEOF {
return p.setEOFState() return p.eofState, p.isValid(), nil
} }
if t != tText { return p.startState, false, nil
// We are supposed to have text here.
p.err = fmt.Errorf("expected text")
return stateError
}
// Check if we have a metric name already.
if p.detectedNameStart != p.detectedNameEnd {
if !bytes.Equal(l.b[nameStart:nameEnd], l.buf()) {
// The metric name in the help text does not match the metric name for the type. We prioritize the type. Skip.
t = l.consumeComment()
if t == tEOF {
return p.setEOFState()
}
return stateStart
}
} else {
// Store the metric name for later.
p.detectedNameStart = nameStart
p.detectedNameEnd = nameEnd
}
p.detectedHelpStart = p.l.start+1
p.detectedHelpEnd = p.l.i
return stateStart
case tType:
// Try to store the type.
t = p.nextToken()
if t != tMName {
// Next token wasn't a metric name as expected. Skip.
t = l.consumeComment()
if t == tEOF {
return p.setEOFState()
}
return stateStart
}
nameStart := p.l.start
nameEnd := p.l.i
// Get the type.
t = p.nextToken()
if t != tText {
// We are supposed to have text here.
p.err = fmt.Errorf("expected text")
return stateError
}
switch s := string(l.buf()); s {
case "counter":
p.detectedType = detectedCounter
case "gauge":
p.detectedType = detectedGauge
case "histogram":
p.detectedType = detectedHistogram
case "summary":
p.detectedType = detectedSummary
case "untyped":
p.detectedType = detectedUntyped
default:
// We don't know this type. Skip.
return stateStart
}
if p.detectedNameStart != p.detectedNameEnd {
if !bytes.Equal(l.b[nameStart:nameEnd], l.buf()) {
// The metric name in the help text does not match the metric name for the type. We prioritize the type.
p.detectedNameStart = nameStart
p.detectedNameEnd = nameEnd
// Unset help.
p.detectedHelpStart = p.detectedHelpEnd
}
} else {
// Store the metric name for later.
p.detectedNameStart = nameStart
p.detectedNameEnd = nameEnd
}
return stateStart
case tMName:
// Check if we have the metric name already.
if p.detectedNameStart != p.detectedNameEnd {
if !bytes.Equal(l.b[p.detectedNameStart:p.detectedNameEnd], l.buf()) {
// Metric name does not match the stored metric name. Overwrite.
p.detectedNameStart = p.l.start
p.detectedNameEnd = p.l.i
// Unset help.
p.detectedHelpStart = p.detectedHelpEnd
}
} else {
// Store the metric name for later.
p.detectedNameStart = p.l.start
p.detectedNameEnd = p.l.i
}
return stateFoundMetric
} }
// Remember the position of the name for later.
nameStart := p.l.start
nameEnd := p.l.i
t = p.nextToken()
if t == tEOF {
return p.eofState, p.isValid(), nil
}
if t != tText {
// We are supposed to have text here.
return nil, false, fmt.Errorf("expected text")
}
// Check if we have a metric name already.
if curr.nameStart != curr.nameEnd {
if !bytes.Equal(l.b[nameStart:nameEnd], l.b[curr.nameStart:curr.nameEnd]) {
// The metric name in the help text does not match the metric name for the type.
next.nameStart = nameStart
next.nameEnd = nameEnd
return p.resetState, p.isValid(), nil
}
} else {
// Store the metric name for later.
curr.nameStart = nameStart
curr.nameEnd = nameEnd
}
curr.helpStart = p.l.start+1
curr.helpEnd = p.l.i
return p.startState, false, nil
case tType:
// Try to store the type.
t = p.nextToken()
if t != tMName {
// Next token wasn't a metric name as expected. Skip.
t = l.consumeComment()
if t == tEOF {
return p.eofState, p.isValid(), nil
}
return p.startState, false, nil
}
// Remember the position of the name for later.
nameStart := p.l.start
nameEnd := p.l.i
// Get the type.
t = p.nextToken()
if t != tText {
// We are supposed to have text here.
return nil, false, fmt.Errorf("expected text")
}
switch s := string(l.b[l.start+1:l.i]); s {
case "counter":
curr.detectedType = detectedCounter
case "gauge":
curr.detectedType = detectedGauge
case "histogram":
curr.detectedType = detectedHistogram
case "summary":
curr.detectedType = detectedSummary
case "untyped":
curr.detectedType = detectedUntyped
default:
// We don't know this type. Skip.
return p.startState, false, nil
}
// Check if we have a metric name already.
if curr.nameStart != curr.nameEnd {
if !bytes.Equal(l.b[nameStart:nameEnd], l.b[curr.nameStart:curr.nameEnd]) {
// The metric name in the help text does not match the metric name for the type.
next.nameStart = nameStart
next.nameEnd = nameEnd
return p.resetState, p.isValid(), nil
}
} else {
// Store the metric name for later.
curr.nameStart = nameStart
curr.nameEnd = nameEnd
}
return p.startState, false, nil
case tMName:
return p.seriesState()
} }
p.err = fmt.Errorf("unhandled state=%d, token=%s", p.state, t.String())
return stateError return nil, false, fmt.Errorf("unhandled state")
} }
func (p *PromParser) setEOFState() int { func (p* PromParser) isValid() bool {
p.err = io.EOF curr := p.curr
return stateError switch curr.detectedType {
case detectedGauge:
return curr.hasSumValue
default:
return false
}
}
func (p *PromParser) eofState() (stateFn, bool, error) {
return nil, false, io.EOF
}
func (p *PromParser) resetState() (stateFn, bool, error) {
// The current values as no longer used, reset them and switch out with next.
curr := p.curr
curr.hasSumValue = false
curr.hasCountValue = false
curr.detectedType = detectedUntyped
curr.nameStart = curr.nameEnd
curr.helpStart = curr.helpEnd
p.curr = p.next
p.next = curr
return p.startState, false, nil
}
// This is the most complicated bit. We have to prepare for the happy case where we continue
// a metric family and also for the case where we start a new metric family.
func (p *PromParser) seriesState() (stateFn, bool, error) {
var err error
// Remember the position of the name for later.
nameStart := p.l.start
nameEnd := p.l.i
// Shorthand to the lexer.
l := p.l
// Shorthand for the current and next metric.
curr := p.curr
next := p.next
switch curr.detectedType {
case detectedGauge:
// The name should match the current metric name.
if !bytes.Equal(l.b[nameStart:nameEnd], l.b[curr.nameStart:curr.nameEnd]) {
// The metric name in the help text does not match the metric name for the type.
next.nameStart = nameStart
next.nameEnd = nameEnd
return p.retrySeries, p.isValid(), nil
}
// We have a gauge, try to parse the value.
t := p.nextToken()
if t != tValue {
// We are supposed to have a value here.
return nil, false, fmt.Errorf("expected value")
}
// Parse the value.
curr.sumValue, err = parseFloat(yoloString(l.buf()))
if err != nil {
return nil, false, fmt.Errorf("%w while parsing: %q", err, l.buf())
}
curr.hasSumValue = true
// Check if we have a timestamp.
switch p.nextToken() {
case tTimestamp:
curr.timestamp, err = strconv.ParseInt(yoloString(p.l.buf()), 10, 64)
if err != nil {
return nil, false, fmt.Errorf("%w while parsing: %q", err, l.buf())
}
curr.hasTimestamp = true
fallthrough
case tLinebreak:
// Done with this line.
return p.resetState, curr.hasSumValue, nil
default:
// Consume the rest of the line.
if l.consumeComment() == tEOF {
return p.eofState, curr.hasSumValue, nil
}
return p.resetState, curr.hasSumValue, nil
}
default:
return nil, false, fmt.Errorf("unhandled metric type")
}
}
func (p *PromParser) retrySeries() (stateFn, bool, error) {
// Switch out the current and next metric, but start from the seriesState and not the startState.
_, _, _ = p.resetState()
return p.seriesState, false, nil
} }
// nextToken returns the next token from the promlexer. It skips over tabs // nextToken returns the next token from the promlexer. It skips over tabs
@ -721,6 +810,9 @@ func unreplace(s string) string {
*/
func parseFloat(s string) (float64, error) { func parseFloat(s string) (float64, error) {
// Keep to pre-Go 1.13 float formats. // Keep to pre-Go 1.13 float formats.
if strings.ContainsAny(s, "pP_") { if strings.ContainsAny(s, "pP_") {
@ -728,7 +820,6 @@ func parseFloat(s string) (float64, error) {
} }
return strconv.ParseFloat(s, 64) return strconv.ParseFloat(s, 64)
} }
*/
func yoloString(b []byte) string { func yoloString(b []byte) string {
return unsafe.String(unsafe.SliceData(b), len(b)) return unsafe.String(unsafe.SliceData(b), len(b))