mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Merge branch 'main' into krajo/update-pr-14546-from-main
This commit is contained in:
commit
e66c22203c
2
.github/workflows/ci.yml
vendored
2
.github/workflows/ci.yml
vendored
|
@ -186,7 +186,7 @@ jobs:
|
|||
with:
|
||||
args: --verbose
|
||||
# Make sure to sync this with Makefile.common and scripts/golangci-lint.yml.
|
||||
version: v1.60.1
|
||||
version: v1.60.2
|
||||
fuzzing:
|
||||
uses: ./.github/workflows/fuzzing.yml
|
||||
if: github.event_name == 'pull_request'
|
||||
|
|
|
@ -25,15 +25,34 @@ linters:
|
|||
- loggercheck
|
||||
|
||||
issues:
|
||||
max-issues-per-linter: 0
|
||||
max-same-issues: 0
|
||||
# The default exclusions are too aggressive. For one, they
|
||||
# essentially disable any linting on doc comments. We disable
|
||||
# default exclusions here and add exclusions fitting our codebase
|
||||
# further down.
|
||||
exclude-use-default: false
|
||||
exclude-files:
|
||||
# Skip autogenerated files.
|
||||
- ^.*\.(pb|y)\.go$
|
||||
exclude-dirs:
|
||||
# Copied it from a different source
|
||||
# Copied it from a different source.
|
||||
- storage/remote/otlptranslator/prometheusremotewrite
|
||||
- storage/remote/otlptranslator/prometheus
|
||||
exclude-rules:
|
||||
- linters:
|
||||
- errcheck
|
||||
# Taken from the default exclusions (that are otherwise disabled above).
|
||||
text: Error return value of .((os\.)?std(out|err)\..*|.*Close|.*Flush|os\.Remove(All)?|.*print(f|ln)?|os\.(Un)?Setenv). is not checked
|
||||
- linters:
|
||||
- govet
|
||||
# We use many Seek methods that do not follow the usual pattern.
|
||||
text: "stdmethods: method Seek.* should have signature Seek"
|
||||
- linters:
|
||||
- revive
|
||||
# We have stopped at some point to write doc comments on exported symbols.
|
||||
# TODO(beorn7): Maybe we should enforce this again? There are ~500 offenders right now.
|
||||
text: exported (.+) should have comment( \(or a comment on this block\))? or be unexported
|
||||
- linters:
|
||||
- gocritic
|
||||
text: "appendAssign"
|
||||
|
@ -94,15 +113,14 @@ linters-settings:
|
|||
errorf: false
|
||||
revive:
|
||||
# By default, revive will enable only the linting rules that are named in the configuration file.
|
||||
# So, it's needed to explicitly set in configuration all required rules.
|
||||
# The following configuration enables all the rules from the defaults.toml
|
||||
# https://github.com/mgechev/revive/blob/master/defaults.toml
|
||||
# So, it's needed to explicitly enable all required rules here.
|
||||
rules:
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md
|
||||
- name: blank-imports
|
||||
- name: comment-spacings
|
||||
- name: context-as-argument
|
||||
arguments:
|
||||
# allow functions with test or bench signatures
|
||||
# Allow functions with test or bench signatures.
|
||||
- allowTypesBefore: "*testing.T,testing.TB"
|
||||
- name: context-keys-type
|
||||
- name: dot-imports
|
||||
|
@ -118,6 +136,8 @@ linters-settings:
|
|||
- name: increment-decrement
|
||||
- name: indent-error-flow
|
||||
- name: package-comments
|
||||
# TODO(beorn7): Currently, we have a lot of missing package doc comments. Maybe we should have them.
|
||||
disabled: true
|
||||
- name: range
|
||||
- name: receiver-naming
|
||||
- name: redefines-builtin-id
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
## unreleased
|
||||
|
||||
* [FEATURE] OTLP receiver: Add new option `otlp.promote_resource_attributes`, for any OTel resource attributes that should be promoted to metric labels. #14200
|
||||
* [ENHANCEMENT] OTLP receiver: Warn when encountering exponential histograms with zero count and non-zero sum. #14706
|
||||
* [BUGFIX] tsdb/wlog.Watcher.readSegmentForGC: Only count unknown record types against record_decode_failures_total metric. #14042
|
||||
|
||||
## 2.54.0-rc.1 / 2024-08-05
|
||||
|
|
|
@ -61,7 +61,7 @@ PROMU_URL := https://github.com/prometheus/promu/releases/download/v$(PROMU_
|
|||
SKIP_GOLANGCI_LINT :=
|
||||
GOLANGCI_LINT :=
|
||||
GOLANGCI_LINT_OPTS ?=
|
||||
GOLANGCI_LINT_VERSION ?= v1.60.1
|
||||
GOLANGCI_LINT_VERSION ?= v1.60.2
|
||||
# golangci-lint only supports linux, darwin and windows platforms on i386/amd64/arm64.
|
||||
# windows isn't included here because of the path separator being different.
|
||||
ifeq ($(GOHOSTOS),$(filter $(GOHOSTOS),linux darwin))
|
||||
|
|
|
@ -471,7 +471,7 @@ func (ls lintConfig) lintDuplicateRules() bool {
|
|||
return ls.all || ls.duplicateRules
|
||||
}
|
||||
|
||||
// Check server status - healthy & ready.
|
||||
// CheckServerStatus - healthy & ready.
|
||||
func CheckServerStatus(serverURL *url.URL, checkEndpoint string, roundTripper http.RoundTripper) error {
|
||||
if serverURL.Scheme == "" {
|
||||
serverURL.Scheme = "http"
|
||||
|
|
|
@ -31,7 +31,7 @@ import (
|
|||
"github.com/prometheus/prometheus/util/fmtutil"
|
||||
)
|
||||
|
||||
// Push metrics to a prometheus remote write (for testing purpose only).
|
||||
// PushMetrics to a prometheus remote write (for testing purpose only).
|
||||
func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, timeout time.Duration, labels map[string]string, files ...string) int {
|
||||
addressURL, err := url.Parse(url.String())
|
||||
if err != nil {
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
|
||||
package discovery
|
||||
|
||||
// Create a dummy metrics struct, because this SD doesn't have any metrics.
|
||||
// NoopDiscovererMetrics creates a dummy metrics struct, because this SD doesn't have any metrics.
|
||||
type NoopDiscovererMetrics struct{}
|
||||
|
||||
var _ DiscovererMetrics = (*NoopDiscovererMetrics)(nil)
|
||||
|
|
|
@ -39,7 +39,7 @@ type Discoverer interface {
|
|||
Run(ctx context.Context, up chan<- []*targetgroup.Group)
|
||||
}
|
||||
|
||||
// Internal metrics of service discovery mechanisms.
|
||||
// DiscovererMetrics are internal metrics of service discovery mechanisms.
|
||||
type DiscovererMetrics interface {
|
||||
Register() error
|
||||
Unregister()
|
||||
|
@ -56,7 +56,7 @@ type DiscovererOptions struct {
|
|||
HTTPClientOptions []config.HTTPClientOption
|
||||
}
|
||||
|
||||
// Metrics used by the "refresh" package.
|
||||
// RefreshMetrics are used by the "refresh" package.
|
||||
// We define them here in the "discovery" package in order to avoid a cyclic dependency between
|
||||
// "discovery" and "refresh".
|
||||
type RefreshMetrics struct {
|
||||
|
@ -64,17 +64,18 @@ type RefreshMetrics struct {
|
|||
Duration prometheus.Observer
|
||||
}
|
||||
|
||||
// Instantiate the metrics used by the "refresh" package.
|
||||
// RefreshMetricsInstantiator instantiates the metrics used by the "refresh" package.
|
||||
type RefreshMetricsInstantiator interface {
|
||||
Instantiate(mech string) *RefreshMetrics
|
||||
}
|
||||
|
||||
// An interface for registering, unregistering, and instantiating metrics for the "refresh" package.
|
||||
// Refresh metrics are registered and unregistered outside of the service discovery mechanism.
|
||||
// This is so that the same metrics can be reused across different service discovery mechanisms.
|
||||
// To manage refresh metrics inside the SD mechanism, we'd need to use const labels which are
|
||||
// specific to that SD. However, doing so would also expose too many unused metrics on
|
||||
// the Prometheus /metrics endpoint.
|
||||
// RefreshMetricsManager is an interface for registering, unregistering, and
|
||||
// instantiating metrics for the "refresh" package. Refresh metrics are
|
||||
// registered and unregistered outside of the service discovery mechanism. This
|
||||
// is so that the same metrics can be reused across different service discovery
|
||||
// mechanisms. To manage refresh metrics inside the SD mechanism, we'd need to
|
||||
// use const labels which are specific to that SD. However, doing so would also
|
||||
// expose too many unused metrics on the Prometheus /metrics endpoint.
|
||||
type RefreshMetricsManager interface {
|
||||
DiscovererMetrics
|
||||
RefreshMetricsInstantiator
|
||||
|
@ -145,7 +146,8 @@ func (c StaticConfig) NewDiscoverer(DiscovererOptions) (Discoverer, error) {
|
|||
return staticDiscoverer(c), nil
|
||||
}
|
||||
|
||||
// No metrics are needed for this service discovery mechanism.
|
||||
// NewDiscovererMetrics returns NoopDiscovererMetrics because no metrics are
|
||||
// needed for this service discovery mechanism.
|
||||
func (c StaticConfig) NewDiscovererMetrics(prometheus.Registerer, RefreshMetricsInstantiator) DiscovererMetrics {
|
||||
return &NoopDiscovererMetrics{}
|
||||
}
|
||||
|
|
|
@ -64,7 +64,7 @@ func (p *Provider) Config() interface{} {
|
|||
return p.config
|
||||
}
|
||||
|
||||
// Registers the metrics needed for SD mechanisms.
|
||||
// CreateAndRegisterSDMetrics registers the metrics needed for SD mechanisms.
|
||||
// Does not register the metrics for the Discovery Manager.
|
||||
// TODO(ptodev): Add ability to unregister the metrics?
|
||||
func CreateAndRegisterSDMetrics(reg prometheus.Registerer) (map[string]DiscovererMetrics, error) {
|
||||
|
|
|
@ -17,7 +17,7 @@ import (
|
|||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
// Metric vectors for the "refresh" package.
|
||||
// RefreshMetricsVecs are metric vectors for the "refresh" package.
|
||||
// We define them here in the "discovery" package in order to avoid a cyclic dependency between
|
||||
// "discovery" and "refresh".
|
||||
type RefreshMetricsVecs struct {
|
||||
|
|
|
@ -19,8 +19,8 @@ import (
|
|||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
// A utility to be used by implementations of discovery.Discoverer
|
||||
// which need to manage the lifetime of their metrics.
|
||||
// MetricRegisterer is used by implementations of discovery.Discoverer that need
|
||||
// to manage the lifetime of their metrics.
|
||||
type MetricRegisterer interface {
|
||||
RegisterMetrics() error
|
||||
UnregisterMetrics()
|
||||
|
@ -34,7 +34,7 @@ type metricRegistererImpl struct {
|
|||
|
||||
var _ MetricRegisterer = &metricRegistererImpl{}
|
||||
|
||||
// Creates an instance of a MetricRegisterer.
|
||||
// NewMetricRegisterer creates an instance of a MetricRegisterer.
|
||||
// Typically called inside the implementation of the NewDiscoverer() method.
|
||||
func NewMetricRegisterer(reg prometheus.Registerer, metrics []prometheus.Collector) MetricRegisterer {
|
||||
return &metricRegistererImpl{
|
||||
|
|
|
@ -3275,12 +3275,16 @@ Initially, aside from the configured per-target labels, a target's `job`
|
|||
label is set to the `job_name` value of the respective scrape configuration.
|
||||
The `__address__` label is set to the `<host>:<port>` address of the target.
|
||||
After relabeling, the `instance` label is set to the value of `__address__` by default if
|
||||
it was not set during relabeling. The `__scheme__` and `__metrics_path__` labels
|
||||
are set to the scheme and metrics path of the target respectively. The `__param_<name>`
|
||||
label is set to the value of the first passed URL parameter called `<name>`.
|
||||
it was not set during relabeling.
|
||||
|
||||
The `__scheme__` and `__metrics_path__` labels
|
||||
are set to the scheme and metrics path of the target respectively, as specified in `scrape_config`.
|
||||
|
||||
The `__param_<name>`
|
||||
label is set to the value of the first passed URL parameter called `<name>`, as defined in `scrape_config`.
|
||||
|
||||
The `__scrape_interval__` and `__scrape_timeout__` labels are set to the target's
|
||||
interval and timeout.
|
||||
interval and timeout, as specified in `scrape_config`.
|
||||
|
||||
Additional labels prefixed with `__meta_` may be available during the
|
||||
relabeling phase. They are set by the service discovery mechanism that provided
|
||||
|
|
|
@ -15,7 +15,9 @@ package exemplar
|
|||
|
||||
import "github.com/prometheus/prometheus/model/labels"
|
||||
|
||||
// The combined length of the label names and values of an Exemplar's LabelSet MUST NOT exceed 128 UTF-8 characters
|
||||
// ExemplarMaxLabelSetLength is defined by OpenMetrics: "The combined length of
|
||||
// the label names and values of an Exemplar's LabelSet MUST NOT exceed 128
|
||||
// UTF-8 characters."
|
||||
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars
|
||||
const ExemplarMaxLabelSetLength = 128
|
||||
|
||||
|
@ -49,7 +51,7 @@ func (e Exemplar) Equals(e2 Exemplar) bool {
|
|||
return e.Value == e2.Value
|
||||
}
|
||||
|
||||
// Sort first by timestamp, then value, then labels.
|
||||
// Compare first timestamps, then values, then labels.
|
||||
func Compare(a, b Exemplar) int {
|
||||
if a.Ts < b.Ts {
|
||||
return -1
|
||||
|
|
|
@ -315,7 +315,8 @@ func Compare(a, b Labels) int {
|
|||
return len(a) - len(b)
|
||||
}
|
||||
|
||||
// Copy labels from b on top of whatever was in ls previously, reusing memory or expanding if needed.
|
||||
// CopyFrom copies labels from b on top of whatever was in ls previously,
|
||||
// reusing memory or expanding if needed.
|
||||
func (ls *Labels) CopyFrom(b Labels) {
|
||||
(*ls) = append((*ls)[:0], b...)
|
||||
}
|
||||
|
@ -422,7 +423,7 @@ type ScratchBuilder struct {
|
|||
add Labels
|
||||
}
|
||||
|
||||
// Symbol-table is no-op, just for api parity with dedupelabels.
|
||||
// SymbolTable is no-op, just for api parity with dedupelabels.
|
||||
type SymbolTable struct{}
|
||||
|
||||
func NewSymbolTable() *SymbolTable { return nil }
|
||||
|
@ -458,7 +459,7 @@ func (b *ScratchBuilder) Add(name, value string) {
|
|||
b.add = append(b.add, Label{Name: name, Value: value})
|
||||
}
|
||||
|
||||
// Add a name/value pair, using []byte instead of string.
|
||||
// UnsafeAddBytes adds a name/value pair, using []byte instead of string.
|
||||
// The '-tags stringlabels' version of this function is unsafe, hence the name.
|
||||
// This version is safe - it copies the strings immediately - but we keep the same name so everything compiles.
|
||||
func (b *ScratchBuilder) UnsafeAddBytes(name, value []byte) {
|
||||
|
@ -475,14 +476,14 @@ func (b *ScratchBuilder) Assign(ls Labels) {
|
|||
b.add = append(b.add[:0], ls...) // Copy on top of our slice, so we don't retain the input slice.
|
||||
}
|
||||
|
||||
// Return the name/value pairs added so far as a Labels object.
|
||||
// Labels returns the name/value pairs added so far as a Labels object.
|
||||
// Note: if you want them sorted, call Sort() first.
|
||||
func (b *ScratchBuilder) Labels() Labels {
|
||||
// Copy the slice, so the next use of ScratchBuilder doesn't overwrite.
|
||||
return append([]Label{}, b.add...)
|
||||
}
|
||||
|
||||
// Write the newly-built Labels out to ls.
|
||||
// Overwrite the newly-built Labels out to ls.
|
||||
// Callers must ensure that there are no other references to ls, or any strings fetched from it.
|
||||
func (b *ScratchBuilder) Overwrite(ls *Labels) {
|
||||
*ls = append((*ls)[:0], b.add...)
|
||||
|
|
|
@ -106,8 +106,8 @@ const (
|
|||
EntryInvalid Entry = -1
|
||||
EntryType Entry = 0
|
||||
EntryHelp Entry = 1
|
||||
EntrySeries Entry = 2 // A series with a simple float64 as value.
|
||||
EntrySeries Entry = 2 // EntrySeries marks a series with a simple float64 as value.
|
||||
EntryComment Entry = 3
|
||||
EntryUnit Entry = 4
|
||||
EntryHistogram Entry = 5 // A series with a native histogram as a value.
|
||||
EntryHistogram Entry = 5 // EntryHistogram marks a series with a native histogram as a value.
|
||||
)
|
||||
|
|
|
@ -573,7 +573,7 @@ func (ng *Engine) validateOpts(expr parser.Expr) error {
|
|||
return validationErr
|
||||
}
|
||||
|
||||
// NewTestQuery: inject special behaviour into Query for testing.
|
||||
// NewTestQuery injects special behaviour into Query for testing.
|
||||
func (ng *Engine) NewTestQuery(f func(context.Context) error) Query {
|
||||
qry := &query{
|
||||
q: "test statement",
|
||||
|
@ -3531,14 +3531,14 @@ func makeInt64Pointer(val int64) *int64 {
|
|||
return valp
|
||||
}
|
||||
|
||||
// Add RatioSampler interface to allow unit-testing (previously: Randomizer).
|
||||
// RatioSampler allows unit-testing (previously: Randomizer).
|
||||
type RatioSampler interface {
|
||||
// Return this sample "offset" between [0.0, 1.0]
|
||||
sampleOffset(ts int64, sample *Sample) float64
|
||||
AddRatioSample(r float64, sample *Sample) bool
|
||||
}
|
||||
|
||||
// Use Hash(labels.String()) / maxUint64 as a "deterministic"
|
||||
// HashRatioSampler uses Hash(labels.String()) / maxUint64 as a "deterministic"
|
||||
// value in [0.0, 1.0].
|
||||
type HashRatioSampler struct{}
|
||||
|
||||
|
|
|
@ -352,8 +352,7 @@ func (f inspector) Visit(node Node, path []Node) (Visitor, error) {
|
|||
// f(node, path); node must not be nil. If f returns a nil error, Inspect invokes f
|
||||
// for all the non-nil children of node, recursively.
|
||||
func Inspect(node Node, f inspector) {
|
||||
//nolint: errcheck
|
||||
Walk(f, node, nil)
|
||||
Walk(f, node, nil) //nolint:errcheck
|
||||
}
|
||||
|
||||
// Children returns a list of all child nodes of a syntax tree node.
|
||||
|
@ -419,7 +418,7 @@ func mergeRanges(first, last Node) posrange.PositionRange {
|
|||
}
|
||||
}
|
||||
|
||||
// Item implements the Node interface.
|
||||
// PositionRange implements the Node interface.
|
||||
// This makes it possible to call mergeRanges on them.
|
||||
func (i *Item) PositionRange() posrange.PositionRange {
|
||||
return posrange.PositionRange{
|
||||
|
|
|
@ -23,7 +23,7 @@ import (
|
|||
dto "github.com/prometheus/client_model/go"
|
||||
)
|
||||
|
||||
// Write a MetricFamily into a protobuf.
|
||||
// MetricFamilyToProtobuf writes a MetricFamily into a protobuf.
|
||||
// This function is intended for testing scraping by providing protobuf serialized input.
|
||||
func MetricFamilyToProtobuf(metricFamily *dto.MetricFamily) ([]byte, error) {
|
||||
buffer := &bytes.Buffer{}
|
||||
|
@ -34,7 +34,7 @@ func MetricFamilyToProtobuf(metricFamily *dto.MetricFamily) ([]byte, error) {
|
|||
return buffer.Bytes(), nil
|
||||
}
|
||||
|
||||
// Append a MetricFamily protobuf representation to a buffer.
|
||||
// AddMetricFamilyToProtobuf appends a MetricFamily protobuf representation to a buffer.
|
||||
// This function is intended for testing scraping by providing protobuf serialized input.
|
||||
func AddMetricFamilyToProtobuf(buffer *bytes.Buffer, metricFamily *dto.MetricFamily) error {
|
||||
protoBuf, err := proto.Marshal(metricFamily)
|
||||
|
|
|
@ -36,4 +36,4 @@ jobs:
|
|||
uses: golangci/golangci-lint-action@aaa42aa0628b4ae2578232a66b541047968fac86 # v6.1.0
|
||||
with:
|
||||
args: --verbose
|
||||
version: v1.60.1
|
||||
version: v1.60.2
|
||||
|
|
|
@ -228,9 +228,9 @@ type LabelHints struct {
|
|||
Limit int
|
||||
}
|
||||
|
||||
// TODO(bwplotka): Move to promql/engine_test.go?
|
||||
// QueryableFunc is an adapter to allow the use of ordinary functions as
|
||||
// Queryables. It follows the idea of http.HandlerFunc.
|
||||
// TODO(bwplotka): Move to promql/engine_test.go?
|
||||
type QueryableFunc func(mint, maxt int64) (Querier, error)
|
||||
|
||||
// Querier calls f() with the given parameters.
|
||||
|
|
|
@ -31,13 +31,15 @@ import (
|
|||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// Clouds.
|
||||
const (
|
||||
// Clouds.
|
||||
AzureChina = "AzureChina"
|
||||
AzureGovernment = "AzureGovernment"
|
||||
AzurePublic = "AzurePublic"
|
||||
)
|
||||
|
||||
// Audiences.
|
||||
// Audiences.
|
||||
const (
|
||||
IngestionChinaAudience = "https://monitor.azure.cn//.default"
|
||||
IngestionGovernmentAudience = "https://monitor.azure.us//.default"
|
||||
IngestionPublicAudience = "https://monitor.azure.com//.default"
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
|
||||
"github.com/prometheus/prometheus/model/value"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/prometheus/prometheus/util/annotations"
|
||||
)
|
||||
|
||||
const defaultZeroThreshold = 1e-128
|
||||
|
@ -33,13 +34,15 @@ const defaultZeroThreshold = 1e-128
|
|||
// addExponentialHistogramDataPoints adds OTel exponential histogram data points to the corresponding time series
|
||||
// as native histogram samples.
|
||||
func (c *PrometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetric.ExponentialHistogramDataPointSlice,
|
||||
resource pcommon.Resource, settings Settings, promName string) error {
|
||||
resource pcommon.Resource, settings Settings, promName string) (annotations.Annotations, error) {
|
||||
var annots annotations.Annotations
|
||||
for x := 0; x < dataPoints.Len(); x++ {
|
||||
pt := dataPoints.At(x)
|
||||
|
||||
histogram, err := exponentialToNativeHistogram(pt)
|
||||
histogram, ws, err := exponentialToNativeHistogram(pt)
|
||||
annots.Merge(ws)
|
||||
if err != nil {
|
||||
return err
|
||||
return annots, err
|
||||
}
|
||||
|
||||
lbls := createAttributes(
|
||||
|
@ -58,15 +61,16 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetr
|
|||
ts.Exemplars = append(ts.Exemplars, exemplars...)
|
||||
}
|
||||
|
||||
return nil
|
||||
return annots, nil
|
||||
}
|
||||
|
||||
// exponentialToNativeHistogram translates OTel Exponential Histogram data point
|
||||
// to Prometheus Native Histogram.
|
||||
func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prompb.Histogram, error) {
|
||||
func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prompb.Histogram, annotations.Annotations, error) {
|
||||
var annots annotations.Annotations
|
||||
scale := p.Scale()
|
||||
if scale < -4 {
|
||||
return prompb.Histogram{},
|
||||
return prompb.Histogram{}, annots,
|
||||
fmt.Errorf("cannot convert exponential to native histogram."+
|
||||
" Scale must be >= -4, was %d", scale)
|
||||
}
|
||||
|
@ -114,8 +118,11 @@ func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prom
|
|||
h.Sum = p.Sum()
|
||||
}
|
||||
h.Count = &prompb.Histogram_CountInt{CountInt: p.Count()}
|
||||
if p.Count() == 0 && h.Sum != 0 {
|
||||
annots.Add(fmt.Errorf("exponential histogram data point has zero count, but non-zero sum: %f", h.Sum))
|
||||
}
|
||||
}
|
||||
return h, nil
|
||||
return h, annots, nil
|
||||
}
|
||||
|
||||
// convertBucketsLayout translates OTel Exponential Histogram dense buckets
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
|
||||
"github.com/prometheus/prometheus/util/annotations"
|
||||
)
|
||||
|
||||
type Settings struct {
|
||||
|
@ -53,7 +54,7 @@ func NewPrometheusConverter() *PrometheusConverter {
|
|||
}
|
||||
|
||||
// FromMetrics converts pmetric.Metrics to Prometheus remote write format.
|
||||
func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings) (errs error) {
|
||||
func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings) (annots annotations.Annotations, errs error) {
|
||||
resourceMetricsSlice := md.ResourceMetrics()
|
||||
for i := 0; i < resourceMetricsSlice.Len(); i++ {
|
||||
resourceMetrics := resourceMetricsSlice.At(i)
|
||||
|
@ -107,12 +108,14 @@ func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings)
|
|||
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
|
||||
break
|
||||
}
|
||||
errs = multierr.Append(errs, c.addExponentialHistogramDataPoints(
|
||||
ws, err := c.addExponentialHistogramDataPoints(
|
||||
dataPoints,
|
||||
resource,
|
||||
settings,
|
||||
promName,
|
||||
))
|
||||
)
|
||||
annots.Merge(ws)
|
||||
errs = multierr.Append(errs, err)
|
||||
case pmetric.MetricTypeSummary:
|
||||
dataPoints := metric.Summary().DataPoints()
|
||||
if dataPoints.Len() == 0 {
|
||||
|
@ -128,7 +131,7 @@ func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings)
|
|||
addResourceTargetInfo(resource, settings, mostRecentTimestamp, c)
|
||||
}
|
||||
|
||||
return
|
||||
return annots, errs
|
||||
}
|
||||
|
||||
func isSameMetric(ts *prompb.TimeSeries, lbls []prompb.Label) bool {
|
||||
|
|
|
@ -27,6 +27,41 @@ import (
|
|||
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
|
||||
)
|
||||
|
||||
func TestFromMetrics(t *testing.T) {
|
||||
t.Run("exponential histogram warnings for zero count and non-zero sum", func(t *testing.T) {
|
||||
request := pmetricotlp.NewExportRequest()
|
||||
rm := request.Metrics().ResourceMetrics().AppendEmpty()
|
||||
generateAttributes(rm.Resource().Attributes(), "resource", 10)
|
||||
|
||||
metrics := rm.ScopeMetrics().AppendEmpty().Metrics()
|
||||
ts := pcommon.NewTimestampFromTime(time.Now())
|
||||
|
||||
for i := 1; i <= 10; i++ {
|
||||
m := metrics.AppendEmpty()
|
||||
m.SetEmptyExponentialHistogram()
|
||||
m.SetName(fmt.Sprintf("histogram-%d", i))
|
||||
m.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
|
||||
h := m.ExponentialHistogram().DataPoints().AppendEmpty()
|
||||
h.SetTimestamp(ts)
|
||||
|
||||
h.SetCount(0)
|
||||
h.SetSum(155)
|
||||
|
||||
generateAttributes(h.Attributes(), "series", 10)
|
||||
}
|
||||
|
||||
converter := NewPrometheusConverter()
|
||||
annots, err := converter.FromMetrics(request.Metrics(), Settings{})
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, annots)
|
||||
ws, infos := annots.AsStrings("", 0, 0)
|
||||
require.Empty(t, infos)
|
||||
require.Equal(t, []string{
|
||||
"exponential histogram data point has zero count, but non-zero sum: 155.000000",
|
||||
}, ws)
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) {
|
||||
for _, resourceAttributeCount := range []int{0, 5, 50} {
|
||||
b.Run(fmt.Sprintf("resource attribute count: %v", resourceAttributeCount), func(b *testing.B) {
|
||||
|
@ -49,7 +84,9 @@ func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) {
|
|||
|
||||
for i := 0; i < b.N; i++ {
|
||||
converter := NewPrometheusConverter()
|
||||
require.NoError(b, converter.FromMetrics(payload.Metrics(), Settings{}))
|
||||
annots, err := converter.FromMetrics(payload.Metrics(), Settings{})
|
||||
require.NoError(b, err)
|
||||
require.Empty(b, annots)
|
||||
require.NotNil(b, converter.TimeSeries())
|
||||
}
|
||||
})
|
||||
|
|
|
@ -930,7 +930,7 @@ func createHistograms(numSamples, numSeries int, floatHistogram bool) ([]record.
|
|||
}
|
||||
|
||||
func createSeriesMetadata(series []record.RefSeries) []record.RefMetadata {
|
||||
metas := make([]record.RefMetadata, len(series))
|
||||
metas := make([]record.RefMetadata, 0, len(series))
|
||||
|
||||
for _, s := range series {
|
||||
metas = append(metas, record.RefMetadata{
|
||||
|
|
|
@ -502,12 +502,17 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
otlpCfg := h.configFunc().OTLPConfig
|
||||
|
||||
converter := otlptranslator.NewPrometheusConverter()
|
||||
if err := converter.FromMetrics(req.Metrics(), otlptranslator.Settings{
|
||||
annots, err := converter.FromMetrics(req.Metrics(), otlptranslator.Settings{
|
||||
AddMetricSuffixes: true,
|
||||
PromoteResourceAttributes: otlpCfg.PromoteResourceAttributes,
|
||||
}); err != nil {
|
||||
})
|
||||
if err != nil {
|
||||
level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
|
||||
}
|
||||
ws, _ := annots.AsStrings("", 0, 0)
|
||||
if len(ws) > 0 {
|
||||
level.Warn(h.logger).Log("msg", "Warnings translating OTLP metrics to Prometheus write request", "warnings", ws)
|
||||
}
|
||||
|
||||
err = h.rwHandler.write(r.Context(), &prompb.WriteRequest{
|
||||
Timeseries: converter.TimeSeries(),
|
||||
|
|
|
@ -166,7 +166,7 @@ func NewTemplateExpander(
|
|||
return html_template.HTML(text)
|
||||
},
|
||||
"match": regexp.MatchString,
|
||||
"title": strings.Title,
|
||||
"title": strings.Title, //nolint:staticcheck // TODO(beorn7): Need to come up with a replacement using the cases package.
|
||||
"toUpper": strings.ToUpper,
|
||||
"toLower": strings.ToLower,
|
||||
"graphLink": strutil.GraphLinkForExpression,
|
||||
|
|
|
@ -133,6 +133,9 @@ type Meta struct {
|
|||
// Time range the data covers.
|
||||
// When MaxTime == math.MaxInt64 the chunk is still open and being appended to.
|
||||
MinTime, MaxTime int64
|
||||
|
||||
// Flag to indicate that this meta needs merge with OOO data.
|
||||
MergeOOO bool
|
||||
}
|
||||
|
||||
// ChunkFromSamples requires all samples to have the same type.
|
||||
|
|
|
@ -191,7 +191,7 @@ func (f *chunkPos) bytesToWriteForChunk(chkLen uint64) uint64 {
|
|||
// ChunkDiskMapper is for writing the Head block chunks to disk
|
||||
// and access chunks via mmapped files.
|
||||
type ChunkDiskMapper struct {
|
||||
/// Writer.
|
||||
// Writer.
|
||||
dir *os.File
|
||||
writeBufferSize int
|
||||
|
||||
|
@ -210,7 +210,7 @@ type ChunkDiskMapper struct {
|
|||
crc32 hash.Hash
|
||||
writePathMtx sync.Mutex
|
||||
|
||||
/// Reader.
|
||||
// Reader.
|
||||
// The int key in the map is the file number on the disk.
|
||||
mmappedChunkFiles map[int]*mmappedChunkFile // Contains the m-mapped files for each chunk file mapped with its index.
|
||||
closers map[int]io.Closer // Closers for resources behind the byte slices.
|
||||
|
|
|
@ -49,7 +49,7 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
// Default duration of a block in milliseconds.
|
||||
// DefaultBlockDuration in milliseconds.
|
||||
DefaultBlockDuration = int64(2 * time.Hour / time.Millisecond)
|
||||
|
||||
// Block dir suffixes to make deletion and creation operations atomic.
|
||||
|
|
183
tsdb/db_test.go
183
tsdb/db_test.go
|
@ -5463,7 +5463,6 @@ func testQuerierOOOQuery(t *testing.T,
|
|||
sampleFunc func(ts int64) chunks.Sample,
|
||||
) {
|
||||
opts := DefaultOptions()
|
||||
opts.OutOfOrderCapMax = 30
|
||||
opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds()
|
||||
|
||||
series1 := labels.FromStrings("foo", "bar1")
|
||||
|
@ -5487,6 +5486,7 @@ func testQuerierOOOQuery(t *testing.T,
|
|||
totalAppended++
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
require.Positive(t, totalAppended, 0) // Sanity check that filter is not too zealous.
|
||||
return expSamples, totalAppended
|
||||
}
|
||||
|
||||
|
@ -5500,12 +5500,14 @@ func testQuerierOOOQuery(t *testing.T,
|
|||
|
||||
tests := []struct {
|
||||
name string
|
||||
oooCap int64
|
||||
queryMinT int64
|
||||
queryMaxT int64
|
||||
batches []sampleBatch
|
||||
}{
|
||||
{
|
||||
name: "query interval covering ooomint and inordermaxt returns all ingested samples",
|
||||
oooCap: 30,
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(200),
|
||||
batches: []sampleBatch{
|
||||
|
@ -5524,6 +5526,7 @@ func testQuerierOOOQuery(t *testing.T,
|
|||
},
|
||||
{
|
||||
name: "partial query interval returns only samples within interval",
|
||||
oooCap: 30,
|
||||
queryMinT: minutes(20),
|
||||
queryMaxT: minutes(180),
|
||||
batches: []sampleBatch{
|
||||
|
@ -5566,7 +5569,8 @@ func testQuerierOOOQuery(t *testing.T,
|
|||
},
|
||||
},
|
||||
{
|
||||
name: "query overlapping inorder and ooo samples returns all ingested samples",
|
||||
name: "query overlapping inorder and ooo samples returns all ingested samples at the end of the interval",
|
||||
oooCap: 30,
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(200),
|
||||
batches: []sampleBatch{
|
||||
|
@ -5577,16 +5581,89 @@ func testQuerierOOOQuery(t *testing.T,
|
|||
isOOO: false,
|
||||
},
|
||||
{
|
||||
minT: minutes(180 - opts.OutOfOrderCapMax/2), // Make sure to fit into the OOO head.
|
||||
minT: minutes(170),
|
||||
maxT: minutes(180),
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "query overlapping inorder and ooo in-memory samples returns all ingested samples at the beginning of the interval",
|
||||
oooCap: 30,
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(200),
|
||||
batches: []sampleBatch{
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(200),
|
||||
filter: func(t int64) bool { return t%2 == 0 },
|
||||
isOOO: false,
|
||||
},
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(110),
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "query inorder contain ooo mmaped samples returns all ingested samples at the beginning of the interval",
|
||||
oooCap: 5,
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(200),
|
||||
batches: []sampleBatch{
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(200),
|
||||
filter: func(t int64) bool { return t%2 == 0 },
|
||||
isOOO: false,
|
||||
},
|
||||
{
|
||||
minT: minutes(101),
|
||||
maxT: minutes(101 + (5-1)*2), // Append samples to fit in a single mmmaped OOO chunk and fit inside the first in-order mmaped chunk.
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
{
|
||||
minT: minutes(191),
|
||||
maxT: minutes(193), // Append some more OOO samples to trigger mapping the OOO chunk, but use time 151 to not overlap with in-order head chunk.
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "query overlapping inorder and ooo mmaped samples returns all ingested samples at the beginning of the interval",
|
||||
oooCap: 30,
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(200),
|
||||
batches: []sampleBatch{
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(200),
|
||||
filter: func(t int64) bool { return t%2 == 0 },
|
||||
isOOO: false,
|
||||
},
|
||||
{
|
||||
minT: minutes(101),
|
||||
maxT: minutes(101 + (30-1)*2), // Append samples to fit in a single mmmaped OOO chunk and overlap the first in-order mmaped chunk.
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
{
|
||||
minT: minutes(191),
|
||||
maxT: minutes(193), // Append some more OOO samples to trigger mapping the OOO chunk, but use time 151 to not overlap with in-order head chunk.
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
|
||||
opts.OutOfOrderCapMax = tc.oooCap
|
||||
db := openTestDB(t, opts, nil)
|
||||
db.DisableCompactions()
|
||||
db.EnableNativeHistograms()
|
||||
|
@ -5600,7 +5677,6 @@ func testQuerierOOOQuery(t *testing.T,
|
|||
|
||||
for _, batch := range tc.batches {
|
||||
expSamples, appendedCount = addSample(db, batch.minT, batch.maxT, tc.queryMinT, tc.queryMaxT, expSamples, batch.filter, batch.counterReset)
|
||||
require.Positive(t, appendedCount) // Sanity check that filter is not too zealous.
|
||||
if batch.isOOO {
|
||||
oooSamples += appendedCount
|
||||
}
|
||||
|
@ -5698,7 +5774,7 @@ func testChunkQuerierOOOQuery(t *testing.T,
|
|||
app := db.Appender(context.Background())
|
||||
totalAppended := 0
|
||||
for m := fromMins; m <= toMins; m += time.Minute.Milliseconds() {
|
||||
if !filter(m) {
|
||||
if !filter(m / time.Minute.Milliseconds()) {
|
||||
continue
|
||||
}
|
||||
_, err := appendFunc(app, m, counterReset)
|
||||
|
@ -5709,6 +5785,7 @@ func testChunkQuerierOOOQuery(t *testing.T,
|
|||
totalAppended++
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
require.Positive(t, totalAppended) // Sanity check that filter is not too zealous.
|
||||
return expSamples, totalAppended
|
||||
}
|
||||
|
||||
|
@ -5722,12 +5799,14 @@ func testChunkQuerierOOOQuery(t *testing.T,
|
|||
|
||||
tests := []struct {
|
||||
name string
|
||||
oooCap int64
|
||||
queryMinT int64
|
||||
queryMaxT int64
|
||||
batches []sampleBatch
|
||||
}{
|
||||
{
|
||||
name: "query interval covering ooomint and inordermaxt returns all ingested samples",
|
||||
oooCap: 30,
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(200),
|
||||
batches: []sampleBatch{
|
||||
|
@ -5746,6 +5825,7 @@ func testChunkQuerierOOOQuery(t *testing.T,
|
|||
},
|
||||
{
|
||||
name: "partial query interval returns only samples within interval",
|
||||
oooCap: 30,
|
||||
queryMinT: minutes(20),
|
||||
queryMaxT: minutes(180),
|
||||
batches: []sampleBatch{
|
||||
|
@ -5787,9 +5867,102 @@ func testChunkQuerierOOOQuery(t *testing.T,
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "query overlapping inorder and ooo samples returns all ingested samples at the end of the interval",
|
||||
oooCap: 30,
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(200),
|
||||
batches: []sampleBatch{
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(200),
|
||||
filter: func(t int64) bool { return t%2 == 0 },
|
||||
isOOO: false,
|
||||
},
|
||||
{
|
||||
minT: minutes(170),
|
||||
maxT: minutes(180),
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "query overlapping inorder and ooo in-memory samples returns all ingested samples at the beginning of the interval",
|
||||
oooCap: 30,
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(200),
|
||||
batches: []sampleBatch{
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(200),
|
||||
filter: func(t int64) bool { return t%2 == 0 },
|
||||
isOOO: false,
|
||||
},
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(110),
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "query inorder contain ooo mmaped samples returns all ingested samples at the beginning of the interval",
|
||||
oooCap: 5,
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(200),
|
||||
batches: []sampleBatch{
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(200),
|
||||
filter: func(t int64) bool { return t%2 == 0 },
|
||||
isOOO: false,
|
||||
},
|
||||
{
|
||||
minT: minutes(101),
|
||||
maxT: minutes(101 + (5-1)*2), // Append samples to fit in a single mmmaped OOO chunk and fit inside the first in-order mmaped chunk.
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
{
|
||||
minT: minutes(191),
|
||||
maxT: minutes(193), // Append some more OOO samples to trigger mapping the OOO chunk, but use time 151 to not overlap with in-order head chunk.
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "query overlapping inorder and ooo mmaped samples returns all ingested samples at the beginning of the interval",
|
||||
oooCap: 30,
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(200),
|
||||
batches: []sampleBatch{
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(200),
|
||||
filter: func(t int64) bool { return t%2 == 0 },
|
||||
isOOO: false,
|
||||
},
|
||||
{
|
||||
minT: minutes(101),
|
||||
maxT: minutes(101 + (30-1)*2), // Append samples to fit in a single mmmaped OOO chunk and overlap the first in-order mmaped chunk.
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
{
|
||||
minT: minutes(191),
|
||||
maxT: minutes(193), // Append some more OOO samples to trigger mapping the OOO chunk, but use time 151 to not overlap with in-order head chunk.
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
|
||||
opts.OutOfOrderCapMax = tc.oooCap
|
||||
db := openTestDB(t, opts, nil)
|
||||
db.DisableCompactions()
|
||||
db.EnableNativeHistograms()
|
||||
|
|
|
@ -201,8 +201,8 @@ func (d *Decbuf) UvarintStr() string {
|
|||
return string(d.UvarintBytes())
|
||||
}
|
||||
|
||||
// The return value becomes invalid if the byte slice goes away.
|
||||
// Compared to UvarintStr, this avoid allocations.
|
||||
// UvarintBytes returns invalid values if the byte slice goes away.
|
||||
// Compared to UvarintStr, it avoid allocations.
|
||||
func (d *Decbuf) UvarintBytes() []byte {
|
||||
l := d.Uvarint64()
|
||||
if d.E != nil {
|
||||
|
|
|
@ -26,7 +26,7 @@ func (s *memSeries) labels() labels.Labels {
|
|||
return s.lset
|
||||
}
|
||||
|
||||
// No-op when not using dedupelabels.
|
||||
// RebuildSymbolTable is a no-op when not using dedupelabels.
|
||||
func (h *Head) RebuildSymbolTable(logger log.Logger) *labels.SymbolTable {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -196,8 +196,9 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
|
|||
return toc, d.Err()
|
||||
}
|
||||
|
||||
// NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
|
||||
// It uses the given encoder to encode each postings list.
|
||||
// NewWriterWithEncoder returns a new Writer to the given filename. It
|
||||
// serializes data in format version 2. It uses the given encoder to encode each
|
||||
// postings list.
|
||||
func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncoder) (*Writer, error) {
|
||||
dir := filepath.Dir(fn)
|
||||
|
||||
|
|
|
@ -139,10 +139,11 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
|
|||
|
||||
addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) {
|
||||
tmpChks = append(tmpChks, chunks.Meta{
|
||||
MinTime: minT,
|
||||
MaxTime: maxT,
|
||||
Ref: ref,
|
||||
Chunk: chunk,
|
||||
MinTime: minT,
|
||||
MaxTime: maxT,
|
||||
Ref: ref,
|
||||
Chunk: chunk,
|
||||
MergeOOO: true,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -210,6 +211,7 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
|
|||
if c.Chunk != nil {
|
||||
(*chks)[len(*chks)-1].Chunk = c.Chunk
|
||||
}
|
||||
(*chks)[len(*chks)-1].MergeOOO = (*chks)[len(*chks)-1].MergeOOO || c.MergeOOO
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -291,8 +293,8 @@ func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader,
|
|||
}
|
||||
|
||||
func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
|
||||
sid, _, isOOO := unpackHeadChunkRef(meta.Ref)
|
||||
if !isOOO && meta.Chunk == nil { // meta.Chunk can have a copy of OOO head samples, even on non-OOO chunk ID.
|
||||
sid, _, _ := unpackHeadChunkRef(meta.Ref)
|
||||
if !meta.MergeOOO {
|
||||
return cr.cr.ChunkOrIterable(meta)
|
||||
}
|
||||
|
||||
|
@ -313,11 +315,10 @@ func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chu
|
|||
return nil, mc, err
|
||||
}
|
||||
|
||||
// ChunkOrIterableWithCopy: implements ChunkReaderWithCopy. The special Copy behaviour
|
||||
// is only implemented for the in-order head chunk.
|
||||
// ChunkOrIterableWithCopy implements ChunkReaderWithCopy. The special Copy
|
||||
// behaviour is only implemented for the in-order head chunk.
|
||||
func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
|
||||
_, _, isOOO := unpackHeadChunkRef(meta.Ref)
|
||||
if !isOOO {
|
||||
if !meta.MergeOOO {
|
||||
return cr.cr.ChunkOrIterableWithCopy(meta)
|
||||
}
|
||||
chk, iter, err := cr.ChunkOrIterable(meta)
|
||||
|
|
|
@ -315,9 +315,10 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
|||
var expChunks []chunks.Meta
|
||||
for _, e := range tc.expChunks {
|
||||
meta := chunks.Meta{
|
||||
Chunk: chunkenc.Chunk(nil),
|
||||
MinTime: e.mint,
|
||||
MaxTime: e.maxt,
|
||||
Chunk: chunkenc.Chunk(nil),
|
||||
MinTime: e.mint,
|
||||
MaxTime: e.maxt,
|
||||
MergeOOO: true, // Only OOO chunks are tested here, so we always request merge from OOO head.
|
||||
}
|
||||
|
||||
// Ref to whatever Ref the chunk has, that we refer to by ID
|
||||
|
@ -494,7 +495,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
|
|||
cr := NewHeadAndOOOChunkReader(db.head, 0, 1000, nil, nil, 0)
|
||||
defer cr.Close()
|
||||
c, iterable, err := cr.ChunkOrIterable(chunks.Meta{
|
||||
Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300,
|
||||
Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, MergeOOO: true,
|
||||
})
|
||||
require.Nil(t, iterable)
|
||||
require.Equal(t, err, fmt.Errorf("not found"))
|
||||
|
|
|
@ -972,7 +972,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
|
|||
// Check if the encoding has changed (i.e. we need to create a new
|
||||
// chunk as chunks can't have multiple encoding types).
|
||||
// For the first sample, the following condition will always be true as
|
||||
// ValNoneNone != ValFloat | ValHistogram | ValFloatHistogram.
|
||||
// ValNone != ValFloat | ValHistogram | ValFloatHistogram.
|
||||
if currentValueType != prevValueType {
|
||||
if prevValueType != chunkenc.ValNone {
|
||||
p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt})
|
||||
|
|
|
@ -58,15 +58,16 @@ type WriteTo interface {
|
|||
StoreSeries([]record.RefSeries, int)
|
||||
StoreMetadata([]record.RefMetadata)
|
||||
|
||||
// Next two methods are intended for garbage-collection: first we call
|
||||
// UpdateSeriesSegment on all current series
|
||||
// UpdateSeriesSegment and SeriesReset are intended for
|
||||
// garbage-collection:
|
||||
// First we call UpdateSeriesSegment on all current series.
|
||||
UpdateSeriesSegment([]record.RefSeries, int)
|
||||
// Then SeriesReset is called to allow the deletion
|
||||
// of all series created in a segment lower than the argument.
|
||||
// Then SeriesReset is called to allow the deletion of all series
|
||||
// created in a segment lower than the argument.
|
||||
SeriesReset(int)
|
||||
}
|
||||
|
||||
// Used to notify the watcher that data has been written so that it can read.
|
||||
// WriteNotified notifies the watcher that data has been written so that it can read.
|
||||
type WriteNotified interface {
|
||||
Notify()
|
||||
}
|
||||
|
|
|
@ -38,8 +38,8 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
DefaultSegmentSize = 128 * 1024 * 1024 // 128 MB
|
||||
pageSize = 32 * 1024 // 32KB
|
||||
DefaultSegmentSize = 128 * 1024 * 1024 // DefaultSegmentSize is 128 MB.
|
||||
pageSize = 32 * 1024 // pageSize is 32KB.
|
||||
recordHeaderSize = 7
|
||||
WblDirName = "wbl"
|
||||
)
|
||||
|
|
|
@ -174,7 +174,7 @@ func NewInvalidQuantileWarning(q float64, pos posrange.PositionRange) error {
|
|||
}
|
||||
}
|
||||
|
||||
// NewInvalidQuantileWarning is used when the user specifies an invalid ratio
|
||||
// NewInvalidRatioWarning is used when the user specifies an invalid ratio
|
||||
// value, i.e. a float that is outside the range [-1, 1] or NaN.
|
||||
func NewInvalidRatioWarning(q, to float64, pos posrange.PositionRange) error {
|
||||
return annoErr{
|
||||
|
|
|
@ -23,13 +23,14 @@ import (
|
|||
"github.com/prometheus/prometheus/model/labels"
|
||||
)
|
||||
|
||||
// Replacement for require.Equal using go-cmp adapted for Prometheus data structures, instead of DeepEqual.
|
||||
// RequireEqual is a replacement for require.Equal using go-cmp adapted for
|
||||
// Prometheus data structures, instead of DeepEqual.
|
||||
func RequireEqual(t testing.TB, expected, actual interface{}, msgAndArgs ...interface{}) {
|
||||
t.Helper()
|
||||
RequireEqualWithOptions(t, expected, actual, nil, msgAndArgs...)
|
||||
}
|
||||
|
||||
// As RequireEqual but allows extra cmp.Options.
|
||||
// RequireEqualWithOptions works like RequireEqual but allows extra cmp.Options.
|
||||
func RequireEqualWithOptions(t testing.TB, expected, actual interface{}, extra []cmp.Option, msgAndArgs ...interface{}) {
|
||||
t.Helper()
|
||||
options := append([]cmp.Option{cmp.Comparer(labels.Equal)}, extra...)
|
||||
|
|
Loading…
Reference in a new issue