Addressed Brian's comments, moved metrics to main.go

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
Bartlomiej Plotka 2020-02-14 18:48:55 +00:00
parent 5d84e5d895
commit 59c9d6ef45
7 changed files with 116 additions and 79 deletions

View file

@ -666,15 +666,16 @@ func main() {
return errors.New("flag 'storage.tsdb.wal-segment-size' must be set between 10MB and 256MB")
}
}
db, err := tsdb.Open(
db, err := openDBWithMetrics(
cfg.localStoragePath,
log.With(logger, "component", "tsdb"),
logger,
prometheus.DefaultRegisterer,
&opts,
)
if err != nil {
return errors.Wrapf(err, "opening storage failed")
}
level.Info(logger).Log("fs_type", prom_runtime.Statfs(cfg.localStoragePath))
level.Info(logger).Log("msg", "TSDB started")
level.Debug(logger).Log("msg", "TSDB options",
@ -745,6 +746,40 @@ func main() {
level.Info(logger).Log("msg", "See you next time!")
}
func openDBWithMetrics(dir string, logger log.Logger, reg prometheus.Registerer, opts *tsdb.Options) (*tsdb.DB, error) {
db, err := tsdb.Open(
dir,
log.With(logger, "component", "tsdb"),
reg,
opts,
)
if err != nil {
return nil, err
}
reg.MustRegister(
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_lowest_timestamp_seconds",
Help: "Lowest timestamp value stored in the database.",
}, func() float64 {
bb := db.Blocks()
if len(bb) == 0 {
return float64(db.Head().MinTime() / 1000)
}
return float64(db.Blocks()[0].Meta().MinTime / 1000)
}), prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_min_time_seconds",
Help: "Minimum time bound of the head block.",
}, func() float64 { return float64(db.Head().MinTime() / 1000) }),
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_max_time_seconds",
Help: "Maximum timestamp of the head block.",
}, func() float64 { return float64(db.Head().MaxTime() / 1000) }),
)
return db, nil
}
func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (err error) {
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)

View file

@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io/ioutil"
"math"
"os"
"os/exec"
"path/filepath"
@ -24,6 +25,9 @@ import (
"testing"
"time"
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/rules"
@ -232,3 +236,71 @@ func TestWALSegmentSizeBounds(t *testing.T) {
}
}
}
func TestTimeMetrics(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "time_metrics_e2e")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(tmpDir))
}()
reg := prometheus.NewRegistry()
db, err := openDBWithMetrics(tmpDir, log.NewNopLogger(), reg, nil)
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, db.Close())
}()
// Check initial values.
testutil.Equals(t, map[string]float64{
"prometheus_tsdb_lowest_timestamp_seconds": float64(math.MaxInt64) / 1000,
"prometheus_tsdb_head_min_time_seconds": float64(math.MaxInt64) / 1000,
"prometheus_tsdb_head_max_time_seconds": float64(math.MinInt64) / 1000,
}, getCurrentGaugeValuesFor(t, reg,
"prometheus_tsdb_lowest_timestamp_seconds",
"prometheus_tsdb_head_min_time_seconds",
"prometheus_tsdb_head_max_time_seconds",
))
app := db.Appender()
_, err = app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 1)
testutil.Ok(t, err)
_, err = app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, 1)
testutil.Ok(t, err)
_, err = app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 3000, 1)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.Equals(t, map[string]float64{
"prometheus_tsdb_lowest_timestamp_seconds": 1.0,
"prometheus_tsdb_head_min_time_seconds": 1.0,
"prometheus_tsdb_head_max_time_seconds": 3.0,
}, getCurrentGaugeValuesFor(t, reg,
"prometheus_tsdb_lowest_timestamp_seconds",
"prometheus_tsdb_head_min_time_seconds",
"prometheus_tsdb_head_max_time_seconds",
))
}
func getCurrentGaugeValuesFor(t *testing.T, reg prometheus.Gatherer, metricNames ...string) map[string]float64 {
f, err := reg.Gather()
testutil.Ok(t, err)
res := make(map[string]float64, len(metricNames))
for _, g := range f {
for _, m := range metricNames {
if g.GetName() != m {
continue
}
testutil.Equals(t, 1, len(g.GetMetric()))
if _, ok := res[m]; ok {
t.Error("expected only one metric family for", m)
t.FailNow()
}
res[m] = *g.GetMetric()[0].GetGauge().Value
}
}
return res
}

View file

@ -15,6 +15,7 @@ package storage
import (
"context"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
@ -33,7 +34,7 @@ func TestSelectSorted(t *testing.T) {
priStorage := teststorage.New(t)
defer priStorage.Close()
app1, _ := priStorage.Appender()
app1 := priStorage.Appender()
app1.Add(inputLabel, 0, 0)
inputTotalSize++
app1.Add(inputLabel, 1000, 1)
@ -45,7 +46,7 @@ func TestSelectSorted(t *testing.T) {
remoteStorage1 := teststorage.New(t)
defer remoteStorage1.Close()
app2, _ := remoteStorage1.Appender()
app2 := remoteStorage1.Appender()
app2.Add(inputLabel, 3000, 3)
inputTotalSize++
app2.Add(inputLabel, 4000, 4)
@ -58,7 +59,7 @@ func TestSelectSorted(t *testing.T) {
remoteStorage2 := teststorage.New(t)
defer remoteStorage2.Close()
app3, _ := remoteStorage2.Appender()
app3 := remoteStorage2.Appender()
app3.Add(inputLabel, 6000, 6)
inputTotalSize++
app3.Add(inputLabel, 7000, 7)

View file

@ -86,7 +86,7 @@ type Iterator interface {
// Before the iterator has advanced At behaviour is unspecified.
At() (int64, float64)
// Err returns the current error. It should be used only after iterator is
// exhausted, so `Next` or `Seek` returns false.
// exhausted, that is `Next` or `Seek` returns false.
Err() error
}

View file

@ -66,7 +66,6 @@ func DefaultOptions() *Options {
AllowOverlappingBlocks: false,
WALCompression: false,
StripeSize: DefaultStripeSize,
ConvertTimeToSecondsFn: func(i int64) float64 { return float64(i / 1000) },
}
}
@ -113,9 +112,6 @@ type Options struct {
// Unit agnostic as long as unit is consistent with MinBlockDuration and RetentionDuration.
// Typically it is in milliseconds.
MaxBlockDuration int64
// ConvertTimeToSecondsFn function is used for time based values to convert to seconds for metric purposes.
ConvertTimeToSecondsFn func(int64) float64
}
// DB handles reads and writes of time series falling into
@ -166,12 +162,9 @@ type dbMetrics struct {
tombCleanTimer prometheus.Histogram
blocksBytes prometheus.Gauge
maxBytes prometheus.Gauge
minTime prometheus.GaugeFunc
headMaxTime prometheus.GaugeFunc
headMinTime prometheus.GaugeFunc
}
func newDBMetrics(db *DB, r prometheus.Registerer, convToSecondsFn func(int64) float64) *dbMetrics {
func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
m := &dbMetrics{}
m.loadedBlocks = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
@ -247,25 +240,6 @@ func newDBMetrics(db *DB, r prometheus.Registerer, convToSecondsFn func(int64) f
Help: "The number of times that blocks were deleted because the maximum number of bytes was exceeded.",
})
// Unit agnostic metrics.
m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_lowest_timestamp_seconds",
Help: "Lowest timestamp value stored in the database.",
}, func() float64 {
bb := db.Blocks()
if len(bb) == 0 {
return convToSecondsFn(db.Head().MinTime())
}
return convToSecondsFn(db.Blocks()[0].Meta().MinTime)
})
m.headMinTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_min_time_seconds",
Help: "Minimum time bound of the head block.",
}, func() float64 { return convToSecondsFn(db.Head().MinTime()) })
m.headMaxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_max_time_seconds",
Help: "Maximum timestamp of the head block.",
}, func() float64 { return convToSecondsFn(db.Head().MaxTime()) })
if r != nil {
r.MustRegister(
m.loadedBlocks,
@ -281,9 +255,6 @@ func newDBMetrics(db *DB, r prometheus.Registerer, convToSecondsFn func(int64) f
m.tombCleanTimer,
m.blocksBytes,
m.maxBytes,
m.minTime,
m.headMaxTime,
m.headMinTime,
)
}
return m
@ -574,7 +545,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
autoCompact: true,
chunkPool: chunkenc.NewPool(),
}
db.metrics = newDBMetrics(db, r, opts.ConvertTimeToSecondsFn)
db.metrics = newDBMetrics(db, r)
maxBytes := opts.MaxBytes
if maxBytes < 0 {

View file

@ -35,8 +35,6 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
@ -2701,41 +2699,3 @@ func TestChunkReader_ConcurrentReads(t *testing.T) {
}
testutil.Ok(t, r.Close())
}
func TestTimeMetrics(t *testing.T) {
db, closeFn := openTestDB(t, nil, nil)
defer func() {
testutil.Ok(t, db.Close())
closeFn()
}()
metrics := &dto.Metric{}
// Check initial values.
testutil.Ok(t, db.metrics.minTime.Write(metrics))
testutil.Equals(t, float64(math.MaxInt64)/1000, metrics.Gauge.GetValue())
testutil.Ok(t, db.metrics.headMinTime.Write(metrics))
testutil.Equals(t, float64(math.MaxInt64)/1000, metrics.Gauge.GetValue())
testutil.Ok(t, db.metrics.headMaxTime.Write(metrics))
testutil.Equals(t, float64(math.MinInt64)/1000, metrics.Gauge.GetValue())
app := db.Appender()
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1, 1)
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2, 1)
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 3, 1)
testutil.Ok(t, app.Commit())
// Check after adding some samples.
testutil.Ok(t, db.metrics.minTime.Write(metrics))
testutil.Equals(t, 0.0, metrics.Gauge.GetValue())
testutil.Ok(t, db.metrics.headMinTime.Write(metrics))
testutil.Equals(t, 0.0, metrics.Gauge.GetValue())
testutil.Ok(t, db.metrics.headMaxTime.Write(metrics))
testutil.Equals(t, 0.0, metrics.Gauge.GetValue())
}

View file

@ -234,8 +234,6 @@ func (opts TSDBOptions) ToTSDBOptions() tsdb.Options {
StripeSize: opts.StripeSize,
MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond),
MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond),
ConvertTimeToSecondsFn: func(i int64) float64 { return float64(i / 1000) },
}
}