mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 13:57:36 -08:00
make Close methods for the querier safe to call more than once. (#581)
* make Close methods for the querier safe to call more than once. Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
This commit is contained in:
parent
19d402d154
commit
5512826f13
|
@ -1,4 +1,6 @@
|
||||||
## master / unreleased
|
## master / unreleased
|
||||||
|
- [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic.
|
||||||
|
|
||||||
|
|
||||||
## 0.7.1
|
## 0.7.1
|
||||||
- [ENHANCEMENT] Reduce memory usage in mergedPostings.Seek
|
- [ENHANCEMENT] Reduce memory usage in mergedPostings.Seek
|
||||||
|
|
|
@ -205,6 +205,8 @@ type blockQuerier struct {
|
||||||
chunks ChunkReader
|
chunks ChunkReader
|
||||||
tombstones TombstoneReader
|
tombstones TombstoneReader
|
||||||
|
|
||||||
|
closed bool
|
||||||
|
|
||||||
mint, maxt int64
|
mint, maxt int64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -252,12 +254,15 @@ func (q *blockQuerier) LabelValuesFor(string, labels.Label) ([]string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *blockQuerier) Close() error {
|
func (q *blockQuerier) Close() error {
|
||||||
var merr tsdb_errors.MultiError
|
if q.closed {
|
||||||
|
return errors.New("block querier already closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
var merr tsdb_errors.MultiError
|
||||||
merr.Add(q.index.Close())
|
merr.Add(q.index.Close())
|
||||||
merr.Add(q.chunks.Close())
|
merr.Add(q.chunks.Close())
|
||||||
merr.Add(q.tombstones.Close())
|
merr.Add(q.tombstones.Close())
|
||||||
|
q.closed = true
|
||||||
return merr.Err()
|
return merr.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1898,3 +1898,30 @@ func TestPostingsForMatchers(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestClose ensures that calling Close more than once doesn't block and doesn't panic.
|
||||||
|
func TestClose(t *testing.T) {
|
||||||
|
dir, err := ioutil.TempDir("", "test_storage")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Opening test dir failed: %s", err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
testutil.Ok(t, os.RemoveAll(dir))
|
||||||
|
}()
|
||||||
|
|
||||||
|
createBlock(t, dir, genSeries(1, 1, 0, 10))
|
||||||
|
createBlock(t, dir, genSeries(1, 1, 10, 20))
|
||||||
|
|
||||||
|
db, err := Open(dir, nil, nil, DefaultOptions)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Opening test storage failed: %s", err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
testutil.Ok(t, db.Close())
|
||||||
|
}()
|
||||||
|
|
||||||
|
q, err := db.Querier(0, 20)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Ok(t, q.Close())
|
||||||
|
testutil.NotOk(t, q.Close())
|
||||||
|
}
|
||||||
|
|
|
@ -159,9 +159,9 @@ type WAL struct {
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
segmentSize int
|
segmentSize int
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
segment *Segment // active segment
|
segment *Segment // Active segment.
|
||||||
donePages int // pages written to the segment
|
donePages int // Pages written to the segment.
|
||||||
page *page // active page
|
page *page // Active page.
|
||||||
stopc chan chan struct{}
|
stopc chan chan struct{}
|
||||||
actorc chan func()
|
actorc chan func()
|
||||||
closed bool // To allow calling Close() more than once without blocking.
|
closed bool // To allow calling Close() more than once without blocking.
|
||||||
|
@ -606,7 +606,7 @@ func (w *WAL) Close() (err error) {
|
||||||
defer w.mtx.Unlock()
|
defer w.mtx.Unlock()
|
||||||
|
|
||||||
if w.closed {
|
if w.closed {
|
||||||
return nil
|
return errors.New("wal already closed")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Flush the last page and zero out all its remaining size.
|
// Flush the last page and zero out all its remaining size.
|
||||||
|
|
|
@ -305,6 +305,19 @@ func TestCorruptAndCarryOn(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestClose ensures that calling Close more than once doesn't panic and doesn't block.
|
||||||
|
func TestClose(t *testing.T) {
|
||||||
|
dir, err := ioutil.TempDir("", "wal_repair")
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer func() {
|
||||||
|
testutil.Ok(t, os.RemoveAll(dir))
|
||||||
|
}()
|
||||||
|
w, err := NewSize(nil, nil, dir, pageSize)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Ok(t, w.Close())
|
||||||
|
testutil.NotOk(t, w.Close())
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkWAL_LogBatched(b *testing.B) {
|
func BenchmarkWAL_LogBatched(b *testing.B) {
|
||||||
dir, err := ioutil.TempDir("", "bench_logbatch")
|
dir, err := ioutil.TempDir("", "bench_logbatch")
|
||||||
testutil.Ok(b, err)
|
testutil.Ok(b, err)
|
||||||
|
|
Loading…
Reference in a new issue