diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 4ce98dfc5..5aad119ae 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -32,8 +32,9 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/prompb" - "github.com/prometheus/prometheus/tsdb" tsdbLabels "github.com/prometheus/prometheus/tsdb/labels" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/tsdb/wal" ) // String constants for instrumentation. @@ -191,7 +192,7 @@ type QueueManager struct { externalLabels labels.Labels relabelConfigs []*relabel.Config client StorageClient - watcher *WALWatcher + watcher *wal.Watcher seriesLabels map[uint64]labels.Labels seriesSegmentIndexes map[uint64]int @@ -223,7 +224,7 @@ type QueueManager struct { } // NewQueueManager builds a new QueueManager. -func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, cfg config.QueueConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager { +func NewQueueManager(reg prometheus.Registerer, logger log.Logger, walDir string, samplesIn *ewmaRate, cfg config.QueueConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager { if logger == nil { logger = log.NewNopLogger() } @@ -252,7 +253,7 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, cfg samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration), } - t.watcher = NewWALWatcher(logger, name, t, walDir) + t.watcher = wal.NewWatcher(reg, wal.NewWatcherMetrics(reg), logger, name, t, walDir) t.shards = t.newShards() return t @@ -260,7 +261,7 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, cfg // Append queues a sample to be sent to the remote storage. Blocks until all samples are // enqueued on their shards or a shutdown signal is received. -func (t *QueueManager) Append(samples []tsdb.RefSample) bool { +func (t *QueueManager) Append(samples []record.RefSample) bool { outer: for _, s := range samples { lbls, ok := t.seriesLabels[s.Ref] @@ -376,7 +377,7 @@ func (t *QueueManager) Stop() { } // StoreSeries keeps track of which series we know about for lookups when sending samples to remote. -func (t *QueueManager) StoreSeries(series []tsdb.RefSeries, index int) { +func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) { for _, s := range series { ls := processExternalLabels(s.Labels, t.externalLabels) lbls := relabel.Process(ls, t.relabelConfigs...) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index bb54b0b7c..229bcacaf 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -36,8 +36,8 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/prompb" - "github.com/prometheus/prometheus/tsdb" tsdbLabels "github.com/prometheus/prometheus/tsdb/labels" + "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/util/testutil" ) @@ -60,7 +60,7 @@ func TestSampleDelivery(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) m.StoreSeries(series, 0) // These should be received by the client. @@ -88,7 +88,7 @@ func TestSampleDeliveryTimeout(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -106,16 +106,16 @@ func TestSampleDeliveryTimeout(t *testing.T) { func TestSampleDeliveryOrder(t *testing.T) { ts := 10 n := config.DefaultQueueConfig.MaxSamplesPerSend * ts - samples := make([]tsdb.RefSample, 0, n) - series := make([]tsdb.RefSeries, 0, n) + samples := make([]record.RefSample, 0, n) + series := make([]record.RefSeries, 0, n) for i := 0; i < n; i++ { name := fmt.Sprintf("test_metric_%d", i%ts) - samples = append(samples, tsdb.RefSample{ + samples = append(samples, record.RefSample{ Ref: uint64(i), T: int64(i), V: float64(i), }) - series = append(series, tsdb.RefSeries{ + series = append(series, record.RefSeries{ Ref: uint64(i), Labels: tsdbLabels.Labels{tsdbLabels.Label{Name: "__name__", Value: name}}, }) @@ -128,7 +128,7 @@ func TestSampleDeliveryOrder(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) m.StoreSeries(series, 0) m.Start() @@ -146,7 +146,7 @@ func TestShutdown(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) + m := NewQueueManager(nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) samples, series := createTimeseries(2 * config.DefaultQueueConfig.MaxSamplesPerSend) m.StoreSeries(series, 0) m.Start() @@ -182,11 +182,11 @@ func TestSeriesReset(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) + m := NewQueueManager(nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) for i := 0; i < numSegments; i++ { - series := []tsdb.RefSeries{} + series := []record.RefSeries{} for j := 0; j < numSeries; j++ { - series = append(series, tsdb.RefSeries{Ref: uint64((i * 100) + j), Labels: tsdbLabels.Labels{{Name: "a", Value: "a"}}}) + series = append(series, record.RefSeries{Ref: uint64((i * 100) + j), Labels: tsdbLabels.Labels{{Name: "a", Value: "a"}}}) } m.StoreSeries(series, i) } @@ -210,7 +210,7 @@ func TestReshard(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) m.StoreSeries(series, 0) m.Start() @@ -242,7 +242,7 @@ func TestReshardRaceWithStop(t *testing.T) { go func() { for { - m = NewQueueManager(nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) + m = NewQueueManager(nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) m.Start() h.Unlock() h.Lock() @@ -259,11 +259,11 @@ func TestReshardRaceWithStop(t *testing.T) { func TestReleaseNoninternedString(t *testing.T) { c := NewTestStorageClient() - m := NewQueueManager(nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) m.Start() for i := 1; i < 1000; i++ { - m.StoreSeries([]tsdb.RefSeries{ + m.StoreSeries([]record.RefSeries{ { Ref: uint64(i), Labels: tsdbLabels.Labels{ @@ -281,17 +281,17 @@ func TestReleaseNoninternedString(t *testing.T) { testutil.Assert(t, metric == 0, "expected there to be no calls to release for strings that were not already interned: %d", int(metric)) } -func createTimeseries(n int) ([]tsdb.RefSample, []tsdb.RefSeries) { - samples := make([]tsdb.RefSample, 0, n) - series := make([]tsdb.RefSeries, 0, n) +func createTimeseries(n int) ([]record.RefSample, []record.RefSeries) { + samples := make([]record.RefSample, 0, n) + series := make([]record.RefSeries, 0, n) for i := 0; i < n; i++ { name := fmt.Sprintf("test_metric_%d", i) - samples = append(samples, tsdb.RefSample{ + samples = append(samples, record.RefSample{ Ref: uint64(i), T: int64(i), V: float64(i), }) - series = append(series, tsdb.RefSeries{ + series = append(series, record.RefSeries{ Ref: uint64(i), Labels: tsdbLabels.Labels{{Name: "__name__", Value: name}}, }) @@ -299,7 +299,7 @@ func createTimeseries(n int) ([]tsdb.RefSample, []tsdb.RefSeries) { return samples, series } -func getSeriesNameFromRef(r tsdb.RefSeries) string { +func getSeriesNameFromRef(r record.RefSeries) string { for _, l := range r.Labels { if l.Name == "__name__" { return l.Value @@ -323,7 +323,7 @@ func NewTestStorageClient() *TestStorageClient { } } -func (c *TestStorageClient) expectSamples(ss []tsdb.RefSample, series []tsdb.RefSeries) { +func (c *TestStorageClient) expectSamples(ss []record.RefSample, series []record.RefSeries) { c.mtx.Lock() defer c.mtx.Unlock() @@ -351,7 +351,7 @@ func (c *TestStorageClient) waitForExpectedSamples(tb testing.TB) { } } -func (c *TestStorageClient) expectSampleCount(ss []tsdb.RefSample) { +func (c *TestStorageClient) expectSampleCount(ss []record.RefSample) { c.mtx.Lock() defer c.mtx.Unlock() c.wg.Add(len(ss)) @@ -443,7 +443,7 @@ func BenchmarkSampleDelivery(b *testing.B) { testutil.Ok(b, err) defer os.RemoveAll(dir) - m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) m.StoreSeries(series, 0) // These should be received by the client. @@ -484,12 +484,12 @@ func BenchmarkStartup(b *testing.B) { for n := 0; n < b.N; n++ { c := NewTestBlockedStorageClient() - m := NewQueueManager(logger, dir, + m := NewQueueManager(nil, logger, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, 1*time.Minute) - m.watcher.startTime = math.MaxInt64 - m.watcher.maxSegment = segments[len(segments)-2] - err := m.watcher.run() + m.watcher.StartTime = math.MaxInt64 + m.watcher.MaxSegment = segments[len(segments)-2] + err := m.watcher.Run() testutil.Ok(b, err) } } diff --git a/storage/remote/write.go b/storage/remote/write.go index c322a4347..aded4e27c 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -141,6 +141,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { return err } newQueues = append(newQueues, NewQueueManager( + prometheus.DefaultRegisterer, rws.logger, rws.walDir, rws.samplesIn, diff --git a/tsdb/block.go b/tsdb/block.go index 0030fbd68..f71d4c912 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/labels" + "github.com/prometheus/prometheus/tsdb/tombstones" ) // IndexWriter serializes the index for a block of series data. @@ -135,8 +136,8 @@ type BlockReader interface { // Chunks returns a ChunkReader over the block's data. Chunks() (ChunkReader, error) - // Tombstones returns a TombstoneReader over the block's deleted data. - Tombstones() (TombstoneReader, error) + // Tombstones returns a tombstones.Reader over the block's deleted data. + Tombstones() (tombstones.Reader, error) // Meta provides meta information about the block reader. Meta() BlockMeta @@ -279,7 +280,7 @@ type Block struct { chunkr ChunkReader indexr IndexReader - tombstones TombstoneReader + tombstones tombstones.Reader logger log.Logger @@ -321,7 +322,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er } closers = append(closers, ir) - tr, sizeTomb, err := readTombstones(dir) + tr, sizeTomb, err := tombstones.ReadTombstones(dir) if err != nil { return nil, err } @@ -412,11 +413,11 @@ func (pb *Block) Chunks() (ChunkReader, error) { } // Tombstones returns a new TombstoneReader against the block data. -func (pb *Block) Tombstones() (TombstoneReader, error) { +func (pb *Block) Tombstones() (tombstones.Reader, error) { if err := pb.startRead(); err != nil { return nil, err } - return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil + return blockTombstoneReader{Reader: pb.tombstones, b: pb}, nil } // GetSymbolTableSize returns the Symbol Table Size in the index of this block. @@ -483,7 +484,7 @@ func (r blockIndexReader) Close() error { } type blockTombstoneReader struct { - TombstoneReader + tombstones.Reader b *Block } @@ -519,7 +520,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error { ir := pb.indexr // Choose only valid postings which have chunks in the time-range. - stones := newMemTombstones() + stones := tombstones.NewMemTombstones() var lset labels.Labels var chks []chunks.Meta @@ -535,7 +536,7 @@ Outer: if chk.OverlapsClosedInterval(mint, maxt) { // Delete only until the current values and not beyond. tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime) - stones.addInterval(p.At(), Interval{tmin, tmax}) + stones.AddInterval(p.At(), tombstones.Interval{Mint: tmin, Maxt: tmax}) continue Outer } } @@ -545,9 +546,9 @@ Outer: return p.Err() } - err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error { + err = pb.tombstones.Iter(func(id uint64, ivs tombstones.Intervals) error { for _, iv := range ivs { - stones.addInterval(id, iv) + stones.AddInterval(id, iv) } return nil }) @@ -557,7 +558,7 @@ Outer: pb.tombstones = stones pb.meta.Stats.NumTombstones = pb.tombstones.Total() - n, err := writeTombstoneFile(pb.logger, pb.dir, pb.tombstones) + n, err := tombstones.WriteFile(pb.logger, pb.dir, pb.tombstones) if err != nil { return err } @@ -575,7 +576,7 @@ Outer: func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) { numStones := 0 - if err := pb.tombstones.Iter(func(id uint64, ivs Intervals) error { + if err := pb.tombstones.Iter(func(id uint64, ivs tombstones.Intervals) error { numStones += len(ivs) return nil }); err != nil { @@ -610,7 +611,7 @@ func (pb *Block) Snapshot(dir string) error { for _, fname := range []string{ metaFilename, indexFilename, - tombstoneFilename, + tombstones.TombstonesFilename, } { if err := os.Link(filepath.Join(pb.dir, fname), filepath.Join(blockDir, fname)); err != nil { return errors.Wrapf(err, "create snapshot %s", fname) diff --git a/tsdb/compact.go b/tsdb/compact.go index f88d27fba..5a39a9650 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -35,6 +35,7 @@ import ( "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/labels" + "github.com/prometheus/prometheus/tsdb/tombstones" ) // ExponentialBlockRanges returns the time ranges based on the stepSize. @@ -607,7 +608,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } // Create an empty tombstones file. - if _, err := writeTombstoneFile(c.logger, tmp, newMemTombstones()); err != nil { + if _, err := tombstones.WriteFile(c.logger, tmp, tombstones.NewMemTombstones()); err != nil { return errors.Wrap(err, "write new tombstones file") } @@ -768,7 +769,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, // // TODO think how to avoid the typecasting to verify when it is head block. if _, isHeadChunk := chk.Chunk.(*safeChunk); isHeadChunk && chk.MaxTime >= meta.MaxTime { - dranges = append(dranges, Interval{Mint: meta.MaxTime, Maxt: math.MaxInt64}) + dranges = append(dranges, tombstones.Interval{Mint: meta.MaxTime, Maxt: math.MaxInt64}) } else // Sanity check for disk blocks. @@ -876,15 +877,15 @@ type compactionSeriesSet struct { p index.Postings index IndexReader chunks ChunkReader - tombstones TombstoneReader + tombstones tombstones.Reader l labels.Labels c []chunks.Meta - intervals Intervals + intervals tombstones.Intervals err error } -func newCompactionSeriesSet(i IndexReader, c ChunkReader, t TombstoneReader, p index.Postings) *compactionSeriesSet { +func newCompactionSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings) *compactionSeriesSet { return &compactionSeriesSet{ index: i, chunks: c, @@ -914,7 +915,7 @@ func (c *compactionSeriesSet) Next() bool { if len(c.intervals) > 0 { chks := make([]chunks.Meta, 0, len(c.c)) for _, chk := range c.c { - if !(Interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) { + if !(tombstones.Interval{Mint: chk.MinTime, Maxt: chk.MaxTime}.IsSubrange(c.intervals)) { chks = append(chks, chk) } } @@ -942,7 +943,7 @@ func (c *compactionSeriesSet) Err() error { return c.p.Err() } -func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, Intervals) { +func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) { return c.l, c.c, c.intervals } @@ -952,7 +953,7 @@ type compactionMerger struct { aok, bok bool l labels.Labels c []chunks.Meta - intervals Intervals + intervals tombstones.Intervals } func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) { @@ -1008,7 +1009,7 @@ func (c *compactionMerger) Next() bool { _, cb, rb := c.b.At() for _, r := range rb { - ra = ra.add(r) + ra = ra.Add(r) } c.l = append(c.l[:0], l...) @@ -1029,6 +1030,6 @@ func (c *compactionMerger) Err() error { return c.b.Err() } -func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, Intervals) { +func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) { return c.l, c.c, c.intervals } diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 3b56cacd2..7ce08b5ff 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -30,6 +30,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/labels" + "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/util/testutil" ) @@ -455,10 +456,10 @@ func metaRange(name string, mint, maxt int64, stats *BlockStats) dirMeta { type erringBReader struct{} -func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") } -func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") } -func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") } -func (erringBReader) Meta() BlockMeta { return BlockMeta{} } +func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") } +func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") } +func (erringBReader) Tombstones() (tombstones.Reader, error) { return nil, errors.New("tombstones") } +func (erringBReader) Meta() BlockMeta { return BlockMeta{} } type nopChunkWriter struct{} diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 9a4df6b82..049192d33 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -33,6 +33,8 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/labels" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/tsdb/wal" "github.com/prometheus/prometheus/util/testutil" @@ -243,27 +245,27 @@ func TestDeleteSimple(t *testing.T) { numSamples := int64(10) cases := []struct { - intervals Intervals + Intervals tombstones.Intervals remaint []int64 }{ { - intervals: Intervals{{0, 3}}, + Intervals: tombstones.Intervals{{Mint: 0, Maxt: 3}}, remaint: []int64{4, 5, 6, 7, 8, 9}, }, { - intervals: Intervals{{1, 3}}, + Intervals: tombstones.Intervals{{Mint: 1, Maxt: 3}}, remaint: []int64{0, 4, 5, 6, 7, 8, 9}, }, { - intervals: Intervals{{1, 3}, {4, 7}}, + Intervals: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 7}}, remaint: []int64{0, 8, 9}, }, { - intervals: Intervals{{1, 3}, {4, 700}}, + Intervals: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 700}}, remaint: []int64{0}, }, { // This case is to ensure that labels and symbols are deleted. - intervals: Intervals{{0, 9}}, + Intervals: tombstones.Intervals{{Mint: 0, Maxt: 9}}, remaint: []int64{}, }, } @@ -288,7 +290,7 @@ Outer: // TODO(gouthamve): Reset the tombstones somehow. // Delete the ranges. - for _, r := range c.intervals { + for _, r := range c.Intervals { testutil.Ok(t, db.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b"))) } @@ -561,11 +563,11 @@ func TestDB_SnapshotWithDelete(t *testing.T) { testutil.Ok(t, app.Commit()) cases := []struct { - intervals Intervals + intervals tombstones.Intervals remaint []int64 }{ { - intervals: Intervals{{1, 3}, {4, 7}}, + intervals: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 7}}, remaint: []int64{0, 8, 9}, }, } @@ -888,11 +890,11 @@ func TestTombstoneClean(t *testing.T) { testutil.Ok(t, app.Commit()) cases := []struct { - intervals Intervals + intervals tombstones.Intervals remaint []int64 }{ { - intervals: Intervals{{1, 3}, {4, 7}}, + intervals: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 7}}, remaint: []int64{0, 8, 9}, }, } @@ -964,7 +966,7 @@ func TestTombstoneClean(t *testing.T) { } for _, b := range db.Blocks() { - testutil.Equals(t, newMemTombstones(), b.tombstones) + testutil.Equals(t, tombstones.NewMemTombstones(), b.tombstones) } } } @@ -990,8 +992,8 @@ func TestTombstoneCleanFail(t *testing.T) { block, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) // Add some some fake tombstones to trigger the compaction. - tomb := newMemTombstones() - tomb.addInterval(0, Interval{0, 1}) + tomb := tombstones.NewMemTombstones() + tomb.AddInterval(0, tombstones.Interval{Mint: 0, Maxt: 1}) block.tombstones = tomb db.blocks = append(db.blocks, block) @@ -1470,13 +1472,13 @@ func TestInitializeHeadTimestamp(t *testing.T) { w, err := wal.New(nil, nil, path.Join(dir, "wal"), false) testutil.Ok(t, err) - var enc RecordEncoder + var enc record.Encoder err = w.Log( - enc.Series([]RefSeries{ + enc.Series([]record.RefSeries{ {Ref: 123, Labels: labels.FromStrings("a", "1")}, {Ref: 124, Labels: labels.FromStrings("a", "2")}, }, nil), - enc.Samples([]RefSample{ + enc.Samples([]record.RefSample{ {Ref: 123, T: 5000, V: 1}, {Ref: 124, T: 15000, V: 1}, }, nil), @@ -1520,13 +1522,13 @@ func TestInitializeHeadTimestamp(t *testing.T) { w, err := wal.New(nil, nil, path.Join(dir, "wal"), false) testutil.Ok(t, err) - var enc RecordEncoder + var enc record.Encoder err = w.Log( - enc.Series([]RefSeries{ + enc.Series([]record.RefSeries{ {Ref: 123, Labels: labels.FromStrings("a", "1")}, {Ref: 124, Labels: labels.FromStrings("a", "2")}, }, nil), - enc.Samples([]RefSample{ + enc.Samples([]record.RefSample{ {Ref: 123, T: 5000, V: 1}, {Ref: 124, T: 15000, V: 1}, }, nil), diff --git a/tsdb/head.go b/tsdb/head.go index 6c9975a13..9fbe07c9b 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -33,6 +33,8 @@ import ( "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/labels" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/wal" ) @@ -54,7 +56,7 @@ var ( // emptyTombstoneReader is a no-op Tombstone Reader. // This is used by head to satisfy the Tombstones() function call. - emptyTombstoneReader = newMemTombstones() + emptyTombstoneReader = tombstones.NewMemTombstones() ) // Head handles reads and writes of time series data within a time window. @@ -64,6 +66,7 @@ type Head struct { wal *wal.WAL logger log.Logger appendPool sync.Pool + seriesPool sync.Pool bytesPool sync.Pool numSeries uint64 @@ -256,7 +259,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int // Samples before the mint timestamp are discarded. func (h *Head) processWALSamples( minValidTime int64, - input <-chan []RefSample, output chan<- []RefSample, + input <-chan []record.RefSample, output chan<- []record.RefSample, ) (unknownRefs uint64) { defer close(output) @@ -331,8 +334,8 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { wg sync.WaitGroup multiRefLock sync.Mutex n = runtime.GOMAXPROCS(0) - inputs = make([]chan []RefSample, n) - outputs = make([]chan []RefSample, n) + inputs = make([]chan []record.RefSample, n) + outputs = make([]chan []record.RefSample, n) ) wg.Add(n) @@ -349,10 +352,10 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { }() for i := 0; i < n; i++ { - outputs[i] = make(chan []RefSample, 300) - inputs[i] = make(chan []RefSample, 300) + outputs[i] = make(chan []record.RefSample, 300) + inputs[i] = make(chan []record.RefSample, 300) - go func(input <-chan []RefSample, output chan<- []RefSample) { + go func(input <-chan []record.RefSample, output chan<- []record.RefSample) { unknown := h.processWALSamples(h.minValidTime, input, output) atomic.AddUint64(&unknownRefs, unknown) wg.Done() @@ -360,11 +363,11 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { } var ( - dec RecordDecoder - series []RefSeries - samples []RefSample - tstones []Stone - allStones = newMemTombstones() + dec record.Decoder + series []record.RefSeries + samples []record.RefSample + tstones []tombstones.Stone + allStones = tombstones.NewMemTombstones() ) defer func() { if err := allStones.Close(); err != nil { @@ -376,7 +379,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { rec := r.Record() switch dec.Type(rec) { - case RecordSeries: + case record.Series: series, err = dec.Series(rec, series) if err != nil { return &wal.CorruptionErr{ @@ -399,7 +402,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { h.lastSeriesID = s.Ref } } - case RecordSamples: + case record.Samples: samples, err = dec.Samples(rec, samples) s := samples if err != nil { @@ -418,9 +421,9 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { if len(samples) < m { m = len(samples) } - shards := make([][]RefSample, n) + shards := make([][]record.RefSample, n) for i := 0; i < n; i++ { - var buf []RefSample + var buf []record.RefSample select { case buf = <-outputs[i]: default: @@ -440,7 +443,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { samples = samples[m:] } samples = s // Keep whole slice for reuse. - case RecordTombstones: + case record.Tombstones: tstones, err = dec.Tombstones(rec, tstones) if err != nil { return &wal.CorruptionErr{ @@ -450,15 +453,15 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { } } for _, s := range tstones { - for _, itv := range s.intervals { + for _, itv := range s.Intervals { if itv.Maxt < h.minValidTime { continue } - if m := h.series.getByID(s.ref); m == nil { + if m := h.series.getByID(s.Ref); m == nil { unknownRefs++ continue } - allStones.addInterval(s.ref, itv) + allStones.AddInterval(s.Ref, itv) } } default: @@ -482,7 +485,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { return errors.Wrap(r.Err(), "read records") } - if err := allStones.Iter(func(ref uint64, dranges Intervals) error { + if err := allStones.Iter(func(ref uint64, dranges tombstones.Intervals) error { return h.chunkRewrite(ref, dranges) }); err != nil { return errors.Wrap(r.Err(), "deleting samples from tombstones") @@ -508,8 +511,8 @@ func (h *Head) Init(minValidTime int64) error { level.Info(h.logger).Log("msg", "replaying WAL, this may take awhile") // Backfill the checkpoint first if it exists. - dir, startFrom, err := LastCheckpoint(h.wal.Dir()) - if err != nil && err != ErrNotFound { + dir, startFrom, err := wal.LastCheckpoint(h.wal.Dir()) + if err != nil && err != record.ErrNotFound { return errors.Wrap(err, "find last checkpoint") } multiRef := map[uint64]uint64{} @@ -629,7 +632,7 @@ func (h *Head) Truncate(mint int64) (err error) { return ok } h.metrics.checkpointCreationTotal.Inc() - if _, err = Checkpoint(h.wal, first, last, keep, mint); err != nil { + if _, err = wal.Checkpoint(h.wal, first, last, keep, mint); err != nil { h.metrics.checkpointCreationFail.Inc() return errors.Wrap(err, "create checkpoint") } @@ -651,7 +654,7 @@ func (h *Head) Truncate(mint int64) (err error) { h.deletedMtx.Unlock() h.metrics.checkpointDeleteTotal.Inc() - if err := DeleteCheckpoints(h.wal.Dir(), last); err != nil { + if err := wal.DeleteCheckpoints(h.wal.Dir(), last); err != nil { // Leftover old checkpoints do not cause problems down the line beyond // occupying disk space. // They will just be ignored since a higher checkpoint exists. @@ -693,7 +696,7 @@ func (h *rangeHead) Chunks() (ChunkReader, error) { return h.head.chunksRange(h.mint, h.maxt), nil } -func (h *rangeHead) Tombstones() (TombstoneReader, error) { +func (h *rangeHead) Tombstones() (tombstones.Reader, error) { return emptyTombstoneReader, nil } @@ -779,6 +782,7 @@ func (h *Head) appender() *headAppender { mint: math.MaxInt64, maxt: math.MinInt64, samples: h.getAppendBuffer(), + sampleSeries: h.getSeriesBuffer(), } } @@ -789,19 +793,32 @@ func max(a, b int64) int64 { return b } -func (h *Head) getAppendBuffer() []RefSample { +func (h *Head) getAppendBuffer() []record.RefSample { b := h.appendPool.Get() if b == nil { - return make([]RefSample, 0, 512) + return make([]record.RefSample, 0, 512) } - return b.([]RefSample) + return b.([]record.RefSample) } -func (h *Head) putAppendBuffer(b []RefSample) { +func (h *Head) putAppendBuffer(b []record.RefSample) { //lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty. h.appendPool.Put(b[:0]) } +func (h *Head) getSeriesBuffer() []*memSeries { + b := h.seriesPool.Get() + if b == nil { + return make([]*memSeries, 0, 512) + } + return b.([]*memSeries) +} + +func (h *Head) putSeriesBuffer(b []*memSeries) { + //lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty. + h.seriesPool.Put(b[:0]) +} + func (h *Head) getBytesBuffer() []byte { b := h.bytesPool.Get() if b == nil { @@ -820,8 +837,9 @@ type headAppender struct { minValidTime int64 // No samples below this timestamp are allowed. mint, maxt int64 - series []RefSeries - samples []RefSample + series []record.RefSeries + samples []record.RefSample + sampleSeries []*memSeries } func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { @@ -834,7 +852,7 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro s, created := a.head.getOrCreate(lset.Hash(), lset) if created { - a.series = append(a.series, RefSeries{ + a.series = append(a.series, record.RefSeries{ Ref: s.ref, Labels: lset, }) @@ -866,12 +884,12 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { a.maxt = t } - a.samples = append(a.samples, RefSample{ - Ref: ref, - T: t, - V: v, - series: s, + a.samples = append(a.samples, record.RefSample{ + Ref: ref, + T: t, + V: v, }) + a.sampleSeries = append(a.sampleSeries, s) return nil } @@ -884,7 +902,7 @@ func (a *headAppender) log() error { defer func() { a.head.putBytesBuffer(buf) }() var rec []byte - var enc RecordEncoder + var enc record.Encoder if len(a.series) > 0 { rec = enc.Series(a.series, buf) @@ -908,18 +926,20 @@ func (a *headAppender) log() error { func (a *headAppender) Commit() error { defer a.head.metrics.activeAppenders.Dec() defer a.head.putAppendBuffer(a.samples) + defer a.head.putSeriesBuffer(a.sampleSeries) if err := a.log(); err != nil { return errors.Wrap(err, "write to WAL") } total := len(a.samples) - - for _, s := range a.samples { - s.series.Lock() - ok, chunkCreated := s.series.append(s.T, s.V) - s.series.pendingCommit = false - s.series.Unlock() + var series *memSeries + for i, s := range a.samples { + series = a.sampleSeries[i] + series.Lock() + ok, chunkCreated := series.append(s.T, s.V) + series.pendingCommit = false + series.Unlock() if !ok { total-- @@ -938,10 +958,12 @@ func (a *headAppender) Commit() error { func (a *headAppender) Rollback() error { a.head.metrics.activeAppenders.Dec() - for _, s := range a.samples { - s.series.Lock() - s.series.pendingCommit = false - s.series.Unlock() + var series *memSeries + for i := range a.samples { + series = a.sampleSeries[i] + series.Lock() + series.pendingCommit = false + series.Unlock() } a.head.putAppendBuffer(a.samples) @@ -964,7 +986,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { return errors.Wrap(err, "select series") } - var stones []Stone + var stones []tombstones.Stone dirty := false for p.Next() { series := h.series.getByID(p.At()) @@ -976,9 +998,9 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { // Delete only until the current values and not beyond. t0, t1 = clampInterval(mint, maxt, t0, t1) if h.wal != nil { - stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}}) + stones = append(stones, tombstones.Stone{Ref: p.At(), Intervals: tombstones.Intervals{{Mint: t0, Maxt: t1}}}) } - if err := h.chunkRewrite(p.At(), Intervals{{t0, t1}}); err != nil { + if err := h.chunkRewrite(p.At(), tombstones.Intervals{{Mint: t0, Maxt: t1}}); err != nil { return errors.Wrap(err, "delete samples") } dirty = true @@ -986,7 +1008,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { if p.Err() != nil { return p.Err() } - var enc RecordEncoder + var enc record.Encoder if h.wal != nil { // Although we don't store the stones in the head // we need to write them to the WAL to mark these as deleted @@ -1005,7 +1027,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { // chunkRewrite re-writes the chunks which overlaps with deleted ranges // and removes the samples in the deleted ranges. // Chunks is deleted if no samples are left at the end. -func (h *Head) chunkRewrite(ref uint64, dranges Intervals) (err error) { +func (h *Head) chunkRewrite(ref uint64, dranges tombstones.Intervals) (err error) { if len(dranges) == 0 { return nil } @@ -1097,7 +1119,7 @@ func (h *Head) gc() { } // Tombstones returns a new reader over the head's tombstones -func (h *Head) Tombstones() (TombstoneReader, error) { +func (h *Head) Tombstones() (tombstones.Reader, error) { return emptyTombstoneReader, nil } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index acfc82a5e..784a79239 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -30,6 +30,8 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/labels" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/tsdb/wal" "github.com/prometheus/prometheus/util/testutil" @@ -51,14 +53,14 @@ func BenchmarkCreateSeries(b *testing.B) { } func populateTestWAL(t testing.TB, w *wal.WAL, recs []interface{}) { - var enc RecordEncoder + var enc record.Encoder for _, r := range recs { switch v := r.(type) { - case []RefSeries: + case []record.RefSeries: testutil.Ok(t, w.Log(enc.Series(v, nil))) - case []RefSample: + case []record.RefSample: testutil.Ok(t, w.Log(enc.Samples(v, nil))) - case []Stone: + case []tombstones.Stone: testutil.Ok(t, w.Log(enc.Tombstones(v, nil))) } } @@ -69,22 +71,22 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) { testutil.Ok(t, err) defer sr.Close() - var dec RecordDecoder + var dec record.Decoder r := wal.NewReader(sr) for r.Next() { rec := r.Record() switch dec.Type(rec) { - case RecordSeries: + case record.Series: series, err := dec.Series(rec, nil) testutil.Ok(t, err) recs = append(recs, series) - case RecordSamples: + case record.Samples: samples, err := dec.Samples(rec, nil) testutil.Ok(t, err) recs = append(recs, samples) - case RecordTombstones: + case record.Tombstones: tstones, err := dec.Tombstones(rec, nil) testutil.Ok(t, err) recs = append(recs, tstones) @@ -100,28 +102,28 @@ func TestHead_ReadWAL(t *testing.T) { for _, compress := range []bool{false, true} { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { entries := []interface{}{ - []RefSeries{ + []record.RefSeries{ {Ref: 10, Labels: labels.FromStrings("a", "1")}, {Ref: 11, Labels: labels.FromStrings("a", "2")}, {Ref: 100, Labels: labels.FromStrings("a", "3")}, }, - []RefSample{ + []record.RefSample{ {Ref: 0, T: 99, V: 1}, {Ref: 10, T: 100, V: 2}, {Ref: 100, T: 100, V: 3}, }, - []RefSeries{ + []record.RefSeries{ {Ref: 50, Labels: labels.FromStrings("a", "4")}, // This series has two refs pointing to it. {Ref: 101, Labels: labels.FromStrings("a", "3")}, }, - []RefSample{ + []record.RefSample{ {Ref: 10, T: 101, V: 5}, {Ref: 50, T: 101, V: 6}, {Ref: 101, T: 101, V: 7}, }, - []Stone{ - {ref: 0, intervals: []Interval{{Mint: 99, Maxt: 101}}}, + []tombstones.Stone{ + {Ref: 0, Intervals: []tombstones.Interval{{Mint: 99, Maxt: 101}}}, }, } dir, err := ioutil.TempDir("", "test_read_wal") @@ -326,14 +328,14 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { for _, compress := range []bool{false, true} { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { entries := []interface{}{ - []RefSeries{ + []record.RefSeries{ {Ref: 10, Labels: labels.FromStrings("a", "1")}, }, - []RefSample{}, - []RefSeries{ + []record.RefSample{}, + []record.RefSeries{ {Ref: 50, Labels: labels.FromStrings("a", "2")}, }, - []RefSample{ + []record.RefSample{ {Ref: 50, T: 80, V: 1}, {Ref: 50, T: 90, V: 1}, }, @@ -371,27 +373,27 @@ func TestHeadDeleteSimple(t *testing.T) { lblDefault := labels.Label{Name: "a", Value: "b"} cases := []struct { - dranges Intervals + dranges tombstones.Intervals smplsExp []sample }{ { - dranges: Intervals{{0, 3}}, + dranges: tombstones.Intervals{{Mint: 0, Maxt: 3}}, smplsExp: buildSmpls([]int64{4, 5, 6, 7, 8, 9}), }, { - dranges: Intervals{{1, 3}}, + dranges: tombstones.Intervals{{Mint: 1, Maxt: 3}}, smplsExp: buildSmpls([]int64{0, 4, 5, 6, 7, 8, 9}), }, { - dranges: Intervals{{1, 3}, {4, 7}}, + dranges: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 7}}, smplsExp: buildSmpls([]int64{0, 8, 9}), }, { - dranges: Intervals{{1, 3}, {4, 700}}, + dranges: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 700}}, smplsExp: buildSmpls([]int64{0}), }, { // This case is to ensure that labels and symbols are deleted. - dranges: Intervals{{0, 9}}, + dranges: tombstones.Intervals{{Mint: 0, Maxt: 9}}, smplsExp: buildSmpls([]int64{}), }, } @@ -591,7 +593,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { testutil.Ok(t, hb.Close()) // Confirm there's been a checkpoint. - cdir, _, err := LastCheckpoint(dir) + cdir, _, err := wal.LastCheckpoint(dir) testutil.Ok(t, err) // Read in checkpoint and WAL. recs := readTestWAL(t, cdir) @@ -600,11 +602,11 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { var series, samples, stones int for _, rec := range recs { switch rec.(type) { - case []RefSeries: + case []record.RefSeries: series++ - case []RefSample: + case []record.RefSample: samples++ - case []Stone: + case []tombstones.Stone: stones++ default: t.Fatalf("unknown record type") @@ -692,18 +694,18 @@ func TestDelete_e2e(t *testing.T) { // Delete a time-range from each-selector. dels := []struct { ms []labels.Matcher - drange Intervals + drange tombstones.Intervals }{ { ms: []labels.Matcher{labels.NewEqualMatcher("a", "b")}, - drange: Intervals{{300, 500}, {600, 670}}, + drange: tombstones.Intervals{{Mint: 300, Maxt: 500}, {Mint: 600, Maxt: 670}}, }, { ms: []labels.Matcher{ labels.NewEqualMatcher("a", "b"), labels.NewEqualMatcher("job", "prom-k8s"), }, - drange: Intervals{{300, 500}, {100, 670}}, + drange: tombstones.Intervals{{Mint: 300, Maxt: 500}, {Mint: 100, Maxt: 670}}, }, { ms: []labels.Matcher{ @@ -711,7 +713,7 @@ func TestDelete_e2e(t *testing.T) { labels.NewEqualMatcher("instance", "localhost:9090"), labels.NewEqualMatcher("job", "prometheus"), }, - drange: Intervals{{300, 400}, {100, 6700}}, + drange: tombstones.Intervals{{Mint: 300, Maxt: 400}, {Mint: 100, Maxt: 6700}}, }, // TODO: Add Regexp Matchers. } @@ -794,12 +796,12 @@ func boundedSamples(full []tsdbutil.Sample, mint, maxt int64) []tsdbutil.Sample return full } -func deletedSamples(full []tsdbutil.Sample, dranges Intervals) []tsdbutil.Sample { +func deletedSamples(full []tsdbutil.Sample, dranges tombstones.Intervals) []tsdbutil.Sample { ds := make([]tsdbutil.Sample, 0, len(full)) Outer: for _, s := range full { for _, r := range dranges { - if r.inBounds(s.T()) { + if r.InBounds(s.T()) { continue Outer } } @@ -1055,9 +1057,9 @@ func TestHead_LogRollback(t *testing.T) { testutil.Equals(t, 1, len(recs)) - series, ok := recs[0].([]RefSeries) + series, ok := recs[0].([]record.RefSeries) testutil.Assert(t, ok, "expected series record but got %+v", recs[0]) - testutil.Equals(t, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series) + testutil.Equals(t, []record.RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series) }) } } @@ -1065,7 +1067,7 @@ func TestHead_LogRollback(t *testing.T) { // TestWalRepair_DecodingError ensures that a repair is run for an error // when decoding a record. func TestWalRepair_DecodingError(t *testing.T) { - var enc RecordEncoder + var enc record.Encoder for name, test := range map[string]struct { corrFunc func(rec []byte) []byte // Func that applies the corruption to a record. rec []byte @@ -1077,10 +1079,10 @@ func TestWalRepair_DecodingError(t *testing.T) { // Do not modify the base record because it is Logged multiple times. res := make([]byte, len(rec)) copy(res, rec) - res[0] = byte(RecordInvalid) + res[0] = byte(record.Invalid) return res }, - enc.Series([]RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}), + enc.Series([]record.RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}), 9, 5, }, @@ -1088,7 +1090,7 @@ func TestWalRepair_DecodingError(t *testing.T) { func(rec []byte) []byte { return rec[:3] }, - enc.Series([]RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}), + enc.Series([]record.RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}), 9, 5, }, @@ -1096,7 +1098,7 @@ func TestWalRepair_DecodingError(t *testing.T) { func(rec []byte) []byte { return rec[:3] }, - enc.Samples([]RefSample{{Ref: 0, T: 99, V: 1}}, []byte{}), + enc.Samples([]record.RefSample{{Ref: 0, T: 99, V: 1}}, []byte{}), 9, 5, }, @@ -1104,7 +1106,7 @@ func TestWalRepair_DecodingError(t *testing.T) { func(rec []byte) []byte { return rec[:3] }, - enc.Tombstones([]Stone{{ref: 1, intervals: Intervals{}}}, []byte{}), + enc.Tombstones([]tombstones.Stone{{Ref: 1, Intervals: tombstones.Intervals{}}}, []byte{}), 9, 5, }, diff --git a/tsdb/mocks_test.go b/tsdb/mocks_test.go index e49c92868..fa4642054 100644 --- a/tsdb/mocks_test.go +++ b/tsdb/mocks_test.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/labels" + "github.com/prometheus/prometheus/tsdb/tombstones" ) type mockIndexWriter struct { @@ -72,7 +73,9 @@ type mockBReader struct { maxt int64 } -func (r *mockBReader) Index() (IndexReader, error) { return r.ir, nil } -func (r *mockBReader) Chunks() (ChunkReader, error) { return r.cr, nil } -func (r *mockBReader) Tombstones() (TombstoneReader, error) { return newMemTombstones(), nil } -func (r *mockBReader) Meta() BlockMeta { return BlockMeta{MinTime: r.mint, MaxTime: r.maxt} } +func (r *mockBReader) Index() (IndexReader, error) { return r.ir, nil } +func (r *mockBReader) Chunks() (ChunkReader, error) { return r.cr, nil } +func (r *mockBReader) Tombstones() (tombstones.Reader, error) { + return tombstones.NewMemTombstones(), nil +} +func (r *mockBReader) Meta() BlockMeta { return BlockMeta{MinTime: r.mint, MaxTime: r.maxt} } diff --git a/tsdb/querier.go b/tsdb/querier.go index 357232e46..96880e30a 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -25,6 +25,7 @@ import ( tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/labels" + "github.com/prometheus/prometheus/tsdb/tombstones" ) // Querier provides querying access over time series data of a fixed @@ -204,7 +205,7 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) { type blockQuerier struct { index IndexReader chunks ChunkReader - tombstones TombstoneReader + tombstones tombstones.Reader closed bool @@ -671,7 +672,7 @@ func (s *mergedVerticalSeriesSet) Next() bool { // actual series itself. type ChunkSeriesSet interface { Next() bool - At() (labels.Labels, []chunks.Meta, Intervals) + At() (labels.Labels, []chunks.Meta, tombstones.Intervals) Err() error } @@ -680,19 +681,19 @@ type ChunkSeriesSet interface { type baseChunkSeries struct { p index.Postings index IndexReader - tombstones TombstoneReader + tombstones tombstones.Reader lset labels.Labels chks []chunks.Meta - intervals Intervals + intervals tombstones.Intervals err error } // LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet // over them. It drops chunks based on tombstones in the given reader. -func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) { +func LookupChunkSeries(ir IndexReader, tr tombstones.Reader, ms ...labels.Matcher) (ChunkSeriesSet, error) { if tr == nil { - tr = newMemTombstones() + tr = tombstones.NewMemTombstones() } p, err := PostingsForMatchers(ir, ms...) if err != nil { @@ -705,7 +706,7 @@ func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) }, nil } -func (s *baseChunkSeries) At() (labels.Labels, []chunks.Meta, Intervals) { +func (s *baseChunkSeries) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) { return s.lset, s.chks, s.intervals } @@ -741,7 +742,7 @@ func (s *baseChunkSeries) Next() bool { // Only those chunks that are not entirely deleted. chks := make([]chunks.Meta, 0, len(s.chks)) for _, chk := range s.chks { - if !(Interval{chk.MinTime, chk.MaxTime}.isSubrange(s.intervals)) { + if !(tombstones.Interval{Mint: chk.MinTime, Maxt: chk.MaxTime}.IsSubrange(s.intervals)) { chks = append(chks, chk) } } @@ -768,10 +769,10 @@ type populatedChunkSeries struct { err error chks []chunks.Meta lset labels.Labels - intervals Intervals + intervals tombstones.Intervals } -func (s *populatedChunkSeries) At() (labels.Labels, []chunks.Meta, Intervals) { +func (s *populatedChunkSeries) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) { return s.lset, s.chks, s.intervals } @@ -866,7 +867,7 @@ type chunkSeries struct { mint, maxt int64 - intervals Intervals + intervals tombstones.Intervals } func (s *chunkSeries) Labels() labels.Labels { @@ -1067,10 +1068,10 @@ type chunkSeriesIterator struct { maxt, mint int64 - intervals Intervals + intervals tombstones.Intervals } -func newChunkSeriesIterator(cs []chunks.Meta, dranges Intervals, mint, maxt int64) *chunkSeriesIterator { +func newChunkSeriesIterator(cs []chunks.Meta, dranges tombstones.Intervals, mint, maxt int64) *chunkSeriesIterator { csi := &chunkSeriesIterator{ chunks: cs, i: 0, @@ -1169,7 +1170,7 @@ func (it *chunkSeriesIterator) Err() error { type deletedIterator struct { it chunkenc.Iterator - intervals Intervals + intervals tombstones.Intervals } func (it *deletedIterator) At() (int64, float64) { @@ -1182,7 +1183,7 @@ Outer: ts, _ := it.it.At() for _, tr := range it.intervals { - if tr.inBounds(ts) { + if tr.InBounds(ts) { continue Outer } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 88dfe8f33..aba3418b1 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -29,6 +29,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/labels" + "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/util/testutil" ) @@ -368,7 +369,7 @@ Outer: querier := &blockQuerier{ index: ir, chunks: cr, - tombstones: newMemTombstones(), + tombstones: tombstones.NewMemTombstones(), mint: c.mint, maxt: c.maxt, @@ -415,7 +416,7 @@ func TestBlockQuerierDelete(t *testing.T) { cases := struct { data []seriesSamples - tombstones TombstoneReader + tombstones tombstones.Reader queries []query }{ data: []seriesSamples{ @@ -460,11 +461,11 @@ func TestBlockQuerierDelete(t *testing.T) { }, }, }, - tombstones: &memTombstones{intvlGroups: map[uint64]Intervals{ - 1: {{1, 3}}, - 2: {{1, 3}, {6, 10}}, - 3: {{6, 10}}, - }}, + tombstones: tombstones.NewTestMemTombstones([]tombstones.Intervals{ + tombstones.Intervals{{Mint: 1, Maxt: 3}}, + tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 6, Maxt: 10}}, + tombstones.Intervals{{Mint: 6, Maxt: 10}}, + }), queries: []query{ { mint: 2, @@ -524,6 +525,7 @@ func TestBlockQuerierDelete(t *testing.T) { }, } + fmt.Println("tombstones", cases.tombstones) Outer: for _, c := range cases.queries { ir, cr, _, _ := createIdxChkReaders(t, cases.data) @@ -637,7 +639,7 @@ func TestBaseChunkSeries(t *testing.T) { bcs := &baseChunkSeries{ p: index.NewListPostings(tc.postings), index: mi, - tombstones: newMemTombstones(), + tombstones: tombstones.NewMemTombstones(), } i := 0 @@ -1159,7 +1161,7 @@ func (m *mockChunkSeriesSet) Next() bool { return m.i < len(m.l) } -func (m *mockChunkSeriesSet) At() (labels.Labels, []chunks.Meta, Intervals) { +func (m *mockChunkSeriesSet) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) { return m.l[m.i], m.cm[m.i], nil } @@ -1254,18 +1256,18 @@ func TestDeletedIterator(t *testing.T) { } cases := []struct { - r Intervals + r tombstones.Intervals }{ - {r: Intervals{{1, 20}}}, - {r: Intervals{{1, 10}, {12, 20}, {21, 23}, {25, 30}}}, - {r: Intervals{{1, 10}, {12, 20}, {20, 30}}}, - {r: Intervals{{1, 10}, {12, 23}, {25, 30}}}, - {r: Intervals{{1, 23}, {12, 20}, {25, 30}}}, - {r: Intervals{{1, 23}, {12, 20}, {25, 3000}}}, - {r: Intervals{{0, 2000}}}, - {r: Intervals{{500, 2000}}}, - {r: Intervals{{0, 200}}}, - {r: Intervals{{1000, 20000}}}, + {r: tombstones.Intervals{{Mint: 1, Maxt: 20}}}, + {r: tombstones.Intervals{{Mint: 1, Maxt: 10}, {Mint: 12, Maxt: 20}, {Mint: 21, Maxt: 23}, {Mint: 25, Maxt: 30}}}, + {r: tombstones.Intervals{{Mint: 1, Maxt: 10}, {Mint: 12, Maxt: 20}, {Mint: 20, Maxt: 30}}}, + {r: tombstones.Intervals{{Mint: 1, Maxt: 10}, {Mint: 12, Maxt: 23}, {Mint: 25, Maxt: 30}}}, + {r: tombstones.Intervals{{Mint: 1, Maxt: 23}, {Mint: 12, Maxt: 20}, {Mint: 25, Maxt: 30}}}, + {r: tombstones.Intervals{{Mint: 1, Maxt: 23}, {Mint: 12, Maxt: 20}, {Mint: 25, Maxt: 3000}}}, + {r: tombstones.Intervals{{Mint: 0, Maxt: 2000}}}, + {r: tombstones.Intervals{{Mint: 500, Maxt: 2000}}}, + {r: tombstones.Intervals{{Mint: 0, Maxt: 200}}}, + {r: tombstones.Intervals{{Mint: 1000, Maxt: 20000}}}, } for _, c := range cases { @@ -1275,7 +1277,7 @@ func TestDeletedIterator(t *testing.T) { for it.Next() { i++ for _, tr := range ranges { - if tr.inBounds(i) { + if tr.InBounds(i) { i = tr.Maxt + 1 ranges = ranges[1:] } @@ -1290,7 +1292,7 @@ func TestDeletedIterator(t *testing.T) { // There has been an extra call to Next(). i++ for _, tr := range ranges { - if tr.inBounds(i) { + if tr.InBounds(i) { i = tr.Maxt + 1 ranges = ranges[1:] } diff --git a/tsdb/record.go b/tsdb/record/record.go similarity index 65% rename from tsdb/record.go rename to tsdb/record/record.go index 174d3bd14..87b1c8378 100644 --- a/tsdb/record.go +++ b/tsdb/record/record.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tsdb +package record import ( "math" @@ -21,45 +21,64 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/labels" + "github.com/prometheus/prometheus/tsdb/tombstones" ) -// RecordType represents the data type of a record. -type RecordType uint8 +// Type represents the data type of a record. +type Type uint8 const ( - // RecordInvalid is returned for unrecognised WAL record types. - RecordInvalid RecordType = 255 - // RecordSeries is used to match WAL records of type Series. - RecordSeries RecordType = 1 - // RecordSamples is used to match WAL records of type Samples. - RecordSamples RecordType = 2 - // RecordTombstones is used to match WAL records of type Tombstones. - RecordTombstones RecordType = 3 + // Invalid is returned for unrecognised WAL record types. + Invalid Type = 255 + // Series is used to match WAL records of type Series. + Series Type = 1 + // Samples is used to match WAL records of type Samples. + Samples Type = 2 + // Tombstones is used to match WAL records of type Tombstones. + Tombstones Type = 3 ) -// RecordDecoder decodes series, sample, and tombstone records. +var ( + // ErrNotFound is returned if a looked up resource was not found. Duplicate ErrNotFound from head.go. + ErrNotFound = errors.New("not found") +) + +// RefSeries is the series labels with the series ID. +type RefSeries struct { + Ref uint64 + Labels labels.Labels +} + +// RefSample is a timestamp/value pair associated with a reference to a series. +type RefSample struct { + Ref uint64 + T int64 + V float64 +} + +// Decoder decodes series, sample, and tombstone records. // The zero value is ready to use. -type RecordDecoder struct { +type Decoder struct { } // Type returns the type of the record. -// Return RecordInvalid if no valid record type is found. -func (d *RecordDecoder) Type(rec []byte) RecordType { +// Returns RecordInvalid if no valid record type is found. +func (d *Decoder) Type(rec []byte) Type { if len(rec) < 1 { - return RecordInvalid + return Invalid } - switch t := RecordType(rec[0]); t { - case RecordSeries, RecordSamples, RecordTombstones: + switch t := Type(rec[0]); t { + case Series, Samples, Tombstones: return t } - return RecordInvalid + return Invalid } // Series appends series in rec to the given slice. -func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) { +func (d *Decoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) { dec := encoding.Decbuf{B: rec} - if RecordType(dec.Byte()) != RecordSeries { + if Type(dec.Byte()) != Series { return nil, errors.New("invalid record type") } for len(dec.B) > 0 && dec.Err() == nil { @@ -88,10 +107,10 @@ func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, err } // Samples appends samples in rec to the given slice. -func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) { +func (d *Decoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) { dec := encoding.Decbuf{B: rec} - if RecordType(dec.Byte()) != RecordSamples { + if Type(dec.Byte()) != Samples { return nil, errors.New("invalid record type") } if dec.Len() == 0 { @@ -123,16 +142,16 @@ func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, e } // Tombstones appends tombstones in rec to the given slice. -func (d *RecordDecoder) Tombstones(rec []byte, tstones []Stone) ([]Stone, error) { +func (d *Decoder) Tombstones(rec []byte, tstones []tombstones.Stone) ([]tombstones.Stone, error) { dec := encoding.Decbuf{B: rec} - if RecordType(dec.Byte()) != RecordTombstones { + if Type(dec.Byte()) != Tombstones { return nil, errors.New("invalid record type") } for dec.Len() > 0 && dec.Err() == nil { - tstones = append(tstones, Stone{ - ref: dec.Be64(), - intervals: Intervals{ + tstones = append(tstones, tombstones.Stone{ + Ref: dec.Be64(), + Intervals: tombstones.Intervals{ {Mint: dec.Varint64(), Maxt: dec.Varint64()}, }, }) @@ -146,15 +165,15 @@ func (d *RecordDecoder) Tombstones(rec []byte, tstones []Stone) ([]Stone, error) return tstones, nil } -// RecordEncoder encodes series, sample, and tombstones records. +// Encoder encodes series, sample, and tombstones records. // The zero value is ready to use. -type RecordEncoder struct { +type Encoder struct { } // Series appends the encoded series to b and returns the resulting slice. -func (e *RecordEncoder) Series(series []RefSeries, b []byte) []byte { +func (e *Encoder) Series(series []RefSeries, b []byte) []byte { buf := encoding.Encbuf{B: b} - buf.PutByte(byte(RecordSeries)) + buf.PutByte(byte(Series)) for _, s := range series { buf.PutBE64(s.Ref) @@ -169,9 +188,9 @@ func (e *RecordEncoder) Series(series []RefSeries, b []byte) []byte { } // Samples appends the encoded samples to b and returns the resulting slice. -func (e *RecordEncoder) Samples(samples []RefSample, b []byte) []byte { +func (e *Encoder) Samples(samples []RefSample, b []byte) []byte { buf := encoding.Encbuf{B: b} - buf.PutByte(byte(RecordSamples)) + buf.PutByte(byte(Samples)) if len(samples) == 0 { return buf.Get() @@ -193,13 +212,13 @@ func (e *RecordEncoder) Samples(samples []RefSample, b []byte) []byte { } // Tombstones appends the encoded tombstones to b and returns the resulting slice. -func (e *RecordEncoder) Tombstones(tstones []Stone, b []byte) []byte { +func (e *Encoder) Tombstones(tstones []tombstones.Stone, b []byte) []byte { buf := encoding.Encbuf{B: b} - buf.PutByte(byte(RecordTombstones)) + buf.PutByte(byte(Tombstones)) for _, s := range tstones { - for _, iv := range s.intervals { - buf.PutBE64(s.ref) + for _, iv := range s.Intervals { + buf.PutBE64(s.Ref) buf.PutVarint64(iv.Mint) buf.PutVarint64(iv.Maxt) } diff --git a/tsdb/record_test.go b/tsdb/record/record_test.go similarity index 81% rename from tsdb/record_test.go rename to tsdb/record/record_test.go index 482780c1c..feed8069d 100644 --- a/tsdb/record_test.go +++ b/tsdb/record/record_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tsdb +package record import ( "testing" @@ -20,12 +20,13 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/labels" + "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/util/testutil" ) func TestRecord_EncodeDecode(t *testing.T) { - var enc RecordEncoder - var dec RecordDecoder + var enc Encoder + var dec Decoder series := []RefSeries{ { @@ -54,31 +55,31 @@ func TestRecord_EncodeDecode(t *testing.T) { // Intervals get split up into single entries. So we don't get back exactly // what we put in. - tstones := []Stone{ - {ref: 123, intervals: Intervals{ + tstones := []tombstones.Stone{ + {Ref: 123, Intervals: tombstones.Intervals{ {Mint: -1000, Maxt: 1231231}, {Mint: 5000, Maxt: 0}, }}, - {ref: 13, intervals: Intervals{ + {Ref: 13, Intervals: tombstones.Intervals{ {Mint: -1000, Maxt: -11}, {Mint: 5000, Maxt: 1000}, }}, } decTstones, err := dec.Tombstones(enc.Tombstones(tstones, nil), nil) testutil.Ok(t, err) - testutil.Equals(t, []Stone{ - {ref: 123, intervals: Intervals{{Mint: -1000, Maxt: 1231231}}}, - {ref: 123, intervals: Intervals{{Mint: 5000, Maxt: 0}}}, - {ref: 13, intervals: Intervals{{Mint: -1000, Maxt: -11}}}, - {ref: 13, intervals: Intervals{{Mint: 5000, Maxt: 1000}}}, + testutil.Equals(t, []tombstones.Stone{ + {Ref: 123, Intervals: tombstones.Intervals{{Mint: -1000, Maxt: 1231231}}}, + {Ref: 123, Intervals: tombstones.Intervals{{Mint: 5000, Maxt: 0}}}, + {Ref: 13, Intervals: tombstones.Intervals{{Mint: -1000, Maxt: -11}}}, + {Ref: 13, Intervals: tombstones.Intervals{{Mint: 5000, Maxt: 1000}}}, }, decTstones) } // TestRecord_Corruputed ensures that corrupted records return the correct error. // Bugfix check for pull/521 and pull/523. func TestRecord_Corruputed(t *testing.T) { - var enc RecordEncoder - var dec RecordDecoder + var enc Encoder + var dec Decoder t.Run("Test corrupted series record", func(t *testing.T) { series := []RefSeries{ @@ -104,8 +105,8 @@ func TestRecord_Corruputed(t *testing.T) { }) t.Run("Test corrupted tombstone record", func(t *testing.T) { - tstones := []Stone{ - {ref: 123, intervals: Intervals{ + tstones := []tombstones.Stone{ + {Ref: 123, Intervals: tombstones.Intervals{ {Mint: -1000, Maxt: 1231231}, {Mint: 5000, Maxt: 0}, }}, diff --git a/tsdb/tombstones.go b/tsdb/tombstones/tombstones.go similarity index 74% rename from tsdb/tombstones.go rename to tsdb/tombstones/tombstones.go index 8910f7b8b..e303f8f64 100644 --- a/tsdb/tombstones.go +++ b/tsdb/tombstones/tombstones.go @@ -11,11 +11,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tsdb +package tombstones import ( "encoding/binary" "fmt" + "hash" + "hash/crc32" "io" "io/ioutil" "os" @@ -30,7 +32,7 @@ import ( "github.com/prometheus/prometheus/tsdb/fileutil" ) -const tombstoneFilename = "tombstones" +const TombstonesFilename = "tombstones" const ( // MagicTombstone is 4 bytes at the head of a tombstone file. @@ -39,8 +41,23 @@ const ( tombstoneFormatV1 = 1 ) -// TombstoneReader gives access to tombstone intervals by series reference. -type TombstoneReader interface { +// The table gets initialized with sync.Once but may still cause a race +// with any other use of the crc32 package anywhere. Thus we initialize it +// before. +var castagnoliTable *crc32.Table + +func init() { + castagnoliTable = crc32.MakeTable(crc32.Castagnoli) +} + +// newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the +// polynomial may be easily changed in one location at a later time, if necessary. +func newCRC32() hash.Hash32 { + return crc32.New(castagnoliTable) +} + +// Reader gives access to tombstone intervals by series reference. +type Reader interface { // Get returns deletion intervals for the series with the given reference. Get(ref uint64) (Intervals, error) @@ -54,8 +71,8 @@ type TombstoneReader interface { Close() error } -func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) (int64, error) { - path := filepath.Join(dir, tombstoneFilename) +func WriteFile(logger log.Logger, dir string, tr Reader) (int64, error) { + path := filepath.Join(dir, TombstonesFilename) tmp := path + ".tmp" hash := newCRC32() var size int @@ -129,14 +146,14 @@ func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) (int6 // Stone holds the information on the posting and time-range // that is deleted. type Stone struct { - ref uint64 - intervals Intervals + Ref uint64 + Intervals Intervals } -func readTombstones(dir string) (TombstoneReader, int64, error) { - b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) +func ReadTombstones(dir string) (Reader, int64, error) { + b, err := ioutil.ReadFile(filepath.Join(dir, TombstonesFilename)) if os.IsNotExist(err) { - return newMemTombstones(), 0, nil + return NewMemTombstones(), 0, nil } else if err != nil { return nil, 0, err } @@ -166,7 +183,7 @@ func readTombstones(dir string) (TombstoneReader, int64, error) { return nil, 0, errors.New("checksum did not match") } - stonesMap := newMemTombstones() + stonesMap := NewMemTombstones() for d.Len() > 0 { k := d.Uvarint64() @@ -176,7 +193,7 @@ func readTombstones(dir string) (TombstoneReader, int64, error) { return nil, 0, d.Err() } - stonesMap.addInterval(k, Interval{mint, maxt}) + stonesMap.AddInterval(k, Interval{mint, maxt}) } return stonesMap, int64(len(b)), nil @@ -187,12 +204,22 @@ type memTombstones struct { mtx sync.RWMutex } -// newMemTombstones creates new in memory TombstoneReader +// NewMemTombstones creates new in memory Tombstone Reader // that allows adding new intervals. -func newMemTombstones() *memTombstones { +func NewMemTombstones() *memTombstones { return &memTombstones{intvlGroups: make(map[uint64]Intervals)} } +func NewTestMemTombstones(intervals []Intervals) *memTombstones { + ret := NewMemTombstones() + for i, intervalsGroup := range intervals { + for _, interval := range intervalsGroup { + ret.AddInterval(uint64(i+1), interval) + } + } + return ret +} + func (t *memTombstones) Get(ref uint64) (Intervals, error) { t.mtx.RLock() defer t.mtx.RUnlock() @@ -221,12 +248,13 @@ func (t *memTombstones) Total() uint64 { return total } -// addInterval to an existing memTombstones -func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) { +// AddInterval to an existing memTombstones. +func (t *memTombstones) AddInterval(ref uint64, itvs ...Interval) { t.mtx.Lock() defer t.mtx.Unlock() for _, itv := range itvs { - t.intvlGroups[ref] = t.intvlGroups[ref].add(itv) + fmt.Println("adding interval to ref: ", ref) + t.intvlGroups[ref] = t.intvlGroups[ref].Add(itv) } } @@ -239,13 +267,13 @@ type Interval struct { Mint, Maxt int64 } -func (tr Interval) inBounds(t int64) bool { +func (tr Interval) InBounds(t int64) bool { return t >= tr.Mint && t <= tr.Maxt } -func (tr Interval) isSubrange(dranges Intervals) bool { +func (tr Interval) IsSubrange(dranges Intervals) bool { for _, r := range dranges { - if r.inBounds(tr.Mint) && r.inBounds(tr.Maxt) { + if r.InBounds(tr.Mint) && r.InBounds(tr.Maxt) { return true } } @@ -256,12 +284,12 @@ func (tr Interval) isSubrange(dranges Intervals) bool { // Intervals represents a set of increasing and non-overlapping time-intervals. type Intervals []Interval -// add the new time-range to the existing ones. +// Add the new time-range to the existing ones. // The existing ones must be sorted. -func (itvs Intervals) add(n Interval) Intervals { +func (itvs Intervals) Add(n Interval) Intervals { for i, r := range itvs { // TODO(gouthamve): Make this codepath easier to digest. - if r.inBounds(n.Mint-1) || r.inBounds(n.Mint) { + if r.InBounds(n.Mint-1) || r.InBounds(n.Mint) { if n.Maxt > r.Maxt { itvs[i].Maxt = n.Maxt } @@ -282,7 +310,7 @@ func (itvs Intervals) add(n Interval) Intervals { return itvs } - if r.inBounds(n.Maxt+1) || r.inBounds(n.Maxt) { + if r.InBounds(n.Maxt+1) || r.InBounds(n.Maxt) { if n.Mint < r.Maxt { itvs[i].Mint = n.Mint } diff --git a/tsdb/tombstones_test.go b/tsdb/tombstones/tombstones_test.go similarity index 89% rename from tsdb/tombstones_test.go rename to tsdb/tombstones/tombstones_test.go index e37721a98..81a82c8c5 100644 --- a/tsdb/tombstones_test.go +++ b/tsdb/tombstones/tombstones_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tsdb +package tombstones import ( "io/ioutil" @@ -33,7 +33,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) { ref := uint64(0) - stones := newMemTombstones() + stones := NewMemTombstones() // Generate the tombstones. for i := 0; i < 100; i++ { ref += uint64(rand.Int31n(10)) + 1 @@ -41,16 +41,16 @@ func TestWriteAndReadbackTombStones(t *testing.T) { dranges := make(Intervals, 0, numRanges) mint := rand.Int63n(time.Now().UnixNano()) for j := 0; j < numRanges; j++ { - dranges = dranges.add(Interval{mint, mint + rand.Int63n(1000)}) + dranges = dranges.Add(Interval{mint, mint + rand.Int63n(1000)}) mint += rand.Int63n(1000) + 1 } - stones.addInterval(ref, dranges...) + stones.AddInterval(ref, dranges...) } - _, err := writeTombstoneFile(log.NewNopLogger(), tmpdir, stones) + _, err := WriteFile(log.NewNopLogger(), tmpdir, stones) testutil.Ok(t, err) - restr, _, err := readTombstones(tmpdir) + restr, _, err := ReadTombstones(tmpdir) testutil.Ok(t, err) // Compare the two readers. @@ -122,20 +122,20 @@ func TestAddingNewIntervals(t *testing.T) { for _, c := range cases { - testutil.Equals(t, c.exp, c.exist.add(c.new)) + testutil.Equals(t, c.exp, c.exist.Add(c.new)) } } // TestMemTombstonesConcurrency to make sure they are safe to access from different goroutines. func TestMemTombstonesConcurrency(t *testing.T) { - tomb := newMemTombstones() + tomb := NewMemTombstones() totalRuns := 100 var wg sync.WaitGroup wg.Add(2) go func() { for x := 0; x < totalRuns; x++ { - tomb.addInterval(uint64(x), Interval{int64(x), int64(x)}) + tomb.AddInterval(uint64(x), Interval{int64(x), int64(x)}) } wg.Done() }() diff --git a/tsdb/wal.go b/tsdb/wal.go index 187feabd9..f7e28301e 100644 --- a/tsdb/wal.go +++ b/tsdb/wal.go @@ -34,6 +34,8 @@ import ( "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/labels" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/wal" ) @@ -89,9 +91,9 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics { // DEPRECATED: use wal pkg combined with the record codex instead. type WAL interface { Reader() WALReader - LogSeries([]RefSeries) error - LogSamples([]RefSample) error - LogDeletes([]Stone) error + LogSeries([]record.RefSeries) error + LogSamples([]record.RefSample) error + LogDeletes([]tombstones.Stone) error Truncate(mint int64, keep func(uint64) bool) error Close() error } @@ -99,27 +101,12 @@ type WAL interface { // WALReader reads entries from a WAL. type WALReader interface { Read( - seriesf func([]RefSeries), - samplesf func([]RefSample), - deletesf func([]Stone), + seriesf func([]record.RefSeries), + samplesf func([]record.RefSample), + deletesf func([]tombstones.Stone), ) error } -// RefSeries is the series labels with the series ID. -type RefSeries struct { - Ref uint64 - Labels labels.Labels -} - -// RefSample is a timestamp/value pair associated with a reference to a series. -type RefSample struct { - Ref uint64 - T int64 - V float64 - - series *memSeries -} - // segmentFile wraps a file object of a segment and tracks the highest timestamp // it contains. During WAL truncating, all segments with no higher timestamp than // the truncation threshold can be compacted. @@ -240,9 +227,9 @@ type repairingWALReader struct { } func (r *repairingWALReader) Read( - seriesf func([]RefSeries), - samplesf func([]RefSample), - deletesf func([]Stone), + seriesf func([]record.RefSeries), + samplesf func([]record.RefSample), + deletesf func([]tombstones.Stone), ) error { err := r.r.Read(seriesf, samplesf, deletesf) if err == nil { @@ -348,8 +335,8 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error { var ( csf = newSegmentFile(f) crc32 = newCRC32() - decSeries = []RefSeries{} - activeSeries = []RefSeries{} + decSeries = []record.RefSeries{} + activeSeries = []record.RefSeries{} ) for r.next() { @@ -433,7 +420,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error { // LogSeries writes a batch of new series labels to the log. // The series have to be ordered. -func (w *SegmentWAL) LogSeries(series []RefSeries) error { +func (w *SegmentWAL) LogSeries(series []record.RefSeries) error { buf := w.getBuffer() flag := w.encodeSeries(buf, series) @@ -460,7 +447,7 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error { } // LogSamples writes a batch of new samples to the log. -func (w *SegmentWAL) LogSamples(samples []RefSample) error { +func (w *SegmentWAL) LogSamples(samples []record.RefSample) error { buf := w.getBuffer() flag := w.encodeSamples(buf, samples) @@ -486,7 +473,7 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error { } // LogDeletes write a batch of new deletes to the log. -func (w *SegmentWAL) LogDeletes(stones []Stone) error { +func (w *SegmentWAL) LogDeletes(stones []tombstones.Stone) error { buf := w.getBuffer() flag := w.encodeDeletes(buf, stones) @@ -504,7 +491,7 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error { tf := w.head() for _, s := range stones { - for _, iv := range s.intervals { + for _, iv := range s.Intervals { if tf.maxTime < iv.Maxt { tf.maxTime = iv.Maxt } @@ -797,7 +784,7 @@ const ( walDeletesSimple = 1 ) -func (w *SegmentWAL) encodeSeries(buf *encoding.Encbuf, series []RefSeries) uint8 { +func (w *SegmentWAL) encodeSeries(buf *encoding.Encbuf, series []record.RefSeries) uint8 { for _, s := range series { buf.PutBE64(s.Ref) buf.PutUvarint(len(s.Labels)) @@ -810,7 +797,7 @@ func (w *SegmentWAL) encodeSeries(buf *encoding.Encbuf, series []RefSeries) uint return walSeriesSimple } -func (w *SegmentWAL) encodeSamples(buf *encoding.Encbuf, samples []RefSample) uint8 { +func (w *SegmentWAL) encodeSamples(buf *encoding.Encbuf, samples []record.RefSample) uint8 { if len(samples) == 0 { return walSamplesSimple } @@ -831,10 +818,10 @@ func (w *SegmentWAL) encodeSamples(buf *encoding.Encbuf, samples []RefSample) ui return walSamplesSimple } -func (w *SegmentWAL) encodeDeletes(buf *encoding.Encbuf, stones []Stone) uint8 { +func (w *SegmentWAL) encodeDeletes(buf *encoding.Encbuf, stones []tombstones.Stone) uint8 { for _, s := range stones { - for _, iv := range s.intervals { - buf.PutBE64(s.ref) + for _, iv := range s.Intervals { + buf.PutBE64(s.Ref) buf.PutVarint64(iv.Mint) buf.PutVarint64(iv.Maxt) } @@ -877,9 +864,9 @@ func (r *walReader) Err() error { } func (r *walReader) Read( - seriesf func([]RefSeries), - samplesf func([]RefSample), - deletesf func([]Stone), + seriesf func([]record.RefSeries), + samplesf func([]record.RefSample), + deletesf func([]tombstones.Stone), ) error { // Concurrency for replaying the WAL is very limited. We at least split out decoding and // processing into separate threads. @@ -898,19 +885,19 @@ func (r *walReader) Read( for x := range datac { switch v := x.(type) { - case []RefSeries: + case []record.RefSeries: if seriesf != nil { seriesf(v) } //lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty. seriesPool.Put(v[:0]) - case []RefSample: + case []record.RefSample: if samplesf != nil { samplesf(v) } //lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty. samplePool.Put(v[:0]) - case []Stone: + case []tombstones.Stone: if deletesf != nil { deletesf(v) } @@ -931,11 +918,11 @@ func (r *walReader) Read( // Those should generally be catched by entry decoding before. switch et { case WALEntrySeries: - var series []RefSeries + var series []record.RefSeries if v := seriesPool.Get(); v == nil { - series = make([]RefSeries, 0, 512) + series = make([]record.RefSeries, 0, 512) } else { - series = v.([]RefSeries) + series = v.([]record.RefSeries) } err = r.decodeSeries(flag, b, &series) @@ -952,11 +939,11 @@ func (r *walReader) Read( } } case WALEntrySamples: - var samples []RefSample + var samples []record.RefSample if v := samplePool.Get(); v == nil { - samples = make([]RefSample, 0, 512) + samples = make([]record.RefSample, 0, 512) } else { - samples = v.([]RefSample) + samples = v.([]record.RefSample) } err = r.decodeSamples(flag, b, &samples) @@ -974,11 +961,11 @@ func (r *walReader) Read( } } case WALEntryDeletes: - var deletes []Stone + var deletes []tombstones.Stone if v := deletePool.Get(); v == nil { - deletes = make([]Stone, 0, 512) + deletes = make([]tombstones.Stone, 0, 512) } else { - deletes = v.([]Stone) + deletes = v.([]tombstones.Stone) } err = r.decodeDeletes(flag, b, &deletes) @@ -991,7 +978,7 @@ func (r *walReader) Read( // Update the times for the WAL segment file. cf := r.current() for _, s := range deletes { - for _, iv := range s.intervals { + for _, iv := range s.Intervals { if cf.maxTime < iv.Maxt { cf.maxTime = iv.Maxt } @@ -1128,7 +1115,7 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { return etype, flag, buf, nil } -func (r *walReader) decodeSeries(flag byte, b []byte, res *[]RefSeries) error { +func (r *walReader) decodeSeries(flag byte, b []byte, res *[]record.RefSeries) error { dec := encoding.Decbuf{B: b} for len(dec.B) > 0 && dec.Err() == nil { @@ -1142,7 +1129,7 @@ func (r *walReader) decodeSeries(flag byte, b []byte, res *[]RefSeries) error { } sort.Sort(lset) - *res = append(*res, RefSeries{ + *res = append(*res, record.RefSeries{ Ref: ref, Labels: lset, }) @@ -1156,7 +1143,7 @@ func (r *walReader) decodeSeries(flag byte, b []byte, res *[]RefSeries) error { return nil } -func (r *walReader) decodeSamples(flag byte, b []byte, res *[]RefSample) error { +func (r *walReader) decodeSamples(flag byte, b []byte, res *[]record.RefSample) error { if len(b) == 0 { return nil } @@ -1172,7 +1159,7 @@ func (r *walReader) decodeSamples(flag byte, b []byte, res *[]RefSample) error { dtime := dec.Varint64() val := dec.Be64() - *res = append(*res, RefSample{ + *res = append(*res, record.RefSample{ Ref: uint64(int64(baseRef) + dref), T: baseTime + dtime, V: math.Float64frombits(val), @@ -1188,13 +1175,13 @@ func (r *walReader) decodeSamples(flag byte, b []byte, res *[]RefSample) error { return nil } -func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error { +func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]tombstones.Stone) error { dec := &encoding.Decbuf{B: b} for dec.Len() > 0 && dec.Err() == nil { - *res = append(*res, Stone{ - ref: dec.Be64(), - intervals: Intervals{ + *res = append(*res, tombstones.Stone{ + Ref: dec.Be64(), + Intervals: tombstones.Intervals{ {Mint: dec.Varint64(), Maxt: dec.Varint64()}, }, }) @@ -1274,23 +1261,23 @@ func MigrateWAL(logger log.Logger, dir string) (err error) { rdr := w.Reader() var ( - enc RecordEncoder + enc record.Encoder b []byte ) decErr := rdr.Read( - func(s []RefSeries) { + func(s []record.RefSeries) { if err != nil { return } err = repl.Log(enc.Series(s, b[:0])) }, - func(s []RefSample) { + func(s []record.RefSample) { if err != nil { return } err = repl.Log(enc.Samples(s, b[:0])) }, - func(s []Stone) { + func(s []tombstones.Stone) { if err != nil { return } diff --git a/tsdb/checkpoint.go b/tsdb/wal/checkpoint.go similarity index 88% rename from tsdb/checkpoint.go rename to tsdb/wal/checkpoint.go index d82e567f9..8d3cc8ab6 100644 --- a/tsdb/checkpoint.go +++ b/tsdb/wal/checkpoint.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tsdb +package wal import ( "fmt" @@ -27,7 +27,8 @@ import ( "github.com/pkg/errors" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/fileutil" - "github.com/prometheus/prometheus/tsdb/wal" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/tsdb/tombstones" ) // CheckpointStats returns stats about a created checkpoint. @@ -63,7 +64,7 @@ func LastCheckpoint(dir string) (string, int, error) { } return filepath.Join(dir, fi.Name()), idx, nil } - return "", 0, ErrNotFound + return "", 0, record.ErrNotFound } // DeleteCheckpoints deletes all checkpoints in a directory below a given index. @@ -99,15 +100,15 @@ const checkpointPrefix = "checkpoint." // segmented format as the original WAL itself. // This makes it easy to read it through the WAL package and concatenate // it with the original WAL. -func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) { +func Checkpoint(w *WAL, from, to int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) { stats := &CheckpointStats{} var sgmReader io.ReadCloser { - var sgmRange []wal.SegmentRange + var sgmRange []SegmentRange dir, idx, err := LastCheckpoint(w.Dir()) - if err != nil && err != ErrNotFound { + if err != nil && err != record.ErrNotFound { return nil, errors.Wrap(err, "find last checkpoint") } last := idx + 1 @@ -118,11 +119,11 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) // Ignore WAL files below the checkpoint. They shouldn't exist to begin with. from = last - sgmRange = append(sgmRange, wal.SegmentRange{Dir: dir, Last: math.MaxInt32}) + sgmRange = append(sgmRange, SegmentRange{Dir: dir, Last: math.MaxInt32}) } - sgmRange = append(sgmRange, wal.SegmentRange{Dir: w.Dir(), First: from, Last: to}) - sgmReader, err = wal.NewSegmentsRangeReader(sgmRange...) + sgmRange = append(sgmRange, SegmentRange{Dir: w.Dir(), First: from, Last: to}) + sgmReader, err = NewSegmentsRangeReader(sgmRange...) if err != nil { return nil, errors.Wrap(err, "create segment reader") } @@ -135,7 +136,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) if err := os.MkdirAll(cpdirtmp, 0777); err != nil { return nil, errors.Wrap(err, "create checkpoint dir") } - cp, err := wal.New(nil, nil, cpdirtmp, w.CompressionEnabled()) + cp, err := New(nil, nil, cpdirtmp, w.CompressionEnabled()) if err != nil { return nil, errors.Wrap(err, "open checkpoint") } @@ -146,14 +147,14 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) os.RemoveAll(cpdirtmp) }() - r := wal.NewReader(sgmReader) + r := NewReader(sgmReader) var ( - series []RefSeries - samples []RefSample - tstones []Stone - dec RecordDecoder - enc RecordEncoder + series []record.RefSeries + samples []record.RefSample + tstones []tombstones.Stone + dec record.Decoder + enc record.Encoder buf []byte recs [][]byte ) @@ -167,7 +168,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) rec := r.Record() switch dec.Type(rec) { - case RecordSeries: + case record.Series: series, err = dec.Series(rec, series) if err != nil { return nil, errors.Wrap(err, "decode series") @@ -185,7 +186,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) stats.TotalSeries += len(series) stats.DroppedSeries += len(series) - len(repl) - case RecordSamples: + case record.Samples: samples, err = dec.Samples(rec, samples) if err != nil { return nil, errors.Wrap(err, "decode samples") @@ -203,7 +204,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) stats.TotalSamples += len(samples) stats.DroppedSamples += len(samples) - len(repl) - case RecordTombstones: + case record.Tombstones: tstones, err = dec.Tombstones(rec, tstones) if err != nil { return nil, errors.Wrap(err, "decode deletes") @@ -211,7 +212,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) // Drop irrelevant tombstones in place. repl := tstones[:0] for _, s := range tstones { - for _, iv := range s.intervals { + for _, iv := range s.Intervals { if iv.Maxt >= mint { repl = append(repl, s) break diff --git a/tsdb/checkpoint_test.go b/tsdb/wal/checkpoint_test.go similarity index 89% rename from tsdb/checkpoint_test.go rename to tsdb/wal/checkpoint_test.go index 2fd4bf373..c8542781d 100644 --- a/tsdb/checkpoint_test.go +++ b/tsdb/wal/checkpoint_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tsdb +package wal import ( "fmt" @@ -25,7 +25,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/labels" - "github.com/prometheus/prometheus/tsdb/wal" + "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/util/testutil" ) @@ -37,7 +37,7 @@ func TestLastCheckpoint(t *testing.T) { }() _, _, err = LastCheckpoint(dir) - testutil.Equals(t, ErrNotFound, err) + testutil.Equals(t, record.ErrNotFound, err) testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.0000"), 0777)) s, k, err := LastCheckpoint(dir) @@ -94,18 +94,18 @@ func TestCheckpoint(t *testing.T) { testutil.Ok(t, os.RemoveAll(dir)) }() - var enc RecordEncoder + var enc record.Encoder // Create a dummy segment to bump the initial number. - seg, err := wal.CreateSegment(dir, 100) + seg, err := CreateSegment(dir, 100) testutil.Ok(t, err) testutil.Ok(t, seg.Close()) // Manually create checkpoint for 99 and earlier. - w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"), compress) + w, err := New(nil, nil, filepath.Join(dir, "checkpoint.0099"), compress) testutil.Ok(t, err) // Add some data we expect to be around later. - err = w.Log(enc.Series([]RefSeries{ + err = w.Log(enc.Series([]record.RefSeries{ {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, {Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")}, }, nil)) @@ -113,7 +113,7 @@ func TestCheckpoint(t *testing.T) { testutil.Ok(t, w.Close()) // Start a WAL and write records to it as usual. - w, err = wal.NewSize(nil, nil, dir, 64*1024, compress) + w, err = NewSize(nil, nil, dir, 64*1024, compress) testutil.Ok(t, err) var last int64 @@ -125,7 +125,7 @@ func TestCheckpoint(t *testing.T) { } // Write some series initially. if i == 0 { - b := enc.Series([]RefSeries{ + b := enc.Series([]record.RefSeries{ {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")}, {Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")}, {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, @@ -136,7 +136,7 @@ func TestCheckpoint(t *testing.T) { // Write samples until the WAL has enough segments. // Make them have drifting timestamps within a record to see that they // get filtered properly. - b := enc.Samples([]RefSample{ + b := enc.Samples([]record.RefSample{ {Ref: 0, T: last, V: float64(i)}, {Ref: 1, T: last + 10000, V: float64(i)}, {Ref: 2, T: last + 20000, V: float64(i)}, @@ -161,22 +161,22 @@ func TestCheckpoint(t *testing.T) { testutil.Equals(t, 1, len(files)) testutil.Equals(t, "checkpoint.000106", files[0]) - sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106")) + sr, err := NewSegmentsReader(filepath.Join(dir, "checkpoint.000106")) testutil.Ok(t, err) defer sr.Close() - var dec RecordDecoder - var series []RefSeries - r := wal.NewReader(sr) + var dec record.Decoder + var series []record.RefSeries + r := NewReader(sr) for r.Next() { rec := r.Record() switch dec.Type(rec) { - case RecordSeries: + case record.Series: series, err = dec.Series(rec, series) testutil.Ok(t, err) - case RecordSamples: + case record.Samples: samples, err := dec.Samples(rec, nil) testutil.Ok(t, err) for _, s := range samples { @@ -185,7 +185,7 @@ func TestCheckpoint(t *testing.T) { } } testutil.Ok(t, r.Err()) - testutil.Equals(t, []RefSeries{ + testutil.Equals(t, []record.RefSeries{ {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")}, {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, @@ -201,7 +201,7 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) { defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - w, err := wal.NewSize(nil, nil, dir, 64*1024, false) + w, err := NewSize(nil, nil, dir, 64*1024, false) testutil.Ok(t, err) testutil.Ok(t, w.Log([]byte{99})) w.Close() diff --git a/tsdb/wal/reader_test.go b/tsdb/wal/reader_test.go index eb858a780..13e1932e3 100644 --- a/tsdb/wal/reader_test.go +++ b/tsdb/wal/reader_test.go @@ -41,7 +41,7 @@ type reader interface { Offset() int64 } -type record struct { +type rec struct { t recType b []byte } @@ -59,13 +59,13 @@ var readerConstructors = map[string]func(io.Reader) reader{ var data = make([]byte, 100000) var testReaderCases = []struct { - t []record + t []rec exp [][]byte fail bool }{ // Sequence of valid records. { - t: []record{ + t: []rec{ {recFull, data[0:200]}, {recFirst, data[200:300]}, {recLast, data[300:400]}, @@ -89,7 +89,7 @@ var testReaderCases = []struct { }, // Exactly at the limit of one page minus the header size { - t: []record{ + t: []rec{ {recFull, data[0 : pageSize-recordHeaderSize]}, }, exp: [][]byte{ @@ -99,7 +99,7 @@ var testReaderCases = []struct { // More than a full page, this exceeds our buffer and can never happen // when written by the WAL. { - t: []record{ + t: []rec{ {recFull, data[0 : pageSize+1]}, }, fail: true, @@ -108,7 +108,7 @@ var testReaderCases = []struct { // NB currently the non-live reader succeeds on this. I think this is a bug. // but we've seen it in production. { - t: []record{ + t: []rec{ {recFull, data[:pageSize/2]}, {recFull, data[:pageSize/2]}, }, @@ -119,22 +119,22 @@ var testReaderCases = []struct { }, // Invalid orders of record types. { - t: []record{{recMiddle, data[:200]}}, + t: []rec{{recMiddle, data[:200]}}, fail: true, }, { - t: []record{{recLast, data[:200]}}, + t: []rec{{recLast, data[:200]}}, fail: true, }, { - t: []record{ + t: []rec{ {recFirst, data[:200]}, {recFull, data[200:400]}, }, fail: true, }, { - t: []record{ + t: []rec{ {recFirst, data[:100]}, {recMiddle, data[100:200]}, {recFull, data[200:400]}, @@ -143,7 +143,7 @@ var testReaderCases = []struct { }, // Non-zero data after page termination. { - t: []record{ + t: []rec{ {recFull, data[:100]}, {recPageTerm, append(make([]byte, pageSize-recordHeaderSize-102), 1)}, }, diff --git a/storage/remote/wal_watcher.go b/tsdb/wal/watcher.go similarity index 68% rename from storage/remote/wal_watcher.go rename to tsdb/wal/watcher.go index f1b905612..2167e3324 100644 --- a/storage/remote/wal_watcher.go +++ b/tsdb/wal/watcher.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package remote +package wal import ( "fmt" @@ -28,81 +28,44 @@ import ( "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/tsdb" - "github.com/prometheus/prometheus/tsdb/fileutil" - "github.com/prometheus/prometheus/tsdb/wal" - "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/prometheus/prometheus/tsdb/fileutil" + "github.com/prometheus/prometheus/tsdb/record" ) const ( readPeriod = 10 * time.Millisecond checkpointPeriod = 5 * time.Second segmentCheckPeriod = 100 * time.Millisecond + consumer = "consumer" ) -var ( - watcherRecordsRead = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "prometheus", - Subsystem: "wal_watcher", - Name: "records_read_total", - Help: "Number of records read by the WAL watcher from the WAL.", - }, - []string{queue, "type"}, - ) - watcherRecordDecodeFails = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "prometheus", - Subsystem: "wal_watcher", - Name: "record_decode_failures_total", - Help: "Number of records read by the WAL watcher that resulted in an error when decoding.", - }, - []string{queue}, - ) - watcherSamplesSentPreTailing = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "prometheus", - Subsystem: "wal_watcher", - Name: "samples_sent_pre_tailing_total", - Help: "Number of sample records read by the WAL watcher and sent to remote write during replay of existing WAL.", - }, - []string{queue}, - ) - watcherCurrentSegment = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "prometheus", - Subsystem: "wal_watcher", - Name: "current_segment", - Help: "Current segment the WAL watcher is reading records from.", - }, - []string{queue}, - ) - liveReaderMetrics = wal.NewLiveReaderMetrics(prometheus.DefaultRegisterer) -) - -func init() { - prometheus.MustRegister(watcherRecordsRead) - prometheus.MustRegister(watcherRecordDecodeFails) - prometheus.MustRegister(watcherSamplesSentPreTailing) - prometheus.MustRegister(watcherCurrentSegment) -} - -type writeTo interface { - Append([]tsdb.RefSample) bool - StoreSeries([]tsdb.RefSeries, int) +// WriteTo is an interface used by the Watcher to send the samples it's read +// from the WAL on to somewhere else. +type WriteTo interface { + Append([]record.RefSample) bool + StoreSeries([]record.RefSeries, int) SeriesReset(int) } -// WALWatcher watches the TSDB WAL for a given WriteTo. -type WALWatcher struct { +type WatcherMetrics struct { + recordsRead *prometheus.CounterVec + recordDecodeFails *prometheus.CounterVec + samplesSentPreTailing *prometheus.CounterVec + currentSegment *prometheus.GaugeVec +} + +// Watcher watches the TSDB WAL for a given WriteTo. +type Watcher struct { name string - writer writeTo + writer WriteTo logger log.Logger walDir string lastCheckpoint string + metrics *WatcherMetrics + readerMetrics *liveReaderMetrics - startTime int64 + StartTime int64 recordsReadMetric *prometheus.CounterVec recordDecodeFailsMetric prometheus.Counter @@ -113,66 +76,120 @@ type WALWatcher struct { done chan struct{} // For testing, stop when we hit this segment. - maxSegment int + MaxSegment int } -// NewWALWatcher creates a new WAL watcher for a given WriteTo. -func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string) *WALWatcher { +func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { + m := &WatcherMetrics{ + recordsRead: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "prometheus", + Subsystem: "wal_watcher", + Name: "records_read_total", + Help: "Number of records read by the WAL watcher from the WAL.", + }, + []string{consumer, "type"}, + ), + recordDecodeFails: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "prometheus", + Subsystem: "wal_watcher", + Name: "record_decode_failures_total", + Help: "Number of records read by the WAL watcher that resulted in an error when decoding.", + }, + []string{consumer}, + ), + samplesSentPreTailing: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "prometheus", + Subsystem: "wal_watcher", + Name: "samples_sent_pre_tailing_total", + Help: "Number of sample records read by the WAL watcher and sent to remote write during replay of existing WAL.", + }, + []string{consumer}, + ), + currentSegment: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "prometheus", + Subsystem: "wal_watcher", + Name: "current_segment", + Help: "Current segment the WAL watcher is reading records from.", + }, + []string{consumer}, + ), + } + + if reg != nil { + _ = reg.Register(m.recordsRead) + _ = reg.Register(m.recordDecodeFails) + _ = reg.Register(m.samplesSentPreTailing) + _ = reg.Register(m.currentSegment) + } + + return m +} + +// NewWatcher creates a new WAL watcher for a given WriteTo. +func NewWatcher(reg prometheus.Registerer, metrics *WatcherMetrics, logger log.Logger, name string, writer WriteTo, walDir string) *Watcher { if logger == nil { logger = log.NewNopLogger() } - return &WALWatcher{ - logger: logger, - writer: writer, - walDir: path.Join(walDir, "wal"), - name: name, - quit: make(chan struct{}), - done: make(chan struct{}), + return &Watcher{ + logger: logger, + writer: writer, + metrics: metrics, + readerMetrics: NewLiveReaderMetrics(reg), + walDir: path.Join(walDir, "wal"), + name: name, + quit: make(chan struct{}), + done: make(chan struct{}), - maxSegment: -1, + MaxSegment: -1, } } -func (w *WALWatcher) setMetrics() { +func (w *Watcher) setMetrics() { // Setup the WAL Watchers metrics. We do this here rather than in the // constructor because of the ordering of creating Queue Managers's, // stopping them, and then starting new ones in storage/remote/storage.go ApplyConfig. - w.recordsReadMetric = watcherRecordsRead.MustCurryWith(prometheus.Labels{queue: w.name}) - w.recordDecodeFailsMetric = watcherRecordDecodeFails.WithLabelValues(w.name) - w.samplesSentPreTailing = watcherSamplesSentPreTailing.WithLabelValues(w.name) - w.currentSegmentMetric = watcherCurrentSegment.WithLabelValues(w.name) + if w.metrics != nil { + w.recordsReadMetric = w.metrics.recordsRead.MustCurryWith(prometheus.Labels{consumer: w.name}) + w.recordDecodeFailsMetric = w.metrics.recordDecodeFails.WithLabelValues(w.name) + w.samplesSentPreTailing = w.metrics.samplesSentPreTailing.WithLabelValues(w.name) + w.currentSegmentMetric = w.metrics.currentSegment.WithLabelValues(w.name) + } } -// Start the WALWatcher. -func (w *WALWatcher) Start() { +// Start the Watcher. +func (w *Watcher) Start() { w.setMetrics() level.Info(w.logger).Log("msg", "starting WAL watcher", "queue", w.name) go w.loop() } -// Stop the WALWatcher. -func (w *WALWatcher) Stop() { +// Stop the Watcher. +func (w *Watcher) Stop() { close(w.quit) <-w.done // Records read metric has series and samples. - watcherRecordsRead.DeleteLabelValues(w.name, "series") - watcherRecordsRead.DeleteLabelValues(w.name, "samples") - watcherRecordDecodeFails.DeleteLabelValues(w.name) - watcherSamplesSentPreTailing.DeleteLabelValues(w.name) - watcherCurrentSegment.DeleteLabelValues(w.name) + w.metrics.recordsRead.DeleteLabelValues(w.name, "series") + w.metrics.recordsRead.DeleteLabelValues(w.name, "samples") + w.metrics.recordDecodeFails.DeleteLabelValues(w.name) + w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name) + w.metrics.currentSegment.DeleteLabelValues(w.name) level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name) } -func (w *WALWatcher) loop() { +func (w *Watcher) loop() { defer close(w.done) // We may encounter failures processing the WAL; we should wait and retry. for !isClosed(w.quit) { - w.startTime = timestamp.FromTime(time.Now()) - if err := w.run(); err != nil { + w.StartTime = timestamp.FromTime(time.Now()) + if err := w.Run(); err != nil { level.Error(w.logger).Log("msg", "error tailing WAL", "err", err) } @@ -184,15 +201,17 @@ func (w *WALWatcher) loop() { } } -func (w *WALWatcher) run() error { +// Run the watcher, which will tail the WAL until the quit channel is closed +// or an error case is hit. +func (w *Watcher) Run() error { _, lastSegment, err := w.firstAndLast() if err != nil { return errors.Wrap(err, "wal.Segments") } // Backfill from the checkpoint first if it exists. - lastCheckpoint, checkpointIndex, err := tsdb.LastCheckpoint(w.walDir) - if err != nil && err != tsdb.ErrNotFound { + lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir) + if err != nil && err != record.ErrNotFound { return errors.Wrap(err, "tsdb.LastCheckpoint") } @@ -220,7 +239,7 @@ func (w *WALWatcher) run() error { } // For testing: stop when you hit a specific segment. - if currentSegment == w.maxSegment { + if currentSegment == w.MaxSegment { return nil } @@ -231,7 +250,7 @@ func (w *WALWatcher) run() error { } // findSegmentForIndex finds the first segment greater than or equal to index. -func (w *WALWatcher) findSegmentForIndex(index int) (int, error) { +func (w *Watcher) findSegmentForIndex(index int) (int, error) { refs, err := w.segments(w.walDir) if err != nil { return -1, err @@ -246,7 +265,7 @@ func (w *WALWatcher) findSegmentForIndex(index int) (int, error) { return -1, errors.New("failed to find segment for index") } -func (w *WALWatcher) firstAndLast() (int, int, error) { +func (w *Watcher) firstAndLast() (int, int, error) { refs, err := w.segments(w.walDir) if err != nil { return -1, -1, err @@ -260,7 +279,7 @@ func (w *WALWatcher) firstAndLast() (int, int, error) { // Copied from tsdb/wal/wal.go so we do not have to open a WAL. // Plan is to move WAL watcher to TSDB and dedupe these implementations. -func (w *WALWatcher) segments(dir string) ([]int, error) { +func (w *Watcher) segments(dir string) ([]int, error) { files, err := fileutil.ReadDir(dir) if err != nil { return nil, err @@ -287,14 +306,14 @@ func (w *WALWatcher) segments(dir string) ([]int, error) { // Use tail true to indicate that the reader is currently on a segment that is // actively being written to. If false, assume it's a full segment and we're // replaying it on start to cache the series records. -func (w *WALWatcher) watch(segmentNum int, tail bool) error { - segment, err := wal.OpenReadSegment(wal.SegmentName(w.walDir, segmentNum)) +func (w *Watcher) watch(segmentNum int, tail bool) error { + segment, err := OpenReadSegment(SegmentName(w.walDir, segmentNum)) if err != nil { return err } defer segment.Close() - reader := wal.NewLiveReader(w.logger, liveReaderMetrics, segment) + reader := NewLiveReader(w.logger, w.readerMetrics, segment) readTicker := time.NewTicker(readPeriod) defer readTicker.Stop() @@ -382,9 +401,9 @@ func (w *WALWatcher) watch(segmentNum int, tail bool) error { } } -func (w *WALWatcher) garbageCollectSeries(segmentNum int) error { - dir, _, err := tsdb.LastCheckpoint(w.walDir) - if err != nil && err != tsdb.ErrNotFound { +func (w *Watcher) garbageCollectSeries(segmentNum int) error { + dir, _, err := LastCheckpoint(w.walDir) + if err != nil && err != record.ErrNotFound { return errors.Wrap(err, "tsdb.LastCheckpoint") } @@ -414,12 +433,12 @@ func (w *WALWatcher) garbageCollectSeries(segmentNum int) error { return nil } -func (w *WALWatcher) readSegment(r *wal.LiveReader, segmentNum int, tail bool) error { +func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { var ( - dec tsdb.RecordDecoder - series []tsdb.RefSeries - samples []tsdb.RefSample - send []tsdb.RefSample + dec record.Decoder + series []record.RefSeries + samples []record.RefSample + send []record.RefSample ) for r.Next() && !isClosed(w.quit) { @@ -427,7 +446,7 @@ func (w *WALWatcher) readSegment(r *wal.LiveReader, segmentNum int, tail bool) e w.recordsReadMetric.WithLabelValues(recordType(dec.Type(rec))).Inc() switch dec.Type(rec) { - case tsdb.RecordSeries: + case record.Series: series, err := dec.Series(rec, series[:0]) if err != nil { w.recordDecodeFailsMetric.Inc() @@ -435,7 +454,7 @@ func (w *WALWatcher) readSegment(r *wal.LiveReader, segmentNum int, tail bool) e } w.writer.StoreSeries(series, segmentNum) - case tsdb.RecordSamples: + case record.Samples: // If we're not tailing a segment we can ignore any samples records we see. // This speeds up replay of the WAL by > 10x. if !tail { @@ -447,7 +466,7 @@ func (w *WALWatcher) readSegment(r *wal.LiveReader, segmentNum int, tail bool) e return err } for _, s := range samples { - if s.T > w.startTime { + if s.T > w.StartTime { send = append(send, s) } } @@ -457,9 +476,9 @@ func (w *WALWatcher) readSegment(r *wal.LiveReader, segmentNum int, tail bool) e send = send[:0] } - case tsdb.RecordTombstones: + case record.Tombstones: // noop - case tsdb.RecordInvalid: + case record.Invalid: return errors.New("invalid record") default: @@ -470,15 +489,15 @@ func (w *WALWatcher) readSegment(r *wal.LiveReader, segmentNum int, tail bool) e return r.Err() } -func recordType(rt tsdb.RecordType) string { +func recordType(rt record.Type) string { switch rt { - case tsdb.RecordInvalid: + case record.Invalid: return "invalid" - case tsdb.RecordSeries: + case record.Series: return "series" - case tsdb.RecordSamples: + case record.Samples: return "samples" - case tsdb.RecordTombstones: + case record.Tombstones: return "tombstones" default: return "unknown" @@ -486,7 +505,7 @@ func recordType(rt tsdb.RecordType) string { } // Read all the series records from a Checkpoint directory. -func (w *WALWatcher) readCheckpoint(checkpointDir string) error { +func (w *Watcher) readCheckpoint(checkpointDir string) error { level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", checkpointDir) index, err := checkpointNum(checkpointDir) if err != nil { @@ -504,13 +523,13 @@ func (w *WALWatcher) readCheckpoint(checkpointDir string) error { return errors.Wrap(err, "getSegmentSize") } - sr, err := wal.OpenReadSegment(wal.SegmentName(checkpointDir, seg)) + sr, err := OpenReadSegment(SegmentName(checkpointDir, seg)) if err != nil { return errors.Wrap(err, "unable to open segment") } defer sr.Close() - r := wal.NewLiveReader(w.logger, liveReaderMetrics, sr) + r := NewLiveReader(w.logger, w.readerMetrics, sr) if err := w.readSegment(r, index, false); err != io.EOF && err != nil { return errors.Wrap(err, "readSegment") } @@ -543,7 +562,7 @@ func checkpointNum(dir string) (int, error) { // Get size of segment. func getSegmentSize(dir string, index int) (int64, error) { i := int64(-1) - fi, err := os.Stat(wal.SegmentName(dir, index)) + fi, err := os.Stat(SegmentName(dir, index)) if err == nil { i = fi.Size() } diff --git a/storage/remote/wal_watcher_test.go b/tsdb/wal/watcher_test.go similarity index 82% rename from storage/remote/wal_watcher_test.go rename to tsdb/wal/watcher_test.go index dc2961c95..86364e3f7 100644 --- a/storage/remote/wal_watcher_test.go +++ b/tsdb/wal/watcher_test.go @@ -10,7 +10,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package remote +package wal import ( "fmt" @@ -22,14 +22,15 @@ import ( "testing" "time" - "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/tsdb/labels" - "github.com/prometheus/prometheus/tsdb/wal" + "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/util/testutil" ) var defaultRetryInterval = 100 * time.Millisecond var defaultRetries = 100 +var wMetrics = NewWatcherMetrics(prometheus.DefaultRegisterer) // retry executes f() n times at each interval until it returns true. func retry(t *testing.T, interval time.Duration, n int, f func() bool) { @@ -51,12 +52,12 @@ type writeToMock struct { seriesSegmentIndexes map[uint64]int } -func (wtm *writeToMock) Append(s []tsdb.RefSample) bool { +func (wtm *writeToMock) Append(s []record.RefSample) bool { wtm.samplesAppended += len(s) return true } -func (wtm *writeToMock) StoreSeries(series []tsdb.RefSeries, index int) { +func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) { wtm.seriesLock.Lock() defer wtm.seriesLock.Unlock() for _, s := range series { @@ -104,14 +105,14 @@ func TestTailSamples(t *testing.T) { err = os.Mkdir(wdir, 0777) testutil.Ok(t, err) - enc := tsdb.RecordEncoder{} - w, err := wal.NewSize(nil, nil, wdir, 128*pageSize, compress) + enc := record.Encoder{} + w, err := NewSize(nil, nil, wdir, 128*pageSize, compress) testutil.Ok(t, err) // Write to the initial segment then checkpoint. for i := 0; i < seriesCount; i++ { ref := i + 100 - series := enc.Series([]tsdb.RefSeries{ + series := enc.Series([]record.RefSeries{ { Ref: uint64(ref), Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, @@ -121,7 +122,7 @@ func TestTailSamples(t *testing.T) { for j := 0; j < samplesCount; j++ { inner := rand.Intn(ref + 1) - sample := enc.Samples([]tsdb.RefSample{ + sample := enc.Samples([]record.RefSample{ { Ref: uint64(inner), T: int64(now.UnixNano()) + 1, @@ -137,17 +138,17 @@ func TestTailSamples(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir) - watcher.startTime = now.UnixNano() + watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher.StartTime = now.UnixNano() // Set the Watcher's metrics so they're not nil pointers. watcher.setMetrics() for i := first; i <= last; i++ { - segment, err := wal.OpenReadSegment(wal.SegmentName(watcher.walDir, i)) + segment, err := OpenReadSegment(SegmentName(watcher.walDir, i)) testutil.Ok(t, err) defer segment.Close() - reader := wal.NewLiveReader(nil, liveReaderMetrics, segment) + reader := NewLiveReader(nil, NewLiveReaderMetrics(prometheus.DefaultRegisterer), segment) // Use tail true so we can ensure we got the right number of samples. watcher.readSegment(reader, i, true) } @@ -177,15 +178,15 @@ func TestReadToEndNoCheckpoint(t *testing.T) { err = os.Mkdir(wdir, 0777) testutil.Ok(t, err) - w, err := wal.NewSize(nil, nil, wdir, 128*pageSize, compress) + w, err := NewSize(nil, nil, wdir, 128*pageSize, compress) testutil.Ok(t, err) var recs [][]byte - enc := tsdb.RecordEncoder{} + enc := record.Encoder{} for i := 0; i < seriesCount; i++ { - series := enc.Series([]tsdb.RefSeries{ + series := enc.Series([]record.RefSeries{ { Ref: uint64(i), Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, @@ -193,7 +194,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) { }, nil) recs = append(recs, series) for j := 0; j < samplesCount; j++ { - sample := enc.Samples([]tsdb.RefSample{ + sample := enc.Samples([]record.RefSample{ { Ref: uint64(j), T: int64(i), @@ -216,7 +217,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir) + watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) go watcher.Start() expected := seriesCount @@ -246,14 +247,14 @@ func TestReadToEndWithCheckpoint(t *testing.T) { err = os.Mkdir(wdir, 0777) testutil.Ok(t, err) - enc := tsdb.RecordEncoder{} - w, err := wal.NewSize(nil, nil, wdir, segmentSize, compress) + enc := record.Encoder{} + w, err := NewSize(nil, nil, wdir, segmentSize, compress) testutil.Ok(t, err) // Write to the initial segment then checkpoint. for i := 0; i < seriesCount; i++ { ref := i + 100 - series := enc.Series([]tsdb.RefSeries{ + series := enc.Series([]record.RefSeries{ { Ref: uint64(ref), Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, @@ -263,7 +264,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { for j := 0; j < samplesCount; j++ { inner := rand.Intn(ref + 1) - sample := enc.Samples([]tsdb.RefSample{ + sample := enc.Samples([]record.RefSample{ { Ref: uint64(inner), T: int64(i), @@ -274,12 +275,12 @@ func TestReadToEndWithCheckpoint(t *testing.T) { } } - tsdb.Checkpoint(w, 0, 1, func(x uint64) bool { return true }, 0) + Checkpoint(w, 0, 1, func(x uint64) bool { return true }, 0) w.Truncate(1) // Write more records after checkpointing. for i := 0; i < seriesCount; i++ { - series := enc.Series([]tsdb.RefSeries{ + series := enc.Series([]record.RefSeries{ { Ref: uint64(i), Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, @@ -288,7 +289,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { testutil.Ok(t, w.Log(series)) for j := 0; j < samplesCount; j++ { - sample := enc.Samples([]tsdb.RefSample{ + sample := enc.Samples([]record.RefSample{ { Ref: uint64(j), T: int64(i), @@ -302,7 +303,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { _, _, err = w.Segments() testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir) + watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) go watcher.Start() expected := seriesCount * 2 @@ -330,16 +331,16 @@ func TestReadCheckpoint(t *testing.T) { err = os.Mkdir(wdir, 0777) testutil.Ok(t, err) - os.Create(wal.SegmentName(wdir, 30)) + os.Create(SegmentName(wdir, 30)) - enc := tsdb.RecordEncoder{} - w, err := wal.NewSize(nil, nil, wdir, 128*pageSize, compress) + enc := record.Encoder{} + w, err := NewSize(nil, nil, wdir, 128*pageSize, compress) testutil.Ok(t, err) // Write to the initial segment then checkpoint. for i := 0; i < seriesCount; i++ { ref := i + 100 - series := enc.Series([]tsdb.RefSeries{ + series := enc.Series([]record.RefSeries{ { Ref: uint64(ref), Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, @@ -349,7 +350,7 @@ func TestReadCheckpoint(t *testing.T) { for j := 0; j < samplesCount; j++ { inner := rand.Intn(ref + 1) - sample := enc.Samples([]tsdb.RefSample{ + sample := enc.Samples([]record.RefSample{ { Ref: uint64(inner), T: int64(i), @@ -359,7 +360,7 @@ func TestReadCheckpoint(t *testing.T) { testutil.Ok(t, w.Log(sample)) } } - tsdb.Checkpoint(w, 30, 31, func(x uint64) bool { return true }, 0) + Checkpoint(w, 30, 31, func(x uint64) bool { return true }, 0) w.Truncate(32) // Start read after checkpoint, no more data written. @@ -367,8 +368,7 @@ func TestReadCheckpoint(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir) - // watcher. + watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) go watcher.Start() expectedSeries := seriesCount @@ -398,15 +398,15 @@ func TestReadCheckpointMultipleSegments(t *testing.T) { err = os.Mkdir(wdir, 0777) testutil.Ok(t, err) - enc := tsdb.RecordEncoder{} - w, err := wal.NewSize(nil, nil, wdir, pageSize, compress) + enc := record.Encoder{} + w, err := NewSize(nil, nil, wdir, pageSize, compress) testutil.Ok(t, err) // Write a bunch of data. for i := 0; i < segments; i++ { for j := 0; j < seriesCount; j++ { ref := j + (i * 100) - series := enc.Series([]tsdb.RefSeries{ + series := enc.Series([]record.RefSeries{ { Ref: uint64(ref), Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", j)}}, @@ -416,7 +416,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) { for k := 0; k < samplesCount; k++ { inner := rand.Intn(ref + 1) - sample := enc.Samples([]tsdb.RefSample{ + sample := enc.Samples([]record.RefSample{ { Ref: uint64(inner), T: int64(i), @@ -433,18 +433,18 @@ func TestReadCheckpointMultipleSegments(t *testing.T) { err = os.Mkdir(checkpointDir, 0777) testutil.Ok(t, err) for i := 0; i <= 4; i++ { - err := os.Rename(wal.SegmentName(dir+"/wal", i), wal.SegmentName(checkpointDir, i)) + err := os.Rename(SegmentName(dir+"/wal", i), SegmentName(checkpointDir, i)) testutil.Ok(t, err) } wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir) - watcher.maxSegment = -1 + watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher.MaxSegment = -1 // Set the Watcher's metrics so they're not nil pointers. watcher.setMetrics() - lastCheckpoint, _, err := tsdb.LastCheckpoint(watcher.walDir) + lastCheckpoint, _, err := LastCheckpoint(watcher.walDir) testutil.Ok(t, err) err = watcher.readCheckpoint(lastCheckpoint) @@ -477,14 +477,14 @@ func TestCheckpointSeriesReset(t *testing.T) { err = os.Mkdir(wdir, 0777) testutil.Ok(t, err) - enc := tsdb.RecordEncoder{} - w, err := wal.NewSize(nil, nil, wdir, segmentSize, tc.compress) + enc := record.Encoder{} + w, err := NewSize(nil, nil, wdir, segmentSize, tc.compress) testutil.Ok(t, err) // Write to the initial segment, then checkpoint later. for i := 0; i < seriesCount; i++ { ref := i + 100 - series := enc.Series([]tsdb.RefSeries{ + series := enc.Series([]record.RefSeries{ { Ref: uint64(ref), Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, @@ -494,7 +494,7 @@ func TestCheckpointSeriesReset(t *testing.T) { for j := 0; j < samplesCount; j++ { inner := rand.Intn(ref + 1) - sample := enc.Samples([]tsdb.RefSample{ + sample := enc.Samples([]record.RefSample{ { Ref: uint64(inner), T: int64(i), @@ -509,8 +509,8 @@ func TestCheckpointSeriesReset(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir) - watcher.maxSegment = -1 + watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher.MaxSegment = -1 go watcher.Start() expected := seriesCount @@ -519,13 +519,13 @@ func TestCheckpointSeriesReset(t *testing.T) { }) testutil.Equals(t, seriesCount, wt.checkNumLabels()) - _, err = tsdb.Checkpoint(w, 2, 4, func(x uint64) bool { return true }, 0) + _, err = Checkpoint(w, 2, 4, func(x uint64) bool { return true }, 0) testutil.Ok(t, err) err = w.Truncate(5) testutil.Ok(t, err) - _, cpi, err := tsdb.LastCheckpoint(path.Join(dir, "wal")) + _, cpi, err := LastCheckpoint(path.Join(dir, "wal")) testutil.Ok(t, err) err = watcher.garbageCollectSeries(cpi + 1) testutil.Ok(t, err) diff --git a/tsdb/wal_test.go b/tsdb/wal_test.go index 71271ae5c..3a758f2c4 100644 --- a/tsdb/wal_test.go +++ b/tsdb/wal_test.go @@ -29,6 +29,8 @@ import ( "github.com/go-kit/kit/log" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/labels" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/wal" "github.com/prometheus/prometheus/util/testutil" ) @@ -95,10 +97,10 @@ func TestSegmentWAL_Truncate(t *testing.T) { w.segmentSize = 10000 for i := 0; i < numMetrics; i += batch { - var rs []RefSeries + var rs []record.RefSeries for j, s := range series[i : i+batch] { - rs = append(rs, RefSeries{Labels: s, Ref: uint64(i+j) + 1}) + rs = append(rs, record.RefSeries{Labels: s, Ref: uint64(i+j) + 1}) } err := w.LogSeries(rs) testutil.Ok(t, err) @@ -125,11 +127,11 @@ func TestSegmentWAL_Truncate(t *testing.T) { err = w.Truncate(1000, keepf) testutil.Ok(t, err) - var expected []RefSeries + var expected []record.RefSeries for i := 1; i <= numMetrics; i++ { if i%2 == 1 || uint64(i) >= boundarySeries { - expected = append(expected, RefSeries{Ref: uint64(i), Labels: series[i-1]}) + expected = append(expected, record.RefSeries{Ref: uint64(i), Labels: series[i-1]}) } } @@ -143,10 +145,10 @@ func TestSegmentWAL_Truncate(t *testing.T) { w, err = OpenSegmentWAL(dir, nil, 0, nil) testutil.Ok(t, err) - var readSeries []RefSeries + var readSeries []record.RefSeries r := w.Reader() - testutil.Ok(t, r.Read(func(s []RefSeries) { + testutil.Ok(t, r.Read(func(s []record.RefSeries) { readSeries = append(readSeries, s...) }, nil, nil)) @@ -172,9 +174,9 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { }() var ( - recordedSeries [][]RefSeries - recordedSamples [][]RefSample - recordedDeletes [][]Stone + recordedSeries [][]record.RefSeries + recordedSamples [][]record.RefSample + recordedDeletes [][]tombstones.Stone ) var totalSamples int @@ -190,29 +192,29 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { r := w.Reader() var ( - resultSeries [][]RefSeries - resultSamples [][]RefSample - resultDeletes [][]Stone + resultSeries [][]record.RefSeries + resultSamples [][]record.RefSample + resultDeletes [][]tombstones.Stone ) - serf := func(series []RefSeries) { + serf := func(series []record.RefSeries) { if len(series) > 0 { - clsets := make([]RefSeries, len(series)) + clsets := make([]record.RefSeries, len(series)) copy(clsets, series) resultSeries = append(resultSeries, clsets) } } - smplf := func(smpls []RefSample) { + smplf := func(smpls []record.RefSample) { if len(smpls) > 0 { - csmpls := make([]RefSample, len(smpls)) + csmpls := make([]record.RefSample, len(smpls)) copy(csmpls, smpls) resultSamples = append(resultSamples, csmpls) } } - delf := func(stones []Stone) { + delf := func(stones []tombstones.Stone) { if len(stones) > 0 { - cst := make([]Stone, len(stones)) + cst := make([]tombstones.Stone, len(stones)) copy(cst, stones) resultDeletes = append(resultDeletes, cst) } @@ -228,11 +230,11 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { // Insert in batches and generate different amounts of samples for each. for i := 0; i < len(series); i += stepSize { - var samples []RefSample - var stones []Stone + var samples []record.RefSample + var stones []tombstones.Stone for j := 0; j < i*10; j++ { - samples = append(samples, RefSample{ + samples = append(samples, record.RefSample{ Ref: uint64(j % 10000), T: int64(j * 2), V: rand.Float64(), @@ -241,13 +243,13 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { for j := 0; j < i*20; j++ { ts := rand.Int63() - stones = append(stones, Stone{rand.Uint64(), Intervals{{ts, ts + rand.Int63n(10000)}}}) + stones = append(stones, tombstones.Stone{Ref: rand.Uint64(), Intervals: tombstones.Intervals{{Mint: ts, Maxt: ts + rand.Int63n(10000)}}}) } lbls := series[i : i+stepSize] - series := make([]RefSeries, 0, len(series)) + series := make([]record.RefSeries, 0, len(series)) for j, l := range lbls { - series = append(series, RefSeries{ + series = append(series, record.RefSeries{ Ref: uint64(i + j), Labels: l, }) @@ -382,8 +384,8 @@ func TestWALRestoreCorrupted(t *testing.T) { w, err := OpenSegmentWAL(dir, nil, 0, nil) testutil.Ok(t, err) - testutil.Ok(t, w.LogSamples([]RefSample{{T: 1, V: 2}})) - testutil.Ok(t, w.LogSamples([]RefSample{{T: 2, V: 3}})) + testutil.Ok(t, w.LogSamples([]record.RefSample{{T: 1, V: 2}})) + testutil.Ok(t, w.LogSamples([]record.RefSample{{T: 2, V: 3}})) testutil.Ok(t, w.cut()) @@ -392,8 +394,8 @@ func TestWALRestoreCorrupted(t *testing.T) { // Hopefully cut will complete by 2 seconds. time.Sleep(2 * time.Second) - testutil.Ok(t, w.LogSamples([]RefSample{{T: 3, V: 4}})) - testutil.Ok(t, w.LogSamples([]RefSample{{T: 5, V: 6}})) + testutil.Ok(t, w.LogSamples([]record.RefSample{{T: 3, V: 4}})) + testutil.Ok(t, w.LogSamples([]record.RefSample{{T: 5, V: 6}})) testutil.Ok(t, w.Close()) @@ -414,24 +416,24 @@ func TestWALRestoreCorrupted(t *testing.T) { r := w2.Reader() - serf := func(l []RefSeries) { + serf := func(l []record.RefSeries) { testutil.Equals(t, 0, len(l)) } // Weird hack to check order of reads. i := 0 - samplf := func(s []RefSample) { + samplf := func(s []record.RefSample) { if i == 0 { - testutil.Equals(t, []RefSample{{T: 1, V: 2}}, s) + testutil.Equals(t, []record.RefSample{{T: 1, V: 2}}, s) i++ } else { - testutil.Equals(t, []RefSample{{T: 99, V: 100}}, s) + testutil.Equals(t, []record.RefSample{{T: 99, V: 100}}, s) } } testutil.Ok(t, r.Read(serf, samplf, nil)) - testutil.Ok(t, w2.LogSamples([]RefSample{{T: 99, V: 100}})) + testutil.Ok(t, w2.LogSamples([]record.RefSample{{T: 99, V: 100}})) testutil.Ok(t, w2.Close()) // We should see the first valid entry and the new one, everything after @@ -482,23 +484,23 @@ func TestMigrateWAL_Fuzz(t *testing.T) { testutil.Ok(t, err) // Write some data. - testutil.Ok(t, oldWAL.LogSeries([]RefSeries{ + testutil.Ok(t, oldWAL.LogSeries([]record.RefSeries{ {Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")}, {Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")}, })) - testutil.Ok(t, oldWAL.LogSamples([]RefSample{ + testutil.Ok(t, oldWAL.LogSamples([]record.RefSample{ {Ref: 1, T: 100, V: 200}, {Ref: 2, T: 300, V: 400}, })) - testutil.Ok(t, oldWAL.LogSeries([]RefSeries{ + testutil.Ok(t, oldWAL.LogSeries([]record.RefSeries{ {Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")}, })) - testutil.Ok(t, oldWAL.LogSamples([]RefSample{ + testutil.Ok(t, oldWAL.LogSamples([]record.RefSample{ {Ref: 3, T: 100, V: 200}, {Ref: 4, T: 300, V: 400}, })) - testutil.Ok(t, oldWAL.LogDeletes([]Stone{ - {ref: 1, intervals: []Interval{{100, 200}}}, + testutil.Ok(t, oldWAL.LogDeletes([]tombstones.Stone{ + {Ref: 1, Intervals: []tombstones.Interval{{Mint: 100, Maxt: 200}}}, })) testutil.Ok(t, oldWAL.Close()) @@ -510,8 +512,8 @@ func TestMigrateWAL_Fuzz(t *testing.T) { testutil.Ok(t, err) // We can properly write some new data after migration. - var enc RecordEncoder - testutil.Ok(t, w.Log(enc.Samples([]RefSample{ + var enc record.Encoder + testutil.Ok(t, w.Log(enc.Samples([]record.RefSample{ {Ref: 500, T: 1, V: 1}, }, nil))) @@ -523,21 +525,21 @@ func TestMigrateWAL_Fuzz(t *testing.T) { r := wal.NewReader(sr) var res []interface{} - var dec RecordDecoder + var dec record.Decoder for r.Next() { rec := r.Record() switch dec.Type(rec) { - case RecordSeries: + case record.Series: s, err := dec.Series(rec, nil) testutil.Ok(t, err) res = append(res, s) - case RecordSamples: + case record.Samples: s, err := dec.Samples(rec, nil) testutil.Ok(t, err) res = append(res, s) - case RecordTombstones: + case record.Tombstones: s, err := dec.Tombstones(rec, nil) testutil.Ok(t, err) res = append(res, s) @@ -548,17 +550,17 @@ func TestMigrateWAL_Fuzz(t *testing.T) { testutil.Ok(t, r.Err()) testutil.Equals(t, []interface{}{ - []RefSeries{ + []record.RefSeries{ {Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")}, {Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")}, }, - []RefSample{{Ref: 1, T: 100, V: 200}, {Ref: 2, T: 300, V: 400}}, - []RefSeries{ + []record.RefSample{{Ref: 1, T: 100, V: 200}, {Ref: 2, T: 300, V: 400}}, + []record.RefSeries{ {Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")}, }, - []RefSample{{Ref: 3, T: 100, V: 200}, {Ref: 4, T: 300, V: 400}}, - []Stone{{ref: 1, intervals: []Interval{{100, 200}}}}, - []RefSample{{Ref: 500, T: 1, V: 1}}, + []record.RefSample{{Ref: 3, T: 100, V: 200}, {Ref: 4, T: 300, V: 400}}, + []tombstones.Stone{{Ref: 1, Intervals: []tombstones.Interval{{Mint: 100, Maxt: 200}}}}, + []record.RefSample{{Ref: 500, T: 1, V: 1}}, }, res) // Migrating an already migrated WAL shouldn't do anything.