diff --git a/go.mod b/go.mod
index 948a525e7..390c28a7d 100644
--- a/go.mod
+++ b/go.mod
@@ -63,7 +63,7 @@ require (
 	github.com/uber/jaeger-client-go v2.25.0+incompatible
 	github.com/uber/jaeger-lib v2.2.0+incompatible
 	go.mongodb.org/mongo-driver v1.3.2 // indirect
-	go.uber.org/atomic v1.6.0 // indirect
+	go.uber.org/atomic v1.6.0
 	go.uber.org/goleak v1.0.0
 	golang.org/x/net v0.0.0-20200707034311-ab3426394381
 	golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go
index a654682d1..e211b23d5 100644
--- a/tsdb/chunks/head_chunks.go
+++ b/tsdb/chunks/head_chunks.go
@@ -25,12 +25,12 @@ import (
 	"sort"
 	"strconv"
 	"sync"
-	"sync/atomic"
 
 	"github.com/pkg/errors"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
 	"github.com/prometheus/prometheus/tsdb/fileutil"
+	"go.uber.org/atomic"
 )
 
 // Head chunk file header fields constants.
@@ -78,9 +78,7 @@ func (e *CorruptionErr) Error() string {
 // ChunkDiskMapper is for writing the Head block chunks to the disk
 // and access chunks via mmapped file.
 type ChunkDiskMapper struct {
-	// Keep all 64bit atomically accessed variables at the top of this struct.
-	// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for more info.
-	curFileNumBytes int64 // Bytes written in current open file.
+	curFileNumBytes atomic.Int64 // Bytes written in current open file.
 
 	/// Writer.
 	dir *os.File
@@ -343,7 +341,7 @@ func (cdm *ChunkDiskMapper) cut() (returnErr error) {
 	}()
 
 	cdm.size += cdm.curFileSize()
-	atomic.StoreInt64(&cdm.curFileNumBytes, int64(n))
+	cdm.curFileNumBytes.Store(int64(n))
 
 	if cdm.curFile != nil {
 		cdm.readPathMtx.Lock()
@@ -394,7 +392,7 @@ func (cdm *ChunkDiskMapper) finalizeCurFile() error {
 
 func (cdm *ChunkDiskMapper) write(b []byte) error {
 	n, err := cdm.chkWriter.Write(b)
-	atomic.AddInt64(&cdm.curFileNumBytes, int64(n))
+	cdm.curFileNumBytes.Add(int64(n))
 
 	return err
 }
@@ -736,7 +734,7 @@ func (cdm *ChunkDiskMapper) Size() int64 {
 }
 
 func (cdm *ChunkDiskMapper) curFileSize() int64 {
-	return atomic.LoadInt64(&cdm.curFileNumBytes)
+	return cdm.curFileNumBytes.Load()
 }
 
 // Close closes all the open files in ChunkDiskMapper.
diff --git a/tsdb/db.go b/tsdb/db.go
index e02ad5fae..57003968d 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -762,7 +762,7 @@ func (db *DB) Compact() (err error) {
 			break
 		}
 		mint := db.head.MinTime()
-		maxt := rangeForTimestamp(mint, db.head.chunkRange)
+		maxt := rangeForTimestamp(mint, db.head.chunkRange.Load())
 
 		// Wrap head into a range that bounds all reads to it.
 		// We remove 1 millisecond from maxt because block
diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index 2c26cebf3..0a6fc8fed 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -575,7 +575,7 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
 	testutil.Ok(t, err)
 
 	// Hackingly introduce "race", by having lower max time then maxTime in last chunk.
-	db.head.maxTime = db.head.maxTime - 10
+	db.head.maxTime.Sub(10)
 
 	defer func() {
 		testutil.Ok(t, os.RemoveAll(snap))
diff --git a/tsdb/head.go b/tsdb/head.go
index 066c722ea..a94f41db8 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -21,7 +21,6 @@ import (
 	"sort"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/go-kit/kit/log"
@@ -38,6 +37,7 @@ import (
 	"github.com/prometheus/prometheus/tsdb/record"
 	"github.com/prometheus/prometheus/tsdb/tombstones"
 	"github.com/prometheus/prometheus/tsdb/wal"
+	"go.uber.org/atomic"
 )
 
 var (
@@ -51,13 +51,11 @@ var (
 
 // Head handles reads and writes of time series data within a time window.
 type Head struct {
-	// Keep all 64bit atomically accessed variables at the top of this struct.
-	// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for more info.
-	chunkRange       int64
-	numSeries        uint64
-	minTime, maxTime int64 // Current min and max of the samples included in the head.
-	minValidTime     int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
-	lastSeriesID     uint64
+	chunkRange       atomic.Int64
+	numSeries        atomic.Uint64
+	minTime, maxTime atomic.Int64 // Current min and max of the samples included in the head.
+	minValidTime     atomic.Int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
+	lastSeriesID     atomic.Uint64
 
 	metrics *headMetrics
 	wal     *wal.WAL
@@ -302,9 +300,6 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
 	h := &Head{
 		wal:    wal,
 		logger: l,
-		chunkRange: chunkRange,
-		minTime:    math.MaxInt64,
-		maxTime:    math.MinInt64,
 		series:  newStripeSeries(stripeSize, seriesCallback),
 		values:  map[string]stringset{},
 		symbols: map[string]struct{}{},
@@ -320,6 +315,9 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
 		chunkDirRoot:   chkDirRoot,
 		seriesCallback: seriesCallback,
 	}
+	h.chunkRange.Store(chunkRange)
+	h.minTime.Store(math.MaxInt64)
+	h.maxTime.Store(math.MinInt64)
 	h.metrics = newHeadMetrics(h, r)
 
 	if pool == nil {
@@ -389,7 +387,7 @@ func (h *Head) updateMinMaxTime(mint, maxt int64) {
 		if mint >= lt {
 			break
 		}
-		if atomic.CompareAndSwapInt64(&h.minTime, lt, mint) {
+		if h.minTime.CAS(lt, mint) {
 			break
 		}
 	}
@@ -398,7 +396,7 @@ func (h *Head) updateMinMaxTime(mint, maxt int64) {
 		if maxt <= ht {
 			break
 		}
-		if atomic.CompareAndSwapInt64(&h.maxTime, ht, maxt) {
+		if h.maxTime.CAS(ht, maxt) {
 			break
 		}
 	}
@@ -407,7 +405,7 @@ func (h *Head) updateMinMaxTime(mint, maxt int64) {
 func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks map[uint64][]*mmappedChunk) (err error) {
 	// Track number of samples that referenced a series we don't know about
 	// for error reporting.
-	var unknownRefs uint64
+	var unknownRefs atomic.Uint64
 
 	// Start workers that each process samples for a partition of the series ID space.
 	// They are connected through a ring of channels which ensures that all sample batches
@@ -459,8 +457,8 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
 		inputs[i] = make(chan []record.RefSample, 300)
 
 		go func(input <-chan []record.RefSample, output chan<- []record.RefSample) {
-			unknown := h.processWALSamples(h.minValidTime, input, output)
-			atomic.AddUint64(&unknownRefs, unknown)
+			unknown := h.processWALSamples(h.minValidTime.Load(), input, output)
+			unknownRefs.Add(unknown)
 			wg.Done()
 		}(inputs[i], outputs[i])
 	}
@@ -546,8 +544,8 @@ Outer:
 					multiRef[s.Ref] = series.ref
 				}
 
-				if h.lastSeriesID < s.Ref {
-					h.lastSeriesID = s.Ref
+				if h.lastSeriesID.Load() < s.Ref {
+					h.lastSeriesID.Store(s.Ref)
 				}
 			}
 			//lint:ignore SA6002 relax staticcheck verification.
@@ -588,11 +586,11 @@ Outer:
 		case []tombstones.Stone:
 			for _, s := range v {
 				for _, itv := range s.Intervals {
-					if itv.Maxt < h.minValidTime {
+					if itv.Maxt < h.minValidTime.Load() {
 						continue
 					}
 					if m := h.series.getByID(s.Ref); m == nil {
-						unknownRefs++
+						unknownRefs.Inc()
 						continue
 					}
 					h.tombstones.AddInterval(s.Ref, itv)
@@ -627,7 +625,7 @@ Outer:
 		return errors.Wrap(r.Err(), "read records")
 	}
 
-	if unknownRefs > 0 {
+	if unknownRefs.Load() > 0 {
 		level.Warn(h.logger).Log("msg", "Unknown series references", "count", unknownRefs)
 	}
 	return nil
@@ -637,7 +635,7 @@ Outer:
 // It should be called before using an appender so that it
 // limits the ingested samples to the head min valid time.
 func (h *Head) Init(minValidTime int64) error {
-	h.minValidTime = minValidTime
+	h.minValidTime.Store(minValidTime)
 	defer h.postings.EnsureOrder()
 	defer h.gc() // After loading the wal remove the obsolete data from the head.
 
@@ -729,7 +727,7 @@ func (h *Head) Init(minValidTime int64) error {
 func (h *Head) loadMmappedChunks() (map[uint64][]*mmappedChunk, error) {
 	mmappedChunks := map[uint64][]*mmappedChunk{}
 	if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error {
-		if maxt < h.minValidTime {
+		if maxt < h.minValidTime.Load() {
 			return nil
 		}
 
@@ -786,12 +784,12 @@ func (h *Head) Truncate(mint int64) (err error) {
 	if h.MinTime() >= mint && !initialize {
 		return nil
 	}
-	atomic.StoreInt64(&h.minTime, mint)
-	atomic.StoreInt64(&h.minValidTime, mint)
+	h.minTime.Store(mint)
+	h.minValidTime.Store(mint)
 
 	// Ensure that max time is at least as high as min time.
 	for h.MaxTime() < mint {
-		atomic.CompareAndSwapInt64(&h.maxTime, h.MaxTime(), mint)
+		h.maxTime.CAS(h.MaxTime(), mint)
 	}
 
 	// This was an initial call to Truncate after loading blocks on startup.
@@ -894,12 +892,12 @@ func (h *Head) Truncate(mint int64) (err error) {
 // for a completely fresh head with an empty WAL.
 // Returns true if the initialization took an effect.
 func (h *Head) initTime(t int64) (initialized bool) {
-	if !atomic.CompareAndSwapInt64(&h.minTime, math.MaxInt64, t) {
+	if !h.minTime.CAS(math.MaxInt64, t) {
 		return false
 	}
 	// Ensure that max time is initialized to at least the min time we just set.
 	// Concurrent appenders may already have set it to a higher value.
-	atomic.CompareAndSwapInt64(&h.maxTime, math.MinInt64, t)
+	h.maxTime.CAS(math.MinInt64, t)
 
 	return true
 }
@@ -1030,7 +1028,7 @@ func (h *Head) appender() *headAppender {
 		head: h,
 		// Set the minimum valid time to whichever is greater the head min valid time or the compaction window.
 		// This ensures that no samples will be added within the compaction window to avoid races.
-		minValidTime: max(atomic.LoadInt64(&h.minValidTime), h.MaxTime()-h.chunkRange/2),
+		minValidTime: max(h.minValidTime.Load(), h.MaxTime()-h.chunkRange.Load()/2),
 		mint:         math.MaxInt64,
 		maxt:         math.MinInt64,
 		samples:      h.getAppendBuffer(),
@@ -1320,9 +1318,7 @@ func (h *Head) gc() {
 	h.metrics.seriesRemoved.Add(float64(seriesRemoved))
 	h.metrics.chunksRemoved.Add(float64(chunksRemoved))
 	h.metrics.chunks.Sub(float64(chunksRemoved))
-	// Using AddUint64 to subtract series removed.
-	// See: https://golang.org/pkg/sync/atomic/#AddUint64.
-	atomic.AddUint64(&h.numSeries, ^uint64(seriesRemoved-1))
+	h.numSeries.Sub(uint64(seriesRemoved))
 
 	// Remove deleted series IDs from the postings lists.
 	h.postings.Delete(deleted)
@@ -1410,7 +1406,7 @@ func (h *Head) chunksRange(mint, maxt int64, is *isolationState) (*headChunkRead
 
 // NumSeries returns the number of active series in the head.
 func (h *Head) NumSeries() uint64 {
-	return atomic.LoadUint64(&h.numSeries)
+	return h.numSeries.Load()
 }
 
 // Meta returns meta information about the head.
@@ -1430,19 +1426,19 @@ func (h *Head) Meta() BlockMeta {
 
 // MinTime returns the lowest time bound on visible data in the head.
 func (h *Head) MinTime() int64 {
-	return atomic.LoadInt64(&h.minTime)
+	return h.minTime.Load()
 }
 
 // MaxTime returns the highest timestamp seen in data of the head.
 func (h *Head) MaxTime() int64 {
-	return atomic.LoadInt64(&h.maxTime)
+	return h.maxTime.Load()
 }
 
 // compactable returns whether the head has a compactable range.
 // The head has a compactable range when the head time range is 1.5 times the chunk range.
 // The 0.5 acts as a buffer of the appendable window.
 func (h *Head) compactable() bool {
-	return h.MaxTime()-h.MinTime() > h.chunkRange/2*3
+	return h.MaxTime()-h.MinTime() > h.chunkRange.Load()/2*3
 }
 
 // Close flushes the WAL and closes the head.
@@ -1696,13 +1692,13 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e
 	}
 
 	// Optimistically assume that we are the first one to create the series.
-	id := atomic.AddUint64(&h.lastSeriesID, 1)
+	id := h.lastSeriesID.Inc()
 
 	return h.getOrCreateWithID(id, hash, lset)
 }
 
 func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
-	s := newMemSeries(lset, id, h.chunkRange, &h.memChunkPool)
+	s := newMemSeries(lset, id, h.chunkRange.Load(), &h.memChunkPool)
 
 	s, created, err := h.series.getOrSet(hash, s)
 	if err != nil {
@@ -1713,7 +1709,7 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie
 	}
 
 	h.metrics.seriesCreated.Inc()
-	atomic.AddUint64(&h.numSeries, 1)
+	h.numSeries.Inc()
 
 	h.symMtx.Lock()
 	defer h.symMtx.Unlock()
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 368ed8c8c..2cdc9dbc3 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -234,7 +234,7 @@ func TestHead_ReadWAL(t *testing.T) {
 			populateTestWAL(t, w, entries)
 
 			testutil.Ok(t, head.Init(math.MinInt64))
-			testutil.Equals(t, uint64(101), head.lastSeriesID)
+			testutil.Equals(t, uint64(101), head.lastSeriesID.Load())
 
 			s10 := head.series.getByID(10)
 			s11 := head.series.getByID(11)
@@ -1721,16 +1721,16 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
 
-	testutil.Equals(t, int64(math.MinInt64), db.head.minValidTime)
+	testutil.Equals(t, int64(math.MinInt64), db.head.minValidTime.Load())
 	testutil.Ok(t, db.Compact())
-	testutil.Assert(t, db.head.minValidTime > 0, "")
+	testutil.Assert(t, db.head.minValidTime.Load() > 0, "")
 
 	app = db.Appender()
-	_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime-2, 99)
+	_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()-2, 99)
 	testutil.Equals(t, storage.ErrOutOfBounds, err)
 	testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples))
 
-	_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime-1, 99)
+	_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()-1, 99)
 	testutil.Equals(t, storage.ErrOutOfBounds, err)
 	testutil.Equals(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples))
 	testutil.Ok(t, app.Commit())
@@ -1738,22 +1738,22 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
 	// Some more valid samples for out of order.
 	app = db.Appender()
 	for i := 1; i <= 5; i++ {
-		_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime+DefaultBlockDuration+int64(i), 99)
+		_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+int64(i), 99)
 		testutil.Ok(t, err)
 	}
 	testutil.Ok(t, app.Commit())
 
 	// Test out of order metric.
 	app = db.Appender()
-	_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime+DefaultBlockDuration+2, 99)
+	_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+2, 99)
 	testutil.Equals(t, storage.ErrOutOfOrderSample, err)
 	testutil.Equals(t, 4.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
 
-	_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime+DefaultBlockDuration+3, 99)
+	_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+3, 99)
 	testutil.Equals(t, storage.ErrOutOfOrderSample, err)
 	testutil.Equals(t, 5.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
 
-	_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime+DefaultBlockDuration+4, 99)
+	_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+4, 99)
 	testutil.Equals(t, storage.ErrOutOfOrderSample, err)
 	testutil.Equals(t, 6.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
 	testutil.Ok(t, app.Commit())
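
For context, and not part of the patch itself: a minimal, self-contained sketch of the go.uber.org/atomic calls this migration relies on (Store, Load, CAS, Inc, Sub, all present in the v1.6.0 pinned in go.mod above). The variable names below are illustrative only, not Prometheus code.

package main

import (
	"fmt"
	"math"

	"go.uber.org/atomic"
)

func main() {
	// atomic.Int64 wraps an int64; because the value is encapsulated in a
	// struct, the sync/atomic requirement to keep 64-bit fields at the top
	// of a struct for alignment (see the comments removed from Head and
	// ChunkDiskMapper) no longer applies.
	var minTime atomic.Int64 // illustrative name, zero value is usable
	minTime.Store(math.MaxInt64)

	// CAS succeeds only if the current value equals the expected one,
	// mirroring the initTime/updateMinMaxTime pattern in the diff.
	swapped := minTime.CAS(math.MaxInt64, 1000)
	fmt.Println(swapped, minTime.Load()) // true 1000

	// atomic.Uint64 offers Inc and Sub directly, so decrementing no longer
	// needs the AddUint64(&x, ^uint64(n-1)) trick removed from Head.gc.
	var numSeries atomic.Uint64
	numSeries.Inc()
	numSeries.Inc()
	numSeries.Sub(1)
	fmt.Println(numSeries.Load()) // 1
}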