Merge branch 'main' into arve/loggercheck

This commit is contained in:
Bryan Boreham 2024-06-19 05:47:54 -04:00 committed by GitHub
commit 5a1886d247
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
26 changed files with 457 additions and 498 deletions

View file

@ -2,16 +2,23 @@
## unreleased
This release changes the default for GOGC, the Go runtime control for the trade-off between excess memory use and CPU usage. We have found that Prometheus operates with minimal additional CPU usage, but greatly reduced memory by adjusting the upstream Go default from 100 to 50.
## 2.53.0 / 2024-06-16
* [CHANGE] Rules: Execute 1 query instead of N (where N is the number of alerts within alert rule) when restoring alerts. #13980
* [CHANGE] Runtime: Change GOGC threshold from 100 to 50 #14176
* [FEATURE] Rules: Add new option `query_offset` for each rule group via rule group configuration file and `rule_query_offset` as part of the global configuration to have more resilience for remote write delays. #14061
* [ENHANCEMENT] Rules: Add `rule_group_last_restore_duration_seconds` to measure the time it takes to restore a rule group. #13974
This release changes the default for GOGC, the Go runtime control for the trade-off between excess memory use and CPU usage. We have found that Prometheus operates with minimal additional CPU usage, but greatly reduced memory by adjusting the upstream Go default from 100 to 75.
* [CHANGE] Rules: Execute 1 query instead of N (where N is the number of alerts within alert rule) when restoring alerts. #13980 #14048
* [CHANGE] Runtime: Change GOGC threshold from 100 to 75 #14176 #14285
* [FEATURE] Rules: Add new option `query_offset` for each rule group via rule group configuration file and `rule_query_offset` as part of the global configuration to have more resilience for remote write delays. #14061 #14216 #14273
* [ENHANCEMENT] Rules: Add `rule_group_last_restore_duration_seconds` metric to measure the time it takes to restore a rule group. #13974
* [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. #14006 #13991
* [ENHANCEMENT] TSDB: Optimize querying with regexp matchers. #13620
* [BUGFIX] OTLP: Don't generate target_info unless at least one identifying label is defined. #13991
* [BUGFIX] OTLP: Don't generate target_info unless there are metrics. #13991
* [BUGFIX] OTLP: Don't generate target_info unless there are metrics and at least one identifying label is defined. #13991
* [BUGFIX] Scrape: Do no try to ingest native histograms when the native histograms feature is turned off. This happened when protobuf scrape was enabled by for example the created time feature. #13987
* [BUGFIX] Scaleway SD: Use the instance's public IP if no private IP is available as the `__address__` meta label. #13941
* [BUGFIX] Query logger: Do not leak file descriptors on error. #13948
* [BUGFIX] TSDB: Let queries with heavy regex matches be cancelled and not use up the CPU. #14096 #14103 #14118 #14199
* [BUGFIX] API: Do not warn if result count is equal to the limit, only when exceeding the limit for the series, label-names and label-values APIs. #14116
* [BUGFIX] TSDB: Fix head stats and hooks when replaying a corrupted snapshot. #14079
## 2.52.1 / 2024-05-29

View file

@ -149,6 +149,8 @@ Changes for a patch release or release candidate should be merged into the previ
Bump the version in the `VERSION` file and update `CHANGELOG.md`. Do this in a proper PR pointing to the release branch as this gives others the opportunity to chime in on the release in general and on the addition to the changelog in particular. For a release candidate, append something like `-rc.0` to the version (with the corresponding changes to the tag name, the release name etc.).
When updating the `CHANGELOG.md` look at all PRs included in the release since the last release and verify if they need a changelog entry.
Note that `CHANGELOG.md` should only document changes relevant to users of Prometheus, including external API changes, performance improvements, and new features. Do not document changes of internal interfaces, code refactorings and clean-ups, changes to the build process, etc. People interested in these are asked to refer to the git history.
For release candidates still update `CHANGELOG.md`, but when you cut the final release later, merge all the changes from the pre-releases into the one final update.

View file

@ -1 +1 @@
2.52.1
2.53.0

View file

@ -120,6 +120,16 @@ func Name(n string) func(*Manager) {
}
}
// Updatert sets the updatert of the manager.
// Used to speed up tests.
func Updatert(u time.Duration) func(*Manager) {
return func(m *Manager) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.updatert = u
}
}
// HTTPClientOptions sets the list of HTTP client options to expose to
// Discoverers. It is up to Discoverers to choose to use the options provided.
func HTTPClientOptions(opts ...config.HTTPClientOption) func(*Manager) {

View file

@ -1608,7 +1608,16 @@ and serves as an interface to plug in custom service discovery mechanisms.
It reads a set of files containing a list of zero or more
`<static_config>`s. Changes to all defined files are detected via disk watches
and applied immediately. Files may be provided in YAML or JSON format. Only
and applied immediately.
While those individual files are watched for changes,
the parent directory is also watched implicitly. This is to handle [atomic
renaming](https://github.com/fsnotify/fsnotify/blob/c1467c02fba575afdb5f4201072ab8403bbf00f4/README.md?plain=1#L128) efficiently and to detect new files that match the configured globs.
This may cause issues if the parent directory contains a large number of other files,
as each of these files will be watched too, even though the events related
to them are not relevant.
Files may be provided in YAML or JSON format. Only
changes resulting in well-formed target groups are applied.
Files must contain a list of static configs, using these formats:

View file

@ -61,8 +61,11 @@ A Prometheus server's data directory looks something like this:
Note that a limitation of local storage is that it is not clustered or
replicated. Thus, it is not arbitrarily scalable or durable in the face of
drive or node outages and should be managed like any other single node
database. The use of RAID is suggested for storage availability, and
[snapshots](querying/api.md#snapshot) are recommended for backups. With proper
database.
[Snapshots](querying/api.md#snapshot) are recommended for backups. Backups
made without snapshots run the risk of losing data that was recorded since
the last WAL sync, which typically happens every two hours. With proper
architecture, it is possible to retain years of data in local storage.
Alternatively, external storage may be used via the

View file

@ -18,6 +18,7 @@ import (
"encoding/json"
"slices"
"strconv"
"unsafe"
"github.com/prometheus/common/model"
)
@ -215,3 +216,7 @@ func contains(s []Label, n string) bool {
}
return false
}
func yoloString(b []byte) string {
return *((*string)(unsafe.Pointer(&b)))
}

View file

@ -20,7 +20,6 @@ import (
"slices"
"strings"
"sync"
"unsafe"
"github.com/cespare/xxhash/v2"
)
@ -426,10 +425,6 @@ func EmptyLabels() Labels {
return Labels{}
}
func yoloString(b []byte) string {
return *((*string)(unsafe.Pointer(&b)))
}
// New returns a sorted Labels from the given labels.
// The caller has to guarantee that all label names are unique.
// Note this function is not efficient; should not be used in performance-critical places.

View file

@ -299,11 +299,6 @@ func Equal(ls, o Labels) bool {
func EmptyLabels() Labels {
return Labels{}
}
func yoloString(b []byte) string {
return *((*string)(unsafe.Pointer(&b)))
}
func yoloBytes(s string) (b []byte) {
*(*string)(unsafe.Pointer(&b)) = s
(*reflect.SliceHeader)(unsafe.Pointer(&b)).Cap = len(s)

View file

@ -798,39 +798,23 @@ func (m *equalMultiStringMapMatcher) Matches(s string) bool {
// toNormalisedLower normalise the input string using "Unicode Normalization Form D" and then convert
// it to lower case.
func toNormalisedLower(s string) string {
// Check if the string is all ASCII chars and convert any upper case character to lower case character.
isASCII := true
var (
b strings.Builder
pos int
)
b.Grow(len(s))
var buf []byte
for i := 0; i < len(s); i++ {
c := s[i]
if isASCII && c >= utf8.RuneSelf {
isASCII = false
break
if c >= utf8.RuneSelf {
return strings.Map(unicode.ToLower, norm.NFKD.String(s))
}
if 'A' <= c && c <= 'Z' {
c += 'a' - 'A'
if pos < i {
b.WriteString(s[pos:i])
if buf == nil {
buf = []byte(s)
}
b.WriteByte(c)
pos = i + 1
buf[i] = c + 'a' - 'A'
}
}
if pos < len(s) {
b.WriteString(s[pos:])
if buf == nil {
return s
}
// Optimize for ASCII-only strings. In this case we don't have to do any normalization.
if isASCII {
return b.String()
}
// Normalise and convert to lower.
return strings.Map(unicode.ToLower, norm.NFKD.String(b.String()))
return yoloString(buf)
}
// anyStringWithoutNewlineMatcher is a stringMatcher which matches any string

View file

@ -1209,6 +1209,10 @@ func visitStringMatcher(matcher StringMatcher, callback func(matcher StringMatch
func TestToNormalisedLower(t *testing.T) {
testCases := map[string]string{
"foo": "foo",
"FOO": "foo",
"Foo": "foo",
"foO": "foo",
"fOo": "foo",
"AAAAAAAAAAAAAAAAAAAAAAAA": "aaaaaaaaaaaaaaaaaaaaaaaa",
"cccccccccccccccccccccccC": "cccccccccccccccccccccccc",
"ſſſſſſſſſſſſſſſſſſſſſſſſS": "sssssssssssssssssssssssss",

View file

@ -298,25 +298,14 @@ func (n *Manager) nextBatch() []*Alert {
return alerts
}
// Run dispatches notifications continuously.
func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
// sendLoop continuously consumes the notifications queue and sends alerts to
// the configured Alertmanagers.
func (n *Manager) sendLoop() {
for {
// The select is split in two parts, such as we will first try to read
// new alertmanager targets if they are available, before sending new
// alerts.
select {
case <-n.ctx.Done():
return
case ts := <-tsets:
n.reload(ts)
default:
select {
case <-n.ctx.Done():
return
case ts := <-tsets:
n.reload(ts)
case <-n.more:
}
case <-n.more:
}
alerts := n.nextBatch()
@ -330,6 +319,21 @@ func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
}
}
// Run receives updates of target groups and triggers a reload.
// The dispatching of notifications occurs in the background to prevent blocking the receipt of target updates.
// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details.
func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
go n.sendLoop()
for {
select {
case <-n.ctx.Done():
return
case ts := <-tsets:
n.reload(ts)
}
}
}
func (n *Manager) reload(tgs map[string][]*targetgroup.Group) {
n.mtx.Lock()
defer n.mtx.Unlock()
@ -471,10 +475,6 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
numSuccess atomic.Uint64
)
for _, ams := range amSets {
if len(ams.ams) == 0 {
continue
}
var (
payload []byte
err error
@ -483,6 +483,11 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
ams.mtx.RLock()
if len(ams.ams) == 0 {
ams.mtx.RUnlock()
continue
}
if len(ams.cfg.AlertRelabelConfigs) > 0 {
amAlerts = relabelAlerts(ams.cfg.AlertRelabelConfigs, labels.Labels{}, alerts)
if len(amAlerts) == 0 {

View file

@ -26,13 +26,17 @@ import (
"testing"
"time"
"github.com/go-kit/log"
"github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/client_golang/prometheus"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"gopkg.in/yaml.v2"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
@ -697,117 +701,149 @@ func TestLabelsToOpenAPILabelSet(t *testing.T) {
require.Equal(t, models.LabelSet{"aaa": "111", "bbb": "222"}, labelsToOpenAPILabelSet(labels.FromStrings("aaa", "111", "bbb", "222")))
}
// TestHangingNotifier validates that targets updates happen even when there are
// queued alerts.
// TestHangingNotifier ensures that the notifier takes into account SD changes even when there are
// queued alerts. This test reproduces the issue described in https://github.com/prometheus/prometheus/issues/13676.
// and https://github.com/prometheus/prometheus/issues/8768.
func TestHangingNotifier(t *testing.T) {
// Note: When targets are not updated in time, this test is flaky because go
// selects are not deterministic. Therefore we run 10 subtests to run into the issue.
for i := 0; i < 10; i++ {
t.Run(strconv.Itoa(i), func(t *testing.T) {
var (
done = make(chan struct{})
changed = make(chan struct{})
syncCh = make(chan map[string][]*targetgroup.Group)
)
const (
batches = 100
alertsCount = maxBatchSize * batches
)
defer func() {
close(done)
}()
var (
sendTimeout = 10 * time.Millisecond
sdUpdatert = sendTimeout / 2
var calledOnce bool
// Setting up a bad server. This server hangs for 2 seconds.
badServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if calledOnce {
t.Fatal("hanging server called multiple times")
}
calledOnce = true
select {
case <-done:
case <-time.After(2 * time.Second):
}
}))
badURL, err := url.Parse(badServer.URL)
require.NoError(t, err)
badAddress := badURL.Host // Used for __name__ label in targets.
done = make(chan struct{})
)
// Setting up a bad server. This server returns fast, signaling requests on
// by closing the changed channel.
goodServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
close(changed)
}))
goodURL, err := url.Parse(goodServer.URL)
require.NoError(t, err)
goodAddress := goodURL.Host // Used for __name__ label in targets.
defer func() {
close(done)
}()
h := NewManager(
&Options{
QueueCapacity: 20 * maxBatchSize,
},
nil,
)
// Set up a faulty Alertmanager.
var faultyCalled atomic.Bool
faultyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
faultyCalled.Store(true)
select {
case <-done:
case <-time.After(time.Hour):
}
}))
faultyURL, err := url.Parse(faultyServer.URL)
require.NoError(t, err)
h.alertmanagers = make(map[string]*alertmanagerSet)
// Set up a functional Alertmanager.
var functionalCalled atomic.Bool
functionalServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
functionalCalled.Store(true)
}))
functionalURL, err := url.Parse(functionalServer.URL)
require.NoError(t, err)
am1Cfg := config.DefaultAlertmanagerConfig
am1Cfg.Timeout = model.Duration(200 * time.Millisecond)
// Initialize the discovery manager
// This is relevant as the updates aren't sent continually in real life, but only each updatert.
// The old implementation of TestHangingNotifier didn't take that into acount.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
reg := prometheus.NewRegistry()
sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg))
require.NoError(t, err)
sdManager := discovery.NewManager(
ctx,
log.NewNopLogger(),
reg,
sdMetrics,
discovery.Name("sd-manager"),
discovery.Updatert(sdUpdatert),
)
go sdManager.Run()
h.alertmanagers["config-0"] = &alertmanagerSet{
ams: []alertmanager{},
cfg: &am1Cfg,
metrics: h.metrics,
}
go h.Run(syncCh)
defer h.Stop()
// Set up the notifier with both faulty and functional Alertmanagers.
notifier := NewManager(
&Options{
QueueCapacity: alertsCount,
},
nil,
)
notifier.alertmanagers = make(map[string]*alertmanagerSet)
amCfg := config.DefaultAlertmanagerConfig
amCfg.Timeout = model.Duration(sendTimeout)
notifier.alertmanagers["config-0"] = &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return faultyURL.String() },
},
alertmanagerMock{
urlf: func() string { return functionalURL.String() },
},
},
cfg: &amCfg,
metrics: notifier.metrics,
}
go notifier.Run(sdManager.SyncCh())
defer notifier.Stop()
var alerts []*Alert
for i := range make([]struct{}, 20*maxBatchSize) {
alerts = append(alerts, &Alert{
Labels: labels.FromStrings("alertname", strconv.Itoa(i)),
})
}
require.Len(t, notifier.Alertmanagers(), 2)
// Injecting the hanging server URL.
syncCh <- map[string][]*targetgroup.Group{
"config-0": {
{
Targets: []model.LabelSet{
{
model.AddressLabel: model.LabelValue(badAddress),
},
},
},
},
}
// Queing alerts.
h.Send(alerts...)
// Updating with a working alertmanager target.
go func() {
select {
case syncCh <- map[string][]*targetgroup.Group{
"config-0": {
{
Targets: []model.LabelSet{
{
model.AddressLabel: model.LabelValue(goodAddress),
},
},
},
},
}:
case <-done:
}
}()
select {
case <-time.After(1 * time.Second):
t.Fatalf("Timeout after 1 second, targets not synced in time.")
case <-changed:
// The good server has been hit in less than 3 seconds, therefore
// targets have been updated before a second call could be made to the
// bad server.
}
// Enqueue the alerts.
var alerts []*Alert
for i := range make([]struct{}, alertsCount) {
alerts = append(alerts, &Alert{
Labels: labels.FromStrings("alertname", strconv.Itoa(i)),
})
}
notifier.Send(alerts...)
// Wait for the Alertmanagers to start receiving alerts.
// 10*sdUpdatert is used as an arbitrary timeout here.
timeout := time.After(10 * sdUpdatert)
loop1:
for {
select {
case <-timeout:
t.Fatalf("Timeout waiting for the alertmanagers to be reached for the first time.")
default:
if faultyCalled.Load() && functionalCalled.Load() {
break loop1
}
}
}
// Request to remove the faulty Alertmanager.
c := map[string]discovery.Configs{
"config-0": {
discovery.StaticConfig{
&targetgroup.Group{
Targets: []model.LabelSet{
{
model.AddressLabel: model.LabelValue(functionalURL.Host),
},
},
},
},
},
}
require.NoError(t, sdManager.ApplyConfig(c))
// The notifier should not wait until the alerts queue is empty to apply the discovery changes
// A faulty Alertmanager could cause each alert sending cycle to take up to AlertmanagerConfig.Timeout
// The queue may never be emptied, as the arrival rate could be larger than the departure rate
// It could even overflow and alerts could be dropped.
timeout = time.After(batches * sendTimeout)
loop2:
for {
select {
case <-timeout:
t.Fatalf("Timeout, the faulty alertmanager not removed on time.")
default:
// The faulty alertmanager was dropped.
if len(notifier.Alertmanagers()) == 1 {
// Prevent from TOCTOU.
require.Positive(t, notifier.queueLen())
break loop2
}
require.Positive(t, notifier.queueLen(), "The faulty alertmanager wasn't dropped before the alerts queue was emptied.")
}
}
}

View file

@ -2015,47 +2015,6 @@ func TestSubquerySelector(t *testing.T) {
}
}
func TestTimestampFunction_StepsMoreOftenThanSamples(t *testing.T) {
engine := newTestEngine()
storage := promqltest.LoadedStorage(t, `
load 1m
metric 0+1x1000
`)
t.Cleanup(func() { storage.Close() })
query := "timestamp(metric)"
start := time.Unix(0, 0)
end := time.Unix(61, 0)
interval := time.Second
// We expect the value to be 0 for t=0s to t=59s (inclusive), then 60 for t=60s and t=61s.
expectedPoints := []promql.FPoint{}
for t := 0; t <= 59; t++ {
expectedPoints = append(expectedPoints, promql.FPoint{F: 0, T: int64(t * 1000)})
}
expectedPoints = append(
expectedPoints,
promql.FPoint{F: 60, T: 60_000},
promql.FPoint{F: 60, T: 61_000},
)
expectedResult := promql.Matrix{
promql.Series{
Floats: expectedPoints,
Metric: labels.EmptyLabels(),
},
}
qry, err := engine.NewRangeQuery(context.Background(), storage, nil, query, start, end, interval)
require.NoError(t, err)
res := qry.Exec(context.Background())
require.NoError(t, res.Err)
testutil.RequireEqual(t, expectedResult, res.Value)
}
type FakeQueryLogger struct {
closed bool
logs []interface{}
@ -3061,167 +3020,6 @@ func TestEngineOptsValidation(t *testing.T) {
}
}
func TestRangeQuery(t *testing.T) {
cases := []struct {
Name string
Load string
Query string
Result parser.Value
Start time.Time
End time.Time
Interval time.Duration
}{
{
Name: "sum_over_time with all values",
Load: `load 30s
bar 0 1 10 100 1000`,
Query: "sum_over_time(bar[30s])",
Result: promql.Matrix{
promql.Series{
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 11, T: 60000}, {F: 1100, T: 120000}},
Metric: labels.EmptyLabels(),
},
},
Start: time.Unix(0, 0),
End: time.Unix(120, 0),
Interval: 60 * time.Second,
},
{
Name: "sum_over_time with trailing values",
Load: `load 30s
bar 0 1 10 100 1000 0 0 0 0`,
Query: "sum_over_time(bar[30s])",
Result: promql.Matrix{
promql.Series{
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 11, T: 60000}, {F: 1100, T: 120000}},
Metric: labels.EmptyLabels(),
},
},
Start: time.Unix(0, 0),
End: time.Unix(120, 0),
Interval: 60 * time.Second,
},
{
Name: "sum_over_time with all values long",
Load: `load 30s
bar 0 1 10 100 1000 10000 100000 1000000 10000000`,
Query: "sum_over_time(bar[30s])",
Result: promql.Matrix{
promql.Series{
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 11, T: 60000}, {F: 1100, T: 120000}, {F: 110000, T: 180000}, {F: 11000000, T: 240000}},
Metric: labels.EmptyLabels(),
},
},
Start: time.Unix(0, 0),
End: time.Unix(240, 0),
Interval: 60 * time.Second,
},
{
Name: "sum_over_time with all values random",
Load: `load 30s
bar 5 17 42 2 7 905 51`,
Query: "sum_over_time(bar[30s])",
Result: promql.Matrix{
promql.Series{
Floats: []promql.FPoint{{F: 5, T: 0}, {F: 59, T: 60000}, {F: 9, T: 120000}, {F: 956, T: 180000}},
Metric: labels.EmptyLabels(),
},
},
Start: time.Unix(0, 0),
End: time.Unix(180, 0),
Interval: 60 * time.Second,
},
{
Name: "metric query",
Load: `load 30s
metric 1+1x4`,
Query: "metric",
Result: promql.Matrix{
promql.Series{
Floats: []promql.FPoint{{F: 1, T: 0}, {F: 3, T: 60000}, {F: 5, T: 120000}},
Metric: labels.FromStrings("__name__", "metric"),
},
},
Start: time.Unix(0, 0),
End: time.Unix(120, 0),
Interval: 1 * time.Minute,
},
{
Name: "metric query with trailing values",
Load: `load 30s
metric 1+1x8`,
Query: "metric",
Result: promql.Matrix{
promql.Series{
Floats: []promql.FPoint{{F: 1, T: 0}, {F: 3, T: 60000}, {F: 5, T: 120000}},
Metric: labels.FromStrings("__name__", "metric"),
},
},
Start: time.Unix(0, 0),
End: time.Unix(120, 0),
Interval: 1 * time.Minute,
},
{
Name: "short-circuit",
Load: `load 30s
foo{job="1"} 1+1x4
bar{job="2"} 1+1x4`,
Query: `foo > 2 or bar`,
Result: promql.Matrix{
promql.Series{
Floats: []promql.FPoint{{F: 1, T: 0}, {F: 3, T: 60000}, {F: 5, T: 120000}},
Metric: labels.FromStrings(
"__name__", "bar",
"job", "2",
),
},
promql.Series{
Floats: []promql.FPoint{{F: 3, T: 60000}, {F: 5, T: 120000}},
Metric: labels.FromStrings(
"__name__", "foo",
"job", "1",
),
},
},
Start: time.Unix(0, 0),
End: time.Unix(120, 0),
Interval: 1 * time.Minute,
},
{
Name: "drop-metric-name",
Load: `load 30s
requests{job="1", __address__="bar"} 100`,
Query: `requests * 2`,
Result: promql.Matrix{
promql.Series{
Floats: []promql.FPoint{{F: 200, T: 0}, {F: 200, T: 60000}, {F: 200, T: 120000}},
Metric: labels.FromStrings(
"__address__", "bar",
"job", "1",
),
},
},
Start: time.Unix(0, 0),
End: time.Unix(120, 0),
Interval: 1 * time.Minute,
},
}
for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
engine := newTestEngine()
storage := promqltest.LoadedStorage(t, c.Load)
t.Cleanup(func() { storage.Close() })
qry, err := engine.NewRangeQuery(context.Background(), storage, nil, c.Query, c.Start, c.End, c.Interval)
require.NoError(t, err)
res := qry.Exec(context.Background())
require.NoError(t, res.Err)
testutil.RequireEqual(t, c.Result, res.Value)
})
}
}
func TestInstantQueryWithRangeVectorSelector(t *testing.T) {
engine := newTestEngine()

View file

@ -1213,3 +1213,11 @@ eval instant at 5m log10(exp_root_log - 20)
{l="y"} -Inf
clear
# Test that timestamp() handles the scenario where there are more steps than samples.
load 1m
metric 0+1x1000
# We expect the value to be 0 for t=0s to t=59s (inclusive), then 60 for t=60s and t=61s.
eval range from 0 to 61s step 1s timestamp(metric)
{} 0x59 60 60

View file

@ -0,0 +1,73 @@
# sum_over_time with all values
load 30s
bar 0 1 10 100 1000
eval range from 0 to 2m step 1m sum_over_time(bar[30s])
{} 0 11 1100
clear
# sum_over_time with trailing values
load 30s
bar 0 1 10 100 1000 0 0 0 0
eval range from 0 to 2m step 1m sum_over_time(bar[30s])
{} 0 11 1100
clear
# sum_over_time with all values long
load 30s
bar 0 1 10 100 1000 10000 100000 1000000 10000000
eval range from 0 to 4m step 1m sum_over_time(bar[30s])
{} 0 11 1100 110000 11000000
clear
# sum_over_time with all values random
load 30s
bar 5 17 42 2 7 905 51
eval range from 0 to 3m step 1m sum_over_time(bar[30s])
{} 5 59 9 956
clear
# metric query
load 30s
metric 1+1x4
eval range from 0 to 2m step 1m metric
metric 1 3 5
clear
# metric query with trailing values
load 30s
metric 1+1x8
eval range from 0 to 2m step 1m metric
metric 1 3 5
clear
# short-circuit
load 30s
foo{job="1"} 1+1x4
bar{job="2"} 1+1x4
eval range from 0 to 2m step 1m foo > 2 or bar
foo{job="1"} _ 3 5
bar{job="2"} 1 3 5
clear
# Drop metric name
load 30s
requests{job="1", __address__="bar"} 100
eval range from 0 to 2m step 1m requests * 2
{job="1", __address__="bar"} 200 200 200
clear

View file

@ -95,7 +95,7 @@ func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error
// ToQuery builds a Query proto.
func ToQuery(from, to int64, matchers []*labels.Matcher, hints *storage.SelectHints) (*prompb.Query, error) {
ms, err := toLabelMatchers(matchers)
ms, err := ToLabelMatchers(matchers)
if err != nil {
return nil, err
}
@ -566,7 +566,8 @@ func validateLabelsAndMetricName(ls []prompb.Label) error {
return nil
}
func toLabelMatchers(matchers []*labels.Matcher) ([]*prompb.LabelMatcher, error) {
// ToLabelMatchers converts Prometheus label matchers to protobuf label matchers.
func ToLabelMatchers(matchers []*labels.Matcher) ([]*prompb.LabelMatcher, error) {
pbMatchers := make([]*prompb.LabelMatcher, 0, len(matchers))
for _, m := range matchers {
var mType prompb.LabelMatcher_Type
@ -591,7 +592,7 @@ func toLabelMatchers(matchers []*labels.Matcher) ([]*prompb.LabelMatcher, error)
return pbMatchers, nil
}
// FromLabelMatchers parses protobuf label matchers to Prometheus label matchers.
// FromLabelMatchers converts protobuf label matchers to Prometheus label matchers.
func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, error) {
result := make([]*labels.Matcher, 0, len(matchers))
for _, matcher := range matchers {

View file

@ -1552,7 +1552,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
// Drop old chunks and remember series IDs and hashes if they can be
// deleted entirely.
deleted, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef)
deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef)
seriesRemoved := len(deleted)
h.metrics.seriesRemoved.Add(float64(seriesRemoved))
@ -1561,7 +1561,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
h.numSeries.Sub(uint64(seriesRemoved))
// Remove deleted series IDs from the postings lists.
h.postings.Delete(deleted)
h.postings.Delete(deleted, affected)
// Remove tombstones referring to the deleted series.
h.tombstones.DeleteTombstones(deleted)
@ -1869,9 +1869,10 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
// but the returned map goes into postings.Delete() which expects a map[storage.SeriesRef]struct
// and there's no easy way to cast maps.
// minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series.
func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ int, _, _ int64, minMmapFile int) {
func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) {
var (
deleted = map[storage.SeriesRef]struct{}{}
affected = map[labels.Label]struct{}{}
rmChunks = 0
actualMint int64 = math.MaxInt64
minOOOTime int64 = math.MaxInt64
@ -1927,6 +1928,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
}
deleted[storage.SeriesRef(series.ref)] = struct{}{}
series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
s.hashes[hashShard].del(hash, series.ref)
delete(s.series[refShard], series.ref)
deletedForCallback[series.ref] = series.lset
@ -1938,7 +1940,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
actualMint = mint
}
return deleted, rmChunks, actualMint, minOOOTime, minMmapFile
return deleted, affected, rmChunks, actualMint, minOOOTime, minMmapFile
}
// The iterForDeletion function iterates through all series, invoking the checkDeletedFunc for each.

View file

@ -814,6 +814,80 @@ func TestHead_UnknownWALRecord(t *testing.T) {
require.NoError(t, head.Close())
}
// BenchmarkHead_Truncate is quite heavy, so consider running it with
// -benchtime=10x or similar to get more stable and comparable results.
func BenchmarkHead_Truncate(b *testing.B) {
const total = 1e6
prepare := func(b *testing.B, churn int) *Head {
h, _ := newTestHead(b, 1000, wlog.CompressionNone, false)
b.Cleanup(func() {
require.NoError(b, h.Close())
})
h.initTime(0)
internedItoa := map[int]string{}
var mtx sync.RWMutex
itoa := func(i int) string {
mtx.RLock()
s, ok := internedItoa[i]
mtx.RUnlock()
if ok {
return s
}
mtx.Lock()
s = strconv.Itoa(i)
internedItoa[i] = s
mtx.Unlock()
return s
}
allSeries := [total]labels.Labels{}
nameValues := make([]string, 0, 100)
for i := 0; i < total; i++ {
nameValues = nameValues[:0]
// A thousand labels like lbl_x_of_1000, each with total/1000 values
thousand := "lbl_" + itoa(i%1000) + "_of_1000"
nameValues = append(nameValues, thousand, itoa(i/1000))
// A hundred labels like lbl_x_of_100, each with total/100 values.
hundred := "lbl_" + itoa(i%100) + "_of_100"
nameValues = append(nameValues, hundred, itoa(i/100))
if i%13 == 0 {
ten := "lbl_" + itoa(i%10) + "_of_10"
nameValues = append(nameValues, ten, itoa(i%10))
}
allSeries[i] = labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...)
s, _, _ := h.getOrCreate(allSeries[i].Hash(), allSeries[i])
s.mmappedChunks = []*mmappedChunk{
{minTime: 1000 * int64(i/churn), maxTime: 999 + 1000*int64(i/churn)},
}
}
return h
}
for _, churn := range []int{10, 100, 1000} {
b.Run(fmt.Sprintf("churn=%d", churn), func(b *testing.B) {
if b.N > total/churn {
// Just to make sure that benchmark still makes sense.
panic("benchmark not prepared")
}
h := prepare(b, churn)
b.ResetTimer()
for i := 0; i < b.N; i++ {
require.NoError(b, h.Truncate(1000*int64(i)))
// Make sure the benchmark is meaningful and it's actually truncating the expected amount of series.
require.Equal(b, total-churn*i, int(h.NumSeries()))
}
})
}
}
func TestHead_Truncate(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
defer func() {

View file

@ -288,89 +288,34 @@ func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) {
}
// Delete removes all ids in the given map from the postings lists.
func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}) {
// We will take an optimistic read lock for the entire method,
// and only lock for writing when we actually find something to delete.
//
// Each SeriesRef can appear in several Postings.
// To change each one, we need to know the label name and value that it is indexed under.
// We iterate over all label names, then for each name all values,
// and look for individual series to be deleted.
p.mtx.RLock()
defer p.mtx.RUnlock()
// affectedLabels contains all the labels that are affected by the deletion, there's no need to check other labels.
func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected map[labels.Label]struct{}) {
p.mtx.Lock()
defer p.mtx.Unlock()
// Collect all keys relevant for deletion once. New keys added afterwards
// can by definition not be affected by any of the given deletes.
keys := make([]string, 0, len(p.m))
maxVals := 0
for n := range p.m {
keys = append(keys, n)
if len(p.m[n]) > maxVals {
maxVals = len(p.m[n])
process := func(l labels.Label) {
orig := p.m[l.Name][l.Value]
repl := make([]storage.SeriesRef, 0, len(orig))
for _, id := range orig {
if _, ok := deleted[id]; !ok {
repl = append(repl, id)
}
}
if len(repl) > 0 {
p.m[l.Name][l.Value] = repl
} else {
delete(p.m[l.Name], l.Value)
// Delete the key if we removed all values.
if len(p.m[l.Name]) == 0 {
delete(p.m, l.Name)
}
}
}
vals := make([]string, 0, maxVals)
for _, n := range keys {
// Copy the values and iterate the copy: if we unlock in the loop below,
// another goroutine might modify the map while we are part-way through it.
vals = vals[:0]
for v := range p.m[n] {
vals = append(vals, v)
}
// For each posting we first analyse whether the postings list is affected by the deletes.
// If no, we remove the label value from the vals list.
// This way we only need to Lock once later.
for i := 0; i < len(vals); {
found := false
refs := p.m[n][vals[i]]
for _, id := range refs {
if _, ok := deleted[id]; ok {
i++
found = true
break
}
}
if !found {
// Didn't match, bring the last value to this position, make the slice shorter and check again.
// The order of the slice doesn't matter as it comes from a map iteration.
vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1]
}
}
// If no label values have deleted ids, just continue.
if len(vals) == 0 {
continue
}
// The only vals left here are the ones that contain deleted ids.
// Now we take the write lock and remove the ids.
p.mtx.RUnlock()
p.mtx.Lock()
for _, l := range vals {
repl := make([]storage.SeriesRef, 0, len(p.m[n][l]))
for _, id := range p.m[n][l] {
if _, ok := deleted[id]; !ok {
repl = append(repl, id)
}
}
if len(repl) > 0 {
p.m[n][l] = repl
} else {
delete(p.m[n], l)
}
}
// Delete the key if we removed all values.
if len(p.m[n]) == 0 {
delete(p.m, n)
}
p.mtx.Unlock()
p.mtx.RLock()
for l := range affected {
process(l)
}
process(allPostingsKey)
}
// Iter calls f for each postings list. It aborts if f returns an error and returns it.

View file

@ -979,9 +979,13 @@ func TestMemPostings_Delete(t *testing.T) {
p.Add(3, labels.FromStrings("lbl2", "a"))
before := p.Get(allPostingsKey.Name, allPostingsKey.Value)
p.Delete(map[storage.SeriesRef]struct{}{
deletedRefs := map[storage.SeriesRef]struct{}{
2: {},
})
}
affectedLabels := map[labels.Label]struct{}{
{Name: "lbl1", Value: "b"}: {},
}
p.Delete(deletedRefs, affectedLabels)
after := p.Get(allPostingsKey.Name, allPostingsKey.Value)
// Make sure postings gotten before the delete have the old data when
@ -1022,33 +1026,23 @@ func BenchmarkMemPostings_Delete(b *testing.B) {
}
const total = 1e6
prepare := func() *MemPostings {
var ref storage.SeriesRef
next := func() storage.SeriesRef {
ref++
return ref
allSeries := [total]labels.Labels{}
nameValues := make([]string, 0, 100)
for i := 0; i < total; i++ {
nameValues = nameValues[:0]
// A thousand labels like lbl_x_of_1000, each with total/1000 values
thousand := "lbl_" + itoa(i%1000) + "_of_1000"
nameValues = append(nameValues, thousand, itoa(i/1000))
// A hundred labels like lbl_x_of_100, each with total/100 values.
hundred := "lbl_" + itoa(i%100) + "_of_100"
nameValues = append(nameValues, hundred, itoa(i/100))
if i < 100 {
ten := "lbl_" + itoa(i%10) + "_of_10"
nameValues = append(nameValues, ten, itoa(i%10))
}
p := NewMemPostings()
nameValues := make([]string, 0, 100)
for i := 0; i < total; i++ {
nameValues = nameValues[:0]
// A thousand labels like lbl_x_of_1000, each with total/1000 values
thousand := "lbl_" + itoa(i%1000) + "_of_1000"
nameValues = append(nameValues, thousand, itoa(i/1000))
// A hundred labels like lbl_x_of_100, each with total/100 values.
hundred := "lbl_" + itoa(i%100) + "_of_100"
nameValues = append(nameValues, hundred, itoa(i/100))
if i < 100 {
ten := "lbl_" + itoa(i%10) + "_of_10"
nameValues = append(nameValues, ten, itoa(i%10))
}
p.Add(next(), labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...))
}
return p
allSeries[i] = labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...)
}
for _, refs := range []int{1, 100, 10_000} {
@ -1060,7 +1054,11 @@ func BenchmarkMemPostings_Delete(b *testing.B) {
panic("benchmark not prepared")
}
p := prepare()
p := NewMemPostings()
for i := range allSeries {
p.Add(storage.SeriesRef(i), allSeries[i])
}
stop := make(chan struct{})
wg := sync.WaitGroup{}
for i := 0; i < reads; i++ {
@ -1086,11 +1084,16 @@ func BenchmarkMemPostings_Delete(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
deleted := map[storage.SeriesRef]struct{}{}
deleted := make(map[storage.SeriesRef]struct{}, refs)
affected := make(map[labels.Label]struct{}, refs)
for i := 0; i < refs; i++ {
deleted[storage.SeriesRef(n*refs+i)] = struct{}{}
ref := storage.SeriesRef(n*refs + i)
deleted[ref] = struct{}{}
allSeries[ref].Range(func(l labels.Label) {
affected[l] = struct{}{}
})
}
p.Delete(deleted)
p.Delete(deleted, affected)
}
})
}

View file

@ -1,6 +1,6 @@
{
"name": "@prometheus-io/codemirror-promql",
"version": "0.52.1",
"version": "0.53.0",
"description": "a CodeMirror mode for the PromQL language",
"types": "dist/esm/index.d.ts",
"module": "dist/esm/index.js",
@ -29,7 +29,7 @@
},
"homepage": "https://github.com/prometheus/prometheus/blob/main/web/ui/module/codemirror-promql/README.md",
"dependencies": {
"@prometheus-io/lezer-promql": "0.52.1",
"@prometheus-io/lezer-promql": "0.53.0",
"lru-cache": "^7.18.3"
},
"devDependencies": {

View file

@ -1,6 +1,6 @@
{
"name": "@prometheus-io/lezer-promql",
"version": "0.52.1",
"version": "0.53.0",
"description": "lezer-based PromQL grammar",
"main": "dist/index.cjs",
"type": "module",

View file

@ -1,12 +1,12 @@
{
"name": "prometheus-io",
"version": "0.52.1",
"version": "0.53.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "prometheus-io",
"version": "0.52.1",
"version": "0.53.0",
"workspaces": [
"react-app",
"module/*"
@ -30,10 +30,10 @@
},
"module/codemirror-promql": {
"name": "@prometheus-io/codemirror-promql",
"version": "0.52.1",
"version": "0.53.0",
"license": "Apache-2.0",
"dependencies": {
"@prometheus-io/lezer-promql": "0.52.1",
"@prometheus-io/lezer-promql": "0.53.0",
"lru-cache": "^7.18.3"
},
"devDependencies": {
@ -69,7 +69,7 @@
},
"module/lezer-promql": {
"name": "@prometheus-io/lezer-promql",
"version": "0.52.1",
"version": "0.53.0",
"license": "Apache-2.0",
"devDependencies": {
"@lezer/generator": "^1.7.0",
@ -19331,7 +19331,7 @@
},
"react-app": {
"name": "@prometheus-io/app",
"version": "0.52.1",
"version": "0.53.0",
"dependencies": {
"@codemirror/autocomplete": "^6.16.2",
"@codemirror/commands": "^6.6.0",
@ -19349,7 +19349,7 @@
"@lezer/lr": "^1.4.1",
"@nexucis/fuzzy": "^0.4.1",
"@nexucis/kvsearch": "^0.8.1",
"@prometheus-io/codemirror-promql": "0.52.1",
"@prometheus-io/codemirror-promql": "0.53.0",
"bootstrap": "^4.6.2",
"css.escape": "^1.5.1",
"downshift": "^9.0.6",

View file

@ -28,5 +28,5 @@
"ts-jest": "^29.1.4",
"typescript": "^4.9.5"
},
"version": "0.52.1"
"version": "0.53.0"
}

View file

@ -1,6 +1,6 @@
{
"name": "@prometheus-io/app",
"version": "0.52.1",
"version": "0.53.0",
"private": true,
"dependencies": {
"@codemirror/autocomplete": "^6.16.2",
@ -19,7 +19,7 @@
"@lezer/lr": "^1.4.1",
"@nexucis/fuzzy": "^0.4.1",
"@nexucis/kvsearch": "^0.8.1",
"@prometheus-io/codemirror-promql": "0.52.1",
"@prometheus-io/codemirror-promql": "0.53.0",
"bootstrap": "^4.6.2",
"css.escape": "^1.5.1",
"downshift": "^9.0.6",