mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-26 06:04:05 -08:00
66dfb951c4
* Add errors and Warnings to SeriesSet Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Change Querier interface and refactor accordingly Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Refactor promql/engine to propagate warnings at eval stage Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Address review issues Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Make sure all the series from all Selects are pre-advanced Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Address review issues Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Separate merge series sets Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Clean Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Refactor merge querier failure handling Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Refactored and simplified fanout with improvements from incoming chunk iterator PRs. * Secondary logic is hidden, instead of weird failed series set logic we had. * Fanout is well commented * Fanout closing record all errors * MergeQuerier improved API (clearer) * deferredGenericMergeSeriesSet is not needed as we return no samples anyway for failed series sets (next = false). Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fix formatting Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Fix CI issues Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Added final tests for error handling. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Addressed Brian's comments. * Moved hints in populate to be allocated only when needed. * Used sync.Once in secondary Querier to achieve all-or-nothing partial response logic. * Select after first Next is done will panic. NOTE: in lazySeriesSet in theory we could just panic, I think however we can totally just return error, it will panic in expand anyway. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Utilize errWithWarnings Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Fix recently introduced expansion issue Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Add tests for secondary querier error handling Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Implement lazy merge Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Add name to test cases Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Reorganize Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Address review comments Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Address review comments Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Remove redundant warnings Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Fix rebase mistake Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
234 lines
8.2 KiB
Go
234 lines
8.2 KiB
Go
// Copyright 2014 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package storage
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
|
|
"github.com/prometheus/prometheus/pkg/labels"
|
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
|
"github.com/prometheus/prometheus/tsdb/tombstones"
|
|
)
|
|
|
|
// The errors exposed.
|
|
var (
|
|
ErrNotFound = errors.New("not found")
|
|
ErrOutOfOrderSample = errors.New("out of order sample")
|
|
ErrDuplicateSampleForTimestamp = errors.New("duplicate sample for timestamp")
|
|
ErrOutOfBounds = errors.New("out of bounds")
|
|
)
|
|
|
|
// Appendable allows creating appenders.
|
|
type Appendable interface {
|
|
// Appender returns a new appender for the storage.
|
|
Appender() Appender
|
|
}
|
|
|
|
// Storage ingests and manages samples, along with various indexes. All methods
|
|
// are goroutine-safe. Storage implements storage.SampleAppender.
|
|
// TODO(bwplotka): Add ChunkQueryable to Storage in next PR.
|
|
type Storage interface {
|
|
Queryable
|
|
Appendable
|
|
|
|
// StartTime returns the oldest timestamp stored in the storage.
|
|
StartTime() (int64, error)
|
|
|
|
// Close closes the storage and all its underlying resources.
|
|
Close() error
|
|
}
|
|
|
|
// A Queryable handles queries against a storage.
|
|
// Use it when you need to have access to all samples without chunk encoding abstraction e.g promQL.
|
|
type Queryable interface {
|
|
// Querier returns a new Querier on the storage.
|
|
Querier(ctx context.Context, mint, maxt int64) (Querier, error)
|
|
}
|
|
|
|
// Querier provides querying access over time series data of a fixed time range.
|
|
type Querier interface {
|
|
baseQuerier
|
|
|
|
// Select returns a set of series that matches the given label matchers.
|
|
// Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance.
|
|
// It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all.
|
|
Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet
|
|
}
|
|
|
|
// A ChunkQueryable handles queries against a storage.
|
|
// Use it when you need to have access to samples in encoded format.
|
|
type ChunkQueryable interface {
|
|
// ChunkQuerier returns a new ChunkQuerier on the storage.
|
|
ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQuerier, Warnings, error)
|
|
}
|
|
|
|
// ChunkQuerier provides querying access over time series data of a fixed time range.
|
|
type ChunkQuerier interface {
|
|
baseQuerier
|
|
|
|
// Select returns a set of series that matches the given label matchers.
|
|
// Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance.
|
|
// It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all.
|
|
Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) ChunkSeriesSet
|
|
}
|
|
|
|
type baseQuerier interface {
|
|
// LabelValues returns all potential values for a label name.
|
|
// It is not safe to use the strings beyond the lifefime of the querier.
|
|
LabelValues(name string) ([]string, Warnings, error)
|
|
|
|
// LabelNames returns all the unique label names present in the block in sorted order.
|
|
LabelNames() ([]string, Warnings, error)
|
|
|
|
// Close releases the resources of the Querier.
|
|
Close() error
|
|
}
|
|
|
|
// SelectHints specifies hints passed for data selections.
|
|
// This is used only as an option for implementation to use.
|
|
type SelectHints struct {
|
|
Start int64 // Start time in milliseconds for this select.
|
|
End int64 // End time in milliseconds for this select.
|
|
|
|
Step int64 // Query step size in milliseconds.
|
|
Func string // String representation of surrounding function or aggregation.
|
|
|
|
Grouping []string // List of label names used in aggregation.
|
|
By bool // Indicate whether it is without or by.
|
|
Range int64 // Range vector selector range in milliseconds.
|
|
}
|
|
|
|
// QueryableFunc is an adapter to allow the use of ordinary functions as
|
|
// Queryables. It follows the idea of http.HandlerFunc.
|
|
type QueryableFunc func(ctx context.Context, mint, maxt int64) (Querier, error)
|
|
|
|
// Querier calls f() with the given parameters.
|
|
func (f QueryableFunc) Querier(ctx context.Context, mint, maxt int64) (Querier, error) {
|
|
return f(ctx, mint, maxt)
|
|
}
|
|
|
|
// Appender provides batched appends against a storage.
|
|
// It must be completed with a call to Commit or Rollback and must not be reused afterwards.
|
|
//
|
|
// Operations on the Appender interface are not goroutine-safe.
|
|
type Appender interface {
|
|
// Add adds a sample pair for the given series. A reference number is
|
|
// returned which can be used to add further samples in the same or later
|
|
// transactions.
|
|
// Returned reference numbers are ephemeral and may be rejected in calls
|
|
// to AddFast() at any point. Adding the sample via Add() returns a new
|
|
// reference number.
|
|
// If the reference is 0 it must not be used for caching.
|
|
Add(l labels.Labels, t int64, v float64) (uint64, error)
|
|
|
|
// AddFast adds a sample pair for the referenced series. It is generally
|
|
// faster than adding a sample by providing its full label set.
|
|
AddFast(ref uint64, t int64, v float64) error
|
|
|
|
// Commit submits the collected samples and purges the batch. If Commit
|
|
// returns a non-nil error, it also rolls back all modifications made in
|
|
// the appender so far, as Rollback would do. In any case, an Appender
|
|
// must not be used anymore after Commit has been called.
|
|
Commit() error
|
|
|
|
// Rollback rolls back all modifications made in the appender so far.
|
|
// Appender has to be discarded after rollback.
|
|
Rollback() error
|
|
}
|
|
|
|
// SeriesSet contains a set of series.
|
|
type SeriesSet interface {
|
|
Next() bool
|
|
At() Series
|
|
// The error that iteration as failed with.
|
|
// When an error occurs, set cannot continue to iterate.
|
|
Err() error
|
|
// A collection of warnings for the whole set.
|
|
// Warnings could be return even iteration has not failed with error.
|
|
Warnings() Warnings
|
|
}
|
|
|
|
var emptySeriesSet = errSeriesSet{}
|
|
|
|
// EmptySeriesSet returns a series set that's always empty.
|
|
func EmptySeriesSet() SeriesSet {
|
|
return emptySeriesSet
|
|
}
|
|
|
|
type errSeriesSet struct {
|
|
ws Warnings
|
|
err error
|
|
}
|
|
|
|
func (s errSeriesSet) Next() bool { return false }
|
|
func (s errSeriesSet) At() Series { return nil }
|
|
func (s errSeriesSet) Err() error { return s.err }
|
|
func (s errSeriesSet) Warnings() Warnings { return s.ws }
|
|
|
|
// ErrSeriesSet returns a series set that wraps an error.
|
|
func ErrSeriesSet(err error) SeriesSet {
|
|
return errSeriesSet{err: err}
|
|
}
|
|
|
|
// Series exposes a single time series and allows iterating over samples.
|
|
type Series interface {
|
|
Labels
|
|
SampleIteratable
|
|
}
|
|
|
|
// ChunkSeriesSet contains a set of chunked series.
|
|
type ChunkSeriesSet interface {
|
|
Next() bool
|
|
At() ChunkSeries
|
|
// The error that iteration has failed with.
|
|
// When an error occurs, set cannot continue to iterate.
|
|
Err() error
|
|
// A collection of warnings for the whole set.
|
|
// Warnings could be return even iteration has not failed with error.
|
|
Warnings() Warnings
|
|
}
|
|
|
|
// ChunkSeries exposes a single time series and allows iterating over chunks.
|
|
type ChunkSeries interface {
|
|
Labels
|
|
ChunkIteratable
|
|
}
|
|
|
|
// Labels represents an item that has labels e.g. time series.
|
|
type Labels interface {
|
|
// Labels returns the complete set of labels. For series it means all labels identifying the series.
|
|
Labels() labels.Labels
|
|
}
|
|
|
|
type SampleIteratable interface {
|
|
// Iterator returns a new iterator of the data of the series.
|
|
Iterator() chunkenc.Iterator
|
|
}
|
|
|
|
type ChunkIteratable interface {
|
|
// ChunkIterator returns a new iterator that iterates over non-overlapping chunks of the series.
|
|
Iterator() chunks.Iterator
|
|
}
|
|
|
|
// TODO(bwplotka): Remove in next Pr.
|
|
type DeprecatedChunkSeriesSet interface {
|
|
Next() bool
|
|
At() (labels.Labels, []chunks.Meta, tombstones.Intervals)
|
|
Err() error
|
|
}
|
|
|
|
type Warnings []error
|