retrieval/storage: adapt to new interface

This simplifies the interface to two add methods for
appends with labels or faster reference numbers.
This commit is contained in:
Fabian Reinartz 2017-02-01 15:59:37 +01:00
parent 1d3cdd0d67
commit 5772f1a7ba
8 changed files with 99 additions and 66 deletions

View file

@ -20,7 +20,6 @@ import (
_ "net/http/pprof" // Comment this line to disable pprof endpoint. _ "net/http/pprof" // Comment this line to disable pprof endpoint.
"os" "os"
"os/signal" "os/signal"
"runtime/trace"
"syscall" "syscall"
"time" "time"
@ -61,18 +60,6 @@ func init() {
// Main manages the stup and shutdown lifecycle of the entire Prometheus server. // Main manages the stup and shutdown lifecycle of the entire Prometheus server.
func Main() int { func Main() int {
go func() {
f, err := os.Create("trace")
if err != nil {
panic(err)
}
if err := trace.Start(f); err != nil {
panic(err)
}
time.Sleep(30 * time.Second)
trace.Stop()
f.Close()
}()
if err := parse(os.Args[1:]); err != nil { if err := parse(os.Args[1:]); err != nil {
log.Error(err) log.Error(err)
return 2 return 2
@ -91,7 +78,11 @@ func Main() int {
reloadables []Reloadable reloadables []Reloadable
) )
localStorage, err := tsdb.Open(cfg.localStoragePath) localStorage, err := tsdb.Open(cfg.localStoragePath, &tsdb.Options{
MinBlockDuration: 2 * 60 * 60 * 1000,
MaxBlockDuration: 24 * 60 * 60 * 1000,
AppendableBlocks: 2,
})
if err != nil { if err != nil {
log.Errorf("Opening storage failed: %s", err) log.Errorf("Opening storage failed: %s", err)
return 1 return 1

View file

@ -285,11 +285,7 @@ func (cmd *loadCmd) append(a storage.Appender) error {
m := cmd.metrics[h] m := cmd.metrics[h]
for _, s := range smpls { for _, s := range smpls {
ref, err := a.SetSeries(m) if _, err := a.Add(m, s.T, s.V); err != nil {
if err != nil {
return err
}
if err := a.Add(ref, s.T, s.V); err != nil {
return err return err
} }
} }

View file

@ -514,6 +514,7 @@ func (sl *scrapeLoop) append(b []byte, ts time.Time) (total, added int, err erro
defTime = timestamp.FromTime(ts) defTime = timestamp.FromTime(ts)
) )
loop:
for p.Next() { for p.Next() {
total++ total++
@ -526,33 +527,33 @@ func (sl *scrapeLoop) append(b []byte, ts time.Time) (total, added int, err erro
mets := string(met) mets := string(met)
ref, ok := sl.cache[mets] ref, ok := sl.cache[mets]
if ok { if ok {
if err = app.Add(ref, t, v); err == nil { switch err = app.AddFast(ref, t, v); err {
added++ case nil:
continue case storage.ErrNotFound:
} else if err != storage.ErrNotFound {
break
}
ok = false ok = false
case errSeriesDropped:
continue
default:
break loop
}
} }
if !ok { if !ok {
var lset labels.Labels var lset labels.Labels
p.Metric(&lset) p.Metric(&lset)
ref, err = app.SetSeries(lset) ref, err = app.Add(lset, t, v)
// TODO(fabxc): also add a dropped-cache? // TODO(fabxc): also add a dropped-cache?
if err == errSeriesDropped { switch err {
case nil:
case errSeriesDropped:
continue continue
} default:
if err != nil { break loop
break
}
if err = app.Add(ref, t, v); err != nil {
break
}
added++
} }
sl.cache[mets] = ref sl.cache[mets] = ref
} }
added++
}
if err == nil { if err == nil {
err = p.Err() err = p.Err()
} }
@ -601,20 +602,19 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
ref, ok := sl.cache[s] ref, ok := sl.cache[s]
if ok { if ok {
if err := app.Add(ref, t, v); err != storage.ErrNotFound { if err := app.AddFast(ref, t, v); err == nil {
return nil
} else if err != storage.ErrNotFound {
return err return err
} }
} }
met := labels.Labels{ met := labels.Labels{
labels.Label{Name: labels.MetricName, Value: s}, labels.Label{Name: labels.MetricName, Value: s},
} }
ref, err := app.SetSeries(met) ref, err := app.Add(met, t, v)
if err != nil { if err != nil {
return err return err
} }
if err = app.Add(ref, t, v); err != nil {
return err
}
sl.cache[s] = ref sl.cache[s] = ref
return nil return nil

View file

@ -235,12 +235,24 @@ type limitAppender struct {
i int i int
} }
func (app *limitAppender) Add(ref uint64, t int64, v float64) error { func (app *limitAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
if app.i+1 > app.limit {
return 0, errors.New("sample limit exceeded")
}
ref, err := app.Appender.Add(lset, t, v)
if err != nil {
return 0, fmt.Errorf("sample limit of %d exceeded", app.limit)
}
app.i++
return ref, nil
}
func (app *limitAppender) AddFast(ref uint64, t int64, v float64) error {
if app.i+1 > app.limit { if app.i+1 > app.limit {
return errors.New("sample limit exceeded") return errors.New("sample limit exceeded")
} }
if err := app.Appender.Add(ref, t, v); err != nil { if err := app.Appender.AddFast(ref, t, v); err != nil {
return fmt.Errorf("sample limit of %d exceeded", app.limit) return fmt.Errorf("sample limit of %d exceeded", app.limit)
} }
app.i++ app.i++
@ -254,7 +266,7 @@ type ruleLabelsAppender struct {
labels labels.Labels labels labels.Labels
} }
func (app ruleLabelsAppender) SetSeries(lset labels.Labels) (uint64, error) { func (app ruleLabelsAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
lb := labels.NewBuilder(lset) lb := labels.NewBuilder(lset)
for _, l := range app.labels { for _, l := range app.labels {
@ -265,7 +277,7 @@ func (app ruleLabelsAppender) SetSeries(lset labels.Labels) (uint64, error) {
lb.Set(l.Name, l.Value) lb.Set(l.Name, l.Value)
} }
return app.Appender.SetSeries(lb.Labels()) return app.Appender.Add(lb.Labels(), t, v)
} }
type honorLabelsAppender struct { type honorLabelsAppender struct {
@ -276,7 +288,7 @@ type honorLabelsAppender struct {
// Merges the sample's metric with the given labels if the label is not // Merges the sample's metric with the given labels if the label is not
// already present in the metric. // already present in the metric.
// This also considers labels explicitly set to the empty string. // This also considers labels explicitly set to the empty string.
func (app honorLabelsAppender) SetSeries(lset labels.Labels) (uint64, error) { func (app honorLabelsAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
lb := labels.NewBuilder(lset) lb := labels.NewBuilder(lset)
for _, l := range app.labels { for _, l := range app.labels {
@ -284,7 +296,7 @@ func (app honorLabelsAppender) SetSeries(lset labels.Labels) (uint64, error) {
lb.Set(l.Name, l.Value) lb.Set(l.Name, l.Value)
} }
} }
return app.Appender.SetSeries(lb.Labels()) return app.Appender.Add(lb.Labels(), t, v)
} }
// Applies a set of relabel configurations to the sample's metric // Applies a set of relabel configurations to the sample's metric
@ -296,12 +308,12 @@ type relabelAppender struct {
var errSeriesDropped = errors.New("series dropped") var errSeriesDropped = errors.New("series dropped")
func (app relabelAppender) SetSeries(lset labels.Labels) (uint64, error) { func (app relabelAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
lset = relabel.Process(lset, app.relabelings...) lset = relabel.Process(lset, app.relabelings...)
if lset == nil { if lset == nil {
return 0, errSeriesDropped return 0, errSeriesDropped
} }
return app.Appender.SetSeries(lset) return app.Appender.Add(lset, t, v)
} }
// populateLabels builds a label set from the given label set and scrape configuration. // populateLabels builds a label set from the given label set and scrape configuration.

View file

@ -282,21 +282,16 @@ func (g *Group) Eval() {
} }
for _, s := range vector { for _, s := range vector {
ref, err := app.SetSeries(s.Metric) if _, err := app.Add(s.Metric, s.T, s.V); err != nil {
if err != nil {
log.With("sample", s).With("error", err).Warn("Setting metric failed")
continue
}
if err := app.Add(ref, s.T, s.V); err != nil {
switch err { switch err {
case storage.ErrOutOfOrderSample: case storage.ErrOutOfOrderSample:
numOutOfOrder++ numOutOfOrder++
log.With("sample", s).With("error", err).Debug("Rule evaluation result discarded") log.With("sample", s).With("err", err).Debug("Rule evaluation result discarded")
case storage.ErrDuplicateSampleForTimestamp: case storage.ErrDuplicateSampleForTimestamp:
numDuplicates++ numDuplicates++
log.With("sample", s).With("error", err).Debug("Rule evaluation result discarded") log.With("sample", s).With("err", err).Debug("Rule evaluation result discarded")
default: default:
log.With("sample", s).With("error", err).Warn("Rule evaluation result discarded") log.With("sample", s).With("err", err).Warn("Rule evaluation result discarded")
} }
} }
} }

View file

@ -52,10 +52,9 @@ type Querier interface {
// Appender provides batched appends against a storage. // Appender provides batched appends against a storage.
type Appender interface { type Appender interface {
SetSeries(labels.Labels) (uint64, error) Add(l labels.Labels, t int64, v float64) (uint64, error)
// Add adds a sample pair for the referenced series. AddFast(ref uint64, t int64, v float64) error
Add(ref uint64, t int64, v float64) error
// Commit submits the collected samples and purges the batch. // Commit submits the collected samples and purges the batch.
Commit() error Commit() error

View file

@ -1,6 +1,7 @@
package tsdb package tsdb
import ( import (
"time"
"unsafe" "unsafe"
"github.com/fabxc/tsdb" "github.com/fabxc/tsdb"
@ -14,9 +15,34 @@ type adapter struct {
db *tsdb.PartitionedDB db *tsdb.PartitionedDB
} }
// Options of the DB storage.
type Options struct {
// The interval at which the write ahead log is flushed to disc.
WALFlushInterval time.Duration
// The timestamp range of head blocks after which they get persisted.
// It's the minimum duration of any persisted block.
MinBlockDuration uint64
// The maximum timestamp range of compacted blocks.
MaxBlockDuration uint64
// Number of head blocks that can be appended to.
// Should be two or higher to prevent write errors in general scenarios.
//
// After a new block is started for timestamp t0 or higher, appends with
// timestamps as early as t0 - (n-1) * MinBlockDuration are valid.
AppendableBlocks int
}
// Open returns a new storage backed by a tsdb database. // Open returns a new storage backed by a tsdb database.
func Open(path string) (storage.Storage, error) { func Open(path string, opts *Options) (storage.Storage, error) {
db, err := tsdb.OpenPartitioned(path, 1, nil, nil) db, err := tsdb.OpenPartitioned(path, 1, nil, &tsdb.Options{
WALFlushInterval: 10 * time.Second,
MinBlockDuration: opts.MinBlockDuration,
MaxBlockDuration: opts.MaxBlockDuration,
AppendableBlocks: opts.AppendableBlocks,
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -73,12 +99,22 @@ type appender struct {
a tsdb.Appender a tsdb.Appender
} }
func (a appender) SetSeries(lset labels.Labels) (uint64, error) { func (a appender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
return a.a.SetSeries(toTSDBLabels(lset)) ref, err := a.a.Add(toTSDBLabels(lset), t, v)
switch err {
case tsdb.ErrNotFound:
return 0, storage.ErrNotFound
case tsdb.ErrOutOfOrderSample:
return 0, storage.ErrOutOfOrderSample
case tsdb.ErrAmendSample:
return 0, storage.ErrDuplicateSampleForTimestamp
}
return ref, err
} }
func (a appender) Add(ref uint64, t int64, v float64) error { func (a appender) AddFast(ref uint64, t int64, v float64) error {
err := a.a.Add(ref, t, v) err := a.a.AddFast(ref, t, v)
switch err { switch err {
case tsdb.ErrNotFound: case tsdb.ErrNotFound:

View file

@ -19,7 +19,11 @@ func NewStorage(t T) storage.Storage {
log.With("dir", dir).Debugln("opening test storage") log.With("dir", dir).Debugln("opening test storage")
db, err := tsdb.Open(dir) db, err := tsdb.Open(dir, &tsdb.Options{
MinBlockDuration: 2 * 60 * 60 * 1000,
MaxBlockDuration: 24 * 60 * 60 * 1000,
AppendableBlocks: 10,
})
if err != nil { if err != nil {
t.Fatalf("Opening test storage failed: %s", err) t.Fatalf("Opening test storage failed: %s", err)
} }