Add own Appender() method for DB

This commit is contained in:
Fabian Reinartz 2017-01-09 20:04:16 +01:00
parent 4c4e0c614e
commit 29883a18fc
2 changed files with 53 additions and 27 deletions

70
db.go
View file

@ -323,6 +323,35 @@ func (db *DB) Close() error {
return merr.Err() return merr.Err()
} }
func (db *DB) Appender() Appender {
return &dbAppender{db: db}
}
type dbAppender struct {
db *DB
buf []hashedSample
}
func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) error {
return a.add(hashedSample{
hash: lset.Hash(),
labels: lset,
t: t,
v: v,
})
}
func (a *dbAppender) add(s hashedSample) error {
a.buf = append(a.buf, s)
return nil
}
func (a *dbAppender) Commit() error {
err := a.db.appendBatch(a.buf)
a.buf = a.buf[:0]
return err
}
func (db *DB) appendBatch(samples []hashedSample) error { func (db *DB) appendBatch(samples []hashedSample) error {
if len(samples) == 0 { if len(samples) == 0 {
return nil return nil
@ -335,9 +364,9 @@ func (db *DB) appendBatch(samples []hashedSample) error {
// TODO(fabxc): distinguish samples between concurrent heads for // TODO(fabxc): distinguish samples between concurrent heads for
// different time blocks. Those may occurr during transition to still // different time blocks. Those may occurr during transition to still
// allow late samples to arrive for a previous block. // allow late samples to arrive for a previous block.
err := head.appendBatch(samples) n, err := head.appendBatch(samples)
if err == nil { if err == nil {
db.metrics.samplesAppended.Add(float64(len(samples))) db.metrics.samplesAppended.Add(float64(n))
} }
if head.fullness() > 1.0 { if head.fullness() > 1.0 {
@ -583,45 +612,42 @@ func (db *PartitionedDB) Close() error {
// Appender returns a new appender against the database. // Appender returns a new appender against the database.
func (db *PartitionedDB) Appender() Appender { func (db *PartitionedDB) Appender() Appender {
return &partitionedAppender{ app := &partitionedAppender{db: db}
db: db,
buckets: make([][]hashedSample, len(db.Partitions)), for _, p := range db.Partitions {
app.buckets = append(app.buckets, p.Appender().(*dbAppender))
} }
return app
} }
type partitionedAppender struct { type partitionedAppender struct {
db *PartitionedDB db *PartitionedDB
buckets [][]hashedSample buckets []*dbAppender
} }
func (ba *partitionedAppender) Add(lset labels.Labels, t int64, v float64) error { func (ba *partitionedAppender) SetSeries(lset labels.Labels) (uint32, error) {
h := lset.Hash()
s := h >> (64 - ba.db.partitionPow)
ba.buckets[s] = append(ba.buckets[s], hashedSample{ return 0, nil
}
func (a *partitionedAppender) Add(lset labels.Labels, t int64, v float64) error {
h := lset.Hash()
s := h >> (64 - a.db.partitionPow)
return a.buckets[s].add(hashedSample{
hash: h, hash: h,
labels: lset, labels: lset,
t: t, t: t,
v: v, v: v,
}) })
return nil
}
func (ba *partitionedAppender) reset() {
for i := range ba.buckets {
ba.buckets[i] = ba.buckets[i][:0]
}
} }
func (ba *partitionedAppender) Commit() error { func (ba *partitionedAppender) Commit() error {
defer ba.reset()
var merr MultiError var merr MultiError
// Spill buckets into partitiondb. // Spill buckets into partitiondb.
for s, b := range ba.buckets { for _, b := range ba.buckets {
merr.Add(ba.db.Partitions[s].appendBatch(b)) merr.Add(b.Commit())
} }
return merr.Err() return merr.Err()
} }

10
head.go
View file

@ -284,7 +284,7 @@ var (
ErrAmendSample = errors.New("amending sample") ErrAmendSample = errors.New("amending sample")
) )
func (h *HeadBlock) appendBatch(samples []hashedSample) error { func (h *HeadBlock) appendBatch(samples []hashedSample) (int, error) {
// Find head chunks for all samples and allocate new IDs/refs for // Find head chunks for all samples and allocate new IDs/refs for
// ones we haven't seen before. // ones we haven't seen before.
var ( var (
@ -303,10 +303,10 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
if cd != nil { if cd != nil {
// Samples must only occur in order. // Samples must only occur in order.
if s.t < cd.lastTimestamp { if s.t < cd.lastTimestamp {
return ErrOutOfOrderSample return 0, ErrOutOfOrderSample
} }
if cd.lastTimestamp == s.t && cd.lastValue != s.v { if cd.lastTimestamp == s.t && cd.lastValue != s.v {
return ErrAmendSample return 0, ErrAmendSample
} }
// TODO(fabxc): sample refs are only scoped within a block for // TODO(fabxc): sample refs are only scoped within a block for
// now and we ignore any previously set value // now and we ignore any previously set value
@ -332,7 +332,7 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
// Write all new series and samples to the WAL and add it to the // Write all new series and samples to the WAL and add it to the
// in-mem database on success. // in-mem database on success.
if err := h.wal.Log(newSeries, samples); err != nil { if err := h.wal.Log(newSeries, samples); err != nil {
return err return 0, err
} }
// After the samples were successfully written to the WAL, there may // After the samples were successfully written to the WAL, there may
@ -392,7 +392,7 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
h.bstats.MaxTime = maxt h.bstats.MaxTime = maxt
} }
return nil return int(total), nil
} }
func (h *HeadBlock) fullness() float64 { func (h *HeadBlock) fullness() float64 {