From f802f1e8cae58c5f98fe825729cf834683ee3c7c Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Fri, 20 Mar 2020 09:34:15 -0700 Subject: [PATCH 1/5] Fix bug with WAL watcher and Live Reader metrics usage. (#6998) * Fix bug with WAL watcher and Live Reader metrics usage. Calling NewXMetrics when creating a Watcher or LiveReader results in a registration error, which we're ignoring, and as a result other than the first Watcher/Reader created, we had no metrics for either. So we would only have metrics like Watcher Records Read for the first remote write config in a users config file. Signed-off-by: Callum Styan --- storage/remote/queue_manager.go | 4 ++-- storage/remote/queue_manager_test.go | 25 +++++++++++++------------ storage/remote/write.go | 22 +++++++++++++--------- tsdb/wal/live_reader.go | 13 ++++++------- tsdb/wal/watcher.go | 26 ++++++++++++++------------ tsdb/wal/watcher_test.go | 14 +++++++------- 6 files changed, 55 insertions(+), 49 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index a4384e03e..932406217 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -271,7 +271,7 @@ type QueueManager struct { } // NewQueueManager builds a new QueueManager. -func NewQueueManager(reg prometheus.Registerer, metrics *queueManagerMetrics, logger log.Logger, walDir string, samplesIn *ewmaRate, cfg config.QueueConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager { +func NewQueueManager(metrics *queueManagerMetrics, watcherMetrics *wal.WatcherMetrics, readerMetrics *wal.LiveReaderMetrics, logger log.Logger, walDir string, samplesIn *ewmaRate, cfg config.QueueConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager { if logger == nil { logger = log.NewNopLogger() } @@ -301,7 +301,7 @@ func NewQueueManager(reg prometheus.Registerer, metrics *queueManagerMetrics, lo metrics: metrics, } - t.watcher = wal.NewWatcher(reg, wal.NewWatcherMetrics(reg), logger, client.Name(), t, walDir) + t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, walDir) t.shards = t.newShards() return t diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index e4917441f..74a7fddbc 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -61,7 +61,7 @@ func TestSampleDelivery(t *testing.T) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) m.StoreSeries(series, 0) // These should be received by the client. 
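For readers of this patch: the lost-metrics symptom described in the commit message above can be reproduced outside Prometheus with a few lines of client_golang. The sketch below is illustrative only and not part of the patch (the registry and queue labels are stand-ins); it shows why constructing a fresh collector per Watcher and discarding the duplicate-registration error leaves every Watcher after the first incrementing a counter the registry never exports. Sharing one WatcherMetrics/LiveReaderMetrics instance per WriteStorage, as this commit does, avoids the duplicate registration entirely.

    package main

    import (
    	"fmt"

    	"github.com/prometheus/client_golang/prometheus"
    )

    func newRecordsRead(reg prometheus.Registerer) *prometheus.CounterVec {
    	c := prometheus.NewCounterVec(prometheus.CounterOpts{
    		Name: "prometheus_wal_watcher_records_read_total",
    		Help: "Number of records read by the WAL watcher.",
    	}, []string{"queue"})
    	// Pre-patch behaviour: the duplicate-registration error is thrown away,
    	// so every collector after the first stays unregistered.
    	_ = reg.Register(c)
    	return c
    }

    func main() {
    	reg := prometheus.NewRegistry()
    	first := newRecordsRead(reg)  // registered
    	second := newRecordsRead(reg) // duplicate descriptor, silently dropped
    	first.WithLabelValues("rw-1").Inc()
    	second.WithLabelValues("rw-2").Inc()

    	mfs, _ := reg.Gather()
    	for _, mf := range mfs {
    		// Only the "rw-1" child is exported; "rw-2" was counted into a
    		// collector the registry never saw.
    		fmt.Println(mf.GetName(), "children:", len(mf.GetMetric()))
    	}
    }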
@@ -90,7 +90,7 @@ func TestSampleDeliveryTimeout(t *testing.T) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -131,7 +131,7 @@ func TestSampleDeliveryOrder(t *testing.T) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) m.StoreSeries(series, 0) m.Start() @@ -150,7 +150,8 @@ func TestShutdown(t *testing.T) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) + + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend samples, series := createTimeseries(n, n) m.StoreSeries(series, 0) @@ -188,7 +189,7 @@ func TestSeriesReset(t *testing.T) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) for i := 0; i < numSegments; i++ { series := []record.RefSeries{} for j := 0; j < numSeries; j++ { @@ -218,7 +219,7 @@ func TestReshard(t *testing.T) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) m.StoreSeries(series, 0) m.Start() @@ -251,7 +252,7 @@ func TestReshardRaceWithStop(t *testing.T) { go func() { for { metrics := newQueueManagerMetrics(nil) - m = NewQueueManager(nil, metrics, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) + m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) m.Start() h.Unlock() h.Lock() @@ -269,7 +270,7 @@ func TestReshardRaceWithStop(t *testing.T) { func TestReleaseNoninternedString(t *testing.T) { metrics := newQueueManagerMetrics(nil) c := NewTestStorageClient() - m := NewQueueManager(nil, metrics, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) m.Start() for i := 1; i < 1000; i++ { @@ -316,7 +317,7 @@ func TestCalculateDesiredsShards(t *testing.T) { for _, c := range cases { metrics := newQueueManagerMetrics(nil) client := NewTestStorageClient() - 
m := NewQueueManager(nil, metrics, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline) m.numShards = c.startingShards m.samplesIn.incr(c.samplesIn) m.samplesOut.incr(c.samplesOut) @@ -527,7 +528,7 @@ func BenchmarkSampleDelivery(b *testing.B) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) m.StoreSeries(series, 0) // These should be received by the client. @@ -569,7 +570,7 @@ func BenchmarkStartup(b *testing.B) { for n := 0; n < b.N; n++ { metrics := newQueueManagerMetrics(nil) c := NewTestBlockedStorageClient() - m := NewQueueManager(nil, metrics, logger, dir, + m := NewQueueManager(metrics, nil, nil, logger, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, 1*time.Minute) m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) @@ -620,7 +621,7 @@ func TestCalculateDesiredShards(t *testing.T) { metrics := newQueueManagerMetrics(nil) samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) - m := NewQueueManager(nil, metrics, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline) // Need to start the queue manager so the proper metrics are initialized. // However we can stop it right away since we don't need to do any actual diff --git a/storage/remote/write.go b/storage/remote/write.go index 665eb2b07..5d77ec751 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/wal" ) var ( @@ -46,11 +47,12 @@ var ( // WriteStorage represents all the remote write storage. 
type WriteStorage struct { - reg prometheus.Registerer logger log.Logger mtx sync.Mutex queueMetrics *queueManagerMetrics + watcherMetrics *wal.WatcherMetrics + liveReaderMetrics *wal.LiveReaderMetrics configHash string externalLabelHash string walDir string @@ -65,13 +67,14 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string logger = log.NewNopLogger() } rws := &WriteStorage{ - queues: make(map[string]*QueueManager), - reg: reg, - queueMetrics: newQueueManagerMetrics(reg), - logger: logger, - flushDeadline: flushDeadline, - samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), - walDir: walDir, + queues: make(map[string]*QueueManager), + queueMetrics: newQueueManagerMetrics(reg), + watcherMetrics: wal.NewWatcherMetrics(reg), + liveReaderMetrics: wal.NewLiveReaderMetrics(reg), + logger: logger, + flushDeadline: flushDeadline, + samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), + walDir: walDir, } go rws.run() return rws @@ -152,8 +155,9 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { return err } newQueues[hash] = NewQueueManager( - rws.reg, rws.queueMetrics, + rws.watcherMetrics, + rws.liveReaderMetrics, rws.logger, rws.walDir, rws.samplesIn, diff --git a/tsdb/wal/live_reader.go b/tsdb/wal/live_reader.go index 446e85994..7124f6408 100644 --- a/tsdb/wal/live_reader.go +++ b/tsdb/wal/live_reader.go @@ -28,14 +28,14 @@ import ( ) // liveReaderMetrics holds all metrics exposed by the LiveReader. -type liveReaderMetrics struct { +type LiveReaderMetrics struct { readerCorruptionErrors *prometheus.CounterVec } // NewLiveReaderMetrics instantiates, registers and returns metrics to be injected // at LiveReader instantiation. -func NewLiveReaderMetrics(reg prometheus.Registerer) *liveReaderMetrics { - m := &liveReaderMetrics{ +func NewLiveReaderMetrics(reg prometheus.Registerer) *LiveReaderMetrics { + m := &LiveReaderMetrics{ readerCorruptionErrors: prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "prometheus_tsdb_wal_reader_corruption_errors_total", Help: "Errors encountered when reading the WAL.", @@ -43,15 +43,14 @@ func NewLiveReaderMetrics(reg prometheus.Registerer) *liveReaderMetrics { } if reg != nil { - // TODO(codesome): log error. - _ = reg.Register(m.readerCorruptionErrors) + reg.MustRegister(m.readerCorruptionErrors) } return m } // NewLiveReader returns a new live reader. -func NewLiveReader(logger log.Logger, metrics *liveReaderMetrics, r io.Reader) *LiveReader { +func NewLiveReader(logger log.Logger, metrics *LiveReaderMetrics, r io.Reader) *LiveReader { lr := &LiveReader{ logger: logger, rdr: r, @@ -89,7 +88,7 @@ type LiveReader struct { // NB the non-ive Reader implementation allows for this. permissive bool - metrics *liveReaderMetrics + metrics *LiveReaderMetrics } // Err returns any errors encountered reading the WAL. 
io.EOFs are not terminal diff --git a/tsdb/wal/watcher.go b/tsdb/wal/watcher.go index f92386f0d..4a9e7f455 100644 --- a/tsdb/wal/watcher.go +++ b/tsdb/wal/watcher.go @@ -66,7 +66,7 @@ type Watcher struct { walDir string lastCheckpoint string metrics *WatcherMetrics - readerMetrics *liveReaderMetrics + readerMetrics *LiveReaderMetrics startTime time.Time startTimestamp int64 // the start time as a Prometheus timestamp @@ -125,17 +125,17 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { } if reg != nil { - _ = reg.Register(m.recordsRead) - _ = reg.Register(m.recordDecodeFails) - _ = reg.Register(m.samplesSentPreTailing) - _ = reg.Register(m.currentSegment) + reg.MustRegister(m.recordsRead) + reg.MustRegister(m.recordDecodeFails) + reg.MustRegister(m.samplesSentPreTailing) + reg.MustRegister(m.currentSegment) } return m } // NewWatcher creates a new WAL watcher for a given WriteTo. -func NewWatcher(reg prometheus.Registerer, metrics *WatcherMetrics, logger log.Logger, name string, writer WriteTo, walDir string) *Watcher { +func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, walDir string) *Watcher { if logger == nil { logger = log.NewNopLogger() } @@ -143,7 +143,7 @@ func NewWatcher(reg prometheus.Registerer, metrics *WatcherMetrics, logger log.L logger: logger, writer: writer, metrics: metrics, - readerMetrics: NewLiveReaderMetrics(reg), + readerMetrics: readerMetrics, walDir: path.Join(walDir, "wal"), name: name, quit: make(chan struct{}), @@ -179,11 +179,13 @@ func (w *Watcher) Stop() { <-w.done // Records read metric has series and samples. - w.metrics.recordsRead.DeleteLabelValues(w.name, "series") - w.metrics.recordsRead.DeleteLabelValues(w.name, "samples") - w.metrics.recordDecodeFails.DeleteLabelValues(w.name) - w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name) - w.metrics.currentSegment.DeleteLabelValues(w.name) + if w.metrics != nil { + w.metrics.recordsRead.DeleteLabelValues(w.name, "series") + w.metrics.recordsRead.DeleteLabelValues(w.name, "samples") + w.metrics.recordDecodeFails.DeleteLabelValues(w.name) + w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name) + w.metrics.currentSegment.DeleteLabelValues(w.name) + } level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name) } diff --git a/tsdb/wal/watcher_test.go b/tsdb/wal/watcher_test.go index db8e3e89f..482d96551 100644 --- a/tsdb/wal/watcher_test.go +++ b/tsdb/wal/watcher_test.go @@ -138,7 +138,7 @@ func TestTailSamples(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) watcher.SetStartTime(now) // Set the Watcher's metrics so they're not nil pointers. @@ -148,7 +148,7 @@ func TestTailSamples(t *testing.T) { testutil.Ok(t, err) defer segment.Close() - reader := NewLiveReader(nil, NewLiveReaderMetrics(prometheus.DefaultRegisterer), segment) + reader := NewLiveReader(nil, NewLiveReaderMetrics(nil), segment) // Use tail true so we can ensure we got the right number of samples. 
watcher.readSegment(reader, i, true) } @@ -217,7 +217,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) go watcher.Start() expected := seriesCount @@ -303,7 +303,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { _, _, err = w.Segments() testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) go watcher.Start() expected := seriesCount * 2 @@ -368,7 +368,7 @@ func TestReadCheckpoint(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) go watcher.Start() expectedSeries := seriesCount @@ -439,7 +439,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) { } wt := newWriteToMock() - watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) watcher.MaxSegment = -1 // Set the Watcher's metrics so they're not nil pointers. @@ -510,7 +510,7 @@ func TestCheckpointSeriesReset(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) watcher.MaxSegment = -1 go watcher.Start() From d6ad5551c9ed0fde4fc886ed61df8186fce43dd5 Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Fri, 20 Mar 2020 17:43:26 +0100 Subject: [PATCH 2/5] Scrape: do not put staleness marker when cache is reused (#7011) * Scrape: do not put staleness marker when cache is reused Signed-off-by: Julien Pivotto --- scrape/scrape.go | 12 +++++++++++- scrape/scrape_test.go | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/scrape/scrape.go b/scrape/scrape.go index fbabba247..98cea3292 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -314,6 +314,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error { for fp, oldLoop := range sp.loops { var cache *scrapeCache if oc := oldLoop.getCache(); reuseCache && oc != nil { + oldLoop.disableEndOfRunStalenessMarkers() cache = oc } else { cache = newScrapeCache() @@ -593,6 +594,7 @@ type loop interface { run(interval, timeout time.Duration, errc chan<- error) stop() getCache() *scrapeCache + disableEndOfRunStalenessMarkers() } type cacheEntry struct { @@ -619,6 +621,8 @@ type scrapeLoop struct { ctx context.Context cancel func() stopped chan struct{} + + disabledEndOfRunStalenessMarkers bool } // scrapeCache tracks mappings of exposed metric strings to label sets and @@ -996,7 +1000,9 @@ mainLoop: close(sl.stopped) - sl.endOfRunStaleness(last, ticker, interval) + if !sl.disabledEndOfRunStalenessMarkers { + sl.endOfRunStaleness(last, ticker, interval) + } } func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, interval time.Duration) { @@ -1054,6 +1060,10 @@ func (sl *scrapeLoop) stop() { <-sl.stopped } +func (sl *scrapeLoop) disableEndOfRunStalenessMarkers() { + sl.disabledEndOfRunStalenessMarkers = true +} + func (sl *scrapeLoop) getCache() *scrapeCache { return sl.cache } diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 608c956b8..da4f6261d 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -141,6 +141,9 @@ func (l *testLoop) run(interval, timeout time.Duration, errc chan<- error) { l.startFunc(interval, timeout, errc) } +func (l 
*testLoop) disableEndOfRunStalenessMarkers() { +} + func (l *testLoop) stop() { l.stopFunc() } @@ -1839,3 +1842,39 @@ func TestScrapeAddFast(t *testing.T) { _, _, _, err = sl.append([]byte("up 1\n"), "", time.Time{}.Add(time.Second)) testutil.Ok(t, err) } + +func TestReuseCacheRace(t *testing.T) { + var ( + app = &nopAppendable{} + cfg = &config.ScrapeConfig{ + JobName: "Prometheus", + ScrapeTimeout: model.Duration(5 * time.Second), + ScrapeInterval: model.Duration(5 * time.Second), + MetricsPath: "/metrics", + } + sp, _ = newScrapePool(cfg, app, 0, nil) + t1 = &Target{ + discoveredLabels: labels.Labels{ + labels.Label{ + Name: "labelNew", + Value: "nameNew", + }, + }, + } + ) + sp.sync([]*Target{t1}) + + start := time.Now() + for i := uint(1); i > 0; i++ { + if time.Since(start) > 5*time.Second { + break + } + sp.reload(&config.ScrapeConfig{ + JobName: "Prometheus", + ScrapeTimeout: model.Duration(1 * time.Millisecond), + ScrapeInterval: model.Duration(1 * time.Millisecond), + MetricsPath: "/metrics", + SampleLimit: i, + }) + } +} From c4eefd1b3aef4d2f557f67ffc7c192ba1a63937d Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Thu, 12 Mar 2020 09:36:09 +0000 Subject: [PATCH 3/5] storage: Removed SelectSorted method; Simplified interface; Added requirement for remote read to sort response. This is technically BREAKING CHANGE, but it was like this from the beginning: I just notice that we rely in Prometheus on remote read being sorted. This is because we use selected data from remote reads in MergeSeriesSet which rely on sorting. I found during work on https://github.com/prometheus/prometheus/pull/5882 that we do so many repetitions because of this, for not good reason. I think I found a good balance between convenience and readability with just one method. Smaller the interface = better. Also I don't know what TestSelectSorted was testing, but now it's testing sorting. Signed-off-by: Bartlomiej Plotka --- promql/engine.go | 20 ++++----- promql/engine_test.go | 30 ++++++-------- promql/test_test.go | 2 +- rules/manager.go | 2 +- rules/manager_test.go | 42 +++++++++---------- scrape/scrape_test.go | 6 +-- storage/fanout.go | 24 +++++------ storage/fanout/fanout_test.go | 2 +- storage/interface.go | 12 +++--- storage/noop.go | 6 +-- storage/remote/codec.go | 25 ++++++----- storage/remote/codec_test.go | 2 +- storage/remote/read.go | 26 ++++-------- storage/remote/read_test.go | 8 ++-- tsdb/block_test.go | 2 +- tsdb/cmd/tsdb/main.go | 2 +- tsdb/db_test.go | 30 +++++++------- tsdb/head_test.go | 16 +++---- tsdb/querier.go | 78 ++++++++++++----------------------- tsdb/querier_bench_test.go | 8 +--- tsdb/querier_test.go | 10 ++--- web/api/v1/api.go | 41 +++++++++--------- web/api/v1/api_test.go | 12 +++--- web/federate.go | 7 +--- 24 files changed, 182 insertions(+), 231 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index ba52501b4..bf55e6032 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -655,7 +655,7 @@ func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error { var set storage.SeriesSet var wrn storage.Warnings - params := &storage.SelectParams{ + hints := &storage.SelectHints{ Start: timestamp.FromTime(s.Start), End: timestamp.FromTime(s.End), Step: durationToInt64Millis(s.Interval), @@ -667,29 +667,29 @@ func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s // from end also. 
subqOffset := ng.cumulativeSubqueryOffset(path) offsetMilliseconds := durationMilliseconds(subqOffset) - params.Start = params.Start - offsetMilliseconds + hints.Start = hints.Start - offsetMilliseconds switch n := node.(type) { case *parser.VectorSelector: if evalRange == 0 { - params.Start = params.Start - durationMilliseconds(ng.lookbackDelta) + hints.Start = hints.Start - durationMilliseconds(ng.lookbackDelta) } else { - params.Range = durationMilliseconds(evalRange) + hints.Range = durationMilliseconds(evalRange) // For all matrix queries we want to ensure that we have (end-start) + range selected // this way we have `range` data before the start time - params.Start = params.Start - durationMilliseconds(evalRange) + hints.Start = hints.Start - durationMilliseconds(evalRange) evalRange = 0 } - params.Func = extractFuncFromPath(path) - params.By, params.Grouping = extractGroupsFromPath(path) + hints.Func = extractFuncFromPath(path) + hints.By, hints.Grouping = extractGroupsFromPath(path) if n.Offset > 0 { offsetMilliseconds := durationMilliseconds(n.Offset) - params.Start = params.Start - offsetMilliseconds - params.End = params.End - offsetMilliseconds + hints.Start = hints.Start - offsetMilliseconds + hints.End = hints.End - offsetMilliseconds } - set, wrn, err = querier.Select(params, n.LabelMatchers...) + set, wrn, err = querier.Select(false, hints, n.LabelMatchers...) warnings = append(warnings, wrn...) if err != nil { level.Error(ng.logger).Log("msg", "error selecting series set", "err", err) diff --git a/promql/engine_test.go b/promql/engine_test.go index e75c27bca..96204b392 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -175,15 +175,12 @@ type errQuerier struct { err error } -func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q *errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { return errSeriesSet{err: q.err}, nil, q.err } -func (q *errQuerier) SelectSorted(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - return errSeriesSet{err: q.err}, nil, q.err -} -func (*errQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { return nil, nil, nil } -func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil } -func (*errQuerier) Close() error { return nil } +func (*errQuerier) LabelValues(string) ([]string, storage.Warnings, error) { return nil, nil, nil } +func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil } +func (*errQuerier) Close() error { return nil } // errSeriesSet implements storage.SeriesSet which always returns error. type errSeriesSet struct { @@ -224,9 +221,9 @@ func TestQueryError(t *testing.T) { testutil.Equals(t, errStorage, res.Err) } -// paramCheckerQuerier implements storage.Querier which checks the start and end times -// in params. -type paramCheckerQuerier struct { +// hintCheckerQuerier implements storage.Querier which checks the start and end times +// in hints. +type hintCheckerQuerier struct { start int64 end int64 grouping []string @@ -237,10 +234,7 @@ type paramCheckerQuerier struct { t *testing.T } -func (q *paramCheckerQuerier) Select(sp *storage.SelectParams, m ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - return q.SelectSorted(sp, m...) 
-} -func (q *paramCheckerQuerier) SelectSorted(sp *storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q *hintCheckerQuerier) Select(_ bool, sp *storage.SelectHints, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { testutil.Equals(q.t, q.start, sp.Start) testutil.Equals(q.t, q.end, sp.End) testutil.Equals(q.t, q.grouping, sp.Grouping) @@ -250,11 +244,11 @@ func (q *paramCheckerQuerier) SelectSorted(sp *storage.SelectParams, _ ...*label return errSeriesSet{err: nil}, nil, nil } -func (*paramCheckerQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { +func (*hintCheckerQuerier) LabelValues(string) ([]string, storage.Warnings, error) { return nil, nil, nil } -func (*paramCheckerQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil } -func (*paramCheckerQuerier) Close() error { return nil } +func (*hintCheckerQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil } +func (*hintCheckerQuerier) Close() error { return nil } func TestParamsSetCorrectly(t *testing.T) { opts := EngineOpts{ @@ -435,7 +429,7 @@ func TestParamsSetCorrectly(t *testing.T) { for _, tc := range cases { engine := NewEngine(opts) queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return ¶mCheckerQuerier{start: tc.paramStart * 1000, end: tc.paramEnd * 1000, grouping: tc.paramGrouping, by: tc.paramBy, selRange: tc.paramRange, function: tc.paramFunc, t: t}, nil + return &hintCheckerQuerier{start: tc.paramStart * 1000, end: tc.paramEnd * 1000, grouping: tc.paramGrouping, by: tc.paramBy, selRange: tc.paramRange, function: tc.paramFunc, t: t}, nil }) var ( diff --git a/promql/test_test.go b/promql/test_test.go index f03f6d203..7de4387f5 100644 --- a/promql/test_test.go +++ b/promql/test_test.go @@ -133,7 +133,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) { } // Get the series for the matcher. - ss, _, err := querier.Select(nil, matchers...) + ss, _, err := querier.Select(false, nil, matchers...) testutil.Ok(t, err) testutil.Assert(t, ss.Next(), "") storageSeries := ss.At() diff --git a/rules/manager.go b/rules/manager.go index 425fb01a8..383d38327 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -707,7 +707,7 @@ func (g *Group) RestoreForState(ts time.Time) { matchers = append(matchers, mt) } - sset, err, _ := q.Select(nil, matchers...) + sset, err, _ := q.Select(false, nil, matchers...) if err != nil { level.Error(g.logger).Log("msg", "Failed to restore 'for' state", labels.AlertName, alertRule.Name(), "stage", "Select", "err", err) diff --git a/rules/manager_test.go b/rules/manager_test.go index ac74f9ea0..4d26885c8 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -512,8 +512,8 @@ func TestForStateRestore(t *testing.T) { } func TestStaleness(t *testing.T) { - storage := teststorage.New(t) - defer storage.Close() + st := teststorage.New(t) + defer st.Close() engineOpts := promql.EngineOpts{ Logger: nil, Reg: nil, @@ -522,9 +522,9 @@ func TestStaleness(t *testing.T) { } engine := promql.NewEngine(engineOpts) opts := &ManagerOptions{ - QueryFunc: EngineQueryFunc(engine, storage), - Appendable: storage, - TSDB: storage, + QueryFunc: EngineQueryFunc(engine, st), + Appendable: st, + TSDB: st, Context: context.Background(), Logger: log.NewNopLogger(), } @@ -541,7 +541,7 @@ func TestStaleness(t *testing.T) { }) // A time series that has two samples and then goes stale. 
- app := storage.Appender() + app := st.Appender() app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 0, 1) app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2) app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN)) @@ -556,14 +556,14 @@ func TestStaleness(t *testing.T) { group.Eval(ctx, time.Unix(1, 0)) group.Eval(ctx, time.Unix(2, 0)) - querier, err := storage.Querier(context.Background(), 0, 2000) + querier, err := st.Querier(context.Background(), 0, 2000) testutil.Ok(t, err) defer querier.Close() matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one") testutil.Ok(t, err) - set, _, err := querier.Select(nil, matcher) + set, _, err := querier.Select(false, nil, matcher) testutil.Ok(t, err) samples, err := readSeriesSet(set) @@ -658,8 +658,8 @@ func TestCopyState(t *testing.T) { } func TestDeletedRuleMarkedStale(t *testing.T) { - storage := teststorage.New(t) - defer storage.Close() + st := teststorage.New(t) + defer st.Close() oldGroup := &Group{ rules: []Rule{ NewRecordingRule("rule1", nil, labels.Labels{{Name: "l1", Value: "v1"}}), @@ -672,21 +672,21 @@ func TestDeletedRuleMarkedStale(t *testing.T) { rules: []Rule{}, seriesInPreviousEval: []map[string]labels.Labels{}, opts: &ManagerOptions{ - Appendable: storage, + Appendable: st, }, } newGroup.CopyState(oldGroup) newGroup.Eval(context.Background(), time.Unix(0, 0)) - querier, err := storage.Querier(context.Background(), 0, 2000) + querier, err := st.Querier(context.Background(), 0, 2000) testutil.Ok(t, err) defer querier.Close() matcher, err := labels.NewMatcher(labels.MatchEqual, "l1", "v1") testutil.Ok(t, err) - set, _, err := querier.Select(nil, matcher) + set, _, err := querier.Select(false, nil, matcher) testutil.Ok(t, err) samples, err := readSeriesSet(set) @@ -704,8 +704,8 @@ func TestUpdate(t *testing.T) { expected := map[string]labels.Labels{ "test": labels.FromStrings("name", "value"), } - storage := teststorage.New(t) - defer storage.Close() + st := teststorage.New(t) + defer st.Close() opts := promql.EngineOpts{ Logger: nil, Reg: nil, @@ -714,9 +714,9 @@ func TestUpdate(t *testing.T) { } engine := promql.NewEngine(opts) ruleManager := NewManager(&ManagerOptions{ - Appendable: storage, - TSDB: storage, - QueryFunc: EngineQueryFunc(engine, storage), + Appendable: st, + TSDB: st, + QueryFunc: EngineQueryFunc(engine, st), Context: context.Background(), Logger: log.NewNopLogger(), }) @@ -1096,16 +1096,16 @@ func TestMetricsStalenessOnManagerShutdown(t *testing.T) { testutil.Equals(t, 0, countStaleNaN(t, storage), "invalid count of staleness markers after stopping the engine") } -func countStaleNaN(t *testing.T, storage storage.Storage) int { +func countStaleNaN(t *testing.T, st storage.Storage) int { var c int - querier, err := storage.Querier(context.Background(), 0, time.Now().Unix()*1000) + querier, err := st.Querier(context.Background(), 0, time.Now().Unix()*1000) testutil.Ok(t, err) defer querier.Close() matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2") testutil.Ok(t, err) - set, _, err := querier.Select(nil, matcher) + set, _, err := querier.Select(false, nil, matcher) testutil.Ok(t, err) samples, err := readSeriesSet(set) diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index da4f6261d..1d3864bb9 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -1574,7 +1574,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { q, err := s.Querier(ctx, 
time.Time{}.UnixNano(), 0) testutil.Ok(t, err) - series, _, err := q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) + series, _, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) testutil.Ok(t, err) testutil.Equals(t, false, series.Next(), "series found in tsdb") @@ -1584,7 +1584,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { q, err = s.Querier(ctx, time.Time{}.UnixNano(), 0) testutil.Ok(t, err) - series, _, err = q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchEqual, "le", "500")) + series, _, err = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "le", "500")) testutil.Ok(t, err) testutil.Equals(t, true, series.Next(), "series not found in tsdb") testutil.Equals(t, false, series.Next(), "more than one series found in tsdb") @@ -1620,7 +1620,7 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) { q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0) testutil.Ok(t, err) - series, _, err := q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) + series, _, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) testutil.Ok(t, err) testutil.Equals(t, false, series.Next(), "series found in tsdb") } diff --git a/storage/fanout.go b/storage/fanout.go index f55f4c8dc..2366fb272 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -221,20 +221,16 @@ func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier { } // Select returns a set of series that matches the given label matchers. -func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { - if len(q.queriers) != 1 { - // We need to sort for NewMergeSeriesSet to work. - return q.SelectSorted(params, matchers...) +func (q *mergeQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { + if len(q.queriers) == 1 { + return q.queriers[0].Select(sortSeries, hints, matchers...) } - return q.queriers[0].Select(params, matchers...) -} -// SelectSorted returns a set of sorted series that matches the given label matchers. -func (q *mergeQuerier) SelectSorted(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { - seriesSets := make([]SeriesSet, 0, len(q.queriers)) - var warnings Warnings - - var priErr error = nil + var ( + seriesSets = make([]SeriesSet, 0, len(q.queriers)) + warnings Warnings + priErr error + ) type queryResult struct { qr Querier set SeriesSet @@ -242,9 +238,11 @@ func (q *mergeQuerier) SelectSorted(params *SelectParams, matchers ...*labels.Ma selectError error } queryResultChan := make(chan *queryResult) + for _, querier := range q.queriers { go func(qr Querier) { - set, wrn, err := qr.SelectSorted(params, matchers...) + // We need to sort for NewMergeSeriesSet to work. + set, wrn, err := qr.Select(true, hints, matchers...) 
queryResultChan <- &queryResult{qr: qr, set: set, wrn: wrn, selectError: err} }(querier) } diff --git a/storage/fanout/fanout_test.go b/storage/fanout/fanout_test.go index 7afb04202..0554444fd 100644 --- a/storage/fanout/fanout_test.go +++ b/storage/fanout/fanout_test.go @@ -79,7 +79,7 @@ func TestSelectSorted(t *testing.T) { matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a") testutil.Ok(t, err) - seriesSet, _, err := querier.SelectSorted(nil, matcher) + seriesSet, _, err := querier.Select(true, nil, matcher) testutil.Ok(t, err) result := make(map[int64]float64) diff --git a/storage/interface.go b/storage/interface.go index e85a159e4..4ac799ebb 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -60,10 +60,9 @@ type Queryable interface { // time range. type Querier interface { // Select returns a set of series that matches the given label matchers. - Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) - - // SelectSorted returns a sorted set of series that matches the given label matchers. - SelectSorted(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) + // Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance. + // It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all. + Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) // LabelValues returns all potential values for a label name. // It is not safe to use the strings beyond the lifefime of the querier. @@ -76,8 +75,9 @@ type Querier interface { Close() error } -// SelectParams specifies parameters passed to data selections. -type SelectParams struct { +// SelectHints specifies hints passed for data selections. +// This is used only as an option for implementation to use. +type SelectHints struct { Start int64 // Start time in milliseconds for this select. End int64 // End time in milliseconds for this select. diff --git a/storage/noop.go b/storage/noop.go index a8be634fd..4c0383233 100644 --- a/storage/noop.go +++ b/storage/noop.go @@ -24,11 +24,7 @@ func NoopQuerier() Querier { return noopQuerier{} } -func (noopQuerier) Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) { - return NoopSeriesSet(), nil, nil -} - -func (noopQuerier) SelectSorted(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) { +func (noopQuerier) Select(bool, *SelectHints, ...*labels.Matcher) (SeriesSet, Warnings, error) { return NoopSeriesSet(), nil, nil } diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 8f07eb0d1..3b6556a12 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -79,22 +79,22 @@ func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error } // ToQuery builds a Query proto. 
-func ToQuery(from, to int64, matchers []*labels.Matcher, p *storage.SelectParams) (*prompb.Query, error) { +func ToQuery(from, to int64, matchers []*labels.Matcher, hints *storage.SelectHints) (*prompb.Query, error) { ms, err := toLabelMatchers(matchers) if err != nil { return nil, err } var rp *prompb.ReadHints - if p != nil { + if hints != nil { rp = &prompb.ReadHints{ - StepMs: p.Step, - Func: p.Func, - StartMs: p.Start, - EndMs: p.End, - Grouping: p.Grouping, - By: p.By, - RangeMs: p.Range, + StartMs: hints.Start, + EndMs: hints.End, + StepMs: hints.Step, + Func: hints.Func, + Grouping: hints.Grouping, + By: hints.By, + RangeMs: hints.Range, } } @@ -145,7 +145,7 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, } // FromQueryResult unpacks and sorts a QueryResult proto. -func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet { +func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet { series := make([]storage.Series, 0, len(res.Timeseries)) for _, ts := range res.Timeseries { labels := labelProtosToLabels(ts.Labels) @@ -158,7 +158,10 @@ func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet { samples: ts.Samples, }) } - sort.Sort(byLabel(series)) + + if sortSeries { + sort.Sort(byLabel(series)) + } return &concreteSeriesSet{ series: series, } diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index 9f7b2c218..d694b487c 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -178,7 +178,7 @@ func TestFromQueryResultWithDuplicates(t *testing.T) { }, } - series := FromQueryResult(&res) + series := FromQueryResult(false, &res) errSeries, isErrSeriesSet := series.(errSeriesSet) diff --git a/storage/remote/read.go b/storage/remote/read.go index 3e5c9573c..b03bb8869 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -57,16 +57,9 @@ type querier struct { client *Client } -// Select implements storage.Querier and uses the given matchers to read series -// sets from the Client. -func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - return q.SelectSorted(p, matchers...) -} - -// SelectSorted implements storage.Querier and uses the given matchers to read series -// sets from the Client. -func (q *querier) SelectSorted(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - query, err := ToQuery(q.mint, q.maxt, matchers, p) +// Select implements storage.Querier and uses the given matchers to read series sets from the Client. +func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + query, err := ToQuery(q.mint, q.maxt, matchers, hints) if err != nil { return nil, nil, err } @@ -80,12 +73,11 @@ func (q *querier) SelectSorted(p *storage.SelectParams, matchers ...*labels.Matc return nil, nil, fmt.Errorf("remote_read: %v", err) } - // FromQueryResult sorts. - return FromQueryResult(res), nil, nil + return FromQueryResult(sortSeries, res), nil, nil } // LabelValues implements storage.Querier and is a noop. -func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) { +func (q *querier) LabelValues(string) ([]string, storage.Warnings, error) { // TODO implement? 
return nil, nil, nil } @@ -124,9 +116,9 @@ type externalLabelsQuerier struct { // Select adds equality matchers for all external labels to the list of matchers // before calling the wrapped storage.Queryable. The added external labels are // removed from the returned series sets. -func (q externalLabelsQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q externalLabelsQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { m, added := q.addExternalLabels(matchers) - s, warnings, err := q.Querier.Select(p, m...) + s, warnings, err := q.Querier.Select(sortSeries, hints, m...) if err != nil { return nil, warnings, err } @@ -177,7 +169,7 @@ type requiredMatchersQuerier struct { // Select returns a NoopSeriesSet if the given matchers don't match the label // set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier. -func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q requiredMatchersQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { ms := q.requiredMatchers for _, m := range matchers { for i, r := range ms { @@ -193,7 +185,7 @@ func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*la if len(ms) > 0 { return storage.NoopSeriesSet(), nil, nil } - return q.Querier.Select(p, matchers...) + return q.Querier.Select(sortSeries, hints, matchers...) } // addExternalLabels adds matchers for each external label. External labels diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 28dfa79fb..0706d7bff 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -117,7 +117,7 @@ func TestExternalLabelsQuerierSelect(t *testing.T) { }, } want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels) - have, _, err := q.Select(nil, matchers...) + have, _, err := q.Select(false, nil, matchers...) if err != nil { t.Error(err) } @@ -219,7 +219,7 @@ func TestSeriesSetFilter(t *testing.T) { } for i, tc := range tests { - filtered := newSeriesSetFilter(FromQueryResult(tc.in), tc.toRemove) + filtered := newSeriesSetFilter(FromQueryResult(true, tc.in), tc.toRemove) have, err := ToQueryResult(filtered, 1e6) if err != nil { t.Fatal(err) @@ -242,7 +242,7 @@ type mockSeriesSet struct { storage.SeriesSet } -func (mockQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (mockQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { return mockSeriesSet{}, nil, nil } @@ -398,7 +398,7 @@ func TestRequiredLabelsQuerierSelect(t *testing.T) { requiredMatchers: test.requiredMatchers, } - have, _, err := q.Select(nil, test.matchers...) + have, _, err := q.Select(false, nil, test.matchers...) 
if err != nil { t.Error(err) } diff --git a/tsdb/block_test.go b/tsdb/block_test.go index d1ef6c524..f812d2970 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -203,7 +203,7 @@ func TestCorruptedChunk(t *testing.T) { querier, err := NewBlockQuerier(b, 0, 1) testutil.Ok(t, err) defer func() { testutil.Ok(t, querier.Close()) }() - set, ws, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + set, ws, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) diff --git a/tsdb/cmd/tsdb/main.go b/tsdb/cmd/tsdb/main.go index 458ed5351..3b2ebf296 100644 --- a/tsdb/cmd/tsdb/main.go +++ b/tsdb/cmd/tsdb/main.go @@ -617,7 +617,7 @@ func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) (err error) { err = merr.Err() }() - ss, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*")) + ss, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*")) if err != nil { return err } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index e824b0c40..545473f82 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -67,7 +67,7 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB, close func() // query runs a matcher query against the querier and fully expands its data. func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]tsdbutil.Sample { - ss, ws, err := q.Select(nil, matchers...) + ss, ws, err := q.Select(false, nil, matchers...) defer func() { testutil.Ok(t, q.Close()) }() @@ -315,7 +315,7 @@ Outer: q, err := db.Querier(context.TODO(), 0, numSamples) testutil.Ok(t, err) - res, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -491,7 +491,7 @@ func TestDB_Snapshot(t *testing.T) { defer func() { testutil.Ok(t, querier.Close()) }() // sum values - seriesSet, ws, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + seriesSet, ws, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -546,7 +546,7 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) { defer func() { testutil.Ok(t, querier.Close()) }() // Sum values. - seriesSet, ws, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + seriesSet, ws, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -618,7 +618,7 @@ Outer: testutil.Ok(t, err) defer func() { testutil.Ok(t, q.Close()) }() - res, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -792,7 +792,7 @@ func TestDB_e2e(t *testing.T) { q, err := db.Querier(context.TODO(), mint, maxt) testutil.Ok(t, err) - ss, ws, err := q.Select(nil, qry.ms...) + ss, ws, err := q.Select(false, nil, qry.ms...) 
testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -951,7 +951,7 @@ func TestTombstoneClean(t *testing.T) { testutil.Ok(t, err) defer q.Close() - res, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -1289,7 +1289,7 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { defer func() { testutil.Ok(t, q.Close()) }() for _, c := range cases { - ss, ws, err := q.Select(nil, c.selector...) + ss, ws, err := q.Select(false, nil, c.selector...) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -2486,7 +2486,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { defer func() { testutil.Ok(t, querier.Close()) }() // Sum the values. - seriesSet, ws, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush")) + seriesSet, ws, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -2551,7 +2551,7 @@ func TestDBCannotSeePartialCommits(t *testing.T) { testutil.Ok(t, err) defer querier.Close() - ss, _, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss, _, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) testutil.Ok(t, err) _, seriesSet, err := expandSeriesSet(ss) @@ -2591,13 +2591,13 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) { defer querierAfterAddButBeforeCommit.Close() // None of the queriers should return anything after the Add but before the commit. - ss, _, err := querierBeforeAdd.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss, _, err := querierBeforeAdd.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) testutil.Ok(t, err) _, seriesSet, err := expandSeriesSet(ss) testutil.Ok(t, err) testutil.Equals(t, map[string][]sample{}, seriesSet) - ss, _, err = querierAfterAddButBeforeCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss, _, err = querierAfterAddButBeforeCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) testutil.Ok(t, err) _, seriesSet, err = expandSeriesSet(ss) testutil.Ok(t, err) @@ -2608,14 +2608,14 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) { testutil.Ok(t, err) // Nothing returned for querier created before the Add. - ss, _, err = querierBeforeAdd.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss, _, err = querierBeforeAdd.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) testutil.Ok(t, err) _, seriesSet, err = expandSeriesSet(ss) testutil.Ok(t, err) testutil.Equals(t, map[string][]sample{}, seriesSet) // Series exists but has no samples for querier created after Add. - ss, _, err = querierAfterAddButBeforeCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss, _, err = querierAfterAddButBeforeCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) testutil.Ok(t, err) _, seriesSet, err = expandSeriesSet(ss) testutil.Ok(t, err) @@ -2626,7 +2626,7 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) { defer querierAfterCommit.Close() // Samples are returned for querier created after Commit. 
- ss, _, err = querierAfterCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss, _, err = querierAfterCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) testutil.Ok(t, err) _, seriesSet, err = expandSeriesSet(ss) testutil.Ok(t, err) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index f7aa9975c..bea95271a 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -566,7 +566,7 @@ func TestHeadDeleteSimple(t *testing.T) { for _, h := range []*Head{head, reloadedHead} { q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime()) testutil.Ok(t, err) - actSeriesSet, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value)) + actSeriesSet, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value)) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -613,7 +613,7 @@ func TestDeleteUntilCurMax(t *testing.T) { // Test the series returns no samples. The series is cleared only after compaction. q, err := NewBlockQuerier(hb, 0, 100000) testutil.Ok(t, err) - res, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) testutil.Assert(t, res.Next(), "series is not present") @@ -628,7 +628,7 @@ func TestDeleteUntilCurMax(t *testing.T) { testutil.Ok(t, app.Commit()) q, err = NewBlockQuerier(hb, 0, 100000) testutil.Ok(t, err) - res, ws, err = q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + res, ws, err = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) testutil.Assert(t, res.Next(), "series don't exist") @@ -804,7 +804,7 @@ func TestDelete_e2e(t *testing.T) { q, err := NewBlockQuerier(hb, 0, 100000) testutil.Ok(t, err) defer q.Close() - ss, ws, err := q.SelectSorted(nil, del.ms...) + ss, ws, err := q.Select(true, nil, del.ms...) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) // Build the mockSeriesSet. 
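To make the shape of the consolidated API concrete, here is a small caller-side sketch against the single Select method that replaces Select/SelectSorted in this commit. It is illustrative only and not part of the patch; the Queryable, time bounds, matcher, and the "rate" hint are assumed values. Sorting becomes opt-in: only callers that feed MergeSeriesSet (or otherwise need ordered series) pass true, which is what lets the single-block TSDB path skip the slow head sort.

    package example

    import (
    	"context"
    	"fmt"

    	"github.com/prometheus/prometheus/pkg/labels"
    	"github.com/prometheus/prometheus/storage"
    )

    func printSeries(ctx context.Context, queryable storage.Queryable, mint, maxt int64) error {
    	q, err := queryable.Querier(ctx, mint, maxt)
    	if err != nil {
    		return err
    	}
    	defer q.Close()

    	// Hints are optional; pass nil when there is nothing useful to push down.
    	hints := &storage.SelectHints{Start: mint, End: maxt, Func: "rate"}
    	matcher := labels.MustNewMatcher(labels.MatchEqual, "__name__", "up")

    	// sortSeries=false: plain iteration does not need ordered series.
    	ss, warnings, err := q.Select(false, hints, matcher)
    	if err != nil {
    		return err
    	}
    	for _, w := range warnings {
    		fmt.Println("warning:", w)
    	}
    	for ss.Next() {
    		fmt.Println(ss.At().Labels())
    	}
    	return ss.Err()
    }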
@@ -1092,7 +1092,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { testutil.Ok(t, err) defer q.Close() - ss, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) + ss, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -1120,7 +1120,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { testutil.Ok(t, err) defer q.Close() - ss, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) + ss, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -1405,7 +1405,7 @@ func TestHeadSeriesWithTimeBoundaries(t *testing.T) { seriesCount := 0 samplesCount := 0 - ss, _, err := q.Select(nil, matcher) + ss, _, err := q.Select(false, nil, matcher) testutil.Ok(t, err) for ss.Next() { i := ss.At().Iterator() @@ -1445,7 +1445,7 @@ func TestMemSeriesIsolation(t *testing.T) { testutil.Ok(t, err) defer querier.Close() - ss, _, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss, _, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) testutil.Ok(t, err) _, seriesSet, err := expandSeriesSet(ss) diff --git a/tsdb/querier.go b/tsdb/querier.go index 83f7b6ad6..ea1100a52 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -85,23 +85,21 @@ func (q *querier) lvals(qs []storage.Querier, n string) ([]string, storage.Warni return mergeStrings(s1, s2), ws, nil } -func (q *querier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - if len(q.blocks) != 1 { - return q.SelectSorted(p, ms...) - } - // Sorting Head series is slow, and unneeded when only the - // Head is being queried. Sorting blocks is a noop. - return q.blocks[0].Select(p, ms...) -} - -func (q *querier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { if len(q.blocks) == 0 { return storage.EmptySeriesSet(), nil, nil } + if len(q.blocks) == 1 { + // Sorting Head series is slow, and unneeded when only the + // Head is being queried. + return q.blocks[0].Select(sortSeries, hints, ms...) + } + ss := make([]storage.SeriesSet, len(q.blocks)) var ws storage.Warnings for i, b := range q.blocks { - s, w, err := b.SelectSorted(p, ms...) + // We have to sort if blocks > 1 as MergedSeriesSet requires it. + s, w, err := b.Select(true, hints, ms...) ws = append(ws, w...) 
if err != nil { return nil, ws, err @@ -127,30 +125,26 @@ type verticalQuerier struct { querier } -func (q *verticalQuerier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - return q.sel(p, q.blocks, ms) +func (q *verticalQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return q.sel(sortSeries, hints, q.blocks, ms) } -func (q *verticalQuerier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - return q.sel(p, q.blocks, ms) -} - -func (q *verticalQuerier) sel(p *storage.SelectParams, qs []storage.Querier, ms []*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q *verticalQuerier) sel(sortSeries bool, hints *storage.SelectHints, qs []storage.Querier, ms []*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { if len(qs) == 0 { return storage.EmptySeriesSet(), nil, nil } if len(qs) == 1 { - return qs[0].SelectSorted(p, ms...) + return qs[0].Select(sortSeries, hints, ms...) } l := len(qs) / 2 var ws storage.Warnings - a, w, err := q.sel(p, qs[:l], ms) + a, w, err := q.sel(sortSeries, hints, qs[:l], ms) ws = append(ws, w...) if err != nil { return nil, ws, err } - b, w, err := q.sel(p, qs[l:], ms) + b, w, err := q.sel(sortSeries, hints, qs[l:], ms) ws = append(ws, w...) if err != nil { return nil, ws, err @@ -195,42 +189,24 @@ type blockQuerier struct { mint, maxt int64 } -func (q *blockQuerier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - base, err := LookupChunkSeries(q.index, q.tombstones, ms...) +func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + var base storage.ChunkSeriesSet + var err error + + if sortSeries { + base, err = LookupChunkSeriesSorted(q.index, q.tombstones, ms...) + } else { + base, err = LookupChunkSeries(q.index, q.tombstones, ms...) + } if err != nil { return nil, nil, err } mint := q.mint maxt := q.maxt - if p != nil { - mint = p.Start - maxt = p.End - } - return &blockSeriesSet{ - set: &populatedChunkSeries{ - set: base, - chunks: q.chunks, - mint: mint, - maxt: maxt, - }, - - mint: mint, - maxt: maxt, - }, nil, nil -} - -func (q *blockQuerier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - base, err := LookupChunkSeriesSorted(q.index, q.tombstones, ms...) 
-	if err != nil {
-		return nil, nil, err
-	}
-
-	mint := q.mint
-	maxt := q.maxt
-	if p != nil {
-		mint = p.Start
-		maxt = p.End
+	if hints != nil {
+		mint = hints.Start
+		maxt = hints.End
 	}
 	return &blockSeriesSet{
 		set: &populatedChunkSeries{
diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go
index 765435203..95502be95 100644
--- a/tsdb/querier_bench_test.go
+++ b/tsdb/querier_bench_test.go
@@ -22,7 +22,6 @@ import (
 	"testing"
 
 	"github.com/prometheus/prometheus/pkg/labels"
-	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/util/testutil"
 )
 
@@ -147,12 +146,7 @@ func BenchmarkQuerierSelect(b *testing.B) {
 			b.ResetTimer()
 			for i := 0; i < b.N; i++ {
-				var ss storage.SeriesSet
-				if sorted {
-					ss, _, err = q.SelectSorted(nil, matcher)
-				} else {
-					ss, _, err = q.Select(nil, matcher)
-				}
+				ss, _, err := q.Select(sorted, nil, matcher)
 				testutil.Ok(b, err)
 				for ss.Next() {
 				}
diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go
index 52e316051..9668a21fe 100644
--- a/tsdb/querier_test.go
+++ b/tsdb/querier_test.go
@@ -373,7 +373,7 @@ Outer:
 				maxt: c.maxt,
 			}
 
-			res, ws, err := querier.Select(nil, c.ms...)
+			res, ws, err := querier.Select(false, nil, c.ms...)
 			testutil.Ok(t, err)
 			testutil.Equals(t, 0, len(ws))
 
@@ -536,7 +536,7 @@ Outer:
 				maxt: c.maxt,
 			}
 
-			res, ws, err := querier.Select(nil, c.ms...)
+			res, ws, err := querier.Select(false, nil, c.ms...)
 			testutil.Ok(t, err)
 			testutil.Equals(t, 0, len(ws))
 
@@ -1710,7 +1710,7 @@ func BenchmarkQuerySeek(b *testing.B) {
 			b.ResetTimer()
 			b.ReportAllocs()
 
-			ss, ws, err := sq.Select(nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
+			ss, ws, err := sq.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
 			for ss.Next() {
 				it := ss.At().Iterator()
 				for t := mint; t <= maxt; t++ {
@@ -1848,7 +1848,7 @@ func BenchmarkSetMatcher(b *testing.B) {
 			b.ResetTimer()
 			b.ReportAllocs()
 			for n := 0; n < b.N; n++ {
-				_, ws, err := que.Select(nil, labels.MustNewMatcher(labels.MatchRegexp, "test", c.pattern))
+				_, ws, err := que.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "test", c.pattern))
 				testutil.Ok(b, err)
 				testutil.Equals(b, 0, len(ws))
 			}
@@ -2297,7 +2297,7 @@ func benchQuery(b *testing.B, expExpansions int, q storage.Querier, selectors la
 	b.ResetTimer()
 	b.ReportAllocs()
 	for i := 0; i < b.N; i++ {
-		ss, ws, err := q.Select(nil, selectors...)
+		ss, ws, err := q.Select(false, nil, selectors...)
 		testutil.Ok(b, err)
 		testutil.Equals(b, 0, len(ws))
 		var actualExpansions int
diff --git a/web/api/v1/api.go b/web/api/v1/api.go
index 5ea367fa2..204b2ff38 100644
--- a/web/api/v1/api.go
+++ b/web/api/v1/api.go
@@ -531,7 +531,7 @@ func (api *API) series(r *http.Request) apiFuncResult {
 	var sets []storage.SeriesSet
 	var warnings storage.Warnings
 	for _, mset := range matcherSets {
-		s, wrn, err := q.Select(nil, mset...) //TODO
+		s, wrn, err := q.Select(false, nil, mset...)
 		warnings = append(warnings, wrn...)
 		if err != nil {
 			return apiFuncResult{nil, &apiError{errorExec, err}, warnings, nil}
@@ -1161,10 +1161,9 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	for i, query := range req.Queries {
-		err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error {
-			// The streaming API provides sorted series.
-			// TODO(bwplotka): Handle warnings via query log.
-			set, _, err := querier.SelectSorted(selectParams, filteredMatchers...)
+		err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) error {
+			// The streaming API has to provide the series sorted.
+			set, _, err := querier.Select(true, hints, filteredMatchers...)
 			if err != nil {
 				return err
 			}
@@ -1195,8 +1194,8 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
 			Results: make([]*prompb.QueryResult, len(req.Queries)),
 		}
 		for i, query := range req.Queries {
-			err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error {
-				set, _, err := querier.Select(selectParams, filteredMatchers...)
+			err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) error {
+				set, _, err := querier.Select(false, hints, filteredMatchers...)
 				if err != nil {
 					return err
 				}
@@ -1254,7 +1253,7 @@ func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabe
 	return filteredMatchers, nil
 }
 
-func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error) error {
+func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) error) error {
 	filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels)
 	if err != nil {
 		return err
@@ -1264,23 +1263,25 @@ func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, extern
 	if err != nil {
 		return err
 	}
-
-	var selectParams *storage.SelectParams
-	if query.Hints != nil {
-		selectParams = &storage.SelectParams{
-			Start: query.Hints.StartMs,
-			End:   query.Hints.EndMs,
-			Step:  query.Hints.StepMs,
-			Func:  query.Hints.Func,
-		}
-	}
-
 	defer func() {
 		if err := querier.Close(); err != nil {
 			level.Warn(api.logger).Log("msg", "error on querier close", "err", err.Error())
 		}
 	}()
-	return seriesHandleFn(querier, selectParams, filteredMatchers)
+
+	var hints *storage.SelectHints
+	if query.Hints != nil {
+		hints = &storage.SelectHints{
+			Start:    query.Hints.StartMs,
+			End:      query.Hints.EndMs,
+			Step:     query.Hints.StepMs,
+			Func:     query.Hints.Func,
+			Grouping: query.Hints.Grouping,
+			Range:    query.Hints.RangeMs,
+			By:       query.Hints.By,
+		}
+	}
+	return seriesHandleFn(querier, hints, filteredMatchers)
 }
 
 func (api *API) deleteSeries(r *http.Request) apiFuncResult {
diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go
index bfac2d4f1..2c8a7c6ac 100644
--- a/web/api/v1/api_test.go
+++ b/web/api/v1/api_test.go
@@ -488,9 +488,9 @@ func setupRemote(s storage.Storage) *httptest.Server {
 			return
 		}
 
-		var selectParams *storage.SelectParams
+		var hints *storage.SelectHints
 		if query.Hints != nil {
-			selectParams = &storage.SelectParams{
+			hints = &storage.SelectHints{
 				Start: query.Hints.StartMs,
 				End:   query.Hints.EndMs,
 				Step:  query.Hints.StepMs,
@@ -505,7 +505,7 @@ func setupRemote(s storage.Storage) *httptest.Server {
 		}
 		defer querier.Close()
 
-		set, _, err := querier.Select(selectParams, matchers...)
+		set, _, err := querier.Select(false, hints, matchers...)
 		if err != nil {
 			http.Error(w, err.Error(), http.StatusInternalServerError)
 			return
@@ -1615,7 +1615,7 @@ func TestSampledReadEndpoint(t *testing.T) {
 	matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e")
 	testutil.Ok(t, err)
 
-	query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{Step: 0, Func: "avg"})
+	query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectHints{Step: 0, Func: "avg"})
 	testutil.Ok(t, err)
 
 	req := &prompb.ReadRequest{Queries: []*prompb.Query{query}}
@@ -1714,7 +1714,7 @@ func TestStreamReadEndpoint(t *testing.T) {
 	matcher3, err := labels.NewMatcher(labels.MatchEqual, "foo", "bar1")
 	testutil.Ok(t, err)
 
-	query1, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{
+	query1, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher2}, &storage.SelectHints{
 		Step:  1,
 		Func:  "avg",
 		Start: 0,
@@ -1722,7 +1722,7 @@ func TestStreamReadEndpoint(t *testing.T) {
 	})
 	testutil.Ok(t, err)
 
-	query2, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher3}, &storage.SelectParams{
+	query2, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher3}, &storage.SelectHints{
 		Step:  1,
 		Func:  "avg",
 		Start: 0,
diff --git a/web/federate.go b/web/federate.go
index e358a3dab..93c9aece3 100644
--- a/web/federate.go
+++ b/web/federate.go
@@ -81,14 +81,11 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
 
 	vec := make(promql.Vector, 0, 8000)
 
-	params := &storage.SelectParams{
-		Start: mint,
-		End:   maxt,
-	}
+	hints := &storage.SelectHints{Start: mint, End: maxt}
 
 	var sets []storage.SeriesSet
 	for _, mset := range matcherSets {
-		s, wrns, err := q.Select(params, mset...)
+		s, wrns, err := q.Select(false, hints, mset...)
 		if wrns != nil {
 			level.Debug(h.logger).Log("msg", "federation select returned warnings", "warnings", wrns)
 			federationWarnings.Add(float64(len(wrns)))

From 526cff39b9dad6bbb5029c5feb7c48ed9cdfe496 Mon Sep 17 00:00:00 2001
From: beorn7
Date: Thu, 19 Mar 2020 16:28:23 +0100
Subject: [PATCH 4/5] Fix tests that were broken by #7009

Signed-off-by: beorn7
---
 storage/fanout/fanout_test.go | 8 ++------
 tsdb/head_test.go             | 2 +-
 2 files changed, 3 insertions(+), 7 deletions(-)

diff --git a/storage/fanout/fanout_test.go b/storage/fanout/fanout_test.go
index 0554444fd..d62b6f5e1 100644
--- a/storage/fanout/fanout_test.go
+++ b/storage/fanout/fanout_test.go
@@ -131,7 +131,7 @@ func TestFanoutErrors(t *testing.T) {
 		defer querier.Close()
 
 		matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b")
-		ss, warnings, err := querier.SelectSorted(nil, matcher)
+		ss, warnings, err := querier.Select(true, nil, matcher)
 		testutil.Equals(t, tc.err, err)
 		testutil.Equals(t, tc.warnings, warnings)
 
@@ -169,11 +169,7 @@ func (errStorage) Close() error {
 
 type errQuerier struct{}
 
-func (errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
-	return nil, nil, errSelect
-}
-
-func (errQuerier) SelectSorted(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
+func (errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
 	return nil, nil, errSelect
 }
 
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index bea95271a..57e5c3a92 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -1630,7 +1630,7 @@ func testHeadSeriesChunkRace(t *testing.T) {
 		h.gc()
 		wg.Done()
 	}()
-	ss, _, err := q.Select(nil, matcher)
+	ss, _, err := q.Select(false, nil, matcher)
 	testutil.Ok(t, err)
 	testutil.Ok(t, ss.Err())
 	wg.Wait()

From d47bdb9d12c086ce9bb8ba1ef7a2efe1a9f0884d Mon Sep 17 00:00:00 2001
From: Julien Pivotto
Date: Sat, 21 Mar 2020 20:04:16 +0100
Subject: [PATCH 5/5] Release 2.17.0-rc.4 (#7016)

Signed-off-by: Julien Pivotto
---
 CHANGELOG.md | 5 ++++-
 VERSION      | 2 +-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5dcfcadfc..2e467e5e9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,4 @@
-## 2.17.0-rc.3 / 2020-03-18
+## 2.17.0-rc.4 / 2020-03-21
 
 This release implements isolation in TSDB. API queries and recording rules are
 guaranteed to only see full scrapes and full recording rules. This comes with a
@@ -20,8 +20,11 @@ some increase in memory usage, CPU usage, or query latency.
 * [BUGFIX] PromQL: Do not escape HTML-like chars in query log #6834 #6795
 * [BUGFIX] React UI: Fix data table matrix values #6896
 * [BUGFIX] React UI: Fix new targets page not loading when using non-ASCII characters #6892
+* [BUGFIX] Remote read: Fix duplication of metrics read from remote storage with external labels #6967 #7018
+* [BUGFIX] Remote write: Register WAL watcher and live reader metrics for all remotes, not just the first one #6998
 * [BUGFIX] Scrape: Prevent removal of metric names upon relabeling #6891
 * [BUGFIX] Scrape: Fix 'superfluous response.WriteHeader call' errors when scrape fails under some circonstances #6986
+* [BUGFIX] Scrape: Fix crash when reloads are separated by two scrape intervals #7011
 
 ## 2.16.0 / 2020-02-13
 
diff --git a/VERSION b/VERSION
index 9dba67016..f14ff579c 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.17.0-rc.3
+2.17.0-rc.4
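
Editor's illustration (not part of the patch series above): the central API change in these patches is that storage.Querier now exposes a single Select method taking a sortSeries flag and a *storage.SelectHints value, replacing the old Select/SelectSorted pair and *storage.SelectParams. The following minimal Go sketch shows what a caller looks like against that interface as it appears in the diffs; the dumpSeries helper, the example package name, and the job="node" matcher are hypothetical, and the SeriesSet/iterator shapes follow the three-value Select signature shown above.

package example

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
)

// dumpSeries is a hypothetical helper: it selects all series matching
// job="node" within [mint, maxt] and prints their samples.
// sortSeries is false because the result is not merged with other series
// sets; callers that merge sets from several queriers (for example the
// streaming remote read handler in the diff above) pass true instead.
func dumpSeries(q storage.Querier, mint, maxt int64) error {
	hints := &storage.SelectHints{Start: mint, End: maxt}
	matcher := labels.MustNewMatcher(labels.MatchEqual, "job", "node")

	set, warnings, err := q.Select(false, hints, matcher)
	if err != nil {
		return err
	}
	for _, w := range warnings {
		fmt.Println("warning:", w)
	}
	for set.Next() {
		series := set.At()
		fmt.Println(series.Labels())
		it := series.Iterator()
		for it.Next() {
			t, v := it.At()
			fmt.Println(t, v)
		}
		if err := it.Err(); err != nil {
			return err
		}
	}
	return set.Err()
}

Inside Prometheus itself the hints are normally derived from the incoming query (as remoteReadQuery does with Grouping, Range, and By in patch 3) rather than constructed by hand as in this sketch.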