Add basic shard metrics
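
Each shard now owns three Prometheus metrics, const-labeled with its shard index: a counter of head persistences, a histogram of persistence durations, and a counter of appended samples.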

Fabian Reinartz 2016-12-31 09:48:49 +01:00
parent a009247ab7
commit 7280533c42
4 changed files with 64 additions and 24 deletions

db.go

@@ -17,6 +17,7 @@ import (
     "github.com/fabxc/tsdb/chunks"
     "github.com/fabxc/tsdb/labels"
     "github.com/go-kit/kit/log"
+    "github.com/prometheus/client_golang/prometheus"
 )

 // DefaultOptions used for the DB. They are sane for setups using
@@ -43,7 +44,7 @@ type DB struct {
 // TODO(fabxc): make configurable
 const (
-    shardShift   = 0
+    shardShift   = 2
     numShards    = 1 << shardShift
     maxChunkSize = 1024
 )
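
With shardShift raised from 0 to 2, numShards = 1 << 2 = 4: the DB now fans writes out across four shards instead of one, which is what makes the per-shard labels on the new metrics meaningful.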
@@ -74,7 +75,7 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) {
         l := log.NewContext(l).With("shard", i)
         d := shardDir(path, i)

-        s, err := OpenShard(d, l)
+        s, err := OpenShard(d, i, l)
         if err != nil {
             return nil, fmt.Errorf("initializing shard %q failed: %s", d, err)
         }
@@ -181,14 +182,55 @@ type Shard struct {
     path      string
     persistCh chan struct{}
     logger    log.Logger
+    metrics   *shardMetrics

     mtx       sync.RWMutex
     persisted persistedBlocks
     head      *HeadBlock
 }

+type shardMetrics struct {
+    persistences        prometheus.Counter
+    persistenceDuration prometheus.Histogram
+    samplesAppended     prometheus.Counter
+}
+
+func newShardMetrics(r prometheus.Registerer, i int) *shardMetrics {
+    shardLabel := prometheus.Labels{
+        "shard": fmt.Sprintf("%d", i),
+    }
+
+    m := &shardMetrics{
+        persistences: prometheus.NewCounter(prometheus.CounterOpts{
+            Name:        "tsdb_shard_persistences_total",
+            Help:        "Total number of head persistences that ran so far.",
+            ConstLabels: shardLabel,
+        }),
+        persistenceDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
+            Name:        "tsdb_shard_persistence_duration_seconds",
+            Help:        "Duration of persistences in seconds.",
+            ConstLabels: shardLabel,
+            Buckets:     prometheus.ExponentialBuckets(0.25, 2, 5),
+        }),
+        samplesAppended: prometheus.NewCounter(prometheus.CounterOpts{
+            Name:        "tsdb_shard_samples_appended_total",
+            Help:        "Total number of appended samples for the shard.",
+            ConstLabels: shardLabel,
+        }),
+    }
+
+    if r != nil {
+        r.MustRegister(
+            m.persistences,
+            m.persistenceDuration,
+            m.samplesAppended,
+        )
+    }
+    return m
+}
+
 // OpenShard returns a new Shard.
-func OpenShard(path string, logger log.Logger) (*Shard, error) {
+func OpenShard(path string, i int, logger log.Logger) (*Shard, error) {
     // Create directory if shard is new.
     if _, err := os.Stat(path); os.IsNotExist(err) {
         if err := os.MkdirAll(path, 0777); err != nil {
@@ -219,9 +261,9 @@ func OpenShard(path string, logger log.Logger) (*Shard, error) {
         path:      path,
         persistCh: make(chan struct{}, 1),
         logger:    logger,
+        metrics:   newShardMetrics(prometheus.DefaultRegisterer, i),
         head:      head,
         persisted: pbs,
-        // TODO(fabxc): restore from checkpoint.
     }
     return s, nil
 }
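
For reference (not part of the commit), the bucket helper above expands to the upper bounds 0.25s, 0.5s, 1s, 2s and 4s. A minimal stand-alone snippet, assuming only the client_golang API already used in the diff:

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

func main() {
    // Start 0.25, factor 2, count 5: the upper bounds (in seconds)
    // configured for the persistence-duration histogram above.
    fmt.Println(prometheus.ExponentialBuckets(0.25, 2, 5))
    // Prints: [0.25 0.5 1 2 4]
}

Note also that newShardMetrics tolerates a nil Registerer (the `if r != nil` guard), so tests can construct metrics without touching the global default registry.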
@@ -248,16 +290,20 @@ func (s *Shard) appendBatch(samples []hashedSample) error {
     // TODO(fabxc): distinguish samples between concurrent heads for
     // different time blocks. Those may occur during transition to still
     // allow late samples to arrive for a previous block.
-    err := s.head.appendBatch(samples)
+    err := s.head.appendBatch(samples, s.metrics.samplesAppended)

     // TODO(fabxc): randomize over time and use better scoring function.
-    if s.head.stats.SampleCount/(uint64(s.head.stats.ChunkCount)+1) > 24000 {
+    if s.head.stats.SampleCount/(uint64(s.head.stats.ChunkCount)+1) > 400 {
         select {
         case s.persistCh <- struct{}{}:
             go func() {
+                start := time.Now()
+                defer func() { s.metrics.persistenceDuration.Observe(time.Since(start).Seconds()) }()
+
                 if err := s.persist(); err != nil {
                     s.logger.Log("msg", "persistence error", "err", err)
                 }
+                s.metrics.persistences.Inc()
             }()
         default:
         }
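
The trigger above is deliberately non-blocking: persistCh has capacity 1, so at most one persistence can be in flight per shard and any further trigger falls through to the empty default case. (The threshold drop from 24000 to 400 samples per chunk also makes persistences fire far more often, presumably to exercise the new metrics.) A hypothetical stand-alone sketch of the channel pattern; releasing the slot inside the goroutine is an assumption of the sketch, since the diff does not show where the commit drains the channel:

package main

import (
    "fmt"
    "time"
)

func main() {
    persistCh := make(chan struct{}, 1) // capacity 1, like Shard.persistCh

    trigger := func(id int) {
        select {
        case persistCh <- struct{}{}:
            go func() {
                defer func() { <-persistCh }()    // release the slot when done (assumed)
                time.Sleep(10 * time.Millisecond) // stand-in for s.persist()
                fmt.Println("persistence", id, "finished")
            }()
        default:
            fmt.Println("trigger", id, "dropped: persistence already in flight")
        }
    }

    trigger(1)
    trigger(2) // hits default: the slot is still taken
    time.Sleep(50 * time.Millisecond)
}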

head.go

@@ -8,6 +8,7 @@ import (
     "github.com/fabxc/tsdb/chunks"
     "github.com/fabxc/tsdb/labels"
+    "github.com/prometheus/client_golang/prometheus"
 )

 // HeadBlock handles reads and writes of time series data within a time window.
@@ -182,7 +183,7 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc {
     return cd
 }

-func (h *HeadBlock) appendBatch(samples []hashedSample) error {
+func (h *HeadBlock) appendBatch(samples []hashedSample, appended prometheus.Counter) error {
     // Find head chunks for all samples and allocate new IDs/refs for
     // ones we haven't seen before.
     var (
@@ -233,6 +234,7 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
             continue
         }

+        appended.Inc()
         h.stats.SampleCount++

         if s.t > h.stats.MaxTime {
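
appended.Inc() sits after the continue guards and immediately before h.stats.SampleCount++, so the counter tracks only samples that were actually appended, not ones skipped earlier in the loop. Passing the counter in as a parameter keeps HeadBlock itself free of shard-level metrics state.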

wal.go

@@ -188,14 +188,7 @@ func (e *walEncoder) encodeSeries(series []labels.Labels) error {
         e.buf = append(e.buf, b[:n]...)

         for _, l := range lset {
-            // func() {
-            //     defer func() {
-            //         if recover() != nil {
-            //             fmt.Println(l)
-            //         }
-            //     }()
             n = binary.PutUvarint(b, uint64(len(l.Name)))
-            // }()
             e.buf = append(e.buf, b[:n]...)
             e.buf = append(e.buf, l.Name...)

wal_test.go

@@ -1,7 +1,6 @@
 package tsdb

 import (
-    "fmt"
     "io"
     "io/ioutil"
     "os"
@@ -28,7 +27,7 @@ func BenchmarkWALWrite(b *testing.B) {
     f, err := os.Open("cmd/tsdb/testdata.1m")
     require.NoError(b, err)

-    series, err := readPrometheusLabels(f, b.N)
+    series, err := readPrometheusLabels(f, b.N/300)
     require.NoError(b, err)

     var (
@@ -70,7 +69,7 @@ func BenchmarkWALRead(b *testing.B) {
     require.NoError(b, err)

     b.Run("test", func(b *testing.B) {
-        bseries := series[:b.N]
+        bseries := series[:b.N/300]

         d, err := ioutil.TempDir("", "wal_read_test")
         require.NoError(b, err)
@@ -123,8 +122,8 @@ func BenchmarkWALRead(b *testing.B) {
         })
         require.NoError(b, err)

-        stat, _ := wal.f.Stat()
-        fmt.Println("read series", numSeries, "read samples", numSamples, "wal size", fmt.Sprintf("%.2fMiB", float64(stat.Size())/1024/1024))
+        // stat, _ := wal.f.Stat()
+        // fmt.Println("read series", numSeries, "read samples", numSamples, "wal size", fmt.Sprintf("%.2fMiB", float64(stat.Size())/1024/1024))
     })
 }
@@ -136,7 +135,7 @@ func BenchmarkWALReadIntoHead(b *testing.B) {
     require.NoError(b, err)

     b.Run("test", func(b *testing.B) {
-        bseries := series[:b.N]
+        bseries := series[:b.N/300]

         d, err := ioutil.TempDir("", "wal_read_test")
         require.NoError(b, err)
@@ -178,12 +177,12 @@ func BenchmarkWALReadIntoHead(b *testing.B) {
         b.ResetTimer()

-        head, err := OpenHeadBlock(d, 0)
+        _, err = OpenHeadBlock(d, 0)
         require.NoError(b, err)

-        stat, _ := head.wal.f.Stat()
-        fmt.Println("head block initialized from WAL")
-        fmt.Println("read series", head.stats.SeriesCount, "read samples", head.stats.SampleCount, "wal size", fmt.Sprintf("%.2fMiB", float64(stat.Size())/1024/1024))
+        // stat, _ := head.wal.f.Stat()
+        // fmt.Println("head block initialized from WAL")
+        // fmt.Println("read series", head.stats.SeriesCount, "read samples", head.stats.SampleCount, "wal size", fmt.Sprintf("%.2fMiB", float64(stat.Size())/1024/1024))
     })
 }
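
The benchmark slices change from series[:b.N] to series[:b.N/300], presumably so total work keeps scaling with b.N while each series contributes many samples. The fmt.Println debug output is commented out rather than deleted, which is also why the now-unused "fmt" import is dropped at the top of the file.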