diff --git a/compact.go b/compact.go index a1720605a..468c0aaa0 100644 --- a/compact.go +++ b/compact.go @@ -141,11 +141,11 @@ func (c *compactor) match(bs []*BlockMeta) bool { return uint64(bs[len(bs)-1].MaxTime-bs[0].MinTime) <= c.opts.maxBlockRange } -var entropy = rand.New(rand.NewSource(time.Now().UnixNano())) - func mergeBlockMetas(blocks ...Block) (res BlockMeta) { m0 := blocks[0].Meta() + entropy := rand.New(rand.NewSource(time.Now().UnixNano())) + res.Sequence = m0.Sequence res.MinTime = m0.MinTime res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime diff --git a/db.go b/db.go index 2c00b7d79..1c5e0b330 100644 --- a/db.go +++ b/db.go @@ -232,6 +232,9 @@ func (db *DB) retentionCutoff() (bool, error) { db.mtx.RLock() defer db.mtx.RUnlock() + db.headmtx.RLock() + defer db.headmtx.RUnlock() + // We don't count the span covered by head blocks towards the // retention time as it generally makes up a fraction of it. if len(db.blocks)-len(db.heads) == 0 { diff --git a/head.go b/head.go index 4864276f9..55b80940f 100644 --- a/head.go +++ b/head.go @@ -64,6 +64,9 @@ func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*head if err := os.MkdirAll(tmp, 0777); err != nil { return nil, err } + + entropy := rand.New(rand.NewSource(time.Now().UnixNano())) + ulid, err := ulid.New(ulid.Now(), entropy) if err != nil { return nil, err @@ -556,8 +559,6 @@ func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries { // Returned postings have no longer monotonic IDs and MUST NOT be used for regular // postings set operations, i.e. intersect and merge. func (h *headBlock) remapPostings(p Postings) Postings { - // Expand the postings but only up until the point where the mapper - // covers existing metrics. 
ep := make([]uint32, 0, 64) for p.Next() { @@ -603,6 +604,9 @@ } func (s *memSeries) cut() *memChunk { } func (s *memSeries) append(t int64, v float64) bool { + s.mtx.Lock() + defer s.mtx.Unlock() + var c *memChunk if s.app == nil || s.head().samples > 2000 { diff --git a/wal.go b/wal.go index 2a49dcf41..9a1ab315a 100644 --- a/wal.go +++ b/wal.go @@ -61,6 +61,15 @@ const ( walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB ) +// The table gets initialized with sync.Once but may still cause a race +// with any other use of the crc32 package anywhere. Thus we initialize it +// beforehand, at package load time. +var castagnoliTable *crc32.Table + +func init() { + castagnoliTable = crc32.MakeTable(crc32.Castagnoli) +} + // OpenWAL opens or creates a write ahead log in the given directory. // The WAL must be read completely before new data is written. func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, error) { @@ -84,7 +93,7 @@ func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, donec: make(chan struct{}), stopc: make(chan struct{}), segmentSize: walSegmentSizeBytes, - crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), + crc32: crc32.New(castagnoliTable), } if err := w.initSegments(); err != nil { return nil, err