mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-28 23:19:41 -08:00
bf4a279a91
Currently all read queries are simply pushed to remote read clients. This is fine, except for remote storage for wich it unefficient and make query slower even if remote read is unnecessary. So we need instead to compare the oldest timestamp in primary/local storage with the query range lower boundary. If the oldest timestamp is older than the mint parameter, then there is no need for remote read. This is an optionnal behavior per remote read client. Signed-off-by: Thibault Chataigner <t.chataigner@criteo.com>
463 lines
10 KiB
Go
463 lines
10 KiB
Go
// Copyright 2017 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package storage
|
|
|
|
import (
|
|
"container/heap"
|
|
"context"
|
|
"strings"
|
|
|
|
"github.com/go-kit/kit/log"
|
|
"github.com/go-kit/kit/log/level"
|
|
"github.com/prometheus/common/model"
|
|
"github.com/prometheus/prometheus/pkg/labels"
|
|
)
|
|
|
|
type fanout struct {
|
|
logger log.Logger
|
|
|
|
primary Storage
|
|
secondaries []Storage
|
|
}
|
|
|
|
// NewFanout returns a new fan-out Storage, which proxies reads and writes
|
|
// through to multiple underlying storages.
|
|
func NewFanout(logger log.Logger, primary Storage, secondaries ...Storage) Storage {
|
|
return &fanout{
|
|
logger: logger,
|
|
primary: primary,
|
|
secondaries: secondaries,
|
|
}
|
|
}
|
|
|
|
// StartTime implements the Storage interface.
|
|
func (f *fanout) StartTime() (int64, error) {
|
|
// StartTime of a fanout should be the earliest StartTime of all its storages,
|
|
// both primary and secondaries.
|
|
firstTime, err := f.primary.StartTime()
|
|
if err != nil {
|
|
return int64(model.Latest), err
|
|
}
|
|
|
|
for _, storage := range f.secondaries {
|
|
t, err := storage.StartTime()
|
|
if err != nil {
|
|
return int64(model.Latest), err
|
|
}
|
|
if t < firstTime {
|
|
firstTime = t
|
|
}
|
|
}
|
|
return firstTime, nil
|
|
}
|
|
|
|
func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) {
|
|
queriers := mergeQuerier{
|
|
queriers: make([]Querier, 0, 1+len(f.secondaries)),
|
|
}
|
|
|
|
// Add primary querier
|
|
querier, err := f.primary.Querier(ctx, mint, maxt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
queriers.queriers = append(queriers.queriers, querier)
|
|
|
|
// Add secondary queriers
|
|
for _, storage := range f.secondaries {
|
|
querier, err := storage.Querier(ctx, mint, maxt)
|
|
if err != nil {
|
|
queriers.Close()
|
|
return nil, err
|
|
}
|
|
queriers.queriers = append(queriers.queriers, querier)
|
|
}
|
|
return &queriers, nil
|
|
}
|
|
|
|
func (f *fanout) Appender() (Appender, error) {
|
|
primary, err := f.primary.Appender()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
secondaries := make([]Appender, 0, len(f.secondaries))
|
|
for _, storage := range f.secondaries {
|
|
appender, err := storage.Appender()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
secondaries = append(secondaries, appender)
|
|
}
|
|
return &fanoutAppender{
|
|
logger: f.logger,
|
|
primary: primary,
|
|
secondaries: secondaries,
|
|
}, nil
|
|
}
|
|
|
|
// Close closes the storage and all its underlying resources.
|
|
func (f *fanout) Close() error {
|
|
if err := f.primary.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// TODO return multiple errors?
|
|
var lastErr error
|
|
for _, storage := range f.secondaries {
|
|
if err := storage.Close(); err != nil {
|
|
lastErr = err
|
|
}
|
|
}
|
|
return lastErr
|
|
}
|
|
|
|
// fanoutAppender implements Appender.
|
|
type fanoutAppender struct {
|
|
logger log.Logger
|
|
|
|
primary Appender
|
|
secondaries []Appender
|
|
}
|
|
|
|
func (f *fanoutAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) {
|
|
ref, err := f.primary.Add(l, t, v)
|
|
if err != nil {
|
|
return ref, err
|
|
}
|
|
|
|
for _, appender := range f.secondaries {
|
|
if _, err := appender.Add(l, t, v); err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
return ref, nil
|
|
}
|
|
|
|
func (f *fanoutAppender) AddFast(l labels.Labels, ref uint64, t int64, v float64) error {
|
|
if err := f.primary.AddFast(l, ref, t, v); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, appender := range f.secondaries {
|
|
if _, err := appender.Add(l, t, v); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (f *fanoutAppender) Commit() (err error) {
|
|
err = f.primary.Commit()
|
|
|
|
for _, appender := range f.secondaries {
|
|
if err == nil {
|
|
err = appender.Commit()
|
|
} else {
|
|
if rollbackErr := appender.Rollback(); rollbackErr != nil {
|
|
level.Error(f.logger).Log("msg", "Squashed rollback error on commit", "err", rollbackErr)
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (f *fanoutAppender) Rollback() (err error) {
|
|
err = f.primary.Rollback()
|
|
|
|
for _, appender := range f.secondaries {
|
|
rollbackErr := appender.Rollback()
|
|
if err == nil {
|
|
err = rollbackErr
|
|
} else if rollbackErr != nil {
|
|
level.Error(f.logger).Log("msg", "Squashed rollback error on rollback", "err", rollbackErr)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// mergeQuerier implements Querier.
|
|
type mergeQuerier struct {
|
|
queriers []Querier
|
|
}
|
|
|
|
// NewMergeQuerier returns a new Querier that merges results of input queriers.
|
|
func NewMergeQuerier(queriers []Querier) Querier {
|
|
return &mergeQuerier{
|
|
queriers: queriers,
|
|
}
|
|
}
|
|
|
|
// Select returns a set of series that matches the given label matchers.
|
|
func (q *mergeQuerier) Select(matchers ...*labels.Matcher) SeriesSet {
|
|
seriesSets := make([]SeriesSet, 0, len(q.queriers))
|
|
for _, querier := range q.queriers {
|
|
seriesSets = append(seriesSets, querier.Select(matchers...))
|
|
}
|
|
return newMergeSeriesSet(seriesSets)
|
|
}
|
|
|
|
// LabelValues returns all potential values for a label name.
|
|
func (q *mergeQuerier) LabelValues(name string) ([]string, error) {
|
|
var results [][]string
|
|
for _, querier := range q.queriers {
|
|
values, err := querier.LabelValues(name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
results = append(results, values)
|
|
}
|
|
return mergeStringSlices(results), nil
|
|
}
|
|
|
|
func mergeStringSlices(ss [][]string) []string {
|
|
switch len(ss) {
|
|
case 0:
|
|
return nil
|
|
case 1:
|
|
return ss[0]
|
|
case 2:
|
|
return mergeTwoStringSlices(ss[0], ss[1])
|
|
default:
|
|
halfway := len(ss) / 2
|
|
return mergeTwoStringSlices(
|
|
mergeStringSlices(ss[:halfway]),
|
|
mergeStringSlices(ss[halfway:]),
|
|
)
|
|
}
|
|
}
|
|
|
|
func mergeTwoStringSlices(a, b []string) []string {
|
|
i, j := 0, 0
|
|
result := make([]string, 0, len(a)+len(b))
|
|
for i < len(a) && j < len(b) {
|
|
switch strings.Compare(a[i], b[j]) {
|
|
case 0:
|
|
result = append(result, a[i])
|
|
i++
|
|
j++
|
|
case -1:
|
|
result = append(result, a[i])
|
|
i++
|
|
case 1:
|
|
result = append(result, b[j])
|
|
j++
|
|
}
|
|
}
|
|
result = append(result, a[i:]...)
|
|
result = append(result, b[j:]...)
|
|
return result
|
|
}
|
|
|
|
// Close releases the resources of the Querier.
|
|
func (q *mergeQuerier) Close() error {
|
|
// TODO return multiple errors?
|
|
var lastErr error
|
|
for _, querier := range q.queriers {
|
|
if err := querier.Close(); err != nil {
|
|
lastErr = err
|
|
}
|
|
}
|
|
return lastErr
|
|
}
|
|
|
|
// mergeSeriesSet implements SeriesSet
|
|
type mergeSeriesSet struct {
|
|
currentLabels labels.Labels
|
|
currentSets []SeriesSet
|
|
heap seriesSetHeap
|
|
sets []SeriesSet
|
|
}
|
|
|
|
func newMergeSeriesSet(sets []SeriesSet) SeriesSet {
|
|
// Sets need to be pre-advanced, so we can introspect the label of the
|
|
// series under the cursor.
|
|
var h seriesSetHeap
|
|
for _, set := range sets {
|
|
if set.Next() {
|
|
heap.Push(&h, set)
|
|
}
|
|
}
|
|
return &mergeSeriesSet{
|
|
heap: h,
|
|
sets: sets,
|
|
}
|
|
}
|
|
|
|
func (c *mergeSeriesSet) Next() bool {
|
|
// Firstly advance all the current series sets. If any of them have run out
|
|
// we can drop them, otherwise they should be inserted back into the heap.
|
|
for _, set := range c.currentSets {
|
|
if set.Next() {
|
|
heap.Push(&c.heap, set)
|
|
}
|
|
}
|
|
if len(c.heap) == 0 {
|
|
return false
|
|
}
|
|
|
|
// Now, pop items of the heap that have equal label sets.
|
|
c.currentSets = nil
|
|
c.currentLabels = c.heap[0].At().Labels()
|
|
for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) {
|
|
set := heap.Pop(&c.heap).(SeriesSet)
|
|
c.currentSets = append(c.currentSets, set)
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (c *mergeSeriesSet) At() Series {
|
|
series := []Series{}
|
|
for _, seriesSet := range c.currentSets {
|
|
series = append(series, seriesSet.At())
|
|
}
|
|
return &mergeSeries{
|
|
labels: c.currentLabels,
|
|
series: series,
|
|
}
|
|
}
|
|
|
|
func (c *mergeSeriesSet) Err() error {
|
|
for _, set := range c.sets {
|
|
if err := set.Err(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type seriesSetHeap []SeriesSet
|
|
|
|
func (h seriesSetHeap) Len() int { return len(h) }
|
|
func (h seriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
|
|
func (h seriesSetHeap) Less(i, j int) bool {
|
|
a, b := h[i].At().Labels(), h[j].At().Labels()
|
|
return labels.Compare(a, b) < 0
|
|
}
|
|
|
|
func (h *seriesSetHeap) Push(x interface{}) {
|
|
*h = append(*h, x.(SeriesSet))
|
|
}
|
|
|
|
func (h *seriesSetHeap) Pop() interface{} {
|
|
old := *h
|
|
n := len(old)
|
|
x := old[n-1]
|
|
*h = old[0 : n-1]
|
|
return x
|
|
}
|
|
|
|
type mergeSeries struct {
|
|
labels labels.Labels
|
|
series []Series
|
|
}
|
|
|
|
func (m *mergeSeries) Labels() labels.Labels {
|
|
return m.labels
|
|
}
|
|
|
|
func (m *mergeSeries) Iterator() SeriesIterator {
|
|
iterators := make([]SeriesIterator, 0, len(m.series))
|
|
for _, s := range m.series {
|
|
iterators = append(iterators, s.Iterator())
|
|
}
|
|
return newMergeIterator(iterators)
|
|
}
|
|
|
|
type mergeIterator struct {
|
|
iterators []SeriesIterator
|
|
h seriesIteratorHeap
|
|
}
|
|
|
|
func newMergeIterator(iterators []SeriesIterator) SeriesIterator {
|
|
return &mergeIterator{
|
|
iterators: iterators,
|
|
h: nil,
|
|
}
|
|
}
|
|
|
|
func (c *mergeIterator) Seek(t int64) bool {
|
|
c.h = seriesIteratorHeap{}
|
|
for _, iter := range c.iterators {
|
|
if iter.Seek(t) {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
}
|
|
return len(c.h) > 0
|
|
}
|
|
|
|
func (c *mergeIterator) At() (t int64, v float64) {
|
|
if len(c.h) == 0 {
|
|
panic("mergeIterator.At() called after .Next() returned false.")
|
|
}
|
|
|
|
// TODO do I need to dedupe or just merge?
|
|
return c.h[0].At()
|
|
}
|
|
|
|
func (c *mergeIterator) Next() bool {
|
|
if c.h == nil {
|
|
for _, iter := range c.iterators {
|
|
if iter.Next() {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
}
|
|
return len(c.h) > 0
|
|
}
|
|
|
|
if len(c.h) == 0 {
|
|
return false
|
|
}
|
|
|
|
iter := heap.Pop(&c.h).(SeriesIterator)
|
|
if iter.Next() {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
|
|
return len(c.h) > 0
|
|
}
|
|
|
|
func (c *mergeIterator) Err() error {
|
|
for _, iter := range c.iterators {
|
|
if err := iter.Err(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type seriesIteratorHeap []SeriesIterator
|
|
|
|
func (h seriesIteratorHeap) Len() int { return len(h) }
|
|
func (h seriesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
|
|
func (h seriesIteratorHeap) Less(i, j int) bool {
|
|
at, _ := h[i].At()
|
|
bt, _ := h[j].At()
|
|
return at < bt
|
|
}
|
|
|
|
func (h *seriesIteratorHeap) Push(x interface{}) {
|
|
*h = append(*h, x.(SeriesIterator))
|
|
}
|
|
|
|
func (h *seriesIteratorHeap) Pop() interface{} {
|
|
old := *h
|
|
n := len(old)
|
|
x := old[n-1]
|
|
*h = old[0 : n-1]
|
|
return x
|
|
}
|