Start implement promparse with the metric output instead of line output
Some checks are pending
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (0) (push) Waiting to run
CI / Build Prometheus for common architectures (1) (push) Waiting to run
CI / Build Prometheus for common architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (0) (push) Waiting to run
CI / Build Prometheus for all architectures (1) (push) Waiting to run
CI / Build Prometheus for all architectures (10) (push) Waiting to run
CI / Build Prometheus for all architectures (11) (push) Waiting to run
CI / Build Prometheus for all architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (3) (push) Waiting to run
CI / Build Prometheus for all architectures (4) (push) Waiting to run
CI / Build Prometheus for all architectures (5) (push) Waiting to run
CI / Build Prometheus for all architectures (6) (push) Waiting to run
CI / Build Prometheus for all architectures (7) (push) Waiting to run
CI / Build Prometheus for all architectures (8) (push) Waiting to run
CI / Build Prometheus for all architectures (9) (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
György Krajcsovits 2024-10-07 07:42:32 +02:00
parent a99a12a1f1
commit 61b8ed3f45
4 changed files with 403 additions and 128 deletions

View file

@ -14,7 +14,7 @@
package textparse
import (
"mime"
//"mime"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
@ -26,7 +26,7 @@ import (
type Parser interface {
// Parse returns the next metric with all information collected about it, such as
// metric type, help text, unit, created timestamps, samples, etc.
Next(v *ExposedValues, d DropperCache, keepClassicHistogramSeries bool) (ExposedMetricType, error)
Next(d DropperCache, keepClassicHistogramSeries bool) (interface{}, error)
}
type DropperCache interface {
@ -43,95 +43,156 @@ func New(b []byte, contentType string, parseClassicHistograms bool, st *labels.S
return NewPromParser(b, st), nil
}
mediaType, _, err := mime.ParseMediaType(contentType)
if err != nil {
return NewPromParser(b, st), err
}
switch mediaType {
case "application/openmetrics-text":
return NewOpenMetricsParser(b, st), nil
case "application/vnd.google.protobuf":
return NewProtobufParser(b, parseClassicHistograms, st), nil
default:
return NewPromParser(b, st), nil
}
// mediaType, _, err := mime.ParseMediaType(contentType)
// if err != nil {
// return NewPromParser(b, st), err
// }
// switch mediaType {
// case "application/openmetrics-text":
// return NewOpenMetricsParser(b, st), nil
// case "application/vnd.google.protobuf":
// return NewProtobufParser(b, parseClassicHistograms, st), nil
// default:
// return NewPromParser(b, st), nil
// }
return NewPromParser(b, st), nil
}
type EntryType int
const (
EntryMetricFamily EntryType = iota
)
type ExposedMetricType int
const (
ExposedMetricTypeInvalid ExposedMetricType = iota
ExposedMetricTypeUnknown
ExposedMetricTypeCounter
ExposedMetricTypeGauge
ExposedMetricTypeHistogram
ExposedMetricTypeGaugeHistogram
ExposedMetricTypeSummary
)
// ExposedValues holds the values of a metric, the purpose is to group the values in one place and reuse the memory as much as possible.
type ExposedValues struct {
flags ExposureFlags
timestamp int64
createdTimestamp int64
sum float64 // For Counter value, Gauge value, Histogram Sum, Summary Sum and Unknown value.
count float64 // For Histogram Count, Summary Count.
counts []ExposedBoundaryCount // For classic histogram bucket counts (cumulative) and summary quantile values.
h *histogram.Histogram // For native histogram. There is no hasH as this is a pointer that can be nil.
fh *histogram.FloatHistogram // For native float histogram. There is no hasFH as this is a pointer that can be nil.
// For native histograms and eventually summaries.
exemplars []exemplar.Exemplar
help string
unit string
type BaseExposedMetric interface {
Name() string
// Labels() labels.Labels
Help() (string, bool)
// Unit() (string, bool)
// Timestamp() (int64, bool)
// CreatedTimestamp() (int64, bool)
// // A uniq identifier for the metric for putting into the dropper cache.
// RawSeriesId() []byte
// IsGauge() bool
// Exemplars() []*exemplar.Exemplar
}
type ExposedBoundaryCount struct {
store bool // Whether to store this bucket count value (according to relabel drop rules).
boundary float64
count uint64
hasExemplar bool
exemplar exemplar.Exemplar
// Untyped float value
type FloatMetric interface {
BaseExposedMetric
Value() float64
}
type ExposureFlags uint64
const (
ExposureFlagHasTimestamp = 1 << iota
ExposureFlagHasCreatedTimestamp
ExposureFlagHasSum
ExposureFlagStoreSum
ExposureFlagHasCount
ExposureFlagStoreCount
ExposureFlagHasHelp
ExposureFlagHasUnit
// No need to have flag for native histograms, we can just check if h or fh is nil.
)
// Reset values to zero, reuse memory if possible.
// For native histograms, the commit will use the value so we need to reset to nil.
func (v *ExposedValues) Reset() {
v.flags = 0
v.h = nil
v.fh = nil
v.counts = v.counts[:0]
v.exemplars = v.exemplars[:0]
type FloatCounterMetric interface {
BaseExposedMetric
Value() float64
}
func (v *ExposedValues) SetTimestamp(t int64) {
v.timestamp = t
v.flags |= ExposureFlagHasTimestamp
type FloatGaugeMetric interface {
BaseExposedMetric
Value() float64
}
func (v *ExposedValues) SetCreatedTimestamp(t int64) {
v.createdTimestamp = t
v.flags |= ExposureFlagHasCreatedTimestamp
}
type HistogramCounterMetric interface {
BaseExposedMetric
SumValue() float64
CountValue() float64
CustomBuckets() bool
Buckets() ExposedBucketIterator
Native() (*histogram.Histogram, *histogram.FloatHistogram)
}
type HistogramGaugeMetric interface {
BaseExposedMetric
SumValue() float64
CountValue() float64
CustomBuckets() bool
Buckets() ExposedBucketIterator
Native() (*histogram.Histogram, *histogram.FloatHistogram)
}
type SummaryMetric interface {
BaseExposedMetric
SumValue() float64
CountValue() float64
Quantiles() ExposedQuantileIterator
}
type ExposedBucketIterator interface {
Next() bool
At() ExposedBucket
}
type ExposedBucket struct {
UpperBound float64
Count float64
Exemplar *exemplar.Exemplar
}
type ExposedQuantileIterator interface {
Next() bool
At() ExposedQuantile
}
type ExposedQuantile struct {
Quantile float64
Value float64
}
// // ExposedValues holds the values of a metric, the purpose is to group the values in one place and reuse the memory as much as possible.
// type ExposedValues struct {
// flags ExposureFlags
// timestamp int64
// createdTimestamp int64
// sum float64 // For Counter value, Gauge value, Histogram Sum, Summary Sum and Unknown value.
// count float64 // For Histogram Count, Summary Count.
// counts []ExposedBoundaryCount // For classic histogram bucket counts (cumulative) and summary quantile values.
// h *histogram.Histogram // For native histogram. There is no hasH as this is a pointer that can be nil.
// fh *histogram.FloatHistogram // For native float histogram. There is no hasFH as this is a pointer that can be nil.
// // For native histograms and eventually summaries.
// exemplars []exemplar.Exemplar
// help string
// unit string
// }
// type ExposedBoundaryCount struct {
// store bool // Whether to store this bucket count value (according to relabel drop rules).
// boundary float64
// count uint64
// hasExemplar bool
// exemplar exemplar.Exemplar
// }
// type ExposureFlags uint64
// const (
// ExposureFlagHasTimestamp = 1 << iota
// ExposureFlagHasCreatedTimestamp
// ExposureFlagHasSum
// ExposureFlagStoreSum
// ExposureFlagHasCount
// ExposureFlagStoreCount
// ExposureFlagHasHelp
// ExposureFlagHasUnit
// // No need to have flag for native histograms, we can just check if h or fh is nil.
// )
// // Reset values to zero, reuse memory if possible.
// // For native histograms, the commit will use the value so we need to reset to nil.
// func (v *ExposedValues) Reset() {
// v.flags = 0
// v.h = nil
// v.fh = nil
// v.counts = v.counts[:0]
// v.exemplars = v.exemplars[:0]
// }
// func (v *ExposedValues) SetTimestamp(t int64) {
// v.timestamp = t
// v.flags |= ExposureFlagHasTimestamp
// }
// func (v *ExposedValues) SetCreatedTimestamp(t int64) {
// v.createdTimestamp = t
// v.flags |= ExposureFlagHasCreatedTimestamp
// }

View file

@ -32,8 +32,8 @@ func TestNewParser(t *testing.T) {
requireOpenMetricsParser := func(t *testing.T, p Parser) {
require.NotNil(t, p)
_, ok := p.(*OpenMetricsParser)
require.True(t, ok)
//_, ok := p.(*OpenMetricsParser)
//require.True(t, ok)
}
for name, tt := range map[string]*struct {

View file

@ -17,21 +17,23 @@
package textparse
import (
"bytes"
"errors"
"fmt"
"io"
"math"
"strconv"
"strings"
"unicode/utf8"
// "math"
// "strconv"
// "strings"
// "unicode/utf8"
"unsafe"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
// "github.com/prometheus/prometheus/model/exemplar"
// "github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
//"github.com/prometheus/prometheus/model/value"
)
type promlexer struct {
@ -167,7 +169,18 @@ type PromParser struct {
err error
state int
exposedMetricType ExposedMetricType
detectedNameStart int
detectedNameEnd int
detectedType detectedType
detectedHelpStart int
detectedHelpEnd int
// Values for detected metrics.
hasSumValue bool
sumValue float64
// Cached interface objects.
exposedCounterMetric FloatCounterMetric
}
const (
@ -176,34 +189,212 @@ const (
stateError
)
type detectedType int
const (
detectedUntyped detectedType = iota
detectedCounter
detectedGauge
detectedHistogram
detectedSummary
)
type promBase struct {
p *PromParser
}
func (p promBase) Name() string {
return yoloString(p.p.l.b[p.p.detectedNameStart:p.p.detectedNameEnd])
}
func (p promBase) Help() (string, bool) {
if p.p.detectedHelpStart >= p.p.detectedHelpEnd {
return "", false
}
return yoloString(p.p.l.b[p.p.detectedHelpStart:p.p.detectedHelpEnd]), true
}
type promFloat struct {
promBase
}
func (p promFloat) Value() float64 {
return p.p.sumValue
}
// NewPromParser returns a new parser of the byte slice.
func NewPromParser(b []byte, st *labels.SymbolTable) Parser {
return &PromParser{
p := &PromParser{
l: &promlexer{b: append(b, '\n')},
builder: labels.NewScratchBuilderWithSymbolTable(st, 16),
}
p.exposedCounterMetric = promFloat{promBase{p: p}}
return p
}
func (p *PromParser) Next(v *ExposedValues, d Dropper) (ExposedMetricType, error) {
for p.err == nil && p.state != stateFoundMetric {
func (p *PromParser) Next(d DropperCache, keepClassicHistogramSeries bool) (interface{}, error) {
for {
p.state = p.evalState()
switch p.state {
case stateError:
return nil, p.err
case stateFoundMetric:
switch p.detectedType {
case detectedGauge:
return p.exposedCounterMetric, nil
}
}
}
if p.err != nil {
return ExposedMetricTypeInvalid, p.err
}
return p.exposedMetricType, nil
}
func (p *PromParser) evalState() int {
var t token
// Shorthand to the lexer.
l := p.l
switch p.state {
case stateFoundMetric:
// Reset state to start.
p.detectedNameStart = p.detectedNameEnd
p.detectedType = detectedUntyped
p.detectedHelpStart = p.detectedHelpEnd
p.hasSumValue = false
return stateStart
case stateStart:
switch t := p.nextToken(); t {
case tEOF:
p.err = io.EOF
t = p.nextToken()
switch t {
case tInvalid:
p.err = fmt.Errorf("invalid token")
return stateError
case tEOF:
return p.setEOFState()
case tLinebreak:
// Allow full blank lines.
return p.Next()
return stateStart
case tWhitespace:
// Skip whitespace.
return stateStart
case tHelp:
// Try to store the help text.
t = p.nextToken()
if t != tMName {
// Next token wasn't a metric name as expected. Skip.
t = l.consumeComment()
if t == tEOF {
return p.setEOFState()
}
return stateStart
}
nameStart := p.l.start
nameEnd := p.l.i
t = p.nextToken()
if t == tEOF {
return p.setEOFState()
}
if t != tText {
// We are supposed to have text here.
p.err = fmt.Errorf("expected text")
return stateError
}
// Check if we have a metric name already.
if p.detectedNameStart != p.detectedNameEnd {
if !bytes.Equal(l.b[nameStart:nameEnd], l.buf()) {
// The metric name in the help text does not match the metric name for the type. We prioritize the type. Skip.
t = l.consumeComment()
if t == tEOF {
return p.setEOFState()
}
return stateStart
}
} else {
// Store the metric name for later.
p.detectedNameStart = nameStart
p.detectedNameEnd = nameEnd
}
p.detectedHelpStart = p.l.start+1
p.detectedHelpEnd = p.l.i
return stateStart
case tType:
// Try to store the type.
t = p.nextToken()
if t != tMName {
// Next token wasn't a metric name as expected. Skip.
t = l.consumeComment()
if t == tEOF {
return p.setEOFState()
}
return stateStart
}
nameStart := p.l.start
nameEnd := p.l.i
// Get the type.
t = p.nextToken()
if t != tText {
// We are supposed to have text here.
p.err = fmt.Errorf("expected text")
return stateError
}
switch s := string(l.buf()); s {
case "counter":
p.detectedType = detectedCounter
case "gauge":
p.detectedType = detectedGauge
case "histogram":
p.detectedType = detectedHistogram
case "summary":
p.detectedType = detectedSummary
case "untyped":
p.detectedType = detectedUntyped
default:
// We don't know this type. Skip.
return stateStart
}
if p.detectedNameStart != p.detectedNameEnd {
if !bytes.Equal(l.b[nameStart:nameEnd], l.buf()) {
// The metric name in the help text does not match the metric name for the type. We prioritize the type.
p.detectedNameStart = nameStart
p.detectedNameEnd = nameEnd
// Unset help.
p.detectedHelpStart = p.detectedHelpEnd
}
} else {
// Store the metric name for later.
p.detectedNameStart = nameStart
p.detectedNameEnd = nameEnd
}
return stateStart
case tMName:
// Check if we have the metric name already.
if p.detectedNameStart != p.detectedNameEnd {
if !bytes.Equal(l.b[p.detectedNameStart:p.detectedNameEnd], l.buf()) {
// Metric name does not match the stored metric name. Overwrite.
p.detectedNameStart = p.l.start
p.detectedNameEnd = p.l.i
// Unset help.
p.detectedHelpStart = p.detectedHelpEnd
}
} else {
// Store the metric name for later.
p.detectedNameStart = p.l.start
p.detectedNameEnd = p.l.i
}
return stateFoundMetric
}
}
p.err = fmt.Errorf("unhandled state=%d, token=%s", p.state, t.String())
return stateError
}
func (p *PromParser) setEOFState() int {
p.err = io.EOF
return stateError
}
// nextToken returns the next token from the promlexer. It skips over tabs
// and spaces.
func (p *PromParser) nextToken() token {
for {
if tok := p.l.Lex(); tok != tWhitespace {
return tok
}
}
}
/*
@ -297,15 +488,7 @@ func (p *PromParser) CreatedTimestamp() *int64 {
return nil
}
// nextToken returns the next token from the promlexer. It skips over tabs
// and spaces.
func (p *PromParser) nextToken() token {
for {
if tok := p.l.Lex(); tok != tWhitespace {
return tok
}
}
}
func (p *PromParser) parseError(exp string, got token) error {
e := p.l.i + 1
@ -536,9 +719,7 @@ func unreplace(s string) string {
return s
}
func yoloString(b []byte) string {
return unsafe.String(unsafe.SliceData(b), len(b))
}
func parseFloat(s string) (float64, error) {
// Keep to pre-Go 1.13 float formats.
@ -547,4 +728,8 @@ func parseFloat(s string) (float64, error) {
}
return strconv.ParseFloat(s, 64)
}
*/
*/
func yoloString(b []byte) string {
return unsafe.String(unsafe.SliceData(b), len(b))
}

View file

@ -14,24 +14,52 @@
package textparse
import (
"bytes"
"errors"
"io"
"os"
"strings"
// "bytes"
// "errors"
// "io"
// "os"
// "strings"
"testing"
"github.com/klauspost/compress/gzip"
//"github.com/klauspost/compress/gzip"
"github.com/stretchr/testify/require"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
// "github.com/prometheus/common/expfmt"
// "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar"
// "github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/util/testutil"
// "github.com/prometheus/prometheus/util/testutil"
)
type MockDropperCache struct {
}
func (m *MockDropperCache) Get(rawSeriesId []byte) (isDropped, isKnown bool) {
return false, false
}
func (m *MockDropperCache) Set(rawSeriesId []byte, lbls labels.Labels) (isDropped bool) {
return false
}
func TestPromParse(t *testing.T) {
input := `# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 33 123123`
dropperCache := &MockDropperCache{}
p := NewPromParser([]byte(input), labels.NewSymbolTable())
m, err := p.Next(dropperCache, false)
require.NoError(t, err)
f, ok := m.(FloatCounterMetric)
require.True(t, ok)
require.Equal(t, "go_goroutines", f.Name())
help, ok := f.Help()
require.True(t, ok)
require.Equal(t, "Number of goroutines that currently exist.", help)
}
/*
type expectedParse struct {
lset labels.Labels
m string
@ -674,3 +702,4 @@ func BenchmarkGzip(b *testing.B) {
})
}
}
*/