diff --git a/model/textparse/promparse.go b/model/textparse/promparse.go index e95a63ed6d..c6ff4ec4f3 100644 --- a/model/textparse/promparse.go +++ b/model/textparse/promparse.go @@ -23,12 +23,12 @@ import ( "io" // "math" - // "strconv" - // "strings" + "strconv" + "strings" // "unicode/utf8" "unsafe" - "github.com/prometheus/common/model" + //"github.com/prometheus/common/model" // "github.com/prometheus/prometheus/model/exemplar" // "github.com/prometheus/prometheus/model/histogram" @@ -146,41 +146,62 @@ func (l *promlexer) Error(es string) { l.err = errors.New(es) } +// A stateFn is a function that represents a state in a state machine. By +// executing it, the state is progressed to the next state. The stateFn returns +// another stateFn, which represents the new state. The returned bool indicates +// that a metric is found and the Parser.Next() can return. The error is set if +// an error occurred. +type stateFn func() (stateFn, bool, error) + +type exposedValue struct { + // Meta. + nameStart int + nameEnd int + detectedType detectedType + helpStart int + helpEnd int + + // Values. + hasTimestamp bool + timestamp int64 + hasSumValue bool + sumValue float64 + hasCountValue bool + countValue float64 + //hasInfValue bool +} + // PromParser parses samples from a byte slice of samples in the official // Prometheus text exposition format. type PromParser struct { l *promlexer - builder labels.ScratchBuilder - series []byte - text []byte - mtype model.MetricType - val float64 - ts int64 - hasTS bool - start int + // builder labels.ScratchBuilder + // series []byte + // text []byte + // mtype model.MetricType + // val float64 + // ts int64 + // hasTS bool + // start int // offsets is a list of offsets into series that describe the positions // of the metric name and label names and values for this series. // p.offsets[0] is the start character of the metric name. // p.offsets[1] is the end of the metric name. // Subsequently, p.offsets is a pair of pair of offsets for the positions // of the label name and value start and end characters. - offsets []int + //offsets []int + state stateFn - err error - state int - detectedNameStart int - detectedNameEnd int - detectedType detectedType - detectedHelpStart int - detectedHelpEnd int - - // Values for detected metrics. - hasSumValue bool - sumValue float64 + // Use these two struct to store the detected metric values. + // One is always the current metric, the other will be used to store the next metric. + ev1 exposedValue + ev2 exposedValue + curr *exposedValue + next *exposedValue // Cached interface objects. - exposedCounterMetric FloatCounterMetric + exposedCounterMetric promFloat } const ( @@ -199,18 +220,19 @@ const ( ) type promBase struct { - p *PromParser + buffer []byte + ev *exposedValue } func (p promBase) Name() string { - return yoloString(p.p.l.b[p.p.detectedNameStart:p.p.detectedNameEnd]) + return yoloString(p.buffer[p.ev.nameStart:p.ev.nameEnd]) } func (p promBase) Help() (string, bool) { - if p.p.detectedHelpStart >= p.p.detectedHelpEnd { + if p.ev.helpStart >= p.ev.helpEnd { return "", false } - return yoloString(p.p.l.b[p.p.detectedHelpStart:p.p.detectedHelpEnd]), true + return yoloString(p.buffer[p.ev.helpStart:p.ev.helpEnd]), true } type promFloat struct { @@ -218,174 +240,241 @@ type promFloat struct { } func (p promFloat) Value() float64 { - return p.p.sumValue + return p.ev.sumValue } // NewPromParser returns a new parser of the byte slice. func NewPromParser(b []byte, st *labels.SymbolTable) Parser { p := &PromParser{ l: &promlexer{b: append(b, '\n')}, - builder: labels.NewScratchBuilderWithSymbolTable(st, 16), + //builder: labels.NewScratchBuilderWithSymbolTable(st, 16), } - p.exposedCounterMetric = promFloat{promBase{p: p}} + p.state = p.startState + p.curr = &p.ev1 + p.next = &p.ev2 + p.exposedCounterMetric = promFloat{promBase{buffer: p.l.b}} return p } func (p *PromParser) Next(d DropperCache, keepClassicHistogramSeries bool) (interface{}, error) { for { - p.state = p.evalState() - switch p.state { - case stateError: - return nil, p.err - case stateFoundMetric: - switch p.detectedType { - case detectedGauge: - return p.exposedCounterMetric, nil - } + var err error + var found bool + p.state, found, err = p.state() + if err != nil { + return nil, err + } + if found { + p.exposedCounterMetric.ev = p.curr + return p.exposedCounterMetric, nil } } } -func (p *PromParser) evalState() int { - var t token +func (p* PromParser) startState() (stateFn, bool, error) { + // Get the next token. + t := p.nextToken() // Shorthand to the lexer. l := p.l - switch p.state { - case stateFoundMetric: - // Reset state to start. - p.detectedNameStart = p.detectedNameEnd - p.detectedType = detectedUntyped - p.detectedHelpStart = p.detectedHelpEnd - p.hasSumValue = false - return stateStart - case stateStart: + // Shorthand for the current and next metric. + curr := p.curr + next := p.next + + switch t { + case tInvalid: + return nil, false, fmt.Errorf("invalid token") + case tEOF: + return p.eofState, p.isValid(), nil + case tLinebreak: + // Allow full blank lines. + return p.startState, false, nil + case tWhitespace: + // Skip whitespace. + return p.startState, false, nil + case tHelp: + // Try to store the help text. t = p.nextToken() - switch t { - case tInvalid: - p.err = fmt.Errorf("invalid token") - return stateError - case tEOF: - return p.setEOFState() - case tLinebreak: - // Allow full blank lines. - return stateStart - case tWhitespace: - // Skip whitespace. - return stateStart - case tHelp: - // Try to store the help text. - t = p.nextToken() - if t != tMName { - // Next token wasn't a metric name as expected. Skip. - t = l.consumeComment() - if t == tEOF { - return p.setEOFState() - } - return stateStart - } - nameStart := p.l.start - nameEnd := p.l.i - t = p.nextToken() + if t != tMName { + // Next token wasn't a metric name as expected. Skip. + t = l.consumeComment() if t == tEOF { - return p.setEOFState() + return p.eofState, p.isValid(), nil } - if t != tText { - // We are supposed to have text here. - p.err = fmt.Errorf("expected text") - return stateError - } - // Check if we have a metric name already. - if p.detectedNameStart != p.detectedNameEnd { - if !bytes.Equal(l.b[nameStart:nameEnd], l.buf()) { - // The metric name in the help text does not match the metric name for the type. We prioritize the type. Skip. - t = l.consumeComment() - if t == tEOF { - return p.setEOFState() - } - return stateStart - } - } else { - // Store the metric name for later. - p.detectedNameStart = nameStart - p.detectedNameEnd = nameEnd - } - p.detectedHelpStart = p.l.start+1 - p.detectedHelpEnd = p.l.i - return stateStart - case tType: - // Try to store the type. - t = p.nextToken() - if t != tMName { - // Next token wasn't a metric name as expected. Skip. - t = l.consumeComment() - if t == tEOF { - return p.setEOFState() - } - return stateStart - } - nameStart := p.l.start - nameEnd := p.l.i - // Get the type. - t = p.nextToken() - if t != tText { - // We are supposed to have text here. - p.err = fmt.Errorf("expected text") - return stateError - } - switch s := string(l.buf()); s { - case "counter": - p.detectedType = detectedCounter - case "gauge": - p.detectedType = detectedGauge - case "histogram": - p.detectedType = detectedHistogram - case "summary": - p.detectedType = detectedSummary - case "untyped": - p.detectedType = detectedUntyped - default: - // We don't know this type. Skip. - return stateStart - } - if p.detectedNameStart != p.detectedNameEnd { - if !bytes.Equal(l.b[nameStart:nameEnd], l.buf()) { - // The metric name in the help text does not match the metric name for the type. We prioritize the type. - p.detectedNameStart = nameStart - p.detectedNameEnd = nameEnd - // Unset help. - p.detectedHelpStart = p.detectedHelpEnd - } - } else { - // Store the metric name for later. - p.detectedNameStart = nameStart - p.detectedNameEnd = nameEnd - } - return stateStart - case tMName: - // Check if we have the metric name already. - if p.detectedNameStart != p.detectedNameEnd { - if !bytes.Equal(l.b[p.detectedNameStart:p.detectedNameEnd], l.buf()) { - // Metric name does not match the stored metric name. Overwrite. - p.detectedNameStart = p.l.start - p.detectedNameEnd = p.l.i - // Unset help. - p.detectedHelpStart = p.detectedHelpEnd - } - } else { - // Store the metric name for later. - p.detectedNameStart = p.l.start - p.detectedNameEnd = p.l.i - } - return stateFoundMetric + return p.startState, false, nil } + // Remember the position of the name for later. + nameStart := p.l.start + nameEnd := p.l.i + t = p.nextToken() + if t == tEOF { + return p.eofState, p.isValid(), nil + } + if t != tText { + // We are supposed to have text here. + return nil, false, fmt.Errorf("expected text") + } + // Check if we have a metric name already. + if curr.nameStart != curr.nameEnd { + if !bytes.Equal(l.b[nameStart:nameEnd], l.b[curr.nameStart:curr.nameEnd]) { + // The metric name in the help text does not match the metric name for the type. + next.nameStart = nameStart + next.nameEnd = nameEnd + return p.resetState, p.isValid(), nil + } + } else { + // Store the metric name for later. + curr.nameStart = nameStart + curr.nameEnd = nameEnd + } + curr.helpStart = p.l.start+1 + curr.helpEnd = p.l.i + return p.startState, false, nil + case tType: + // Try to store the type. + t = p.nextToken() + if t != tMName { + // Next token wasn't a metric name as expected. Skip. + t = l.consumeComment() + if t == tEOF { + return p.eofState, p.isValid(), nil + } + return p.startState, false, nil + } + // Remember the position of the name for later. + nameStart := p.l.start + nameEnd := p.l.i + // Get the type. + t = p.nextToken() + if t != tText { + // We are supposed to have text here. + return nil, false, fmt.Errorf("expected text") + } + switch s := string(l.b[l.start+1:l.i]); s { + case "counter": + curr.detectedType = detectedCounter + case "gauge": + curr.detectedType = detectedGauge + case "histogram": + curr.detectedType = detectedHistogram + case "summary": + curr.detectedType = detectedSummary + case "untyped": + curr.detectedType = detectedUntyped + default: + // We don't know this type. Skip. + return p.startState, false, nil + } + // Check if we have a metric name already. + if curr.nameStart != curr.nameEnd { + if !bytes.Equal(l.b[nameStart:nameEnd], l.b[curr.nameStart:curr.nameEnd]) { + // The metric name in the help text does not match the metric name for the type. + next.nameStart = nameStart + next.nameEnd = nameEnd + return p.resetState, p.isValid(), nil + } + } else { + // Store the metric name for later. + curr.nameStart = nameStart + curr.nameEnd = nameEnd + } + return p.startState, false, nil + case tMName: + return p.seriesState() } - p.err = fmt.Errorf("unhandled state=%d, token=%s", p.state, t.String()) - return stateError + + return nil, false, fmt.Errorf("unhandled state") } -func (p *PromParser) setEOFState() int { - p.err = io.EOF - return stateError +func (p* PromParser) isValid() bool { + curr := p.curr + switch curr.detectedType { + case detectedGauge: + return curr.hasSumValue + default: + return false + } +} + +func (p *PromParser) eofState() (stateFn, bool, error) { + return nil, false, io.EOF +} + +func (p *PromParser) resetState() (stateFn, bool, error) { + // The current values as no longer used, reset them and switch out with next. + curr := p.curr + curr.hasSumValue = false + curr.hasCountValue = false + curr.detectedType = detectedUntyped + curr.nameStart = curr.nameEnd + curr.helpStart = curr.helpEnd + p.curr = p.next + p.next = curr + return p.startState, false, nil +} + +// This is the most complicated bit. We have to prepare for the happy case where we continue +// a metric family and also for the case where we start a new metric family. +func (p *PromParser) seriesState() (stateFn, bool, error) { + var err error + // Remember the position of the name for later. + nameStart := p.l.start + nameEnd := p.l.i + // Shorthand to the lexer. + l := p.l + // Shorthand for the current and next metric. + curr := p.curr + next := p.next + switch curr.detectedType { + case detectedGauge: + // The name should match the current metric name. + if !bytes.Equal(l.b[nameStart:nameEnd], l.b[curr.nameStart:curr.nameEnd]) { + // The metric name in the help text does not match the metric name for the type. + next.nameStart = nameStart + next.nameEnd = nameEnd + return p.retrySeries, p.isValid(), nil + } + // We have a gauge, try to parse the value. + t := p.nextToken() + if t != tValue { + // We are supposed to have a value here. + return nil, false, fmt.Errorf("expected value") + } + // Parse the value. + + curr.sumValue, err = parseFloat(yoloString(l.buf())) + if err != nil { + return nil, false, fmt.Errorf("%w while parsing: %q", err, l.buf()) + } + curr.hasSumValue = true + + // Check if we have a timestamp. + switch p.nextToken() { + case tTimestamp: + curr.timestamp, err = strconv.ParseInt(yoloString(p.l.buf()), 10, 64) + if err != nil { + return nil, false, fmt.Errorf("%w while parsing: %q", err, l.buf()) + } + curr.hasTimestamp = true + fallthrough + case tLinebreak: + // Done with this line. + return p.resetState, curr.hasSumValue, nil + default: + // Consume the rest of the line. + if l.consumeComment() == tEOF { + return p.eofState, curr.hasSumValue, nil + } + return p.resetState, curr.hasSumValue, nil + } + default: + return nil, false, fmt.Errorf("unhandled metric type") + } +} + +func (p *PromParser) retrySeries() (stateFn, bool, error) { + // Switch out the current and next metric, but start from the seriesState and not the startState. + _, _, _ = p.resetState() + return p.seriesState, false, nil } // nextToken returns the next token from the promlexer. It skips over tabs @@ -721,6 +810,9 @@ func unreplace(s string) string { + +*/ + func parseFloat(s string) (float64, error) { // Keep to pre-Go 1.13 float formats. if strings.ContainsAny(s, "pP_") { @@ -728,7 +820,6 @@ func parseFloat(s string) (float64, error) { } return strconv.ParseFloat(s, 64) } -*/ func yoloString(b []byte) string { return unsafe.String(unsafe.SliceData(b), len(b))