mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Track appended samples properly in metric
This commit is contained in:
parent
012cf4ef25
commit
f1435f2e2c
25
db.go
25
db.go
|
@ -29,8 +29,8 @@ import (
|
||||||
// millisecond precision timestampdb.
|
// millisecond precision timestampdb.
|
||||||
var DefaultOptions = &Options{
|
var DefaultOptions = &Options{
|
||||||
WALFlushInterval: 5 * time.Second,
|
WALFlushInterval: 5 * time.Second,
|
||||||
MinBlockDuration: 2 * 60 * 60 * 1000, // 2 hours in milliseconds
|
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds
|
||||||
MaxBlockDuration: 48 * 60 * 60 * 1000, // 2 days in milliseconds
|
MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds
|
||||||
AppendableBlocks: 2,
|
AppendableBlocks: 2,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,8 +131,8 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var r prometheus.Registerer
|
// var r prometheus.Registerer
|
||||||
// r := prometheus.DefaultRegisterer
|
r := prometheus.DefaultRegisterer
|
||||||
|
|
||||||
if opts == nil {
|
if opts == nil {
|
||||||
opts = DefaultOptions
|
opts = DefaultOptions
|
||||||
|
@ -354,8 +354,9 @@ func (db *DB) Appender() Appender {
|
||||||
}
|
}
|
||||||
|
|
||||||
type dbAppender struct {
|
type dbAppender struct {
|
||||||
db *DB
|
db *DB
|
||||||
heads []*headAppender
|
heads []*headAppender
|
||||||
|
samples int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||||
|
@ -367,6 +368,7 @@ func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
a.samples++
|
||||||
return ref | (uint64(h.generation) << 40), nil
|
return ref | (uint64(h.generation) << 40), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -379,6 +381,7 @@ func (a *dbAppender) hashedAdd(hash uint64, lset labels.Labels, t int64, v float
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
a.samples++
|
||||||
return ref | (uint64(h.generation) << 40), nil
|
return ref | (uint64(h.generation) << 40), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -396,7 +399,12 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error {
|
||||||
if h.generation != gen {
|
if h.generation != gen {
|
||||||
return ErrNotFound
|
return ErrNotFound
|
||||||
}
|
}
|
||||||
return h.AddFast(ref, t, v)
|
if err := h.AddFast(ref, t, v); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
a.samples++
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// appenderFor gets the appender for the head containing timestamp t.
|
// appenderFor gets the appender for the head containing timestamp t.
|
||||||
|
@ -469,6 +477,9 @@ func (a *dbAppender) Commit() error {
|
||||||
}
|
}
|
||||||
a.db.mtx.RUnlock()
|
a.db.mtx.RUnlock()
|
||||||
|
|
||||||
|
if merr.Err() == nil {
|
||||||
|
a.db.metrics.samplesAppended.Add(float64(a.samples))
|
||||||
|
}
|
||||||
return merr.Err()
|
return merr.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
1
head.go
1
head.go
|
@ -43,6 +43,7 @@ type headBlock struct {
|
||||||
|
|
||||||
activeWriters uint64
|
activeWriters uint64
|
||||||
|
|
||||||
|
symbols map[string]struct{}
|
||||||
// descs holds all chunk descs for the head block. Each chunk implicitly
|
// descs holds all chunk descs for the head block. Each chunk implicitly
|
||||||
// is assigned the index as its ID.
|
// is assigned the index as its ID.
|
||||||
series []*memSeries
|
series []*memSeries
|
||||||
|
|
27
head_test.go
27
head_test.go
|
@ -1,6 +1,8 @@
|
||||||
package tsdb
|
package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
@ -33,3 +35,28 @@ func TestPositionMapper(t *testing.T) {
|
||||||
require.Equal(t, c.res, m.fw)
|
require.Equal(t, c.res, m.fw)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BenchmarkCreateSeries(b *testing.B) {
|
||||||
|
f, err := os.Open("cmd/tsdb/testdata.1m")
|
||||||
|
require.NoError(b, err)
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
lbls, err := readPrometheusLabels(f, 1e6)
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
b.Run("", func(b *testing.B) {
|
||||||
|
dir, err := ioutil.TempDir("", "create_series_bench")
|
||||||
|
require.NoError(b, err)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
|
h, err := createHeadBlock(dir, 0, nil, 0, 1)
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
for _, l := range lbls[:b.N] {
|
||||||
|
h.create(l.Hash(), l)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue