diff --git a/pkg/labels/matcher.go b/pkg/labels/matcher.go new file mode 100644 index 0000000000..5d0a1ee82a --- /dev/null +++ b/pkg/labels/matcher.go @@ -0,0 +1,61 @@ +package labels + +import ( + "fmt" + "regexp" +) + +// MatchType is an enum for label matching types. +type MatchType int + +// Possible MatchTypes. +const ( + MatchEqual MatchType = iota + MatchNotEqual + MatchRegexp + MatchNotRegexp +) + +func (m MatchType) String() string { + typeToStr := map[MatchType]string{ + MatchEqual: "=", + MatchNotEqual: "!=", + MatchRegexp: "=~", + MatchNotRegexp: "!~", + } + if str, ok := typeToStr[m]; ok { + return str + } + panic("unknown match type") +} + +// Matcher models the matching of a label. +type Matcher struct { + Type MatchType + Name string + Value string + + re *regexp.Regexp +} + +// NewMatcher returns a matcher object. +func NewMatcher(t MatchType, n, v string) (*Matcher, error) { + m := &Matcher{ + Type: t, + Name: n, + Value: v, + } + if t == MatchRegexp || t == MatchNotRegexp { + m.Value = "^(?:" + v + ")$" + re, err := regexp.Compile(m.Value) + if err != nil { + return nil, err + } + m.re = re + } + return m, nil +} + +func (m *Matcher) String() string { + return fmt.Sprintf("%s%s%q", m.Name, m.Type, m.Value) +} diff --git a/promql/ast.go b/promql/ast.go index a765094d63..65c57221d0 100644 --- a/promql/ast.go +++ b/promql/ast.go @@ -15,13 +15,10 @@ package promql import ( "fmt" - "regexp" "time" - "unsafe" - "github.com/fabxc/tsdb" - tsdbLabels "github.com/fabxc/tsdb/labels" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" ) // Node is a generic interface for all nodes in an AST. @@ -134,11 +131,11 @@ type MatrixSelector struct { Name string Range time.Duration Offset time.Duration - LabelMatchers []*LabelMatcher + LabelMatchers []*labels.Matcher // The series iterators are populated at query preparation time. - series []tsdb.Series - iterators []*tsdb.BufferedSeriesIterator + series []storage.Series + iterators []*storage.BufferedSeriesIterator } // NumberLiteral represents a number. @@ -168,11 +165,11 @@ type UnaryExpr struct { type VectorSelector struct { Name string Offset time.Duration - LabelMatchers []*LabelMatcher + LabelMatchers []*labels.Matcher // The series iterators are populated at query preparation time. - series []tsdb.Series - iterators []*tsdb.BufferedSeriesIterator + series []storage.Series + iterators []*storage.BufferedSeriesIterator } func (e *AggregateExpr) Type() ValueType { return ValueTypeVector } @@ -318,91 +315,3 @@ func (f inspector) Visit(node Node) Visitor { func Inspect(node Node, f func(Node) bool) { Walk(inspector(f), node) } - -// MatchType is an enum for label matching types. -type MatchType int - -// Possible MatchTypes. -const ( - MatchEqual MatchType = iota - MatchNotEqual - MatchRegexp - MatchNotRegexp -) - -func (m MatchType) String() string { - typeToStr := map[MatchType]string{ - MatchEqual: "=", - MatchNotEqual: "!=", - MatchRegexp: "=~", - MatchNotRegexp: "!~", - } - if str, ok := typeToStr[m]; ok { - return str - } - panic("unknown match type") -} - -// LabelMatcher models the matching of a label. -type LabelMatcher struct { - Type MatchType - Name string - Value string - - re *regexp.Regexp -} - -// NewLabelMatcher returns a LabelMatcher object ready to use. -func NewLabelMatcher(t MatchType, n, v string) (*LabelMatcher, error) { - m := &LabelMatcher{ - Type: t, - Name: n, - Value: v, - } - if t == MatchRegexp || t == MatchNotRegexp { - m.Value = "^(?:" + v + ")$" - re, err := regexp.Compile(m.Value) - if err != nil { - return nil, err - } - m.re = re - } - return m, nil -} - -func toTSDBLabels(l labels.Labels) tsdbLabels.Labels { - return *(*tsdbLabels.Labels)(unsafe.Pointer(&l)) -} - -func toLabels(l tsdbLabels.Labels) labels.Labels { - return *(*labels.Labels)(unsafe.Pointer(&l)) -} - -func (m *LabelMatcher) String() string { - return fmt.Sprintf("%s%s%q", m.Name, m.Type, m.Value) -} - -func (m *LabelMatcher) matcher() tsdbLabels.Matcher { - switch m.Type { - case MatchEqual: - return tsdbLabels.NewEqualMatcher(m.Name, m.Value) - - case MatchNotEqual: - return tsdbLabels.Not(tsdbLabels.NewEqualMatcher(m.Name, m.Value)) - - case MatchRegexp: - res, err := tsdbLabels.NewRegexpMatcher(m.Name, m.Value) - if err != nil { - panic(err) - } - return res - - case MatchNotRegexp: - res, err := tsdbLabels.NewRegexpMatcher(m.Name, m.Value) - if err != nil { - panic(err) - } - return tsdbLabels.Not(res) - } - panic("promql.LabelMatcher.matcher: invalid matcher type") -} diff --git a/promql/engine.go b/promql/engine.go index cc2739e3ea..0910fa8380 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -22,10 +22,9 @@ import ( "strings" "time" - "github.com/fabxc/tsdb" - tsdbLabels "github.com/fabxc/tsdb/labels" "github.com/prometheus/common/log" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" "golang.org/x/net/context" "github.com/prometheus/prometheus/util/stats" @@ -291,7 +290,7 @@ type Engine struct { // Queryable allows opening a storage querier. type Queryable interface { - Querier(mint, maxt int64) (tsdb.Querier, error) + Querier(mint, maxt int64) (storage.Querier, error) } // NewEngine returns a new engine. @@ -526,7 +525,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( return resMatrix, nil } -func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (tsdb.Querier, error) { +func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Querier, error) { var maxOffset time.Duration Inspect(s.Expr, func(node Node) bool { @@ -553,31 +552,22 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (tsdb.Quer Inspect(s.Expr, func(node Node) bool { switch n := node.(type) { case *VectorSelector: - sel := make(tsdbLabels.Selector, 0, len(n.LabelMatchers)) - for _, m := range n.LabelMatchers { - sel = append(sel, m.matcher()) - } - - n.series, err = expandSeriesSet(querier.Select(sel...)) + n.series, err = expandSeriesSet(querier.Select(n.LabelMatchers...)) if err != nil { return false } for _, s := range n.series { - it := tsdb.NewBuffer(s.Iterator(), durationMilliseconds(StalenessDelta)) + it := storage.NewBuffer(s.Iterator(), durationMilliseconds(StalenessDelta)) n.iterators = append(n.iterators, it) } - case *MatrixSelector: - sel := make(tsdbLabels.Selector, 0, len(n.LabelMatchers)) - for _, m := range n.LabelMatchers { - sel = append(sel, m.matcher()) - } - n.series, err = expandSeriesSet(querier.Select(sel...)) + case *MatrixSelector: + n.series, err = expandSeriesSet(querier.Select(n.LabelMatchers...)) if err != nil { return false } for _, s := range n.series { - it := tsdb.NewBuffer(s.Iterator(), durationMilliseconds(n.Range)) + it := storage.NewBuffer(s.Iterator(), durationMilliseconds(n.Range)) n.iterators = append(n.iterators, it) } } @@ -586,7 +576,7 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (tsdb.Quer return querier, err } -func expandSeriesSet(it tsdb.SeriesSet) (res []tsdb.Series, err error) { +func expandSeriesSet(it storage.SeriesSet) (res []storage.Series, err error) { for it.Next() { res = append(res, it.Series()) } diff --git a/storage/buffer.go b/storage/buffer.go new file mode 100644 index 0000000000..2f3f3c5c5a --- /dev/null +++ b/storage/buffer.go @@ -0,0 +1,190 @@ +package storage + + +// NewBuffer returns a new iterator that buffers the values within the time range +// of the current element and the duration of delta before. +func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator { + return &BufferedSeriesIterator{ + it: it, + buf: newSampleRing(delta, 16), + lastTime: math.MinInt64, + } +} + +// PeekBack returns the previous element of the iterator. If there is none buffered, +// ok is false. +func (b *BufferedSeriesIterator) PeekBack() (t int64, v float64, ok bool) { + return b.buf.last() +} + +// Buffer returns an iterator over the buffered data. +func (b *BufferedSeriesIterator) Buffer() SeriesIterator { + return b.buf.iterator() +} + +// Seek advances the iterator to the element at time t or greater. +func (b *BufferedSeriesIterator) Seek(t int64) bool { + t0 := t - b.buf.delta + + // If the delta would cause us to seek backwards, preserve the buffer + // and just continue regular advancment while filling the buffer on the way. + if t0 > b.lastTime { + b.buf.reset() + + ok := b.it.Seek(t0) + if !ok { + return false + } + b.lastTime, _ = b.Values() + } + + if b.lastTime >= t { + return true + } + for b.Next() { + if b.lastTime >= t { + return true + } + } + + return false +} + +// Next advances the iterator to the next element. +func (b *BufferedSeriesIterator) Next() bool { + // Add current element to buffer before advancing. + b.buf.add(b.it.Values()) + + ok := b.it.Next() + if ok { + b.lastTime, _ = b.Values() + } + return ok +} + +// Values returns the current element of the iterator. +func (b *BufferedSeriesIterator) Values() (int64, float64) { + return b.it.Values() +} + +// Err returns the last encountered error. +func (b *BufferedSeriesIterator) Err() error { + return b.it.Err() +} + +type sample struct { + t int64 + v float64 +} + +type sampleRing struct { + delta int64 + + buf []sample // lookback buffer + i int // position of most recent element in ring buffer + f int // position of first element in ring buffer + l int // number of elements in buffer +} + +func newSampleRing(delta int64, sz int) *sampleRing { + r := &sampleRing{delta: delta, buf: make([]sample, sz)} + r.reset() + + return r +} + +func (r *sampleRing) reset() { + r.l = 0 + r.i = -1 + r.f = 0 +} + +func (r *sampleRing) iterator() SeriesIterator { + return &sampleRingIterator{r: r, i: -1} +} + +type sampleRingIterator struct { + r *sampleRing + i int +} + +func (it *sampleRingIterator) Next() bool { + it.i++ + return it.i < it.r.l +} + +func (it *sampleRingIterator) Seek(int64) bool { + return false +} + +func (it *sampleRingIterator) Err() error { + return nil +} + +func (it *sampleRingIterator) Values() (int64, float64) { + return it.r.at(it.i) +} + +func (r *sampleRing) at(i int) (int64, float64) { + j := (r.f + i) % len(r.buf) + s := r.buf[j] + return s.t, s.v +} + +// add adds a sample to the ring buffer and frees all samples that fall +// out of the delta range. +func (r *sampleRing) add(t int64, v float64) { + l := len(r.buf) + // Grow the ring buffer if it fits no more elements. + if l == r.l { + buf := make([]sample, 2*l) + copy(buf[l+r.f:], r.buf[r.f:]) + copy(buf, r.buf[:r.f]) + + r.buf = buf + r.i = r.f + r.f += l + } else { + r.i++ + if r.i >= l { + r.i -= l + } + } + + r.buf[r.i] = sample{t: t, v: v} + r.l++ + + // Free head of the buffer of samples that just fell out of the range. + for r.buf[r.f].t < t-r.delta { + r.f++ + if r.f >= l { + r.f -= l + } + r.l-- + } +} + +// last returns the most recent element added to the ring. +func (r *sampleRing) last() (int64, float64, bool) { + if r.l == 0 { + return 0, 0, false + } + s := r.buf[r.i] + return s.t, s.v, true +} + +func (r *sampleRing) samples() []sample { + res := make([]sample, r.l) + + var k = r.f + r.l + var j int + if k > len(r.buf) { + k = len(r.buf) + j = r.l - k + r.f + } + + n := copy(res, r.buf[r.f:k]) + copy(res[n:], r.buf[:j]) + + return res +} diff --git a/storage/buffer_test.go b/storage/buffer_test.go new file mode 100644 index 0000000000..e4e82cb10a --- /dev/null +++ b/storage/buffer_test.go @@ -0,0 +1,123 @@ +package storage + +import ( + "math/rand" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSampleRing(t *testing.T) { + cases := []struct { + input []int64 + delta int64 + size int + }{ + { + input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + delta: 2, + size: 1, + }, + { + input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + delta: 2, + size: 2, + }, + { + input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + delta: 7, + size: 3, + }, + { + input: []int64{1, 2, 3, 4, 5, 16, 17, 18, 19, 20}, + delta: 7, + size: 1, + }, + } + for _, c := range cases { + r := newSampleRing(c.delta, c.size) + + input := []sample{} + for _, t := range c.input { + input = append(input, sample{ + t: t, + v: float64(rand.Intn(100)), + }) + } + + for i, s := range input { + r.add(s.t, s.v) + buffered := r.samples() + + for _, sold := range input[:i] { + found := false + for _, bs := range buffered { + if bs.t == sold.t && bs.v == sold.v { + found = true + break + } + } + if sold.t >= s.t-c.delta && !found { + t.Fatalf("%d: expected sample %d to be in buffer but was not; buffer %v", i, sold.t, buffered) + } + if sold.t < s.t-c.delta && found { + t.Fatalf("%d: unexpected sample %d in buffer; buffer %v", i, sold.t, buffered) + } + } + } + } +} + +func TestBufferedSeriesIterator(t *testing.T) { + var it *BufferedSeriesIterator + + bufferEq := func(exp []sample) { + var b []sample + bit := it.Buffer() + for bit.Next() { + t, v := bit.Values() + b = append(b, sample{t: t, v: v}) + } + require.Equal(t, exp, b, "buffer mismatch") + } + sampleEq := func(ets int64, ev float64) { + ts, v := it.Values() + require.Equal(t, ets, ts, "timestamp mismatch") + require.Equal(t, ev, v, "value mismatch") + } + + it = NewBuffer(newListSeriesIterator([]sample{ + {t: 1, v: 2}, + {t: 2, v: 3}, + {t: 3, v: 4}, + {t: 4, v: 5}, + {t: 5, v: 6}, + {t: 99, v: 8}, + {t: 100, v: 9}, + {t: 101, v: 10}, + }), 2) + + require.True(t, it.Seek(-123), "seek failed") + sampleEq(1, 2) + bufferEq(nil) + + require.True(t, it.Next(), "next failed") + sampleEq(2, 3) + bufferEq([]sample{{t: 1, v: 2}}) + + require.True(t, it.Next(), "next failed") + require.True(t, it.Next(), "next failed") + require.True(t, it.Next(), "next failed") + sampleEq(5, 6) + bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}}) + + require.True(t, it.Seek(5), "seek failed") + sampleEq(5, 6) + bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}}) + + require.True(t, it.Seek(101), "seek failed") + sampleEq(101, 10) + bufferEq([]sample{{t: 99, v: 8}, {t: 100, v: 9}}) + + require.False(t, it.Next(), "next succeeded unexpectedly") +} diff --git a/storage/interface.go b/storage/interface.go new file mode 100644 index 0000000000..39b8f67871 --- /dev/null +++ b/storage/interface.go @@ -0,0 +1,93 @@ +// Copyright 2014 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "errors" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/util/testutil" +) + +var ( + ErrOutOfOrderSample = errors.New("out of order sample") + ErrDuplicateSampleForTimestamp = errors.New("duplicate sample for timestamp") +) + +func NewTestStorage(t testutil.T) Storage { + return nil, nil +} + +// Storage ingests and manages samples, along with various indexes. All methods +// are goroutine-safe. Storage implements storage.SampleAppender. +type Storage interface { + // Querier returns a new Querier on the storage. + Querier(mint, maxt int64) (Querier, error) + + // Appender returns a new appender against the storage. + Appender() (Appender, error) + + // Close closes the storage and all its underlying resources. + Close() error +} + +// Querier provides reading access to time series data. +type Querier interface { + // Select returns a set of series that matches the given label matchers. + Select(...*labels.Matcher) (SeriesSet, error) + + // LabelValues returns all potential values for a label name. + LabelValues(name string) ([]string, error) + + // Close releases the resources of the Querier. + Close() error +} + +// Appender provides batched appends against a storage. +type Appender interface { + // Add adds a sample pair for the referenced series. + Add(lset labels.Labels, t int64, v float64) + + // Commit submits the collected samples and purges the batch. + Commit() error +} + +// SeriesSet contains a set of series. +type SeriesSet interface { + Next() bool + Series() Series + Err() error +} + +// Series represents a single time series. +type Series interface { + // Labels returns the complete set of labels identifying the series. + Labels() labels.Labels + + // Iterator returns a new iterator of the data of the series. + Iterator() SeriesIterator +} + +// SeriesIterator iterates over the data of a time series. +type SeriesIterator interface { + // Seek advances the iterator forward to the value at or after + // the given timestamp. + Seek(t int64) bool + // Values returns the current timestamp/value pair. + Values() (t int64, v float64) + // Next advances the iterator by one. + Next() bool + // Err returns the current error. + Err() error +} diff --git a/storage/local/interface.go b/storage/local/interface.go deleted file mode 100644 index 851f0c062f..0000000000 --- a/storage/local/interface.go +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright 2014 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package local - -import ( - "errors" - "time" - - "github.com/prometheus/common/model" - "golang.org/x/net/context" - - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/storage/metric" - "github.com/prometheus/prometheus/util/testutil" -) - -var ( - ErrOutOfOrderSample = errors.New("out of order sample") - ErrDuplicateSampleForTimestamp = errors.New("duplicate sample for timestamp") -) - -func NewTestStorage(t testutil.T, enc byte) (Storage, testutil.Closer) { - return nil, nil -} - -// Storage ingests and manages samples, along with various indexes. All methods -// are goroutine-safe. Storage implements storage.SampleAppender. -type Storage interface { - // Querier returns a new Querier on the storage. - Querier() (Querier, error) - - // This SampleAppender needs multiple samples for the same fingerprint to be - // submitted in chronological order, from oldest to newest. When Append has - // returned, the appended sample might not be queryable immediately. (Use - // WaitForIndexing to wait for complete processing.) The implementation might - // remove labels with empty value from the provided Sample as those labels - // are considered equivalent to a label not present at all. - // - // Appending is throttled if the Storage has too many chunks in memory - // already or has too many chunks waiting for persistence. - storage.SampleAppender - - // Drop all time series associated with the given label matchers. Returns - // the number series that were dropped. - DropMetricsForLabelMatchers(context.Context, ...*metric.LabelMatcher) (int, error) - // Run the various maintenance loops in goroutines. Returns when the - // storage is ready to use. Keeps everything running in the background - // until Stop is called. - Start() error - // Stop shuts down the Storage gracefully, flushes all pending - // operations, stops all maintenance loops,and frees all resources. - Stop() error - // WaitForIndexing returns once all samples in the storage are - // indexed. Indexing is needed for FingerprintsForLabelMatchers and - // LabelValuesForLabelName and may lag behind. - WaitForIndexing() -} - -// Querier allows querying a time series storage. -type Querier interface { - // Close closes the querier. Behavior for subsequent calls to Querier methods - // is undefined. - Close() error - // QueryRange returns a list of series iterators for the selected - // time range and label matchers. The iterators need to be closed - // after usage. - QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) - // QueryInstant returns a list of series iterators for the selected - // instant and label matchers. The iterators need to be closed after usage. - QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) - // MetricsForLabelMatchers returns the metrics from storage that satisfy - // the given sets of label matchers. Each set of matchers must contain at - // least one label matcher that does not match the empty string. Otherwise, - // an empty list is returned. Within one set of matchers, the intersection - // of matching series is computed. The final return value will be the union - // of the per-set results. The times from and through are hints for the - // storage to optimize the search. The storage MAY exclude metrics that - // have no samples in the specified interval from the returned map. In - // doubt, specify model.Earliest for from and model.Latest for through. - MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error) - // LastSampleForLabelMatchers returns the last samples that have been - // ingested for the time series matching the given set of label matchers. - // The label matching behavior is the same as in MetricsForLabelMatchers. - // All returned samples are between the specified cutoff time and now. - LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) - // Get all of the label values that are associated with a given label name. - LabelValuesForLabelName(context.Context, model.LabelName) (model.LabelValues, error) -} - -// SeriesIterator enables efficient access of sample values in a series. Its -// methods are not goroutine-safe. A SeriesIterator iterates over a snapshot of -// a series, i.e. it is safe to continue using a SeriesIterator after or during -// modifying the corresponding series, but the iterator will represent the state -// of the series prior to the modification. -type SeriesIterator interface { - // Gets the value that is closest before the given time. In case a value - // exists at precisely the given time, that value is returned. If no - // applicable value exists, model.ZeroSamplePair is returned. - ValueAtOrBeforeTime(model.Time) model.SamplePair - // Gets all values contained within a given interval. - RangeValues(metric.Interval) []model.SamplePair - // Returns the metric of the series that the iterator corresponds to. - Metric() metric.Metric - // Closes the iterator and releases the underlying data. - Close() -} diff --git a/storage/storage.go b/storage/storage.go deleted file mode 100644 index 5acae673e2..0000000000 --- a/storage/storage.go +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright 2015 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package storage - -import ( - "github.com/prometheus/common/model" -) - -// SampleAppender is the interface to append samples to both, local and remote -// storage. All methods are goroutine-safe. -type SampleAppender interface { - // Append appends a sample to the underlying storage. Depending on the - // storage implementation, there are different guarantees for the fate - // of the sample after Append has returned. Remote storage - // implementation will simply drop samples if they cannot keep up with - // sending samples. Local storage implementations will only drop metrics - // upon unrecoverable errors. - Append(*model.Sample) error - // NeedsThrottling returns true if the underlying storage wishes to not - // receive any more samples. Append will still work but might lead to - // undue resource usage. It is recommended to call NeedsThrottling once - // before an upcoming batch of Append calls (e.g. a full scrape of a - // target or the evaluation of a rule group) and only proceed with the - // batch if NeedsThrottling returns false. In that way, the result of a - // scrape or of an evaluation of a rule group will always be appended - // completely or not at all, and the work of scraping or evaluation will - // not be performed in vain. Also, a call of NeedsThrottling is - // potentially expensive, so limiting the number of calls is reasonable. - // - // Only SampleAppenders for which it is considered critical to receive - // each and every sample should ever return true. SampleAppenders that - // tolerate not receiving all samples should always return false and - // instead drop samples as they see fit to avoid overload. - NeedsThrottling() bool -} - -// Fanout is a SampleAppender that appends every sample to each SampleAppender -// in its list. -type Fanout []SampleAppender - -// Append implements SampleAppender. It appends the provided sample to all -// SampleAppenders in the Fanout slice and waits for each append to complete -// before proceeding with the next. -// If any of the SampleAppenders returns an error, the first one is returned -// at the end. -func (f Fanout) Append(s *model.Sample) error { - var err error - for _, a := range f { - if e := a.Append(s); e != nil && err == nil { - err = e - } - } - return err -} - -// NeedsThrottling returns true if at least one of the SampleAppenders in the -// Fanout slice is throttled. -func (f Fanout) NeedsThrottling() bool { - for _, a := range f { - if a.NeedsThrottling() { - return true - } - } - return false -} diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go new file mode 100644 index 0000000000..43b98a9f9d --- /dev/null +++ b/storage/tsdb/tsdb.go @@ -0,0 +1,119 @@ +package tsdb + +import ( + "unsafe" + + "github.com/fabxc/tsdb" + tsdbLabels "github.com/fabxc/tsdb/labels" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql" +) + +type storage struct { + db *tsdb.DB +} + +// Open returns a new storage backed by a tsdb database. +func Open(path string) (storage.Storage, error) { + db, err := tsdb.Open(path, nil, nil) + if err != nil { + return nil, err + } + return &storage{db: db} +} + +func (db *storage) Querier(mint, maxt int64) (storage.Querier, error) { + q, err := db.db.Querier(mint, maxt) + if err != nil { + return nil, err + } + return querier{q: q}, nil +} + +// Appender returns a new appender against the storage. +func (db *storage) Appender() (Appender, error) { + a, err := db.db.Appender() + if err != nil { + return nil, err + } + return appender{a: a}, nil +} + +// Close closes the storage and all its underlying resources. +func (db *storage) Close() error { + return db.Close() +} + +type querier struct { + q tsdb.Querier +} + +func (q *querier) Select(oms ...*promql.LabelMatcher) (storage.SeriesSet, error) { + ms := make([]tsdbLabels.Matcher, 0, len(oms)) + + for _, om := range oms { + ms = append(ms, convertMatcher(om)) + } + + set := q.q.Select(ms...) + + return seriesSet{set: set} +} + +func (q *querier) LabelValues(name string) ([]string, error) { return q.q.LabelValues(name) } +func (q *querier) Close() error { return q.q.Close() } + +type seriesSet struct { + set tsdb.SeriesSet +} + +func (s *seriesSet) Next() bool { return s.set.Next() } +func (s *seriesSet) Err() error { return s.set.Err() } +func (s *seriesSet) Series() storage.Series { return series{s: s.set.Series()} } + +type series struct { + s tsdb.Series +} + +func (s *series) Labels() labels.Labels { return toLabels(s.s.Labels()) } +func (s *series) Iterator() storage.SeriesIterator { return storage.SeriesIterator(s.s.Iterator()) } + +type appender struct { + a tsdb.Appender +} + +func (a *appender) Add(lset labels.Labels, t int64, v float64) { a.Add(toTSDBLabels(lset), t, v) } +func (a *appender) Commit() error { a.a.Commit() } + +func convertMatcher(m *promql.LabelMatcher) tsdbLabels.Matcher { + switch m.Type { + case MatchEqual: + return tsdbLabels.NewEqualMatcher(m.Name, m.Value) + + case MatchNotEqual: + return tsdbLabels.Not(tsdbLabels.NewEqualMatcher(m.Name, m.Value)) + + case MatchRegexp: + res, err := tsdbLabels.NewRegexpMatcher(m.Name, m.Value) + if err != nil { + panic(err) + } + return res + + case MatchNotRegexp: + res, err := tsdbLabels.NewRegexpMatcher(m.Name, m.Value) + if err != nil { + panic(err) + } + return tsdbLabels.Not(res) + } + panic("promql.LabelMatcher.matcher: invalid matcher type") +} + +func toTSDBLabels(l labels.Labels) tsdbLabels.Labels { + return *(*tsdbLabels.Labels)(unsafe.Pointer(&l)) +} + +func toLabels(l tsdbLabels.Labels) labels.Labels { + return *(*labels.Labels)(unsafe.Pointer(&l)) +}