diff --git a/tsdb/db.go b/tsdb/db.go index c80aa07043..e29aedf5ba 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -286,6 +286,49 @@ func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) { }, nil } +// FlushWAL creates a new block containing all data that's currently in the memory buffer/WAL. +// Samples that are in existing blocks will not be written to the new block. +// Note that if the read only database is running concurrently with a +// writable database then writing the WAL to the database directory can race. +func (db *DBReadOnly) FlushWAL(dir string) error { + blockReaders, err := db.Blocks() + if err != nil { + return errors.Wrap(err, "read blocks") + } + maxBlockTime := int64(math.MinInt64) + if len(blockReaders) > 0 { + maxBlockTime = blockReaders[len(blockReaders)-1].Meta().MaxTime + } + w, err := wal.Open(db.logger, nil, filepath.Join(db.dir, "wal")) + if err != nil { + return err + } + head, err := NewHead(nil, db.logger, w, 1) + if err != nil { + return err + } + // Set the min valid time for the ingested wal samples + // to be no lower than the maxt of the last block. + if err := head.Init(maxBlockTime); err != nil { + return errors.Wrap(err, "read WAL") + } + mint := head.MinTime() + maxt := head.MaxTime() + rh := &rangeHead{ + head: head, + mint: mint, + maxt: maxt, + } + compactor, err := NewLeveledCompactor(context.Background(), nil, db.logger, DefaultOptions.BlockRanges, chunkenc.NewPool()) + if err != nil { + return errors.Wrap(err, "create leveled compactor") + } + // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). + // Because of this block intervals are always +1 than the total samples it includes. + _, err = compactor.Write(dir, rh, mint, maxt+1, nil) + return errors.Wrap(err, "writing WAL") +} + // Querier loads the wal and returns a new querier over the data partition for the given time range. // Current implementation doesn't support multiple Queriers. func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { @@ -294,12 +337,12 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { return nil, ErrClosed default: } - blocksReaders, err := db.Blocks() + blockReaders, err := db.Blocks() if err != nil { return nil, err } - blocks := make([]*Block, len(blocksReaders)) - for i, b := range blocksReaders { + blocks := make([]*Block, len(blockReaders)) + for i, b := range blockReaders { b, ok := b.(*Block) if !ok { return nil, errors.New("unable to convert a read only block to a normal block") @@ -380,7 +423,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) { } if len(loadable) == 0 { - return nil, errors.New("no blocks found") + return nil, nil } sort.Slice(loadable, func(i, j int) bool { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 0ca2ef52fa..9a4df6b820 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -2359,3 +2359,76 @@ func TestDBReadOnlyClosing(t *testing.T) { _, err = db.Querier(0, 1) testutil.Equals(t, err, ErrClosed) } + +func TestDBReadOnly_FlushWAL(t *testing.T) { + var ( + dbDir string + logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + err error + maxt int + ) + + // Boostrap the db. + { + dbDir, err = ioutil.TempDir("", "test") + testutil.Ok(t, err) + + defer func() { + testutil.Ok(t, os.RemoveAll(dbDir)) + }() + + // Append data to the WAL. + db, err := Open(dbDir, logger, nil, nil) + testutil.Ok(t, err) + db.DisableCompactions() + app := db.Appender() + maxt = 1000 + for i := 0; i < maxt; i++ { + _, err := app.Add(labels.FromStrings(defaultLabelName, "flush"), int64(i), 1.0) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + defer func() { testutil.Ok(t, db.Close()) }() + } + + // Flush WAL. + db, err := OpenDBReadOnly(dbDir, logger) + testutil.Ok(t, err) + + flush, err := ioutil.TempDir("", "flush") + testutil.Ok(t, err) + + defer func() { + testutil.Ok(t, os.RemoveAll(flush)) + }() + testutil.Ok(t, db.FlushWAL(flush)) + testutil.Ok(t, db.Close()) + + // Reopen the DB from the flushed WAL block. + db, err = OpenDBReadOnly(flush, logger) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, db.Close()) }() + blocks, err := db.Blocks() + testutil.Ok(t, err) + testutil.Equals(t, len(blocks), 1) + + querier, err := db.Querier(0, int64(maxt)-1) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, querier.Close()) }() + + // Sum the values. + seriesSet, err := querier.Select(labels.NewEqualMatcher(defaultLabelName, "flush")) + testutil.Ok(t, err) + + sum := 0.0 + for seriesSet.Next() { + series := seriesSet.At().Iterator() + for series.Next() { + _, v := series.At() + sum += v + } + testutil.Ok(t, series.Err()) + } + testutil.Ok(t, seriesSet.Err()) + testutil.Equals(t, 1000.0, sum) +}