Merge pull request #1805 from prometheus/higher-level-storage-interface

Make the storage interface higher-level.
This commit is contained in:
Fabian Reinartz 2016-08-05 16:17:14 +02:00 committed by GitHub
commit 70490fe568
14 changed files with 445 additions and 513 deletions

View file

@ -1,181 +0,0 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package promql
import (
"errors"
"time"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/storage/local"
)
// An Analyzer traverses an expression and determines which data has to be requested
// from the storage. It is bound to a context that allows cancellation and timing out.
type Analyzer struct {
// The storage from which to query data.
Storage local.Querier
// The expression being analyzed.
Expr Expr
// The time range for evaluation of Expr.
Start, End model.Time
// The preload times for different query time offsets.
offsetPreloadTimes map[time.Duration]preloadTimes
}
// preloadTimes tracks which instants or ranges to preload for a set of
// fingerprints. One of these structs is collected for each offset by the query
// analyzer.
type preloadTimes struct {
// Ranges require loading a range of samples. They can be triggered by
// two type of expressions: First a range expression AKA matrix
// selector, where the Duration in the ranges map is the length of the
// range in the range expression. Second an instant expression AKA
// vector selector, where the Duration in the ranges map is the
// StalenessDelta. In preloading, both types of expressions result in
// the same effect: Preload everything between the specified start time
// minus the Duration in the ranges map up to the specified end time.
ranges map[model.Fingerprint]time.Duration
// Instants require a single sample to be loaded. This only happens for
// instant expressions AKA vector selectors iff the specified start ond
// end time are the same, Thus, instants is only populated if start and
// end time are the same.
instants map[model.Fingerprint]struct{}
}
// Analyze the provided expression and attach metrics and fingerprints to data-selecting
// AST nodes that are later used to preload the data from the storage.
func (a *Analyzer) Analyze(ctx context.Context) error {
a.offsetPreloadTimes = map[time.Duration]preloadTimes{}
getPreloadTimes := func(offset time.Duration) preloadTimes {
if pt, ok := a.offsetPreloadTimes[offset]; ok {
return pt
}
pt := preloadTimes{
instants: map[model.Fingerprint]struct{}{},
ranges: map[model.Fingerprint]time.Duration{},
}
a.offsetPreloadTimes[offset] = pt
return pt
}
// Retrieve fingerprints and metrics for the required time range for
// each metric or matrix selector node.
Inspect(a.Expr, func(node Node) bool {
switch n := node.(type) {
case *VectorSelector:
n.metrics = a.Storage.MetricsForLabelMatchers(
a.Start.Add(-n.Offset-StalenessDelta), a.End.Add(-n.Offset),
n.LabelMatchers...,
)
n.iterators = make(map[model.Fingerprint]local.SeriesIterator, len(n.metrics))
pt := getPreloadTimes(n.Offset)
for fp := range n.metrics {
r, alreadyInRanges := pt.ranges[fp]
if a.Start.Equal(a.End) && !alreadyInRanges {
// A true instant, we only need one value.
pt.instants[fp] = struct{}{}
continue
}
if r < StalenessDelta {
pt.ranges[fp] = StalenessDelta
}
}
case *MatrixSelector:
n.metrics = a.Storage.MetricsForLabelMatchers(
a.Start.Add(-n.Offset-n.Range), a.End.Add(-n.Offset),
n.LabelMatchers...,
)
n.iterators = make(map[model.Fingerprint]local.SeriesIterator, len(n.metrics))
pt := getPreloadTimes(n.Offset)
for fp := range n.metrics {
if pt.ranges[fp] < n.Range {
pt.ranges[fp] = n.Range
// Delete the fingerprint from the instants. Ranges always contain more
// points and span more time than instants, so we don't need to track
// an instant for the same fingerprint, should we have one.
delete(pt.instants, fp)
}
}
}
return true
})
// Currently we do not return an error but we might place a context check in here
// or extend the stage in some other way.
return nil
}
// Prepare the expression evaluation by preloading all required chunks from the storage
// and setting the respective storage iterators in the AST nodes.
func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) {
const env = "query preparation"
if a.offsetPreloadTimes == nil {
return nil, errors.New("analysis must be performed before preparing query")
}
var err error
// The preloader must not be closed unless an error occured as closing
// unpins the preloaded chunks.
p := a.Storage.NewPreloader()
defer func() {
if err != nil {
p.Close()
}
}()
// Preload all analyzed ranges.
iters := map[time.Duration]map[model.Fingerprint]local.SeriesIterator{}
for offset, pt := range a.offsetPreloadTimes {
itersForDuration := map[model.Fingerprint]local.SeriesIterator{}
iters[offset] = itersForDuration
start := a.Start.Add(-offset)
end := a.End.Add(-offset)
for fp, rangeDuration := range pt.ranges {
if err = contextDone(ctx, env); err != nil {
return nil, err
}
itersForDuration[fp] = p.PreloadRange(fp, start.Add(-rangeDuration), end)
}
for fp := range pt.instants {
if err = contextDone(ctx, env); err != nil {
return nil, err
}
itersForDuration[fp] = p.PreloadInstant(fp, start, StalenessDelta)
}
}
// Attach storage iterators to AST nodes.
Inspect(a.Expr, func(node Node) bool {
switch n := node.(type) {
case *VectorSelector:
for fp := range n.metrics {
n.iterators[fp] = iters[n.Offset][fp]
}
case *MatrixSelector:
for fp := range n.metrics {
n.iterators[fp] = iters[n.Offset][fp]
}
}
return true
})
return p, nil
}

View file

@ -135,9 +135,8 @@ type MatrixSelector struct {
Offset time.Duration Offset time.Duration
LabelMatchers metric.LabelMatchers LabelMatchers metric.LabelMatchers
// The series iterators are populated at query analysis time. // The series iterators are populated at query preparation time.
iterators map[model.Fingerprint]local.SeriesIterator iterators []local.SeriesIterator
metrics map[model.Fingerprint]metric.Metric
} }
// NumberLiteral represents a number. // NumberLiteral represents a number.
@ -169,9 +168,8 @@ type VectorSelector struct {
Offset time.Duration Offset time.Duration
LabelMatchers metric.LabelMatchers LabelMatchers metric.LabelMatchers
// The series iterators are populated at query analysis time. // The series iterators are populated at query preparation time.
iterators map[model.Fingerprint]local.SeriesIterator iterators []local.SeriesIterator
metrics map[model.Fingerprint]metric.Metric
} }
func (e *AggregateExpr) Type() model.ValueType { return model.ValVector } func (e *AggregateExpr) Type() model.ValueType { return model.ValVector }

View file

@ -216,10 +216,10 @@ func contextDone(ctx context.Context, env string) error {
} }
// Engine handles the lifetime of queries from beginning to end. // Engine handles the lifetime of queries from beginning to end.
// It is connected to a storage. // It is connected to a querier.
type Engine struct { type Engine struct {
// The storage on which the engine operates. // The querier on which the engine operates.
storage local.Querier querier local.Querier
// The base context for all queries and its cancellation function. // The base context for all queries and its cancellation function.
baseCtx context.Context baseCtx context.Context
@ -231,13 +231,13 @@ type Engine struct {
} }
// NewEngine returns a new engine. // NewEngine returns a new engine.
func NewEngine(storage local.Querier, o *EngineOptions) *Engine { func NewEngine(querier local.Querier, o *EngineOptions) *Engine {
if o == nil { if o == nil {
o = DefaultEngineOptions o = DefaultEngineOptions
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
return &Engine{ return &Engine{
storage: storage, querier: querier,
baseCtx: ctx, baseCtx: ctx,
cancelQueries: cancel, cancelQueries: cancel,
gate: newQueryGate(o.MaxConcurrentQueries), gate: newQueryGate(o.MaxConcurrentQueries),
@ -309,9 +309,8 @@ func (ng *Engine) newQuery(expr Expr, start, end model.Time, interval time.Durat
// of an arbitrary function during handling. It is used to test the Engine. // of an arbitrary function during handling. It is used to test the Engine.
type testStmt func(context.Context) error type testStmt func(context.Context) error
func (testStmt) String() string { return "test statement" } func (testStmt) String() string { return "test statement" }
func (testStmt) DotGraph() string { return "test statement" } func (testStmt) stmt() {}
func (testStmt) stmt() {}
func (ng *Engine) newTestQuery(f func(context.Context) error) Query { func (ng *Engine) newTestQuery(f func(context.Context) error) Query {
qry := &query{ qry := &query{
@ -365,35 +364,14 @@ func (ng *Engine) exec(q *query) (model.Value, error) {
// execEvalStmt evaluates the expression of an evaluation statement for the given time range. // execEvalStmt evaluates the expression of an evaluation statement for the given time range.
func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (model.Value, error) { func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (model.Value, error) {
prepareTimer := query.stats.GetTimer(stats.TotalQueryPreparationTime).Start() prepareTimer := query.stats.GetTimer(stats.QueryPreparationTime).Start()
analyzeTimer := query.stats.GetTimer(stats.QueryAnalysisTime).Start() err := ng.populateIterators(s)
// Only one execution statement per query is allowed.
analyzer := &Analyzer{
Storage: ng.storage,
Expr: s.Expr,
Start: s.Start,
End: s.End,
}
err := analyzer.Analyze(ctx)
if err != nil {
analyzeTimer.Stop()
prepareTimer.Stop()
return nil, err
}
analyzeTimer.Stop()
preloadTimer := query.stats.GetTimer(stats.PreloadTime).Start()
closer, err := analyzer.Prepare(ctx)
if err != nil {
preloadTimer.Stop()
prepareTimer.Stop()
return nil, err
}
defer closer.Close()
preloadTimer.Stop()
prepareTimer.Stop() prepareTimer.Stop()
if err != nil {
return nil, err
}
defer ng.closeIterators(s)
evalTimer := query.stats.GetTimer(stats.InnerEvalTime).Start() evalTimer := query.stats.GetTimer(stats.InnerEvalTime).Start()
// Instant evaluation. // Instant evaluation.
@ -498,8 +476,70 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
return resMatrix, nil return resMatrix, nil
} }
func (ng *Engine) populateIterators(s *EvalStmt) error {
var queryErr error
Inspect(s.Expr, func(node Node) bool {
switch n := node.(type) {
case *VectorSelector:
var iterators []local.SeriesIterator
var err error
if s.Start.Equal(s.End) {
iterators, err = ng.querier.QueryInstant(
s.Start.Add(-n.Offset),
StalenessDelta,
n.LabelMatchers...,
)
} else {
iterators, err = ng.querier.QueryRange(
s.Start.Add(-n.Offset-StalenessDelta),
s.End.Add(-n.Offset),
n.LabelMatchers...,
)
}
if err != nil {
queryErr = err
return false
}
for _, it := range iterators {
n.iterators = append(n.iterators, it)
}
case *MatrixSelector:
iterators, err := ng.querier.QueryRange(
s.Start.Add(-n.Offset-n.Range),
s.End.Add(-n.Offset),
n.LabelMatchers...,
)
if err != nil {
queryErr = err
return false
}
for _, it := range iterators {
n.iterators = append(n.iterators, it)
}
}
return true
})
return queryErr
}
func (ng *Engine) closeIterators(s *EvalStmt) {
Inspect(s.Expr, func(node Node) bool {
switch n := node.(type) {
case *VectorSelector:
for _, it := range n.iterators {
it.Close()
}
case *MatrixSelector:
for _, it := range n.iterators {
it.Close()
}
}
return true
})
}
// An evaluator evaluates given expressions at a fixed timestamp. It is attached to an // An evaluator evaluates given expressions at a fixed timestamp. It is attached to an
// engine through which it connects to a storage and reports errors. On timeout or // engine through which it connects to a querier and reports errors. On timeout or
// cancellation of its context it terminates. // cancellation of its context it terminates.
type evaluator struct { type evaluator struct {
ctx context.Context ctx context.Context
@ -681,14 +721,14 @@ func (ev *evaluator) eval(expr Expr) model.Value {
// vectorSelector evaluates a *VectorSelector expression. // vectorSelector evaluates a *VectorSelector expression.
func (ev *evaluator) vectorSelector(node *VectorSelector) vector { func (ev *evaluator) vectorSelector(node *VectorSelector) vector {
vec := vector{} vec := vector{}
for fp, it := range node.iterators { for _, it := range node.iterators {
refTime := ev.Timestamp.Add(-node.Offset) refTime := ev.Timestamp.Add(-node.Offset)
samplePair := it.ValueAtOrBeforeTime(refTime) samplePair := it.ValueAtOrBeforeTime(refTime)
if samplePair.Timestamp.Before(refTime.Add(-StalenessDelta)) { if samplePair.Timestamp.Before(refTime.Add(-StalenessDelta)) {
continue // Sample outside of staleness policy window. continue // Sample outside of staleness policy window.
} }
vec = append(vec, &sample{ vec = append(vec, &sample{
Metric: node.metrics[fp], Metric: it.Metric(),
Value: samplePair.Value, Value: samplePair.Value,
Timestamp: ev.Timestamp, Timestamp: ev.Timestamp,
}) })
@ -704,7 +744,7 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) matrix {
} }
sampleStreams := make([]*sampleStream, 0, len(node.iterators)) sampleStreams := make([]*sampleStream, 0, len(node.iterators))
for fp, it := range node.iterators { for _, it := range node.iterators {
samplePairs := it.RangeValues(interval) samplePairs := it.RangeValues(interval)
if len(samplePairs) == 0 { if len(samplePairs) == 0 {
continue continue
@ -717,7 +757,7 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) matrix {
} }
sampleStream := &sampleStream{ sampleStream := &sampleStream{
Metric: node.metrics[fp], Metric: it.Metric(),
Values: samplePairs, Values: samplePairs,
} }
sampleStreams = append(sampleStreams, sampleStream) sampleStreams = append(sampleStreams, sampleStream)

View file

@ -50,7 +50,7 @@ func TestQueryConcurrency(t *testing.T) {
select { select {
case <-processing: case <-processing:
t.Fatalf("Query above concurrency threhosld being executed") t.Fatalf("Query above concurrency threshold being executed")
case <-time.After(20 * time.Millisecond): case <-time.After(20 * time.Millisecond):
// Expected. // Expected.
} }

View file

@ -38,8 +38,9 @@ type Storage interface {
// already or has too many chunks waiting for persistence. // already or has too many chunks waiting for persistence.
storage.SampleAppender storage.SampleAppender
// Drop all time series associated with the given fingerprints. // Drop all time series associated with the given label matchers. Returns
DropMetricsForFingerprints(...model.Fingerprint) // the number series that were dropped.
DropMetricsForLabelMatchers(...*metric.LabelMatcher) (int, error)
// Run the various maintenance loops in goroutines. Returns when the // Run the various maintenance loops in goroutines. Returns when the
// storage is ready to use. Keeps everything running in the background // storage is ready to use. Keeps everything running in the background
// until Stop is called. // until Stop is called.
@ -55,25 +56,32 @@ type Storage interface {
// Querier allows querying a time series storage. // Querier allows querying a time series storage.
type Querier interface { type Querier interface {
// NewPreloader returns a new Preloader which allows preloading and pinning // QueryRange returns a list of series iterators for the selected
// series data into memory for use within a query. // time range and label matchers. The iterators need to be closed
NewPreloader() Preloader // after usage.
QueryRange(from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error)
// QueryInstant returns a list of series iterators for the selected
// instant and label matchers. The iterators need to be closed after usage.
QueryInstant(ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error)
// MetricsForLabelMatchers returns the metrics from storage that satisfy // MetricsForLabelMatchers returns the metrics from storage that satisfy
// the given label matchers. At least one label matcher must be // the given sets of label matchers. Each set of matchers must contain at
// specified that does not match the empty string, otherwise an empty // least one label matcher that does not match the empty string. Otherwise,
// map is returned. The times from and through are hints for the storage // an empty list is returned. Within one set of matchers, the intersection
// to optimize the search. The storage MAY exclude metrics that have no // of matching series is computed. The final return value will be the union
// samples in the specified interval from the returned map. In doubt, // of the per-set results. The times from and through are hints for the
// specify model.Earliest for from and model.Latest for through. // storage to optimize the search. The storage MAY exclude metrics that
MetricsForLabelMatchers(from, through model.Time, matchers ...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric // have no samples in the specified interval from the returned map. In
// doubt, specify model.Earliest for from and model.Latest for through.
MetricsForLabelMatchers(from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error)
// LastSampleForFingerprint returns the last sample that has been // LastSampleForFingerprint returns the last sample that has been
// ingested for the provided fingerprint. If this instance of the // ingested for the given sets of label matchers. If this instance of the
// Storage has never ingested a sample for the provided fingerprint (or // Storage has never ingested a sample for the provided fingerprint (or
// the last ingestion is so long ago that the series has been archived), // the last ingestion is so long ago that the series has been archived),
// ZeroSample is returned. // ZeroSample is returned. The label matching behavior is the same as in
LastSampleForFingerprint(model.Fingerprint) model.Sample // MetricsForLabelMatchers.
LastSampleForLabelMatchers(cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error)
// Get all of the label values that are associated with a given label name. // Get all of the label values that are associated with a given label name.
LabelValuesForLabelName(model.LabelName) model.LabelValues LabelValuesForLabelName(model.LabelName) (model.LabelValues, error)
} }
// SeriesIterator enables efficient access of sample values in a series. Its // SeriesIterator enables efficient access of sample values in a series. Its
@ -88,21 +96,9 @@ type SeriesIterator interface {
ValueAtOrBeforeTime(model.Time) model.SamplePair ValueAtOrBeforeTime(model.Time) model.SamplePair
// Gets all values contained within a given interval. // Gets all values contained within a given interval.
RangeValues(metric.Interval) []model.SamplePair RangeValues(metric.Interval) []model.SamplePair
} // Returns the metric of the series that the iterator corresponds to.
Metric() metric.Metric
// A Preloader preloads series data necessary for a query into memory, pins it // Closes the iterator and releases the underlying data.
// until released via Close(), and returns an iterator for the pinned data. Its
// methods are generally not goroutine-safe.
type Preloader interface {
PreloadRange(
fp model.Fingerprint,
from, through model.Time,
) SeriesIterator
PreloadInstant(
fp model.Fingerprint,
timestamp model.Time, stalenessDelta time.Duration,
) SeriesIterator
// Close unpins any previously requested series data from memory.
Close() Close()
} }

View file

@ -343,13 +343,13 @@ func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) model.Fingerp
// name. This method is goroutine-safe but take into account that metrics queued // name. This method is goroutine-safe but take into account that metrics queued
// for indexing with IndexMetric might not have made it into the index // for indexing with IndexMetric might not have made it into the index
// yet. (Same applies correspondingly to UnindexMetric.) // yet. (Same applies correspondingly to UnindexMetric.)
func (p *persistence) labelValuesForLabelName(ln model.LabelName) model.LabelValues { func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelValues, error) {
lvs, _, err := p.labelNameToLabelValues.Lookup(ln) lvs, _, err := p.labelNameToLabelValues.Lookup(ln)
if err != nil { if err != nil {
p.setDirty(fmt.Errorf("error in method labelValuesForLabelName(%v): %s", ln, err)) p.setDirty(fmt.Errorf("error in method labelValuesForLabelName(%v): %s", ln, err))
return nil return nil, err
} }
return lvs return lvs, nil
} }
// persistChunks persists a number of consecutive chunks of a series. It is the // persistChunks persists a number of consecutive chunks of a series. It is the

View file

@ -1045,7 +1045,10 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
// Compare label name -> label values mappings. // Compare label name -> label values mappings.
for ln, lvs := range b.expectedLnToLvs { for ln, lvs := range b.expectedLnToLvs {
outLvs := p.labelValuesForLabelName(ln) outLvs, err := p.labelValuesForLabelName(ln)
if err != nil {
t.Fatal(err)
}
outSet := codable.LabelValueSet{} outSet := codable.LabelValueSet{}
for _, lv := range outLvs { for _, lv := range outLvs {

View file

@ -1,55 +0,0 @@
// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package local
import (
"time"
"github.com/prometheus/common/model"
)
// memorySeriesPreloader is a Preloader for the MemorySeriesStorage.
type memorySeriesPreloader struct {
storage *MemorySeriesStorage
pinnedChunkDescs []*chunkDesc
}
// PreloadRange implements Preloader.
func (p *memorySeriesPreloader) PreloadRange(
fp model.Fingerprint,
from model.Time, through model.Time,
) SeriesIterator {
cds, iter := p.storage.preloadChunksForRange(fp, from, through)
p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...)
return iter
}
// PreloadInstant implements Preloader
func (p *memorySeriesPreloader) PreloadInstant(
fp model.Fingerprint,
timestamp model.Time, stalenessDelta time.Duration,
) SeriesIterator {
cds, iter := p.storage.preloadChunksForInstant(fp, timestamp.Add(-stalenessDelta), timestamp)
p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...)
return iter
}
// Close implements Preloader.
func (p *memorySeriesPreloader) Close() {
for _, cd := range p.pinnedChunkDescs {
cd.unpin(p.storage.evictRequests)
}
chunkOps.WithLabelValues(unpin).Add(float64(len(p.pinnedChunkDescs)))
}

View file

@ -340,7 +340,7 @@ func (s *memorySeries) dropChunks(t model.Time) error {
// preloadChunks is an internal helper method. // preloadChunks is an internal helper method.
func (s *memorySeries) preloadChunks( func (s *memorySeries) preloadChunks(
indexes []int, fp model.Fingerprint, mss *MemorySeriesStorage, indexes []int, fp model.Fingerprint, mss *MemorySeriesStorage,
) ([]*chunkDesc, SeriesIterator, error) { ) (SeriesIterator, error) {
loadIndexes := []int{} loadIndexes := []int{}
pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes)) pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes))
for _, idx := range indexes { for _, idx := range indexes {
@ -364,7 +364,7 @@ func (s *memorySeries) preloadChunks(
cd.unpin(mss.evictRequests) cd.unpin(mss.evictRequests)
} }
chunkOps.WithLabelValues(unpin).Add(float64(len(pinnedChunkDescs))) chunkOps.WithLabelValues(unpin).Add(float64(len(pinnedChunkDescs)))
return nil, nopIter, err return nopIter, err
} }
for i, c := range chunks { for i, c := range chunks {
s.chunkDescs[loadIndexes[i]].setChunk(c) s.chunkDescs[loadIndexes[i]].setChunk(c)
@ -380,18 +380,22 @@ func (s *memorySeries) preloadChunks(
} }
iter := &boundedIterator{ iter := &boundedIterator{
it: s.newIterator(pinnedChunkDescs, curriedQuarantineSeries), it: s.newIterator(pinnedChunkDescs, curriedQuarantineSeries, mss.evictRequests),
start: model.Now().Add(-mss.dropAfter), start: model.Now().Add(-mss.dropAfter),
} }
return pinnedChunkDescs, iter, nil return iter, nil
} }
// newIterator returns a new SeriesIterator for the provided chunkDescs (which // newIterator returns a new SeriesIterator for the provided chunkDescs (which
// must be pinned). // must be pinned).
// //
// The caller must have locked the fingerprint of the memorySeries. // The caller must have locked the fingerprint of the memorySeries.
func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc, quarantine func(error)) SeriesIterator { func (s *memorySeries) newIterator(
pinnedChunkDescs []*chunkDesc,
quarantine func(error),
evictRequests chan<- evictRequest,
) SeriesIterator {
chunks := make([]chunk, 0, len(pinnedChunkDescs)) chunks := make([]chunk, 0, len(pinnedChunkDescs))
for _, cd := range pinnedChunkDescs { for _, cd := range pinnedChunkDescs {
// It's OK to directly access cd.c here (without locking) as the // It's OK to directly access cd.c here (without locking) as the
@ -399,9 +403,12 @@ func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc, quarantine fun
chunks = append(chunks, cd.c) chunks = append(chunks, cd.c)
} }
return &memorySeriesIterator{ return &memorySeriesIterator{
chunks: chunks, chunks: chunks,
chunkIts: make([]chunkIterator, len(chunks)), chunkIts: make([]chunkIterator, len(chunks)),
quarantine: quarantine, quarantine: quarantine,
metric: s.metric,
pinnedChunkDescs: pinnedChunkDescs,
evictRequests: evictRequests,
} }
} }
@ -413,7 +420,7 @@ func (s *memorySeries) preloadChunksForInstant(
fp model.Fingerprint, fp model.Fingerprint,
from model.Time, through model.Time, from model.Time, through model.Time,
mss *MemorySeriesStorage, mss *MemorySeriesStorage,
) ([]*chunkDesc, SeriesIterator, error) { ) (SeriesIterator, error) {
// If we have a lastSamplePair in the series, and thas last samplePair // If we have a lastSamplePair in the series, and thas last samplePair
// is in the interval, just take it in a singleSampleSeriesIterator. No // is in the interval, just take it in a singleSampleSeriesIterator. No
// need to pin or load anything. // need to pin or load anything.
@ -422,10 +429,13 @@ func (s *memorySeries) preloadChunksForInstant(
!from.After(lastSample.Timestamp) && !from.After(lastSample.Timestamp) &&
lastSample != ZeroSamplePair { lastSample != ZeroSamplePair {
iter := &boundedIterator{ iter := &boundedIterator{
it: &singleSampleSeriesIterator{samplePair: lastSample}, it: &singleSampleSeriesIterator{
samplePair: lastSample,
metric: s.metric,
},
start: model.Now().Add(-mss.dropAfter), start: model.Now().Add(-mss.dropAfter),
} }
return nil, iter, nil return iter, nil
} }
// If we are here, we are out of luck and have to delegate to the more // If we are here, we are out of luck and have to delegate to the more
// expensive method. // expensive method.
@ -438,7 +448,7 @@ func (s *memorySeries) preloadChunksForRange(
fp model.Fingerprint, fp model.Fingerprint,
from model.Time, through model.Time, from model.Time, through model.Time,
mss *MemorySeriesStorage, mss *MemorySeriesStorage,
) ([]*chunkDesc, SeriesIterator, error) { ) (SeriesIterator, error) {
firstChunkDescTime := model.Latest firstChunkDescTime := model.Latest
if len(s.chunkDescs) > 0 { if len(s.chunkDescs) > 0 {
firstChunkDescTime = s.chunkDescs[0].firstTime() firstChunkDescTime = s.chunkDescs[0].firstTime()
@ -446,7 +456,7 @@ func (s *memorySeries) preloadChunksForRange(
if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) { if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) {
cds, err := mss.loadChunkDescs(fp, s.persistWatermark) cds, err := mss.loadChunkDescs(fp, s.persistWatermark)
if err != nil { if err != nil {
return nil, nopIter, err return nopIter, err
} }
s.chunkDescs = append(cds, s.chunkDescs...) s.chunkDescs = append(cds, s.chunkDescs...)
s.chunkDescsOffset = 0 s.chunkDescsOffset = 0
@ -455,7 +465,7 @@ func (s *memorySeries) preloadChunksForRange(
} }
if len(s.chunkDescs) == 0 || through.Before(firstChunkDescTime) { if len(s.chunkDescs) == 0 || through.Before(firstChunkDescTime) {
return nil, nopIter, nil return nopIter, nil
} }
// Find first chunk with start time after "from". // Find first chunk with start time after "from".
@ -471,10 +481,10 @@ func (s *memorySeries) preloadChunksForRange(
// series ends before "from" and we don't need to do anything. // series ends before "from" and we don't need to do anything.
lt, err := s.chunkDescs[len(s.chunkDescs)-1].lastTime() lt, err := s.chunkDescs[len(s.chunkDescs)-1].lastTime()
if err != nil { if err != nil {
return nil, nopIter, err return nopIter, err
} }
if lt.Before(from) { if lt.Before(from) {
return nil, nopIter, nil return nopIter, nil
} }
} }
if fromIdx > 0 { if fromIdx > 0 {
@ -547,10 +557,20 @@ func (s *memorySeries) chunksToPersist() []*chunkDesc {
// memorySeriesIterator implements SeriesIterator. // memorySeriesIterator implements SeriesIterator.
type memorySeriesIterator struct { type memorySeriesIterator struct {
chunkIt chunkIterator // Last chunkIterator used by ValueAtOrBeforeTime. // Last chunkIterator used by ValueAtOrBeforeTime.
chunkIts []chunkIterator // Caches chunkIterators. chunkIt chunkIterator
chunks []chunk // Caches chunkIterators.
quarantine func(error) // Call to quarantine the series this iterator belongs to. chunkIts []chunkIterator
// The actual sample chunks.
chunks []chunk
// Call to quarantine the series this iterator belongs to.
quarantine func(error)
// The metric corresponding to the iterator.
metric model.Metric
// Chunks that were pinned for this iterator.
pinnedChunkDescs []*chunkDesc
// Where to send evict requests when unpinning pinned chunks.
evictRequests chan<- evictRequest
} }
// ValueAtOrBeforeTime implements SeriesIterator. // ValueAtOrBeforeTime implements SeriesIterator.
@ -630,6 +650,10 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa
return values return values
} }
func (it *memorySeriesIterator) Metric() metric.Metric {
return metric.Metric{Metric: it.metric}
}
// chunkIterator returns the chunkIterator for the chunk at position i (and // chunkIterator returns the chunkIterator for the chunk at position i (and
// creates it if needed). // creates it if needed).
func (it *memorySeriesIterator) chunkIterator(i int) chunkIterator { func (it *memorySeriesIterator) chunkIterator(i int) chunkIterator {
@ -641,11 +665,19 @@ func (it *memorySeriesIterator) chunkIterator(i int) chunkIterator {
return chunkIt return chunkIt
} }
func (it *memorySeriesIterator) Close() {
for _, cd := range it.pinnedChunkDescs {
cd.unpin(it.evictRequests)
}
chunkOps.WithLabelValues(unpin).Add(float64(len(it.pinnedChunkDescs)))
}
// singleSampleSeriesIterator implements Series Iterator. It is a "shortcut // singleSampleSeriesIterator implements Series Iterator. It is a "shortcut
// iterator" that returns a single samplee only. The sample is saved in the // iterator" that returns a single sample only. The sample is saved in the
// iterator itself, so no chunks need to be pinned. // iterator itself, so no chunks need to be pinned.
type singleSampleSeriesIterator struct { type singleSampleSeriesIterator struct {
samplePair model.SamplePair samplePair model.SamplePair
metric model.Metric
} }
// ValueAtTime implements SeriesIterator. // ValueAtTime implements SeriesIterator.
@ -665,6 +697,13 @@ func (it *singleSampleSeriesIterator) RangeValues(in metric.Interval) []model.Sa
return []model.SamplePair{it.samplePair} return []model.SamplePair{it.samplePair}
} }
func (it *singleSampleSeriesIterator) Metric() metric.Metric {
return metric.Metric{Metric: it.metric}
}
// Close implements SeriesIterator.
func (it *singleSampleSeriesIterator) Close() {}
// nopSeriesIterator implements Series Iterator. It never returns any values. // nopSeriesIterator implements Series Iterator. It never returns any values.
type nopSeriesIterator struct{} type nopSeriesIterator struct{}
@ -678,4 +717,12 @@ func (i nopSeriesIterator) RangeValues(in metric.Interval) []model.SamplePair {
return []model.SamplePair{} return []model.SamplePair{}
} }
// Metric implements SeriesIterator.
func (i nopSeriesIterator) Metric() metric.Metric {
return metric.Metric{}
}
// Close implements SeriesIterator.
func (i nopSeriesIterator) Close() {}
var nopIter nopSeriesIterator // A nopSeriesIterator for convenience. Can be shared. var nopIter nopSeriesIterator // A nopSeriesIterator for convenience. Can be shared.

View file

@ -142,6 +142,9 @@ const (
// synced or not. It does not need to be goroutine safe. // synced or not. It does not need to be goroutine safe.
type syncStrategy func() bool type syncStrategy func() bool
// A MemorySeriesStorage manages series in memory over time, while also
// interfacing with a persistence layer to make time series data persistent
// across restarts and evictable from memory.
type MemorySeriesStorage struct { type MemorySeriesStorage struct {
// archiveHighWatermark and numChunksToPersist have to be aligned for atomic operations. // archiveHighWatermark and numChunksToPersist have to be aligned for atomic operations.
archiveHighWatermark model.Time // No archived series has samples after this time. archiveHighWatermark model.Time // No archived series has samples after this time.
@ -409,21 +412,38 @@ func (s *MemorySeriesStorage) WaitForIndexing() {
s.persistence.waitForIndexing() s.persistence.waitForIndexing()
} }
// LastSampleForFingerprint implements Storage. // LastSampleForLabelMatchers implements Storage.
func (s *MemorySeriesStorage) LastSampleForFingerprint(fp model.Fingerprint) model.Sample { func (s *MemorySeriesStorage) LastSampleForLabelMatchers(cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
s.fpLocker.Lock(fp) fps := map[model.Fingerprint]struct{}{}
defer s.fpLocker.Unlock(fp) for _, matchers := range matcherSets {
fpToMetric, err := s.metricsForLabelMatchers(cutoff, model.Latest, matchers...)
if err != nil {
return nil, err
}
for fp := range fpToMetric {
fps[fp] = struct{}{}
}
}
series, ok := s.fpToSeries.get(fp) res := make(model.Vector, 0, len(fps))
if !ok { for fp := range fps {
return ZeroSample s.fpLocker.Lock(fp)
}
sp := series.lastSamplePair() series, ok := s.fpToSeries.get(fp)
return model.Sample{ if !ok {
Metric: series.metric, // A series could have disappeared between resolving label matchers and here.
Value: sp.Value, s.fpLocker.Unlock(fp)
Timestamp: sp.Timestamp, continue
}
sp := series.lastSamplePair()
res = append(res, &model.Sample{
Metric: series.metric,
Value: sp.Value,
Timestamp: sp.Timestamp,
})
s.fpLocker.Unlock(fp)
} }
return res, nil
} }
// boundedIterator wraps a SeriesIterator and does not allow fetching // boundedIterator wraps a SeriesIterator and does not allow fetching
@ -452,11 +472,45 @@ func (bit *boundedIterator) RangeValues(interval metric.Interval) []model.Sample
return bit.it.RangeValues(interval) return bit.it.RangeValues(interval)
} }
// NewPreloader implements Storage. // Metric implements SeriesIterator.
func (s *MemorySeriesStorage) NewPreloader() Preloader { func (bit *boundedIterator) Metric() metric.Metric {
return &memorySeriesPreloader{ return bit.it.Metric()
storage: s, }
// Close implements SeriesIterator.
func (bit *boundedIterator) Close() {
bit.it.Close()
}
// QueryRange implements Storage.
func (s *MemorySeriesStorage) QueryRange(from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...)
if err != nil {
return nil, err
} }
iterators := make([]SeriesIterator, 0, len(fpToMetric))
for fp := range fpToMetric {
it := s.preloadChunksForRange(fp, from, through)
iterators = append(iterators, it)
}
return iterators, nil
}
// QueryInstant implements Storage.
func (s *MemorySeriesStorage) QueryInstant(ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
from := ts.Add(-stalenessDelta)
through := ts
fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...)
if err != nil {
return nil, err
}
iterators := make([]SeriesIterator, 0, len(fpToMetric))
for fp := range fpToMetric {
it := s.preloadChunksForInstant(fp, from, through)
iterators = append(iterators, it)
}
return iterators, nil
} }
// fingerprintsForLabelPair returns the fingerprints with the given // fingerprintsForLabelPair returns the fingerprints with the given
@ -486,14 +540,36 @@ func (s *MemorySeriesStorage) fingerprintsForLabelPair(
// MetricsForLabelMatchers implements Storage. // MetricsForLabelMatchers implements Storage.
func (s *MemorySeriesStorage) MetricsForLabelMatchers( func (s *MemorySeriesStorage) MetricsForLabelMatchers(
from, through model.Time,
matcherSets ...metric.LabelMatchers,
) ([]metric.Metric, error) {
fpToMetric := map[model.Fingerprint]metric.Metric{}
for _, matchers := range matcherSets {
metrics, err := s.metricsForLabelMatchers(from, through, matchers...)
if err != nil {
return nil, err
}
for fp, m := range metrics {
fpToMetric[fp] = m
}
}
metrics := make([]metric.Metric, 0, len(fpToMetric))
for _, m := range fpToMetric {
metrics = append(metrics, m)
}
return metrics, nil
}
func (s *MemorySeriesStorage) metricsForLabelMatchers(
from, through model.Time, from, through model.Time,
matchers ...*metric.LabelMatcher, matchers ...*metric.LabelMatcher,
) map[model.Fingerprint]metric.Metric { ) (map[model.Fingerprint]metric.Metric, error) {
sort.Sort(metric.LabelMatchers(matchers)) sort.Sort(metric.LabelMatchers(matchers))
if len(matchers) == 0 || matchers[0].MatchesEmptyString() { if len(matchers) == 0 || matchers[0].MatchesEmptyString() {
// No matchers at all or even the best matcher matches the empty string. // No matchers at all or even the best matcher matches the empty string.
return nil return nil, nil
} }
var ( var (
@ -516,7 +592,7 @@ func (s *MemorySeriesStorage) MetricsForLabelMatchers(
remainingFPs, remainingFPs,
) )
if len(remainingFPs) == 0 { if len(remainingFPs) == 0 {
return nil return nil, nil
} }
} }
@ -526,9 +602,14 @@ func (s *MemorySeriesStorage) MetricsForLabelMatchers(
if m.MatchesEmptyString() { if m.MatchesEmptyString() {
break break
} }
lvs := m.Filter(s.LabelValuesForLabelName(m.Name))
lvs, err := s.LabelValuesForLabelName(m.Name)
if err != nil {
return nil, err
}
lvs = m.Filter(lvs)
if len(lvs) == 0 { if len(lvs) == 0 {
return nil return nil, nil
} }
fps := map[model.Fingerprint]struct{}{} fps := map[model.Fingerprint]struct{}{}
for _, lv := range lvs { for _, lv := range lvs {
@ -543,7 +624,7 @@ func (s *MemorySeriesStorage) MetricsForLabelMatchers(
} }
remainingFPs = fps remainingFPs = fps
if len(remainingFPs) == 0 { if len(remainingFPs) == 0 {
return nil return nil, nil
} }
} }
@ -562,7 +643,7 @@ func (s *MemorySeriesStorage) MetricsForLabelMatchers(
} }
} }
} }
return result return result, nil
} }
// metricForRange returns the metric for the given fingerprint if the // metricForRange returns the metric for the given fingerprint if the
@ -612,15 +693,20 @@ func (s *MemorySeriesStorage) metricForRange(
} }
// LabelValuesForLabelName implements Storage. // LabelValuesForLabelName implements Storage.
func (s *MemorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) model.LabelValues { func (s *MemorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) (model.LabelValues, error) {
return s.persistence.labelValuesForLabelName(labelName) return s.persistence.labelValuesForLabelName(labelName)
} }
// DropMetric implements Storage. // DropMetricsForLabelMatchers implements Storage.
func (s *MemorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprint) { func (s *MemorySeriesStorage) DropMetricsForLabelMatchers(matchers ...*metric.LabelMatcher) (int, error) {
for _, fp := range fps { fpToMetric, err := s.metricsForLabelMatchers(model.Earliest, model.Latest, matchers...)
if err != nil {
return 0, err
}
for fp := range fpToMetric {
s.purgeSeries(fp, nil, nil) s.purgeSeries(fp, nil, nil)
} }
return len(fpToMetric), nil
} }
var ( var (
@ -802,39 +888,39 @@ func (s *MemorySeriesStorage) seriesForRange(
func (s *MemorySeriesStorage) preloadChunksForRange( func (s *MemorySeriesStorage) preloadChunksForRange(
fp model.Fingerprint, fp model.Fingerprint,
from model.Time, through model.Time, from model.Time, through model.Time,
) ([]*chunkDesc, SeriesIterator) { ) SeriesIterator {
s.fpLocker.Lock(fp) s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp) defer s.fpLocker.Unlock(fp)
series := s.seriesForRange(fp, from, through) series := s.seriesForRange(fp, from, through)
if series == nil { if series == nil {
return nil, nopIter return nopIter
} }
cds, iter, err := series.preloadChunksForRange(fp, from, through, s) iter, err := series.preloadChunksForRange(fp, from, through, s)
if err != nil { if err != nil {
s.quarantineSeries(fp, series.metric, err) s.quarantineSeries(fp, series.metric, err)
return nil, nopIter return nopIter
} }
return cds, iter return iter
} }
func (s *MemorySeriesStorage) preloadChunksForInstant( func (s *MemorySeriesStorage) preloadChunksForInstant(
fp model.Fingerprint, fp model.Fingerprint,
from model.Time, through model.Time, from model.Time, through model.Time,
) ([]*chunkDesc, SeriesIterator) { ) SeriesIterator {
s.fpLocker.Lock(fp) s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp) defer s.fpLocker.Unlock(fp)
series := s.seriesForRange(fp, from, through) series := s.seriesForRange(fp, from, through)
if series == nil { if series == nil {
return nil, nopIter return nopIter
} }
cds, iter, err := series.preloadChunksForInstant(fp, from, through, s) iter, err := series.preloadChunksForInstant(fp, from, through, s)
if err != nil { if err != nil {
s.quarantineSeries(fp, series.metric, err) s.quarantineSeries(fp, series.metric, err)
return nil, nopIter return nopIter
} }
return cds, iter return iter
} }
func (s *MemorySeriesStorage) handleEvictList() { func (s *MemorySeriesStorage) handleEvictList() {

View file

@ -193,14 +193,18 @@ func TestMatches(t *testing.T) {
} }
for _, mt := range matcherTests { for _, mt := range matcherTests {
res := storage.MetricsForLabelMatchers( metrics, err := storage.MetricsForLabelMatchers(
model.Earliest, model.Latest, model.Earliest, model.Latest,
mt.matchers..., mt.matchers,
) )
if len(mt.expected) != len(res) { if err != nil {
t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.matchers, len(res)) t.Fatal(err)
} }
for fp1 := range res { if len(mt.expected) != len(metrics) {
t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.matchers, len(metrics))
}
for _, m := range metrics {
fp1 := m.Metric.FastFingerprint()
found := false found := false
for _, fp2 := range mt.expected { for _, fp2 := range mt.expected {
if fp1 == fp2 { if fp1 == fp2 {
@ -213,16 +217,24 @@ func TestMatches(t *testing.T) {
} }
} }
// Smoketest for from/through. // Smoketest for from/through.
if len(storage.MetricsForLabelMatchers( metrics, err = storage.MetricsForLabelMatchers(
model.Earliest, -10000, model.Earliest, -10000,
mt.matchers..., mt.matchers,
)) > 0 { )
if err != nil {
t.Fatal(err)
}
if len(metrics) > 0 {
t.Error("expected no matches with 'through' older than any sample") t.Error("expected no matches with 'through' older than any sample")
} }
if len(storage.MetricsForLabelMatchers( metrics, err = storage.MetricsForLabelMatchers(
10000, model.Latest, 10000, model.Latest,
mt.matchers..., mt.matchers,
)) > 0 { )
if err != nil {
t.Fatal(err)
}
if len(metrics) > 0 {
t.Error("expected no matches with 'from' newer than any sample") t.Error("expected no matches with 'from' newer than any sample")
} }
// Now the tricky one, cut out something from the middle. // Now the tricky one, cut out something from the middle.
@ -230,10 +242,13 @@ func TestMatches(t *testing.T) {
from model.Time = 25 from model.Time = 25
through model.Time = 75 through model.Time = 75
) )
res = storage.MetricsForLabelMatchers( metrics, err = storage.MetricsForLabelMatchers(
from, through, from, through,
mt.matchers..., mt.matchers,
) )
if err != nil {
t.Fatal(err)
}
expected := model.Fingerprints{} expected := model.Fingerprints{}
for _, fp := range mt.expected { for _, fp := range mt.expected {
i := 0 i := 0
@ -246,10 +261,11 @@ func TestMatches(t *testing.T) {
expected = append(expected, fp) expected = append(expected, fp)
} }
} }
if len(expected) != len(res) { if len(expected) != len(metrics) {
t.Errorf("expected %d range-limited matches for %q, found %d", len(expected), mt.matchers, len(res)) t.Errorf("expected %d range-limited matches for %q, found %d", len(expected), mt.matchers, len(metrics))
} }
for fp1 := range res { for _, m := range metrics {
fp1 := m.Metric.FastFingerprint()
found := false found := false
for _, fp2 := range expected { for _, fp2 := range expected {
if fp1 == fp2 { if fp1 == fp2 {
@ -348,7 +364,7 @@ func TestFingerprintsForLabels(t *testing.T) {
} }
} }
var benchLabelMatchingRes map[model.Fingerprint]metric.Metric var benchLabelMatchingRes []metric.Metric
func BenchmarkLabelMatching(b *testing.B) { func BenchmarkLabelMatching(b *testing.B) {
s, closer := NewTestStorage(b, 2) s, closer := NewTestStorage(b, 2)
@ -430,13 +446,17 @@ func BenchmarkLabelMatching(b *testing.B) {
b.ReportAllocs() b.ReportAllocs()
b.ResetTimer() b.ResetTimer()
var err error
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
benchLabelMatchingRes = map[model.Fingerprint]metric.Metric{} benchLabelMatchingRes = []metric.Metric{}
for _, mt := range matcherTests { for _, mt := range matcherTests {
benchLabelMatchingRes = s.MetricsForLabelMatchers( benchLabelMatchingRes, err = s.MetricsForLabelMatchers(
model.Earliest, model.Latest, model.Earliest, model.Latest,
mt..., mt,
) )
if err != nil {
b.Fatal(err)
}
} }
} }
// Stop timer to not count the storage closing. // Stop timer to not count the storage closing.
@ -469,26 +489,25 @@ func TestRetentionCutoff(t *testing.T) {
} }
s.WaitForIndexing() s.WaitForIndexing()
var fp model.Fingerprint lm, err := metric.NewLabelMatcher(metric.Equal, "job", "test")
for f := range s.fingerprintsForLabelPair(model.LabelPair{ if err != nil {
Name: "job", Value: "test", t.Fatalf("error creating label matcher: %s", err)
}, nil, nil) { }
fp = f its, err := s.QueryRange(insertStart, now, lm)
break if err != nil {
t.Fatal(err)
} }
pl := s.NewPreloader() if len(its) != 1 {
defer pl.Close() t.Fatalf("expected one iterator but got %d", len(its))
}
// Preload everything. val := its[0].ValueAtOrBeforeTime(now.Add(-61 * time.Minute))
it := pl.PreloadRange(fp, insertStart, now)
val := it.ValueAtOrBeforeTime(now.Add(-61 * time.Minute))
if val.Timestamp != model.Earliest { if val.Timestamp != model.Earliest {
t.Errorf("unexpected result for timestamp before retention period") t.Errorf("unexpected result for timestamp before retention period")
} }
vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}) vals := its[0].RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now})
// We get 59 values here because the model.Now() is slightly later // We get 59 values here because the model.Now() is slightly later
// than our now. // than our now.
if len(vals) != 59 { if len(vals) != 59 {
@ -522,6 +541,15 @@ func TestDropMetrics(t *testing.T) {
m2 := model.Metric{model.MetricNameLabel: "test", "n1": "v2"} m2 := model.Metric{model.MetricNameLabel: "test", "n1": "v2"}
m3 := model.Metric{model.MetricNameLabel: "test", "n1": "v3"} m3 := model.Metric{model.MetricNameLabel: "test", "n1": "v3"}
lm1, err := metric.NewLabelMatcher(metric.Equal, "n1", "v1")
if err != nil {
t.Fatal(err)
}
lmAll, err := metric.NewLabelMatcher(metric.Equal, model.MetricNameLabel, "test")
if err != nil {
t.Fatal(err)
}
N := 120000 N := 120000
for j, m := range []model.Metric{m1, m2, m3} { for j, m := range []model.Metric{m1, m2, m3} {
@ -553,7 +581,13 @@ func TestDropMetrics(t *testing.T) {
fpList := model.Fingerprints{m1.FastFingerprint(), m2.FastFingerprint(), fpToBeArchived} fpList := model.Fingerprints{m1.FastFingerprint(), m2.FastFingerprint(), fpToBeArchived}
s.DropMetricsForFingerprints(fpList[0]) n, err := s.DropMetricsForLabelMatchers(lm1)
if err != nil {
t.Fatal(err)
}
if n != 1 {
t.Fatalf("expected 1 series to be dropped, got %d", n)
}
s.WaitForIndexing() s.WaitForIndexing()
fps2 := s.fingerprintsForLabelPair(model.LabelPair{ fps2 := s.fingerprintsForLabelPair(model.LabelPair{
@ -563,12 +597,12 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("unexpected number of fingerprints: %d", len(fps2)) t.Errorf("unexpected number of fingerprints: %d", len(fps2))
} }
_, it := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) it := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest)
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Errorf("unexpected number of samples: %d", len(vals)) t.Errorf("unexpected number of samples: %d", len(vals))
} }
_, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest)
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N {
t.Errorf("unexpected number of samples: %d", len(vals)) t.Errorf("unexpected number of samples: %d", len(vals))
} }
@ -580,7 +614,13 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("chunk file does not exist for fp=%v", fpList[2]) t.Errorf("chunk file does not exist for fp=%v", fpList[2])
} }
s.DropMetricsForFingerprints(fpList...) n, err = s.DropMetricsForLabelMatchers(lmAll)
if err != nil {
t.Fatal(err)
}
if n != 2 {
t.Fatalf("expected 2 series to be dropped, got %d", n)
}
s.WaitForIndexing() s.WaitForIndexing()
fps3 := s.fingerprintsForLabelPair(model.LabelPair{ fps3 := s.fingerprintsForLabelPair(model.LabelPair{
@ -590,12 +630,12 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("unexpected number of fingerprints: %d", len(fps3)) t.Errorf("unexpected number of fingerprints: %d", len(fps3))
} }
_, it = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) it = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest)
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Errorf("unexpected number of samples: %d", len(vals)) t.Errorf("unexpected number of samples: %d", len(vals))
} }
_, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest)
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Errorf("unexpected number of samples: %d", len(vals)) t.Errorf("unexpected number of samples: %d", len(vals))
} }
@ -672,10 +712,9 @@ func TestQuarantineMetric(t *testing.T) {
t.Errorf("unexpected number of fingerprints: %d", len(fps)) t.Errorf("unexpected number of fingerprints: %d", len(fps))
} }
pl := s.NewPreloader()
// This will access the corrupt file and lead to quarantining. // This will access the corrupt file and lead to quarantining.
pl.PreloadInstant(fpToBeArchived, now.Add(-2*time.Hour), time.Minute) iter := s.preloadChunksForInstant(fpToBeArchived, now.Add(-2*time.Hour-1*time.Minute), now.Add(-2*time.Hour))
pl.Close() iter.Close()
time.Sleep(time.Second) // Give time to quarantine. TODO(beorn7): Find a better way to wait. time.Sleep(time.Second) // Give time to quarantine. TODO(beorn7): Find a better way to wait.
s.WaitForIndexing() s.WaitForIndexing()
@ -816,7 +855,7 @@ func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint() fp := model.Metric{}.FastFingerprint()
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) it := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
// #1 Exactly on a sample. // #1 Exactly on a sample.
for i, expected := range samples { for i, expected := range samples {
@ -894,7 +933,7 @@ func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint() fp := model.Metric{}.FastFingerprint()
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) it := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
b.ResetTimer() b.ResetTimer()
@ -976,7 +1015,7 @@ func testRangeValues(t *testing.T, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint() fp := model.Metric{}.FastFingerprint()
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) it := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
// #1 Zero length interval at sample. // #1 Zero length interval at sample.
for i, expected := range samples { for i, expected := range samples {
@ -1132,7 +1171,7 @@ func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint() fp := model.Metric{}.FastFingerprint()
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) it := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
b.ResetTimer() b.ResetTimer()
@ -1182,7 +1221,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// Drop ~half of the chunks. // Drop ~half of the chunks.
s.maintainMemorySeries(fp, 10000) s.maintainMemorySeries(fp, 10000)
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) it := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
actual := it.RangeValues(metric.Interval{ actual := it.RangeValues(metric.Interval{
OldestInclusive: 0, OldestInclusive: 0,
NewestInclusive: 100000, NewestInclusive: 100000,
@ -1200,7 +1239,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// Drop everything. // Drop everything.
s.maintainMemorySeries(fp, 100000) s.maintainMemorySeries(fp, 100000)
_, it = s.preloadChunksForRange(fp, model.Earliest, model.Latest) it = s.preloadChunksForRange(fp, model.Earliest, model.Latest)
actual = it.RangeValues(metric.Interval{ actual = it.RangeValues(metric.Interval{
OldestInclusive: 0, OldestInclusive: 0,
NewestInclusive: 100000, NewestInclusive: 100000,
@ -1364,14 +1403,13 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding chunkEncoding) {
} }
// Load everything back. // Load everything back.
p := s.NewPreloader() it := s.preloadChunksForRange(fp, 0, 100000)
p.PreloadRange(fp, 0, 100000)
if oldLen != len(series.chunkDescs) { if oldLen != len(series.chunkDescs) {
t.Errorf("Expected number of chunkDescs to have reached old value again, old number %d, current number %d.", oldLen, len(series.chunkDescs)) t.Errorf("Expected number of chunkDescs to have reached old value again, old number %d, current number %d.", oldLen, len(series.chunkDescs))
} }
p.Close() it.Close()
// Now maintain series with drops to make sure nothing crazy happens. // Now maintain series with drops to make sure nothing crazy happens.
s.maintainMemorySeries(fp, 100000) s.maintainMemorySeries(fp, 100000)
@ -1693,8 +1731,7 @@ func verifyStorageRandom(t testing.TB, s *MemorySeriesStorage, samples model.Sam
for _, i := range rand.Perm(len(samples)) { for _, i := range rand.Perm(len(samples)) {
sample := samples[i] sample := samples[i]
fp := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric) fp := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric)
p := s.NewPreloader() it := s.preloadChunksForInstant(fp, sample.Timestamp, sample.Timestamp)
it := p.PreloadInstant(fp, sample.Timestamp, 0)
found := it.ValueAtOrBeforeTime(sample.Timestamp) found := it.ValueAtOrBeforeTime(sample.Timestamp)
startTime := it.(*boundedIterator).start startTime := it.(*boundedIterator).start
switch { switch {
@ -1713,7 +1750,7 @@ func verifyStorageRandom(t testing.TB, s *MemorySeriesStorage, samples model.Sam
) )
result = false result = false
} }
p.Close() it.Close()
} }
return result return result
} }
@ -1723,21 +1760,21 @@ func verifyStorageSequential(t testing.TB, s *MemorySeriesStorage, samples model
var ( var (
result = true result = true
fp model.Fingerprint fp model.Fingerprint
p = s.NewPreloader()
it SeriesIterator it SeriesIterator
r []model.SamplePair r []model.SamplePair
j int j int
) )
defer func() { defer func() {
p.Close() it.Close()
}() }()
for i, sample := range samples { for i, sample := range samples {
newFP := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric) newFP := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric)
if it == nil || newFP != fp { if it == nil || newFP != fp {
fp = newFP fp = newFP
p.Close() if it != nil {
p = s.NewPreloader() it.Close()
it = p.PreloadRange(fp, sample.Timestamp, model.Latest) }
it = s.preloadChunksForRange(fp, sample.Timestamp, model.Latest)
r = it.RangeValues(metric.Interval{ r = it.RangeValues(metric.Interval{
OldestInclusive: sample.Timestamp, OldestInclusive: sample.Timestamp,
NewestInclusive: model.Latest, NewestInclusive: model.Latest,
@ -1858,10 +1895,8 @@ func TestAppendOutOfOrder(t *testing.T) {
fp := s.mapper.mapFP(m.FastFingerprint(), m) fp := s.mapper.mapFP(m.FastFingerprint(), m)
pl := s.NewPreloader() it := s.preloadChunksForRange(fp, 0, 2)
defer pl.Close() defer it.Close()
it := pl.PreloadRange(fp, 0, 2)
want := []model.SamplePair{ want := []model.SamplePair{
{ {

View file

@ -21,19 +21,10 @@ type QueryTiming int
const ( const (
TotalEvalTime QueryTiming = iota TotalEvalTime QueryTiming = iota
ResultSortTime ResultSortTime
JSONEncodeTime QueryPreparationTime
PreloadTime
TotalQueryPreparationTime
InnerViewBuildingTime
InnerEvalTime InnerEvalTime
ResultAppendTime ResultAppendTime
QueryAnalysisTime
GetValueAtTimeTime
GetRangeValuesTime
ExecQueueTime ExecQueueTime
ViewDiskPreparationTime
ViewDataExtractionTime
ViewDiskExtractionTime
) )
// Return a string representation of a QueryTiming identifier. // Return a string representation of a QueryTiming identifier.
@ -43,32 +34,14 @@ func (s QueryTiming) String() string {
return "Total eval time" return "Total eval time"
case ResultSortTime: case ResultSortTime:
return "Result sorting time" return "Result sorting time"
case JSONEncodeTime: case QueryPreparationTime:
return "JSON encoding time" return "Query preparation time"
case PreloadTime:
return "Query preloading time"
case TotalQueryPreparationTime:
return "Total query preparation time"
case InnerViewBuildingTime:
return "Inner view building time"
case InnerEvalTime: case InnerEvalTime:
return "Inner eval time" return "Inner eval time"
case ResultAppendTime: case ResultAppendTime:
return "Result append time" return "Result append time"
case QueryAnalysisTime:
return "Query analysis time"
case GetValueAtTimeTime:
return "GetValueAtTime() time"
case GetRangeValuesTime:
return "GetRangeValues() time"
case ExecQueueTime: case ExecQueueTime:
return "Exec queue wait time" return "Exec queue wait time"
case ViewDiskPreparationTime:
return "View building disk preparation time"
case ViewDataExtractionTime:
return "Total view data extraction time"
case ViewDiskExtractionTime:
return "View disk data extraction time"
default: default:
return "Unknown query timing" return "Unknown query timing"
} }

View file

@ -221,7 +221,10 @@ func (api *API) labelValues(r *http.Request) (interface{}, *apiError) {
if !model.LabelNameRE.MatchString(name) { if !model.LabelNameRE.MatchString(name) {
return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)} return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)}
} }
vals := api.Storage.LabelValuesForLabelName(model.LabelName(name)) vals, err := api.Storage.LabelValuesForLabelName(model.LabelName(name))
if err != nil {
return nil, &apiError{errorExec, err}
}
sort.Sort(vals) sort.Sort(vals)
return vals, nil return vals, nil
@ -255,19 +258,18 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
end = model.Latest end = model.Latest
} }
res := map[model.Fingerprint]metric.Metric{} var matcherSets []metric.LabelMatchers
for _, s := range r.Form["match[]"] {
for _, lm := range r.Form["match[]"] { matchers, err := promql.ParseMetricSelector(s)
matchers, err := promql.ParseMetricSelector(lm)
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err} return nil, &apiError{errorBadData, err}
} }
for fp, met := range api.Storage.MetricsForLabelMatchers( matcherSets = append(matcherSets, matchers)
start, end, }
matchers...,
) { res, err := api.Storage.MetricsForLabelMatchers(start, end, matcherSets...)
res[fp] = met if err != nil {
} return nil, &apiError{errorExec, err}
} }
metrics := make([]model.Metric, 0, len(res)) metrics := make([]model.Metric, 0, len(res))
@ -282,28 +284,24 @@ func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) {
if len(r.Form["match[]"]) == 0 { if len(r.Form["match[]"]) == 0 {
return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")} return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}
} }
fps := map[model.Fingerprint]struct{}{}
for _, lm := range r.Form["match[]"] { numDeleted := 0
matchers, err := promql.ParseMetricSelector(lm) for _, s := range r.Form["match[]"] {
matchers, err := promql.ParseMetricSelector(s)
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err} return nil, &apiError{errorBadData, err}
} }
for fp := range api.Storage.MetricsForLabelMatchers( n, err := api.Storage.DropMetricsForLabelMatchers(matchers...)
model.Earliest, model.Latest, // Get every series. if err != nil {
matchers..., return nil, &apiError{errorExec, err}
) {
fps[fp] = struct{}{}
} }
} numDeleted += n
for fp := range fps {
api.Storage.DropMetricsForFingerprints(fp)
} }
res := struct { res := struct {
NumDeleted int `json:"numDeleted"` NumDeleted int `json:"numDeleted"`
}{ }{
NumDeleted: len(fps), NumDeleted: numDeleted,
} }
return res, nil return res, nil
} }

View file

@ -17,13 +17,12 @@ import (
"net/http" "net/http"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/common/expfmt" "github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
dto "github.com/prometheus/client_model/go" "github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage/metric"
) )
func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
@ -32,20 +31,14 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
req.ParseForm() req.ParseForm()
fps := map[model.Fingerprint]struct{}{} var matcherSets []metric.LabelMatchers
for _, s := range req.Form["match[]"] { for _, s := range req.Form["match[]"] {
matchers, err := promql.ParseMetricSelector(s) matchers, err := promql.ParseMetricSelector(s)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return return
} }
for fp := range h.storage.MetricsForLabelMatchers( matcherSets = append(matcherSets, matchers)
model.Now().Add(-promql.StalenessDelta), model.Latest,
matchers...,
) {
fps[fp] = struct{}{}
}
} }
var ( var (
@ -64,15 +57,14 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
Type: dto.MetricType_UNTYPED.Enum(), Type: dto.MetricType_UNTYPED.Enum(),
} }
for fp := range fps { vector, err := h.storage.LastSampleForLabelMatchers(minTimestamp, matcherSets...)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
for _, s := range vector {
globalUsed := map[model.LabelName]struct{}{} globalUsed := map[model.LabelName]struct{}{}
s := h.storage.LastSampleForFingerprint(fp)
// Discard if sample does not exist or lays before the staleness interval.
if s.Timestamp.Before(minTimestamp) {
continue
}
// Reset label slice. // Reset label slice.
protMetric.Label = protMetric.Label[:0] protMetric.Label = protMetric.Label[:0]