diff --git a/head.go b/head.go index e58f629b7..81b2b0ef4 100644 --- a/head.go +++ b/head.go @@ -34,7 +34,7 @@ type HeadBlock struct { wal *WAL - bstats BlockStats + bstats *BlockStats } // OpenHeadBlock creates a new empty head block. @@ -53,9 +53,10 @@ func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) { wal: wal, mapper: newPositionMapper(nil), } - - b.bstats.MinTime = math.MaxInt64 - b.bstats.MaxTime = math.MinInt64 + b.bstats = &BlockStats{ + MinTime: math.MaxInt64, + MaxTime: math.MinInt64, + } err = wal.ReadAll(&walHandler{ series: func(lset labels.Labels) { @@ -97,7 +98,7 @@ func (h *HeadBlock) dir() string { return h.d } func (h *HeadBlock) persisted() bool { return false } func (h *HeadBlock) index() IndexReader { return h } func (h *HeadBlock) series() SeriesReader { return h } -func (h *HeadBlock) stats() BlockStats { return h.bstats } +func (h *HeadBlock) stats() BlockStats { return *h.bstats } // Chunk returns the chunk for the reference number. func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) { @@ -111,18 +112,12 @@ func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) { } func (h *HeadBlock) interval() (int64, int64) { - h.mtx.RLock() - defer h.mtx.RUnlock() - return h.bstats.MinTime, h.bstats.MaxTime } // Stats returns statisitics about the indexed data. func (h *HeadBlock) Stats() (BlockStats, error) { - h.mtx.RLock() - defer h.mtx.RUnlock() - - return h.bstats, nil + return *h.bstats, nil } // LabelValues returns the possible label values @@ -227,8 +222,8 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc { h.postings.add(cd.ref, term{}) // For the head block there's exactly one chunk per series. - h.bstats.ChunkCount++ - h.bstats.SeriesCount++ + atomic.AddUint32(&h.bstats.ChunkCount, 1) + atomic.AddUint32(&h.bstats.SeriesCount, 1) return cd } @@ -253,6 +248,7 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { uniqueHashes = map[uint64]uint32{} ) h.mtx.RLock() + defer h.mtx.RUnlock() for i := range samples { s := &samples[i] @@ -286,8 +282,6 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { newSamples = append(newSamples, s) } - h.mtx.RUnlock() - // Write all new series and samples to the WAL and add it to the // in-mem database on success. if err := h.wal.Log(newSeries, samples); err != nil { @@ -297,6 +291,7 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { // After the samples were successfully written to the WAL, there may // be no further failures. if len(newSeries) > 0 { + h.mtx.RUnlock() h.mtx.Lock() base := len(h.descs) @@ -309,9 +304,7 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { } h.mtx.Unlock() - h.mtx.RLock() - defer h.mtx.RUnlock() } total := len(samples) @@ -325,20 +318,14 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { } cd.append(s.t, s.v) - if t := h.bstats.MaxTime; s.t > t { - // h.bstats.MaxTime = s.t - for !atomic.CompareAndSwapInt64(&h.bstats.MaxTime, t, s.t) { - if t = h.bstats.MaxTime; s.t <= t { - break - } + for t := h.bstats.MaxTime; s.t > t; t = h.bstats.MaxTime { + if atomic.CompareAndSwapInt64(&h.bstats.MaxTime, t, s.t) { + break } } - if t := h.bstats.MinTime; s.t < t { - // h.bstats.MinTime = s.t - for !atomic.CompareAndSwapInt64(&h.bstats.MinTime, t, s.t) { - if t = h.bstats.MinTime; s.t >= t { - break - } + for t := h.bstats.MinTime; s.t < t; t = h.bstats.MinTime { + if atomic.CompareAndSwapInt64(&h.bstats.MinTime, t, s.t) { + break } } }