mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 14:27:27 -08:00
Kill the curation state channel.
The use of the channels for curation state were always unidiomatic. Change-Id: I1cb1d7175ebfb4faf28dff84201066278d6a0d92
This commit is contained in:
parent
f8b51c2638
commit
972e856d9b
26
main.go
26
main.go
|
@ -80,7 +80,6 @@ type prometheus struct {
|
||||||
deletionTimer *time.Ticker
|
deletionTimer *time.Ticker
|
||||||
|
|
||||||
curationMutex sync.Mutex
|
curationMutex sync.Mutex
|
||||||
curationState chan metric.CurationState
|
|
||||||
stopBackgroundOperations chan bool
|
stopBackgroundOperations chan bool
|
||||||
|
|
||||||
unwrittenSamples chan *extraction.Result
|
unwrittenSamples chan *extraction.Result
|
||||||
|
@ -88,6 +87,8 @@ type prometheus struct {
|
||||||
ruleManager rules.RuleManager
|
ruleManager rules.RuleManager
|
||||||
notifications chan notification.NotificationReqs
|
notifications chan notification.NotificationReqs
|
||||||
storage *metric.TieredStorage
|
storage *metric.TieredStorage
|
||||||
|
|
||||||
|
curationState metric.CurationStateUpdater
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *prometheus) interruptHandler() {
|
func (p *prometheus) interruptHandler() {
|
||||||
|
@ -157,7 +158,6 @@ func (p *prometheus) close() {
|
||||||
|
|
||||||
close(p.notifications)
|
close(p.notifications)
|
||||||
close(p.stopBackgroundOperations)
|
close(p.stopBackgroundOperations)
|
||||||
close(p.curationState)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
@ -189,7 +189,6 @@ func main() {
|
||||||
|
|
||||||
Ingester: retrieval.ChannelIngester(unwrittenSamples),
|
Ingester: retrieval.ChannelIngester(unwrittenSamples),
|
||||||
}
|
}
|
||||||
curationState := make(chan metric.CurationState, 1)
|
|
||||||
// Coprime numbers, fool!
|
// Coprime numbers, fool!
|
||||||
headCompactionTimer := time.NewTicker(*headCompactInterval)
|
headCompactionTimer := time.NewTicker(*headCompactInterval)
|
||||||
bodyCompactionTimer := time.NewTicker(*bodyCompactInterval)
|
bodyCompactionTimer := time.NewTicker(*bodyCompactInterval)
|
||||||
|
@ -219,16 +218,13 @@ func main() {
|
||||||
flags[f.Name] = f.Value.String()
|
flags[f.Name] = f.Value.String()
|
||||||
})
|
})
|
||||||
|
|
||||||
statusHandler := &web.StatusHandler{
|
prometheusStatus := &web.PrometheusStatusHandler{
|
||||||
PrometheusStatus: &web.PrometheusStatus{
|
BuildInfo: BuildInfo,
|
||||||
BuildInfo: BuildInfo,
|
Config: conf.String(),
|
||||||
Config: conf.String(),
|
RuleManager: ruleManager,
|
||||||
RuleManager: ruleManager,
|
TargetPools: targetManager.Pools(),
|
||||||
TargetPools: targetManager.Pools(),
|
Flags: flags,
|
||||||
Flags: flags,
|
Birth: time.Now(),
|
||||||
Birth: time.Now(),
|
|
||||||
},
|
|
||||||
CurationState: curationState,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
alertsHandler := &web.AlertsHandler{
|
alertsHandler := &web.AlertsHandler{
|
||||||
|
@ -247,7 +243,7 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
webService := &web.WebService{
|
webService := &web.WebService{
|
||||||
StatusHandler: statusHandler,
|
StatusHandler: prometheusStatus,
|
||||||
MetricsHandler: metricsService,
|
MetricsHandler: metricsService,
|
||||||
DatabasesHandler: databasesHandler,
|
DatabasesHandler: databasesHandler,
|
||||||
AlertsHandler: alertsHandler,
|
AlertsHandler: alertsHandler,
|
||||||
|
@ -260,7 +256,7 @@ func main() {
|
||||||
|
|
||||||
deletionTimer: deletionTimer,
|
deletionTimer: deletionTimer,
|
||||||
|
|
||||||
curationState: curationState,
|
curationState: prometheusStatus,
|
||||||
|
|
||||||
unwrittenSamples: unwrittenSamples,
|
unwrittenSamples: unwrittenSamples,
|
||||||
|
|
||||||
|
|
|
@ -31,6 +31,11 @@ import (
|
||||||
dto "github.com/prometheus/prometheus/model/generated"
|
dto "github.com/prometheus/prometheus/model/generated"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// CurationStateUpdater receives updates about the curation state.
|
||||||
|
type CurationStateUpdater interface {
|
||||||
|
UpdateCurationState(*CurationState)
|
||||||
|
}
|
||||||
|
|
||||||
// CurationState contains high-level curation state information for the
|
// CurationState contains high-level curation state information for the
|
||||||
// heads-up-display.
|
// heads-up-display.
|
||||||
type CurationState struct {
|
type CurationState struct {
|
||||||
|
@ -83,7 +88,7 @@ type watermarkScanner struct {
|
||||||
// stop functions as the global stop channel for all future operations.
|
// stop functions as the global stop channel for all future operations.
|
||||||
stop chan bool
|
stop chan bool
|
||||||
// status is the outbound channel for notifying the status page of its state.
|
// status is the outbound channel for notifying the status page of its state.
|
||||||
status chan CurationState
|
status CurationStateUpdater
|
||||||
}
|
}
|
||||||
|
|
||||||
// run facilitates the curation lifecycle.
|
// run facilitates the curation lifecycle.
|
||||||
|
@ -92,7 +97,7 @@ type watermarkScanner struct {
|
||||||
// curated.
|
// curated.
|
||||||
// curationState is the on-disk store where the curation remarks are made for
|
// curationState is the on-disk store where the curation remarks are made for
|
||||||
// how much progress has been made.
|
// how much progress has been made.
|
||||||
func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState CurationRemarker, samples *leveldb.LevelDBPersistence, watermarks HighWatermarker, status chan CurationState) (err error) {
|
func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState CurationRemarker, samples *leveldb.LevelDBPersistence, watermarks HighWatermarker, status CurationStateUpdater) (err error) {
|
||||||
defer func(t time.Time) {
|
defer func(t time.Time) {
|
||||||
duration := float64(time.Since(t) / time.Millisecond)
|
duration := float64(time.Since(t) / time.Millisecond)
|
||||||
|
|
||||||
|
@ -108,13 +113,8 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces
|
||||||
curationDuration.IncrementBy(labels, duration)
|
curationDuration.IncrementBy(labels, duration)
|
||||||
curationDurations.Add(labels, duration)
|
curationDurations.Add(labels, duration)
|
||||||
}(time.Now())
|
}(time.Now())
|
||||||
defer func() {
|
|
||||||
select {
|
defer status.UpdateCurationState(&CurationState{Active: false})
|
||||||
case status <- CurationState{Active: false}:
|
|
||||||
case <-status:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
iterator := samples.NewIterator(true)
|
iterator := samples.NewIterator(true)
|
||||||
defer iterator.Close()
|
defer iterator.Close()
|
||||||
|
@ -201,16 +201,12 @@ func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResul
|
||||||
|
|
||||||
curationFilterOperations.Increment(labels)
|
curationFilterOperations.Increment(labels)
|
||||||
|
|
||||||
select {
|
w.status.UpdateCurationState(&CurationState{
|
||||||
case w.status <- CurationState{
|
|
||||||
Active: true,
|
Active: true,
|
||||||
Name: w.processor.Name(),
|
Name: w.processor.Name(),
|
||||||
Limit: w.ignoreYoungerThan,
|
Limit: w.ignoreYoungerThan,
|
||||||
Fingerprint: fingerprint,
|
Fingerprint: fingerprint,
|
||||||
}:
|
})
|
||||||
case <-w.status:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if w.shouldStop() {
|
if w.shouldStop() {
|
||||||
|
|
|
@ -112,6 +112,10 @@ func (s sampleGroup) Get() (key, value proto.Message) {
|
||||||
return k, v
|
return k, v
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type noopUpdater bool
|
||||||
|
|
||||||
|
func (noopUpdater) UpdateCurationState(*CurationState) {}
|
||||||
|
|
||||||
func TestCuratorCompactionProcessor(t *testing.T) {
|
func TestCuratorCompactionProcessor(t *testing.T) {
|
||||||
scenarios := []struct {
|
scenarios := []struct {
|
||||||
in in
|
in in
|
||||||
|
@ -872,8 +876,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
|
||||||
}
|
}
|
||||||
defer samples.Close()
|
defer samples.Close()
|
||||||
|
|
||||||
updates := make(chan CurationState, 100)
|
updates := new(noopUpdater)
|
||||||
defer close(updates)
|
|
||||||
|
|
||||||
stop := make(chan bool)
|
stop := make(chan bool)
|
||||||
defer close(stop)
|
defer close(stop)
|
||||||
|
@ -1396,8 +1399,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
|
||||||
}
|
}
|
||||||
defer samples.Close()
|
defer samples.Close()
|
||||||
|
|
||||||
updates := make(chan CurationState, 100)
|
updates := new(noopUpdater)
|
||||||
defer close(updates)
|
|
||||||
|
|
||||||
stop := make(chan bool)
|
stop := make(chan bool)
|
||||||
defer close(stop)
|
defer close(stop)
|
||||||
|
|
|
@ -14,15 +14,18 @@
|
||||||
package web
|
package web
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/prometheus/prometheus/retrieval"
|
|
||||||
"github.com/prometheus/prometheus/rules"
|
|
||||||
"github.com/prometheus/prometheus/storage/metric"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/retrieval"
|
||||||
|
"github.com/prometheus/prometheus/rules"
|
||||||
|
"github.com/prometheus/prometheus/storage/metric"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PrometheusStatus struct {
|
type PrometheusStatusHandler struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
|
||||||
BuildInfo map[string]string
|
BuildInfo map[string]string
|
||||||
Config string
|
Config string
|
||||||
Curation metric.CurationState
|
Curation metric.CurationState
|
||||||
|
@ -33,23 +36,16 @@ type PrometheusStatus struct {
|
||||||
Birth time.Time
|
Birth time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
type StatusHandler struct {
|
func (h *PrometheusStatusHandler) UpdateCurationState(c *metric.CurationState) {
|
||||||
CurationState chan metric.CurationState
|
h.mu.Lock()
|
||||||
PrometheusStatus *PrometheusStatus
|
defer h.mu.Unlock()
|
||||||
|
|
||||||
mutex sync.RWMutex
|
h.Curation = *c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *StatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (h *PrometheusStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
select {
|
h.mu.RLock()
|
||||||
case curationState := <-h.CurationState:
|
defer h.mu.RUnlock()
|
||||||
h.mutex.Lock()
|
|
||||||
defer h.mutex.Unlock()
|
|
||||||
h.PrometheusStatus.Curation = curationState
|
|
||||||
default:
|
|
||||||
h.mutex.RLock()
|
|
||||||
defer h.mutex.RUnlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
executeTemplate(w, "status", h.PrometheusStatus)
|
executeTemplate(w, "status", h)
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,7 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
type WebService struct {
|
type WebService struct {
|
||||||
StatusHandler *StatusHandler
|
StatusHandler *PrometheusStatusHandler
|
||||||
DatabasesHandler *DatabasesHandler
|
DatabasesHandler *DatabasesHandler
|
||||||
MetricsHandler *api.MetricsService
|
MetricsHandler *api.MetricsService
|
||||||
AlertsHandler *AlertsHandler
|
AlertsHandler *AlertsHandler
|
||||||
|
|
Loading…
Reference in a new issue