mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 06:17:27 -08:00
Merge pull request #204 from prometheus/fix-wal-closing
Close WAL when closing the DB
This commit is contained in:
commit
9703325901
2
db.go
2
db.go
|
@ -101,7 +101,6 @@ type DB struct {
|
||||||
opts *Options
|
opts *Options
|
||||||
chunkPool chunks.Pool
|
chunkPool chunks.Pool
|
||||||
compactor Compactor
|
compactor Compactor
|
||||||
wal WAL
|
|
||||||
|
|
||||||
// Mutex for that must be held when modifying the general block layout.
|
// Mutex for that must be held when modifying the general block layout.
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
|
@ -572,6 +571,7 @@ func (db *DB) Close() error {
|
||||||
if db.lockf != nil {
|
if db.lockf != nil {
|
||||||
merr.Add(db.lockf.Unlock())
|
merr.Add(db.lockf.Unlock())
|
||||||
}
|
}
|
||||||
|
merr.Add(db.head.Close())
|
||||||
return merr.Err()
|
return merr.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
27
db_test.go
27
db_test.go
|
@ -524,3 +524,30 @@ func TestDB_e2e(t *testing.T) {
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWALFlushedOnDBClose(t *testing.T) {
|
||||||
|
tmpdir, _ := ioutil.TempDir("", "test")
|
||||||
|
defer os.RemoveAll(tmpdir)
|
||||||
|
|
||||||
|
db, err := Open(tmpdir, nil, nil, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}}
|
||||||
|
|
||||||
|
app := db.Appender()
|
||||||
|
_, err = app.Add(lbls, 0, 1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
|
||||||
|
db.Close()
|
||||||
|
|
||||||
|
db, err = Open(tmpdir, nil, nil, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
q, err := db.Querier(0, 1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
values, err := q.LabelValues("labelname")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, values, []string{"labelvalue"})
|
||||||
|
}
|
||||||
|
|
5
head.go
5
head.go
|
@ -739,6 +739,11 @@ func (h *Head) MaxTime() int64 {
|
||||||
return atomic.LoadInt64(&h.maxTime)
|
return atomic.LoadInt64(&h.maxTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close flushes the WAL and closes the head.
|
||||||
|
func (h *Head) Close() error {
|
||||||
|
return h.wal.Close()
|
||||||
|
}
|
||||||
|
|
||||||
type headChunkReader struct {
|
type headChunkReader struct {
|
||||||
head *Head
|
head *Head
|
||||||
mint, maxt int64
|
mint, maxt int64
|
||||||
|
|
|
@ -37,6 +37,7 @@ func BenchmarkCreateSeries(b *testing.B) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
}
|
}
|
||||||
|
defer h.Close()
|
||||||
|
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
@ -131,6 +132,7 @@ func TestHead_ReadWAL(t *testing.T) {
|
||||||
|
|
||||||
head, err := NewHead(nil, nil, wal, 1000)
|
head, err := NewHead(nil, nil, wal, 1000)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
defer head.Close()
|
||||||
|
|
||||||
require.NoError(t, head.ReadWAL())
|
require.NoError(t, head.ReadWAL())
|
||||||
require.Equal(t, uint64(100), head.lastSeriesID)
|
require.Equal(t, uint64(100), head.lastSeriesID)
|
||||||
|
@ -163,6 +165,7 @@ func TestHead_ReadWAL(t *testing.T) {
|
||||||
func TestHead_Truncate(t *testing.T) {
|
func TestHead_Truncate(t *testing.T) {
|
||||||
h, err := NewHead(nil, nil, nil, 1000)
|
h, err := NewHead(nil, nil, nil, 1000)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
defer h.Close()
|
||||||
|
|
||||||
h.initTime(0)
|
h.initTime(0)
|
||||||
|
|
||||||
|
@ -275,6 +278,7 @@ func TestHeadDeleteSimple(t *testing.T) {
|
||||||
|
|
||||||
head, err := NewHead(nil, nil, nil, 1000)
|
head, err := NewHead(nil, nil, nil, 1000)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
defer head.Close()
|
||||||
|
|
||||||
app := head.Appender()
|
app := head.Appender()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue