// Copyright 2017 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package storage import ( "container/heap" "context" "sort" "strings" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" ) type fanout struct { logger log.Logger primary Storage secondaries []Storage } // NewFanout returns a new fan-out Storage, which proxies reads and writes // through to multiple underlying storages. func NewFanout(logger log.Logger, primary Storage, secondaries ...Storage) Storage { return &fanout{ logger: logger, primary: primary, secondaries: secondaries, } } // StartTime implements the Storage interface. func (f *fanout) StartTime() (int64, error) { // StartTime of a fanout should be the earliest StartTime of all its storages, // both primary and secondaries. firstTime, err := f.primary.StartTime() if err != nil { return int64(model.Latest), err } for _, storage := range f.secondaries { t, err := storage.StartTime() if err != nil { return int64(model.Latest), err } if t < firstTime { firstTime = t } } return firstTime, nil } func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) { queriers := make([]Querier, 0, 1+len(f.secondaries)) // Add primary querier primaryQuerier, err := f.primary.Querier(ctx, mint, maxt) if err != nil { return nil, err } queriers = append(queriers, primaryQuerier) // Add secondary queriers for _, storage := range f.secondaries { querier, err := storage.Querier(ctx, mint, maxt) if err != nil { NewMergeQuerier(primaryQuerier, queriers).Close() return nil, err } queriers = append(queriers, querier) } return NewMergeQuerier(primaryQuerier, queriers), nil } func (f *fanout) Appender() (Appender, error) { primary, err := f.primary.Appender() if err != nil { return nil, err } secondaries := make([]Appender, 0, len(f.secondaries)) for _, storage := range f.secondaries { appender, err := storage.Appender() if err != nil { return nil, err } secondaries = append(secondaries, appender) } return &fanoutAppender{ logger: f.logger, primary: primary, secondaries: secondaries, }, nil } // Close closes the storage and all its underlying resources. func (f *fanout) Close() error { if err := f.primary.Close(); err != nil { return err } // TODO return multiple errors? var lastErr error for _, storage := range f.secondaries { if err := storage.Close(); err != nil { lastErr = err } } return lastErr } // fanoutAppender implements Appender. type fanoutAppender struct { logger log.Logger primary Appender secondaries []Appender } func (f *fanoutAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) { ref, err := f.primary.Add(l, t, v) if err != nil { return ref, err } for _, appender := range f.secondaries { if _, err := appender.Add(l, t, v); err != nil { return 0, err } } return ref, nil } func (f *fanoutAppender) AddFast(l labels.Labels, ref uint64, t int64, v float64) error { if err := f.primary.AddFast(l, ref, t, v); err != nil { return err } for _, appender := range f.secondaries { if _, err := appender.Add(l, t, v); err != nil { return err } } return nil } func (f *fanoutAppender) Commit() (err error) { err = f.primary.Commit() for _, appender := range f.secondaries { if err == nil { err = appender.Commit() } else { if rollbackErr := appender.Rollback(); rollbackErr != nil { level.Error(f.logger).Log("msg", "Squashed rollback error on commit", "err", rollbackErr) } } } return } func (f *fanoutAppender) Rollback() (err error) { err = f.primary.Rollback() for _, appender := range f.secondaries { rollbackErr := appender.Rollback() if err == nil { err = rollbackErr } else if rollbackErr != nil { level.Error(f.logger).Log("msg", "Squashed rollback error on rollback", "err", rollbackErr) } } return nil } // mergeQuerier implements Querier. type mergeQuerier struct { primaryQuerier Querier queriers []Querier failedQueriers map[Querier]struct{} setQuerierMap map[SeriesSet]Querier } // NewMergeQuerier returns a new Querier that merges results of input queriers. // NB NewMergeQuerier will return NoopQuerier if no queriers are passed to it, // and will filter NoopQueriers from its arguments, in order to reduce overhead // when only one querier is passed. func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier { filtered := make([]Querier, 0, len(queriers)) for _, querier := range queriers { if querier != NoopQuerier() { filtered = append(filtered, querier) } } setQuerierMap := make(map[SeriesSet]Querier) failedQueriers := make(map[Querier]struct{}) switch len(filtered) { case 0: return NoopQuerier() case 1: return filtered[0] default: return &mergeQuerier{ primaryQuerier: primaryQuerier, queriers: filtered, failedQueriers: failedQueriers, setQuerierMap: setQuerierMap, } } } // Select returns a set of series that matches the given label matchers. func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { if len(q.queriers) != 1 { // We need to sort for NewMergeSeriesSet to work. return q.SelectSorted(params, matchers...) } return q.queriers[0].Select(params, matchers...) } // SelectSorted returns a set of sorted series that matches the given label matchers. func (q *mergeQuerier) SelectSorted(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { seriesSets := make([]SeriesSet, 0, len(q.queriers)) var warnings Warnings var priErr error = nil type queryResult struct { qr Querier set SeriesSet wrn Warnings selectError error } queryResultChan := make(chan *queryResult) for _, querier := range q.queriers { go func(qr Querier) { set, wrn, err := qr.SelectSorted(params, matchers...) queryResultChan <- &queryResult{qr: qr, set: set, wrn: wrn, selectError: err} }(querier) } for i := 0; i < len(q.queriers); i++ { qryResult := <-queryResultChan q.setQuerierMap[qryResult.set] = qryResult.qr if qryResult.wrn != nil { warnings = append(warnings, qryResult.wrn...) } if qryResult.selectError != nil { q.failedQueriers[qryResult.qr] = struct{}{} // If the error source isn't the primary querier, return the error as a warning and continue. if qryResult.qr != q.primaryQuerier { warnings = append(warnings, qryResult.selectError) } else { priErr = qryResult.selectError } } } if priErr != nil { return nil, nil, priErr } return NewMergeSeriesSet(seriesSets, q), warnings, nil } // LabelValues returns all potential values for a label name. func (q *mergeQuerier) LabelValues(name string) ([]string, Warnings, error) { var results [][]string var warnings Warnings for _, querier := range q.queriers { values, wrn, err := querier.LabelValues(name) if wrn != nil { warnings = append(warnings, wrn...) } if err != nil { q.failedQueriers[querier] = struct{}{} // If the error source isn't the primary querier, return the error as a warning and continue. if querier != q.primaryQuerier { warnings = append(warnings, err) continue } else { return nil, nil, err } } results = append(results, values) } return mergeStringSlices(results), warnings, nil } func (q *mergeQuerier) IsFailedSet(set SeriesSet) bool { _, isFailedQuerier := q.failedQueriers[q.setQuerierMap[set]] return isFailedQuerier } func mergeStringSlices(ss [][]string) []string { switch len(ss) { case 0: return nil case 1: return ss[0] case 2: return mergeTwoStringSlices(ss[0], ss[1]) default: halfway := len(ss) / 2 return mergeTwoStringSlices( mergeStringSlices(ss[:halfway]), mergeStringSlices(ss[halfway:]), ) } } func mergeTwoStringSlices(a, b []string) []string { i, j := 0, 0 result := make([]string, 0, len(a)+len(b)) for i < len(a) && j < len(b) { switch strings.Compare(a[i], b[j]) { case 0: result = append(result, a[i]) i++ j++ case -1: result = append(result, a[i]) i++ case 1: result = append(result, b[j]) j++ } } result = append(result, a[i:]...) result = append(result, b[j:]...) return result } // LabelNames returns all the unique label names present in the block in sorted order. func (q *mergeQuerier) LabelNames() ([]string, Warnings, error) { labelNamesMap := make(map[string]struct{}) var warnings Warnings for _, b := range q.queriers { names, wrn, err := b.LabelNames() if wrn != nil { warnings = append(warnings, wrn...) } if err != nil { // If the error source isn't the primary querier, return the error as a warning and continue. if b != q.primaryQuerier { warnings = append(warnings, err) continue } else { return nil, nil, errors.Wrap(err, "LabelNames() from Querier") } } for _, name := range names { labelNamesMap[name] = struct{}{} } } labelNames := make([]string, 0, len(labelNamesMap)) for name := range labelNamesMap { labelNames = append(labelNames, name) } sort.Strings(labelNames) return labelNames, warnings, nil } // Close releases the resources of the Querier. func (q *mergeQuerier) Close() error { // TODO return multiple errors? var lastErr error for _, querier := range q.queriers { if err := querier.Close(); err != nil { lastErr = err } } return lastErr } // mergeSeriesSet implements SeriesSet type mergeSeriesSet struct { currentLabels labels.Labels currentSets []SeriesSet heap seriesSetHeap sets []SeriesSet querier *mergeQuerier } // NewMergeSeriesSet returns a new series set that merges (deduplicates) // series returned by the input series sets when iterating. // Each input series set must return its series in labels order, otherwise // merged series set will be incorrect. func NewMergeSeriesSet(sets []SeriesSet, querier *mergeQuerier) SeriesSet { if len(sets) == 1 { return sets[0] } // Sets need to be pre-advanced, so we can introspect the label of the // series under the cursor. var h seriesSetHeap for _, set := range sets { if set == nil { continue } if set.Next() { heap.Push(&h, set) } } return &mergeSeriesSet{ heap: h, sets: sets, querier: querier, } } func (c *mergeSeriesSet) Next() bool { // Run in a loop because the "next" series sets may not be valid anymore. // If a remote querier fails, we discard all series sets from that querier. // If, for the current label set, all the next series sets come from // failed remote storage sources, we want to keep trying with the next label set. for { // Firstly advance all the current series sets. If any of them have run out // we can drop them, otherwise they should be inserted back into the heap. for _, set := range c.currentSets { if set.Next() { heap.Push(&c.heap, set) } } if len(c.heap) == 0 { return false } // Now, pop items of the heap that have equal label sets. c.currentSets = nil c.currentLabels = c.heap[0].At().Labels() for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) { set := heap.Pop(&c.heap).(SeriesSet) if c.querier != nil && c.querier.IsFailedSet(set) { continue } c.currentSets = append(c.currentSets, set) } // As long as the current set contains at least 1 set, // then it should return true. if len(c.currentSets) != 0 { break } } return true } func (c *mergeSeriesSet) At() Series { if len(c.currentSets) == 1 { return c.currentSets[0].At() } series := []Series{} for _, seriesSet := range c.currentSets { series = append(series, seriesSet.At()) } return &mergeSeries{ labels: c.currentLabels, series: series, } } func (c *mergeSeriesSet) Err() error { for _, set := range c.sets { if err := set.Err(); err != nil { return err } } return nil } type seriesSetHeap []SeriesSet func (h seriesSetHeap) Len() int { return len(h) } func (h seriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } func (h seriesSetHeap) Less(i, j int) bool { a, b := h[i].At().Labels(), h[j].At().Labels() return labels.Compare(a, b) < 0 } func (h *seriesSetHeap) Push(x interface{}) { *h = append(*h, x.(SeriesSet)) } func (h *seriesSetHeap) Pop() interface{} { old := *h n := len(old) x := old[n-1] *h = old[0 : n-1] return x } type mergeSeries struct { labels labels.Labels series []Series } func (m *mergeSeries) Labels() labels.Labels { return m.labels } func (m *mergeSeries) Iterator() SeriesIterator { iterators := make([]SeriesIterator, 0, len(m.series)) for _, s := range m.series { iterators = append(iterators, s.Iterator()) } return newMergeIterator(iterators) } type mergeIterator struct { iterators []SeriesIterator h seriesIteratorHeap } func newMergeIterator(iterators []SeriesIterator) SeriesIterator { return &mergeIterator{ iterators: iterators, h: nil, } } func (c *mergeIterator) Seek(t int64) bool { c.h = seriesIteratorHeap{} for _, iter := range c.iterators { if iter.Seek(t) { heap.Push(&c.h, iter) } } return len(c.h) > 0 } func (c *mergeIterator) At() (t int64, v float64) { if len(c.h) == 0 { panic("mergeIterator.At() called after .Next() returned false.") } return c.h[0].At() } func (c *mergeIterator) Next() bool { if c.h == nil { for _, iter := range c.iterators { if iter.Next() { heap.Push(&c.h, iter) } } return len(c.h) > 0 } if len(c.h) == 0 { return false } currt, _ := c.At() for len(c.h) > 0 { nextt, _ := c.h[0].At() if nextt != currt { break } iter := heap.Pop(&c.h).(SeriesIterator) if iter.Next() { heap.Push(&c.h, iter) } } return len(c.h) > 0 } func (c *mergeIterator) Err() error { for _, iter := range c.iterators { if err := iter.Err(); err != nil { return err } } return nil } type seriesIteratorHeap []SeriesIterator func (h seriesIteratorHeap) Len() int { return len(h) } func (h seriesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } func (h seriesIteratorHeap) Less(i, j int) bool { at, _ := h[i].At() bt, _ := h[j].At() return at < bt } func (h *seriesIteratorHeap) Push(x interface{}) { *h = append(*h, x.(SeriesIterator)) } func (h *seriesIteratorHeap) Pop() interface{} { old := *h n := len(old) x := old[n-1] *h = old[0 : n-1] return x }