mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
Remove unreturned locks, detect writes on closed heads
This commit is contained in:
parent
e0b33a7a28
commit
65b846ae5b
3
db.go
3
db.go
|
@ -401,9 +401,8 @@ func (db *DB) Close() error {
|
|||
close(db.stopc)
|
||||
<-db.donec
|
||||
|
||||
// Lock mutex and leave it locked so we panic if there's a bug causing
|
||||
// the block to be used afterwards.
|
||||
db.mtx.Lock()
|
||||
defer db.mtx.Unlock()
|
||||
|
||||
var g errgroup.Group
|
||||
|
||||
|
|
10
head.go
10
head.go
|
@ -42,6 +42,7 @@ type headBlock struct {
|
|||
wal *WAL
|
||||
|
||||
activeWriters uint64
|
||||
closed bool
|
||||
|
||||
// descs holds all chunk descs for the head block. Each chunk implicitly
|
||||
// is assigned the index as its ID.
|
||||
|
@ -137,9 +138,8 @@ func (h *headBlock) inBounds(t int64) bool {
|
|||
|
||||
// Close syncs all data and closes underlying resources of the head block.
|
||||
func (h *headBlock) Close() error {
|
||||
// Lock mutex and leave it locked so we panic if there's a bug causing
|
||||
// the block to be used afterwards.
|
||||
h.mtx.Lock()
|
||||
defer h.mtx.Unlock()
|
||||
|
||||
if err := h.wal.Close(); err != nil {
|
||||
return errors.Wrapf(err, "close WAL for head %s", h.dir)
|
||||
|
@ -156,6 +156,8 @@ func (h *headBlock) Close() error {
|
|||
if meta.ULID == h.meta.ULID {
|
||||
return writeMetaFile(h.dir, &h.meta)
|
||||
}
|
||||
|
||||
h.closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -175,6 +177,10 @@ func (h *headBlock) Appender() Appender {
|
|||
atomic.AddUint64(&h.activeWriters, 1)
|
||||
|
||||
h.mtx.RLock()
|
||||
|
||||
if h.closed {
|
||||
panic(fmt.Sprintf("block %s already closed", h.dir))
|
||||
}
|
||||
return &headAppender{headBlock: h, samples: getHeadAppendBuffer()}
|
||||
}
|
||||
|
||||
|
|
15
querier.go
15
querier.go
|
@ -66,6 +66,12 @@ func (s *DB) Querier(mint, maxt int64) Querier {
|
|||
|
||||
// TODO(fabxc): find nicer solution.
|
||||
if hb, ok := b.(*headBlock); ok {
|
||||
// TODO(fabxc): temporary refactored.
|
||||
hb.mtx.RLock()
|
||||
if hb.closed {
|
||||
panic(fmt.Sprintf("block %s already closed", hb.dir))
|
||||
}
|
||||
hb.mtx.RUnlock()
|
||||
q.postingsMapper = hb.remapPostings
|
||||
}
|
||||
|
||||
|
@ -135,15 +141,6 @@ type blockQuerier struct {
|
|||
mint, maxt int64
|
||||
}
|
||||
|
||||
func newBlockQuerier(ix IndexReader, c ChunkReader, mint, maxt int64) *blockQuerier {
|
||||
return &blockQuerier{
|
||||
mint: mint,
|
||||
maxt: maxt,
|
||||
index: ix,
|
||||
chunks: c,
|
||||
}
|
||||
}
|
||||
|
||||
func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
|
||||
var (
|
||||
its []Postings
|
||||
|
|
Loading…
Reference in a new issue