Fix head block stats races

This commit is contained in:
Fabian Reinartz 2017-01-07 18:02:17 +01:00
parent 6aa922c5a6
commit 1943f8d1bb
7 changed files with 58 additions and 35 deletions

View file

@ -4,6 +4,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"sort" "sort"
"sync"
"github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/fileutil"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -20,11 +21,14 @@ type block interface {
} }
type BlockStats struct { type BlockStats struct {
MinTime int64 // Time range of samples in the block.
MaxTime int64 MinTime, MaxTime int64
SampleCount uint64 SampleCount uint64
SeriesCount uint32 SeriesCount uint64
ChunkCount uint32 ChunkCount uint64
mtx sync.RWMutex
} }
const ( const (

View file

@ -219,7 +219,7 @@ func (c *compactor) write(blocks []block, indexw IndexWriter, chunkw SeriesWrite
return err return err
} }
stats.ChunkCount += uint32(len(chunks)) stats.ChunkCount += uint64(len(chunks))
stats.SeriesCount++ stats.SeriesCount++
for _, l := range lset { for _, l := range lset {

3
db.go
View file

@ -352,8 +352,7 @@ func (db *DB) appendBatch(samples []hashedSample) error {
db.metrics.samplesAppended.Add(float64(len(samples))) db.metrics.samplesAppended.Add(float64(len(samples)))
} }
// TODO(fabxc): randomize over time and use better scoring function. if head.fullness() > 1.0 {
if head.bstats.SampleCount/(uint64(head.bstats.ChunkCount)+1) > 250 {
select { select {
case db.cutc <- struct{}{}: case db.cutc <- struct{}{}:
default: default:

53
head.go
View file

@ -5,7 +5,6 @@ import (
"math" "math"
"sort" "sort"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/bradfitz/slice" "github.com/bradfitz/slice"
@ -98,7 +97,13 @@ func (h *HeadBlock) dir() string { return h.d }
func (h *HeadBlock) persisted() bool { return false } func (h *HeadBlock) persisted() bool { return false }
func (h *HeadBlock) index() IndexReader { return h } func (h *HeadBlock) index() IndexReader { return h }
func (h *HeadBlock) series() SeriesReader { return h } func (h *HeadBlock) series() SeriesReader { return h }
func (h *HeadBlock) stats() BlockStats { return *h.bstats }
func (h *HeadBlock) stats() BlockStats {
h.bstats.mtx.RLock()
defer h.bstats.mtx.RUnlock()
return *h.bstats
}
// Chunk returns the chunk for the reference number. // Chunk returns the chunk for the reference number.
func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) { func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) {
@ -221,10 +226,6 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc {
h.postings.add(cd.ref, term{}) h.postings.add(cd.ref, term{})
// For the head block there's exactly one chunk per series.
atomic.AddUint32(&h.bstats.ChunkCount, 1)
atomic.AddUint32(&h.bstats.SeriesCount, 1)
return cd return cd
} }
@ -307,8 +308,11 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
h.mtx.RLock() h.mtx.RLock()
} }
total := len(samples) var (
total = uint64(len(samples))
mint = int64(math.MaxInt64)
maxt = int64(math.MinInt64)
)
for _, s := range samples { for _, s := range samples {
cd := h.descs[s.ref] cd := h.descs[s.ref]
// Skip duplicate samples. // Skip duplicate samples.
@ -318,23 +322,38 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
} }
cd.append(s.t, s.v) cd.append(s.t, s.v)
for t := h.bstats.MaxTime; s.t > t; t = h.bstats.MaxTime { if mint > s.t {
if atomic.CompareAndSwapInt64(&h.bstats.MaxTime, t, s.t) { mint = s.t
break
}
} }
for t := h.bstats.MinTime; s.t < t; t = h.bstats.MinTime { if maxt < s.t {
if atomic.CompareAndSwapInt64(&h.bstats.MinTime, t, s.t) { maxt = s.t
break
}
} }
} }
atomic.AddUint64(&h.bstats.SampleCount, uint64(total)) h.bstats.mtx.Lock()
defer h.bstats.mtx.Unlock()
h.bstats.SampleCount += total
h.bstats.SeriesCount += uint64(len(newSeries))
h.bstats.ChunkCount += uint64(len(newSeries)) // head block has one chunk/series
if mint < h.bstats.MinTime {
h.bstats.MinTime = mint
}
if maxt > h.bstats.MaxTime {
h.bstats.MaxTime = maxt
}
return nil return nil
} }
func (h *HeadBlock) fullness() float64 {
h.bstats.mtx.RLock()
defer h.bstats.mtx.RUnlock()
return float64(h.bstats.SampleCount) / float64(h.bstats.SeriesCount+1) / 250
}
func (h *HeadBlock) updateMapping() { func (h *HeadBlock) updateMapping() {
h.mapper.mtx.Lock() h.mapper.mtx.Lock()
defer h.mapper.mtx.Unlock() defer h.mapper.mtx.Unlock()

View file

@ -212,9 +212,9 @@ func (r *indexReader) Stats() (BlockStats, error) {
return BlockStats{ return BlockStats{
MinTime: int64(binary.BigEndian.Uint64(b)), MinTime: int64(binary.BigEndian.Uint64(b)),
MaxTime: int64(binary.BigEndian.Uint64(b[8:])), MaxTime: int64(binary.BigEndian.Uint64(b[8:])),
SeriesCount: binary.BigEndian.Uint32(b[16:]), SeriesCount: binary.BigEndian.Uint64(b[16:]),
ChunkCount: binary.BigEndian.Uint32(b[20:]), ChunkCount: binary.BigEndian.Uint64(b[24:]),
SampleCount: binary.BigEndian.Uint64(b[24:]), SampleCount: binary.BigEndian.Uint64(b[32:]),
}, nil }, nil
} }

View file

@ -5,6 +5,7 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"testing" "testing"
"time"
"github.com/fabxc/tsdb/labels" "github.com/fabxc/tsdb/labels"
dto "github.com/prometheus/client_model/go" dto "github.com/prometheus/client_model/go"
@ -21,7 +22,7 @@ func BenchmarkWALWrite(b *testing.B) {
require.NoError(b, os.RemoveAll(d)) require.NoError(b, os.RemoveAll(d))
}() }()
wal, err := OpenWAL(d) wal, err := OpenWAL(d, nil, 500*time.Millisecond)
require.NoError(b, err) require.NoError(b, err)
f, err := os.Open("cmd/tsdb/testdata.1m") f, err := os.Open("cmd/tsdb/testdata.1m")
@ -78,7 +79,7 @@ func BenchmarkWALRead(b *testing.B) {
require.NoError(b, os.RemoveAll(d)) require.NoError(b, os.RemoveAll(d))
}() }()
wal, err := OpenWAL(d) wal, err := OpenWAL(d, nil, 500*time.Millisecond)
require.NoError(b, err) require.NoError(b, err)
var ( var (
@ -111,7 +112,7 @@ func BenchmarkWALRead(b *testing.B) {
b.ResetTimer() b.ResetTimer()
wal, err = OpenWAL(d) wal, err = OpenWAL(d, nil, 500*time.Millisecond)
require.NoError(b, err) require.NoError(b, err)
var numSeries, numSamples int var numSeries, numSamples int
@ -144,7 +145,7 @@ func BenchmarkWALReadIntoHead(b *testing.B) {
require.NoError(b, os.RemoveAll(d)) require.NoError(b, os.RemoveAll(d))
}() }()
wal, err := OpenWAL(d) wal, err := OpenWAL(d, nil, 500*time.Millisecond)
require.NoError(b, err) require.NoError(b, err)
var ( var (
@ -177,7 +178,7 @@ func BenchmarkWALReadIntoHead(b *testing.B) {
b.ResetTimer() b.ResetTimer()
_, err = OpenHeadBlock(d) _, err = OpenWAL(d, nil, 500*time.Millisecond)
require.NoError(b, err) require.NoError(b, err)
// stat, _ := head.wal.f.Stat() // stat, _ := head.wal.f.Stat()

View file

@ -259,9 +259,9 @@ func (w *indexWriter) WriteStats(stats BlockStats) error {
binary.BigEndian.PutUint64(b[0:], uint64(stats.MinTime)) binary.BigEndian.PutUint64(b[0:], uint64(stats.MinTime))
binary.BigEndian.PutUint64(b[8:], uint64(stats.MaxTime)) binary.BigEndian.PutUint64(b[8:], uint64(stats.MaxTime))
binary.BigEndian.PutUint32(b[16:], stats.SeriesCount) binary.BigEndian.PutUint64(b[16:], stats.SeriesCount)
binary.BigEndian.PutUint32(b[20:], stats.ChunkCount) binary.BigEndian.PutUint64(b[24:], stats.ChunkCount)
binary.BigEndian.PutUint64(b[24:], stats.SampleCount) binary.BigEndian.PutUint64(b[32:], stats.SampleCount)
err := w.section(64, flagStd, func(wr io.Writer) error { err := w.section(64, flagStd, func(wr io.Writer) error {
return w.write(wr, b[:]) return w.write(wr, b[:])