2017-05-10 02:44:13 -07:00
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
2017-10-16 18:26:38 -07:00
// limitations under the License.
2017-05-10 02:44:13 -07:00
package storage
import (
2020-06-24 06:41:52 -07:00
"bytes"
2017-05-10 02:44:13 -07:00
"container/heap"
2017-10-04 12:04:15 -07:00
"context"
2018-11-19 02:21:14 -08:00
"sort"
2017-05-10 02:44:13 -07:00
"strings"
2020-06-09 09:57:31 -07:00
"sync"
2017-05-10 02:44:13 -07:00
2017-08-11 11:45:52 -07:00
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
2018-11-19 02:21:14 -08:00
"github.com/pkg/errors"
2017-10-18 04:08:14 -07:00
"github.com/prometheus/common/model"
2017-05-10 02:44:13 -07:00
"github.com/prometheus/prometheus/pkg/labels"
2020-02-06 07:58:38 -08:00
"github.com/prometheus/prometheus/tsdb/chunkenc"
2020-03-24 13:15:47 -07:00
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
2017-05-10 02:44:13 -07:00
)
// fanout is a Storage that proxies writes to a primary storage and a set of
// secondary storages, and serves reads merged across all of them.
type fanout struct {
	logger log.Logger

	// primary is authoritative for reads; its errors fail operations outright.
	primary Storage
	// secondaries are best effort; their read errors surface only as warnings.
	secondaries []Storage
}
2020-06-09 09:57:31 -07:00
// NewFanout returns a new fanout Storage, which proxies reads and writes
2017-05-10 02:44:13 -07:00
// through to multiple underlying storages.
2020-06-09 09:57:31 -07:00
//
// The difference between primary and secondary Storage is only for read (Querier) path and it goes as follows:
// * If the primary querier returns an error, then any of the Querier operations will fail.
// * If any secondary querier returns an error the result from that queries is discarded. The overall operation will succeed,
// and the error from the secondary querier will be returned as a warning.
//
// NOTE: In the case of Prometheus, it treats all remote storages as secondary / best effort.
2017-08-11 11:45:52 -07:00
func NewFanout ( logger log . Logger , primary Storage , secondaries ... Storage ) Storage {
2017-05-10 02:44:13 -07:00
return & fanout {
2017-08-11 11:45:52 -07:00
logger : logger ,
2017-07-12 07:50:26 -07:00
primary : primary ,
secondaries : secondaries ,
2017-05-10 02:44:13 -07:00
}
}
2017-10-18 04:08:14 -07:00
// StartTime implements the Storage interface.
func ( f * fanout ) StartTime ( ) ( int64 , error ) {
// StartTime of a fanout should be the earliest StartTime of all its storages,
// both primary and secondaries.
firstTime , err := f . primary . StartTime ( )
if err != nil {
return int64 ( model . Latest ) , err
}
2020-06-09 09:57:31 -07:00
for _ , s := range f . secondaries {
t , err := s . StartTime ( )
2017-10-18 04:08:14 -07:00
if err != nil {
return int64 ( model . Latest ) , err
}
if t < firstTime {
firstTime = t
}
}
return firstTime , nil
}
2017-10-04 12:04:15 -07:00
func ( f * fanout ) Querier ( ctx context . Context , mint , maxt int64 ) ( Querier , error ) {
2020-06-09 09:57:31 -07:00
primary , err := f . primary . Querier ( ctx , mint , maxt )
2017-07-12 07:50:26 -07:00
if err != nil {
return nil , err
}
2020-06-09 09:57:31 -07:00
secondaries := make ( [ ] Querier , 0 , len ( f . secondaries ) )
2017-07-12 07:50:26 -07:00
for _ , storage := range f . secondaries {
2017-10-04 12:04:15 -07:00
querier , err := storage . Querier ( ctx , mint , maxt )
2017-05-10 02:44:13 -07:00
if err != nil {
2020-06-09 09:57:31 -07:00
// Close already open Queriers, append potential errors to returned error.
errs := tsdb_errors . MultiError { err }
errs . Add ( primary . Close ( ) )
for _ , q := range secondaries {
errs . Add ( q . Close ( ) )
2020-03-24 13:15:47 -07:00
}
2020-06-09 09:57:31 -07:00
return nil , errs . Err ( )
2017-05-10 02:44:13 -07:00
}
2020-06-09 09:57:31 -07:00
secondaries = append ( secondaries , querier )
}
return NewMergeQuerier ( primary , secondaries , ChainedSeriesMerge ) , nil
2017-05-10 02:44:13 -07:00
}
2020-06-24 06:41:52 -07:00
func ( f * fanout ) ChunkQuerier ( ctx context . Context , mint , maxt int64 ) ( ChunkQuerier , error ) {
primary , err := f . primary . ChunkQuerier ( ctx , mint , maxt )
if err != nil {
return nil , err
}
secondaries := make ( [ ] ChunkQuerier , 0 , len ( f . secondaries ) )
for _ , storage := range f . secondaries {
querier , err := storage . ChunkQuerier ( ctx , mint , maxt )
if err != nil {
// Close already open Queriers, append potential errors to returned error.
errs := tsdb_errors . MultiError { err }
errs . Add ( primary . Close ( ) )
for _ , q := range secondaries {
errs . Add ( q . Close ( ) )
}
return nil , errs . Err ( )
}
secondaries = append ( secondaries , querier )
}
return NewMergeChunkQuerier ( primary , secondaries , NewCompactingChunkSeriesMerger ( ChainedSeriesMerge ) ) , nil
}
2020-02-06 07:58:38 -08:00
func ( f * fanout ) Appender ( ) Appender {
primary := f . primary . Appender ( )
2017-07-12 07:50:26 -07:00
secondaries := make ( [ ] Appender , 0 , len ( f . secondaries ) )
for _ , storage := range f . secondaries {
2020-02-06 07:58:38 -08:00
secondaries = append ( secondaries , storage . Appender ( ) )
2017-05-10 02:44:13 -07:00
}
return & fanoutAppender {
2017-08-11 11:45:52 -07:00
logger : f . logger ,
2017-07-12 07:50:26 -07:00
primary : primary ,
secondaries : secondaries ,
2020-02-06 07:58:38 -08:00
}
2017-05-10 02:44:13 -07:00
}
// Close closes the storage and all its underlying resources.
func ( f * fanout ) Close ( ) error {
2020-06-09 09:57:31 -07:00
errs := tsdb_errors . MultiError { }
errs . Add ( f . primary . Close ( ) )
for _ , s := range f . secondaries {
errs . Add ( s . Close ( ) )
2017-07-12 07:50:26 -07:00
}
2020-06-09 09:57:31 -07:00
return errs . Err ( )
2017-05-10 02:44:13 -07:00
}
// fanoutAppender implements Appender.
// It forwards every Add/AddFast/Commit/Rollback to the primary appender and
// all secondary appenders.
type fanoutAppender struct {
	logger log.Logger

	primary     Appender
	secondaries []Appender
}
2017-09-07 05:14:41 -07:00
func ( f * fanoutAppender ) Add ( l labels . Labels , t int64 , v float64 ) ( uint64 , error ) {
2017-07-12 07:50:26 -07:00
ref , err := f . primary . Add ( l , t , v )
if err != nil {
return ref , err
}
for _ , appender := range f . secondaries {
2017-05-10 02:44:13 -07:00
if _ , err := appender . Add ( l , t , v ) ; err != nil {
2017-09-07 05:14:41 -07:00
return 0 , err
2017-05-10 02:44:13 -07:00
}
}
2017-07-12 07:50:26 -07:00
return ref , nil
2017-05-10 02:44:13 -07:00
}
2020-02-06 07:58:38 -08:00
func ( f * fanoutAppender ) AddFast ( ref uint64 , t int64 , v float64 ) error {
if err := f . primary . AddFast ( ref , t , v ) ; err != nil {
2017-07-12 07:50:26 -07:00
return err
}
for _ , appender := range f . secondaries {
2020-02-06 07:58:38 -08:00
if err := appender . AddFast ( ref , t , v ) ; err != nil {
2017-07-12 04:41:27 -07:00
return err
}
}
return nil
2017-05-10 02:44:13 -07:00
}
2017-07-12 07:50:26 -07:00
func ( f * fanoutAppender ) Commit ( ) ( err error ) {
err = f . primary . Commit ( )
for _ , appender := range f . secondaries {
if err == nil {
err = appender . Commit ( )
} else {
if rollbackErr := appender . Rollback ( ) ; rollbackErr != nil {
2017-08-11 11:45:52 -07:00
level . Error ( f . logger ) . Log ( "msg" , "Squashed rollback error on commit" , "err" , rollbackErr )
2017-07-12 07:50:26 -07:00
}
2017-05-10 02:44:13 -07:00
}
}
2017-07-12 07:50:26 -07:00
return
2017-05-10 02:44:13 -07:00
}
2017-07-12 07:50:26 -07:00
func ( f * fanoutAppender ) Rollback ( ) ( err error ) {
err = f . primary . Rollback ( )
for _ , appender := range f . secondaries {
rollbackErr := appender . Rollback ( )
if err == nil {
err = rollbackErr
} else if rollbackErr != nil {
2017-08-11 11:45:52 -07:00
level . Error ( f . logger ) . Log ( "msg" , "Squashed rollback error on rollback" , "err" , rollbackErr )
2017-05-10 02:44:13 -07:00
}
}
return nil
}
// mergeGenericQuerier fans a request out to several generic queriers and
// merges their results; it backs both querierAdapter and chunkQuerierAdapter.
type mergeGenericQuerier struct {
	queriers []genericQuerier

	// mergeFn is used when we see series from different queriers Selects with the same labels.
	mergeFn genericSeriesMergeFunc
}
2018-11-30 06:27:12 -08:00
2020-06-09 09:57:31 -07:00
// NewMergeQuerier returns a new Querier that merges results of given primary and slice of secondary queriers.
// See NewFanout commentary to learn more about primary vs secondary differences.
//
// In case of overlaps between the data given by primary + secondaries Selects, merge function will be used.
func NewMergeQuerier ( primary Querier , secondaries [ ] Querier , mergeFn VerticalSeriesMergeFunc ) Querier {
queriers := make ( [ ] genericQuerier , 0 , len ( secondaries ) + 1 )
if primary != nil {
queriers = append ( queriers , newGenericQuerierFrom ( primary ) )
}
for _ , querier := range secondaries {
if _ , ok := querier . ( noopQuerier ) ; ! ok && querier != nil {
queriers = append ( queriers , newSecondaryQuerierFrom ( querier ) )
}
2020-03-24 13:15:47 -07:00
}
return & querierAdapter { & mergeGenericQuerier {
2020-06-09 09:57:31 -07:00
mergeFn : ( & seriesMergerAdapter { VerticalSeriesMergeFunc : mergeFn } ) . Merge ,
queriers : queriers ,
2020-03-24 13:15:47 -07:00
} }
}
2020-06-09 09:57:31 -07:00
// NewMergeChunkQuerier returns a new ChunkQuerier that merges results of given primary and slice of secondary chunk queriers.
// See NewFanout commentary to learn more about primary vs secondary differences.
//
// In case of overlaps between the data given by primary + secondaries Selects, merge function will be used.
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
2020-06-24 06:41:52 -07:00
func NewMergeChunkQuerier ( primary ChunkQuerier , secondaries [ ] ChunkQuerier , mergeFn VerticalChunkSeriesMergeFunc ) ChunkQuerier {
2020-06-09 09:57:31 -07:00
queriers := make ( [ ] genericQuerier , 0 , len ( secondaries ) + 1 )
if primary != nil {
queriers = append ( queriers , newGenericQuerierFromChunk ( primary ) )
}
for _ , querier := range secondaries {
2020-03-24 13:15:47 -07:00
if _ , ok := querier . ( noopChunkQuerier ) ; ! ok && querier != nil {
2020-06-09 09:57:31 -07:00
queriers = append ( queriers , newSecondaryQuerierFromChunk ( querier ) )
2017-10-27 04:29:05 -07:00
}
2017-05-10 02:44:13 -07:00
}
2020-03-24 13:15:47 -07:00
return & chunkQuerierAdapter { & mergeGenericQuerier {
2020-06-24 06:41:52 -07:00
mergeFn : ( & chunkSeriesMergerAdapter { VerticalChunkSeriesMergeFunc : mergeFn } ) . Merge ,
2020-06-09 09:57:31 -07:00
queriers : queriers ,
2020-03-24 13:15:47 -07:00
} }
2017-05-10 02:44:13 -07:00
}
// Select returns a set of series that matches the given label matchers.
// With more than one querier, the Selects run concurrently and the merged
// result is built lazily on the first Next() of the returned set.
func (q *mergeGenericQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
	// Fast path: a single querier needs no merging, so its requested sort
	// order can be honored directly.
	if len(q.queriers) == 1 {
		return q.queriers[0].Select(sortSeries, hints, matchers...)
	}

	var (
		seriesSets    = make([]genericSeriesSet, 0, len(q.queriers))
		wg            sync.WaitGroup
		seriesSetChan = make(chan genericSeriesSet)
	)
	// Schedule all Selects for all queriers we know about.
	for _, querier := range q.queriers {
		wg.Add(1)
		go func(qr genericQuerier) {
			defer wg.Done()

			// We need to sort for NewMergeSeriesSet to work.
			// Note that sortSeries is ignored here: sorting is mandatory.
			seriesSetChan <- qr.Select(true, hints, matchers...)
		}(querier)
	}
	// Closer goroutine: unblocks the collection loop below once every
	// Select has delivered its result.
	go func() {
		wg.Wait()
		close(seriesSetChan)
	}()

	// Collect in arrival order; ordering here does not matter because the
	// sets are merged via a heap later.
	for r := range seriesSetChan {
		seriesSets = append(seriesSets, r)
	}
	return &lazySeriesSet{create: create(seriesSets, q.mergeFn)}
}
// create builds the deferred constructor used by lazySeriesSet: it pre-advances
// every series set, seeds the merge heap, and surfaces any primary error.
func create(seriesSets []genericSeriesSet, mergeFunc genericSeriesMergeFunc) func() (genericSeriesSet, bool) {
	// Returned function gets called with the first call to Next().
	return func() (genericSeriesSet, bool) {
		// Single set: no merging needed, pass it through directly.
		if len(seriesSets) == 1 {
			return seriesSets[0], seriesSets[0].Next()
		}
		var h genericSeriesSetHeap
		for _, set := range seriesSets {
			if set == nil {
				continue
			}
			if set.Next() {
				heap.Push(&h, set)
				continue
			}
			// When primary fails ignore results from secondaries.
			// Only the primary querier returns error.
			if err := set.Err(); err != nil {
				return errorOnlySeriesSet{err}, false
			}
		}
		set := &genericMergeSeriesSet{
			mergeFunc: mergeFunc,
			sets:      seriesSets,
			heap:      h,
		}
		// The merged set must itself be advanced once so the caller sees
		// whether any data exists at all.
		return set, set.Next()
	}
}
// LabelValues returns all potential values for a label name.
2020-03-24 13:15:47 -07:00
func ( q * mergeGenericQuerier ) LabelValues ( name string ) ( [ ] string , Warnings , error ) {
2020-06-09 09:57:31 -07:00
var (
results [ ] [ ] string
warnings Warnings
)
2017-05-10 02:44:13 -07:00
for _ , querier := range q . queriers {
2019-06-17 00:31:17 -07:00
values , wrn , err := querier . LabelValues ( name )
if wrn != nil {
2020-06-09 09:57:31 -07:00
// TODO(bwplotka): We could potentially wrap warnings.
2019-06-17 00:31:17 -07:00
warnings = append ( warnings , wrn ... )
}
2017-05-10 02:44:13 -07:00
if err != nil {
2020-06-09 09:57:31 -07:00
return nil , nil , errors . Wrapf ( err , "LabelValues() from Querier for label %s" , name )
2017-05-10 02:44:13 -07:00
}
results = append ( results , values )
}
2019-06-17 00:31:17 -07:00
return mergeStringSlices ( results ) , warnings , nil
2017-05-10 02:44:13 -07:00
}
// mergeStringSlices merges any number of sorted string slices into one sorted,
// deduplicated slice by recursive pairwise merging (divide and conquer).
func mergeStringSlices(ss [][]string) []string {
	switch len(ss) {
	case 0:
		return nil
	case 1:
		return ss[0]
	}
	mid := len(ss) / 2
	return mergeTwoStringSlices(
		mergeStringSlices(ss[:mid]),
		mergeStringSlices(ss[mid:]),
	)
}
// mergeTwoStringSlices merges two sorted string slices into one sorted slice,
// keeping a single copy of values present in both.
func mergeTwoStringSlices(a, b []string) []string {
	merged := make([]string, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch strings.Compare(a[i], b[j]) {
		case 0:
			// Equal values are emitted once.
			merged = append(merged, a[i])
			i, j = i+1, j+1
		case -1:
			merged = append(merged, a[i])
			i++
		default:
			merged = append(merged, b[j])
			j++
		}
	}
	// At most one of these tails is non-empty.
	merged = append(merged, a[i:]...)
	return append(merged, b[j:]...)
}
2018-11-19 02:21:14 -08:00
// LabelNames returns all the unique label names present in the block in sorted order.
2020-03-24 13:15:47 -07:00
func ( q * mergeGenericQuerier ) LabelNames ( ) ( [ ] string , Warnings , error ) {
2020-06-24 06:41:52 -07:00
var (
labelNamesMap = make ( map [ string ] struct { } )
warnings Warnings
)
2020-03-24 13:15:47 -07:00
for _ , querier := range q . queriers {
names , wrn , err := querier . LabelNames ( )
2019-06-17 00:31:17 -07:00
if wrn != nil {
2020-06-09 09:57:31 -07:00
// TODO(bwplotka): We could potentially wrap warnings.
2019-06-17 00:31:17 -07:00
warnings = append ( warnings , wrn ... )
}
2018-11-19 02:21:14 -08:00
if err != nil {
2020-06-09 09:57:31 -07:00
return nil , nil , errors . Wrap ( err , "LabelNames() from Querier" )
2018-11-19 02:21:14 -08:00
}
for _ , name := range names {
labelNamesMap [ name ] = struct { } { }
}
}
2020-06-09 09:57:31 -07:00
if len ( labelNamesMap ) == 0 {
return nil , warnings , nil
}
2018-11-19 02:21:14 -08:00
labelNames := make ( [ ] string , 0 , len ( labelNamesMap ) )
for name := range labelNamesMap {
labelNames = append ( labelNames , name )
}
sort . Strings ( labelNames )
2019-06-17 00:31:17 -07:00
return labelNames , warnings , nil
2018-11-19 02:21:14 -08:00
}
2017-05-10 02:44:13 -07:00
// Close releases the resources of the Querier.
2020-03-24 13:15:47 -07:00
func ( q * mergeGenericQuerier ) Close ( ) error {
2020-06-09 09:57:31 -07:00
errs := tsdb_errors . MultiError { }
2017-05-10 02:44:13 -07:00
for _ , querier := range q . queriers {
if err := querier . Close ( ) ; err != nil {
2020-03-24 13:15:47 -07:00
errs . Add ( err )
2017-05-10 02:44:13 -07:00
}
}
2020-03-24 13:15:47 -07:00
return errs . Err ( )
2017-05-10 02:44:13 -07:00
}
// VerticalSeriesMergeFunc returns merged series implementation that merges series with same labels together.
// It has to handle time-overlapped series as well.
// See ChainedSeriesMerge for the implementation used by the fanout read path.
type VerticalSeriesMergeFunc func(...Series) Series
2020-06-24 06:41:52 -07:00
// NewMergeSeriesSet returns a new SeriesSet that merges many SeriesSets together.
func NewMergeSeriesSet ( sets [ ] SeriesSet , mergeFunc VerticalSeriesMergeFunc ) SeriesSet {
2020-03-24 13:15:47 -07:00
genericSets := make ( [ ] genericSeriesSet , 0 , len ( sets ) )
for _ , s := range sets {
genericSets = append ( genericSets , & genericSeriesSetAdapter { s } )
}
2020-06-24 06:41:52 -07:00
return & seriesSetAdapter { newGenericMergeSeriesSet ( genericSets , ( & seriesMergerAdapter { VerticalSeriesMergeFunc : mergeFunc } ) . Merge ) }
2020-03-24 13:15:47 -07:00
}
// VerticalChunkSeriesMergeFunc returns merged chunk series implementation that merges potentially time-overlapping
// chunk series with the same labels into single ChunkSeries.
//
// NOTE: It's up to implementation how series are vertically merged (if chunks are sorted, re-encoded etc).
type VerticalChunkSeriesMergeFunc func(...ChunkSeries) ChunkSeries
// NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges many SeriesSet together.
func NewMergeChunkSeriesSet ( sets [ ] ChunkSeriesSet , mergeFunc VerticalChunkSeriesMergeFunc ) ChunkSeriesSet {
2020-03-24 13:15:47 -07:00
genericSets := make ( [ ] genericSeriesSet , 0 , len ( sets ) )
for _ , s := range sets {
genericSets = append ( genericSets , & genericChunkSeriesSetAdapter { s } )
}
2020-06-24 06:41:52 -07:00
return & chunkSeriesSetAdapter { newGenericMergeSeriesSet ( genericSets , ( & chunkSeriesMergerAdapter { VerticalChunkSeriesMergeFunc : mergeFunc } ) . Merge ) }
2020-06-09 09:57:31 -07:00
}
// genericMergeSeriesSet implements genericSeriesSet.
type genericMergeSeriesSet struct {
	// currentLabels is the label set shared by all sets in currentSets.
	currentLabels labels.Labels
	// mergeFunc combines the series popped for the current label set.
	mergeFunc genericSeriesMergeFunc

	// heap orders the pre-advanced sets by the labels of their cursor.
	heap genericSeriesSetHeap
	// sets holds every input set (for Err/Warnings aggregation).
	sets []genericSeriesSet
	// currentSets are the sets whose cursors share currentLabels.
	currentSets []genericSeriesSet
}
// newGenericMergeSeriesSet returns a new genericSeriesSet that merges (and deduplicates)
2020-06-09 09:57:31 -07:00
// series returned by the series sets when iterating.
// Each series set must return its series in labels order, otherwise
2020-01-30 06:00:32 -08:00
// merged series set will be incorrect.
2020-06-24 06:41:52 -07:00
// Overlapped situations are merged using provided mergeFunc.
func newGenericMergeSeriesSet ( sets [ ] genericSeriesSet , mergeFunc genericSeriesMergeFunc ) genericSeriesSet {
2018-01-25 03:17:16 -08:00
if len ( sets ) == 1 {
return sets [ 0 ]
}
2020-06-09 09:57:31 -07:00
// We are pre-advancing sets, so we can introspect the label of the
2017-05-10 02:44:13 -07:00
// series under the cursor.
2020-03-24 13:15:47 -07:00
var h genericSeriesSetHeap
2017-05-10 02:44:13 -07:00
for _ , set := range sets {
2018-11-30 06:27:12 -08:00
if set == nil {
continue
}
2017-05-10 02:44:13 -07:00
if set . Next ( ) {
heap . Push ( & h , set )
}
}
2020-03-24 13:15:47 -07:00
return & genericMergeSeriesSet {
2020-06-24 06:41:52 -07:00
mergeFunc : mergeFunc ,
sets : sets ,
heap : h ,
2017-05-10 02:44:13 -07:00
}
}
// Next advances the merged set to the next distinct label set, gathering into
// currentSets every input set whose cursor carries those labels.
func (c *genericMergeSeriesSet) Next() bool {
	// Run in a loop because the "next" series sets may not be valid anymore.
	// If, for the current label set, all the next series sets come from
	// failed remote storage sources, we want to keep trying with the next label set.
	for {
		// Firstly advance all the current series sets. If any of them have run out
		// we can drop them, otherwise they should be inserted back into the heap.
		for _, set := range c.currentSets {
			if set.Next() {
				heap.Push(&c.heap, set)
			}
		}

		if len(c.heap) == 0 {
			return false
		}

		// Now, pop items of the heap that have equal label sets.
		c.currentSets = nil
		c.currentLabels = c.heap[0].At().Labels()
		for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) {
			set := heap.Pop(&c.heap).(genericSeriesSet)
			c.currentSets = append(c.currentSets, set)
		}

		// As long as the current set contains at least 1 set,
		// then it should return true.
		if len(c.currentSets) != 0 {
			break
		}
	}
	return true
}
2020-03-24 13:15:47 -07:00
func ( c * genericMergeSeriesSet ) At ( ) Labels {
2018-01-25 03:11:55 -08:00
if len ( c . currentSets ) == 1 {
return c . currentSets [ 0 ] . At ( )
}
2020-03-24 13:15:47 -07:00
series := make ( [ ] Labels , 0 , len ( c . currentSets ) )
2017-05-10 02:44:13 -07:00
for _ , seriesSet := range c . currentSets {
series = append ( series , seriesSet . At ( ) )
}
2020-06-24 06:41:52 -07:00
return c . mergeFunc ( series ... )
2017-05-10 02:44:13 -07:00
}
2020-03-24 13:15:47 -07:00
func ( c * genericMergeSeriesSet ) Err ( ) error {
2017-05-10 02:44:13 -07:00
for _ , set := range c . sets {
if err := set . Err ( ) ; err != nil {
return err
}
}
return nil
}
2020-06-09 09:57:31 -07:00
func ( c * genericMergeSeriesSet ) Warnings ( ) Warnings {
var ws Warnings
for _ , set := range c . sets {
ws = append ( ws , set . Warnings ( ) ... )
}
return ws
}
2020-03-24 13:15:47 -07:00
type genericSeriesSetHeap [ ] genericSeriesSet
2017-05-10 02:44:13 -07:00
2020-03-24 13:15:47 -07:00
func ( h genericSeriesSetHeap ) Len ( ) int { return len ( h ) }
func ( h genericSeriesSetHeap ) Swap ( i , j int ) { h [ i ] , h [ j ] = h [ j ] , h [ i ] }
2017-05-10 02:44:13 -07:00
2020-03-24 13:15:47 -07:00
func ( h genericSeriesSetHeap ) Less ( i , j int ) bool {
2017-05-10 02:44:13 -07:00
a , b := h [ i ] . At ( ) . Labels ( ) , h [ j ] . At ( ) . Labels ( )
return labels . Compare ( a , b ) < 0
}
2020-03-24 13:15:47 -07:00
func ( h * genericSeriesSetHeap ) Push ( x interface { } ) {
* h = append ( * h , x . ( genericSeriesSet ) )
2017-05-10 02:44:13 -07:00
}
2020-03-24 13:15:47 -07:00
func ( h * genericSeriesSetHeap ) Pop ( ) interface { } {
2017-05-10 02:44:13 -07:00
old := * h
n := len ( old )
x := old [ n - 1 ]
* h = old [ 0 : n - 1 ]
return x
}
2020-06-24 06:41:52 -07:00
// ChainedSeriesMerge returns single series from many same, potentially overlapping series by chaining samples together.
// If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
// timestamp are dropped.
//
// This works the best with replicated series, where data from two series are exactly the same. This does not work well
// with "almost" the same data, e.g. from 2 Prometheus HA replicas. This is fine, since from the Prometheus perspective
// this never happens.
//
// NOTE: Use this only when you see potentially overlapping series, as this introduces small overhead to handle overlaps
// between series.
2020-03-24 13:15:47 -07:00
func ChainedSeriesMerge ( s ... Series ) Series {
if len ( s ) == 0 {
return nil
}
return & chainSeries {
labels : s [ 0 ] . Labels ( ) ,
series : s ,
}
}
// chainSeries is a Series whose iterator chains samples from several
// same-labelled series in timestamp order (see ChainedSeriesMerge).
type chainSeries struct {
	labels labels.Labels
	series []Series
}
// Labels returns the label set shared by all chained series.
func (m *chainSeries) Labels() labels.Labels {
	return m.labels
}
2020-03-24 13:15:47 -07:00
func ( m * chainSeries ) Iterator ( ) chunkenc . Iterator {
2020-02-06 07:58:38 -08:00
iterators := make ( [ ] chunkenc . Iterator , 0 , len ( m . series ) )
2017-05-10 02:44:13 -07:00
for _ , s := range m . series {
iterators = append ( iterators , s . Iterator ( ) )
}
2020-03-24 13:15:47 -07:00
return newChainSampleIterator ( iterators )
2017-05-10 02:44:13 -07:00
}
// chainSampleIterator is responsible to iterate over samples from different iterators of the same time series in timestamps
// order. If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
// timestamp are dropped.
type chainSampleIterator struct {
	iterators []chunkenc.Iterator
	// h is the min-heap of pre-advanced iterators, keyed by current sample
	// timestamp; nil until the first Next() or Seek() call.
	h samplesIteratorHeap
}
2020-03-24 13:15:47 -07:00
func newChainSampleIterator ( iterators [ ] chunkenc . Iterator ) chunkenc . Iterator {
return & chainSampleIterator {
2017-05-10 02:44:13 -07:00
iterators : iterators ,
h : nil ,
}
}
2020-03-24 13:15:47 -07:00
func ( c * chainSampleIterator ) Seek ( t int64 ) bool {
c . h = samplesIteratorHeap { }
2017-05-10 02:44:13 -07:00
for _ , iter := range c . iterators {
if iter . Seek ( t ) {
heap . Push ( & c . h , iter )
}
}
return len ( c . h ) > 0
}
// At returns the sample under the cursor: the earliest timestamp among all
// underlying iterators (the heap root).
func (c *chainSampleIterator) At() (t int64, v float64) {
	if len(c.h) == 0 {
		panic("chainSampleIterator.At() called after .Next() returned false.")
	}
	return c.h[0].At()
}
// Next advances the chained iterator to the next distinct timestamp, dropping
// all but one of any samples that share the current timestamp.
func (c *chainSampleIterator) Next() bool {
	// Lazy init: build the heap from all iterators on the first call.
	if c.h == nil {
		for _, iter := range c.iterators {
			if iter.Next() {
				heap.Push(&c.h, iter)
			}
		}

		return len(c.h) > 0
	}

	if len(c.h) == 0 {
		return false
	}

	currt, _ := c.At()
	// Pop and re-advance every iterator still positioned at the current
	// timestamp, so duplicates are consumed in one step.
	for len(c.h) > 0 {
		nextt, _ := c.h[0].At()
		// All but one of the overlapping samples will be dropped.
		if nextt != currt {
			break
		}

		iter := heap.Pop(&c.h).(chunkenc.Iterator)
		if iter.Next() {
			heap.Push(&c.h, iter)
		}
	}

	return len(c.h) > 0
}
2020-03-24 13:15:47 -07:00
func ( c * chainSampleIterator ) Err ( ) error {
2020-06-24 06:41:52 -07:00
var errs tsdb_errors . MultiError
2017-05-10 02:44:13 -07:00
for _ , iter := range c . iterators {
if err := iter . Err ( ) ; err != nil {
2020-06-24 06:41:52 -07:00
errs . Add ( err )
2017-05-10 02:44:13 -07:00
}
}
2020-06-24 06:41:52 -07:00
return errs . Err ( )
2017-05-10 02:44:13 -07:00
}
2020-03-24 13:15:47 -07:00
type samplesIteratorHeap [ ] chunkenc . Iterator
2017-05-10 02:44:13 -07:00
2020-03-24 13:15:47 -07:00
func ( h samplesIteratorHeap ) Len ( ) int { return len ( h ) }
func ( h samplesIteratorHeap ) Swap ( i , j int ) { h [ i ] , h [ j ] = h [ j ] , h [ i ] }
2017-05-10 02:44:13 -07:00
2020-03-24 13:15:47 -07:00
func ( h samplesIteratorHeap ) Less ( i , j int ) bool {
2017-05-10 02:44:13 -07:00
at , _ := h [ i ] . At ( )
bt , _ := h [ j ] . At ( )
return at < bt
}
2020-03-24 13:15:47 -07:00
func ( h * samplesIteratorHeap ) Push ( x interface { } ) {
2020-02-06 07:58:38 -08:00
* h = append ( * h , x . ( chunkenc . Iterator ) )
2017-05-10 02:44:13 -07:00
}
2020-03-24 13:15:47 -07:00
func ( h * samplesIteratorHeap ) Pop ( ) interface { } {
old := * h
n := len ( old )
x := old [ n - 1 ]
* h = old [ 0 : n - 1 ]
return x
}
// compactChunkSeriesMerger is a ChunkSeries that lazily compacts several
// same-labelled chunk series (see NewCompactingChunkSeriesMerger).
type compactChunkSeriesMerger struct {
	// mergeFunc merges samples of overlapping chunks once decoded.
	mergeFunc VerticalSeriesMergeFunc

	labels labels.Labels
	series []ChunkSeries
}
2020-06-24 06:41:52 -07:00
// NewCompactingChunkSeriesMerger returns VerticalChunkSeriesMergeFunc that merges the same chunk series into single chunk series.
// In case of the chunk overlaps, it compacts those into one or more time-ordered non-overlapping chunks with merged data.
// Samples from overlapped chunks are merged using series vertical merge func.
2020-03-24 13:15:47 -07:00
// It expects the same labels for each given series.
2020-06-24 06:41:52 -07:00
//
// NOTE: Use this only when you see potentially overlapping series, as this introduces small overhead to handle overlaps
// between series.
func NewCompactingChunkSeriesMerger ( mergeFunc VerticalSeriesMergeFunc ) VerticalChunkSeriesMergeFunc {
2020-03-24 13:15:47 -07:00
return func ( s ... ChunkSeries ) ChunkSeries {
if len ( s ) == 0 {
return nil
}
2020-06-24 06:41:52 -07:00
return & compactChunkSeriesMerger {
mergeFunc : mergeFunc ,
labels : s [ 0 ] . Labels ( ) ,
series : s ,
2020-03-24 13:15:47 -07:00
}
}
}
// Labels returns the label set shared by all merged chunk series.
func (s *compactChunkSeriesMerger) Labels() labels.Labels {
	return s.labels
}
2020-06-24 06:41:52 -07:00
func ( s * compactChunkSeriesMerger ) Iterator ( ) chunks . Iterator {
2020-03-24 13:15:47 -07:00
iterators := make ( [ ] chunks . Iterator , 0 , len ( s . series ) )
for _ , series := range s . series {
iterators = append ( iterators , series . Iterator ( ) )
}
2020-06-24 06:41:52 -07:00
return & compactChunkIterator {
mergeFunc : s . mergeFunc ,
labels : s . labels ,
iterators : iterators ,
2020-03-24 13:15:47 -07:00
}
}
// compactChunkIterator is responsible to compact chunks from different iterators of the same time series into single chainSeries.
// If time-overlapping chunks are found, they are encoded and passed to series merge and encoded again into one bigger chunk.
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
type compactChunkIterator struct {
	mergeFunc VerticalSeriesMergeFunc

	labels    labels.Labels
	iterators []chunks.Iterator

	// h orders the pre-advanced iterators by their current chunk's min,
	// then max time; nil until the first Next() call.
	h chunkIteratorHeap
}
// At returns the chunk under the cursor: the earliest chunk among all
// underlying iterators (the heap root).
func (c *compactChunkIterator) At() chunks.Meta {
	if len(c.h) == 0 {
		panic("compactChunkIterator.At() called after .Next() returned false.")
	}
	return c.h[0].At()
}
2020-06-24 06:41:52 -07:00
func ( c * compactChunkIterator ) Next ( ) bool {
2020-03-24 13:15:47 -07:00
if c . h == nil {
for _ , iter := range c . iterators {
if iter . Next ( ) {
heap . Push ( & c . h , iter )
}
}
return len ( c . h ) > 0
}
if len ( c . h ) == 0 {
return false
}
2020-06-24 06:41:52 -07:00
// Detect overlaps to compact.
// Be smart about it and deduplicate on the fly if chunks are identical.
2020-03-24 13:15:47 -07:00
last := c . At ( )
2020-06-24 06:41:52 -07:00
var overlapped [ ] Series
2020-03-24 13:15:47 -07:00
for {
iter := heap . Pop ( & c . h ) . ( chunks . Iterator )
if iter . Next ( ) {
heap . Push ( & c . h , iter )
}
if len ( c . h ) == 0 {
break
}
2020-06-24 06:41:52 -07:00
// Get the current oldest chunk by min, then max time.
2020-03-24 13:15:47 -07:00
next := c . At ( )
if next . MinTime > last . MaxTime {
// No overlap with last one.
break
}
2020-06-24 06:41:52 -07:00
if next . MinTime == last . MinTime &&
next . MaxTime == last . MaxTime &&
bytes . Equal ( next . Chunk . Bytes ( ) , last . Chunk . Bytes ( ) ) {
// 1:1 duplicates, skip last.
continue
}
overlapped = append ( overlapped , & chunkToSeriesDecoder {
labels : c . labels ,
Meta : last ,
} )
2020-03-24 13:15:47 -07:00
last = next
}
2020-06-24 06:41:52 -07:00
if len ( overlapped ) == 0 {
return len ( c . h ) > 0
2020-03-24 13:15:47 -07:00
}
2020-06-24 06:41:52 -07:00
// Add last, not yet included overlap.
overlapped = append ( overlapped , & chunkToSeriesDecoder {
labels : c . labels ,
Meta : c . At ( ) ,
} )
var chkSeries ChunkSeries = & seriesToChunkEncoder { Series : c . mergeFunc ( overlapped ... ) }
heap . Push ( & c . h , chkSeries )
return true
2020-03-24 13:15:47 -07:00
}
2020-06-24 06:41:52 -07:00
func ( c * compactChunkIterator ) Err ( ) error {
var errs tsdb_errors . MultiError
2020-03-24 13:15:47 -07:00
for _ , iter := range c . iterators {
if err := iter . Err ( ) ; err != nil {
2020-06-24 06:41:52 -07:00
errs . Add ( err )
2020-03-24 13:15:47 -07:00
}
}
2020-06-24 06:41:52 -07:00
return errs . Err ( )
2020-03-24 13:15:47 -07:00
}
type chunkIteratorHeap [ ] chunks . Iterator
func ( h chunkIteratorHeap ) Len ( ) int { return len ( h ) }
func ( h chunkIteratorHeap ) Swap ( i , j int ) { h [ i ] , h [ j ] = h [ j ] , h [ i ] }
func ( h chunkIteratorHeap ) Less ( i , j int ) bool {
at := h [ i ] . At ( )
bt := h [ j ] . At ( )
if at . MinTime == bt . MinTime {
return at . MaxTime < bt . MaxTime
}
return at . MinTime < bt . MinTime
}
func ( h * chunkIteratorHeap ) Push ( x interface { } ) {
* h = append ( * h , x . ( chunks . Iterator ) )
}
func ( h * chunkIteratorHeap ) Pop ( ) interface { } {
2017-05-10 02:44:13 -07:00
old := * h
n := len ( old )
x := old [ n - 1 ]
* h = old [ 0 : n - 1 ]
return x
}