Fix various races

This fixes different race condition encoutnered when running Prometheus.
It reduces the overall performance in the synthetic benchmark a fair bit
but has no indiciations of impacting a real-world setup notably.
This commit is contained in:
Fabian Reinartz 2017-03-20 14:45:27 +01:00
parent 2ef3682560
commit 9c93f8f2aa
4 changed files with 21 additions and 5 deletions

View file

@ -141,11 +141,11 @@ func (c *compactor) match(bs []*BlockMeta) bool {
return uint64(bs[len(bs)-1].MaxTime-bs[0].MinTime) <= c.opts.maxBlockRange return uint64(bs[len(bs)-1].MaxTime-bs[0].MinTime) <= c.opts.maxBlockRange
} }
var entropy = rand.New(rand.NewSource(time.Now().UnixNano()))
func mergeBlockMetas(blocks ...Block) (res BlockMeta) { func mergeBlockMetas(blocks ...Block) (res BlockMeta) {
m0 := blocks[0].Meta() m0 := blocks[0].Meta()
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
res.Sequence = m0.Sequence res.Sequence = m0.Sequence
res.MinTime = m0.MinTime res.MinTime = m0.MinTime
res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime

3
db.go
View file

@ -232,6 +232,9 @@ func (db *DB) retentionCutoff() (bool, error) {
db.mtx.RLock() db.mtx.RLock()
defer db.mtx.RUnlock() defer db.mtx.RUnlock()
db.headmtx.RLock()
defer db.headmtx.RUnlock()
// We don't count the span covered by head blocks towards the // We don't count the span covered by head blocks towards the
// retention time as it generally makes up a fraction of it. // retention time as it generally makes up a fraction of it.
if len(db.blocks)-len(db.heads) == 0 { if len(db.blocks)-len(db.heads) == 0 {

View file

@ -64,6 +64,9 @@ func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*head
if err := os.MkdirAll(tmp, 0777); err != nil { if err := os.MkdirAll(tmp, 0777); err != nil {
return nil, err return nil, err
} }
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
ulid, err := ulid.New(ulid.Now(), entropy) ulid, err := ulid.New(ulid.Now(), entropy)
if err != nil { if err != nil {
return nil, err return nil, err
@ -556,8 +559,6 @@ func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries {
// Returned postings have no longer monotonic IDs and MUST NOT be used for regular // Returned postings have no longer monotonic IDs and MUST NOT be used for regular
// postings set operations, i.e. intersect and merge. // postings set operations, i.e. intersect and merge.
func (h *headBlock) remapPostings(p Postings) Postings { func (h *headBlock) remapPostings(p Postings) Postings {
// Expand the postings but only up until the point where the mapper
// covers existing metrics.
ep := make([]uint32, 0, 64) ep := make([]uint32, 0, 64)
for p.Next() { for p.Next() {
@ -603,6 +604,9 @@ func (s *memSeries) cut() *memChunk {
} }
func (s *memSeries) append(t int64, v float64) bool { func (s *memSeries) append(t int64, v float64) bool {
s.mtx.Lock()
defer s.mtx.Unlock()
var c *memChunk var c *memChunk
if s.app == nil || s.head().samples > 2000 { if s.app == nil || s.head().samples > 2000 {

11
wal.go
View file

@ -61,6 +61,15 @@ const (
walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB
) )
// The table gets initialized with sync.Once but may still cause a race
// with any other use of the crc32 package anywhere. Thus we initialize it
// before.
var castagnoliTable *crc32.Table
func init() {
castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
}
// OpenWAL opens or creates a write ahead log in the given directory. // OpenWAL opens or creates a write ahead log in the given directory.
// The WAL must be read completely before new data is written. // The WAL must be read completely before new data is written.
func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, error) { func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, error) {
@ -84,7 +93,7 @@ func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL,
donec: make(chan struct{}), donec: make(chan struct{}),
stopc: make(chan struct{}), stopc: make(chan struct{}),
segmentSize: walSegmentSizeBytes, segmentSize: walSegmentSizeBytes,
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), crc32: crc32.New(castagnoliTable),
} }
if err := w.initSegments(); err != nil { if err := w.initSegments(); err != nil {
return nil, err return nil, err