tsdb: allow readonly DB to create flush WAL (#6006)

This PR gives the readonly DB the ability to create blocks from the WAL.
In order to implement this, we modify DBReadOnly.Blocks() to return an
empty slice and no error if no blocks are found.
xref: https://github.com/prometheus/tsdb/issues/346#issuecomment-520786524

Signed-off-by: Lucas Servén Marín <lserven@gmail.com>
This commit is contained in:
Lucas Servén Marín 2019-09-13 12:25:21 +02:00 committed by Bartlomiej Plotka
parent 26e8d25e0b
commit 8ab628b354
2 changed files with 120 additions and 4 deletions

View file

@ -286,6 +286,49 @@ func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) {
}, nil }, nil
} }
// FlushWAL creates a new block containing all data that's currently in the memory buffer/WAL.
// Samples that are in existing blocks will not be written to the new block.
// Note that if the read only database is running concurrently with a
// writable database then writing the WAL to the database directory can race.
func (db *DBReadOnly) FlushWAL(dir string) error {
blockReaders, err := db.Blocks()
if err != nil {
return errors.Wrap(err, "read blocks")
}
maxBlockTime := int64(math.MinInt64)
if len(blockReaders) > 0 {
maxBlockTime = blockReaders[len(blockReaders)-1].Meta().MaxTime
}
w, err := wal.Open(db.logger, nil, filepath.Join(db.dir, "wal"))
if err != nil {
return err
}
head, err := NewHead(nil, db.logger, w, 1)
if err != nil {
return err
}
// Set the min valid time for the ingested wal samples
// to be no lower than the maxt of the last block.
if err := head.Init(maxBlockTime); err != nil {
return errors.Wrap(err, "read WAL")
}
mint := head.MinTime()
maxt := head.MaxTime()
rh := &rangeHead{
head: head,
mint: mint,
maxt: maxt,
}
compactor, err := NewLeveledCompactor(context.Background(), nil, db.logger, DefaultOptions.BlockRanges, chunkenc.NewPool())
if err != nil {
return errors.Wrap(err, "create leveled compactor")
}
// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
// Because of this block intervals are always +1 than the total samples it includes.
_, err = compactor.Write(dir, rh, mint, maxt+1, nil)
return errors.Wrap(err, "writing WAL")
}
// Querier loads the wal and returns a new querier over the data partition for the given time range. // Querier loads the wal and returns a new querier over the data partition for the given time range.
// Current implementation doesn't support multiple Queriers. // Current implementation doesn't support multiple Queriers.
func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) {
@ -294,12 +337,12 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) {
return nil, ErrClosed return nil, ErrClosed
default: default:
} }
blocksReaders, err := db.Blocks() blockReaders, err := db.Blocks()
if err != nil { if err != nil {
return nil, err return nil, err
} }
blocks := make([]*Block, len(blocksReaders)) blocks := make([]*Block, len(blockReaders))
for i, b := range blocksReaders { for i, b := range blockReaders {
b, ok := b.(*Block) b, ok := b.(*Block)
if !ok { if !ok {
return nil, errors.New("unable to convert a read only block to a normal block") return nil, errors.New("unable to convert a read only block to a normal block")
@ -380,7 +423,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
} }
if len(loadable) == 0 { if len(loadable) == 0 {
return nil, errors.New("no blocks found") return nil, nil
} }
sort.Slice(loadable, func(i, j int) bool { sort.Slice(loadable, func(i, j int) bool {

View file

@ -2359,3 +2359,76 @@ func TestDBReadOnlyClosing(t *testing.T) {
_, err = db.Querier(0, 1) _, err = db.Querier(0, 1)
testutil.Equals(t, err, ErrClosed) testutil.Equals(t, err, ErrClosed)
} }
func TestDBReadOnly_FlushWAL(t *testing.T) {
var (
dbDir string
logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
err error
maxt int
)
// Boostrap the db.
{
dbDir, err = ioutil.TempDir("", "test")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dbDir))
}()
// Append data to the WAL.
db, err := Open(dbDir, logger, nil, nil)
testutil.Ok(t, err)
db.DisableCompactions()
app := db.Appender()
maxt = 1000
for i := 0; i < maxt; i++ {
_, err := app.Add(labels.FromStrings(defaultLabelName, "flush"), int64(i), 1.0)
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())
defer func() { testutil.Ok(t, db.Close()) }()
}
// Flush WAL.
db, err := OpenDBReadOnly(dbDir, logger)
testutil.Ok(t, err)
flush, err := ioutil.TempDir("", "flush")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(flush))
}()
testutil.Ok(t, db.FlushWAL(flush))
testutil.Ok(t, db.Close())
// Reopen the DB from the flushed WAL block.
db, err = OpenDBReadOnly(flush, logger)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, db.Close()) }()
blocks, err := db.Blocks()
testutil.Ok(t, err)
testutil.Equals(t, len(blocks), 1)
querier, err := db.Querier(0, int64(maxt)-1)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, querier.Close()) }()
// Sum the values.
seriesSet, err := querier.Select(labels.NewEqualMatcher(defaultLabelName, "flush"))
testutil.Ok(t, err)
sum := 0.0
for seriesSet.Next() {
series := seriesSet.At().Iterator()
for series.Next() {
_, v := series.At()
sum += v
}
testutil.Ok(t, series.Err())
}
testutil.Ok(t, seriesSet.Err())
testutil.Equals(t, 1000.0, sum)
}