From 8ab628b35467c95f5299ca6182827b2aafefefee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Serv=C3=A9n=20Mar=C3=ADn?= Date: Fri, 13 Sep 2019 12:25:21 +0200 Subject: [PATCH] tsdb: allow readonly DB to create flush WAL (#6006) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR gives the readonly DB the ability to create blocks from the WAL. In order to implement this, we modify DBReadOnly.Blocks() to return an empty slice and no error if no blocks are found. xref: https://github.com/prometheus/tsdb/issues/346#issuecomment-520786524 Signed-off-by: Lucas Servén Marín --- tsdb/db.go | 51 +++++++++++++++++++++++++++++++--- tsdb/db_test.go | 73 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 4 deletions(-) diff --git a/tsdb/db.go b/tsdb/db.go index c80aa0704..e29aedf5b 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -286,6 +286,49 @@ func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) { }, nil } +// FlushWAL creates a new block containing all data that's currently in the memory buffer/WAL. +// Samples that are in existing blocks will not be written to the new block. +// Note that if the read only database is running concurrently with a +// writable database then writing the WAL to the database directory can race. +func (db *DBReadOnly) FlushWAL(dir string) error { + blockReaders, err := db.Blocks() + if err != nil { + return errors.Wrap(err, "read blocks") + } + maxBlockTime := int64(math.MinInt64) + if len(blockReaders) > 0 { + maxBlockTime = blockReaders[len(blockReaders)-1].Meta().MaxTime + } + w, err := wal.Open(db.logger, nil, filepath.Join(db.dir, "wal")) + if err != nil { + return err + } + head, err := NewHead(nil, db.logger, w, 1) + if err != nil { + return err + } + // Set the min valid time for the ingested wal samples + // to be no lower than the maxt of the last block. + if err := head.Init(maxBlockTime); err != nil { + return errors.Wrap(err, "read WAL") + } + mint := head.MinTime() + maxt := head.MaxTime() + rh := &rangeHead{ + head: head, + mint: mint, + maxt: maxt, + } + compactor, err := NewLeveledCompactor(context.Background(), nil, db.logger, DefaultOptions.BlockRanges, chunkenc.NewPool()) + if err != nil { + return errors.Wrap(err, "create leveled compactor") + } + // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). + // Because of this block intervals are always +1 than the total samples it includes. + _, err = compactor.Write(dir, rh, mint, maxt+1, nil) + return errors.Wrap(err, "writing WAL") +} + // Querier loads the wal and returns a new querier over the data partition for the given time range. // Current implementation doesn't support multiple Queriers. func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { @@ -294,12 +337,12 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { return nil, ErrClosed default: } - blocksReaders, err := db.Blocks() + blockReaders, err := db.Blocks() if err != nil { return nil, err } - blocks := make([]*Block, len(blocksReaders)) - for i, b := range blocksReaders { + blocks := make([]*Block, len(blockReaders)) + for i, b := range blockReaders { b, ok := b.(*Block) if !ok { return nil, errors.New("unable to convert a read only block to a normal block") @@ -380,7 +423,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) { } if len(loadable) == 0 { - return nil, errors.New("no blocks found") + return nil, nil } sort.Slice(loadable, func(i, j int) bool { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 0ca2ef52f..9a4df6b82 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -2359,3 +2359,76 @@ func TestDBReadOnlyClosing(t *testing.T) { _, err = db.Querier(0, 1) testutil.Equals(t, err, ErrClosed) } + +func TestDBReadOnly_FlushWAL(t *testing.T) { + var ( + dbDir string + logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + err error + maxt int + ) + + // Boostrap the db. + { + dbDir, err = ioutil.TempDir("", "test") + testutil.Ok(t, err) + + defer func() { + testutil.Ok(t, os.RemoveAll(dbDir)) + }() + + // Append data to the WAL. + db, err := Open(dbDir, logger, nil, nil) + testutil.Ok(t, err) + db.DisableCompactions() + app := db.Appender() + maxt = 1000 + for i := 0; i < maxt; i++ { + _, err := app.Add(labels.FromStrings(defaultLabelName, "flush"), int64(i), 1.0) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + defer func() { testutil.Ok(t, db.Close()) }() + } + + // Flush WAL. + db, err := OpenDBReadOnly(dbDir, logger) + testutil.Ok(t, err) + + flush, err := ioutil.TempDir("", "flush") + testutil.Ok(t, err) + + defer func() { + testutil.Ok(t, os.RemoveAll(flush)) + }() + testutil.Ok(t, db.FlushWAL(flush)) + testutil.Ok(t, db.Close()) + + // Reopen the DB from the flushed WAL block. + db, err = OpenDBReadOnly(flush, logger) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, db.Close()) }() + blocks, err := db.Blocks() + testutil.Ok(t, err) + testutil.Equals(t, len(blocks), 1) + + querier, err := db.Querier(0, int64(maxt)-1) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, querier.Close()) }() + + // Sum the values. + seriesSet, err := querier.Select(labels.NewEqualMatcher(defaultLabelName, "flush")) + testutil.Ok(t, err) + + sum := 0.0 + for seriesSet.Next() { + series := seriesSet.At().Iterator() + for series.Next() { + _, v := series.At() + sum += v + } + testutil.Ok(t, series.Err()) + } + testutil.Ok(t, seriesSet.Err()) + testutil.Equals(t, 1000.0, sum) +}