mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-12 16:44:05 -08:00
Merge pull request #7025 from prometheus/release-2.17
Merge release 2.17 into master
This commit is contained in:
commit
f1984bb007
|
@ -1,4 +1,4 @@
|
||||||
## 2.17.0-rc.3 / 2020-03-18
|
## 2.17.0-rc.4 / 2020-03-21
|
||||||
|
|
||||||
This release implements isolation in TSDB. API queries and recording rules are
|
This release implements isolation in TSDB. API queries and recording rules are
|
||||||
guaranteed to only see full scrapes and full recording rules. This comes with a
|
guaranteed to only see full scrapes and full recording rules. This comes with a
|
||||||
|
@ -20,8 +20,11 @@ some increase in memory usage, CPU usage, or query latency.
|
||||||
* [BUGFIX] PromQL: Do not escape HTML-like chars in query log #6834 #6795
|
* [BUGFIX] PromQL: Do not escape HTML-like chars in query log #6834 #6795
|
||||||
* [BUGFIX] React UI: Fix data table matrix values #6896
|
* [BUGFIX] React UI: Fix data table matrix values #6896
|
||||||
* [BUGFIX] React UI: Fix new targets page not loading when using non-ASCII characters #6892
|
* [BUGFIX] React UI: Fix new targets page not loading when using non-ASCII characters #6892
|
||||||
|
* [BUGFIX] Remote read: Fix duplication of metrics read from remote storage with external labels #6967 #7018
|
||||||
|
* [BUGFIX] Remote write: Register WAL watcher and live reader metrics for all remotes, not just the first one #6998
|
||||||
* [BUGFIX] Scrape: Prevent removal of metric names upon relabeling #6891
|
* [BUGFIX] Scrape: Prevent removal of metric names upon relabeling #6891
|
||||||
* [BUGFIX] Scrape: Fix 'superfluous response.WriteHeader call' errors when scrape fails under some circonstances #6986
|
* [BUGFIX] Scrape: Fix 'superfluous response.WriteHeader call' errors when scrape fails under some circonstances #6986
|
||||||
|
* [BUGFIX] Scrape: Fix crash when reloads are separated by two scrape intervals #7011
|
||||||
|
|
||||||
## 2.16.0 / 2020-02-13
|
## 2.16.0 / 2020-02-13
|
||||||
|
|
||||||
|
|
|
@ -314,6 +314,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
|
||||||
for fp, oldLoop := range sp.loops {
|
for fp, oldLoop := range sp.loops {
|
||||||
var cache *scrapeCache
|
var cache *scrapeCache
|
||||||
if oc := oldLoop.getCache(); reuseCache && oc != nil {
|
if oc := oldLoop.getCache(); reuseCache && oc != nil {
|
||||||
|
oldLoop.disableEndOfRunStalenessMarkers()
|
||||||
cache = oc
|
cache = oc
|
||||||
} else {
|
} else {
|
||||||
cache = newScrapeCache()
|
cache = newScrapeCache()
|
||||||
|
@ -593,6 +594,7 @@ type loop interface {
|
||||||
run(interval, timeout time.Duration, errc chan<- error)
|
run(interval, timeout time.Duration, errc chan<- error)
|
||||||
stop()
|
stop()
|
||||||
getCache() *scrapeCache
|
getCache() *scrapeCache
|
||||||
|
disableEndOfRunStalenessMarkers()
|
||||||
}
|
}
|
||||||
|
|
||||||
type cacheEntry struct {
|
type cacheEntry struct {
|
||||||
|
@ -619,6 +621,8 @@ type scrapeLoop struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel func()
|
cancel func()
|
||||||
stopped chan struct{}
|
stopped chan struct{}
|
||||||
|
|
||||||
|
disabledEndOfRunStalenessMarkers bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// scrapeCache tracks mappings of exposed metric strings to label sets and
|
// scrapeCache tracks mappings of exposed metric strings to label sets and
|
||||||
|
@ -996,7 +1000,9 @@ mainLoop:
|
||||||
|
|
||||||
close(sl.stopped)
|
close(sl.stopped)
|
||||||
|
|
||||||
sl.endOfRunStaleness(last, ticker, interval)
|
if !sl.disabledEndOfRunStalenessMarkers {
|
||||||
|
sl.endOfRunStaleness(last, ticker, interval)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, interval time.Duration) {
|
func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, interval time.Duration) {
|
||||||
|
@ -1054,6 +1060,10 @@ func (sl *scrapeLoop) stop() {
|
||||||
<-sl.stopped
|
<-sl.stopped
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sl *scrapeLoop) disableEndOfRunStalenessMarkers() {
|
||||||
|
sl.disabledEndOfRunStalenessMarkers = true
|
||||||
|
}
|
||||||
|
|
||||||
func (sl *scrapeLoop) getCache() *scrapeCache {
|
func (sl *scrapeLoop) getCache() *scrapeCache {
|
||||||
return sl.cache
|
return sl.cache
|
||||||
}
|
}
|
||||||
|
|
|
@ -141,6 +141,9 @@ func (l *testLoop) run(interval, timeout time.Duration, errc chan<- error) {
|
||||||
l.startFunc(interval, timeout, errc)
|
l.startFunc(interval, timeout, errc)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *testLoop) disableEndOfRunStalenessMarkers() {
|
||||||
|
}
|
||||||
|
|
||||||
func (l *testLoop) stop() {
|
func (l *testLoop) stop() {
|
||||||
l.stopFunc()
|
l.stopFunc()
|
||||||
}
|
}
|
||||||
|
@ -1839,3 +1842,39 @@ func TestScrapeAddFast(t *testing.T) {
|
||||||
_, _, _, err = sl.append([]byte("up 1\n"), "", time.Time{}.Add(time.Second))
|
_, _, _, err = sl.append([]byte("up 1\n"), "", time.Time{}.Add(time.Second))
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReuseCacheRace(t *testing.T) {
|
||||||
|
var (
|
||||||
|
app = &nopAppendable{}
|
||||||
|
cfg = &config.ScrapeConfig{
|
||||||
|
JobName: "Prometheus",
|
||||||
|
ScrapeTimeout: model.Duration(5 * time.Second),
|
||||||
|
ScrapeInterval: model.Duration(5 * time.Second),
|
||||||
|
MetricsPath: "/metrics",
|
||||||
|
}
|
||||||
|
sp, _ = newScrapePool(cfg, app, 0, nil)
|
||||||
|
t1 = &Target{
|
||||||
|
discoveredLabels: labels.Labels{
|
||||||
|
labels.Label{
|
||||||
|
Name: "labelNew",
|
||||||
|
Value: "nameNew",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
sp.sync([]*Target{t1})
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
for i := uint(1); i > 0; i++ {
|
||||||
|
if time.Since(start) > 5*time.Second {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
sp.reload(&config.ScrapeConfig{
|
||||||
|
JobName: "Prometheus",
|
||||||
|
ScrapeTimeout: model.Duration(1 * time.Millisecond),
|
||||||
|
ScrapeInterval: model.Duration(1 * time.Millisecond),
|
||||||
|
MetricsPath: "/metrics",
|
||||||
|
SampleLimit: i,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -271,7 +271,7 @@ type QueueManager struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewQueueManager builds a new QueueManager.
|
// NewQueueManager builds a new QueueManager.
|
||||||
func NewQueueManager(reg prometheus.Registerer, metrics *queueManagerMetrics, logger log.Logger, walDir string, samplesIn *ewmaRate, cfg config.QueueConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager {
|
func NewQueueManager(metrics *queueManagerMetrics, watcherMetrics *wal.WatcherMetrics, readerMetrics *wal.LiveReaderMetrics, logger log.Logger, walDir string, samplesIn *ewmaRate, cfg config.QueueConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager {
|
||||||
if logger == nil {
|
if logger == nil {
|
||||||
logger = log.NewNopLogger()
|
logger = log.NewNopLogger()
|
||||||
}
|
}
|
||||||
|
@ -301,7 +301,7 @@ func NewQueueManager(reg prometheus.Registerer, metrics *queueManagerMetrics, lo
|
||||||
metrics: metrics,
|
metrics: metrics,
|
||||||
}
|
}
|
||||||
|
|
||||||
t.watcher = wal.NewWatcher(reg, wal.NewWatcherMetrics(reg), logger, client.Name(), t, walDir)
|
t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, walDir)
|
||||||
t.shards = t.newShards()
|
t.shards = t.newShards()
|
||||||
|
|
||||||
return t
|
return t
|
||||||
|
|
|
@ -61,7 +61,7 @@ func TestSampleDelivery(t *testing.T) {
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil)
|
metrics := newQueueManagerMetrics(nil)
|
||||||
m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
|
||||||
m.StoreSeries(series, 0)
|
m.StoreSeries(series, 0)
|
||||||
|
|
||||||
// These should be received by the client.
|
// These should be received by the client.
|
||||||
|
@ -90,7 +90,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil)
|
metrics := newQueueManagerMetrics(nil)
|
||||||
m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
|
||||||
m.StoreSeries(series, 0)
|
m.StoreSeries(series, 0)
|
||||||
m.Start()
|
m.Start()
|
||||||
defer m.Stop()
|
defer m.Stop()
|
||||||
|
@ -131,7 +131,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil)
|
metrics := newQueueManagerMetrics(nil)
|
||||||
m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
|
||||||
m.StoreSeries(series, 0)
|
m.StoreSeries(series, 0)
|
||||||
|
|
||||||
m.Start()
|
m.Start()
|
||||||
|
@ -150,7 +150,8 @@ func TestShutdown(t *testing.T) {
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil)
|
metrics := newQueueManagerMetrics(nil)
|
||||||
m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
|
|
||||||
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
|
||||||
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
|
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
|
||||||
samples, series := createTimeseries(n, n)
|
samples, series := createTimeseries(n, n)
|
||||||
m.StoreSeries(series, 0)
|
m.StoreSeries(series, 0)
|
||||||
|
@ -188,7 +189,7 @@ func TestSeriesReset(t *testing.T) {
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil)
|
metrics := newQueueManagerMetrics(nil)
|
||||||
m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
|
||||||
for i := 0; i < numSegments; i++ {
|
for i := 0; i < numSegments; i++ {
|
||||||
series := []record.RefSeries{}
|
series := []record.RefSeries{}
|
||||||
for j := 0; j < numSeries; j++ {
|
for j := 0; j < numSeries; j++ {
|
||||||
|
@ -218,7 +219,7 @@ func TestReshard(t *testing.T) {
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil)
|
metrics := newQueueManagerMetrics(nil)
|
||||||
m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
|
||||||
m.StoreSeries(series, 0)
|
m.StoreSeries(series, 0)
|
||||||
|
|
||||||
m.Start()
|
m.Start()
|
||||||
|
@ -251,7 +252,7 @@ func TestReshardRaceWithStop(t *testing.T) {
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
metrics := newQueueManagerMetrics(nil)
|
metrics := newQueueManagerMetrics(nil)
|
||||||
m = NewQueueManager(nil, metrics, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
|
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
|
||||||
m.Start()
|
m.Start()
|
||||||
h.Unlock()
|
h.Unlock()
|
||||||
h.Lock()
|
h.Lock()
|
||||||
|
@ -269,7 +270,7 @@ func TestReshardRaceWithStop(t *testing.T) {
|
||||||
func TestReleaseNoninternedString(t *testing.T) {
|
func TestReleaseNoninternedString(t *testing.T) {
|
||||||
metrics := newQueueManagerMetrics(nil)
|
metrics := newQueueManagerMetrics(nil)
|
||||||
c := NewTestStorageClient()
|
c := NewTestStorageClient()
|
||||||
m := NewQueueManager(nil, metrics, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
|
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
|
||||||
m.Start()
|
m.Start()
|
||||||
|
|
||||||
for i := 1; i < 1000; i++ {
|
for i := 1; i < 1000; i++ {
|
||||||
|
@ -316,7 +317,7 @@ func TestCalculateDesiredsShards(t *testing.T) {
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
metrics := newQueueManagerMetrics(nil)
|
metrics := newQueueManagerMetrics(nil)
|
||||||
client := NewTestStorageClient()
|
client := NewTestStorageClient()
|
||||||
m := NewQueueManager(nil, metrics, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline)
|
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline)
|
||||||
m.numShards = c.startingShards
|
m.numShards = c.startingShards
|
||||||
m.samplesIn.incr(c.samplesIn)
|
m.samplesIn.incr(c.samplesIn)
|
||||||
m.samplesOut.incr(c.samplesOut)
|
m.samplesOut.incr(c.samplesOut)
|
||||||
|
@ -527,7 +528,7 @@ func BenchmarkSampleDelivery(b *testing.B) {
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil)
|
metrics := newQueueManagerMetrics(nil)
|
||||||
m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
|
||||||
m.StoreSeries(series, 0)
|
m.StoreSeries(series, 0)
|
||||||
|
|
||||||
// These should be received by the client.
|
// These should be received by the client.
|
||||||
|
@ -569,7 +570,7 @@ func BenchmarkStartup(b *testing.B) {
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
metrics := newQueueManagerMetrics(nil)
|
metrics := newQueueManagerMetrics(nil)
|
||||||
c := NewTestBlockedStorageClient()
|
c := NewTestBlockedStorageClient()
|
||||||
m := NewQueueManager(nil, metrics, logger, dir,
|
m := NewQueueManager(metrics, nil, nil, logger, dir,
|
||||||
newEWMARate(ewmaWeight, shardUpdateDuration),
|
newEWMARate(ewmaWeight, shardUpdateDuration),
|
||||||
config.DefaultQueueConfig, nil, nil, c, 1*time.Minute)
|
config.DefaultQueueConfig, nil, nil, c, 1*time.Minute)
|
||||||
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
|
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
|
||||||
|
@ -620,7 +621,7 @@ func TestCalculateDesiredShards(t *testing.T) {
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil)
|
metrics := newQueueManagerMetrics(nil)
|
||||||
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
|
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
|
||||||
m := NewQueueManager(nil, metrics, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline)
|
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline)
|
||||||
|
|
||||||
// Need to start the queue manager so the proper metrics are initialized.
|
// Need to start the queue manager so the proper metrics are initialized.
|
||||||
// However we can stop it right away since we don't need to do any actual
|
// However we can stop it right away since we don't need to do any actual
|
||||||
|
|
|
@ -25,6 +25,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
|
"github.com/prometheus/prometheus/tsdb/wal"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -46,11 +47,12 @@ var (
|
||||||
|
|
||||||
// WriteStorage represents all the remote write storage.
|
// WriteStorage represents all the remote write storage.
|
||||||
type WriteStorage struct {
|
type WriteStorage struct {
|
||||||
reg prometheus.Registerer
|
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
|
|
||||||
queueMetrics *queueManagerMetrics
|
queueMetrics *queueManagerMetrics
|
||||||
|
watcherMetrics *wal.WatcherMetrics
|
||||||
|
liveReaderMetrics *wal.LiveReaderMetrics
|
||||||
configHash string
|
configHash string
|
||||||
externalLabelHash string
|
externalLabelHash string
|
||||||
walDir string
|
walDir string
|
||||||
|
@ -65,13 +67,14 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string
|
||||||
logger = log.NewNopLogger()
|
logger = log.NewNopLogger()
|
||||||
}
|
}
|
||||||
rws := &WriteStorage{
|
rws := &WriteStorage{
|
||||||
queues: make(map[string]*QueueManager),
|
queues: make(map[string]*QueueManager),
|
||||||
reg: reg,
|
queueMetrics: newQueueManagerMetrics(reg),
|
||||||
queueMetrics: newQueueManagerMetrics(reg),
|
watcherMetrics: wal.NewWatcherMetrics(reg),
|
||||||
logger: logger,
|
liveReaderMetrics: wal.NewLiveReaderMetrics(reg),
|
||||||
flushDeadline: flushDeadline,
|
logger: logger,
|
||||||
samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration),
|
flushDeadline: flushDeadline,
|
||||||
walDir: walDir,
|
samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration),
|
||||||
|
walDir: walDir,
|
||||||
}
|
}
|
||||||
go rws.run()
|
go rws.run()
|
||||||
return rws
|
return rws
|
||||||
|
@ -152,8 +155,9 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
newQueues[hash] = NewQueueManager(
|
newQueues[hash] = NewQueueManager(
|
||||||
rws.reg,
|
|
||||||
rws.queueMetrics,
|
rws.queueMetrics,
|
||||||
|
rws.watcherMetrics,
|
||||||
|
rws.liveReaderMetrics,
|
||||||
rws.logger,
|
rws.logger,
|
||||||
rws.walDir,
|
rws.walDir,
|
||||||
rws.samplesIn,
|
rws.samplesIn,
|
||||||
|
|
|
@ -28,14 +28,14 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// liveReaderMetrics holds all metrics exposed by the LiveReader.
|
// liveReaderMetrics holds all metrics exposed by the LiveReader.
|
||||||
type liveReaderMetrics struct {
|
type LiveReaderMetrics struct {
|
||||||
readerCorruptionErrors *prometheus.CounterVec
|
readerCorruptionErrors *prometheus.CounterVec
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewLiveReaderMetrics instantiates, registers and returns metrics to be injected
|
// NewLiveReaderMetrics instantiates, registers and returns metrics to be injected
|
||||||
// at LiveReader instantiation.
|
// at LiveReader instantiation.
|
||||||
func NewLiveReaderMetrics(reg prometheus.Registerer) *liveReaderMetrics {
|
func NewLiveReaderMetrics(reg prometheus.Registerer) *LiveReaderMetrics {
|
||||||
m := &liveReaderMetrics{
|
m := &LiveReaderMetrics{
|
||||||
readerCorruptionErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
|
readerCorruptionErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
Name: "prometheus_tsdb_wal_reader_corruption_errors_total",
|
Name: "prometheus_tsdb_wal_reader_corruption_errors_total",
|
||||||
Help: "Errors encountered when reading the WAL.",
|
Help: "Errors encountered when reading the WAL.",
|
||||||
|
@ -43,15 +43,14 @@ func NewLiveReaderMetrics(reg prometheus.Registerer) *liveReaderMetrics {
|
||||||
}
|
}
|
||||||
|
|
||||||
if reg != nil {
|
if reg != nil {
|
||||||
// TODO(codesome): log error.
|
reg.MustRegister(m.readerCorruptionErrors)
|
||||||
_ = reg.Register(m.readerCorruptionErrors)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewLiveReader returns a new live reader.
|
// NewLiveReader returns a new live reader.
|
||||||
func NewLiveReader(logger log.Logger, metrics *liveReaderMetrics, r io.Reader) *LiveReader {
|
func NewLiveReader(logger log.Logger, metrics *LiveReaderMetrics, r io.Reader) *LiveReader {
|
||||||
lr := &LiveReader{
|
lr := &LiveReader{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
rdr: r,
|
rdr: r,
|
||||||
|
@ -89,7 +88,7 @@ type LiveReader struct {
|
||||||
// NB the non-ive Reader implementation allows for this.
|
// NB the non-ive Reader implementation allows for this.
|
||||||
permissive bool
|
permissive bool
|
||||||
|
|
||||||
metrics *liveReaderMetrics
|
metrics *LiveReaderMetrics
|
||||||
}
|
}
|
||||||
|
|
||||||
// Err returns any errors encountered reading the WAL. io.EOFs are not terminal
|
// Err returns any errors encountered reading the WAL. io.EOFs are not terminal
|
||||||
|
|
|
@ -66,7 +66,7 @@ type Watcher struct {
|
||||||
walDir string
|
walDir string
|
||||||
lastCheckpoint string
|
lastCheckpoint string
|
||||||
metrics *WatcherMetrics
|
metrics *WatcherMetrics
|
||||||
readerMetrics *liveReaderMetrics
|
readerMetrics *LiveReaderMetrics
|
||||||
|
|
||||||
startTime time.Time
|
startTime time.Time
|
||||||
startTimestamp int64 // the start time as a Prometheus timestamp
|
startTimestamp int64 // the start time as a Prometheus timestamp
|
||||||
|
@ -125,17 +125,17 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
|
||||||
}
|
}
|
||||||
|
|
||||||
if reg != nil {
|
if reg != nil {
|
||||||
_ = reg.Register(m.recordsRead)
|
reg.MustRegister(m.recordsRead)
|
||||||
_ = reg.Register(m.recordDecodeFails)
|
reg.MustRegister(m.recordDecodeFails)
|
||||||
_ = reg.Register(m.samplesSentPreTailing)
|
reg.MustRegister(m.samplesSentPreTailing)
|
||||||
_ = reg.Register(m.currentSegment)
|
reg.MustRegister(m.currentSegment)
|
||||||
}
|
}
|
||||||
|
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWatcher creates a new WAL watcher for a given WriteTo.
|
// NewWatcher creates a new WAL watcher for a given WriteTo.
|
||||||
func NewWatcher(reg prometheus.Registerer, metrics *WatcherMetrics, logger log.Logger, name string, writer WriteTo, walDir string) *Watcher {
|
func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, walDir string) *Watcher {
|
||||||
if logger == nil {
|
if logger == nil {
|
||||||
logger = log.NewNopLogger()
|
logger = log.NewNopLogger()
|
||||||
}
|
}
|
||||||
|
@ -143,7 +143,7 @@ func NewWatcher(reg prometheus.Registerer, metrics *WatcherMetrics, logger log.L
|
||||||
logger: logger,
|
logger: logger,
|
||||||
writer: writer,
|
writer: writer,
|
||||||
metrics: metrics,
|
metrics: metrics,
|
||||||
readerMetrics: NewLiveReaderMetrics(reg),
|
readerMetrics: readerMetrics,
|
||||||
walDir: path.Join(walDir, "wal"),
|
walDir: path.Join(walDir, "wal"),
|
||||||
name: name,
|
name: name,
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
|
@ -179,11 +179,13 @@ func (w *Watcher) Stop() {
|
||||||
<-w.done
|
<-w.done
|
||||||
|
|
||||||
// Records read metric has series and samples.
|
// Records read metric has series and samples.
|
||||||
w.metrics.recordsRead.DeleteLabelValues(w.name, "series")
|
if w.metrics != nil {
|
||||||
w.metrics.recordsRead.DeleteLabelValues(w.name, "samples")
|
w.metrics.recordsRead.DeleteLabelValues(w.name, "series")
|
||||||
w.metrics.recordDecodeFails.DeleteLabelValues(w.name)
|
w.metrics.recordsRead.DeleteLabelValues(w.name, "samples")
|
||||||
w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name)
|
w.metrics.recordDecodeFails.DeleteLabelValues(w.name)
|
||||||
w.metrics.currentSegment.DeleteLabelValues(w.name)
|
w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name)
|
||||||
|
w.metrics.currentSegment.DeleteLabelValues(w.name)
|
||||||
|
}
|
||||||
|
|
||||||
level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name)
|
level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name)
|
||||||
}
|
}
|
||||||
|
|
|
@ -138,7 +138,7 @@ func TestTailSamples(t *testing.T) {
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
wt := newWriteToMock()
|
wt := newWriteToMock()
|
||||||
watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir)
|
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir)
|
||||||
watcher.SetStartTime(now)
|
watcher.SetStartTime(now)
|
||||||
|
|
||||||
// Set the Watcher's metrics so they're not nil pointers.
|
// Set the Watcher's metrics so they're not nil pointers.
|
||||||
|
@ -148,7 +148,7 @@ func TestTailSamples(t *testing.T) {
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer segment.Close()
|
defer segment.Close()
|
||||||
|
|
||||||
reader := NewLiveReader(nil, NewLiveReaderMetrics(prometheus.DefaultRegisterer), segment)
|
reader := NewLiveReader(nil, NewLiveReaderMetrics(nil), segment)
|
||||||
// Use tail true so we can ensure we got the right number of samples.
|
// Use tail true so we can ensure we got the right number of samples.
|
||||||
watcher.readSegment(reader, i, true)
|
watcher.readSegment(reader, i, true)
|
||||||
}
|
}
|
||||||
|
@ -217,7 +217,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
wt := newWriteToMock()
|
wt := newWriteToMock()
|
||||||
watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir)
|
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir)
|
||||||
go watcher.Start()
|
go watcher.Start()
|
||||||
|
|
||||||
expected := seriesCount
|
expected := seriesCount
|
||||||
|
@ -303,7 +303,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
|
||||||
_, _, err = w.Segments()
|
_, _, err = w.Segments()
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
wt := newWriteToMock()
|
wt := newWriteToMock()
|
||||||
watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir)
|
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir)
|
||||||
go watcher.Start()
|
go watcher.Start()
|
||||||
|
|
||||||
expected := seriesCount * 2
|
expected := seriesCount * 2
|
||||||
|
@ -368,7 +368,7 @@ func TestReadCheckpoint(t *testing.T) {
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
wt := newWriteToMock()
|
wt := newWriteToMock()
|
||||||
watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir)
|
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir)
|
||||||
go watcher.Start()
|
go watcher.Start()
|
||||||
|
|
||||||
expectedSeries := seriesCount
|
expectedSeries := seriesCount
|
||||||
|
@ -439,7 +439,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
wt := newWriteToMock()
|
wt := newWriteToMock()
|
||||||
watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir)
|
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir)
|
||||||
watcher.MaxSegment = -1
|
watcher.MaxSegment = -1
|
||||||
|
|
||||||
// Set the Watcher's metrics so they're not nil pointers.
|
// Set the Watcher's metrics so they're not nil pointers.
|
||||||
|
@ -510,7 +510,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
wt := newWriteToMock()
|
wt := newWriteToMock()
|
||||||
watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir)
|
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir)
|
||||||
watcher.MaxSegment = -1
|
watcher.MaxSegment = -1
|
||||||
go watcher.Start()
|
go watcher.Start()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue