diff --git a/db.go b/db.go index 51c49965c..c4e97abcc 100644 --- a/db.go +++ b/db.go @@ -418,7 +418,7 @@ func (db *DB) reloadBlocks() error { if meta.Compaction.Generation == 0 { if !ok { - b, err = OpenHeadBlock(dirs[i], db.logger) + b, err = db.openHeadBlock(dirs[i]) if err != nil { return errors.Wrapf(err, "load head at %s", dirs[i]) } @@ -709,6 +709,24 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { return bs } +// openHeadBlock opens the head block at dir. +func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) { + var ( + wdir = filepath.Join(dir, "wal") + l = log.With(db.logger, "wal", wdir) + ) + wal, err := OpenSegmentWAL(wdir, l, 5*time.Second) + if err != nil { + return nil, errors.Wrap(err, "open WAL %s") + } + + h, err := OpenHeadBlock(dir, log.With(db.logger, "block", dir), wal) + if err != nil { + return nil, errors.Wrapf(err, "open head block %s", dir) + } + return h, nil +} + // cut starts a new head block to append to. The completed head block // will still be appendable for the configured grace period. func (db *DB) cut(mint int64) (headBlock, error) { @@ -718,7 +736,10 @@ func (db *DB) cut(mint int64) (headBlock, error) { if err != nil { return nil, err } - newHead, err := CreateHeadBlock(dir, seq, db.logger, mint, maxt) + if err := TouchHeadBlock(dir, seq, mint, maxt); err != nil { + return nil, errors.Wrapf(err, "touch head block %s", dir) + } + newHead, err := db.openHeadBlock(dir) if err != nil { return nil, err } diff --git a/head.go b/head.go index 6edbae894..b71bbafc0 100644 --- a/head.go +++ b/head.go @@ -69,20 +69,21 @@ type HeadBlock struct { meta BlockMeta } -// CreateHeadBlock creates a new head block in dir that holds samples in the range [mint,maxt). -func CreateHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*HeadBlock, error) { +// TouchHeadBlock atomically touches a new head block in dir for +// samples in the range [mint,maxt). +func TouchHeadBlock(dir string, seq int, mint, maxt int64) error { // Make head block creation appear atomic. tmp := dir + ".tmp" if err := os.MkdirAll(tmp, 0777); err != nil { - return nil, err + return err } entropy := rand.New(rand.NewSource(time.Now().UnixNano())) ulid, err := ulid.New(ulid.Now(), entropy) if err != nil { - return nil, err + return err } if err := writeMetaFile(tmp, &BlockMeta{ @@ -91,20 +92,13 @@ func CreateHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*Head MinTime: mint, MaxTime: maxt, }); err != nil { - return nil, err + return err } - if err := renameFile(tmp, dir); err != nil { - return nil, err - } - return OpenHeadBlock(dir, l) + return renameFile(tmp, dir) } // OpenHeadBlock opens the head block in dir. -func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) { - wal, err := OpenSegmentWAL(dir, log.With(l, "component", "wal"), 5*time.Second) - if err != nil { - return nil, err - } +func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) { meta, err := readMetaFile(dir) if err != nil { return nil, err @@ -119,8 +113,11 @@ func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) { postings: &memPostings{m: make(map[term][]uint32)}, meta: *meta, } + return h, h.init() +} - r := wal.Reader() +func (h *HeadBlock) init() error { + r := h.wal.Reader() for r.Next() { series, samples := r.At() @@ -131,21 +128,17 @@ func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) { } for _, s := range samples { if int(s.Ref) >= len(h.series) { - return nil, errors.Errorf("unknown series reference %d (max %d); abort WAL restore", s.Ref, len(h.series)) + return errors.Errorf("unknown series reference %d (max %d); abort WAL restore", s.Ref, len(h.series)) } h.series[s.Ref].append(s.T, s.V) if !h.inBounds(s.T) { - return nil, errors.Wrap(ErrOutOfBounds, "consume WAL") + return errors.Wrap(ErrOutOfBounds, "consume WAL") } h.meta.Stats.NumSamples++ } } - if err := r.Err(); err != nil { - return nil, errors.Wrap(err, "consume WAL") - } - - return h, nil + return errors.Wrap(r.Err(), "consume WAL") } // inBounds returns true if the given timestamp is within the valid diff --git a/head_test.go b/head_test.go index 45b4f49d0..aa9138060 100644 --- a/head_test.go +++ b/head_test.go @@ -20,6 +20,7 @@ import ( "os" "sort" "testing" + "time" "unsafe" "github.com/pkg/errors" @@ -30,6 +31,19 @@ import ( "github.com/stretchr/testify/require" ) +// createTestHeadBlock creates a new head block with a SegmentWAL. +func createTestHeadBlock(t testing.TB, dir string, mint, maxt int64) *HeadBlock { + err := TouchHeadBlock(dir, 0, mint, maxt) + require.NoError(t, err) + + wal, err := OpenSegmentWAL(dir, nil, 5*time.Second) + require.NoError(t, err) + + h, err := OpenHeadBlock(dir, nil, wal) + require.NoError(t, err) + return h +} + func BenchmarkCreateSeries(b *testing.B) { lbls, err := readPrometheusLabels("cmd/tsdb/testdata.1m", 1e6) require.NoError(b, err) @@ -39,8 +53,7 @@ func BenchmarkCreateSeries(b *testing.B) { require.NoError(b, err) defer os.RemoveAll(dir) - h, err := CreateHeadBlock(dir, 0, nil, 0, 1) - require.NoError(b, err) + h := createTestHeadBlock(b, dir, 0, 1) b.ReportAllocs() b.ResetTimer() @@ -90,14 +103,13 @@ func readPrometheusLabels(fn string, n int) ([]labels.Labels, error) { } func TestAmendDatapointCausesError(t *testing.T) { - tmpdir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(tmpdir) + dir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(dir) - hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) - require.NoError(t, err, "Error creating head block") + hb := createTestHeadBlock(t, dir, 0, 1000) app := hb.Appender() - _, err = app.Add(labels.Labels{}, 0, 0) + _, err := app.Add(labels.Labels{}, 0, 0) require.NoError(t, err, "Failed to add sample") require.NoError(t, app.Commit(), "Unexpected error committing appender") @@ -107,14 +119,13 @@ func TestAmendDatapointCausesError(t *testing.T) { } func TestDuplicateNaNDatapointNoAmendError(t *testing.T) { - tmpdir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(tmpdir) + dir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(dir) - hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) - require.NoError(t, err, "Error creating head block") + hb := createTestHeadBlock(t, dir, 0, 1000) app := hb.Appender() - _, err = app.Add(labels.Labels{}, 0, math.NaN()) + _, err := app.Add(labels.Labels{}, 0, math.NaN()) require.NoError(t, err, "Failed to add sample") require.NoError(t, app.Commit(), "Unexpected error committing appender") @@ -124,14 +135,13 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) { } func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { - tmpdir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(tmpdir) + dir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(dir) - hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) - require.NoError(t, err, "Error creating head block") + hb := createTestHeadBlock(t, dir, 0, 1000) app := hb.Appender() - _, err = app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001)) + _, err := app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001)) require.NoError(t, err, "Failed to add sample") require.NoError(t, app.Commit(), "Unexpected error committing appender") @@ -141,15 +151,14 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { } func TestSkippingInvalidValuesInSameTxn(t *testing.T) { - tmpdir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(tmpdir) + dir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(dir) - hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) - require.NoError(t, err) + hb := createTestHeadBlock(t, dir, 0, 1000) // Append AmendedValue. app := hb.Appender() - _, err = app.Add(labels.Labels{{"a", "b"}}, 0, 1) + _, err := app.Add(labels.Labels{{"a", "b"}}, 0, 1) require.NoError(t, err) _, err = app.Add(labels.Labels{{"a", "b"}}, 0, 2) require.NoError(t, err) @@ -243,11 +252,10 @@ func TestHeadBlock_e2e(t *testing.T) { seriesMap[labels.New(l...).String()] = []sample{} } - tmpdir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(tmpdir) + dir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(dir) - hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, minTime, maxTime) - require.NoError(t, err) + hb := createTestHeadBlock(t, dir, minTime, maxTime) app := hb.Appender() for _, l := range lbls {