diff --git a/tsdb/db_test.go b/tsdb/db_test.go index e4fa3e7d05..daafbd963e 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -2858,3 +2858,95 @@ func TestChunkReader_ConcurrentReads(t *testing.T) { } testutil.Ok(t, r.Close()) } + +// TestCompactHead ensures that the head compaction +// creates a block that is ready for loading and +// does not cause data loss. +// This test: +// * opens a storage; +// * appends values; +// * compacts the head; and +// * queries the db to ensure the samples are present from the compacted head. +func TestCompactHead(t *testing.T) { + dbDir, err := ioutil.TempDir("", "testFlush") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dbDir)) }() + + // Open a DB and append data to the WAL. + tsdbCfg := &Options{ + RetentionDuration: int64(time.Hour * 24 * 15 / time.Millisecond), + NoLockfile: true, + MinBlockDuration: int64(time.Hour * 2 / time.Millisecond), + MaxBlockDuration: int64(time.Hour * 2 / time.Millisecond), + WALCompression: true, + } + + db, err := Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg) + testutil.Ok(t, err) + app := db.Appender() + var expSamples []sample + maxt := 100 + for i := 0; i < maxt; i++ { + val := rand.Float64() + _, err := app.Add(labels.FromStrings("a", "b"), int64(i), val) + testutil.Ok(t, err) + expSamples = append(expSamples, sample{int64(i), val}) + } + testutil.Ok(t, app.Commit()) + + // Compact the Head to create a new block. + testutil.Ok(t, db.CompactHead(NewRangeHead(db.Head(), 0, int64(maxt)-1))) + testutil.Ok(t, db.Close()) + + // Delete everything but the new block and + // reopen the db to query it to ensure it includes the head data. + testutil.Ok(t, deleteNonBlocks(db.Dir())) + db, err = Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg) + testutil.Ok(t, err) + testutil.Equals(t, 1, len(db.Blocks())) + testutil.Equals(t, int64(maxt), db.Head().MinTime()) + defer func() { testutil.Ok(t, db.Close()) }() + querier, err := db.Querier(context.Background(), 0, int64(maxt)-1) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, querier.Close()) }() + + seriesSet, _, err := querier.Select(false, nil, &labels.Matcher{Type: labels.MatchEqual, Name: "a", Value: "b"}) + testutil.Ok(t, err) + var actSamples []sample + + for seriesSet.Next() { + series := seriesSet.At().Iterator() + for series.Next() { + time, val := series.At() + actSamples = append(actSamples, sample{int64(time), val}) + } + testutil.Ok(t, series.Err()) + } + testutil.Equals(t, expSamples, actSamples) + testutil.Ok(t, seriesSet.Err()) +} + +func deleteNonBlocks(dbDir string) error { + dirs, err := ioutil.ReadDir(dbDir) + if err != nil { + return err + } + for _, dir := range dirs { + if ok := isBlockDir(dir); !ok { + if err := os.RemoveAll(filepath.Join(dbDir, dir.Name())); err != nil { + return err + } + } + } + dirs, err = ioutil.ReadDir(dbDir) + if err != nil { + return err + } + for _, dir := range dirs { + if ok := isBlockDir(dir); !ok { + return errors.Errorf("root folder:%v still hase non block directory:%v", dbDir, dir.Name()) + } + } + + return nil +}