mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-24 05:04:05 -08:00
Split remote write storage into its own type
This allows other processes to reuse just the remote write code without having to use the remote read code as well. This will be used to create a sidecar capable of sending remote write payloads. Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>
This commit is contained in:
parent
7cd5cf0b69
commit
a38a54fa11
|
@ -15,13 +15,10 @@ package remote
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"encoding/json"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/go-kit/kit/log/level"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/common/model"
|
||||
|
@ -40,13 +37,7 @@ type Storage struct {
|
|||
logger log.Logger
|
||||
mtx sync.Mutex
|
||||
|
||||
configHash [16]byte
|
||||
|
||||
// For writes
|
||||
walDir string
|
||||
queues []*QueueManager
|
||||
samplesIn *ewmaRate
|
||||
flushDeadline time.Duration
|
||||
rws *WriteStorage
|
||||
|
||||
// For reads
|
||||
queryables []storage.Queryable
|
||||
|
@ -61,28 +52,17 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
|
|||
s := &Storage{
|
||||
logger: logging.Dedupe(l, 1*time.Minute),
|
||||
localStartTimeCallback: stCallback,
|
||||
flushDeadline: flushDeadline,
|
||||
samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration),
|
||||
walDir: walDir,
|
||||
rws: NewWriteStorage(l, walDir, flushDeadline),
|
||||
}
|
||||
go s.run()
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *Storage) run() {
|
||||
ticker := time.NewTicker(shardUpdateDuration)
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
s.samplesIn.tick()
|
||||
}
|
||||
}
|
||||
|
||||
// ApplyConfig updates the state as the new config requires.
|
||||
func (s *Storage) ApplyConfig(conf *config.Config) error {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
if err := s.applyRemoteWriteConfig(conf); err != nil {
|
||||
if err := s.rws.ApplyConfig(conf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -113,66 +93,6 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// applyRemoteWriteConfig applies the remote write config only if the config has changed.
|
||||
// The caller must hold the lock on s.mtx.
|
||||
func (s *Storage) applyRemoteWriteConfig(conf *config.Config) error {
|
||||
// Remote write queues only need to change if the remote write config or
|
||||
// external labels change. Hash these together and only reload if the hash
|
||||
// changes.
|
||||
cfgBytes, err := json.Marshal(conf.RemoteWriteConfigs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
externalLabelBytes, err := json.Marshal(conf.GlobalConfig.ExternalLabels)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hash := md5.Sum(append(cfgBytes, externalLabelBytes...))
|
||||
if hash == s.configHash {
|
||||
level.Debug(s.logger).Log("msg", "remote write config has not changed, no need to restart QueueManagers")
|
||||
return nil
|
||||
}
|
||||
|
||||
s.configHash = hash
|
||||
|
||||
// Update write queues
|
||||
newQueues := []*QueueManager{}
|
||||
// TODO: we should only stop & recreate queues which have changes,
|
||||
// as this can be quite disruptive.
|
||||
for i, rwConf := range conf.RemoteWriteConfigs {
|
||||
c, err := NewClient(i, &ClientConfig{
|
||||
URL: rwConf.URL,
|
||||
Timeout: rwConf.RemoteTimeout,
|
||||
HTTPClientConfig: rwConf.HTTPClientConfig,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newQueues = append(newQueues, NewQueueManager(
|
||||
s.logger,
|
||||
s.walDir,
|
||||
s.samplesIn,
|
||||
rwConf.QueueConfig,
|
||||
conf.GlobalConfig.ExternalLabels,
|
||||
rwConf.WriteRelabelConfigs,
|
||||
c,
|
||||
s.flushDeadline,
|
||||
))
|
||||
}
|
||||
|
||||
for _, q := range s.queues {
|
||||
q.Stop()
|
||||
}
|
||||
|
||||
s.queues = newQueues
|
||||
for _, q := range s.queues {
|
||||
q.Start()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// StartTime implements the Storage interface.
|
||||
func (s *Storage) StartTime() (int64, error) {
|
||||
return int64(model.Latest), nil
|
||||
|
@ -196,16 +116,16 @@ func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querie
|
|||
return storage.NewMergeQuerier(nil, queriers), nil
|
||||
}
|
||||
|
||||
// Appender implements storage.Storage.
|
||||
func (s *Storage) Appender() (storage.Appender, error) {
|
||||
return s.rws.Appender()
|
||||
}
|
||||
|
||||
// Close the background processing of the storage queues.
|
||||
func (s *Storage) Close() error {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
for _, q := range s.queues {
|
||||
q.Stop()
|
||||
}
|
||||
|
||||
return nil
|
||||
return s.rws.Close()
|
||||
}
|
||||
|
||||
func labelsToEqualityMatchers(ls model.LabelSet) []*labels.Matcher {
|
||||
|
|
|
@ -20,7 +20,6 @@ import (
|
|||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
)
|
||||
|
||||
|
@ -42,7 +41,7 @@ func TestStorageLifecycle(t *testing.T) {
|
|||
s.ApplyConfig(conf)
|
||||
|
||||
// make sure remote write has a queue.
|
||||
testutil.Equals(t, 1, len(s.queues))
|
||||
testutil.Equals(t, 1, len(s.rws.queues))
|
||||
|
||||
// make sure remote write has a queue.
|
||||
testutil.Equals(t, 1, len(s.queryables))
|
||||
|
@ -51,33 +50,6 @@ func TestStorageLifecycle(t *testing.T) {
|
|||
testutil.Ok(t, err)
|
||||
}
|
||||
|
||||
func TestUpdateExternalLabels(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "TestUpdateExternalLabels")
|
||||
testutil.Ok(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
s := NewStorage(nil, prometheus.DefaultRegisterer, nil, dir, defaultFlushDeadline)
|
||||
|
||||
externalLabels := labels.FromStrings("external", "true")
|
||||
conf := &config.Config{
|
||||
GlobalConfig: config.GlobalConfig{},
|
||||
RemoteWriteConfigs: []*config.RemoteWriteConfig{
|
||||
&config.DefaultRemoteWriteConfig,
|
||||
},
|
||||
}
|
||||
s.ApplyConfig(conf)
|
||||
testutil.Equals(t, 1, len(s.queues))
|
||||
testutil.Equals(t, labels.Labels(nil), s.queues[0].externalLabels)
|
||||
|
||||
conf.GlobalConfig.ExternalLabels = externalLabels
|
||||
s.ApplyConfig(conf)
|
||||
testutil.Equals(t, 1, len(s.queues))
|
||||
testutil.Equals(t, externalLabels, s.queues[0].externalLabels)
|
||||
|
||||
err = s.Close()
|
||||
testutil.Ok(t, err)
|
||||
}
|
||||
|
||||
func TestUpdateRemoteReadConfigs(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "TestUpdateRemoteReadConfigs")
|
||||
testutil.Ok(t, err)
|
||||
|
@ -100,31 +72,3 @@ func TestUpdateRemoteReadConfigs(t *testing.T) {
|
|||
err = s.Close()
|
||||
testutil.Ok(t, err)
|
||||
}
|
||||
|
||||
func TestUpdateRemoteWriteConfigsNoop(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "TestUpdateRemoteWriteConfigsNoop")
|
||||
testutil.Ok(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
s := NewStorage(nil, prometheus.DefaultRegisterer, nil, dir, defaultFlushDeadline)
|
||||
|
||||
conf := &config.Config{
|
||||
GlobalConfig: config.GlobalConfig{},
|
||||
RemoteWriteConfigs: []*config.RemoteWriteConfig{
|
||||
&config.DefaultRemoteWriteConfig,
|
||||
},
|
||||
}
|
||||
s.ApplyConfig(conf)
|
||||
testutil.Equals(t, 1, len(s.queues))
|
||||
queue := s.queues[0]
|
||||
|
||||
conf.RemoteReadConfigs = []*config.RemoteReadConfig{
|
||||
&config.DefaultRemoteReadConfig,
|
||||
}
|
||||
s.ApplyConfig(conf)
|
||||
testutil.Equals(t, 1, len(s.queues))
|
||||
testutil.Assert(t, queue == s.queues[0], "Queue pointer should have remained the same")
|
||||
|
||||
err = s.Close()
|
||||
testutil.Ok(t, err)
|
||||
}
|
||||
|
|
|
@ -14,8 +14,16 @@
|
|||
package remote
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"encoding/json"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/go-kit/kit/log/level"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
)
|
||||
|
@ -37,15 +45,121 @@ var (
|
|||
}
|
||||
)
|
||||
|
||||
// Appender implements scrape.Appendable.
|
||||
func (s *Storage) Appender() (storage.Appender, error) {
|
||||
// WriteStorage represents all the remote write storage.
|
||||
type WriteStorage struct {
|
||||
logger log.Logger
|
||||
mtx sync.Mutex
|
||||
|
||||
configHash [16]byte
|
||||
walDir string
|
||||
queues []*QueueManager
|
||||
samplesIn *ewmaRate
|
||||
flushDeadline time.Duration
|
||||
}
|
||||
|
||||
// NewWriteStorage creates and runs a WriteStorage.
|
||||
func NewWriteStorage(logger log.Logger, walDir string, flushDeadline time.Duration) *WriteStorage {
|
||||
if logger == nil {
|
||||
logger = log.NewNopLogger()
|
||||
}
|
||||
rws := &WriteStorage{
|
||||
logger: logger,
|
||||
flushDeadline: flushDeadline,
|
||||
samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration),
|
||||
walDir: walDir,
|
||||
}
|
||||
go rws.run()
|
||||
return rws
|
||||
}
|
||||
|
||||
func (rws *WriteStorage) run() {
|
||||
ticker := time.NewTicker(shardUpdateDuration)
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
rws.samplesIn.tick()
|
||||
}
|
||||
}
|
||||
|
||||
// ApplyConfig updates the state as the new config requires.
|
||||
func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
|
||||
rws.mtx.Lock()
|
||||
defer rws.mtx.Unlock()
|
||||
|
||||
// Remote write queues only need to change if the remote write config or
|
||||
// external labels change. Hash these together and only reload if the hash
|
||||
// changes.
|
||||
cfgBytes, err := json.Marshal(conf.RemoteWriteConfigs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
externalLabelBytes, err := json.Marshal(conf.GlobalConfig.ExternalLabels)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hash := md5.Sum(append(cfgBytes, externalLabelBytes...))
|
||||
if hash == rws.configHash {
|
||||
level.Debug(rws.logger).Log("msg", "remote write config has not changed, no need to restart QueueManagers")
|
||||
return nil
|
||||
}
|
||||
|
||||
rws.configHash = hash
|
||||
|
||||
// Update write queues
|
||||
newQueues := []*QueueManager{}
|
||||
// TODO: we should only stop & recreate queues which have changes,
|
||||
// as this can be quite disruptive.
|
||||
for i, rwConf := range conf.RemoteWriteConfigs {
|
||||
c, err := NewClient(i, &ClientConfig{
|
||||
URL: rwConf.URL,
|
||||
Timeout: rwConf.RemoteTimeout,
|
||||
HTTPClientConfig: rwConf.HTTPClientConfig,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newQueues = append(newQueues, NewQueueManager(
|
||||
rws.logger,
|
||||
rws.walDir,
|
||||
rws.samplesIn,
|
||||
rwConf.QueueConfig,
|
||||
conf.GlobalConfig.ExternalLabels,
|
||||
rwConf.WriteRelabelConfigs,
|
||||
c,
|
||||
rws.flushDeadline,
|
||||
))
|
||||
}
|
||||
|
||||
for _, q := range rws.queues {
|
||||
q.Stop()
|
||||
}
|
||||
|
||||
rws.queues = newQueues
|
||||
for _, q := range rws.queues {
|
||||
q.Start()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Appender implements storage.Storage.
|
||||
func (rws *WriteStorage) Appender() (storage.Appender, error) {
|
||||
return ×tampTracker{
|
||||
storage: s,
|
||||
writeStorage: rws,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Close closes the WriteStorage.
|
||||
func (rws *WriteStorage) Close() error {
|
||||
rws.mtx.Lock()
|
||||
defer rws.mtx.Unlock()
|
||||
for _, q := range rws.queues {
|
||||
q.Stop()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type timestampTracker struct {
|
||||
storage *Storage
|
||||
writeStorage *WriteStorage
|
||||
samples int64
|
||||
highestTimestamp int64
|
||||
}
|
||||
|
@ -67,7 +181,7 @@ func (t *timestampTracker) AddFast(l labels.Labels, _ uint64, ts int64, v float6
|
|||
|
||||
// Commit implements storage.Appender.
|
||||
func (t *timestampTracker) Commit() error {
|
||||
t.storage.samplesIn.incr(t.samples)
|
||||
t.writeStorage.samplesIn.incr(t.samples)
|
||||
|
||||
samplesIn.Add(float64(t.samples))
|
||||
highestTimestamp.Set(float64(t.highestTimestamp / 1000))
|
||||
|
|
95
storage/remote/write_test.go
Normal file
95
storage/remote/write_test.go
Normal file
|
@ -0,0 +1,95 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package remote
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
)
|
||||
|
||||
func TestWriteStorageLifecycle(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "TestWriteStorageLifecycle")
|
||||
testutil.Ok(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
s := NewWriteStorage(nil, dir, defaultFlushDeadline)
|
||||
conf := &config.Config{
|
||||
GlobalConfig: config.DefaultGlobalConfig,
|
||||
RemoteWriteConfigs: []*config.RemoteWriteConfig{
|
||||
&config.DefaultRemoteWriteConfig,
|
||||
},
|
||||
}
|
||||
s.ApplyConfig(conf)
|
||||
testutil.Equals(t, 1, len(s.queues))
|
||||
|
||||
err = s.Close()
|
||||
testutil.Ok(t, err)
|
||||
}
|
||||
|
||||
func TestUpdateExternalLabels(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "TestUpdateExternalLabels")
|
||||
testutil.Ok(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
s := NewWriteStorage(nil, dir, defaultFlushDeadline)
|
||||
|
||||
externalLabels := labels.FromStrings("external", "true")
|
||||
conf := &config.Config{
|
||||
GlobalConfig: config.GlobalConfig{},
|
||||
RemoteWriteConfigs: []*config.RemoteWriteConfig{
|
||||
&config.DefaultRemoteWriteConfig,
|
||||
},
|
||||
}
|
||||
s.ApplyConfig(conf)
|
||||
testutil.Equals(t, 1, len(s.queues))
|
||||
testutil.Equals(t, labels.Labels(nil), s.queues[0].externalLabels)
|
||||
|
||||
conf.GlobalConfig.ExternalLabels = externalLabels
|
||||
s.ApplyConfig(conf)
|
||||
testutil.Equals(t, 1, len(s.queues))
|
||||
testutil.Equals(t, externalLabels, s.queues[0].externalLabels)
|
||||
|
||||
err = s.Close()
|
||||
testutil.Ok(t, err)
|
||||
}
|
||||
|
||||
func TestWriteStorageApplyConfigsIdempotent(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "TestWriteStorageApplyConfigsIdempotent")
|
||||
testutil.Ok(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
s := NewWriteStorage(nil, dir, defaultFlushDeadline)
|
||||
|
||||
conf := &config.Config{
|
||||
GlobalConfig: config.GlobalConfig{},
|
||||
RemoteWriteConfigs: []*config.RemoteWriteConfig{
|
||||
&config.DefaultRemoteWriteConfig,
|
||||
},
|
||||
}
|
||||
s.ApplyConfig(conf)
|
||||
testutil.Equals(t, 1, len(s.queues))
|
||||
queue := s.queues[0]
|
||||
|
||||
s.ApplyConfig(conf)
|
||||
testutil.Equals(t, 1, len(s.queues))
|
||||
testutil.Assert(t, queue == s.queues[0], "Queue pointer should have remained the same")
|
||||
|
||||
err = s.Close()
|
||||
testutil.Ok(t, err)
|
||||
}
|
Loading…
Reference in a new issue