mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-25 05:34:05 -08:00
Replace sync/atomic with uber-go/atomic (#7683)
* storage: Replace usage of sync/atomic with uber-go/atomic
Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com>
* tsdb: Replace usage of sync/atomic with uber-go/atomic
Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com>
* web: Replace usage of sync/atomic with uber-go/atomic
Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com>
* notifier: Replace usage of sync/atomic with uber-go/atomic
Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com>
* cmd: Replace usage of sync/atomic with uber-go/atomic
Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com>
* scripts: Verify that we are not using restricted packages
It checks that we are not directly importing 'sync/atomic'.
Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com>
* Reorganise imports in blocks
Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com>
* notifier/test: Apply PR suggestions
Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com>
* storage/remote: avoid storing references on newEntry
Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com>
* Revert "scripts: Verify that we are not using restricted packages"
This reverts commit 278d32748e
.
Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com>
* web: Group imports accordingly
Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com>
This commit is contained in:
parent
924e7239b7
commit
b58a613443
|
@ -30,7 +30,6 @@ import (
|
||||||
"runtime"
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -47,6 +46,7 @@ import (
|
||||||
"github.com/prometheus/common/version"
|
"github.com/prometheus/common/version"
|
||||||
jcfg "github.com/uber/jaeger-client-go/config"
|
jcfg "github.com/uber/jaeger-client-go/config"
|
||||||
jprom "github.com/uber/jaeger-lib/metrics/prometheus"
|
jprom "github.com/uber/jaeger-lib/metrics/prometheus"
|
||||||
|
"go.uber.org/atomic"
|
||||||
kingpin "gopkg.in/alecthomas/kingpin.v2"
|
kingpin "gopkg.in/alecthomas/kingpin.v2"
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
|
|
||||||
|
@ -801,18 +801,18 @@ func openDBWithMetrics(dir string, logger log.Logger, reg prometheus.Registerer,
|
||||||
}
|
}
|
||||||
|
|
||||||
type safePromQLNoStepSubqueryInterval struct {
|
type safePromQLNoStepSubqueryInterval struct {
|
||||||
value int64
|
value atomic.Int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func durationToInt64Millis(d time.Duration) int64 {
|
func durationToInt64Millis(d time.Duration) int64 {
|
||||||
return int64(d / time.Millisecond)
|
return int64(d / time.Millisecond)
|
||||||
}
|
}
|
||||||
func (i *safePromQLNoStepSubqueryInterval) Set(ev model.Duration) {
|
func (i *safePromQLNoStepSubqueryInterval) Set(ev model.Duration) {
|
||||||
atomic.StoreInt64(&i.value, durationToInt64Millis(time.Duration(ev)))
|
i.value.Store(durationToInt64Millis(time.Duration(ev)))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *safePromQLNoStepSubqueryInterval) Get(int64) int64 {
|
func (i *safePromQLNoStepSubqueryInterval) Get(int64) int64 {
|
||||||
return atomic.LoadInt64(&i.value)
|
return i.value.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
func reloadConfig(filename string, logger log.Logger, noStepSuqueryInterval *safePromQLNoStepSubqueryInterval, rls ...func(*config.Config) error) (err error) {
|
func reloadConfig(filename string, logger log.Logger, noStepSuqueryInterval *safePromQLNoStepSubqueryInterval, rls ...func(*config.Config) error) (err error) {
|
||||||
|
|
|
@ -26,19 +26,19 @@ import (
|
||||||
"path"
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/go-kit/kit/log/level"
|
"github.com/go-kit/kit/log/level"
|
||||||
"github.com/go-openapi/strfmt"
|
"github.com/go-openapi/strfmt"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
"go.uber.org/atomic"
|
||||||
|
|
||||||
|
"github.com/prometheus/alertmanager/api/v2/models"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
config_util "github.com/prometheus/common/config"
|
config_util "github.com/prometheus/common/config"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
"github.com/prometheus/common/version"
|
"github.com/prometheus/common/version"
|
||||||
|
|
||||||
"github.com/prometheus/alertmanager/api/v2/models"
|
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/discovery/targetgroup"
|
"github.com/prometheus/prometheus/discovery/targetgroup"
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
|
@ -466,7 +466,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
numSuccess uint64
|
numSuccess atomic.Uint64
|
||||||
)
|
)
|
||||||
for _, ams := range amSets {
|
for _, ams := range amSets {
|
||||||
var (
|
var (
|
||||||
|
@ -527,7 +527,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
|
||||||
level.Error(n.logger).Log("alertmanager", url, "count", len(alerts), "msg", "Error sending alert", "err", err)
|
level.Error(n.logger).Log("alertmanager", url, "count", len(alerts), "msg", "Error sending alert", "err", err)
|
||||||
n.metrics.errors.WithLabelValues(url).Inc()
|
n.metrics.errors.WithLabelValues(url).Inc()
|
||||||
} else {
|
} else {
|
||||||
atomic.AddUint64(&numSuccess, 1)
|
numSuccess.Inc()
|
||||||
}
|
}
|
||||||
n.metrics.latency.WithLabelValues(url).Observe(time.Since(begin).Seconds())
|
n.metrics.latency.WithLabelValues(url).Observe(time.Since(begin).Seconds())
|
||||||
n.metrics.sent.WithLabelValues(url).Add(float64(len(alerts)))
|
n.metrics.sent.WithLabelValues(url).Add(float64(len(alerts)))
|
||||||
|
@ -541,7 +541,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
return numSuccess > 0
|
return numSuccess.Load() > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func alertsToOpenAPIAlerts(alerts []*Alert) models.PostableAlerts {
|
func alertsToOpenAPIAlerts(alerts []*Alert) models.PostableAlerts {
|
||||||
|
|
|
@ -22,7 +22,6 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"net/url"
|
"net/url"
|
||||||
"sync/atomic"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -30,6 +29,7 @@ import (
|
||||||
"github.com/prometheus/alertmanager/api/v2/models"
|
"github.com/prometheus/alertmanager/api/v2/models"
|
||||||
config_util "github.com/prometheus/common/config"
|
config_util "github.com/prometheus/common/config"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
"go.uber.org/atomic"
|
||||||
yaml "gopkg.in/yaml.v2"
|
yaml "gopkg.in/yaml.v2"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
|
@ -102,10 +102,12 @@ func TestHandlerSendAll(t *testing.T) {
|
||||||
var (
|
var (
|
||||||
errc = make(chan error, 1)
|
errc = make(chan error, 1)
|
||||||
expected = make([]*Alert, 0, maxBatchSize)
|
expected = make([]*Alert, 0, maxBatchSize)
|
||||||
status1, status2 = int32(http.StatusOK), int32(http.StatusOK)
|
status1, status2 atomic.Int32
|
||||||
)
|
)
|
||||||
|
status1.Store(int32(http.StatusOK))
|
||||||
|
status2.Store(int32(http.StatusOK))
|
||||||
|
|
||||||
newHTTPServer := func(u, p string, status *int32) *httptest.Server {
|
newHTTPServer := func(u, p string, status *atomic.Int32) *httptest.Server {
|
||||||
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
var err error
|
var err error
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -128,7 +130,7 @@ func TestHandlerSendAll(t *testing.T) {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = alertsEqual(expected, alerts)
|
err = alertsEqual(expected, alerts)
|
||||||
}
|
}
|
||||||
w.WriteHeader(int(atomic.LoadInt32(status)))
|
w.WriteHeader(int(status.Load()))
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
server1 := newHTTPServer("prometheus", "testing_password", &status1)
|
server1 := newHTTPServer("prometheus", "testing_password", &status1)
|
||||||
|
@ -194,11 +196,11 @@ func TestHandlerSendAll(t *testing.T) {
|
||||||
testutil.Assert(t, h.sendAll(h.queue...), "all sends failed unexpectedly")
|
testutil.Assert(t, h.sendAll(h.queue...), "all sends failed unexpectedly")
|
||||||
checkNoErr()
|
checkNoErr()
|
||||||
|
|
||||||
atomic.StoreInt32(&status1, int32(http.StatusNotFound))
|
status1.Store(int32(http.StatusNotFound))
|
||||||
testutil.Assert(t, h.sendAll(h.queue...), "all sends failed unexpectedly")
|
testutil.Assert(t, h.sendAll(h.queue...), "all sends failed unexpectedly")
|
||||||
checkNoErr()
|
checkNoErr()
|
||||||
|
|
||||||
atomic.StoreInt32(&status2, int32(http.StatusInternalServerError))
|
status2.Store(int32(http.StatusInternalServerError))
|
||||||
testutil.Assert(t, !h.sendAll(h.queue...), "all sends succeeded unexpectedly")
|
testutil.Assert(t, !h.sendAll(h.queue...), "all sends succeeded unexpectedly")
|
||||||
checkNoErr()
|
checkNoErr()
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,15 +15,14 @@ package remote
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"go.uber.org/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ewmaRate tracks an exponentially weighted moving average of a per-second rate.
|
// ewmaRate tracks an exponentially weighted moving average of a per-second rate.
|
||||||
type ewmaRate struct {
|
type ewmaRate struct {
|
||||||
// Keep all 64bit atomically accessed variables at the top of this struct.
|
newEvents atomic.Int64
|
||||||
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for more info.
|
|
||||||
newEvents int64
|
|
||||||
|
|
||||||
alpha float64
|
alpha float64
|
||||||
interval time.Duration
|
interval time.Duration
|
||||||
|
@ -50,7 +49,7 @@ func (r *ewmaRate) rate() float64 {
|
||||||
|
|
||||||
// tick assumes to be called every r.interval.
|
// tick assumes to be called every r.interval.
|
||||||
func (r *ewmaRate) tick() {
|
func (r *ewmaRate) tick() {
|
||||||
newEvents := atomic.SwapInt64(&r.newEvents, 0)
|
newEvents := r.newEvents.Swap(0)
|
||||||
instantRate := float64(newEvents) / r.interval.Seconds()
|
instantRate := float64(newEvents) / r.interval.Seconds()
|
||||||
|
|
||||||
r.mutex.Lock()
|
r.mutex.Lock()
|
||||||
|
@ -66,5 +65,5 @@ func (r *ewmaRate) tick() {
|
||||||
|
|
||||||
// inc counts one event.
|
// inc counts one event.
|
||||||
func (r *ewmaRate) incr(incr int64) {
|
func (r *ewmaRate) incr(incr int64) {
|
||||||
atomic.AddInt64(&r.newEvents, incr)
|
r.newEvents.Add(incr)
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,8 @@ package remote
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
|
"go.uber.org/atomic"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||||
|
@ -40,13 +41,15 @@ type pool struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type entry struct {
|
type entry struct {
|
||||||
// Keep all 64bit atomically accessed variables at the top of this struct.
|
refs atomic.Int64
|
||||||
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for more info.
|
|
||||||
refs int64
|
|
||||||
|
|
||||||
s string
|
s string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newEntry(s string) *entry {
|
||||||
|
return &entry{s: s}
|
||||||
|
}
|
||||||
|
|
||||||
func newPool() *pool {
|
func newPool() *pool {
|
||||||
return &pool{
|
return &pool{
|
||||||
pool: map[string]*entry{},
|
pool: map[string]*entry{},
|
||||||
|
@ -62,20 +65,18 @@ func (p *pool) intern(s string) string {
|
||||||
interned, ok := p.pool[s]
|
interned, ok := p.pool[s]
|
||||||
p.mtx.RUnlock()
|
p.mtx.RUnlock()
|
||||||
if ok {
|
if ok {
|
||||||
atomic.AddInt64(&interned.refs, 1)
|
interned.refs.Inc()
|
||||||
return interned.s
|
return interned.s
|
||||||
}
|
}
|
||||||
p.mtx.Lock()
|
p.mtx.Lock()
|
||||||
defer p.mtx.Unlock()
|
defer p.mtx.Unlock()
|
||||||
if interned, ok := p.pool[s]; ok {
|
if interned, ok := p.pool[s]; ok {
|
||||||
atomic.AddInt64(&interned.refs, 1)
|
interned.refs.Inc()
|
||||||
return interned.s
|
return interned.s
|
||||||
}
|
}
|
||||||
|
|
||||||
p.pool[s] = &entry{
|
p.pool[s] = newEntry(s)
|
||||||
s: s,
|
p.pool[s].refs.Store(1)
|
||||||
refs: 1,
|
|
||||||
}
|
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,14 +90,14 @@ func (p *pool) release(s string) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
refs := atomic.AddInt64(&interned.refs, -1)
|
refs := interned.refs.Dec()
|
||||||
if refs > 0 {
|
if refs > 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
p.mtx.Lock()
|
p.mtx.Lock()
|
||||||
defer p.mtx.Unlock()
|
defer p.mtx.Unlock()
|
||||||
if atomic.LoadInt64(&interned.refs) != 0 {
|
if interned.refs.Load() != 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
delete(p.pool, s)
|
delete(p.pool, s)
|
||||||
|
|
|
@ -20,7 +20,6 @@ package remote
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync/atomic"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -33,7 +32,7 @@ func TestIntern(t *testing.T) {
|
||||||
interned, ok := interner.pool[testString]
|
interned, ok := interner.pool[testString]
|
||||||
|
|
||||||
testutil.Equals(t, true, ok)
|
testutil.Equals(t, true, ok)
|
||||||
testutil.Assert(t, interned.refs == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
|
testutil.Assert(t, interned.refs.Load() == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIntern_MultiRef(t *testing.T) {
|
func TestIntern_MultiRef(t *testing.T) {
|
||||||
|
@ -43,13 +42,13 @@ func TestIntern_MultiRef(t *testing.T) {
|
||||||
interned, ok := interner.pool[testString]
|
interned, ok := interner.pool[testString]
|
||||||
|
|
||||||
testutil.Equals(t, true, ok)
|
testutil.Equals(t, true, ok)
|
||||||
testutil.Assert(t, interned.refs == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
|
testutil.Assert(t, interned.refs.Load() == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
|
||||||
|
|
||||||
interner.intern(testString)
|
interner.intern(testString)
|
||||||
interned, ok = interner.pool[testString]
|
interned, ok = interner.pool[testString]
|
||||||
|
|
||||||
testutil.Equals(t, true, ok)
|
testutil.Equals(t, true, ok)
|
||||||
testutil.Assert(t, interned.refs == 2, fmt.Sprintf("expected refs to be 2 but it was %d", interned.refs))
|
testutil.Assert(t, interned.refs.Load() == 2, fmt.Sprintf("expected refs to be 2 but it was %d", interned.refs))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIntern_DeleteRef(t *testing.T) {
|
func TestIntern_DeleteRef(t *testing.T) {
|
||||||
|
@ -59,7 +58,7 @@ func TestIntern_DeleteRef(t *testing.T) {
|
||||||
interned, ok := interner.pool[testString]
|
interned, ok := interner.pool[testString]
|
||||||
|
|
||||||
testutil.Equals(t, true, ok)
|
testutil.Equals(t, true, ok)
|
||||||
testutil.Assert(t, interned.refs == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
|
testutil.Assert(t, interned.refs.Load() == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
|
||||||
|
|
||||||
interner.release(testString)
|
interner.release(testString)
|
||||||
_, ok = interner.pool[testString]
|
_, ok = interner.pool[testString]
|
||||||
|
@ -72,7 +71,7 @@ func TestIntern_MultiRef_Concurrent(t *testing.T) {
|
||||||
interner.intern(testString)
|
interner.intern(testString)
|
||||||
interned, ok := interner.pool[testString]
|
interned, ok := interner.pool[testString]
|
||||||
testutil.Equals(t, true, ok)
|
testutil.Equals(t, true, ok)
|
||||||
testutil.Assert(t, interned.refs == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
|
testutil.Assert(t, interned.refs.Load() == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
|
||||||
|
|
||||||
go interner.release(testString)
|
go interner.release(testString)
|
||||||
|
|
||||||
|
@ -84,5 +83,5 @@ func TestIntern_MultiRef_Concurrent(t *testing.T) {
|
||||||
interned, ok = interner.pool[testString]
|
interned, ok = interner.pool[testString]
|
||||||
interner.mtx.RUnlock()
|
interner.mtx.RUnlock()
|
||||||
testutil.Equals(t, true, ok)
|
testutil.Equals(t, true, ok)
|
||||||
testutil.Assert(t, atomic.LoadInt64(&interned.refs) == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
|
testutil.Assert(t, interned.refs.Load() == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,6 @@ import (
|
||||||
"math"
|
"math"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
|
@ -27,6 +26,7 @@ import (
|
||||||
"github.com/golang/snappy"
|
"github.com/golang/snappy"
|
||||||
"github.com/opentracing/opentracing-go"
|
"github.com/opentracing/opentracing-go"
|
||||||
"github.com/opentracing/opentracing-go/ext"
|
"github.com/opentracing/opentracing-go/ext"
|
||||||
|
"go.uber.org/atomic"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
|
@ -235,8 +235,7 @@ type WriteClient interface {
|
||||||
// indicated by the provided WriteClient. Implements writeTo interface
|
// indicated by the provided WriteClient. Implements writeTo interface
|
||||||
// used by WAL Watcher.
|
// used by WAL Watcher.
|
||||||
type QueueManager struct {
|
type QueueManager struct {
|
||||||
// https://golang.org/pkg/sync/atomic/#pkg-note-BUG
|
lastSendTimestamp atomic.Int64
|
||||||
lastSendTimestamp int64
|
|
||||||
|
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
flushDeadline time.Duration
|
flushDeadline time.Duration
|
||||||
|
@ -537,7 +536,7 @@ func (t *QueueManager) shouldReshard(desiredShards int) bool {
|
||||||
// We shouldn't reshard if Prometheus hasn't been able to send to the
|
// We shouldn't reshard if Prometheus hasn't been able to send to the
|
||||||
// remote endpoint successfully within some period of time.
|
// remote endpoint successfully within some period of time.
|
||||||
minSendTimestamp := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix()
|
minSendTimestamp := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix()
|
||||||
lsts := atomic.LoadInt64(&t.lastSendTimestamp)
|
lsts := t.lastSendTimestamp.Load()
|
||||||
if lsts < minSendTimestamp {
|
if lsts < minSendTimestamp {
|
||||||
level.Warn(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold", "lastSendTimestamp", lsts, "minSendTimestamp", minSendTimestamp)
|
level.Warn(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold", "lastSendTimestamp", lsts, "minSendTimestamp", minSendTimestamp)
|
||||||
return false
|
return false
|
||||||
|
@ -663,7 +662,7 @@ type shards struct {
|
||||||
// Emulate a wait group with a channel and an atomic int, as you
|
// Emulate a wait group with a channel and an atomic int, as you
|
||||||
// cannot select on a wait group.
|
// cannot select on a wait group.
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
running int32
|
running atomic.Int32
|
||||||
|
|
||||||
// Soft shutdown context will prevent new enqueues and deadlocks.
|
// Soft shutdown context will prevent new enqueues and deadlocks.
|
||||||
softShutdown chan struct{}
|
softShutdown chan struct{}
|
||||||
|
@ -671,7 +670,7 @@ type shards struct {
|
||||||
// Hard shutdown context is used to terminate outgoing HTTP connections
|
// Hard shutdown context is used to terminate outgoing HTTP connections
|
||||||
// after giving them a chance to terminate.
|
// after giving them a chance to terminate.
|
||||||
hardShutdown context.CancelFunc
|
hardShutdown context.CancelFunc
|
||||||
droppedOnHardShutdown uint32
|
droppedOnHardShutdown atomic.Uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
// start the shards; must be called before any call to enqueue.
|
// start the shards; must be called before any call to enqueue.
|
||||||
|
@ -692,9 +691,9 @@ func (s *shards) start(n int) {
|
||||||
var hardShutdownCtx context.Context
|
var hardShutdownCtx context.Context
|
||||||
hardShutdownCtx, s.hardShutdown = context.WithCancel(context.Background())
|
hardShutdownCtx, s.hardShutdown = context.WithCancel(context.Background())
|
||||||
s.softShutdown = make(chan struct{})
|
s.softShutdown = make(chan struct{})
|
||||||
s.running = int32(n)
|
s.running.Store(int32(n))
|
||||||
s.done = make(chan struct{})
|
s.done = make(chan struct{})
|
||||||
atomic.StoreUint32(&s.droppedOnHardShutdown, 0)
|
s.droppedOnHardShutdown.Store(0)
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
go s.runShard(hardShutdownCtx, i, newQueues[i])
|
go s.runShard(hardShutdownCtx, i, newQueues[i])
|
||||||
}
|
}
|
||||||
|
@ -727,7 +726,7 @@ func (s *shards) stop() {
|
||||||
// Force an unclean shutdown.
|
// Force an unclean shutdown.
|
||||||
s.hardShutdown()
|
s.hardShutdown()
|
||||||
<-s.done
|
<-s.done
|
||||||
if dropped := atomic.LoadUint32(&s.droppedOnHardShutdown); dropped > 0 {
|
if dropped := s.droppedOnHardShutdown.Load(); dropped > 0 {
|
||||||
level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown", "count", dropped)
|
level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown", "count", dropped)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -756,7 +755,7 @@ func (s *shards) enqueue(ref uint64, sample sample) bool {
|
||||||
|
|
||||||
func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
|
func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if atomic.AddInt32(&s.running, -1) == 0 {
|
if s.running.Dec() == 0 {
|
||||||
close(s.done)
|
close(s.done)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -792,7 +791,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
|
||||||
droppedSamples := nPending + len(queue)
|
droppedSamples := nPending + len(queue)
|
||||||
s.qm.metrics.pendingSamples.Sub(float64(droppedSamples))
|
s.qm.metrics.pendingSamples.Sub(float64(droppedSamples))
|
||||||
s.qm.metrics.failedSamplesTotal.Add(float64(droppedSamples))
|
s.qm.metrics.failedSamplesTotal.Add(float64(droppedSamples))
|
||||||
atomic.AddUint32(&s.droppedOnHardShutdown, uint32(droppedSamples))
|
s.droppedOnHardShutdown.Add(uint32(droppedSamples))
|
||||||
return
|
return
|
||||||
|
|
||||||
case sample, ok := <-queue:
|
case sample, ok := <-queue:
|
||||||
|
@ -847,7 +846,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, b
|
||||||
// should be maintained irrespective of success or failure.
|
// should be maintained irrespective of success or failure.
|
||||||
s.qm.samplesOut.incr(int64(len(samples)))
|
s.qm.samplesOut.incr(int64(len(samples)))
|
||||||
s.qm.samplesOutDuration.incr(int64(time.Since(begin)))
|
s.qm.samplesOutDuration.incr(int64(time.Since(begin)))
|
||||||
atomic.StoreInt64(&s.qm.lastSendTimestamp, time.Now().Unix())
|
s.qm.lastSendTimestamp.Store(time.Now().Unix())
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendSamples to the remote storage with backoff for recoverable errors.
|
// sendSamples to the remote storage with backoff for recoverable errors.
|
||||||
|
|
|
@ -24,13 +24,13 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/gogo/protobuf/proto"
|
"github.com/gogo/protobuf/proto"
|
||||||
"github.com/golang/snappy"
|
"github.com/golang/snappy"
|
||||||
|
"go.uber.org/atomic"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
|
client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
|
||||||
|
@ -336,7 +336,7 @@ func TestShouldReshard(t *testing.T) {
|
||||||
m.numShards = c.startingShards
|
m.numShards = c.startingShards
|
||||||
m.samplesIn.incr(c.samplesIn)
|
m.samplesIn.incr(c.samplesIn)
|
||||||
m.samplesOut.incr(c.samplesOut)
|
m.samplesOut.incr(c.samplesOut)
|
||||||
m.lastSendTimestamp = c.lastSendTimestamp
|
m.lastSendTimestamp.Store(c.lastSendTimestamp)
|
||||||
|
|
||||||
m.Start()
|
m.Start()
|
||||||
|
|
||||||
|
@ -497,7 +497,7 @@ func (c *TestWriteClient) Endpoint() string {
|
||||||
// point the `numCalls` property will contain a count of how many times Store()
|
// point the `numCalls` property will contain a count of how many times Store()
|
||||||
// was called.
|
// was called.
|
||||||
type TestBlockingWriteClient struct {
|
type TestBlockingWriteClient struct {
|
||||||
numCalls uint64
|
numCalls atomic.Uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTestBlockedWriteClient() *TestBlockingWriteClient {
|
func NewTestBlockedWriteClient() *TestBlockingWriteClient {
|
||||||
|
@ -505,13 +505,13 @@ func NewTestBlockedWriteClient() *TestBlockingWriteClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte) error {
|
func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte) error {
|
||||||
atomic.AddUint64(&c.numCalls, 1)
|
c.numCalls.Inc()
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestBlockingWriteClient) NumCalls() uint64 {
|
func (c *TestBlockingWriteClient) NumCalls() uint64 {
|
||||||
return atomic.LoadUint64(&c.numCalls)
|
return c.numCalls.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestBlockingWriteClient) Name() string {
|
func (c *TestBlockingWriteClient) Name() string {
|
||||||
|
@ -667,7 +667,7 @@ func TestCalculateDesiredShards(t *testing.T) {
|
||||||
highestSent := startedAt.Add(ts - time.Duration(pendingSamples/inputRate)*time.Second)
|
highestSent := startedAt.Add(ts - time.Duration(pendingSamples/inputRate)*time.Second)
|
||||||
m.metrics.highestSentTimestamp.Set(float64(highestSent.Unix()))
|
m.metrics.highestSentTimestamp.Set(float64(highestSent.Unix()))
|
||||||
|
|
||||||
atomic.StoreInt64(&m.lastSendTimestamp, time.Now().Unix())
|
m.lastSendTimestamp.Store(time.Now().Unix())
|
||||||
}
|
}
|
||||||
|
|
||||||
ts := time.Duration(0)
|
ts := time.Duration(0)
|
||||||
|
|
|
@ -17,9 +17,10 @@ import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync/atomic"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"go.uber.org/atomic"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/util/testutil"
|
"github.com/prometheus/prometheus/util/testutil"
|
||||||
)
|
)
|
||||||
|
@ -51,11 +52,11 @@ func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) {
|
||||||
testutil.Ok(b, err)
|
testutil.Ok(b, err)
|
||||||
defer h.Close()
|
defer h.Close()
|
||||||
|
|
||||||
var count int64
|
var count atomic.Int64
|
||||||
|
|
||||||
b.RunParallel(func(pb *testing.PB) {
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
for pb.Next() {
|
for pb.Next() {
|
||||||
i := atomic.AddInt64(&count, 1)
|
i := count.Inc()
|
||||||
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(int(i))))
|
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(int(i))))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
16
web/web.go
16
web/web.go
|
@ -34,7 +34,6 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
template_text "text/template"
|
template_text "text/template"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -51,9 +50,8 @@ import (
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
"github.com/prometheus/common/route"
|
"github.com/prometheus/common/route"
|
||||||
"github.com/prometheus/common/server"
|
"github.com/prometheus/common/server"
|
||||||
"github.com/prometheus/prometheus/tsdb"
|
|
||||||
"github.com/prometheus/prometheus/tsdb/index"
|
|
||||||
"github.com/soheilhy/cmux"
|
"github.com/soheilhy/cmux"
|
||||||
|
"go.uber.org/atomic"
|
||||||
"golang.org/x/net/netutil"
|
"golang.org/x/net/netutil"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
@ -64,6 +62,8 @@ import (
|
||||||
"github.com/prometheus/prometheus/scrape"
|
"github.com/prometheus/prometheus/scrape"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
"github.com/prometheus/prometheus/template"
|
"github.com/prometheus/prometheus/template"
|
||||||
|
"github.com/prometheus/prometheus/tsdb"
|
||||||
|
"github.com/prometheus/prometheus/tsdb/index"
|
||||||
"github.com/prometheus/prometheus/util/httputil"
|
"github.com/prometheus/prometheus/util/httputil"
|
||||||
api_v1 "github.com/prometheus/prometheus/web/api/v1"
|
api_v1 "github.com/prometheus/prometheus/web/api/v1"
|
||||||
api_v2 "github.com/prometheus/prometheus/web/api/v2"
|
api_v2 "github.com/prometheus/prometheus/web/api/v2"
|
||||||
|
@ -202,7 +202,7 @@ type Handler struct {
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
now func() model.Time
|
now func() model.Time
|
||||||
|
|
||||||
ready uint32 // ready is uint32 rather than boolean to be able to use atomic functions.
|
ready atomic.Uint32 // ready is uint32 rather than boolean to be able to use atomic functions.
|
||||||
}
|
}
|
||||||
|
|
||||||
// ApplyConfig updates the config field of the Handler struct
|
// ApplyConfig updates the config field of the Handler struct
|
||||||
|
@ -293,9 +293,8 @@ func New(logger log.Logger, o *Options) *Handler {
|
||||||
notifier: o.Notifier,
|
notifier: o.Notifier,
|
||||||
|
|
||||||
now: model.Now,
|
now: model.Now,
|
||||||
|
|
||||||
ready: 0,
|
|
||||||
}
|
}
|
||||||
|
h.ready.Store(0)
|
||||||
|
|
||||||
factoryTr := func(_ context.Context) api_v1.TargetRetriever { return h.scrapeManager }
|
factoryTr := func(_ context.Context) api_v1.TargetRetriever { return h.scrapeManager }
|
||||||
factoryAr := func(_ context.Context) api_v1.AlertmanagerRetriever { return h.notifier }
|
factoryAr := func(_ context.Context) api_v1.AlertmanagerRetriever { return h.notifier }
|
||||||
|
@ -484,13 +483,12 @@ func serveDebug(w http.ResponseWriter, req *http.Request) {
|
||||||
|
|
||||||
// Ready sets Handler to be ready.
|
// Ready sets Handler to be ready.
|
||||||
func (h *Handler) Ready() {
|
func (h *Handler) Ready() {
|
||||||
atomic.StoreUint32(&h.ready, 1)
|
h.ready.Store(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verifies whether the server is ready or not.
|
// Verifies whether the server is ready or not.
|
||||||
func (h *Handler) isReady() bool {
|
func (h *Handler) isReady() bool {
|
||||||
ready := atomic.LoadUint32(&h.ready)
|
return h.ready.Load() > 0
|
||||||
return ready > 0
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Checks if server is ready, calls f if it is, returns 503 if it is not.
|
// Checks if server is ready, calls f if it is, returns 503 if it is not.
|
||||||
|
|
Loading…
Reference in a new issue