From 88cac6fb49958eb83faf8eb166eeec62fbee49a0 Mon Sep 17 00:00:00 2001 From: machine424 Date: Wed, 14 Aug 2024 19:06:49 +0200 Subject: [PATCH 1/8] docs(configuration.md): clarify the explanations about some temp labels and format for better visibility. Signed-off-by: machine424 --- docs/configuration/configuration.md | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 313a7f2f37..1c43f750b1 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -3265,12 +3265,16 @@ Initially, aside from the configured per-target labels, a target's `job` label is set to the `job_name` value of the respective scrape configuration. The `__address__` label is set to the `:` address of the target. After relabeling, the `instance` label is set to the value of `__address__` by default if -it was not set during relabeling. The `__scheme__` and `__metrics_path__` labels -are set to the scheme and metrics path of the target respectively. The `__param_` -label is set to the value of the first passed URL parameter called ``. +it was not set during relabeling. + +The `__scheme__` and `__metrics_path__` labels +are set to the scheme and metrics path of the target respectively, as specified in `scrape_config`. + +The `__param_` +label is set to the value of the first passed URL parameter called ``, as defined in `scrape_config`. The `__scrape_interval__` and `__scrape_timeout__` labels are set to the target's -interval and timeout. +interval and timeout, as specified in `scrape_config`. Additional labels prefixed with `__meta_` may be available during the relabeling phase. They are set by the service discovery mechanism that provided From 21106611217feefa32bb767f938be73c946700e5 Mon Sep 17 00:00:00 2001 From: cuishuang Date: Wed, 21 Aug 2024 18:15:25 +0800 Subject: [PATCH 2/8] fix: fix slice init length Signed-off-by: cuishuang --- storage/remote/queue_manager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 1c06173a59..032a1a92f7 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -930,7 +930,7 @@ func createHistograms(numSamples, numSeries int, floatHistogram bool) ([]record. } func createSeriesMetadata(series []record.RefSeries) []record.RefMetadata { - metas := make([]record.RefMetadata, len(series)) + metas := make([]record.RefMetadata, 0, len(series)) for _, s := range series { metas = append(metas, record.RefMetadata{ From b50c5d42feb66dfb74d70fcdc0eac9d1a15006e2 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Wed, 21 Aug 2024 15:22:38 +0200 Subject: [PATCH 3/8] OTLP receiver: Warn when encountering invalid exponential histograms Signed-off-by: Arve Knudsen --- CHANGELOG.md | 1 + .../prometheusremotewrite/histograms.go | 21 ++++++---- .../prometheusremotewrite/metrics_to_prw.go | 11 ++++-- .../metrics_to_prw_test.go | 39 ++++++++++++++++++- storage/remote/write_handler.go | 9 ++++- 5 files changed, 67 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0036a4a762..6fa8410c98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## unreleased * [FEATURE] OTLP receiver: Add new option `otlp.promote_resource_attributes`, for any OTel resource attributes that should be promoted to metric labels. #14200 +* [ENHANCEMENT] OTLP receiver: Warn when encountering exponential histograms with zero count and non-zero sum. #14706 * [BUGFIX] tsdb/wlog.Watcher.readSegmentForGC: Only count unknown record types against record_decode_failures_total metric. #14042 ## 2.54.0-rc.1 / 2024-08-05 diff --git a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go index 73528019d8..ec93387fc6 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go @@ -26,6 +26,7 @@ import ( "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/util/annotations" ) const defaultZeroThreshold = 1e-128 @@ -33,13 +34,15 @@ const defaultZeroThreshold = 1e-128 // addExponentialHistogramDataPoints adds OTel exponential histogram data points to the corresponding time series // as native histogram samples. func (c *PrometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetric.ExponentialHistogramDataPointSlice, - resource pcommon.Resource, settings Settings, promName string) error { + resource pcommon.Resource, settings Settings, promName string) (annotations.Annotations, error) { + var annots annotations.Annotations for x := 0; x < dataPoints.Len(); x++ { pt := dataPoints.At(x) - histogram, err := exponentialToNativeHistogram(pt) + histogram, ws, err := exponentialToNativeHistogram(pt) + annots.Merge(ws) if err != nil { - return err + return annots, err } lbls := createAttributes( @@ -58,15 +61,16 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetr ts.Exemplars = append(ts.Exemplars, exemplars...) } - return nil + return annots, nil } // exponentialToNativeHistogram translates OTel Exponential Histogram data point // to Prometheus Native Histogram. -func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prompb.Histogram, error) { +func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prompb.Histogram, annotations.Annotations, error) { + var annots annotations.Annotations scale := p.Scale() if scale < -4 { - return prompb.Histogram{}, + return prompb.Histogram{}, annots, fmt.Errorf("cannot convert exponential to native histogram."+ " Scale must be >= -4, was %d", scale) } @@ -114,8 +118,11 @@ func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prom h.Sum = p.Sum() } h.Count = &prompb.Histogram_CountInt{CountInt: p.Count()} + if p.Count() == 0 && h.Sum != 0 { + annots.Add(fmt.Errorf("exponential histogram data point has zero count, but non-zero sum: %f", h.Sum)) + } } - return h, nil + return h, annots, nil } // convertBucketsLayout translates OTel Exponential Histogram dense buckets diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go index a3a7897232..9d76800809 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/prometheus/prompb" prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" + "github.com/prometheus/prometheus/util/annotations" ) type Settings struct { @@ -53,7 +54,7 @@ func NewPrometheusConverter() *PrometheusConverter { } // FromMetrics converts pmetric.Metrics to Prometheus remote write format. -func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings) (errs error) { +func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings) (annots annotations.Annotations, errs error) { resourceMetricsSlice := md.ResourceMetrics() for i := 0; i < resourceMetricsSlice.Len(); i++ { resourceMetrics := resourceMetricsSlice.At(i) @@ -107,12 +108,14 @@ func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings) errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) break } - errs = multierr.Append(errs, c.addExponentialHistogramDataPoints( + ws, err := c.addExponentialHistogramDataPoints( dataPoints, resource, settings, promName, - )) + ) + annots.Merge(ws) + errs = multierr.Append(errs, err) case pmetric.MetricTypeSummary: dataPoints := metric.Summary().DataPoints() if dataPoints.Len() == 0 { @@ -128,7 +131,7 @@ func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings) addResourceTargetInfo(resource, settings, mostRecentTimestamp, c) } - return + return annots, errs } func isSameMetric(ts *prompb.TimeSeries, lbls []prompb.Label) bool { diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go index 37ac677747..bdc1c9d0b2 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go @@ -27,6 +27,41 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" ) +func TestFromMetrics(t *testing.T) { + t.Run("exponential histogram warnings for zero count and non-zero sum", func(t *testing.T) { + request := pmetricotlp.NewExportRequest() + rm := request.Metrics().ResourceMetrics().AppendEmpty() + generateAttributes(rm.Resource().Attributes(), "resource", 10) + + metrics := rm.ScopeMetrics().AppendEmpty().Metrics() + ts := pcommon.NewTimestampFromTime(time.Now()) + + for i := 1; i <= 10; i++ { + m := metrics.AppendEmpty() + m.SetEmptyExponentialHistogram() + m.SetName(fmt.Sprintf("histogram-%d", i)) + m.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + h := m.ExponentialHistogram().DataPoints().AppendEmpty() + h.SetTimestamp(ts) + + h.SetCount(0) + h.SetSum(155) + + generateAttributes(h.Attributes(), "series", 10) + } + + converter := NewPrometheusConverter() + annots, err := converter.FromMetrics(request.Metrics(), Settings{}) + require.NoError(t, err) + require.NotEmpty(t, annots) + ws, infos := annots.AsStrings("", 0, 0) + require.Empty(t, infos) + require.Equal(t, []string{ + "exponential histogram data point has zero count, but non-zero sum: 155.000000", + }, ws) + }) +} + func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) { for _, resourceAttributeCount := range []int{0, 5, 50} { b.Run(fmt.Sprintf("resource attribute count: %v", resourceAttributeCount), func(b *testing.B) { @@ -49,7 +84,9 @@ func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) { for i := 0; i < b.N; i++ { converter := NewPrometheusConverter() - require.NoError(b, converter.FromMetrics(payload.Metrics(), Settings{})) + annots, err := converter.FromMetrics(payload.Metrics(), Settings{}) + require.NoError(b, err) + require.Empty(b, annots) require.NotNil(b, converter.TimeSeries()) } }) diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index aba79a561d..8ac7590465 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -502,12 +502,17 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { otlpCfg := h.configFunc().OTLPConfig converter := otlptranslator.NewPrometheusConverter() - if err := converter.FromMetrics(req.Metrics(), otlptranslator.Settings{ + annots, err := converter.FromMetrics(req.Metrics(), otlptranslator.Settings{ AddMetricSuffixes: true, PromoteResourceAttributes: otlpCfg.PromoteResourceAttributes, - }); err != nil { + }) + if err != nil { level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err) } + ws, _ := annots.AsStrings("", 0, 0) + if len(ws) > 0 { + level.Warn(h.logger).Log("msg", "Warnings translating OTLP metrics to Prometheus write request", "warnings", ws) + } err = h.rwHandler.write(r.Context(), &prompb.WriteRequest{ Timeseries: converter.TimeSeries(), From fbcd50f32c7a5f8f040cb4c9a4335f821a263f18 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Wed, 21 Aug 2024 18:55:28 +0200 Subject: [PATCH 4/8] Upgrade golangci-lint to v1.60.2 Signed-off-by: Arve Knudsen --- .github/workflows/ci.yml | 2 +- Makefile.common | 2 +- scripts/golangci-lint.yml | 2 +- template/template.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c3a1d68e98..c89c9507bb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -186,7 +186,7 @@ jobs: with: args: --verbose # Make sure to sync this with Makefile.common and scripts/golangci-lint.yml. - version: v1.60.1 + version: v1.60.2 fuzzing: uses: ./.github/workflows/fuzzing.yml if: github.event_name == 'pull_request' diff --git a/Makefile.common b/Makefile.common index 2ecd5465c3..34d65bb56d 100644 --- a/Makefile.common +++ b/Makefile.common @@ -61,7 +61,7 @@ PROMU_URL := https://github.com/prometheus/promu/releases/download/v$(PROMU_ SKIP_GOLANGCI_LINT := GOLANGCI_LINT := GOLANGCI_LINT_OPTS ?= -GOLANGCI_LINT_VERSION ?= v1.60.1 +GOLANGCI_LINT_VERSION ?= v1.60.2 # golangci-lint only supports linux, darwin and windows platforms on i386/amd64/arm64. # windows isn't included here because of the path separator being different. ifeq ($(GOHOSTOS),$(filter $(GOHOSTOS),linux darwin)) diff --git a/scripts/golangci-lint.yml b/scripts/golangci-lint.yml index fc0f9c6543..f4a7385bb5 100644 --- a/scripts/golangci-lint.yml +++ b/scripts/golangci-lint.yml @@ -36,4 +36,4 @@ jobs: uses: golangci/golangci-lint-action@aaa42aa0628b4ae2578232a66b541047968fac86 # v6.1.0 with: args: --verbose - version: v1.60.1 + version: v1.60.2 diff --git a/template/template.go b/template/template.go index 9ffed6ff61..c507dbe746 100644 --- a/template/template.go +++ b/template/template.go @@ -166,7 +166,7 @@ func NewTemplateExpander( return html_template.HTML(text) }, "match": regexp.MatchString, - "title": strings.Title, + "title": strings.Title, //nolint:staticcheck "toUpper": strings.ToUpper, "toLower": strings.ToLower, "graphLink": strutil.GraphLinkForExpression, From 0f760f63dd5137ad406e0bc9f08676898e04ac97 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Thu, 22 Aug 2024 13:59:36 +0200 Subject: [PATCH 5/8] lint: Revamp our linting rules, mostly around doc comments Several things done here: - Set `max-issues-per-linter` to 0 so that we actually see all linter warnings and not just 50 per linter. (As we also set `max-same-issues` to 0, I assume this was the intention from the beginning.) - Stop using the golangci-lint default excludes (by setting `exclude-use-default: false`. Those are too generous and don't match our style conventions. (I have re-added some of the excludes explicitly in this commit. See below.) - Re-add the `errcheck` exclusion we have used so far via the defaults. - Exclude the signature requirement `govet` has for `Seek` methods because we use non-standard `Seek` methods a lot. (But we keep other requirements, while the default excludes completely disabled the check for common method segnatures.) - Exclude warnings about missing doc comments on exported symbols. (We used to be pretty adamant about doc comments, but stopped that at some point in the past. By now, we have about 500 missing doc comments. We may consider reintroducing this check, but that's outside of the scope of this commit. The default excludes of golangci-lint essentially ignore doc comments completely.) - By stop using the default excludes, we now get warnings back on malformed doc comments. That's the most impactful change in this commit. It does not enforce doc comments (again), but _if_ there is a doc comment, it has to have the recommended form. (Most of the changes in this commit are fixing this form.) - Improve wording/spelling of some comments in .golangci.yml, and remove an outdated comment. - Leave `package-comments` inactive, but add a TODO asking if we should change that. - Add a new sub-linter `comment-spacings` (and fix corresponding comments), which avoids missing spaces after the leading `//`. Signed-off-by: beorn7 --- .golangci.yml | 30 +++++++++++++++++++++++----- cmd/promtool/main.go | 2 +- cmd/promtool/metrics.go | 2 +- discovery/discoverer_metrics_noop.go | 2 +- discovery/discovery.go | 22 ++++++++++---------- discovery/manager.go | 2 +- discovery/metrics_refresh.go | 2 +- discovery/util.go | 6 +++--- model/exemplar/exemplar.go | 6 ++++-- model/labels/labels.go | 11 +++++----- model/textparse/interface.go | 4 ++-- promql/engine.go | 6 +++--- promql/parser/ast.go | 5 ++--- scrape/clientprotobuf.go | 4 ++-- storage/interface.go | 2 +- storage/remote/azuread/azuread.go | 6 ++++-- template/template.go | 2 +- tsdb/chunks/head_chunks.go | 4 ++-- tsdb/db.go | 2 +- tsdb/encoding/encoding.go | 4 ++-- tsdb/head_other.go | 2 +- tsdb/index/index.go | 5 +++-- tsdb/ooo_head_read.go | 4 ++-- tsdb/wlog/watcher.go | 11 +++++----- tsdb/wlog/wlog.go | 4 ++-- util/annotations/annotations.go | 2 +- util/testutil/cmp.go | 5 +++-- 27 files changed, 93 insertions(+), 64 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index e924fe3d5b..303cd33d8b 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -25,15 +25,34 @@ linters: - loggercheck issues: + max-issues-per-linter: 0 max-same-issues: 0 + # The default exclusions are too aggressive. For one, they + # essentially disable any linting on doc comments. We disable + # default exclusions here and add exclusions fitting our codebase + # further down. + exclude-use-default: false exclude-files: # Skip autogenerated files. - ^.*\.(pb|y)\.go$ exclude-dirs: - # Copied it from a different source + # Copied it from a different source. - storage/remote/otlptranslator/prometheusremotewrite - storage/remote/otlptranslator/prometheus exclude-rules: + - linters: + - errcheck + # Taken from the default exclusions (that are otherwise disabled above). + text: Error return value of .((os\.)?std(out|err)\..*|.*Close|.*Flush|os\.Remove(All)?|.*print(f|ln)?|os\.(Un)?Setenv). is not checked + - linters: + - govet + # We use many Seek methods that do not follow the usual pattern. + text: "stdmethods: method Seek.* should have signature Seek" + - linters: + - revive + # We have stopped at some point to write doc comments on exported symbols. + # TODO(beorn7): Maybe we should enforce this again? There are ~500 offenders right now. + text: exported (.+) should have comment( \(or a comment on this block\))? or be unexported - linters: - gocritic text: "appendAssign" @@ -94,15 +113,14 @@ linters-settings: errorf: false revive: # By default, revive will enable only the linting rules that are named in the configuration file. - # So, it's needed to explicitly set in configuration all required rules. - # The following configuration enables all the rules from the defaults.toml - # https://github.com/mgechev/revive/blob/master/defaults.toml + # So, it's needed to explicitly enable all required rules here. rules: # https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md - name: blank-imports + - name: comment-spacings - name: context-as-argument arguments: - # allow functions with test or bench signatures + # Allow functions with test or bench signatures. - allowTypesBefore: "*testing.T,testing.TB" - name: context-keys-type - name: dot-imports @@ -118,6 +136,8 @@ linters-settings: - name: increment-decrement - name: indent-error-flow - name: package-comments + # TODO(beorn7): Currently, we have a lot of missing package doc comments. Maybe we should have them. + disabled: true - name: range - name: receiver-naming - name: redefines-builtin-id diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index 1c8e1dd1c8..4d033c3c09 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -471,7 +471,7 @@ func (ls lintConfig) lintDuplicateRules() bool { return ls.all || ls.duplicateRules } -// Check server status - healthy & ready. +// CheckServerStatus - healthy & ready. func CheckServerStatus(serverURL *url.URL, checkEndpoint string, roundTripper http.RoundTripper) error { if serverURL.Scheme == "" { serverURL.Scheme = "http" diff --git a/cmd/promtool/metrics.go b/cmd/promtool/metrics.go index 6d162f459a..4c91d1d6fe 100644 --- a/cmd/promtool/metrics.go +++ b/cmd/promtool/metrics.go @@ -31,7 +31,7 @@ import ( "github.com/prometheus/prometheus/util/fmtutil" ) -// Push metrics to a prometheus remote write (for testing purpose only). +// PushMetrics to a prometheus remote write (for testing purpose only). func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, timeout time.Duration, labels map[string]string, files ...string) int { addressURL, err := url.Parse(url.String()) if err != nil { diff --git a/discovery/discoverer_metrics_noop.go b/discovery/discoverer_metrics_noop.go index 638317ace1..4321204b6c 100644 --- a/discovery/discoverer_metrics_noop.go +++ b/discovery/discoverer_metrics_noop.go @@ -13,7 +13,7 @@ package discovery -// Create a dummy metrics struct, because this SD doesn't have any metrics. +// NoopDiscovererMetrics creates a dummy metrics struct, because this SD doesn't have any metrics. type NoopDiscovererMetrics struct{} var _ DiscovererMetrics = (*NoopDiscovererMetrics)(nil) diff --git a/discovery/discovery.go b/discovery/discovery.go index a5826f8176..a91faf6c86 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -39,7 +39,7 @@ type Discoverer interface { Run(ctx context.Context, up chan<- []*targetgroup.Group) } -// Internal metrics of service discovery mechanisms. +// DiscovererMetrics are internal metrics of service discovery mechanisms. type DiscovererMetrics interface { Register() error Unregister() @@ -56,7 +56,7 @@ type DiscovererOptions struct { HTTPClientOptions []config.HTTPClientOption } -// Metrics used by the "refresh" package. +// RefreshMetrics are used by the "refresh" package. // We define them here in the "discovery" package in order to avoid a cyclic dependency between // "discovery" and "refresh". type RefreshMetrics struct { @@ -64,17 +64,18 @@ type RefreshMetrics struct { Duration prometheus.Observer } -// Instantiate the metrics used by the "refresh" package. +// RefreshMetricsInstantiator instantiates the metrics used by the "refresh" package. type RefreshMetricsInstantiator interface { Instantiate(mech string) *RefreshMetrics } -// An interface for registering, unregistering, and instantiating metrics for the "refresh" package. -// Refresh metrics are registered and unregistered outside of the service discovery mechanism. -// This is so that the same metrics can be reused across different service discovery mechanisms. -// To manage refresh metrics inside the SD mechanism, we'd need to use const labels which are -// specific to that SD. However, doing so would also expose too many unused metrics on -// the Prometheus /metrics endpoint. +// RefreshMetricsManager is an interface for registering, unregistering, and +// instantiating metrics for the "refresh" package. Refresh metrics are +// registered and unregistered outside of the service discovery mechanism. This +// is so that the same metrics can be reused across different service discovery +// mechanisms. To manage refresh metrics inside the SD mechanism, we'd need to +// use const labels which are specific to that SD. However, doing so would also +// expose too many unused metrics on the Prometheus /metrics endpoint. type RefreshMetricsManager interface { DiscovererMetrics RefreshMetricsInstantiator @@ -145,7 +146,8 @@ func (c StaticConfig) NewDiscoverer(DiscovererOptions) (Discoverer, error) { return staticDiscoverer(c), nil } -// No metrics are needed for this service discovery mechanism. +// NewDiscovererMetrics returns NoopDiscovererMetrics because no metrics are +// needed for this service discovery mechanism. func (c StaticConfig) NewDiscovererMetrics(prometheus.Registerer, RefreshMetricsInstantiator) DiscovererMetrics { return &NoopDiscovererMetrics{} } diff --git a/discovery/manager.go b/discovery/manager.go index 897d7d151c..48bea85bb7 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -64,7 +64,7 @@ func (p *Provider) Config() interface{} { return p.config } -// Registers the metrics needed for SD mechanisms. +// CreateAndRegisterSDMetrics registers the metrics needed for SD mechanisms. // Does not register the metrics for the Discovery Manager. // TODO(ptodev): Add ability to unregister the metrics? func CreateAndRegisterSDMetrics(reg prometheus.Registerer) (map[string]DiscovererMetrics, error) { diff --git a/discovery/metrics_refresh.go b/discovery/metrics_refresh.go index d621165ced..ef49e591a3 100644 --- a/discovery/metrics_refresh.go +++ b/discovery/metrics_refresh.go @@ -17,7 +17,7 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -// Metric vectors for the "refresh" package. +// RefreshMetricsVecs are metric vectors for the "refresh" package. // We define them here in the "discovery" package in order to avoid a cyclic dependency between // "discovery" and "refresh". type RefreshMetricsVecs struct { diff --git a/discovery/util.go b/discovery/util.go index 83cc640dd9..4e2a088518 100644 --- a/discovery/util.go +++ b/discovery/util.go @@ -19,8 +19,8 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -// A utility to be used by implementations of discovery.Discoverer -// which need to manage the lifetime of their metrics. +// MetricRegisterer is used by implementations of discovery.Discoverer that need +// to manage the lifetime of their metrics. type MetricRegisterer interface { RegisterMetrics() error UnregisterMetrics() @@ -34,7 +34,7 @@ type metricRegistererImpl struct { var _ MetricRegisterer = &metricRegistererImpl{} -// Creates an instance of a MetricRegisterer. +// NewMetricRegisterer creates an instance of a MetricRegisterer. // Typically called inside the implementation of the NewDiscoverer() method. func NewMetricRegisterer(reg prometheus.Registerer, metrics []prometheus.Collector) MetricRegisterer { return &metricRegistererImpl{ diff --git a/model/exemplar/exemplar.go b/model/exemplar/exemplar.go index 08f55374ef..2c28b17257 100644 --- a/model/exemplar/exemplar.go +++ b/model/exemplar/exemplar.go @@ -15,7 +15,9 @@ package exemplar import "github.com/prometheus/prometheus/model/labels" -// The combined length of the label names and values of an Exemplar's LabelSet MUST NOT exceed 128 UTF-8 characters +// ExemplarMaxLabelSetLength is defined by OpenMetrics: "The combined length of +// the label names and values of an Exemplar's LabelSet MUST NOT exceed 128 +// UTF-8 characters." // https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars const ExemplarMaxLabelSetLength = 128 @@ -49,7 +51,7 @@ func (e Exemplar) Equals(e2 Exemplar) bool { return e.Value == e2.Value } -// Sort first by timestamp, then value, then labels. +// Compare first timestamps, then values, then labels. func Compare(a, b Exemplar) int { if a.Ts < b.Ts { return -1 diff --git a/model/labels/labels.go b/model/labels/labels.go index cd30f4f8ff..f4de7496ce 100644 --- a/model/labels/labels.go +++ b/model/labels/labels.go @@ -315,7 +315,8 @@ func Compare(a, b Labels) int { return len(a) - len(b) } -// Copy labels from b on top of whatever was in ls previously, reusing memory or expanding if needed. +// CopyFrom copies labels from b on top of whatever was in ls previously, +// reusing memory or expanding if needed. func (ls *Labels) CopyFrom(b Labels) { (*ls) = append((*ls)[:0], b...) } @@ -422,7 +423,7 @@ type ScratchBuilder struct { add Labels } -// Symbol-table is no-op, just for api parity with dedupelabels. +// SymbolTable is no-op, just for api parity with dedupelabels. type SymbolTable struct{} func NewSymbolTable() *SymbolTable { return nil } @@ -458,7 +459,7 @@ func (b *ScratchBuilder) Add(name, value string) { b.add = append(b.add, Label{Name: name, Value: value}) } -// Add a name/value pair, using []byte instead of string. +// UnsafeAddBytes adds a name/value pair, using []byte instead of string. // The '-tags stringlabels' version of this function is unsafe, hence the name. // This version is safe - it copies the strings immediately - but we keep the same name so everything compiles. func (b *ScratchBuilder) UnsafeAddBytes(name, value []byte) { @@ -475,14 +476,14 @@ func (b *ScratchBuilder) Assign(ls Labels) { b.add = append(b.add[:0], ls...) // Copy on top of our slice, so we don't retain the input slice. } -// Return the name/value pairs added so far as a Labels object. +// Labels returns the name/value pairs added so far as a Labels object. // Note: if you want them sorted, call Sort() first. func (b *ScratchBuilder) Labels() Labels { // Copy the slice, so the next use of ScratchBuilder doesn't overwrite. return append([]Label{}, b.add...) } -// Write the newly-built Labels out to ls. +// Overwrite the newly-built Labels out to ls. // Callers must ensure that there are no other references to ls, or any strings fetched from it. func (b *ScratchBuilder) Overwrite(ls *Labels) { *ls = append((*ls)[:0], b.add...) diff --git a/model/textparse/interface.go b/model/textparse/interface.go index df01dbc34f..0b5d9281e4 100644 --- a/model/textparse/interface.go +++ b/model/textparse/interface.go @@ -106,8 +106,8 @@ const ( EntryInvalid Entry = -1 EntryType Entry = 0 EntryHelp Entry = 1 - EntrySeries Entry = 2 // A series with a simple float64 as value. + EntrySeries Entry = 2 // EntrySeries marks a series with a simple float64 as value. EntryComment Entry = 3 EntryUnit Entry = 4 - EntryHistogram Entry = 5 // A series with a native histogram as a value. + EntryHistogram Entry = 5 // EntryHistogram marks a series with a native histogram as a value. ) diff --git a/promql/engine.go b/promql/engine.go index daa46ab6f2..60c8e06394 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -573,7 +573,7 @@ func (ng *Engine) validateOpts(expr parser.Expr) error { return validationErr } -// NewTestQuery: inject special behaviour into Query for testing. +// NewTestQuery injects special behaviour into Query for testing. func (ng *Engine) NewTestQuery(f func(context.Context) error) Query { qry := &query{ q: "test statement", @@ -3531,14 +3531,14 @@ func makeInt64Pointer(val int64) *int64 { return valp } -// Add RatioSampler interface to allow unit-testing (previously: Randomizer). +// RatioSampler allows unit-testing (previously: Randomizer). type RatioSampler interface { // Return this sample "offset" between [0.0, 1.0] sampleOffset(ts int64, sample *Sample) float64 AddRatioSample(r float64, sample *Sample) bool } -// Use Hash(labels.String()) / maxUint64 as a "deterministic" +// HashRatioSampler uses Hash(labels.String()) / maxUint64 as a "deterministic" // value in [0.0, 1.0]. type HashRatioSampler struct{} diff --git a/promql/parser/ast.go b/promql/parser/ast.go index 830e8a2c5e..162d7817ab 100644 --- a/promql/parser/ast.go +++ b/promql/parser/ast.go @@ -352,8 +352,7 @@ func (f inspector) Visit(node Node, path []Node) (Visitor, error) { // f(node, path); node must not be nil. If f returns a nil error, Inspect invokes f // for all the non-nil children of node, recursively. func Inspect(node Node, f inspector) { - //nolint: errcheck - Walk(f, node, nil) + Walk(f, node, nil) //nolint:errcheck } // Children returns a list of all child nodes of a syntax tree node. @@ -419,7 +418,7 @@ func mergeRanges(first, last Node) posrange.PositionRange { } } -// Item implements the Node interface. +// PositionRange implements the Node interface. // This makes it possible to call mergeRanges on them. func (i *Item) PositionRange() posrange.PositionRange { return posrange.PositionRange{ diff --git a/scrape/clientprotobuf.go b/scrape/clientprotobuf.go index 2213268d59..e632035b40 100644 --- a/scrape/clientprotobuf.go +++ b/scrape/clientprotobuf.go @@ -23,7 +23,7 @@ import ( dto "github.com/prometheus/client_model/go" ) -// Write a MetricFamily into a protobuf. +// MetricFamilyToProtobuf writes a MetricFamily into a protobuf. // This function is intended for testing scraping by providing protobuf serialized input. func MetricFamilyToProtobuf(metricFamily *dto.MetricFamily) ([]byte, error) { buffer := &bytes.Buffer{} @@ -34,7 +34,7 @@ func MetricFamilyToProtobuf(metricFamily *dto.MetricFamily) ([]byte, error) { return buffer.Bytes(), nil } -// Append a MetricFamily protobuf representation to a buffer. +// AddMetricFamilyToProtobuf appends a MetricFamily protobuf representation to a buffer. // This function is intended for testing scraping by providing protobuf serialized input. func AddMetricFamilyToProtobuf(buffer *bytes.Buffer, metricFamily *dto.MetricFamily) error { protoBuf, err := proto.Marshal(metricFamily) diff --git a/storage/interface.go b/storage/interface.go index f85f985e9d..2f125e5902 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -227,9 +227,9 @@ type LabelHints struct { Limit int } -// TODO(bwplotka): Move to promql/engine_test.go? // QueryableFunc is an adapter to allow the use of ordinary functions as // Queryables. It follows the idea of http.HandlerFunc. +// TODO(bwplotka): Move to promql/engine_test.go? type QueryableFunc func(mint, maxt int64) (Querier, error) // Querier calls f() with the given parameters. diff --git a/storage/remote/azuread/azuread.go b/storage/remote/azuread/azuread.go index 58520c6a5d..82f46b82f6 100644 --- a/storage/remote/azuread/azuread.go +++ b/storage/remote/azuread/azuread.go @@ -31,13 +31,15 @@ import ( "github.com/google/uuid" ) +// Clouds. const ( - // Clouds. AzureChina = "AzureChina" AzureGovernment = "AzureGovernment" AzurePublic = "AzurePublic" +) - // Audiences. +// Audiences. +const ( IngestionChinaAudience = "https://monitor.azure.cn//.default" IngestionGovernmentAudience = "https://monitor.azure.us//.default" IngestionPublicAudience = "https://monitor.azure.com//.default" diff --git a/template/template.go b/template/template.go index c507dbe746..0698c6c8ac 100644 --- a/template/template.go +++ b/template/template.go @@ -166,7 +166,7 @@ func NewTemplateExpander( return html_template.HTML(text) }, "match": regexp.MatchString, - "title": strings.Title, //nolint:staticcheck + "title": strings.Title, //nolint:staticcheck // TODO(beorn7): Need to come up with a replacement using the cases package. "toUpper": strings.ToUpper, "toLower": strings.ToLower, "graphLink": strutil.GraphLinkForExpression, diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index 6c8707c57b..876b42cb26 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -191,7 +191,7 @@ func (f *chunkPos) bytesToWriteForChunk(chkLen uint64) uint64 { // ChunkDiskMapper is for writing the Head block chunks to disk // and access chunks via mmapped files. type ChunkDiskMapper struct { - /// Writer. + // Writer. dir *os.File writeBufferSize int @@ -210,7 +210,7 @@ type ChunkDiskMapper struct { crc32 hash.Hash writePathMtx sync.Mutex - /// Reader. + // Reader. // The int key in the map is the file number on the disk. mmappedChunkFiles map[int]*mmappedChunkFile // Contains the m-mapped files for each chunk file mapped with its index. closers map[int]io.Closer // Closers for resources behind the byte slices. diff --git a/tsdb/db.go b/tsdb/db.go index 706e5bbac1..a5b3a5e602 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -49,7 +49,7 @@ import ( ) const ( - // Default duration of a block in milliseconds. + // DefaultBlockDuration in milliseconds. DefaultBlockDuration = int64(2 * time.Hour / time.Millisecond) // Block dir suffixes to make deletion and creation operations atomic. diff --git a/tsdb/encoding/encoding.go b/tsdb/encoding/encoding.go index cd98fbd82f..88fdd30c85 100644 --- a/tsdb/encoding/encoding.go +++ b/tsdb/encoding/encoding.go @@ -201,8 +201,8 @@ func (d *Decbuf) UvarintStr() string { return string(d.UvarintBytes()) } -// The return value becomes invalid if the byte slice goes away. -// Compared to UvarintStr, this avoid allocations. +// UvarintBytes returns invalid values if the byte slice goes away. +// Compared to UvarintStr, it avoid allocations. func (d *Decbuf) UvarintBytes() []byte { l := d.Uvarint64() if d.E != nil { diff --git a/tsdb/head_other.go b/tsdb/head_other.go index eb1b93a3e5..fea91530dc 100644 --- a/tsdb/head_other.go +++ b/tsdb/head_other.go @@ -26,7 +26,7 @@ func (s *memSeries) labels() labels.Labels { return s.lset } -// No-op when not using dedupelabels. +// RebuildSymbolTable is a no-op when not using dedupelabels. func (h *Head) RebuildSymbolTable(logger log.Logger) *labels.SymbolTable { return nil } diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 3621054598..0e0e353719 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -196,8 +196,9 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) { return toc, d.Err() } -// NewWriter returns a new Writer to the given filename. It serializes data in format version 2. -// It uses the given encoder to encode each postings list. +// NewWriterWithEncoder returns a new Writer to the given filename. It +// serializes data in format version 2. It uses the given encoder to encode each +// postings list. func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncoder) (*Writer, error) { dir := filepath.Dir(fn) diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index cc98df4729..f1d06a421b 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -263,8 +263,8 @@ func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chu return nil, mc, err } -// ChunkOrIterableWithCopy: implements ChunkReaderWithCopy. The special Copy behaviour -// is only implemented for the in-order head chunk. +// ChunkOrIterableWithCopy implements ChunkReaderWithCopy. The special Copy +// behaviour is only implemented for the in-order head chunk. func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) { _, _, isOOO := unpackHeadChunkRef(meta.Ref) if !isOOO { diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go index c1eed78f6d..86e6f9c81d 100644 --- a/tsdb/wlog/watcher.go +++ b/tsdb/wlog/watcher.go @@ -58,15 +58,16 @@ type WriteTo interface { StoreSeries([]record.RefSeries, int) StoreMetadata([]record.RefMetadata) - // Next two methods are intended for garbage-collection: first we call - // UpdateSeriesSegment on all current series + // UpdateSeriesSegment and SeriesReset are intended for + // garbage-collection: + // First we call UpdateSeriesSegment on all current series. UpdateSeriesSegment([]record.RefSeries, int) - // Then SeriesReset is called to allow the deletion - // of all series created in a segment lower than the argument. + // Then SeriesReset is called to allow the deletion of all series + // created in a segment lower than the argument. SeriesReset(int) } -// Used to notify the watcher that data has been written so that it can read. +// WriteNotified notifies the watcher that data has been written so that it can read. type WriteNotified interface { Notify() } diff --git a/tsdb/wlog/wlog.go b/tsdb/wlog/wlog.go index 993e930cef..b14521f358 100644 --- a/tsdb/wlog/wlog.go +++ b/tsdb/wlog/wlog.go @@ -38,8 +38,8 @@ import ( ) const ( - DefaultSegmentSize = 128 * 1024 * 1024 // 128 MB - pageSize = 32 * 1024 // 32KB + DefaultSegmentSize = 128 * 1024 * 1024 // DefaultSegmentSize is 128 MB. + pageSize = 32 * 1024 // pageSize is 32KB. recordHeaderSize = 7 WblDirName = "wbl" ) diff --git a/util/annotations/annotations.go b/util/annotations/annotations.go index bc5d76db43..b0272b7fee 100644 --- a/util/annotations/annotations.go +++ b/util/annotations/annotations.go @@ -174,7 +174,7 @@ func NewInvalidQuantileWarning(q float64, pos posrange.PositionRange) error { } } -// NewInvalidQuantileWarning is used when the user specifies an invalid ratio +// NewInvalidRatioWarning is used when the user specifies an invalid ratio // value, i.e. a float that is outside the range [-1, 1] or NaN. func NewInvalidRatioWarning(q, to float64, pos posrange.PositionRange) error { return annoErr{ diff --git a/util/testutil/cmp.go b/util/testutil/cmp.go index 370d191f3f..24d39d514c 100644 --- a/util/testutil/cmp.go +++ b/util/testutil/cmp.go @@ -23,13 +23,14 @@ import ( "github.com/prometheus/prometheus/model/labels" ) -// Replacement for require.Equal using go-cmp adapted for Prometheus data structures, instead of DeepEqual. +// RequireEqual is a replacement for require.Equal using go-cmp adapted for +// Prometheus data structures, instead of DeepEqual. func RequireEqual(t testing.TB, expected, actual interface{}, msgAndArgs ...interface{}) { t.Helper() RequireEqualWithOptions(t, expected, actual, nil, msgAndArgs...) } -// As RequireEqual but allows extra cmp.Options. +// RequireEqualWithOptions works like RequireEqual but allows extra cmp.Options. func RequireEqualWithOptions(t testing.TB, expected, actual interface{}, extra []cmp.Option, msgAndArgs ...interface{}) { t.Helper() options := append([]cmp.Option{cmp.Comparer(labels.Equal)}, extra...) From b0aba26ed5f2abf280bef3360672953ecb8d185b Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 23 Aug 2024 08:20:20 +0200 Subject: [PATCH 6/8] tsdb: Fix ValNone typo in comment Signed-off-by: Arve Knudsen --- tsdb/querier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsdb/querier.go b/tsdb/querier.go index 2e15f0b084..912c950329 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -972,7 +972,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { // Check if the encoding has changed (i.e. we need to create a new // chunk as chunks can't have multiple encoding types). // For the first sample, the following condition will always be true as - // ValNoneNone != ValFloat | ValHistogram | ValFloatHistogram. + // ValNone != ValFloat | ValHistogram | ValFloatHistogram. if currentValueType != prevValueType { if prevValueType != chunkenc.ValNone { p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt}) From 41c076196ef6626585cac34ec758e6582019c010 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Thu, 22 Aug 2024 11:36:47 +0200 Subject: [PATCH 7/8] New cases in Test_ChunkQuerier_OOOQuery and Test_Querier_OOOQuery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Case 1: OOO in-memory head chunk overlaps with first mmaped in-order chunk. Query: |----------------------------------------------------------------| InO: |------mmap---------------||---------mem----------------------| OOO: |-----mem-----------| This triggers ChunkOrIterableWithCopy not including OOO head chunks bug. Similar to #14693 however testing the end of the interval doesn't trigger the problem because there the in-order head chunk will be trimmed with a tombstone, causing the code to switch to ChunkOrIterable which was fixed. See https://github.com/prometheus/prometheus/blob/a36d1a8a9261ba7169cf2fca862a606adc9f3d9d/tsdb/querier.go#L646 where len(p.bufIter.Intervals) will be non zero, because it includes the tombstone to trim the result to the query max time. Thus a new test is added to check the overlap at the beginning of the interval that has a separate chunk, which does not need trimming. Note: same test doesn't fail for sample querier in Test_Querier_OOOQuery as that doesn't use copy, that is copyHeadChunk is false in the if condition above. Case 2: OOO mmaped head chunk overlaps with first mmaped in-order chunk. Query: |----------------------------------------------------------------| InO: |------mmap---------------||---------mem----------------------| OOO: |-----mmap-----------| |--mem--| In this case the meta contains the reference of the in-order chunk and no indication that a merge is needed with the OOO mmaped chunk. Signed-off-by: György Krajcsovits --- tsdb/db_test.go | 377 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 304 insertions(+), 73 deletions(-) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 6d266e8724..4e3a077f6a 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -5036,16 +5036,15 @@ func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sa func Test_Querier_OOOQuery(t *testing.T) { opts := DefaultOptions() - opts.OutOfOrderCapMax = 30 opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds() series1 := labels.FromStrings("foo", "bar1") + type filterFunc func(t int64) bool + defaultFilterFunc := func(t int64) bool { return true } + minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() } - addSample := func(db *DB, fromMins, toMins, queryMinT, queryMaxT int64, expSamples []chunks.Sample, filter func(int64) bool) ([]chunks.Sample, int) { - if filter == nil { - filter = func(int64) bool { return true } - } + addSample := func(db *DB, fromMins, toMins, queryMinT, queryMaxT int64, expSamples []chunks.Sample, filter filterFunc) ([]chunks.Sample, int) { app := db.Appender(context.Background()) totalAppended := 0 for m := fromMins; m <= toMins; m += time.Minute.Milliseconds() { @@ -5060,68 +5059,173 @@ func Test_Querier_OOOQuery(t *testing.T) { totalAppended++ } require.NoError(t, app.Commit()) + require.Positive(t, totalAppended, 0) // Sanity check that filter is not too zealous. return expSamples, totalAppended } + type sampleBatch struct { + minT int64 + maxT int64 + filter filterFunc + isOOO bool + } + tests := []struct { - name string - queryMinT int64 - queryMaxT int64 - inOrderMinT int64 - inOrderMaxT int64 - oooMinT int64 - oooMaxT int64 + name string + oooCap int64 + queryMinT int64 + queryMaxT int64 + batches []sampleBatch }{ { - name: "query interval covering ooomint and inordermaxt returns all ingested samples", - queryMinT: minutes(0), - queryMaxT: minutes(200), - inOrderMinT: minutes(100), - inOrderMaxT: minutes(200), - oooMinT: minutes(0), - oooMaxT: minutes(99), + name: "query interval covering ooomint and inordermaxt returns all ingested samples", + oooCap: 30, + queryMinT: minutes(0), + queryMaxT: minutes(200), + batches: []sampleBatch{ + { + minT: minutes(100), + maxT: minutes(200), + filter: defaultFilterFunc, + }, + { + minT: minutes(0), + maxT: minutes(99), + filter: defaultFilterFunc, + isOOO: true, + }, + }, }, { - name: "partial query interval returns only samples within interval", - queryMinT: minutes(20), - queryMaxT: minutes(180), - inOrderMinT: minutes(100), - inOrderMaxT: minutes(200), - oooMinT: minutes(0), - oooMaxT: minutes(99), + name: "partial query interval returns only samples within interval", + oooCap: 30, + queryMinT: minutes(20), + queryMaxT: minutes(180), + batches: []sampleBatch{ + { + minT: minutes(100), + maxT: minutes(200), + filter: defaultFilterFunc, + }, + { + minT: minutes(0), + maxT: minutes(99), + filter: defaultFilterFunc, + isOOO: true, + }, + }, }, { - name: "query overlapping inorder and ooo samples returns all ingested samples", - queryMinT: minutes(0), - queryMaxT: minutes(200), - inOrderMinT: minutes(100), - inOrderMaxT: minutes(200), - oooMinT: minutes(180 - opts.OutOfOrderCapMax/2), // Make sure to fit into the OOO head. - oooMaxT: minutes(180), + name: "query overlapping inorder and ooo samples returns all ingested samples at the end of the interval", + oooCap: 30, + queryMinT: minutes(0), + queryMaxT: minutes(200), + batches: []sampleBatch{ + { + minT: minutes(100), + maxT: minutes(200), + filter: func(t int64) bool { return t%2 == 0 }, + isOOO: false, + }, + { + minT: minutes(170), + maxT: minutes(180), + filter: func(t int64) bool { return t%2 == 1 }, + isOOO: true, + }, + }, + }, + { + name: "query overlapping inorder and ooo in-memory samples returns all ingested samples at the beginning of the interval", + oooCap: 30, + queryMinT: minutes(0), + queryMaxT: minutes(200), + batches: []sampleBatch{ + { + minT: minutes(100), + maxT: minutes(200), + filter: func(t int64) bool { return t%2 == 0 }, + isOOO: false, + }, + { + minT: minutes(100), + maxT: minutes(110), + filter: func(t int64) bool { return t%2 == 1 }, + isOOO: true, + }, + }, + }, + { + name: "query inorder contain ooo mmaped samples returns all ingested samples at the beginning of the interval", + oooCap: 5, + queryMinT: minutes(0), + queryMaxT: minutes(200), + batches: []sampleBatch{ + { + minT: minutes(100), + maxT: minutes(200), + filter: func(t int64) bool { return t%2 == 0 }, + isOOO: false, + }, + { + minT: minutes(101), + maxT: minutes(101 + (5-1)*2), // Append samples to fit in a single mmmaped OOO chunk and fit inside the first in-order mmaped chunk. + filter: func(t int64) bool { return t%2 == 1 }, + isOOO: true, + }, + { + minT: minutes(191), + maxT: minutes(193), // Append some more OOO samples to trigger mapping the OOO chunk, but use time 151 to not overlap with in-order head chunk. + filter: func(t int64) bool { return t%2 == 1 }, + isOOO: true, + }, + }, + }, + { + name: "query overlapping inorder and ooo mmaped samples returns all ingested samples at the beginning of the interval", + oooCap: 30, + queryMinT: minutes(0), + queryMaxT: minutes(200), + batches: []sampleBatch{ + { + minT: minutes(100), + maxT: minutes(200), + filter: func(t int64) bool { return t%2 == 0 }, + isOOO: false, + }, + { + minT: minutes(101), + maxT: minutes(101 + (30-1)*2), // Append samples to fit in a single mmmaped OOO chunk and overlap the first in-order mmaped chunk. + filter: func(t int64) bool { return t%2 == 1 }, + isOOO: true, + }, + { + minT: minutes(191), + maxT: minutes(193), // Append some more OOO samples to trigger mapping the OOO chunk, but use time 151 to not overlap with in-order head chunk. + filter: func(t int64) bool { return t%2 == 1 }, + isOOO: true, + }, + }, }, } for _, tc := range tests { t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) { + opts.OutOfOrderCapMax = tc.oooCap db := openTestDB(t, opts, nil) db.DisableCompactions() defer func() { require.NoError(t, db.Close()) }() - var ( - expSamples []chunks.Sample - inoSamples int - ) + var expSamples []chunks.Sample + var oooSamples, appendedCount int - // Add in-order samples (at even minutes). - expSamples, inoSamples = addSample(db, tc.inOrderMinT, tc.inOrderMaxT, tc.queryMinT, tc.queryMaxT, expSamples, func(t int64) bool { return t%2 == 0 }) - // Sanity check that filter is not too zealous. - require.Positive(t, inoSamples, 0) - - // Add out-of-order samples (at odd minutes). - expSamples, oooSamples := addSample(db, tc.oooMinT, tc.oooMaxT, tc.queryMinT, tc.queryMaxT, expSamples, func(t int64) bool { return t%2 == 1 }) - // Sanity check that filter is not too zealous. - require.Positive(t, oooSamples, 0) + for _, batch := range tc.batches { + expSamples, appendedCount = addSample(db, batch.minT, batch.maxT, tc.queryMinT, tc.queryMaxT, expSamples, batch.filter) + if batch.isOOO { + oooSamples += appendedCount + } + } sort.Slice(expSamples, func(i, j int) bool { return expSamples[i].T() < expSamples[j].T() @@ -5147,11 +5251,17 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) { series1 := labels.FromStrings("foo", "bar1") + type filterFunc func(t int64) bool + defaultFilterFunc := func(t int64) bool { return true } + minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() } - addSample := func(db *DB, fromMins, toMins, queryMinT, queryMaxT int64, expSamples []chunks.Sample) ([]chunks.Sample, int) { + addSample := func(db *DB, fromMins, toMins, queryMinT, queryMaxT int64, expSamples []chunks.Sample, filter filterFunc) ([]chunks.Sample, int) { app := db.Appender(context.Background()) totalAppended := 0 for m := fromMins; m <= toMins; m += time.Minute.Milliseconds() { + if !filter(m / time.Minute.Milliseconds()) { + continue + } _, err := app.Append(0, series1, m, float64(m)) if m >= queryMinT && m <= queryMaxT { expSamples = append(expSamples, sample{t: m, f: float64(m)}) @@ -5160,39 +5270,158 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) { totalAppended++ } require.NoError(t, app.Commit()) + require.Positive(t, totalAppended) // Sanity check that filter is not too zealous. return expSamples, totalAppended } + type sampleBatch struct { + minT int64 + maxT int64 + filter filterFunc + isOOO bool + } + tests := []struct { - name string - queryMinT int64 - queryMaxT int64 - inOrderMinT int64 - inOrderMaxT int64 - oooMinT int64 - oooMaxT int64 + name string + oooCap int64 + queryMinT int64 + queryMaxT int64 + batches []sampleBatch }{ { - name: "query interval covering ooomint and inordermaxt returns all ingested samples", - queryMinT: minutes(0), - queryMaxT: minutes(200), - inOrderMinT: minutes(100), - inOrderMaxT: minutes(200), - oooMinT: minutes(0), - oooMaxT: minutes(99), + name: "query interval covering ooomint and inordermaxt returns all ingested samples", + oooCap: 30, + queryMinT: minutes(0), + queryMaxT: minutes(200), + batches: []sampleBatch{ + { + minT: minutes(100), + maxT: minutes(200), + filter: defaultFilterFunc, + }, + { + minT: minutes(0), + maxT: minutes(99), + filter: defaultFilterFunc, + isOOO: true, + }, + }, }, { - name: "partial query interval returns only samples within interval", - queryMinT: minutes(20), - queryMaxT: minutes(180), - inOrderMinT: minutes(100), - inOrderMaxT: minutes(200), - oooMinT: minutes(0), - oooMaxT: minutes(99), + name: "partial query interval returns only samples within interval", + oooCap: 30, + queryMinT: minutes(20), + queryMaxT: minutes(180), + batches: []sampleBatch{ + { + minT: minutes(100), + maxT: minutes(200), + filter: defaultFilterFunc, + }, + { + minT: minutes(0), + maxT: minutes(99), + filter: defaultFilterFunc, + isOOO: true, + }, + }, + }, + { + name: "query overlapping inorder and ooo samples returns all ingested samples at the end of the interval", + oooCap: 30, + queryMinT: minutes(0), + queryMaxT: minutes(200), + batches: []sampleBatch{ + { + minT: minutes(100), + maxT: minutes(200), + filter: func(t int64) bool { return t%2 == 0 }, + isOOO: false, + }, + { + minT: minutes(170), + maxT: minutes(180), + filter: func(t int64) bool { return t%2 == 1 }, + isOOO: true, + }, + }, + }, + { + name: "query overlapping inorder and ooo in-memory samples returns all ingested samples at the beginning of the interval", + oooCap: 30, + queryMinT: minutes(0), + queryMaxT: minutes(200), + batches: []sampleBatch{ + { + minT: minutes(100), + maxT: minutes(200), + filter: func(t int64) bool { return t%2 == 0 }, + isOOO: false, + }, + { + minT: minutes(100), + maxT: minutes(110), + filter: func(t int64) bool { return t%2 == 1 }, + isOOO: true, + }, + }, + }, + { + name: "query inorder contain ooo mmaped samples returns all ingested samples at the beginning of the interval", + oooCap: 5, + queryMinT: minutes(0), + queryMaxT: minutes(200), + batches: []sampleBatch{ + { + minT: minutes(100), + maxT: minutes(200), + filter: func(t int64) bool { return t%2 == 0 }, + isOOO: false, + }, + { + minT: minutes(101), + maxT: minutes(101 + (5-1)*2), // Append samples to fit in a single mmmaped OOO chunk and fit inside the first in-order mmaped chunk. + filter: func(t int64) bool { return t%2 == 1 }, + isOOO: true, + }, + { + minT: minutes(191), + maxT: minutes(193), // Append some more OOO samples to trigger mapping the OOO chunk, but use time 151 to not overlap with in-order head chunk. + filter: func(t int64) bool { return t%2 == 1 }, + isOOO: true, + }, + }, + }, + { + name: "query overlapping inorder and ooo mmaped samples returns all ingested samples at the beginning of the interval", + oooCap: 30, + queryMinT: minutes(0), + queryMaxT: minutes(200), + batches: []sampleBatch{ + { + minT: minutes(100), + maxT: minutes(200), + filter: func(t int64) bool { return t%2 == 0 }, + isOOO: false, + }, + { + minT: minutes(101), + maxT: minutes(101 + (30-1)*2), // Append samples to fit in a single mmmaped OOO chunk and overlap the first in-order mmaped chunk. + filter: func(t int64) bool { return t%2 == 1 }, + isOOO: true, + }, + { + minT: minutes(191), + maxT: minutes(193), // Append some more OOO samples to trigger mapping the OOO chunk, but use time 151 to not overlap with in-order head chunk. + filter: func(t int64) bool { return t%2 == 1 }, + isOOO: true, + }, + }, }, } for _, tc := range tests { t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) { + opts.OutOfOrderCapMax = tc.oooCap db := openTestDB(t, opts, nil) db.DisableCompactions() defer func() { @@ -5200,12 +5429,14 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) { }() var expSamples []chunks.Sample + var oooSamples, appendedCount int - // Add in-order samples. - expSamples, _ = addSample(db, tc.inOrderMinT, tc.inOrderMaxT, tc.queryMinT, tc.queryMaxT, expSamples) - - // Add out-of-order samples. - expSamples, oooSamples := addSample(db, tc.oooMinT, tc.oooMaxT, tc.queryMinT, tc.queryMaxT, expSamples) + for _, batch := range tc.batches { + expSamples, appendedCount = addSample(db, batch.minT, batch.maxT, tc.queryMinT, tc.queryMaxT, expSamples, batch.filter) + if batch.isOOO { + oooSamples += appendedCount + } + } sort.Slice(expSamples, func(i, j int) bool { return expSamples[i].T() < expSamples[j].T() From 183bbc39a23123789baf77eb9f915103bb83fc83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Thu, 22 Aug 2024 11:59:53 +0200 Subject: [PATCH 8/8] Make requesting merge with OOO head explicit in chunk.Meta MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: György Krajcsovits --- tsdb/chunks/chunks.go | 3 +++ tsdb/ooo_head_read.go | 17 +++++++++-------- tsdb/ooo_head_read_test.go | 9 +++++---- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index ec0f6d4036..69201c6db7 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -133,6 +133,9 @@ type Meta struct { // Time range the data covers. // When MaxTime == math.MaxInt64 the chunk is still open and being appended to. MinTime, MaxTime int64 + + // Flag to indicate that this meta needs merge with OOO data. + MergeOOO bool } // ChunkFromSamples requires all samples to have the same type. diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index f1d06a421b..a50decd623 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -91,10 +91,11 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) { tmpChks = append(tmpChks, chunks.Meta{ - MinTime: minT, - MaxTime: maxT, - Ref: ref, - Chunk: chunk, + MinTime: minT, + MaxTime: maxT, + Ref: ref, + Chunk: chunk, + MergeOOO: true, }) } @@ -160,6 +161,7 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap if c.Chunk != nil { (*chks)[len(*chks)-1].Chunk = c.Chunk } + (*chks)[len(*chks)-1].MergeOOO = (*chks)[len(*chks)-1].MergeOOO || c.MergeOOO } } @@ -241,8 +243,8 @@ func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader, } func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) { - sid, _, isOOO := unpackHeadChunkRef(meta.Ref) - if !isOOO && meta.Chunk == nil { // meta.Chunk can have a copy of OOO head samples, even on non-OOO chunk ID. + sid, _, _ := unpackHeadChunkRef(meta.Ref) + if !meta.MergeOOO { return cr.cr.ChunkOrIterable(meta) } @@ -266,8 +268,7 @@ func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chu // ChunkOrIterableWithCopy implements ChunkReaderWithCopy. The special Copy // behaviour is only implemented for the in-order head chunk. func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) { - _, _, isOOO := unpackHeadChunkRef(meta.Ref) - if !isOOO { + if !meta.MergeOOO { return cr.cr.ChunkOrIterableWithCopy(meta) } chk, iter, err := cr.ChunkOrIterable(meta) diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index f71d497320..e565933f83 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -308,9 +308,10 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { var expChunks []chunks.Meta for _, e := range tc.expChunks { meta := chunks.Meta{ - Chunk: chunkenc.Chunk(nil), - MinTime: e.mint, - MaxTime: e.maxt, + Chunk: chunkenc.Chunk(nil), + MinTime: e.mint, + MaxTime: e.maxt, + MergeOOO: true, // Only OOO chunks are tested here, so we always request merge from OOO head. } // Ref to whatever Ref the chunk has, that we refer to by ID @@ -484,7 +485,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { cr := NewHeadAndOOOChunkReader(db.head, 0, 1000, nil, nil, 0) defer cr.Close() c, iterable, err := cr.ChunkOrIterable(chunks.Meta{ - Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, + Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, MergeOOO: true, }) require.Nil(t, iterable) require.Equal(t, err, fmt.Errorf("not found"))