mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 13:57:36 -08:00
Addressed comments.
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
parent
c0a9ab0829
commit
cfba92a133
|
@ -106,7 +106,7 @@ func main() {
|
||||||
outageTolerance model.Duration
|
outageTolerance model.Duration
|
||||||
resendDelay model.Duration
|
resendDelay model.Duration
|
||||||
web web.Options
|
web web.Options
|
||||||
tsdb tsdb.Options
|
tsdb web.TSDBOptions
|
||||||
lookbackDelta model.Duration
|
lookbackDelta model.Duration
|
||||||
webTimeout model.Duration
|
webTimeout model.Duration
|
||||||
queryTimeout model.Duration
|
queryTimeout model.Duration
|
||||||
|
@ -656,6 +656,7 @@ func main() {
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
// TSDB.
|
// TSDB.
|
||||||
|
opts := cfg.tsdb.ToTSDBOptions()
|
||||||
cancel := make(chan struct{})
|
cancel := make(chan struct{})
|
||||||
g.Add(
|
g.Add(
|
||||||
func() error {
|
func() error {
|
||||||
|
@ -669,7 +670,7 @@ func main() {
|
||||||
cfg.localStoragePath,
|
cfg.localStoragePath,
|
||||||
log.With(logger, "component", "tsdb"),
|
log.With(logger, "component", "tsdb"),
|
||||||
prometheus.DefaultRegisterer,
|
prometheus.DefaultRegisterer,
|
||||||
&cfg.tsdb,
|
&opts,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "opening storage failed")
|
return errors.Wrapf(err, "opening storage failed")
|
||||||
|
|
|
@ -33,7 +33,7 @@ type BufferedSeriesIterator struct {
|
||||||
// of the current element and the duration of delta before, initialized with an
|
// of the current element and the duration of delta before, initialized with an
|
||||||
// empty iterator. Use Reset() to set an actual iterator to be buffered.
|
// empty iterator. Use Reset() to set an actual iterator to be buffered.
|
||||||
func NewBuffer(delta int64) *BufferedSeriesIterator {
|
func NewBuffer(delta int64) *BufferedSeriesIterator {
|
||||||
return NewBufferIterator(&NoopSeriesIt, delta)
|
return NewBufferIterator(chunkenc.NewNopIterator(), delta)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBufferIterator returns a new iterator that buffers the values within the
|
// NewBufferIterator returns a new iterator that buffers the values within the
|
||||||
|
|
|
@ -33,7 +33,7 @@ var (
|
||||||
|
|
||||||
// Appendable allows creating appenders.
|
// Appendable allows creating appenders.
|
||||||
type Appendable interface {
|
type Appendable interface {
|
||||||
// Appender returns a new appender against the storage.
|
// Appender returns a new appender for the storage.
|
||||||
Appender() Appender
|
Appender() Appender
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,6 +120,7 @@ type Appender interface {
|
||||||
Commit() error
|
Commit() error
|
||||||
|
|
||||||
// Rollback rolls back all modifications made in the appender so far.
|
// Rollback rolls back all modifications made in the appender so far.
|
||||||
|
// Appender has to be discarded after rollback.
|
||||||
Rollback() error
|
Rollback() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,8 +14,6 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"math"
|
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -53,35 +51,8 @@ func NoopSeriesSet() SeriesSet {
|
||||||
return noopSeriesSet{}
|
return noopSeriesSet{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (noopSeriesSet) Next() bool {
|
func (noopSeriesSet) Next() bool { return false }
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (noopSeriesSet) At() Series {
|
func (noopSeriesSet) At() Series { return nil }
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (noopSeriesSet) Err() error {
|
func (noopSeriesSet) Err() error { return nil }
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type noopSeriesIterator struct{}
|
|
||||||
|
|
||||||
// NoopSeriesIt is a SeriesIterator that does nothing.
|
|
||||||
var NoopSeriesIt = noopSeriesIterator{}
|
|
||||||
|
|
||||||
func (noopSeriesIterator) At() (int64, float64) {
|
|
||||||
return math.MinInt64, 0
|
|
||||||
}
|
|
||||||
|
|
||||||
func (noopSeriesIterator) Seek(t int64) bool {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (noopSeriesIterator) Next() bool {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (noopSeriesIterator) Err() error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -87,8 +87,8 @@ type Iterator interface {
|
||||||
// At returns (math.MinInt64, 0.0) before the iterator has advanced.
|
// At returns (math.MinInt64, 0.0) before the iterator has advanced.
|
||||||
// TODO(bwplotka): Verify above statement on all implementations with unit test.
|
// TODO(bwplotka): Verify above statement on all implementations with unit test.
|
||||||
At() (int64, float64)
|
At() (int64, float64)
|
||||||
// Err returns the current error.
|
// Err returns the current error. It should be used only after iterator is
|
||||||
// Err can return undefined value before calling `Next` or `Seek`.
|
// exhausted, so `Next` or `Seek` returns false.
|
||||||
Err() error
|
Err() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -88,7 +88,7 @@ func testChunk(t *testing.T, c Chunk) {
|
||||||
testutil.Ok(t, it2.Err())
|
testutil.Ok(t, it2.Err())
|
||||||
testutil.Equals(t, exp, res2)
|
testutil.Equals(t, exp, res2)
|
||||||
|
|
||||||
// 3. Test Iterator Seek.
|
// 3. Test iterator Seek.
|
||||||
mid := len(exp) / 2
|
mid := len(exp) / 2
|
||||||
|
|
||||||
it3 := c.Iterator(nil)
|
it3 := c.Iterator(nil)
|
||||||
|
|
|
@ -33,7 +33,6 @@ import (
|
||||||
|
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/common/model"
|
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/tsdb"
|
"github.com/prometheus/prometheus/tsdb"
|
||||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||||
|
@ -178,8 +177,8 @@ func (b *writeBenchmark) run() error {
|
||||||
l := log.With(b.logger, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
|
l := log.With(b.logger, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
|
||||||
|
|
||||||
st, err := tsdb.Open(dir, l, nil, &tsdb.Options{
|
st, err := tsdb.Open(dir, l, nil, &tsdb.Options{
|
||||||
RetentionDuration: 15 * 24 * model.Duration(time.Hour),
|
RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond),
|
||||||
MinBlockDuration: 2 * model.Duration(time.Hour),
|
MinBlockDuration: int64(2 * time.Hour / time.Millisecond),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
64
tsdb/db.go
64
tsdb/db.go
|
@ -29,13 +29,11 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/alecthomas/units"
|
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/go-kit/kit/log/level"
|
"github.com/go-kit/kit/log/level"
|
||||||
"github.com/oklog/ulid"
|
"github.com/oklog/ulid"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/common/model"
|
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||||
|
@ -46,9 +44,9 @@ import (
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Default duration of a block in milliseconds - 2h.
|
|
||||||
const (
|
const (
|
||||||
DefaultBlockDuration = int64(2 * 60 * 60 * 1000) // 2h in miliseconds.
|
// Default duration of a block in milliseconds.
|
||||||
|
DefaultBlockDuration = int64(2 * time.Hour / time.Millisecond)
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -61,13 +59,14 @@ var (
|
||||||
func DefaultOptions() *Options {
|
func DefaultOptions() *Options {
|
||||||
return &Options{
|
return &Options{
|
||||||
WALSegmentSize: wal.DefaultSegmentSize,
|
WALSegmentSize: wal.DefaultSegmentSize,
|
||||||
RetentionDuration: 15 * 24 * model.Duration(time.Hour),
|
RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond),
|
||||||
MinBlockDuration: model.Duration(time.Duration(DefaultBlockDuration) * time.Millisecond),
|
MinBlockDuration: DefaultBlockDuration,
|
||||||
MaxBlockDuration: model.Duration(time.Duration(DefaultBlockDuration) * time.Millisecond),
|
MaxBlockDuration: DefaultBlockDuration,
|
||||||
NoLockfile: false,
|
NoLockfile: false,
|
||||||
AllowOverlappingBlocks: false,
|
AllowOverlappingBlocks: false,
|
||||||
WALCompression: false,
|
WALCompression: false,
|
||||||
StripeSize: DefaultStripeSize,
|
StripeSize: DefaultStripeSize,
|
||||||
|
ConvertTimeToSecondsFn: func(i int64) float64 { return float64(i / 1000) },
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,17 +76,19 @@ type Options struct {
|
||||||
// WALSegmentSize = 0, segment size is default size.
|
// WALSegmentSize = 0, segment size is default size.
|
||||||
// WALSegmentSize > 0, segment size is WALSegmentSize.
|
// WALSegmentSize > 0, segment size is WALSegmentSize.
|
||||||
// WALSegmentSize < 0, wal is disabled.
|
// WALSegmentSize < 0, wal is disabled.
|
||||||
WALSegmentSize units.Base2Bytes
|
WALSegmentSize int
|
||||||
|
|
||||||
// Duration of persisted data to keep.
|
// Duration of persisted data to keep.
|
||||||
RetentionDuration model.Duration
|
// Unit agnostic as long as unit is consistent with MinBlockDuration and MaxBlockDuration.
|
||||||
|
// Typically it is in milliseconds.
|
||||||
|
RetentionDuration int64
|
||||||
|
|
||||||
// Maximum number of bytes in blocks to be retained.
|
// Maximum number of bytes in blocks to be retained.
|
||||||
// 0 or less means disabled.
|
// 0 or less means disabled.
|
||||||
// NOTE: For proper storage calculations need to consider
|
// NOTE: For proper storage calculations need to consider
|
||||||
// the size of the WAL folder which is not added when calculating
|
// the size of the WAL folder which is not added when calculating
|
||||||
// the current size of the database.
|
// the current size of the database.
|
||||||
MaxBytes units.Base2Bytes
|
MaxBytes int64
|
||||||
|
|
||||||
// NoLockfile disables creation and consideration of a lock file.
|
// NoLockfile disables creation and consideration of a lock file.
|
||||||
NoLockfile bool
|
NoLockfile bool
|
||||||
|
@ -104,10 +105,17 @@ type Options struct {
|
||||||
|
|
||||||
// The timestamp range of head blocks after which they get persisted.
|
// The timestamp range of head blocks after which they get persisted.
|
||||||
// It's the minimum duration of any persisted block.
|
// It's the minimum duration of any persisted block.
|
||||||
MinBlockDuration model.Duration
|
// Unit agnostic as long as unit is consistent with RetentionDuration and MaxBlockDuration.
|
||||||
|
// Typically it is in milliseconds.
|
||||||
|
MinBlockDuration int64
|
||||||
|
|
||||||
// The maximum timestamp range of compacted blocks.
|
// The maximum timestamp range of compacted blocks.
|
||||||
MaxBlockDuration model.Duration
|
// Unit agnostic as long as unit is consistent with MinBlockDuration and RetentionDuration.
|
||||||
|
// Typically it is in milliseconds.
|
||||||
|
MaxBlockDuration int64
|
||||||
|
|
||||||
|
// ConvertTimeToSecondsFn function is used for time based values to convert to seconds for metric purposes.
|
||||||
|
ConvertTimeToSecondsFn func(int64) float64
|
||||||
}
|
}
|
||||||
|
|
||||||
// DB handles reads and writes of time series falling into
|
// DB handles reads and writes of time series falling into
|
||||||
|
@ -163,7 +171,7 @@ type dbMetrics struct {
|
||||||
headMinTime prometheus.GaugeFunc
|
headMinTime prometheus.GaugeFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
|
func newDBMetrics(db *DB, r prometheus.Registerer, convToSecondsFn func(int64) float64) *dbMetrics {
|
||||||
m := &dbMetrics{}
|
m := &dbMetrics{}
|
||||||
|
|
||||||
m.loadedBlocks = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
m.loadedBlocks = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||||
|
@ -238,28 +246,26 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
|
||||||
Name: "prometheus_tsdb_size_retentions_total",
|
Name: "prometheus_tsdb_size_retentions_total",
|
||||||
Help: "The number of times that blocks were deleted because the maximum number of bytes was exceeded.",
|
Help: "The number of times that blocks were deleted because the maximum number of bytes was exceeded.",
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Unit agnostic metrics.
|
||||||
m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||||
Name: "prometheus_tsdb_lowest_timestamp_seconds",
|
Name: "prometheus_tsdb_lowest_timestamp_seconds",
|
||||||
Help: "Lowest timestamp value stored in the database.",
|
Help: "Lowest timestamp value stored in the database.",
|
||||||
}, func() float64 {
|
}, func() float64 {
|
||||||
bb := db.Blocks()
|
bb := db.Blocks()
|
||||||
if len(bb) == 0 {
|
if len(bb) == 0 {
|
||||||
return float64(db.Head().MinTime()) / 1000
|
return convToSecondsFn(db.Head().MinTime())
|
||||||
}
|
}
|
||||||
return float64(db.Blocks()[0].Meta().MinTime) / 1000
|
return convToSecondsFn(db.Blocks()[0].Meta().MinTime)
|
||||||
})
|
})
|
||||||
m.headMinTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
m.headMinTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||||
Name: "prometheus_tsdb_head_min_time_seconds",
|
Name: "prometheus_tsdb_head_min_time_seconds",
|
||||||
Help: "Minimum time bound of the head block.",
|
Help: "Minimum time bound of the head block.",
|
||||||
}, func() float64 {
|
}, func() float64 { return convToSecondsFn(db.Head().MinTime()) })
|
||||||
return float64(db.Head().MinTime()) / 1000
|
|
||||||
})
|
|
||||||
m.headMaxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
m.headMaxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||||
Name: "prometheus_tsdb_head_max_time_seconds",
|
Name: "prometheus_tsdb_head_max_time_seconds",
|
||||||
Help: "Maximum timestamp of the head block.",
|
Help: "Maximum timestamp of the head block.",
|
||||||
}, func() float64 {
|
}, func() float64 { return convToSecondsFn(db.Head().MaxTime()) })
|
||||||
return float64(db.Head().MaxTime()) / 1000
|
|
||||||
})
|
|
||||||
if r != nil {
|
if r != nil {
|
||||||
r.MustRegister(
|
r.MustRegister(
|
||||||
m.loadedBlocks,
|
m.loadedBlocks,
|
||||||
|
@ -350,7 +356,7 @@ func (db *DBReadOnly) FlushWAL(dir string) error {
|
||||||
context.Background(),
|
context.Background(),
|
||||||
nil,
|
nil,
|
||||||
db.logger,
|
db.logger,
|
||||||
ExponentialBlockRanges(int64(time.Duration(DefaultOptions().MinBlockDuration))/1e6, 3, 5),
|
ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5),
|
||||||
chunkenc.NewPool(),
|
chunkenc.NewPool(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -520,7 +526,7 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if opts.MinBlockDuration <= 0 {
|
if opts.MinBlockDuration <= 0 {
|
||||||
opts.MinBlockDuration = model.Duration(time.Duration(DefaultBlockDuration) * time.Millisecond)
|
opts.MinBlockDuration = DefaultBlockDuration
|
||||||
}
|
}
|
||||||
if opts.MinBlockDuration > opts.MaxBlockDuration {
|
if opts.MinBlockDuration > opts.MaxBlockDuration {
|
||||||
opts.MaxBlockDuration = opts.MinBlockDuration
|
opts.MaxBlockDuration = opts.MinBlockDuration
|
||||||
|
@ -529,7 +535,7 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
|
||||||
if len(rngs) == 0 {
|
if len(rngs) == 0 {
|
||||||
// Start with smallest block duration and create exponential buckets until the exceed the
|
// Start with smallest block duration and create exponential buckets until the exceed the
|
||||||
// configured maximum block duration.
|
// configured maximum block duration.
|
||||||
rngs = ExponentialBlockRanges(int64(time.Duration(opts.MinBlockDuration).Seconds()*1000), 10, 3)
|
rngs = ExponentialBlockRanges(opts.MinBlockDuration, 10, 3)
|
||||||
}
|
}
|
||||||
return opts, rngs
|
return opts, rngs
|
||||||
}
|
}
|
||||||
|
@ -543,7 +549,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, v := range rngs {
|
for i, v := range rngs {
|
||||||
if v > int64(time.Duration(opts.MaxBlockDuration).Seconds()*1000) {
|
if v > opts.MaxBlockDuration {
|
||||||
rngs = rngs[:i]
|
rngs = rngs[:i]
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -568,7 +574,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
||||||
autoCompact: true,
|
autoCompact: true,
|
||||||
chunkPool: chunkenc.NewPool(),
|
chunkPool: chunkenc.NewPool(),
|
||||||
}
|
}
|
||||||
db.metrics = newDBMetrics(db, r)
|
db.metrics = newDBMetrics(db, r, opts.ConvertTimeToSecondsFn)
|
||||||
|
|
||||||
maxBytes := opts.MaxBytes
|
maxBytes := opts.MaxBytes
|
||||||
if maxBytes < 0 {
|
if maxBytes < 0 {
|
||||||
|
@ -602,7 +608,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
||||||
if opts.WALSegmentSize >= 0 {
|
if opts.WALSegmentSize >= 0 {
|
||||||
// Wal is set to a custom size.
|
// Wal is set to a custom size.
|
||||||
if opts.WALSegmentSize > 0 {
|
if opts.WALSegmentSize > 0 {
|
||||||
segmentSize = int(opts.WALSegmentSize)
|
segmentSize = opts.WALSegmentSize
|
||||||
}
|
}
|
||||||
wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression)
|
wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1013,7 +1019,7 @@ func (db *DB) beyondTimeRetention(blocks []*Block) (deletable map[ulid.ULID]*Blo
|
||||||
for i, block := range blocks {
|
for i, block := range blocks {
|
||||||
// The difference between the first block and this block is larger than
|
// The difference between the first block and this block is larger than
|
||||||
// the retention period so any blocks after that are added as deletable.
|
// the retention period so any blocks after that are added as deletable.
|
||||||
if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > (int64(db.opts.RetentionDuration)/int64(time.Millisecond)) {
|
if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > db.opts.RetentionDuration {
|
||||||
for _, b := range blocks[i:] {
|
for _, b := range blocks[i:] {
|
||||||
deletable[b.meta.ULID] = b
|
deletable[b.meta.ULID] = b
|
||||||
}
|
}
|
||||||
|
@ -1525,7 +1531,7 @@ func (s *ReadyStorage) StartTime() (int64, error) {
|
||||||
return startTime + s.startTimeMargin, nil
|
return startTime + s.startTimeMargin, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return int64(model.Latest), ErrNotReady
|
return math.MaxInt64, ErrNotReady
|
||||||
}
|
}
|
||||||
|
|
||||||
// Querier implements the Storage interface.
|
// Querier implements the Storage interface.
|
||||||
|
|
|
@ -30,7 +30,6 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/alecthomas/units"
|
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/oklog/ulid"
|
"github.com/oklog/ulid"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
@ -864,7 +863,7 @@ func TestWALSegmentSizeOptions(t *testing.T) {
|
||||||
for segmentSize, testFunc := range tests {
|
for segmentSize, testFunc := range tests {
|
||||||
t.Run(fmt.Sprintf("WALSegmentSize %d test", segmentSize), func(t *testing.T) {
|
t.Run(fmt.Sprintf("WALSegmentSize %d test", segmentSize), func(t *testing.T) {
|
||||||
opts := DefaultOptions()
|
opts := DefaultOptions()
|
||||||
opts.WALSegmentSize = units.Base2Bytes(segmentSize)
|
opts.WALSegmentSize = segmentSize
|
||||||
db, closeFn := openTestDB(t, opts, nil)
|
db, closeFn := openTestDB(t, opts, nil)
|
||||||
defer closeFn()
|
defer closeFn()
|
||||||
|
|
||||||
|
@ -1083,7 +1082,7 @@ func TestTimeRetention(t *testing.T) {
|
||||||
testutil.Ok(t, db.reload()) // Reload the db to register the new blocks.
|
testutil.Ok(t, db.reload()) // Reload the db to register the new blocks.
|
||||||
testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered.
|
testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered.
|
||||||
|
|
||||||
db.opts.RetentionDuration = model.Duration(time.Duration(blocks[2].MaxTime-blocks[1].MinTime) * time.Millisecond)
|
db.opts.RetentionDuration = blocks[2].MaxTime - blocks[1].MinTime
|
||||||
testutil.Ok(t, db.reload())
|
testutil.Ok(t, db.reload())
|
||||||
|
|
||||||
expBlocks := blocks[1:]
|
expBlocks := blocks[1:]
|
||||||
|
@ -1163,8 +1162,8 @@ func TestSizeRetention(t *testing.T) {
|
||||||
// Check total size, total count and check that the oldest block was deleted.
|
// Check total size, total count and check that the oldest block was deleted.
|
||||||
firstBlockSize := db.Blocks()[0].Size()
|
firstBlockSize := db.Blocks()[0].Size()
|
||||||
sizeLimit := actSize - firstBlockSize
|
sizeLimit := actSize - firstBlockSize
|
||||||
db.opts.MaxBytes = units.Base2Bytes(sizeLimit) // Set the new db size limit one block smaller that the actual size.
|
db.opts.MaxBytes = sizeLimit // Set the new db size limit one block smaller that the actual size.
|
||||||
testutil.Ok(t, db.reload()) // Reload the db to register the new db size.
|
testutil.Ok(t, db.reload()) // Reload the db to register the new db size.
|
||||||
|
|
||||||
expBlocks := blocks[1:]
|
expBlocks := blocks[1:]
|
||||||
actBlocks := db.Blocks()
|
actBlocks := db.Blocks()
|
||||||
|
@ -1197,7 +1196,7 @@ func TestSizeRetentionMetric(t *testing.T) {
|
||||||
|
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
db, closeFn := openTestDB(t, &Options{
|
db, closeFn := openTestDB(t, &Options{
|
||||||
MaxBytes: units.Base2Bytes(c.maxBytes),
|
MaxBytes: c.maxBytes,
|
||||||
}, []int64{100})
|
}, []int64{100})
|
||||||
defer func() {
|
defer func() {
|
||||||
testutil.Ok(t, db.Close())
|
testutil.Ok(t, db.Close())
|
||||||
|
|
|
@ -18,7 +18,6 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/common/model"
|
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
"github.com/prometheus/prometheus/tsdb"
|
"github.com/prometheus/prometheus/tsdb"
|
||||||
"github.com/prometheus/prometheus/util/testutil"
|
"github.com/prometheus/prometheus/util/testutil"
|
||||||
|
@ -35,8 +34,8 @@ func New(t testutil.T) storage.Storage {
|
||||||
// Tests just load data for a series sequentially. Thus we
|
// Tests just load data for a series sequentially. Thus we
|
||||||
// need a long appendable window.
|
// need a long appendable window.
|
||||||
opts := tsdb.DefaultOptions()
|
opts := tsdb.DefaultOptions()
|
||||||
opts.MinBlockDuration = model.Duration(24 * time.Hour)
|
opts.MinBlockDuration = int64(24 * time.Hour / time.Millisecond)
|
||||||
opts.MaxBlockDuration = model.Duration(24 * time.Hour)
|
opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond)
|
||||||
db, err := tsdb.Open(dir, nil, nil, opts)
|
db, err := tsdb.Open(dir, nil, nil, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Opening test storage failed: %s", err)
|
t.Fatalf("Opening test storage failed: %s", err)
|
||||||
|
|
|
@ -1189,19 +1189,12 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
|
||||||
for i, query := range req.Queries {
|
for i, query := range req.Queries {
|
||||||
err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error {
|
err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error {
|
||||||
// The streaming API provides sorted series.
|
// The streaming API provides sorted series.
|
||||||
set, ws, err := querier.SelectSorted(selectParams, filteredMatchers...)
|
// TODO(bwplotka): Handle warnings via query log.
|
||||||
|
set, _, err := querier.SelectSorted(selectParams, filteredMatchers...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(ws) > 0 {
|
|
||||||
msg := ""
|
|
||||||
for _, w := range ws {
|
|
||||||
msg += w.Error() + ";"
|
|
||||||
}
|
|
||||||
level.Warn(api.logger).Log("remote read warnings", "warnings", msg)
|
|
||||||
}
|
|
||||||
|
|
||||||
return remote.StreamChunkedReadResponses(
|
return remote.StreamChunkedReadResponses(
|
||||||
remote.NewChunkedWriter(w, f),
|
remote.NewChunkedWriter(w, f),
|
||||||
int64(i),
|
int64(i),
|
||||||
|
|
33
web/web.go
33
web/web.go
|
@ -38,6 +38,7 @@ import (
|
||||||
template_text "text/template"
|
template_text "text/template"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/alecthomas/units"
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/go-kit/kit/log/level"
|
"github.com/go-kit/kit/log/level"
|
||||||
conntrack "github.com/mwitkow/go-conntrack"
|
conntrack "github.com/mwitkow/go-conntrack"
|
||||||
|
@ -208,11 +209,41 @@ func (h *Handler) ApplyConfig(conf *config.Config) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TSDBOptions is tsdb.Option version with defined units.
|
||||||
|
// This is required as tsdb.Option fields are unit agnostic (time).
|
||||||
|
type TSDBOptions struct {
|
||||||
|
WALSegmentSize units.Base2Bytes
|
||||||
|
RetentionDuration model.Duration
|
||||||
|
MaxBytes units.Base2Bytes
|
||||||
|
NoLockfile bool
|
||||||
|
AllowOverlappingBlocks bool
|
||||||
|
WALCompression bool
|
||||||
|
StripeSize int
|
||||||
|
MinBlockDuration model.Duration
|
||||||
|
MaxBlockDuration model.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func (opts TSDBOptions) ToTSDBOptions() tsdb.Options {
|
||||||
|
return tsdb.Options{
|
||||||
|
WALSegmentSize: int(opts.WALSegmentSize),
|
||||||
|
RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond),
|
||||||
|
MaxBytes: int64(opts.MaxBytes),
|
||||||
|
NoLockfile: opts.NoLockfile,
|
||||||
|
AllowOverlappingBlocks: opts.AllowOverlappingBlocks,
|
||||||
|
WALCompression: opts.WALCompression,
|
||||||
|
StripeSize: opts.StripeSize,
|
||||||
|
MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond),
|
||||||
|
MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond),
|
||||||
|
|
||||||
|
ConvertTimeToSecondsFn: func(i int64) float64 { return float64(i / 1000) },
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Options for the web Handler.
|
// Options for the web Handler.
|
||||||
type Options struct {
|
type Options struct {
|
||||||
Context context.Context
|
Context context.Context
|
||||||
TSDB func() *tsdb.DB
|
TSDB func() *tsdb.DB
|
||||||
TSDBCfg tsdb.Options
|
TSDBCfg TSDBOptions
|
||||||
Storage storage.Storage
|
Storage storage.Storage
|
||||||
QueryEngine *promql.Engine
|
QueryEngine *promql.Engine
|
||||||
LookbackDelta time.Duration
|
LookbackDelta time.Duration
|
||||||
|
|
Loading…
Reference in a new issue