storage: rename iterator value getters to At()

This commit is contained in:
Fabian Reinartz 2017-01-02 13:33:37 +01:00
parent 89b6b3be9f
commit bc20d93f0a
7 changed files with 16 additions and 15 deletions

View file

@ -431,7 +431,7 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
func expandSeriesSet(it storage.SeriesSet) (res []storage.Series, err error) { func expandSeriesSet(it storage.SeriesSet) (res []storage.Series, err error) {
for it.Next() { for it.Next() {
res = append(res, it.Series()) res = append(res, it.At())
} }
return res, it.Err() return res, it.Err()
} }
@ -676,7 +676,7 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix {
buf := it.Buffer() buf := it.Buffer()
for buf.Next() { for buf.Next() {
t, v := buf.Values() t, v := buf.At()
// Values in the buffer are guaranteed to be smaller than maxt. // Values in the buffer are guaranteed to be smaller than maxt.
if t >= mint { if t >= mint {
ss.Points = append(ss.Points, Point{T: t, V: v}) ss.Points = append(ss.Points, Point{T: t, V: v})

View file

@ -62,7 +62,7 @@ func (b *BufferedSeriesIterator) Seek(t int64) bool {
// Next advances the iterator to the next element. // Next advances the iterator to the next element.
func (b *BufferedSeriesIterator) Next() bool { func (b *BufferedSeriesIterator) Next() bool {
// Add current element to buffer before advancing. // Add current element to buffer before advancing.
b.buf.add(b.it.Values()) b.buf.add(b.it.At())
ok := b.it.Next() ok := b.it.Next()
if ok { if ok {
@ -73,7 +73,7 @@ func (b *BufferedSeriesIterator) Next() bool {
// Values returns the current element of the iterator. // Values returns the current element of the iterator.
func (b *BufferedSeriesIterator) Values() (int64, float64) { func (b *BufferedSeriesIterator) Values() (int64, float64) {
return b.it.Values() return b.it.At()
} }
// Err returns the last encountered error. // Err returns the last encountered error.
@ -130,7 +130,7 @@ func (it *sampleRingIterator) Err() error {
return nil return nil
} }
func (it *sampleRingIterator) Values() (int64, float64) { func (it *sampleRingIterator) At() (int64, float64) {
return it.r.at(it.i) return it.r.at(it.i)
} }

View file

@ -77,7 +77,7 @@ func TestBufferedSeriesIterator(t *testing.T) {
var b []sample var b []sample
bit := it.Buffer() bit := it.Buffer()
for bit.Next() { for bit.Next() {
t, v := bit.Values() t, v := bit.At()
b = append(b, sample{t: t, v: v}) b = append(b, sample{t: t, v: v})
} }
require.Equal(t, exp, b, "buffer mismatch") require.Equal(t, exp, b, "buffer mismatch")
@ -153,7 +153,7 @@ func newListSeriesIterator(list []sample) *listSeriesIterator {
return &listSeriesIterator{list: list, idx: -1} return &listSeriesIterator{list: list, idx: -1}
} }
func (it *listSeriesIterator) Values() (int64, float64) { func (it *listSeriesIterator) At() (int64, float64) {
s := it.list[it.idx] s := it.list[it.idx]
return s.t, s.v return s.t, s.v
} }

View file

@ -61,7 +61,7 @@ type Appender interface {
// SeriesSet contains a set of series. // SeriesSet contains a set of series.
type SeriesSet interface { type SeriesSet interface {
Next() bool Next() bool
Series() Series At() Series
Err() error Err() error
} }
@ -79,8 +79,8 @@ type SeriesIterator interface {
// Seek advances the iterator forward to the value at or after // Seek advances the iterator forward to the value at or after
// the given timestamp. // the given timestamp.
Seek(t int64) bool Seek(t int64) bool
// Values returns the current timestamp/value pair. // At returns the current timestamp/value pair.
Values() (t int64, v float64) At() (t int64, v float64)
// Next advances the iterator by one. // Next advances the iterator by one.
Next() bool Next() bool
// Err returns the current error. // Err returns the current error.

View file

@ -58,9 +58,9 @@ type seriesSet struct {
set tsdb.SeriesSet set tsdb.SeriesSet
} }
func (s seriesSet) Next() bool { return s.set.Next() } func (s seriesSet) Next() bool { return s.set.Next() }
func (s seriesSet) Err() error { return s.set.Err() } func (s seriesSet) Err() error { return s.set.Err() }
func (s seriesSet) Series() storage.Series { return series{s: s.set.Series()} } func (s seriesSet) At() storage.Series { return series{s: s.set.At()} }
type series struct { type series struct {
s tsdb.Series s tsdb.Series
@ -76,6 +76,7 @@ type appender struct {
func (a appender) Add(lset labels.Labels, t int64, v float64) error { func (a appender) Add(lset labels.Labels, t int64, v float64) error {
return a.a.Add(toTSDBLabels(lset), t, v) return a.a.Add(toTSDBLabels(lset), t, v)
} }
func (a appender) Commit() error { return a.a.Commit() } func (a appender) Commit() error { return a.a.Commit() }
func convertMatcher(m *labels.Matcher) tsdbLabels.Matcher { func convertMatcher(m *labels.Matcher) tsdbLabels.Matcher {

View file

@ -309,7 +309,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
for _, mset := range matcherSets { for _, mset := range matcherSets {
series := q.Select(mset...) series := q.Select(mset...)
for series.Next() { for series.Next() {
metrics = append(metrics, series.Series().Labels()) metrics = append(metrics, series.At().Labels())
} }
if series.Err() != nil { if series.Err() != nil {
return nil, &apiError{errorExec, series.Err()} return nil, &apiError{errorExec, series.Err()}

View file

@ -77,7 +77,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
for _, mset := range matcherSets { for _, mset := range matcherSets {
series := q.Select(mset...) series := q.Select(mset...)
for series.Next() { for series.Next() {
s := series.Series() s := series.At()
// TODO(fabxc): allow fast path for most recent sample either // TODO(fabxc): allow fast path for most recent sample either
// in the storage itself or caching layer in Prometheus. // in the storage itself or caching layer in Prometheus.
it := storage.NewBuffer(s.Iterator(), int64(promql.StalenessDelta/1e6)) it := storage.NewBuffer(s.Iterator(), int64(promql.StalenessDelta/1e6))