mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
c1b669bf9b
* Introduce out-of-order TSDB support This implementation is based on this design doc: https://docs.google.com/document/d/1Kppm7qL9C-BJB1j6yb6-9ObG3AbdZnFUBYPNNWwDBYM/edit?usp=sharing This commit adds support to accept out-of-order ("OOO") samples into the TSDB up to a configurable time allowance. If OOO is enabled, overlapping queries are automatically enabled. Most of the additions have been borrowed from https://github.com/grafana/mimir-prometheus/ Here is the list of the original commits cherry-picked from mimir-prometheus into this branch: -4b2198d7ec
-2836e5513f
-00b379c3a5
-ff0dc75758
-a632c73352
-c6f3d4ab33
-5e8406a1d4
-abde1e0ba1
-e70e769889
-df59320886
Co-authored-by: Jesus Vazquez <jesus.vazquez@grafana.com> Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: Dieter Plaetinck <dieter@grafana.com> Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * gofumpt files Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Add license header to missing files Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix OOO tests due to existing chunk disk mapper implementation Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix truncate int overflow Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Add Sync method to the WAL and update tests Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * remove useless sync Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Update minOOOTime after truncating Head * Update minOOOTime after truncating Head Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix lint Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Add a unit test Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Load OutOfOrderTimeWindow only once per appender Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix OOO Head LabelValues and PostingsForMatchers Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix replay of OOO mmap chunks Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Remove unnecessary err check Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Prevent panic with ApplyConfig Signed-off-by: Ganesh Vernekar 15064823+codesome@users.noreply.github.com Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Run OOO compaction after restart if there is OOO data from WBL Signed-off-by: Ganesh Vernekar 15064823+codesome@users.noreply.github.com Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Apply Bartek's suggestions Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com> Signed-off-by: Jesus Vazquez 
<jesus.vazquez@grafana.com> * Refactor OOO compaction Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Address comments and TODOs - Added a comment explaining why we need the allow overlapping compaction toggle - Clarified TSDBConfig OutOfOrderTimeWindow doc - Added an owner to all the TODOs in the code Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Run go format Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix remaining review comments Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix tests Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Change wbl reference when truncating ooo in TestHeadMinOOOTimeUpdate Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix TestWBLAndMmapReplay test failure on windows Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Address most of the feedback Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Refactor the block meta for out of order Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix windows error Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix review comments Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: Ganesh Vernekar 15064823+codesome@users.noreply.github.com Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: Dieter Plaetinck <dieter@grafana.com> Co-authored-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
407 lines
15 KiB
Go
407 lines
15 KiB
Go
// Copyright 2014 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package storage
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/prometheus/prometheus/model/exemplar"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/model/metadata"
|
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
|
)
|
|
|
|
// The errors exposed.
|
|
var (
|
|
ErrNotFound = errors.New("not found")
|
|
// ErrOutOfOrderSample is when out of order support is disabled and the sample is out of order.
|
|
ErrOutOfOrderSample = errors.New("out of order sample")
|
|
// ErrOutOfBounds is when out of order support is disabled and the sample is older than the min valid time for the append.
|
|
ErrOutOfBounds = errors.New("out of bounds")
|
|
// ErrTooOldSample is when out of order support is enabled but the sample is outside the time window allowed.
|
|
ErrTooOldSample = errors.New("too old sample")
|
|
// ErrDuplicateSampleForTimestamp is when the sample has same timestamp but different value.
|
|
ErrDuplicateSampleForTimestamp = errors.New("duplicate sample for timestamp")
|
|
ErrOutOfOrderExemplar = errors.New("out of order exemplar")
|
|
ErrDuplicateExemplar = errors.New("duplicate exemplar")
|
|
ErrExemplarLabelLength = fmt.Errorf("label length for exemplar exceeds maximum of %d UTF-8 characters", exemplar.ExemplarMaxLabelSetLength)
|
|
ErrExemplarsDisabled = fmt.Errorf("exemplar storage is disabled or max exemplars is less than or equal to 0")
|
|
)
|
|
|
|
// SeriesRef is a generic series reference. In prometheus it is either a
// HeadSeriesRef or BlockSeriesRef, though other implementations may have
// their own reference types.
type SeriesRef uint64
|
|
|
|
// Appendable allows creating appenders.
|
|
type Appendable interface {
|
|
// Appender returns a new appender for the storage. The implementation
|
|
// can choose whether or not to use the context, for deadlines or to check
|
|
// for errors.
|
|
Appender(ctx context.Context) Appender
|
|
}
|
|
|
|
// SampleAndChunkQueryable allows retrieving samples as well as encoded samples in form of chunks.
|
|
type SampleAndChunkQueryable interface {
|
|
Queryable
|
|
ChunkQueryable
|
|
}
|
|
|
|
// Storage ingests and manages samples, along with various indexes. All methods
|
|
// are goroutine-safe. Storage implements storage.Appender.
|
|
type Storage interface {
|
|
SampleAndChunkQueryable
|
|
Appendable
|
|
|
|
// StartTime returns the oldest timestamp stored in the storage.
|
|
StartTime() (int64, error)
|
|
|
|
// Close closes the storage and all its underlying resources.
|
|
Close() error
|
|
}
|
|
|
|
// ExemplarStorage ingests and manages exemplars, along with various indexes. All methods are
|
|
// goroutine-safe. ExemplarStorage implements storage.ExemplarAppender and storage.ExemplarQuerier.
|
|
type ExemplarStorage interface {
|
|
ExemplarQueryable
|
|
ExemplarAppender
|
|
}
|
|
|
|
// A Queryable handles queries against a storage.
|
|
// Use it when you need to have access to all samples without chunk encoding abstraction e.g promQL.
|
|
type Queryable interface {
|
|
// Querier returns a new Querier on the storage.
|
|
Querier(ctx context.Context, mint, maxt int64) (Querier, error)
|
|
}
|
|
|
|
// A MockQueryable is used for testing purposes so that a mock Querier can be used.
|
|
type MockQueryable struct {
|
|
MockQuerier Querier
|
|
}
|
|
|
|
func (q *MockQueryable) Querier(ctx context.Context, mint, maxt int64) (Querier, error) {
|
|
return q.MockQuerier, nil
|
|
}
|
|
|
|
// Querier provides querying access over time series data of a fixed time range.
|
|
type Querier interface {
|
|
LabelQuerier
|
|
|
|
// Select returns a set of series that matches the given label matchers.
|
|
// Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance.
|
|
// It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all.
|
|
Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet
|
|
}
|
|
|
|
// MockQuerier is used for test purposes to mock the selected series that is returned.
|
|
type MockQuerier struct {
|
|
SelectMockFunction func(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet
|
|
}
|
|
|
|
func (q *MockQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, Warnings, error) {
|
|
return nil, nil, nil
|
|
}
|
|
|
|
func (q *MockQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, Warnings, error) {
|
|
return nil, nil, nil
|
|
}
|
|
|
|
func (q *MockQuerier) Close() error {
|
|
return nil
|
|
}
|
|
|
|
func (q *MockQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet {
|
|
return q.SelectMockFunction(sortSeries, hints, matchers...)
|
|
}
|
|
|
|
// A ChunkQueryable handles queries against a storage.
|
|
// Use it when you need to have access to samples in encoded format.
|
|
type ChunkQueryable interface {
|
|
// ChunkQuerier returns a new ChunkQuerier on the storage.
|
|
ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQuerier, error)
|
|
}
|
|
|
|
// ChunkQuerier provides querying access over time series data of a fixed time range.
|
|
type ChunkQuerier interface {
|
|
LabelQuerier
|
|
|
|
// Select returns a set of series that matches the given label matchers.
|
|
// Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance.
|
|
// It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all.
|
|
Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) ChunkSeriesSet
|
|
}
|
|
|
|
// LabelQuerier provides querying access over labels.
|
|
type LabelQuerier interface {
|
|
// LabelValues returns all potential values for a label name.
|
|
// It is not safe to use the strings beyond the lifetime of the querier.
|
|
// If matchers are specified the returned result set is reduced
|
|
// to label values of metrics matching the matchers.
|
|
LabelValues(name string, matchers ...*labels.Matcher) ([]string, Warnings, error)
|
|
|
|
// LabelNames returns all the unique label names present in the block in sorted order.
|
|
// If matchers are specified the returned result set is reduced
|
|
// to label names of metrics matching the matchers.
|
|
LabelNames(matchers ...*labels.Matcher) ([]string, Warnings, error)
|
|
|
|
// Close releases the resources of the Querier.
|
|
Close() error
|
|
}
|
|
|
|
type ExemplarQueryable interface {
|
|
// ExemplarQuerier returns a new ExemplarQuerier on the storage.
|
|
ExemplarQuerier(ctx context.Context) (ExemplarQuerier, error)
|
|
}
|
|
|
|
// ExemplarQuerier provides reading access to time series data.
|
|
type ExemplarQuerier interface {
|
|
// Select all the exemplars that match the matchers.
|
|
// Within a single slice of matchers, it is an intersection. Between the slices, it is a union.
|
|
Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error)
|
|
}
|
|
|
|
// SelectHints specifies hints passed for data selections.
// This is used only as an option for implementation to use.
type SelectHints struct {
	// Start time in milliseconds for this select.
	Start int64
	// End time in milliseconds for this select.
	End int64

	// Query step size in milliseconds.
	Step int64
	// String representation of surrounding function or aggregation.
	Func string

	// List of label names used in aggregation.
	Grouping []string
	// Indicate whether it is without or by.
	By bool
	// Range vector selector range in milliseconds.
	Range int64

	// DisableTrimming allows to disable trimming of matching series chunks based on query Start and End time.
	// When disabled, the result may contain samples outside the queried time range but Select() performance
	// may be improved.
	DisableTrimming bool
}
|
|
|
|
// TODO(bwplotka): Move to promql/engine_test.go?
|
|
// QueryableFunc is an adapter to allow the use of ordinary functions as
|
|
// Queryables. It follows the idea of http.HandlerFunc.
|
|
type QueryableFunc func(ctx context.Context, mint, maxt int64) (Querier, error)
|
|
|
|
// Querier calls f() with the given parameters.
|
|
func (f QueryableFunc) Querier(ctx context.Context, mint, maxt int64) (Querier, error) {
|
|
return f(ctx, mint, maxt)
|
|
}
|
|
|
|
// Appender provides batched appends against a storage.
|
|
// It must be completed with a call to Commit or Rollback and must not be reused afterwards.
|
|
//
|
|
// Operations on the Appender interface are not goroutine-safe.
|
|
type Appender interface {
|
|
// Append adds a sample pair for the given series.
|
|
// An optional series reference can be provided to accelerate calls.
|
|
// A series reference number is returned which can be used to add further
|
|
// samples to the given series in the same or later transactions.
|
|
// Returned reference numbers are ephemeral and may be rejected in calls
|
|
// to Append() at any point. Adding the sample via Append() returns a new
|
|
// reference number.
|
|
// If the reference is 0 it must not be used for caching.
|
|
Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error)
|
|
|
|
// Commit submits the collected samples and purges the batch. If Commit
|
|
// returns a non-nil error, it also rolls back all modifications made in
|
|
// the appender so far, as Rollback would do. In any case, an Appender
|
|
// must not be used anymore after Commit has been called.
|
|
Commit() error
|
|
|
|
// Rollback rolls back all modifications made in the appender so far.
|
|
// Appender has to be discarded after rollback.
|
|
Rollback() error
|
|
ExemplarAppender
|
|
MetadataUpdater
|
|
}
|
|
|
|
// GetRef is an extra interface on Appenders used by downstream projects
|
|
// (e.g. Cortex) to avoid maintaining a parallel set of references.
|
|
type GetRef interface {
|
|
// Returns reference number that can be used to pass to Appender.Append(),
|
|
// and a set of labels that will not cause another copy when passed to Appender.Append().
|
|
// 0 means the appender does not have a reference to this series.
|
|
GetRef(lset labels.Labels) (SeriesRef, labels.Labels)
|
|
}
|
|
|
|
// ExemplarAppender provides an interface for adding samples to exemplar storage, which
|
|
// within Prometheus is in-memory only.
|
|
type ExemplarAppender interface {
|
|
// AppendExemplar adds an exemplar for the given series labels.
|
|
// An optional reference number can be provided to accelerate calls.
|
|
// A reference number is returned which can be used to add further
|
|
// exemplars in the same or later transactions.
|
|
// Returned reference numbers are ephemeral and may be rejected in calls
|
|
// to Append() at any point. Adding the sample via Append() returns a new
|
|
// reference number.
|
|
// If the reference is 0 it must not be used for caching.
|
|
// Note that in our current implementation of Prometheus' exemplar storage
|
|
// calls to Append should generate the reference numbers, AppendExemplar
|
|
// generating a new reference number should be considered possible erroneous behaviour and be logged.
|
|
AppendExemplar(ref SeriesRef, l labels.Labels, e exemplar.Exemplar) (SeriesRef, error)
|
|
}
|
|
|
|
// MetadataUpdater provides an interface for associating metadata to stored series.
|
|
type MetadataUpdater interface {
|
|
// UpdateMetadata updates a metadata entry for the given series and labels.
|
|
// A series reference number is returned which can be used to modify the
|
|
// metadata of the given series in the same or later transactions.
|
|
// Returned reference numbers are ephemeral and may be rejected in calls
|
|
// to UpdateMetadata() at any point. If the series does not exist,
|
|
// UpdateMetadata returns an error.
|
|
// If the reference is 0 it must not be used for caching.
|
|
UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error)
|
|
}
|
|
|
|
// SeriesSet contains a set of series.
|
|
type SeriesSet interface {
|
|
Next() bool
|
|
// At returns full series. Returned series should be iterable even after Next is called.
|
|
At() Series
|
|
// The error that iteration as failed with.
|
|
// When an error occurs, set cannot continue to iterate.
|
|
Err() error
|
|
// A collection of warnings for the whole set.
|
|
// Warnings could be return even iteration has not failed with error.
|
|
Warnings() Warnings
|
|
}
|
|
|
|
var emptySeriesSet = errSeriesSet{}
|
|
|
|
// EmptySeriesSet returns a series set that's always empty.
|
|
func EmptySeriesSet() SeriesSet {
|
|
return emptySeriesSet
|
|
}
|
|
|
|
type testSeriesSet struct {
|
|
series Series
|
|
}
|
|
|
|
func (s testSeriesSet) Next() bool { return true }
|
|
func (s testSeriesSet) At() Series { return s.series }
|
|
func (s testSeriesSet) Err() error { return nil }
|
|
func (s testSeriesSet) Warnings() Warnings { return nil }
|
|
|
|
// TestSeriesSet returns a mock series set
|
|
func TestSeriesSet(series Series) SeriesSet {
|
|
return testSeriesSet{series: series}
|
|
}
|
|
|
|
type errSeriesSet struct {
|
|
err error
|
|
}
|
|
|
|
func (s errSeriesSet) Next() bool { return false }
|
|
func (s errSeriesSet) At() Series { return nil }
|
|
func (s errSeriesSet) Err() error { return s.err }
|
|
func (s errSeriesSet) Warnings() Warnings { return nil }
|
|
|
|
// ErrSeriesSet returns a series set that wraps an error.
|
|
func ErrSeriesSet(err error) SeriesSet {
|
|
return errSeriesSet{err: err}
|
|
}
|
|
|
|
var emptyChunkSeriesSet = errChunkSeriesSet{}
|
|
|
|
// EmptyChunkSeriesSet returns a chunk series set that's always empty.
|
|
func EmptyChunkSeriesSet() ChunkSeriesSet {
|
|
return emptyChunkSeriesSet
|
|
}
|
|
|
|
type errChunkSeriesSet struct {
|
|
err error
|
|
}
|
|
|
|
func (s errChunkSeriesSet) Next() bool { return false }
|
|
func (s errChunkSeriesSet) At() ChunkSeries { return nil }
|
|
func (s errChunkSeriesSet) Err() error { return s.err }
|
|
func (s errChunkSeriesSet) Warnings() Warnings { return nil }
|
|
|
|
// ErrChunkSeriesSet returns a chunk series set that wraps an error.
|
|
func ErrChunkSeriesSet(err error) ChunkSeriesSet {
|
|
return errChunkSeriesSet{err: err}
|
|
}
|
|
|
|
// Series exposes a single time series and allows iterating over samples.
|
|
type Series interface {
|
|
Labels
|
|
SampleIterable
|
|
}
|
|
|
|
type mockSeries struct {
|
|
timestamps []int64
|
|
values []float64
|
|
labelSet []string
|
|
}
|
|
|
|
func (s mockSeries) Labels() labels.Labels {
|
|
return labels.FromStrings(s.labelSet...)
|
|
}
|
|
|
|
func (s mockSeries) Iterator() chunkenc.Iterator {
|
|
return chunkenc.MockSeriesIterator(s.timestamps, s.values)
|
|
}
|
|
|
|
// MockSeries returns a series with custom timestamps, values and labelSet.
|
|
func MockSeries(timestamps []int64, values []float64, labelSet []string) Series {
|
|
return mockSeries{
|
|
timestamps: timestamps,
|
|
values: values,
|
|
labelSet: labelSet,
|
|
}
|
|
}
|
|
|
|
// ChunkSeriesSet contains a set of chunked series.
|
|
type ChunkSeriesSet interface {
|
|
Next() bool
|
|
// At returns full chunk series. Returned series should be iterable even after Next is called.
|
|
At() ChunkSeries
|
|
// The error that iteration has failed with.
|
|
// When an error occurs, set cannot continue to iterate.
|
|
Err() error
|
|
// A collection of warnings for the whole set.
|
|
// Warnings could be return even iteration has not failed with error.
|
|
Warnings() Warnings
|
|
}
|
|
|
|
// ChunkSeries exposes a single time series and allows iterating over chunks.
|
|
type ChunkSeries interface {
|
|
Labels
|
|
ChunkIterable
|
|
}
|
|
|
|
// Labels represents an item that has labels e.g. time series.
|
|
type Labels interface {
|
|
// Labels returns the complete set of labels. For series it means all labels identifying the series.
|
|
Labels() labels.Labels
|
|
}
|
|
|
|
type SampleIterable interface {
|
|
// Iterator returns a new, independent iterator of the data of the series.
|
|
Iterator() chunkenc.Iterator
|
|
}
|
|
|
|
type ChunkIterable interface {
|
|
// Iterator returns a new, independent iterator that iterates over potentially overlapping
|
|
// chunks of the series, sorted by min time.
|
|
Iterator() chunks.Iterator
|
|
}
|
|
|
|
// Warnings is a list of errors, used in this file as the warnings return
// value of label queries and series sets.
type Warnings []error
|