From 2d3e8d4c4a1257029192e774ea12b7de34c12490 Mon Sep 17 00:00:00 2001 From: bwplotka Date: Mon, 9 Dec 2024 12:39:23 +0000 Subject: [PATCH] Add WAL segment versioning; add flag (only v1 allowed). Implementation for https://github.com/prometheus/proposals/pull/40 Signed-off-by: bwplotka --- cmd/prometheus/main.go | 54 +- tsdb/agent/db.go | 70 ++- tsdb/db.go | 32 +- tsdb/db_test.go | 28 +- tsdb/docs/format/wal.md | 45 +- tsdb/head.go | 10 +- tsdb/head_test.go | 6 +- tsdb/wlog/checkpoint.go | 6 +- tsdb/wlog/checkpoint_test.go | 358 +++++++------ tsdb/wlog/live_reader.go | 4 + tsdb/wlog/reader.go | 21 +- tsdb/wlog/reader_test.go | 242 ++++----- tsdb/wlog/watcher.go | 98 ++-- tsdb/wlog/watcher_test.go | 993 ++++++++++++++++++----------------- tsdb/wlog/wlog.go | 405 +++++++++----- tsdb/wlog/wlog_test.go | 478 +++++++++-------- 16 files changed, 1583 insertions(+), 1267 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index f8a4ecd8c9..dcc9f8f254 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -154,21 +154,21 @@ func init() { // serverOnlyFlag creates server-only kingpin flag. func serverOnlyFlag(app *kingpin.Application, name, help string) *kingpin.FlagClause { return app.Flag(name, fmt.Sprintf("%s Use with server mode only.", help)). - PreAction(func(parseContext *kingpin.ParseContext) error { - // This will be invoked only if flag is actually provided by user. - serverOnlyFlags = append(serverOnlyFlags, "--"+name) - return nil - }) + PreAction(func(parseContext *kingpin.ParseContext) error { + // This will be invoked only if flag is actually provided by user. + serverOnlyFlags = append(serverOnlyFlags, "--"+name) + return nil + }) } // agentOnlyFlag creates agent-only kingpin flag. func agentOnlyFlag(app *kingpin.Application, name, help string) *kingpin.FlagClause { return app.Flag(name, fmt.Sprintf("%s Use with agent mode only.", help)). - PreAction(func(parseContext *kingpin.ParseContext) error { - // This will be invoked only if flag is actually provided by user. - agentOnlyFlags = append(agentOnlyFlags, "--"+name) - return nil - }) + PreAction(func(parseContext *kingpin.ParseContext) error { + // This will be invoked only if flag is actually provided by user. + agentOnlyFlags = append(agentOnlyFlags, "--"+name) + return nil + }) } type flagConfig struct { @@ -427,6 +427,9 @@ func main() { serverOnlyFlag(a, "storage.tsdb.wal-compression-type", "Compression algorithm for the tsdb WAL."). Hidden().Default(string(wlog.CompressionSnappy)).EnumVar(&cfg.tsdb.WALCompressionType, string(wlog.CompressionSnappy), string(wlog.CompressionZstd)) + serverOnlyFlag(a, "storage.tsdb.wal-version", fmt.Sprintf("Version for the new WAL segments. Supported versions: %v", wlog.ReleasedSupportedSegmentVersions())). + Default(fmt.Sprintf("%v", wlog.DefaultSegmentVersion)).Uint32Var(&cfg.tsdb.WALSegmentVersion) + serverOnlyFlag(a, "storage.tsdb.head-chunks-write-queue-size", "Size of the queue through which head chunks are written to the disk to be m-mapped, 0 disables the queue completely. Experimental."). Default("0").IntVar(&cfg.tsdb.HeadChunksWriteQueueSize) @@ -443,12 +446,15 @@ func main() { "Size at which to split WAL segment files. Example: 100MB"). Hidden().PlaceHolder("").BytesVar(&cfg.agent.WALSegmentSize) - agentOnlyFlag(a, "storage.agent.wal-compression", "Compress the agent WAL."). + agentOnlyFlag(a, "storage.agent.wal-compression", "Compress the new WAL segments."). 
Default("true").BoolVar(&cfg.agent.WALCompression) agentOnlyFlag(a, "storage.agent.wal-compression-type", "Compression algorithm for the agent WAL."). Hidden().Default(string(wlog.CompressionSnappy)).EnumVar(&cfg.agent.WALCompressionType, string(wlog.CompressionSnappy), string(wlog.CompressionZstd)) + agentOnlyFlag(a, "storage.agent.wal-version", fmt.Sprintf("Version for the new WAL segments. Supported versions: %v", wlog.ReleasedSupportedSegmentVersions())). + Default(fmt.Sprintf("%v", wlog.DefaultSegmentVersion)).Uint32Var(&cfg.agent.WALSegmentVersion) + agentOnlyFlag(a, "storage.agent.wal-truncate-frequency", "The frequency at which to truncate the WAL and remove old data."). Hidden().PlaceHolder("").SetValue(&cfg.agent.TruncateFrequency) @@ -1229,6 +1235,10 @@ func main() { g.Add( func() error { logger.Info("Starting TSDB ...") + if !wlog.SegmentVersion(cfg.tsdb.WALSegmentVersion).IsReleased() { + return fmt.Errorf("flag 'storage.tsdb.wal-version' was set to unsupported WAL segment version %v; supported versions %v", cfg.tsdb.WALSegmentVersion, wlog.ReleasedSupportedSegmentVersions()) + } + if cfg.tsdb.WALSegmentSize != 0 { if cfg.tsdb.WALSegmentSize < 10*1024*1024 || cfg.tsdb.WALSegmentSize > 256*1024*1024 { return errors.New("flag 'storage.tsdb.wal-segment-size' must be set between 10MB and 256MB") @@ -1285,6 +1295,10 @@ func main() { g.Add( func() error { logger.Info("Starting WAL storage ...") + if !wlog.SegmentVersion(cfg.agent.WALSegmentVersion).IsReleased() { + return fmt.Errorf("flag 'storage.agent.wal-version' was set to unsupported WAL segment version %v; supported versions %v", cfg.tsdb.WALSegmentVersion, wlog.ReleasedSupportedSegmentVersions()) + } + if cfg.agent.WALSegmentSize != 0 { if cfg.agent.WALSegmentSize < 10*1024*1024 || cfg.agent.WALSegmentSize > 256*1024*1024 { return errors.New("flag 'storage.agent.wal-segment-size' must be set between 10MB and 256MB") @@ -1492,7 +1506,7 @@ func reloadConfig(filename string, enableExemplarStorage bool, logger *slog.Logg func startsOrEndsWithQuote(s string) bool { return strings.HasPrefix(s, "\"") || strings.HasPrefix(s, "'") || - strings.HasSuffix(s, "\"") || strings.HasSuffix(s, "'") + strings.HasSuffix(s, "\"") || strings.HasSuffix(s, "'") } // compileCORSRegexString compiles given string and adds anchors. @@ -1780,6 +1794,7 @@ func (rm *readyScrapeManager) Get() (*scrape.Manager, error) { // This is required as tsdb.Option fields are unit agnostic (time). 
type tsdbOptions struct { WALSegmentSize units.Base2Bytes + WALSegmentVersion uint32 MaxBlockChunkSegmentSize units.Base2Bytes RetentionDuration model.Duration MaxBytes units.Base2Bytes @@ -1804,12 +1819,15 @@ type tsdbOptions struct { func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { return tsdb.Options{ - WALSegmentSize: int(opts.WALSegmentSize), + WALSegment: wlog.SegmentOptions{ + Version: wlog.SegmentVersion(opts.WALSegmentVersion), + Compression: wlog.ParseCompressionType(opts.WALCompression, opts.WALCompressionType), + Size: int(opts.WALSegmentSize), + }, MaxBlockChunkSegmentSize: int64(opts.MaxBlockChunkSegmentSize), RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond), MaxBytes: int64(opts.MaxBytes), NoLockfile: opts.NoLockfile, - WALCompression: wlog.ParseCompressionType(opts.WALCompression, opts.WALCompressionType), HeadChunksWriteQueueSize: opts.HeadChunksWriteQueueSize, SamplesPerChunk: opts.SamplesPerChunk, StripeSize: opts.StripeSize, @@ -1833,6 +1851,7 @@ type agentOptions struct { WALSegmentSize units.Base2Bytes WALCompression bool WALCompressionType string + WALSegmentVersion uint32 StripeSize int TruncateFrequency model.Duration MinWALTime, MaxWALTime model.Duration @@ -1845,8 +1864,11 @@ func (opts agentOptions) ToAgentOptions(outOfOrderTimeWindow int64) agent.Option outOfOrderTimeWindow = 0 } return agent.Options{ - WALSegmentSize: int(opts.WALSegmentSize), - WALCompression: wlog.ParseCompressionType(opts.WALCompression, opts.WALCompressionType), + WALSegment: wlog.SegmentOptions{ + Version: wlog.SegmentVersion(opts.WALSegmentVersion), + Compression: wlog.ParseCompressionType(opts.WALCompression, opts.WALCompressionType), + Size: int(opts.WALSegmentSize), + }, StripeSize: opts.StripeSize, TruncateFrequency: time.Duration(opts.TruncateFrequency), MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)), diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 3863e6cd99..9f528c2c22 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -60,13 +60,8 @@ var ( // Options of the WAL storage. type Options struct { - // Segments (wal files) max size. - // WALSegmentSize <= 0, segment size is default size. - // WALSegmentSize > 0, segment size is WALSegmentSize. - WALSegmentSize int - - // WALCompression configures the compression type to use on records in the WAL. - WALCompression wlog.CompressionType + // WALSegment represents WAL segment options. + WALSegment wlog.SegmentOptions // StripeSize is the size (power of 2) in entries of the series hash map. Reducing the size will save memory but impact performance. StripeSize int @@ -89,8 +84,7 @@ type Options struct { // millisecond-precision timestamps. func DefaultOptions() *Options { return &Options{ - WALSegmentSize: wlog.DefaultSegmentSize, - WALCompression: wlog.CompressionNone, + WALSegment: wlog.SegmentOptions{}, // wlog.New will set the correct defaults StripeSize: tsdb.DefaultStripeSize, TruncateFrequency: DefaultTruncateFrequency, MinWALTime: DefaultMinWALTime, @@ -266,7 +260,7 @@ func Open(l *slog.Logger, reg prometheus.Registerer, rs *remote.Storage, dir str // remote_write expects WAL to be stored in a "wal" subdirectory of the main storage. 
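The separate size and compression knobs collapse into a single `wlog.SegmentOptions` value passed to `wlog.New`. A minimal sketch of opening a WAL this way, assuming (as the patch comments state) that zero-value fields are replaced with defaults by `wlog.New`:

```go
package walexample

import (
	"log/slog"

	"github.com/prometheus/prometheus/tsdb/wlog"
)

// openWAL opens a write-ahead log with explicit compression; Size and Version
// are left at their zero values so wlog.New fills in its defaults.
func openWAL(logger *slog.Logger, dir string) (*wlog.WL, error) {
	opts := wlog.SegmentOptions{
		Compression: wlog.CompressionSnappy,
	}
	return wlog.New(logger, nil, dir, opts)
}
```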
dir = filepath.Join(dir, "wal") - w, err := wlog.NewSize(l, reg, dir, opts.WALSegmentSize, opts.WALCompression) + w, err := wlog.New(l, reg, dir, opts.WALSegment) if err != nil { return nil, fmt.Errorf("creating WAL: %w", err) } @@ -326,14 +320,6 @@ func validateOptions(opts *Options) *Options { if opts == nil { opts = DefaultOptions() } - if opts.WALSegmentSize <= 0 { - opts.WALSegmentSize = wlog.DefaultSegmentSize - } - - if opts.WALCompression == "" { - opts.WALCompression = wlog.CompressionNone - } - // Revert StripeSize to DefaultStripeSize if StripeSize is either 0 or not a power of 2. if opts.StripeSize <= 0 || ((opts.StripeSize & (opts.StripeSize - 1)) != 0) { opts.StripeSize = tsdb.DefaultStripeSize @@ -389,14 +375,14 @@ func (db *DB) replayWAL() error { } // Find the last segment. - _, last, err := wlog.Segments(db.wal.Dir()) + _, last, err := wlog.SegmentsRange(db.wal.Dir()) if err != nil { return fmt.Errorf("finding WAL segments: %w", err) } // Backfill segments from the most recent checkpoint onwards. for i := startFrom; i <= last; i++ { - seg, err := wlog.OpenReadSegment(wlog.SegmentName(db.wal.Dir(), i)) + seg, err := wlog.OpenReadSegmentByIndex(db.wal.Dir(), i) if err != nil { return fmt.Errorf("open WAL segment: %d: %w", i, err) } @@ -443,10 +429,12 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H series := seriesPool.Get()[:0] series, err = dec.Series(rec, series) if err != nil { + i, v := r.Segment() errCh <- &wlog.CorruptionErr{ - Err: fmt.Errorf("decode series: %w", err), - Segment: r.Segment(), - Offset: r.Offset(), + Err: fmt.Errorf("decode series: %w", err), + SegmentIndex: i, + SegmentVersion: v, + Offset: r.Offset(), } return } @@ -455,10 +443,12 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H samples := samplesPool.Get()[:0] samples, err = dec.Samples(rec, samples) if err != nil { + i, v := r.Segment() errCh <- &wlog.CorruptionErr{ - Err: fmt.Errorf("decode samples: %w", err), - Segment: r.Segment(), - Offset: r.Offset(), + Err: fmt.Errorf("decode samples: %w", err), + SegmentIndex: i, + SegmentVersion: v, + Offset: r.Offset(), } return } @@ -467,10 +457,12 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H histograms := histogramsPool.Get()[:0] histograms, err = dec.HistogramSamples(rec, histograms) if err != nil { + i, v := r.Segment() errCh <- &wlog.CorruptionErr{ - Err: fmt.Errorf("decode histogram samples: %w", err), - Segment: r.Segment(), - Offset: r.Offset(), + Err: fmt.Errorf("decode histogram samples: %w", err), + SegmentIndex: i, + SegmentVersion: v, + Offset: r.Offset(), } return } @@ -479,10 +471,12 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H floatHistograms := floatHistogramsPool.Get()[:0] floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms) if err != nil { + i, v := r.Segment() errCh <- &wlog.CorruptionErr{ - Err: fmt.Errorf("decode float histogram samples: %w", err), - Segment: r.Segment(), - Offset: r.Offset(), + Err: fmt.Errorf("decode float histogram samples: %w", err), + SegmentIndex: i, + SegmentVersion: v, + Offset: r.Offset(), } return } @@ -493,10 +487,12 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H // stripeSeries.exemplars in the next block by using setLatestExemplar. 
continue default: + i, v := r.Segment() errCh <- &wlog.CorruptionErr{ - Err: fmt.Errorf("invalid record type %v", dec.Type(rec)), - Segment: r.Segment(), - Offset: r.Offset(), + Err: fmt.Errorf("invalid record type %v", dec.Type(rec)), + SegmentIndex: i, + SegmentVersion: v, + Offset: r.Offset(), } } } @@ -632,7 +628,7 @@ func (db *DB) truncate(mint int64) error { db.gc(mint) db.logger.Info("series GC completed", "duration", time.Since(start)) - first, last, err := wlog.Segments(db.wal.Dir()) + first, last, err := wlog.SegmentsRange(db.wal.Dir()) if err != nil { return fmt.Errorf("get segment range: %w", err) } @@ -711,7 +707,7 @@ func (db *DB) gc(mint int64) { deleted := db.series.GC(mint) db.metrics.numActiveSeries.Sub(float64(len(deleted))) - _, last, _ := wlog.Segments(db.wal.Dir()) + _, last, _ := wlog.SegmentsRange(db.wal.Dir()) // We want to keep series records for any newly deleted series // until we've passed the last recorded segment. This prevents diff --git a/tsdb/db.go b/tsdb/db.go index 2cfa4f3638..710f3ce500 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -73,7 +73,7 @@ var ErrNotReady = errors.New("TSDB not ready") // millisecond precision timestamps. func DefaultOptions() *Options { return &Options{ - WALSegmentSize: wlog.DefaultSegmentSize, + WALSegment: wlog.SegmentOptions{}, // wlog.New sets the correct defaults. MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize, RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond), MinBlockDuration: DefaultBlockDuration, @@ -97,11 +97,10 @@ func DefaultOptions() *Options { // Options of the DB storage. type Options struct { - // Segments (wal files) max size. - // WALSegmentSize = 0, segment size is default size. - // WALSegmentSize > 0, segment size is WALSegmentSize. - // WALSegmentSize < 0, wal is disabled. - WALSegmentSize int + // WALSegment represents segment options. + WALSegment wlog.SegmentOptions + // DisableWAL disables WAL. + DisableWAL bool // MaxBlockChunkSegmentSize is the max size of block chunk segment files. // MaxBlockChunkSegmentSize = 0, chunk segment size is default size. @@ -929,14 +928,10 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn } var wal, wbl *wlog.WL - segmentSize := wlog.DefaultSegmentSize + // Wal is enabled. - if opts.WALSegmentSize >= 0 { - // Wal is set to a custom size. - if opts.WALSegmentSize > 0 { - segmentSize = opts.WALSegmentSize - } - wal, err = wlog.NewSize(l, r, walDir, segmentSize, opts.WALCompression) + if !opts.DisableWAL { + wal, err = wlog.New(l, r, walDir, opts.WALSegment) if err != nil { return nil, err } @@ -946,7 +941,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn return nil, err } if opts.OutOfOrderTimeWindow > 0 || wblSize > 0 { - wbl, err = wlog.NewSize(l, r, wblDir, segmentSize, opts.WALCompression) + wbl, err = wlog.New(l, r, wblDir, opts.WALSegment) if err != nil { return nil, err } @@ -1159,14 +1154,9 @@ func (db *DB) ApplyConfig(conf *config.Config) error { case db.head.wbl != nil: // The existing WBL from the disk might have been replayed while OOO was disabled. wblog = db.head.wbl - case !db.oooWasEnabled.Load() && oooTimeWindow > 0 && db.opts.WALSegmentSize >= 0: - segmentSize := wlog.DefaultSegmentSize - // Wal is set to a custom size. 
-		if db.opts.WALSegmentSize > 0 {
-			segmentSize = db.opts.WALSegmentSize
-		}
+	case !db.oooWasEnabled.Load() && oooTimeWindow > 0 && !db.opts.DisableWAL:
 		oooWalDir := filepath.Join(db.dir, wlog.WblDirName)
-		wblog, err = wlog.NewSize(db.logger, db.registerer, oooWalDir, segmentSize, db.opts.WALCompression)
+		wblog, err = wlog.New(db.logger, db.registerer, oooWalDir, db.opts.WALSegment)
 		if err != nil {
 			return err
 		}
diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index 306dc4579e..f77f593f11 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -240,7 +240,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
 // TestNoPanicAfterWALCorruption ensures that querying the db after a WAL corruption doesn't cause a panic.
 // https://github.com/prometheus/prometheus/issues/7548
 func TestNoPanicAfterWALCorruption(t *testing.T) {
-	db := openTestDB(t, &Options{WALSegmentSize: 32 * 1024}, nil)
+	db := openTestDB(t, &Options{WALSegment: wlog.SegmentOptions{Size: 32 * 1024}}, nil)
 
 	// Append until the first mmapped head chunk.
 	// This is to ensure that all samples can be read from the mmapped chunks when the WAL is corrupted.
@@ -1023,10 +1023,10 @@ func TestWALSegmentSizeOptions(t *testing.T) {
 			}
 			// All the full segment files (all but the last) should match the segment size option.
 			for _, f := range files[:len(files)-1] {
-				require.Equal(t, int64(DefaultOptions().WALSegmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name())
+				require.Equal(t, int64(wlog.DefaultSegmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name())
 			}
 			lastFile := files[len(files)-1]
-			require.Greater(t, int64(DefaultOptions().WALSegmentSize), lastFile.Size(), "last WAL file size is not smaller than the WALSegmentSize option, filename: %v", lastFile.Name())
+			require.Greater(t, int64(wlog.DefaultSegmentSize), lastFile.Size(), "last WAL file size is not smaller than the WALSegmentSize option, filename: %v", lastFile.Name())
 		},
 		// Custom Wal Size.
 		2 * 32 * 1024: func(dbDir string, segmentSize int) {
@@ -1061,7 +1061,7 @@ func TestWALSegmentSizeOptions(t *testing.T) {
 	for segmentSize, testFunc := range tests {
 		t.Run(fmt.Sprintf("WALSegmentSize %d test", segmentSize), func(t *testing.T) {
 			opts := DefaultOptions()
-			opts.WALSegmentSize = segmentSize
+			opts.WALSegment = wlog.SegmentOptions{Size: segmentSize}
 
 			db := openTestDB(t, opts, nil)
 			for i := int64(0); i < 155; i++ {
@@ -1077,7 +1077,7 @@ func TestWALSegmentSizeOptions(t *testing.T) {
 			dbDir := db.Dir()
 			require.NoError(t, db.Close())
 
-			testFunc(dbDir, opts.WALSegmentSize)
+			testFunc(dbDir, opts.WALSegment.Size)
 		})
 	}
 }
@@ -1604,7 +1604,7 @@ func TestSizeRetention(t *testing.T) {
 	require.Equal(t, expSize, actSize, "registered size doesn't match actual disk size")
 
 	// Create a WAL checkpoint, and compare sizes.
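The old convention of `WALSegmentSize < 0` disabling the WAL becomes an explicit `DisableWAL` flag. A hedged sketch of how an embedder might configure this, assuming the `tsdb.Open` signature on this branch (logger, registerer and stats left nil for brevity):

```go
package walexample

import (
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/wlog"
)

// openStorage opens a TSDB either without a WAL or with explicitly tuned
// WAL segment options. Previously "no WAL" was expressed as WALSegmentSize < 0.
func openStorage(dir string, disableWAL bool) (*tsdb.DB, error) {
	opts := tsdb.DefaultOptions()
	if disableWAL {
		opts.DisableWAL = true
	} else {
		opts.WALSegment = wlog.SegmentOptions{
			Version:     wlog.SegmentVersion(1), // v1 is the only released version per this patch.
			Compression: wlog.CompressionSnappy,
			Size:        64 * 1024 * 1024, // 64MB segments instead of the default.
		}
	}
	return tsdb.Open(dir, nil, nil, opts, nil)
}
```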
- first, last, err := wlog.Segments(db.Head().wal.Dir()) + first, last, err := wlog.SegmentsRange(db.Head().wal.Dir()) require.NoError(t, err) _, err = wlog.Checkpoint(promslog.NewNopLogger(), db.Head().wal, first, last-1, func(x chunks.HeadSeriesRef) bool { return false }, 0) require.NoError(t, err) @@ -2015,7 +2015,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { dir := t.TempDir() require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777)) - w, err := wlog.New(nil, nil, path.Join(dir, "wal"), wlog.CompressionNone) + w, err := wlog.New(nil, nil, path.Join(dir, "wal"), wlog.SegmentOptions{}) require.NoError(t, err) var enc record.Encoder @@ -2059,7 +2059,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { createBlock(t, dir, genSeries(1, 1, 1000, 6000)) require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777)) - w, err := wlog.New(nil, nil, path.Join(dir, "wal"), wlog.CompressionNone) + w, err := wlog.New(nil, nil, path.Join(dir, "wal"), wlog.SegmentOptions{}) require.NoError(t, err) var enc record.Encoder @@ -2458,7 +2458,7 @@ func TestDBReadOnly(t *testing.T) { } // Add head to test DBReadOnly WAL reading capabilities. - w, err := wlog.New(logger, nil, filepath.Join(dbDir, "wal"), wlog.CompressionSnappy) + w, err := wlog.New(logger, nil, filepath.Join(dbDir, "wal"), wlog.SegmentOptions{Compression: wlog.CompressionSnappy}) require.NoError(t, err) h := createHead(t, w, genSeries(1, 1, 16, 18), dbDir) require.NoError(t, h.Close()) @@ -3353,7 +3353,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) { require.NoError(t, app.Commit()) // Check the existing WAL files. - first, last, err := wlog.Segments(db.head.wal.Dir()) + first, last, err := wlog.SegmentsRange(db.head.wal.Dir()) require.NoError(t, err) require.Equal(t, 0, first) require.Equal(t, 60, last) @@ -3368,7 +3368,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) { require.Equal(t, 58.0, prom_testutil.ToFloat64(db.head.metrics.headTruncateTotal)) // The compaction should have only truncated first 2/3 of WAL (while also rotating the files). - first, last, err = wlog.Segments(db.head.wal.Dir()) + first, last, err = wlog.SegmentsRange(db.head.wal.Dir()) require.NoError(t, err) require.Equal(t, 40, first) require.Equal(t, 61, last) @@ -3411,7 +3411,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) { require.Equal(t, newBlockMaxt, db.head.MinTime()) // Another WAL file was rotated. - first, last, err = wlog.Segments(db.head.wal.Dir()) + first, last, err = wlog.SegmentsRange(db.head.wal.Dir()) require.NoError(t, err) require.Equal(t, 40, first) require.Equal(t, 62, last) @@ -3424,7 +3424,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) { require.Len(t, db.Blocks(), 59) // The compaction should have only truncated first 2/3 of WAL (while also rotating the files). - first, last, err = wlog.Segments(db.head.wal.Dir()) + first, last, err = wlog.SegmentsRange(db.head.wal.Dir()) require.NoError(t, err) require.Equal(t, 55, first) require.Equal(t, 63, last) @@ -4590,7 +4590,7 @@ func TestMetadataCheckpointingOnlyKeepsLatestEntry(t *testing.T) { require.NoError(t, app.Commit()) // Let's create a checkpoint. 
-	first, last, err := wlog.Segments(w.Dir())
+	first, last, err := wlog.SegmentsRange(w.Dir())
 	require.NoError(t, err)
 
 	keep := func(id chunks.HeadSeriesRef) bool {
 		return id != 3
diff --git a/tsdb/docs/format/wal.md b/tsdb/docs/format/wal.md
index 5d93eb6d87..8eb52586eb 100644
--- a/tsdb/docs/format/wal.md
+++ b/tsdb/docs/format/wal.md
@@ -2,18 +2,48 @@
 This document describes the official Prometheus WAL format.
 
-The write ahead log operates in segments that are numbered and sequential,
+The write ahead log operates in segments that are versioned, numbered and sequential,
 and are limited to 128MB by default.
 
 ## Segment filename
 
-The sequence number is captured in the segment filename,
-e.g. `000000`, `000001`, `000002`, etc. The first unsigned integer represents
-the sequence number of the segment, typically encoded with six digits.
+Both the sequence number and version are captured in the segment filename,
+e.g. `000000`, `000001-v2`, `000002-v4`, etc. The exact format is:
 
-## Segment encoding
+```
+<sequence number>[-v<version>]
+```
 
-This section describes the segment encoding.
+The first unsigned integer represents the sequence number of the segment,
+typically encoded with six digits. The second unsigned integer, following the
+`-v` string, represents the segment version. If the filename does not contain
+`-v`, the segment version is `1`.
+
+## Segment `v1`
+
+This section describes the encoding of version 1 segments.
 
 A segment encodes an array of records. It does not contain any header. A segment
 is written to pages of 32KB. Only the last page of the most recent segment
@@ -58,7 +88,8 @@ All float values are represented using the [IEEE 754 format](https://en.wikipedi
 ### Record types
 
 In the following sections, all the known record types are described. New types,
-can be added in the future.
+can be added in the future, within the same version. Removing or changing an
+existing type in a breaking way requires a new segment version.
 
 #### Series records
diff --git a/tsdb/head.go b/tsdb/head.go
index c67c438e52..c58a7da1b4 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -640,7 +640,7 @@ func (h *Head) Init(minValidTime int64) error {
 	// to be outdated.
 	loadSnapshot := true
 	if h.wal != nil {
-		_, endAt, err := wlog.Segments(h.wal.Dir())
+		_, endAt, err := wlog.SegmentsRange(h.wal.Dir())
 		if err != nil {
 			return fmt.Errorf("finding WAL segments: %w", err)
 		}
@@ -730,7 +730,7 @@ func (h *Head) Init(minValidTime int64) error {
 	}
 
 	// Find the last segment.
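To make the filename rule concrete, a small sketch using the helpers the rest of this patch relies on (`SegmentName` with a version argument, `OpenReadSegmentByIndex`, `Segment.Version`); the directory and index are placeholders:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/wlog"
)

func main() {
	// Per the format above, a v1 segment keeps the legacy zero-padded index as
	// its name, while later versions gain a "-v<version>" suffix.
	fmt.Println(wlog.SegmentName("data/wal", 5, 1))
	fmt.Println(wlog.SegmentName("data/wal", 5, 2))

	// Readers that only know the index can let the library resolve whichever
	// version of segment 5 exists on disk.
	seg, err := wlog.OpenReadSegmentByIndex("data/wal", 5)
	if err != nil {
		fmt.Println("open:", err)
		return
	}
	defer seg.Close()
	fmt.Println("opened segment version:", seg.Version())
}
```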
- _, endAt, e := wlog.Segments(h.wal.Dir()) + _, endAt, e := wlog.SegmentsRange(h.wal.Dir()) if e != nil { return fmt.Errorf("finding WAL segments: %w", e) } @@ -800,7 +800,7 @@ func (h *Head) Init(minValidTime int64) error { wblReplayStart := time.Now() if h.wbl != nil { // Replay WBL. - startFrom, endAt, e = wlog.Segments(h.wbl.Dir()) + startFrom, endAt, e = wlog.SegmentsRange(h.wbl.Dir()) if e != nil { return &errLoadWbl{fmt.Errorf("finding WBL segments: %w", e)} } @@ -1263,7 +1263,7 @@ func (h *Head) truncateWAL(mint int64) error { start := time.Now() h.lastWALTruncationTime.Store(mint) - first, last, err := wlog.Segments(h.wal.Dir()) + first, last, err := wlog.SegmentsRange(h.wal.Dir()) if err != nil { return fmt.Errorf("get segment range: %w", err) } @@ -1584,7 +1584,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) { h.tombstones.TruncateBefore(mint) if h.wal != nil { - _, last, _ := wlog.Segments(h.wal.Dir()) + _, last, _ := wlog.SegmentsRange(h.wal.Dir()) h.deletedMtx.Lock() // Keep series records until we're past segment 'last' // because the WAL will still have samples records with diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 2ca3aeffc7..df5afcda0e 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2388,19 +2388,19 @@ func TestNewWalSegmentOnTruncate(t *testing.T) { } add(0) - _, last, err := wlog.Segments(wal.Dir()) + _, last, err := wlog.SegmentsRange(wal.Dir()) require.NoError(t, err) require.Equal(t, 0, last) add(1) require.NoError(t, h.Truncate(1)) - _, last, err = wlog.Segments(wal.Dir()) + _, last, err = wlog.SegmentsRange(wal.Dir()) require.NoError(t, err) require.Equal(t, 1, last) add(2) require.NoError(t, h.Truncate(2)) - _, last, err = wlog.Segments(wal.Dir()) + _, last, err = wlog.SegmentsRange(wal.Dir()) require.NoError(t, err) require.Equal(t, 2, last) } diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go index 58e11c770e..a083b24398 100644 --- a/tsdb/wlog/checkpoint.go +++ b/tsdb/wlog/checkpoint.go @@ -133,7 +133,11 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He if err := os.MkdirAll(cpdirtmp, 0o777); err != nil { return nil, fmt.Errorf("create checkpoint dir: %w", err) } - cp, err := New(nil, nil, cpdirtmp, w.CompressionType()) + cp, err := New(nil, nil, cpdirtmp, SegmentOptions{ + Compression: w.CompressionType(), + Version: w.opts.Version, + Size: DefaultSegmentSize, + }) if err != nil { return nil, fmt.Errorf("open checkpoint: %w", err) } diff --git a/tsdb/wlog/checkpoint_test.go b/tsdb/wlog/checkpoint_test.go index 8ee193f5ac..5ad25033d8 100644 --- a/tsdb/wlog/checkpoint_test.go +++ b/tsdb/wlog/checkpoint_test.go @@ -141,205 +141,215 @@ func TestCheckpoint(t *testing.T) { PositiveBuckets: []float64{float64(i + 1), 1, -1, 0}, } } + for _, ver := range supportedSegmentVersions() { + for _, compress := range SupportedCompressions() { + t.Run(fmt.Sprintf("v=%v/compress=%s", ver, compress), func(t *testing.T) { + dir := t.TempDir() - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { - t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { - dir := t.TempDir() - - var enc record.Encoder - // Create a dummy segment to bump the initial number. - seg, err := CreateSegment(dir, 100) - require.NoError(t, err) - require.NoError(t, seg.Close()) - - // Manually create checkpoint for 99 and earlier. 
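Note from the `checkpoint.go` hunk above that a checkpoint writer now inherits the segment version of the WAL it compacts. A hedged sketch of the `Checkpoint` entry point as exercised by the tests below; the `*CheckpointStats` return type is assumed from its usage there:

```go
package walexample

import (
	"github.com/prometheus/common/promslog"

	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/tsdb/wlog"
)

// checkpointRange compacts WAL segments [from, to] into a checkpoint. The
// checkpoint's own segments are written with w's segment version and
// compression, as wired up in Checkpoint above.
func checkpointRange(w *wlog.WL, from, to int, mint int64) (*wlog.CheckpointStats, error) {
	keep := func(id chunks.HeadSeriesRef) bool {
		return true // a real caller drops series that no longer exist
	}
	return wlog.Checkpoint(promslog.NewNopLogger(), w, from, to, keep, mint)
}
```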
- w, err := New(nil, nil, filepath.Join(dir, "checkpoint.0099"), compress) - require.NoError(t, err) - - // Add some data we expect to be around later. - err = w.Log(enc.Series([]record.RefSeries{ - {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, - {Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")}, - }, nil)) - require.NoError(t, err) - // Log an unknown record, that might have come from a future Prometheus version. - require.NoError(t, w.Log([]byte{255})) - require.NoError(t, w.Close()) - - // Start a WAL and write records to it as usual. - w, err = NewSize(nil, nil, dir, 64*1024, compress) - require.NoError(t, err) - - samplesInWAL, histogramsInWAL, floatHistogramsInWAL := 0, 0, 0 - var last int64 - for i := 0; ; i++ { - _, n, err := Segments(w.Dir()) + var enc record.Encoder + // Create a dummy segment to bump the initial number. + seg, err := CreateSegment(dir, 100, ver) require.NoError(t, err) - if n >= 106 { - break - } - // Write some series initially. - if i == 0 { - b := enc.Series([]record.RefSeries{ - {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")}, - {Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")}, - {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, - {Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")}, + require.NoError(t, seg.Close()) + + // Manually create checkpoint for 99 and earlier. + w, err := New(nil, nil, filepath.Join(dir, "checkpoint.0099"), SegmentOptions{ + Compression: compress, + Version: ver, + }) + require.NoError(t, err) + + // Add some data we expect to be around later. + err = w.Log(enc.Series([]record.RefSeries{ + {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, + {Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")}, + }, nil)) + require.NoError(t, err) + // Log an unknown record, that might have come from a future Prometheus version. + require.NoError(t, w.Log([]byte{255})) + require.NoError(t, w.Close()) + + // Start a WAL and write records to it as usual. + w, err = New(nil, nil, dir, SegmentOptions{ + Compression: compress, + Version: ver, + Size: 64 * 1024, + }) + require.NoError(t, err) + + samplesInWAL, histogramsInWAL, floatHistogramsInWAL := 0, 0, 0 + var last int64 + for i := 0; ; i++ { + _, n, err := SegmentsRange(w.Dir()) + require.NoError(t, err) + if n >= 106 { + break + } + // Write some series initially. + if i == 0 { + b := enc.Series([]record.RefSeries{ + {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")}, + {Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")}, + {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, + {Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")}, + }, nil) + require.NoError(t, w.Log(b)) + + b = enc.Metadata([]record.RefMetadata{ + {Ref: 2, Unit: "unit", Help: "help"}, + {Ref: 3, Unit: "unit", Help: "help"}, + {Ref: 4, Unit: "unit", Help: "help"}, + {Ref: 5, Unit: "unit", Help: "help"}, + }, nil) + require.NoError(t, w.Log(b)) + } + // Write samples until the WAL has enough segments. + // Make them have drifting timestamps within a record to see that they + // get filtered properly. 
+ b := enc.Samples([]record.RefSample{ + {Ref: 0, T: last, V: float64(i)}, + {Ref: 1, T: last + 10000, V: float64(i)}, + {Ref: 2, T: last + 20000, V: float64(i)}, + {Ref: 3, T: last + 30000, V: float64(i)}, + }, nil) + require.NoError(t, w.Log(b)) + samplesInWAL += 4 + h := makeHistogram(i) + b = enc.HistogramSamples([]record.RefHistogramSample{ + {Ref: 0, T: last, H: h}, + {Ref: 1, T: last + 10000, H: h}, + {Ref: 2, T: last + 20000, H: h}, + {Ref: 3, T: last + 30000, H: h}, + }, nil) + require.NoError(t, w.Log(b)) + histogramsInWAL += 4 + fh := makeFloatHistogram(i) + b = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{ + {Ref: 0, T: last, FH: fh}, + {Ref: 1, T: last + 10000, FH: fh}, + {Ref: 2, T: last + 20000, FH: fh}, + {Ref: 3, T: last + 30000, FH: fh}, + }, nil) + require.NoError(t, w.Log(b)) + floatHistogramsInWAL += 4 + + b = enc.Exemplars([]record.RefExemplar{ + {Ref: 1, T: last, V: float64(i), Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i))}, }, nil) require.NoError(t, w.Log(b)) + // Write changing metadata for each series. In the end, only the latest + // version should end up in the checkpoint. b = enc.Metadata([]record.RefMetadata{ - {Ref: 2, Unit: "unit", Help: "help"}, - {Ref: 3, Unit: "unit", Help: "help"}, - {Ref: 4, Unit: "unit", Help: "help"}, - {Ref: 5, Unit: "unit", Help: "help"}, + {Ref: 0, Unit: strconv.FormatInt(last, 10), Help: strconv.FormatInt(last, 10)}, + {Ref: 1, Unit: strconv.FormatInt(last, 10), Help: strconv.FormatInt(last, 10)}, + {Ref: 2, Unit: strconv.FormatInt(last, 10), Help: strconv.FormatInt(last, 10)}, + {Ref: 3, Unit: strconv.FormatInt(last, 10), Help: strconv.FormatInt(last, 10)}, }, nil) require.NoError(t, w.Log(b)) + + last += 100 } - // Write samples until the WAL has enough segments. - // Make them have drifting timestamps within a record to see that they - // get filtered properly. - b := enc.Samples([]record.RefSample{ - {Ref: 0, T: last, V: float64(i)}, - {Ref: 1, T: last + 10000, V: float64(i)}, - {Ref: 2, T: last + 20000, V: float64(i)}, - {Ref: 3, T: last + 30000, V: float64(i)}, - }, nil) - require.NoError(t, w.Log(b)) - samplesInWAL += 4 - h := makeHistogram(i) - b = enc.HistogramSamples([]record.RefHistogramSample{ - {Ref: 0, T: last, H: h}, - {Ref: 1, T: last + 10000, H: h}, - {Ref: 2, T: last + 20000, H: h}, - {Ref: 3, T: last + 30000, H: h}, - }, nil) - require.NoError(t, w.Log(b)) - histogramsInWAL += 4 - fh := makeFloatHistogram(i) - b = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{ - {Ref: 0, T: last, FH: fh}, - {Ref: 1, T: last + 10000, FH: fh}, - {Ref: 2, T: last + 20000, FH: fh}, - {Ref: 3, T: last + 30000, FH: fh}, - }, nil) - require.NoError(t, w.Log(b)) - floatHistogramsInWAL += 4 + require.NoError(t, w.Close()) - b = enc.Exemplars([]record.RefExemplar{ - {Ref: 1, T: last, V: float64(i), Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i))}, - }, nil) - require.NoError(t, w.Log(b)) + stats, err := Checkpoint(promslog.NewNopLogger(), w, 100, 106, func(x chunks.HeadSeriesRef) bool { + return x%2 == 0 + }, last/2) + require.NoError(t, err) + require.NoError(t, w.Truncate(107)) + require.NoError(t, DeleteCheckpoints(w.Dir(), 106)) + require.Equal(t, histogramsInWAL+floatHistogramsInWAL+samplesInWAL, stats.TotalSamples) + require.Positive(t, stats.DroppedSamples) - // Write changing metadata for each series. In the end, only the latest - // version should end up in the checkpoint. 
- b = enc.Metadata([]record.RefMetadata{ - {Ref: 0, Unit: strconv.FormatInt(last, 10), Help: strconv.FormatInt(last, 10)}, - {Ref: 1, Unit: strconv.FormatInt(last, 10), Help: strconv.FormatInt(last, 10)}, - {Ref: 2, Unit: strconv.FormatInt(last, 10), Help: strconv.FormatInt(last, 10)}, - {Ref: 3, Unit: strconv.FormatInt(last, 10), Help: strconv.FormatInt(last, 10)}, - }, nil) - require.NoError(t, w.Log(b)) + // Only the new checkpoint should be left. + files, err := os.ReadDir(dir) + require.NoError(t, err) + require.Len(t, files, 1) + require.Equal(t, "checkpoint.00000106", files[0].Name()) - last += 100 - } - require.NoError(t, w.Close()) + sr, err := NewSegmentsReader(filepath.Join(dir, "checkpoint.00000106")) + require.NoError(t, err) + defer sr.Close() - stats, err := Checkpoint(promslog.NewNopLogger(), w, 100, 106, func(x chunks.HeadSeriesRef) bool { - return x%2 == 0 - }, last/2) - require.NoError(t, err) - require.NoError(t, w.Truncate(107)) - require.NoError(t, DeleteCheckpoints(w.Dir(), 106)) - require.Equal(t, histogramsInWAL+floatHistogramsInWAL+samplesInWAL, stats.TotalSamples) - require.Positive(t, stats.DroppedSamples) + dec := record.NewDecoder(labels.NewSymbolTable()) + var series []record.RefSeries + var metadata []record.RefMetadata + r := NewReader(sr) - // Only the new checkpoint should be left. - files, err := os.ReadDir(dir) - require.NoError(t, err) - require.Len(t, files, 1) - require.Equal(t, "checkpoint.00000106", files[0].Name()) + samplesInCheckpoint, histogramsInCheckpoint, floatHistogramsInCheckpoint := 0, 0, 0 + for r.Next() { + rec := r.Record() - sr, err := NewSegmentsReader(filepath.Join(dir, "checkpoint.00000106")) - require.NoError(t, err) - defer sr.Close() - - dec := record.NewDecoder(labels.NewSymbolTable()) - var series []record.RefSeries - var metadata []record.RefMetadata - r := NewReader(sr) - - samplesInCheckpoint, histogramsInCheckpoint, floatHistogramsInCheckpoint := 0, 0, 0 - for r.Next() { - rec := r.Record() - - switch dec.Type(rec) { - case record.Series: - series, err = dec.Series(rec, series) - require.NoError(t, err) - case record.Samples: - samples, err := dec.Samples(rec, nil) - require.NoError(t, err) - for _, s := range samples { - require.GreaterOrEqual(t, s.T, last/2, "sample with wrong timestamp") + switch dec.Type(rec) { + case record.Series: + series, err = dec.Series(rec, series) + require.NoError(t, err) + case record.Samples: + samples, err := dec.Samples(rec, nil) + require.NoError(t, err) + for _, s := range samples { + require.GreaterOrEqual(t, s.T, last/2, "sample with wrong timestamp") + } + samplesInCheckpoint += len(samples) + case record.HistogramSamples: + histograms, err := dec.HistogramSamples(rec, nil) + require.NoError(t, err) + for _, h := range histograms { + require.GreaterOrEqual(t, h.T, last/2, "histogram with wrong timestamp") + } + histogramsInCheckpoint += len(histograms) + case record.FloatHistogramSamples: + floatHistograms, err := dec.FloatHistogramSamples(rec, nil) + require.NoError(t, err) + for _, h := range floatHistograms { + require.GreaterOrEqual(t, h.T, last/2, "float histogram with wrong timestamp") + } + floatHistogramsInCheckpoint += len(floatHistograms) + case record.Exemplars: + exemplars, err := dec.Exemplars(rec, nil) + require.NoError(t, err) + for _, e := range exemplars { + require.GreaterOrEqual(t, e.T, last/2, "exemplar with wrong timestamp") + } + case record.Metadata: + metadata, err = dec.Metadata(rec, metadata) + require.NoError(t, err) } - samplesInCheckpoint += len(samples) - 
case record.HistogramSamples: - histograms, err := dec.HistogramSamples(rec, nil) - require.NoError(t, err) - for _, h := range histograms { - require.GreaterOrEqual(t, h.T, last/2, "histogram with wrong timestamp") - } - histogramsInCheckpoint += len(histograms) - case record.FloatHistogramSamples: - floatHistograms, err := dec.FloatHistogramSamples(rec, nil) - require.NoError(t, err) - for _, h := range floatHistograms { - require.GreaterOrEqual(t, h.T, last/2, "float histogram with wrong timestamp") - } - floatHistogramsInCheckpoint += len(floatHistograms) - case record.Exemplars: - exemplars, err := dec.Exemplars(rec, nil) - require.NoError(t, err) - for _, e := range exemplars { - require.GreaterOrEqual(t, e.T, last/2, "exemplar with wrong timestamp") - } - case record.Metadata: - metadata, err = dec.Metadata(rec, metadata) - require.NoError(t, err) } - } - require.NoError(t, r.Err()) - // Making sure we replayed some samples. We expect >50% samples to be still present. - require.Greater(t, float64(samplesInCheckpoint)/float64(samplesInWAL), 0.5) - require.Less(t, float64(samplesInCheckpoint)/float64(samplesInWAL), 0.8) - require.Greater(t, float64(histogramsInCheckpoint)/float64(histogramsInWAL), 0.5) - require.Less(t, float64(histogramsInCheckpoint)/float64(histogramsInWAL), 0.8) - require.Greater(t, float64(floatHistogramsInCheckpoint)/float64(floatHistogramsInWAL), 0.5) - require.Less(t, float64(floatHistogramsInCheckpoint)/float64(floatHistogramsInWAL), 0.8) + require.NoError(t, r.Err()) + // Making sure we replayed some samples. We expect >50% samples to be still present. + require.Greater(t, float64(samplesInCheckpoint)/float64(samplesInWAL), 0.5) + require.Less(t, float64(samplesInCheckpoint)/float64(samplesInWAL), 0.8) + require.Greater(t, float64(histogramsInCheckpoint)/float64(histogramsInWAL), 0.5) + require.Less(t, float64(histogramsInCheckpoint)/float64(histogramsInWAL), 0.8) + require.Greater(t, float64(floatHistogramsInCheckpoint)/float64(floatHistogramsInWAL), 0.5) + require.Less(t, float64(floatHistogramsInCheckpoint)/float64(floatHistogramsInWAL), 0.8) - expectedRefSeries := []record.RefSeries{ - {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, - {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")}, - {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, - } - testutil.RequireEqual(t, expectedRefSeries, series) + expectedRefSeries := []record.RefSeries{ + {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, + {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")}, + {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, + } + testutil.RequireEqual(t, expectedRefSeries, series) - expectedRefMetadata := []record.RefMetadata{ - {Ref: 0, Unit: strconv.FormatInt(last-100, 10), Help: strconv.FormatInt(last-100, 10)}, - {Ref: 2, Unit: strconv.FormatInt(last-100, 10), Help: strconv.FormatInt(last-100, 10)}, - {Ref: 4, Unit: "unit", Help: "help"}, - } - sort.Slice(metadata, func(i, j int) bool { return metadata[i].Ref < metadata[j].Ref }) - require.Equal(t, expectedRefMetadata, metadata) - }) + expectedRefMetadata := []record.RefMetadata{ + {Ref: 0, Unit: strconv.FormatInt(last-100, 10), Help: strconv.FormatInt(last-100, 10)}, + {Ref: 2, Unit: strconv.FormatInt(last-100, 10), Help: strconv.FormatInt(last-100, 10)}, + {Ref: 4, Unit: "unit", Help: "help"}, + } + sort.Slice(metadata, func(i, j int) bool { return metadata[i].Ref < metadata[j].Ref }) + require.Equal(t, expectedRefMetadata, metadata) + }) + } } } func 
TestCheckpointNoTmpFolderAfterError(t *testing.T) { // Create a new wlog with invalid data. dir := t.TempDir() - w, err := NewSize(nil, nil, dir, 64*1024, CompressionNone) + w, err := New(nil, nil, dir, SegmentOptions{ + Size: 64 * 1024, + }) require.NoError(t, err) var enc record.Encoder require.NoError(t, w.Log(enc.Series([]record.RefSeries{ diff --git a/tsdb/wlog/live_reader.go b/tsdb/wlog/live_reader.go index a017d362d1..98ccc0722c 100644 --- a/tsdb/wlog/live_reader.go +++ b/tsdb/wlog/live_reader.go @@ -25,6 +25,7 @@ import ( "github.com/golang/snappy" "github.com/klauspost/compress/zstd" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/promslog" ) // LiveReaderMetrics holds all metrics exposed by the LiveReader. @@ -51,6 +52,9 @@ func NewLiveReaderMetrics(reg prometheus.Registerer) *LiveReaderMetrics { // NewLiveReader returns a new live reader. func NewLiveReader(logger *slog.Logger, metrics *LiveReaderMetrics, r io.Reader) *LiveReader { + if logger == nil { + logger = promslog.NewNopLogger() + } // Calling zstd.NewReader with a nil io.Reader and no options cannot return an error. zstdReader, _ := zstd.NewReader(nil) diff --git a/tsdb/wlog/reader.go b/tsdb/wlog/reader.go index a744b0cc4b..d0f5fb937f 100644 --- a/tsdb/wlog/reader.go +++ b/tsdb/wlog/reader.go @@ -173,16 +173,17 @@ func (r *Reader) Err() error { } if b, ok := r.rdr.(*segmentBufReader); ok { return &CorruptionErr{ - Err: r.err, - Dir: b.segs[b.cur].Dir(), - Segment: b.segs[b.cur].Index(), - Offset: int64(b.off), + Err: r.err, + Dir: b.segs[b.cur].Dir(), + SegmentIndex: b.segs[b.cur].Index(), + SegmentVersion: b.segs[b.cur].Version(), + Offset: int64(b.off), } } return &CorruptionErr{ - Err: r.err, - Segment: -1, - Offset: r.total, + Err: r.err, + SegmentIndex: -1, + Offset: r.total, } } @@ -193,11 +194,11 @@ func (r *Reader) Record() []byte { } // Segment returns the current segment being read. -func (r *Reader) Segment() int { +func (r *Reader) Segment() (int, SegmentVersion) { if b, ok := r.rdr.(*segmentBufReader); ok { - return b.segs[b.cur].Index() + return b.segs[b.cur].Index(), b.segs[b.cur].Version() } - return -1 + return -1, 0 } // Offset returns the current position of the segment being read. diff --git a/tsdb/wlog/reader_test.go b/tsdb/wlog/reader_test.go index 2ac63cbf15..9c1a48f8ac 100644 --- a/tsdb/wlog/reader_test.go +++ b/tsdb/wlog/reader_test.go @@ -299,7 +299,7 @@ func allSegments(dir string) (io.ReadCloser, error) { var readers []io.Reader var closers []io.Closer for _, r := range seg { - f, err := os.Open(filepath.Join(dir, r.name)) + f, err := os.Open(filepath.Join(dir, r.fName)) if err != nil { return nil, err } @@ -315,136 +315,145 @@ func allSegments(dir string) (io.ReadCloser, error) { func TestReaderFuzz(t *testing.T) { for name, fn := range readerConstructors { - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { - t.Run(fmt.Sprintf("%s,compress=%s", name, compress), func(t *testing.T) { - dir := t.TempDir() - - w, err := NewSize(nil, nil, dir, 128*pageSize, compress) - require.NoError(t, err) - - // Buffering required as we're not reading concurrently. 
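Callers that inspect corruption errors can surface the segment version too. An illustrative sketch, assuming the renamed `CorruptionErr` fields from this patch (`SegmentIndex`, `SegmentVersion`) and that the error is returned as `*wlog.CorruptionErr`:

```go
package walexample

import (
	"errors"
	"fmt"

	"github.com/prometheus/prometheus/tsdb/wlog"
)

// describeCorruption renders the richer corruption error, including which
// segment index and segment format version the bad record was found in.
func describeCorruption(err error) string {
	if err == nil {
		return ""
	}
	var cerr *wlog.CorruptionErr
	if errors.As(err, &cerr) {
		return fmt.Sprintf("corruption in segment %d (format v%d) at offset %d: %v",
			cerr.SegmentIndex, cerr.SegmentVersion, cerr.Offset, cerr.Err)
	}
	return err.Error()
}
```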
- input := make(chan []byte, fuzzLen) - err = generateRandomEntries(w, input) - require.NoError(t, err) - close(input) - - err = w.Close() - require.NoError(t, err) - - sr, err := allSegments(w.Dir()) - require.NoError(t, err) - defer sr.Close() - - reader := fn(sr) - for expected := range input { - require.True(t, reader.Next(), "expected record: %v", reader.Err()) - r := reader.Record() - // Expected value may come as nil or empty slice, so it requires special comparison. - if len(expected) == 0 { - require.Empty(t, r) - } else { - require.Equal(t, expected, r, "read wrong record") + for _, v := range supportedSegmentVersions() { + for _, compress := range SupportedCompressions() { + t.Run(fmt.Sprintf("%s/v=%v/compress=%s", name, v, compress), func(t *testing.T) { + dir := t.TempDir() + opts := SegmentOptions{ + Size: 128 * pageSize, } - } - require.False(t, reader.Next(), "unexpected record") - }) + + w, err := New(nil, nil, dir, opts) + require.NoError(t, err) + + // Buffering required as we're not reading concurrently. + input := make(chan []byte, fuzzLen) + err = generateRandomEntries(w, input) + require.NoError(t, err) + close(input) + + err = w.Close() + require.NoError(t, err) + + sr, err := allSegments(w.Dir()) + require.NoError(t, err) + defer sr.Close() + + reader := fn(sr) + for expected := range input { + require.True(t, reader.Next(), "expected record: %v", reader.Err()) + r := reader.Record() + // Expected value may come as nil or empty slice, so it requires special comparison. + if len(expected) == 0 { + require.Empty(t, r) + } else { + require.Equal(t, expected, r, "read wrong record") + } + } + require.False(t, reader.Next(), "unexpected record") + }) + } } } } func TestReaderFuzz_Live(t *testing.T) { - logger := promslog.NewNopLogger() - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { - t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { - dir := t.TempDir() + for _, v := range supportedSegmentVersions() { + for _, compress := range SupportedCompressions() { + t.Run(fmt.Sprintf("v=%v/compress=%s", v, compress), func(t *testing.T) { + dir := t.TempDir() + opts := SegmentOptions{ + Size: 128 * pageSize, + Version: v, + } - w, err := NewSize(nil, nil, dir, 128*pageSize, compress) - require.NoError(t, err) - defer w.Close() - - // In the background, generate a stream of random records and write them - // to the WAL. - input := make(chan []byte, fuzzLen/10) // buffering required as we sometimes batch WAL writes. - done := make(chan struct{}) - go func() { - err := generateRandomEntries(w, input) + w, err := New(nil, nil, dir, opts) require.NoError(t, err) - time.Sleep(100 * time.Millisecond) - close(done) - }() + defer w.Close() - // Tail the WAL and compare the results. - m, _, err := Segments(w.Dir()) - require.NoError(t, err) + // In the background, generate a stream of random records and write them + // to the WAL. + input := make(chan []byte, fuzzLen/10) // buffering required as we sometimes batch WAL writes. + done := make(chan struct{}) + go func() { + err := generateRandomEntries(w, input) + require.NoError(t, err) + time.Sleep(100 * time.Millisecond) + close(done) + }() - seg, err := OpenReadSegment(SegmentName(dir, m)) - require.NoError(t, err) - defer seg.Close() + // Tail the WAL and compare the results. 
+ m, _, err := SegmentsRange(w.Dir()) + require.NoError(t, err) - r := NewLiveReader(logger, nil, seg) - segmentTicker := time.NewTicker(100 * time.Millisecond) - readTicker := time.NewTicker(10 * time.Millisecond) + seg, err := OpenReadSegmentByIndex(w.Dir(), m) + require.NoError(t, err) + defer seg.Close() - readSegment := func(r *LiveReader) bool { - for r.Next() { - rec := r.Record() - expected, ok := <-input - require.True(t, ok, "unexpected record") - // Expected value may come as nil or empty slice, so it requires special comparison. - if len(expected) == 0 { - require.Empty(t, rec) - } else { - require.Equal(t, expected, rec, "record does not match expected") + r := NewLiveReader(nil, nil, seg) + segmentTicker := time.NewTicker(100 * time.Millisecond) + readTicker := time.NewTicker(10 * time.Millisecond) + + readSegment := func(r *LiveReader) bool { + for r.Next() { + rec := r.Record() + expected, ok := <-input + require.True(t, ok, "unexpected record") + // Expected value may come as nil or empty slice, so it requires special comparison. + if len(expected) == 0 { + require.Empty(t, rec) + } else { + require.Equal(t, expected, rec, "record does not match expected") + } + } + require.Equal(t, io.EOF, r.Err(), "expected EOF, got: %v", r.Err()) + return true + } + + outer: + for { + select { + case <-segmentTicker.C: + // check if new segments exist + _, last, err := SegmentsRange(w.Dir()) + require.NoError(t, err) + if last <= seg.i { + continue + } + + // read to end of segment. + readSegment(r) + + fi, err := os.Stat(SegmentName(dir, seg.i, v)) + require.NoError(t, err) + require.Equal(t, r.Offset(), fi.Size(), "expected to have read whole segment, but read %d of %d", r.Offset(), fi.Size()) + + seg, err = OpenReadSegmentByIndex(dir, seg.i+1) + require.NoError(t, err) + defer seg.Close() + r = NewLiveReader(nil, nil, seg) + + case <-readTicker.C: + readSegment(r) + + case <-done: + readSegment(r) + break outer } } - require.Equal(t, io.EOF, r.Err(), "expected EOF, got: %v", r.Err()) - return true - } - outer: - for { - select { - case <-segmentTicker.C: - // check if new segments exist - _, last, err := Segments(w.Dir()) - require.NoError(t, err) - if last <= seg.i { - continue - } - - // read to end of segment. - readSegment(r) - - fi, err := os.Stat(SegmentName(dir, seg.i)) - require.NoError(t, err) - require.Equal(t, r.Offset(), fi.Size(), "expected to have read whole segment, but read %d of %d", r.Offset(), fi.Size()) - - seg, err = OpenReadSegment(SegmentName(dir, seg.i+1)) - require.NoError(t, err) - defer seg.Close() - r = NewLiveReader(logger, nil, seg) - - case <-readTicker.C: - readSegment(r) - - case <-done: - readSegment(r) - break outer - } - } - - require.Equal(t, io.EOF, r.Err(), "expected EOF") - }) + require.Equal(t, io.EOF, r.Err(), "expected EOF") + }) + } } } func TestLiveReaderCorrupt_ShortFile(t *testing.T) { // Write a corrupt WAL segment, there is one record of pageSize in length, // but the segment is only half written. - logger := promslog.NewNopLogger() dir := t.TempDir() - w, err := NewSize(nil, nil, dir, pageSize, CompressionNone) + w, err := New(nil, nil, dir, SegmentOptions{Size: pageSize}) require.NoError(t, err) rec := make([]byte, pageSize-recordHeaderSize) @@ -467,24 +476,23 @@ func TestLiveReaderCorrupt_ShortFile(t *testing.T) { require.NoError(t, err) // Try and LiveReader it. 
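Since `NewLiveReader` now tolerates a nil logger (see the `live_reader.go` hunk), tailing a single segment takes little setup. A minimal sketch; error handling is reduced to the essentials:

```go
package walexample

import (
	"errors"
	"fmt"
	"io"

	"github.com/prometheus/prometheus/tsdb/wlog"
)

// tailSegment reads every record currently in the segment with the given index.
func tailSegment(dir string, index int) error {
	seg, err := wlog.OpenReadSegmentByIndex(dir, index)
	if err != nil {
		return err
	}
	defer seg.Close()

	// A nil logger and nil metrics are both acceptable here.
	r := wlog.NewLiveReader(nil, nil, seg)
	for r.Next() {
		fmt.Printf("record of %d bytes\n", len(r.Record()))
	}
	if errors.Is(r.Err(), io.EOF) {
		return nil // reached the current end of the segment
	}
	return r.Err()
}
```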
- m, _, err := Segments(w.Dir()) + m, _, err := SegmentsRange(w.Dir()) require.NoError(t, err) - seg, err := OpenReadSegment(SegmentName(dir, m)) + seg, err := OpenReadSegmentByIndex(dir, m) require.NoError(t, err) defer seg.Close() - r := NewLiveReader(logger, nil, seg) + r := NewLiveReader(nil, nil, seg) require.False(t, r.Next(), "expected no records") require.Equal(t, io.EOF, r.Err(), "expected error, got: %v", r.Err()) } func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) { // Write a corrupt WAL segment, when record len > page size. - logger := promslog.NewNopLogger() dir := t.TempDir() - w, err := NewSize(nil, nil, dir, pageSize*2, CompressionNone) + w, err := New(nil, nil, dir, SegmentOptions{Size: 2 * pageSize}) require.NoError(t, err) rec := make([]byte, pageSize-recordHeaderSize) @@ -511,14 +519,14 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) { require.NoError(t, err) // Try and LiveReader it. - m, _, err := Segments(w.Dir()) + m, _, err := SegmentsRange(w.Dir()) require.NoError(t, err) - seg, err := OpenReadSegment(SegmentName(dir, m)) + seg, err := OpenReadSegmentByIndex(dir, m) require.NoError(t, err) defer seg.Close() - r := NewLiveReader(logger, NewLiveReaderMetrics(nil), seg) + r := NewLiveReader(nil, NewLiveReaderMetrics(nil), seg) require.False(t, r.Next(), "expected no records") require.EqualError(t, r.Err(), "record length greater than a single page: 65542 > 32768", "expected error, got: %v", r.Err()) } @@ -531,7 +539,7 @@ func TestReaderData(t *testing.T) { for name, fn := range readerConstructors { t.Run(name, func(t *testing.T) { - w, err := New(nil, nil, dir, CompressionSnappy) + w, err := New(nil, nil, dir, SegmentOptions{Compression: CompressionSnappy}) require.NoError(t, err) sr, err := allSegments(dir) diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go index 89db5d2dd7..89053d4191 100644 --- a/tsdb/wlog/watcher.go +++ b/tsdb/wlog/watcher.go @@ -265,9 +265,9 @@ func (w *Watcher) loop() { // Run the watcher, which will tail the WAL until the quit channel is closed // or an error case is hit. func (w *Watcher) Run() error { - _, lastSegment, err := Segments(w.walDir) + _, lastSegment, err := SegmentsRange(w.walDir) if err != nil { - return fmt.Errorf("Segments: %w", err) + return fmt.Errorf("segments range: %w", err) } // We want to ensure this is false across iterations since @@ -296,51 +296,54 @@ func (w *Watcher) Run() error { w.logger.Debug("Tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment, "lastSegment", lastSegment) for !isClosed(w.quit) { - w.currentSegmentMetric.Set(float64(currentSegment)) + w.currentSegmentMetric.Set(float64(currentSegment.index)) // On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment. // On subsequent calls to this function, currentSegment will have been incremented and we should open that segment. w.logger.Debug("Processing segment", "currentSegment", currentSegment) - if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil && !errors.Is(err, ErrIgnorable) { - return err + if err := w.watch(currentSegment, currentSegment.index >= lastSegment); err != nil && !errors.Is(err, ErrIgnorable) { + return fmt.Errorf("watching the last segment %v: %w", currentSegment, err) } // For testing: stop when you hit a specific segment. 
- if currentSegment == w.MaxSegment { + if currentSegment.index == w.MaxSegment { return nil } - currentSegment++ + currentSegment, err = segmentByIndex(w.walDir, currentSegment.index+1) + if err != nil { + return err + } } return nil } // findSegmentForIndex finds the first segment greater than or equal to index. -func (w *Watcher) findSegmentForIndex(index int) (int, error) { +func (w *Watcher) findSegmentForIndex(index int) (segmentRef, error) { refs, err := listSegments(w.walDir) if err != nil { - return -1, err + return segmentRef{}, err } for _, r := range refs { if r.index >= index { - return r.index, nil + return r, nil } } - return -1, errors.New("failed to find segment for index") + return segmentRef{}, fmt.Errorf("failed to find segment for index %v", index) } -func (w *Watcher) readAndHandleError(r *LiveReader, segmentNum int, tail bool, size int64) error { - err := w.readSegment(r, segmentNum, tail) +func (w *Watcher) readAndHandleError(r *LiveReader, s segmentRef, tail bool, size int64) error { + err := w.readSegment(r, s.index, tail) // Ignore all errors reading to end of segment whilst replaying the WAL. if !tail { if err != nil && !errors.Is(err, io.EOF) { - w.logger.Warn("Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err) + w.logger.Warn("Ignoring error reading to end of segment, may have dropped data", "segment", s, "err", err) } else if r.Offset() != size { - w.logger.Warn("Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", r.Offset(), "size", size) + w.logger.Warn("Expected to have read whole segment, may have dropped data", "segment", s, "read", r.Offset(), "size", size) } return ErrIgnorable } @@ -355,8 +358,8 @@ func (w *Watcher) readAndHandleError(r *LiveReader, segmentNum int, tail bool, s // Use tail true to indicate that the reader is currently on a segment that is // actively being written to. If false, assume it's a full segment and we're // replaying it on start to cache the series records. -func (w *Watcher) watch(segmentNum int, tail bool) error { - segment, err := OpenReadSegment(SegmentName(w.walDir, segmentNum)) +func (w *Watcher) watch(s segmentRef, tail bool) error { + segment, err := OpenReadSegment(filepath.Join(w.walDir, s.fName)) if err != nil { return err } @@ -367,12 +370,12 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { size := int64(math.MaxInt64) if !tail { var err error - size, err = getSegmentSize(w.walDir, segmentNum) + size, err = getFileSize(filepath.Join(w.walDir, s.fName)) if err != nil { - return fmt.Errorf("getSegmentSize: %w", err) + return fmt.Errorf("get segment file size: %w", err) } - return w.readAndHandleError(reader, segmentNum, tail, size) + return w.readAndHandleError(reader, s, tail, size) } checkpointTicker := time.NewTicker(checkpointPeriod) @@ -402,7 +405,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { defer func() { <-gcSem }() - if err := w.garbageCollectSeries(segmentNum); err != nil { + if err := w.garbageCollectSeries(s); err != nil { w.logger.Warn("Error process checkpoint", "err", err) } }() @@ -412,20 +415,20 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { // if a newer segment is produced, read the current one until the end and move on. 
case <-segmentTicker.C: - _, last, err := Segments(w.walDir) + _, last, err := SegmentsRange(w.walDir) if err != nil { - return fmt.Errorf("Segments: %w", err) + return fmt.Errorf("list segments: %w", err) } - if last > segmentNum { - return w.readAndHandleError(reader, segmentNum, tail, size) + if last > s.index { + return w.readAndHandleError(reader, s, tail, size) } continue // we haven't read due to a notification in quite some time, try reading anyways case <-readTicker.C: w.logger.Debug("Watcher is reading the WAL due to timeout, haven't received any write notifications recently", "timeout", readTimeout) - err := w.readAndHandleError(reader, segmentNum, tail, size) + err := w.readAndHandleError(reader, s, tail, size) if err != nil { return err } @@ -433,7 +436,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { readTicker.Reset(readTimeout) case <-w.readNotify: - err := w.readAndHandleError(reader, segmentNum, tail, size) + err := w.readAndHandleError(reader, s, tail, size) if err != nil { return err } @@ -443,7 +446,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { } } -func (w *Watcher) garbageCollectSeries(segmentNum int) error { +func (w *Watcher) garbageCollectSeries(s segmentRef) error { dir, _, err := LastCheckpoint(w.walDir) if err != nil && !errors.Is(err, record.ErrNotFound) { return fmt.Errorf("tsdb.LastCheckpoint: %w", err) @@ -459,12 +462,12 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error { return fmt.Errorf("error parsing checkpoint filename: %w", err) } - if index >= segmentNum { - w.logger.Debug("Current segment is behind the checkpoint, skipping reading of checkpoint", "current", fmt.Sprintf("%08d", segmentNum), "checkpoint", dir) + if index >= s.index { + w.logger.Debug("Current segment is behind the checkpoint, skipping reading of checkpoint", "current", s, "checkpoint", dir) return nil } - w.logger.Debug("New checkpoint detected", "new", dir, "currentSegment", segmentNum) + w.logger.Debug("New checkpoint detected", "new", dir, "currentSegment", s) if err = w.readCheckpoint(dir, (*Watcher).readSegmentForGC); err != nil { return fmt.Errorf("readCheckpoint: %w", err) @@ -477,7 +480,7 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error { // Read from a segment and pass the details to w.writer. // Also used with readCheckpoint - implements segmentReadFn. -func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { +func (w *Watcher) readSegment(r *LiveReader, index int, tail bool) error { var ( dec = record.NewDecoder(labels.NewSymbolTable()) // One table per WAL segment means it won't grow indefinitely. series []record.RefSeries @@ -501,7 +504,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { w.recordDecodeFailsMetric.Inc() return err } - w.writer.StoreSeries(series, segmentNum) + w.writer.StoreSeries(series, index) case record.Samples: // If we're not tailing a segment we can ignore any samples records we see. @@ -622,14 +625,14 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { } } if err := r.Err(); err != nil { - return fmt.Errorf("segment %d: %w", segmentNum, err) + return fmt.Errorf("segment %d: %w", index, err) } return nil } -// Go through all series in a segment updating the segmentNum, so we can delete older series. +// Go through all series in a segment updating the index, so we can delete older series. // Used with readCheckpoint - implements segmentReadFn. 
-func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error { +func (w *Watcher) readSegmentForGC(r *LiveReader, index int, _ bool) error { var ( dec = record.NewDecoder(labels.NewSymbolTable()) // Needed for decoding; labels do not outlive this function. series []record.RefSeries @@ -645,7 +648,7 @@ func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error w.recordDecodeFailsMetric.Inc() return err } - w.writer.UpdateSeriesSegment(series, segmentNum) + w.writer.UpdateSeriesSegment(series, index) case record.Unknown: // Could be corruption, or reading from a WAL from a newer Prometheus. @@ -656,7 +659,7 @@ func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error } } if err := r.Err(); err != nil { - return fmt.Errorf("segment %d: %w", segmentNum, err) + return fmt.Errorf("segment %d: %w", index, err) } return nil } @@ -666,7 +669,7 @@ func (w *Watcher) SetStartTime(t time.Time) { w.startTimestamp = timestamp.FromTime(t) } -type segmentReadFn func(w *Watcher, r *LiveReader, segmentNum int, tail bool) error +type segmentReadFn func(w *Watcher, r *LiveReader, index int, tail bool) error // Read all the series records from a Checkpoint directory. func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) error { @@ -679,15 +682,16 @@ func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) err // Ensure we read the whole contents of every segment in the checkpoint dir. segs, err := listSegments(checkpointDir) if err != nil { - return fmt.Errorf("Unable to get segments checkpoint dir: %w", err) + return fmt.Errorf("list segments in the checkpoint dir: %w", err) } for _, segRef := range segs { - size, err := getSegmentSize(checkpointDir, segRef.index) + fn := filepath.Join(checkpointDir, segRef.fName) + size, err := getFileSize(fn) if err != nil { - return fmt.Errorf("getSegmentSize: %w", err) + return fmt.Errorf("segment size: %w", err) } - sr, err := OpenReadSegment(SegmentName(checkpointDir, segRef.index)) + sr, err := OpenReadSegment(fn) if err != nil { return fmt.Errorf("unable to open segment: %w", err) } @@ -700,7 +704,7 @@ func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) err } if r.Offset() != size { - return fmt.Errorf("readCheckpoint wasn't able to read all data from the checkpoint %s/%08d, size: %d, totalRead: %d", checkpointDir, segRef.index, size, r.Offset()) + return fmt.Errorf("readCheckpoint wasn't able to read all data from the checkpoint %s/%v, size: %d, totalRead: %d", checkpointDir, segRef, size, r.Offset()) } } @@ -724,10 +728,10 @@ func checkpointNum(dir string) (int, error) { return result, nil } -// Get size of segment. -func getSegmentSize(dir string, index int) (int64, error) { +// Get size of a file. 
+func getFileSize(fname string) (int64, error) { i := int64(-1) - fi, err := os.Stat(SegmentName(dir, index)) + fi, err := os.Stat(fname) if err == nil { i = fi.Size() } diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index 398b0f4414..5007162222 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -137,380 +137,31 @@ func newWriteToMock(delay time.Duration) *writeToMock { } func TestTailSamples(t *testing.T) { - pageSize := 32 * 1024 const seriesCount = 10 const samplesCount = 250 const exemplarsCount = 25 const histogramsCount = 50 - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { - t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { - now := time.Now() - dir := t.TempDir() + for _, v := range supportedSegmentVersions() { + for _, compress := range SupportedCompressions() { + t.Run(fmt.Sprintf("v=%v/compress=%s", v, compress), func(t *testing.T) { + now := time.Now() - wdir := path.Join(dir, "wal") - err := os.Mkdir(wdir, 0o777) - require.NoError(t, err) - - enc := record.Encoder{} - w, err := NewSize(nil, nil, wdir, 128*pageSize, compress) - require.NoError(t, err) - defer func() { - require.NoError(t, w.Close()) - }() - - // Write to the initial segment then checkpoint. - for i := 0; i < seriesCount; i++ { - ref := i + 100 - series := enc.Series([]record.RefSeries{ - { - Ref: chunks.HeadSeriesRef(ref), - Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), - }, - }, nil) - require.NoError(t, w.Log(series)) - - for j := 0; j < samplesCount; j++ { - inner := rand.Intn(ref + 1) - sample := enc.Samples([]record.RefSample{ - { - Ref: chunks.HeadSeriesRef(inner), - T: now.UnixNano() + 1, - V: float64(i), - }, - }, nil) - require.NoError(t, w.Log(sample)) - } - - for j := 0; j < exemplarsCount; j++ { - inner := rand.Intn(ref + 1) - exemplar := enc.Exemplars([]record.RefExemplar{ - { - Ref: chunks.HeadSeriesRef(inner), - T: now.UnixNano() + 1, - V: float64(i), - Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", inner)), - }, - }, nil) - require.NoError(t, w.Log(exemplar)) - } - - for j := 0; j < histogramsCount; j++ { - inner := rand.Intn(ref + 1) - hist := &histogram.Histogram{ - Schema: 2, - ZeroThreshold: 1e-128, - ZeroCount: 0, - Count: 2, - Sum: 0, - PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, - PositiveBuckets: []int64{int64(i) + 1}, - NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}}, - NegativeBuckets: []int64{int64(-i) - 1}, - } - - histogram := enc.HistogramSamples([]record.RefHistogramSample{{ - Ref: chunks.HeadSeriesRef(inner), - T: now.UnixNano() + 1, - H: hist, - }}, nil) - require.NoError(t, w.Log(histogram)) - - floatHistogram := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{ - Ref: chunks.HeadSeriesRef(inner), - T: now.UnixNano() + 1, - FH: hist.ToFloat(nil), - }}, nil) - require.NoError(t, w.Log(floatHistogram)) - } - } - - // Start read after checkpoint, no more data written. - first, last, err := Segments(w.Dir()) - require.NoError(t, err) - - wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true, true, true) - watcher.SetStartTime(now) - - // Set the Watcher's metrics so they're not nil pointers. 
- watcher.SetMetrics() - for i := first; i <= last; i++ { - segment, err := OpenReadSegment(SegmentName(watcher.walDir, i)) + dir := t.TempDir() + wdir := path.Join(dir, "wal") + err := os.Mkdir(wdir, 0o777) require.NoError(t, err) - reader := NewLiveReader(nil, NewLiveReaderMetrics(nil), segment) - // Use tail true so we can ensure we got the right number of samples. - watcher.readSegment(reader, i, true) - require.NoError(t, segment.Close()) - } + enc := record.Encoder{} + w, err := New(nil, nil, wdir, SegmentOptions{Compression: compress, Version: v}) + require.NoError(t, err) + defer func() { + require.NoError(t, w.Close()) + }() - expectedSeries := seriesCount - expectedSamples := seriesCount * samplesCount - expectedExemplars := seriesCount * exemplarsCount - expectedHistograms := seriesCount * histogramsCount - retry(t, defaultRetryInterval, defaultRetries, func() bool { - return wt.checkNumSeries() >= expectedSeries - }) - require.Equal(t, expectedSeries, wt.checkNumSeries(), "did not receive the expected number of series") - require.Equal(t, expectedSamples, wt.samplesAppended, "did not receive the expected number of samples") - require.Equal(t, expectedExemplars, wt.exemplarsAppended, "did not receive the expected number of exemplars") - require.Equal(t, expectedHistograms, wt.histogramsAppended, "did not receive the expected number of histograms") - require.Equal(t, expectedHistograms, wt.floatHistogramsAppended, "did not receive the expected number of float histograms") - }) - } -} - -func TestReadToEndNoCheckpoint(t *testing.T) { - pageSize := 32 * 1024 - const seriesCount = 10 - const samplesCount = 250 - - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { - t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { - dir := t.TempDir() - wdir := path.Join(dir, "wal") - err := os.Mkdir(wdir, 0o777) - require.NoError(t, err) - - w, err := NewSize(nil, nil, wdir, 128*pageSize, compress) - require.NoError(t, err) - defer func() { - require.NoError(t, w.Close()) - }() - - var recs [][]byte - - enc := record.Encoder{} - - for i := 0; i < seriesCount; i++ { - series := enc.Series([]record.RefSeries{ - { - Ref: chunks.HeadSeriesRef(i), - Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), - }, - }, nil) - recs = append(recs, series) - for j := 0; j < samplesCount; j++ { - sample := enc.Samples([]record.RefSample{ - { - Ref: chunks.HeadSeriesRef(j), - T: int64(i), - V: float64(i), - }, - }, nil) - - recs = append(recs, sample) - - // Randomly batch up records. - if rand.Intn(4) < 3 { - require.NoError(t, w.Log(recs...)) - recs = recs[:0] - } - } - } - require.NoError(t, w.Log(recs...)) - overwriteReadTimeout(t, time.Second) - _, _, err = Segments(w.Dir()) - require.NoError(t, err) - - wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) - go watcher.Start() - - expected := seriesCount - require.Eventually(t, func() bool { - return wt.checkNumSeries() == expected - }, 20*time.Second, 1*time.Second) - watcher.Stop() - }) - } -} - -func TestReadToEndWithCheckpoint(t *testing.T) { - segmentSize := 32 * 1024 - // We need something similar to this # of series and samples - // in order to get enough segments for us to checkpoint. 
- const seriesCount = 10 - const samplesCount = 250 - - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { - t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { - dir := t.TempDir() - - wdir := path.Join(dir, "wal") - err := os.Mkdir(wdir, 0o777) - require.NoError(t, err) - - enc := record.Encoder{} - w, err := NewSize(nil, nil, wdir, segmentSize, compress) - require.NoError(t, err) - defer func() { - require.NoError(t, w.Close()) - }() - - // Write to the initial segment then checkpoint. - for i := 0; i < seriesCount; i++ { - ref := i + 100 - series := enc.Series([]record.RefSeries{ - { - Ref: chunks.HeadSeriesRef(ref), - Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), - }, - }, nil) - require.NoError(t, w.Log(series)) - // Add in an unknown record type, which should be ignored. - require.NoError(t, w.Log([]byte{255})) - - for j := 0; j < samplesCount; j++ { - inner := rand.Intn(ref + 1) - sample := enc.Samples([]record.RefSample{ - { - Ref: chunks.HeadSeriesRef(inner), - T: int64(i), - V: float64(i), - }, - }, nil) - require.NoError(t, w.Log(sample)) - } - } - - Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(x chunks.HeadSeriesRef) bool { return true }, 0) - w.Truncate(1) - - // Write more records after checkpointing. - for i := 0; i < seriesCount; i++ { - series := enc.Series([]record.RefSeries{ - { - Ref: chunks.HeadSeriesRef(i), - Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), - }, - }, nil) - require.NoError(t, w.Log(series)) - - for j := 0; j < samplesCount; j++ { - sample := enc.Samples([]record.RefSample{ - { - Ref: chunks.HeadSeriesRef(j), - T: int64(i), - V: float64(i), - }, - }, nil) - require.NoError(t, w.Log(sample)) - } - } - - _, _, err = Segments(w.Dir()) - require.NoError(t, err) - overwriteReadTimeout(t, time.Second) - wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) - go watcher.Start() - - expected := seriesCount * 2 - - require.Eventually(t, func() bool { - return wt.checkNumSeries() == expected - }, 10*time.Second, 1*time.Second) - watcher.Stop() - }) - } -} - -func TestReadCheckpoint(t *testing.T) { - pageSize := 32 * 1024 - const seriesCount = 10 - const samplesCount = 250 - - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { - t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { - dir := t.TempDir() - - wdir := path.Join(dir, "wal") - err := os.Mkdir(wdir, 0o777) - require.NoError(t, err) - - f, err := os.Create(SegmentName(wdir, 30)) - require.NoError(t, err) - require.NoError(t, f.Close()) - - enc := record.Encoder{} - w, err := NewSize(nil, nil, wdir, 128*pageSize, compress) - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, w.Close()) - }) - - // Write to the initial segment then checkpoint. 
- for i := 0; i < seriesCount; i++ { - ref := i + 100 - series := enc.Series([]record.RefSeries{ - { - Ref: chunks.HeadSeriesRef(ref), - Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), - }, - }, nil) - require.NoError(t, w.Log(series)) - - for j := 0; j < samplesCount; j++ { - inner := rand.Intn(ref + 1) - sample := enc.Samples([]record.RefSample{ - { - Ref: chunks.HeadSeriesRef(inner), - T: int64(i), - V: float64(i), - }, - }, nil) - require.NoError(t, w.Log(sample)) - } - } - _, err = w.NextSegmentSync() - require.NoError(t, err) - _, err = Checkpoint(promslog.NewNopLogger(), w, 30, 31, func(x chunks.HeadSeriesRef) bool { return true }, 0) - require.NoError(t, err) - require.NoError(t, w.Truncate(32)) - - // Start read after checkpoint, no more data written. - _, _, err = Segments(w.Dir()) - require.NoError(t, err) - - wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) - go watcher.Start() - - expectedSeries := seriesCount - retry(t, defaultRetryInterval, defaultRetries, func() bool { - return wt.checkNumSeries() >= expectedSeries - }) - watcher.Stop() - require.Equal(t, expectedSeries, wt.checkNumSeries()) - }) - } -} - -func TestReadCheckpointMultipleSegments(t *testing.T) { - pageSize := 32 * 1024 - - const segments = 1 - const seriesCount = 20 - const samplesCount = 300 - - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { - t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { - dir := t.TempDir() - - wdir := path.Join(dir, "wal") - err := os.Mkdir(wdir, 0o777) - require.NoError(t, err) - - enc := record.Encoder{} - w, err := NewSize(nil, nil, wdir, pageSize, compress) - require.NoError(t, err) - - // Write a bunch of data. - for i := 0; i < segments; i++ { - for j := 0; j < seriesCount; j++ { - ref := j + (i * 100) + // Write to the initial segment then checkpoint. 
+ for i := 0; i < seriesCount; i++ { + ref := i + 100 series := enc.Series([]record.RefSeries{ { Ref: chunks.HeadSeriesRef(ref), @@ -519,7 +170,205 @@ func TestReadCheckpointMultipleSegments(t *testing.T) { }, nil) require.NoError(t, w.Log(series)) - for k := 0; k < samplesCount; k++ { + for j := 0; j < samplesCount; j++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]record.RefSample{ + { + Ref: chunks.HeadSeriesRef(inner), + T: now.UnixNano() + 1, + V: float64(i), + }, + }, nil) + require.NoError(t, w.Log(sample)) + } + + for j := 0; j < exemplarsCount; j++ { + inner := rand.Intn(ref + 1) + exemplar := enc.Exemplars([]record.RefExemplar{ + { + Ref: chunks.HeadSeriesRef(inner), + T: now.UnixNano() + 1, + V: float64(i), + Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", inner)), + }, + }, nil) + require.NoError(t, w.Log(exemplar)) + } + + for j := 0; j < histogramsCount; j++ { + inner := rand.Intn(ref + 1) + hist := &histogram.Histogram{ + Schema: 2, + ZeroThreshold: 1e-128, + ZeroCount: 0, + Count: 2, + Sum: 0, + PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, + PositiveBuckets: []int64{int64(i) + 1}, + NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}}, + NegativeBuckets: []int64{int64(-i) - 1}, + } + + histogram := enc.HistogramSamples([]record.RefHistogramSample{{ + Ref: chunks.HeadSeriesRef(inner), + T: now.UnixNano() + 1, + H: hist, + }}, nil) + require.NoError(t, w.Log(histogram)) + + floatHistogram := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{ + Ref: chunks.HeadSeriesRef(inner), + T: now.UnixNano() + 1, + FH: hist.ToFloat(nil), + }}, nil) + require.NoError(t, w.Log(floatHistogram)) + } + } + + // Start read after checkpoint, no more data written. + first, last, err := SegmentsRange(w.Dir()) + require.NoError(t, err) + + wt := newWriteToMock(0) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true, true, true) + watcher.SetStartTime(now) + + // Set the Watcher's metrics so they're not nil pointers. + watcher.SetMetrics() + for i := first; i <= last; i++ { + segment, err := OpenReadSegmentByIndex(watcher.walDir, i) + require.NoError(t, err) + + reader := NewLiveReader(nil, NewLiveReaderMetrics(nil), segment) + // Use tail true so we can ensure we got the right number of samples. 
+ watcher.readSegment(reader, i, true) + require.NoError(t, segment.Close()) + } + + expectedSeries := seriesCount + expectedSamples := seriesCount * samplesCount + expectedExemplars := seriesCount * exemplarsCount + expectedHistograms := seriesCount * histogramsCount + retry(t, defaultRetryInterval, defaultRetries, func() bool { + return wt.checkNumSeries() >= expectedSeries + }) + require.Equal(t, expectedSeries, wt.checkNumSeries(), "did not receive the expected number of series") + require.Equal(t, expectedSamples, wt.samplesAppended, "did not receive the expected number of samples") + require.Equal(t, expectedExemplars, wt.exemplarsAppended, "did not receive the expected number of exemplars") + require.Equal(t, expectedHistograms, wt.histogramsAppended, "did not receive the expected number of histograms") + require.Equal(t, expectedHistograms, wt.floatHistogramsAppended, "did not receive the expected number of float histograms") + }) + } + } +} + +func TestReadToEndNoCheckpoint(t *testing.T) { + const ( + seriesCount = 10 + samplesCount = 250 + ) + + for _, v := range supportedSegmentVersions() { + for _, compress := range SupportedCompressions() { + t.Run(fmt.Sprintf("v=%v/compress=%s", v, compress), func(t *testing.T) { + dir := t.TempDir() + wdir := path.Join(dir, "wal") + err := os.Mkdir(wdir, 0o777) + require.NoError(t, err) + + w, err := New(nil, nil, wdir, SegmentOptions{Compression: compress, Version: v}) + require.NoError(t, err) + defer func() { + require.NoError(t, w.Close()) + }() + + var ( + recs [][]byte + enc = record.Encoder{} + ) + for i := 0; i < seriesCount; i++ { + series := enc.Series([]record.RefSeries{ + { + Ref: chunks.HeadSeriesRef(i), + Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), + }, + }, nil) + recs = append(recs, series) + for j := 0; j < samplesCount; j++ { + sample := enc.Samples([]record.RefSample{ + {Ref: chunks.HeadSeriesRef(j), T: int64(i), V: float64(i)}, + }, nil) + recs = append(recs, sample) + + // Randomly batch up records. + if rand.Intn(4) < 3 { + require.NoError(t, w.Log(recs...)) + recs = recs[:0] + } + } + } + require.NoError(t, w.Log(recs...)) + overwriteReadTimeout(t, time.Second) + + _, _, err = SegmentsRange(w.Dir()) + require.NoError(t, err) + + wt := newWriteToMock(0) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) + go watcher.Start() + + expected := seriesCount + require.Eventually(t, func() bool { + return wt.checkNumSeries() == expected + }, 20*time.Second, 1*time.Second) + watcher.Stop() + }) + } + } +} + +func TestReadToEndWithCheckpoint(t *testing.T) { + const ( + // We need something similar to this # of series and samples + // in order to get enough segments for us to checkpoint. + seriesCount = 10 + samplesCount = 250 + segmentSize = 32 * 1024 + ) + + for _, v := range supportedSegmentVersions() { + for _, compress := range SupportedCompressions() { + t.Run(fmt.Sprintf("v=%v/compress=%s", v, compress), func(t *testing.T) { + dir := t.TempDir() + wdir := path.Join(dir, "wal") + err := os.Mkdir(wdir, 0o777) + require.NoError(t, err) + + enc := record.Encoder{} + w, err := New(nil, nil, wdir, SegmentOptions{ + Compression: compress, + Version: v, + Size: segmentSize, + }) + require.NoError(t, err) + defer func() { + require.NoError(t, w.Close()) + }() + + // Write to the initial segment then checkpoint it. 
+ for i := 0; i < seriesCount; i++ { + ref := i + 100 + series := enc.Series([]record.RefSeries{ + { + Ref: chunks.HeadSeriesRef(ref), + Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), + }, + }, nil) + require.NoError(t, w.Log(series)) + // Add in an unknown record type, which should be ignored. + require.NoError(t, w.Log([]byte{255})) + + for j := 0; j < samplesCount; j++ { inner := rand.Intn(ref + 1) sample := enc.Samples([]record.RefSample{ { @@ -531,31 +380,195 @@ func TestReadCheckpointMultipleSegments(t *testing.T) { require.NoError(t, w.Log(sample)) } } - } - require.NoError(t, w.Close()) - // At this point we should have at least 6 segments, lets create a checkpoint dir of the first 5. - checkpointDir := dir + "/wal/checkpoint.000004" - err = os.Mkdir(checkpointDir, 0o777) - require.NoError(t, err) - for i := 0; i <= 4; i++ { - err := os.Rename(SegmentName(dir+"/wal", i), SegmentName(checkpointDir, i)) + _, err = Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(x chunks.HeadSeriesRef) bool { return true }, 0) require.NoError(t, err) - } + require.NoError(t, w.Truncate(1)) - wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) - watcher.MaxSegment = -1 + // Write more records after checkpointing. + for i := 0; i < seriesCount; i++ { + series := enc.Series([]record.RefSeries{ + { + Ref: chunks.HeadSeriesRef(i), + Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), + }, + }, nil) + require.NoError(t, w.Log(series)) - // Set the Watcher's metrics so they're not nil pointers. - watcher.SetMetrics() + for j := 0; j < samplesCount; j++ { + sample := enc.Samples([]record.RefSample{ + { + Ref: chunks.HeadSeriesRef(j), + T: int64(i), + V: float64(i), + }, + }, nil) + require.NoError(t, w.Log(sample)) + } + } - lastCheckpoint, _, err := LastCheckpoint(watcher.walDir) - require.NoError(t, err) + _, _, err = SegmentsRange(w.Dir()) + require.NoError(t, err) + overwriteReadTimeout(t, time.Second) + wt := newWriteToMock(0) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) + go watcher.Start() - err = watcher.readCheckpoint(lastCheckpoint, (*Watcher).readSegment) - require.NoError(t, err) - }) + expected := seriesCount * 2 + + require.Eventually(t, func() bool { + fmt.Println(wt.checkNumSeries()) + return wt.checkNumSeries() == expected + }, 10*time.Second, 1*time.Second) + watcher.Stop() + }) + } + } +} + +func TestReadCheckpoint(t *testing.T) { + const seriesCount = 10 + const samplesCount = 250 + + for _, v := range supportedSegmentVersions() { + for _, compress := range SupportedCompressions() { + t.Run(fmt.Sprintf("v=%v/compress=%s", v, compress), func(t *testing.T) { + dir := t.TempDir() + + wdir := path.Join(dir, "wal") + err := os.Mkdir(wdir, 0o777) + require.NoError(t, err) + + f, err := os.Create(SegmentName(wdir, 30, v)) + require.NoError(t, err) + require.NoError(t, f.Close()) + + enc := record.Encoder{} + w, err := New(nil, nil, wdir, SegmentOptions{Compression: compress, Version: v}) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, w.Close()) + }) + + // Write to the initial segment then checkpoint. 
+ for i := 0; i < seriesCount; i++ { + ref := i + 100 + series := enc.Series([]record.RefSeries{ + { + Ref: chunks.HeadSeriesRef(ref), + Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), + }, + }, nil) + require.NoError(t, w.Log(series)) + + for j := 0; j < samplesCount; j++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]record.RefSample{ + {Ref: chunks.HeadSeriesRef(inner), T: int64(i), V: float64(i)}, + }, nil) + require.NoError(t, w.Log(sample)) + } + } + _, err = w.NextSegmentSync() + require.NoError(t, err) + _, err = Checkpoint(promslog.NewNopLogger(), w, 30, 31, func(x chunks.HeadSeriesRef) bool { return true }, 0) + require.NoError(t, err) + require.NoError(t, w.Truncate(32)) + + // Start read after checkpoint, no more data written. + _, _, err = SegmentsRange(w.Dir()) + require.NoError(t, err) + + wt := newWriteToMock(0) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) + go watcher.Start() + + expectedSeries := seriesCount + retry(t, defaultRetryInterval, defaultRetries, func() bool { + return wt.checkNumSeries() >= expectedSeries + }) + watcher.Stop() + require.Equal(t, expectedSeries, wt.checkNumSeries()) + }) + } + } +} + +func TestReadCheckpointMultipleSegments(t *testing.T) { + pageSize := 32 * 1024 + + const segments = 1 + const seriesCount = 20 + const samplesCount = 300 + + for _, v := range supportedSegmentVersions() { + for _, compress := range SupportedCompressions() { + t.Run(fmt.Sprintf("v=%v/compress=%s", v, compress), func(t *testing.T) { + + dir := t.TempDir() + + wdir := path.Join(dir, "wal") + err := os.Mkdir(wdir, 0o777) + require.NoError(t, err) + + enc := record.Encoder{} + w, err := New(nil, nil, wdir, SegmentOptions{ + Compression: compress, + Version: v, + Size: pageSize, + }) + require.NoError(t, err) + + // Write a bunch of data. + for i := 0; i < segments; i++ { + for j := 0; j < seriesCount; j++ { + ref := j + (i * 100) + series := enc.Series([]record.RefSeries{ + { + Ref: chunks.HeadSeriesRef(ref), + Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), + }, + }, nil) + require.NoError(t, w.Log(series)) + + for k := 0; k < samplesCount; k++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]record.RefSample{ + { + Ref: chunks.HeadSeriesRef(inner), + T: int64(i), + V: float64(i), + }, + }, nil) + require.NoError(t, w.Log(sample)) + } + } + } + require.NoError(t, w.Close()) + + // At this point we should have at least 6 segments, lets create a checkpoint dir of the first 5. + checkpointDir := dir + "/wal/checkpoint.000004" + err = os.Mkdir(checkpointDir, 0o777) + require.NoError(t, err) + for i := 0; i <= 4; i++ { + err := os.Rename(SegmentName(dir+"/wal", i, v), SegmentName(checkpointDir, i, v)) + require.NoError(t, err) + } + + wt := newWriteToMock(0) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) + watcher.MaxSegment = -1 + + // Set the Watcher's metrics so they're not nil pointers. + watcher.SetMetrics() + + lastCheckpoint, _, err := LastCheckpoint(watcher.walDir) + require.NoError(t, err) + + err = watcher.readCheckpoint(lastCheckpoint, (*Watcher).readSegment) + require.NoError(t, err) + }) + } } } @@ -565,15 +578,14 @@ func TestCheckpointSeriesReset(t *testing.T) { // in order to get enough segments for us to checkpoint. 
const seriesCount = 20 const samplesCount = 350 - testCases := []struct { + + for _, tc := range []struct { compress CompressionType segments int }{ {compress: CompressionNone, segments: 14}, {compress: CompressionSnappy, segments: 13}, - } - - for _, tc := range testCases { + } { t.Run(fmt.Sprintf("compress=%s", tc.compress), func(t *testing.T) { dir := t.TempDir() @@ -582,7 +594,10 @@ func TestCheckpointSeriesReset(t *testing.T) { require.NoError(t, err) enc := record.Encoder{} - w, err := NewSize(nil, nil, wdir, segmentSize, tc.compress) + w, err := New(nil, nil, wdir, SegmentOptions{ + Compression: tc.compress, + Size: segmentSize, + }) require.NoError(t, err) defer func() { require.NoError(t, w.Close()) @@ -612,7 +627,7 @@ func TestCheckpointSeriesReset(t *testing.T) { } } - _, _, err = Segments(w.Dir()) + _, _, err = SegmentsRange(w.Dir()) require.NoError(t, err) overwriteReadTimeout(t, time.Second) @@ -637,7 +652,11 @@ func TestCheckpointSeriesReset(t *testing.T) { _, cpi, err := LastCheckpoint(path.Join(dir, "wal")) require.NoError(t, err) - err = watcher.garbageCollectSeries(cpi + 1) + + s, err := segmentByIndex(path.Join(dir, "wal"), cpi+1) + require.NoError(t, err) + + err = watcher.garbageCollectSeries(s) require.NoError(t, err) watcher.Stop() @@ -657,55 +676,61 @@ func TestRun_StartupTime(t *testing.T) { const seriesCount = 20 const samplesCount = 300 - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { - t.Run(string(compress), func(t *testing.T) { - dir := t.TempDir() + for _, v := range supportedSegmentVersions() { + for _, compress := range SupportedCompressions() { + t.Run(fmt.Sprintf("v=%v/compress=%s", v, compress), func(t *testing.T) { + dir := t.TempDir() - wdir := path.Join(dir, "wal") - err := os.Mkdir(wdir, 0o777) - require.NoError(t, err) + wdir := path.Join(dir, "wal") + err := os.Mkdir(wdir, 0o777) + require.NoError(t, err) - enc := record.Encoder{} - w, err := NewSize(nil, nil, wdir, pageSize, compress) - require.NoError(t, err) + enc := record.Encoder{} + w, err := New(nil, nil, wdir, SegmentOptions{ + Compression: compress, + Version: v, + Size: pageSize, + }) + require.NoError(t, err) - for i := 0; i < segments; i++ { - for j := 0; j < seriesCount; j++ { - ref := j + (i * 100) - series := enc.Series([]record.RefSeries{ - { - Ref: chunks.HeadSeriesRef(ref), - Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), - }, - }, nil) - require.NoError(t, w.Log(series)) - - for k := 0; k < samplesCount; k++ { - inner := rand.Intn(ref + 1) - sample := enc.Samples([]record.RefSample{ + for i := 0; i < segments; i++ { + for j := 0; j < seriesCount; j++ { + ref := j + (i * 100) + series := enc.Series([]record.RefSeries{ { - Ref: chunks.HeadSeriesRef(inner), - T: int64(i), - V: float64(i), + Ref: chunks.HeadSeriesRef(ref), + Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), }, }, nil) - require.NoError(t, w.Log(sample)) + require.NoError(t, w.Log(series)) + + for k := 0; k < samplesCount; k++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]record.RefSample{ + { + Ref: chunks.HeadSeriesRef(inner), + T: int64(i), + V: float64(i), + }, + }, nil) + require.NoError(t, w.Log(sample)) + } } } - } - require.NoError(t, w.Close()) + require.NoError(t, w.Close()) - wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) - watcher.MaxSegment = segments + wt := newWriteToMock(0) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, 
false) + watcher.MaxSegment = segments - watcher.SetMetrics() - startTime := time.Now() + watcher.SetMetrics() + startTime := time.Now() - err = watcher.Run() - require.Less(t, time.Since(startTime), readTimeout) - require.NoError(t, err) - }) + err = watcher.Run() + require.Less(t, time.Since(startTime), readTimeout) + require.NoError(t, err) + }) + } } } @@ -750,61 +775,67 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) { const seriesCount = 10 const samplesCount = 50 - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { - t.Run(string(compress), func(t *testing.T) { - dir := t.TempDir() + for _, v := range supportedSegmentVersions() { + for _, compress := range SupportedCompressions() { + t.Run(fmt.Sprintf("v=%v/compress=%s", v, compress), func(t *testing.T) { + dir := t.TempDir() - wdir := path.Join(dir, "wal") - err := os.Mkdir(wdir, 0o777) - require.NoError(t, err) + wdir := path.Join(dir, "wal") + err := os.Mkdir(wdir, 0o777) + require.NoError(t, err) - w, err := NewSize(nil, nil, wdir, segmentSize, compress) - require.NoError(t, err) - // Write to 00000000, the watcher will read series from it. - require.NoError(t, generateWALRecords(w, 0, seriesCount, samplesCount)) - // Create 00000001, the watcher will tail it once started. - w.NextSegment() - - // Set up the watcher and run it in the background. - wt := newWriteToMock(time.Millisecond) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) - watcher.SetMetrics() - watcher.MaxSegment = segmentsToRead - - var g errgroup.Group - g.Go(func() error { - startTime := time.Now() - err = watcher.Run() - if err != nil { - return err - } - // If the watcher was to wait for readTicker to read every new segment, it would need readTimeout * segmentsToRead. - d := time.Since(startTime) - if d > readTimeout { - return fmt.Errorf("watcher ran for %s, it shouldn't rely on readTicker=%s to read the new segments", d, readTimeout) - } - return nil - }) - - // The watcher went through 00000000 and is tailing the next one. - retry(t, defaultRetryInterval, defaultRetries, func() bool { - return wt.checkNumSeries() == seriesCount - }) - - // In the meantime, add some new segments in bulk. - // We should end up with segmentsToWrite + 1 segments now. - for i := 1; i < segmentsToWrite; i++ { - require.NoError(t, generateWALRecords(w, i, seriesCount, samplesCount)) + w, err := New(nil, nil, wdir, SegmentOptions{ + Compression: compress, + Version: v, + Size: pageSize, + }) + require.NoError(t, err) + // Write to 00000000, the watcher will read series from it. + require.NoError(t, generateWALRecords(w, 0, seriesCount, samplesCount)) + // Create 00000001, the watcher will tail it once started. w.NextSegment() - } - // Wait for the watcher. - require.NoError(t, g.Wait()) + // Set up the watcher and run it in the background. + wt := newWriteToMock(time.Millisecond) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) + watcher.SetMetrics() + watcher.MaxSegment = segmentsToRead - // All series and samples were read. - require.Equal(t, (segmentsToRead+1)*seriesCount, wt.checkNumSeries()) // Series from 00000000 are also read. 
- require.Equal(t, segmentsToRead*seriesCount*samplesCount, wt.samplesAppended) - require.NoError(t, w.Close()) - }) + var g errgroup.Group + g.Go(func() error { + startTime := time.Now() + err = watcher.Run() + if err != nil { + return err + } + // If the watcher was to wait for readTicker to read every new segment, it would need readTimeout * segmentsToRead. + d := time.Since(startTime) + if d > readTimeout { + return fmt.Errorf("watcher ran for %s, it shouldn't rely on readTicker=%s to read the new segments", d, readTimeout) + } + return nil + }) + + // The watcher went through 00000000 and is tailing the next one. + retry(t, defaultRetryInterval, defaultRetries, func() bool { + return wt.checkNumSeries() == seriesCount + }) + + // In the meantime, add some new segments in bulk. + // We should end up with segmentsToWrite + 1 segments now. + for i := 1; i < segmentsToWrite; i++ { + require.NoError(t, generateWALRecords(w, i, seriesCount, samplesCount)) + w.NextSegment() + } + + // Wait for the watcher. + require.NoError(t, g.Wait()) + + // All series and samples were read. + require.Equal(t, (segmentsToRead+1)*seriesCount, wt.checkNumSeries()) // Series from 00000000 are also read. + require.Equal(t, segmentsToRead*seriesCount*samplesCount, wt.samplesAppended) + require.NoError(t, w.Close()) + }) + } } } diff --git a/tsdb/wlog/wlog.go b/tsdb/wlog/wlog.go index 54c257d61a..5ce8381dd8 100644 --- a/tsdb/wlog/wlog.go +++ b/tsdb/wlog/wlog.go @@ -26,6 +26,7 @@ import ( "path/filepath" "slices" "strconv" + "strings" "sync" "time" @@ -38,12 +39,94 @@ import ( ) const ( - DefaultSegmentSize = 128 * 1024 * 1024 // DefaultSegmentSize is 128 MB. - pageSize = 32 * 1024 // pageSize is 32KB. - recordHeaderSize = 7 - WblDirName = "wbl" + DefaultCompression = CompressionNone + DefaultSegmentSize = 128 * 1024 * 1024 // DefaultSegmentSize is 128 MB. + DefaultSegmentVersion = SegmentFormatV1 + + pageSize = 32 * 1024 // pageSize is 32KB. + recordHeaderSize = 7 + + WblDirName = "wbl" ) +// SegmentVersion represents the WAL segment format version. +// See: https://github.com/prometheus/proposals/pull/40 +// Version is encoded in the segment filename, following the <...>-v2 format. +type SegmentVersion uint32 + +const ( + // SegmentFormatV1 represents version 1 of the WAL segment format. + SegmentFormatV1 SegmentVersion = 1 + // SegmentFormatV2 represents version 2 of the WAL segment format. + SegmentFormatV2 SegmentVersion = 2 +) + +func (v SegmentVersion) isSupported() bool { + for _, supported := range supportedSegmentVersions() { + if v == supported { + return true + } + } + return false +} + +// IsReleased returns true if the version is in ReleasedSupportedSegmentVersions. +func (v SegmentVersion) IsReleased() bool { + for _, released := range ReleasedSupportedSegmentVersions() { + if v == released { + return true + } + } + return false +} + +// ReleasedSupportedSegmentVersions returns the segment versions this package supports, +// and which are officially released and can be announced to Prometheus users. +// This is meant to be used in Prometheus flag help. +// +// This allows developing on the new WAL breaking changes in the main branch. +func ReleasedSupportedSegmentVersions() (ret []SegmentVersion) { + for _, v := range supportedSegmentVersions() { + // TODO(bwplotka): v2 is still in development, remove this once officially released. 
+ if v == SegmentFormatV2 { + continue + } + ret = append(ret, v) + } + return ret +} + +// supportedSegmentVersions returns the segment versions this package supports, +// in ascending order. +func supportedSegmentVersions() []SegmentVersion { + return []SegmentVersion{SegmentFormatV1, SegmentFormatV2} +} + +type CompressionType string + +const ( + CompressionNone CompressionType = "none" + CompressionSnappy CompressionType = "snappy" + CompressionZstd CompressionType = "zstd" +) + +// SupportedCompressions returns the compressions this package supports. +func SupportedCompressions() []CompressionType { + return []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} +} + +// ParseCompressionType parses the two compression-related configuration values and returns the CompressionType. If +// compression is enabled but the compressType is unrecognized, we default to Snappy compression. +func ParseCompressionType(compress bool, compressType string) CompressionType { + if compress { + if compressType == "zstd" { + return CompressionZstd + } + return CompressionSnappy + } + return CompressionNone +} + // The table gets initialized with sync.Once but may still cause a race // with any other use of the crc32 package anywhere. Thus we initialize it // before. @@ -89,6 +172,7 @@ type Segment struct { SegmentFile dir string i int + v SegmentVersion } // Index returns the index of the segment. @@ -96,6 +180,11 @@ func (s *Segment) Index() int { return s.i } +// Version returns segment version. +func (s *Segment) Version() SegmentVersion { + return s.v +} + // Dir returns the directory of the segment. func (s *Segment) Dir() string { return s.dir @@ -103,90 +192,67 @@ func (s *Segment) Dir() string { // CorruptionErr is an error that's returned when corruption is encountered. type CorruptionErr struct { - Dir string - Segment int - Offset int64 - Err error + Dir string + SegmentIndex int + SegmentVersion SegmentVersion + Offset int64 + Err error } func (e *CorruptionErr) Error() string { - if e.Segment < 0 { + if e.SegmentIndex < 0 { return fmt.Sprintf("corruption after %d bytes: %s", e.Offset, e.Err) } - return fmt.Sprintf("corruption in segment %s at %d: %s", SegmentName(e.Dir, e.Segment), e.Offset, e.Err) + return fmt.Sprintf("corruption in segment %s at %d: %s", e.SegmentName(), e.Offset, e.Err) +} + +func (e *CorruptionErr) SegmentName() string { + if e.SegmentIndex < 0 { + return "" + } + return SegmentName(e.Dir, e.SegmentIndex, e.SegmentVersion) } func (e *CorruptionErr) Unwrap() error { return e.Err } -// OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends. -func OpenWriteSegment(logger *slog.Logger, dir string, k int) (*Segment, error) { - segName := SegmentName(dir, k) - f, err := os.OpenFile(segName, os.O_WRONLY|os.O_APPEND, 0o666) +// CreateSegment creates a new segment k in dir. +func CreateSegment(dir string, k int, v SegmentVersion) (*Segment, error) { + f, err := os.OpenFile(SegmentName(dir, k, v), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o666) if err != nil { return nil, err } - stat, err := f.Stat() - if err != nil { - f.Close() - return nil, err - } - // If the last page is torn, fill it with zeros. - // In case it was torn after all records were written successfully, this - // will just pad the page and everything will be fine. - // If it was torn mid-record, a full read (which the caller should do anyway - // to ensure integrity) will detect it as a corruption by the end. 
- if d := stat.Size() % pageSize; d != 0 { - logger.Warn("Last page of the wlog is torn, filling it with zeros", "segment", segName) - if _, err := f.Write(make([]byte, pageSize-d)); err != nil { - f.Close() - return nil, fmt.Errorf("zero-pad torn page: %w", err) - } - } - return &Segment{SegmentFile: f, i: k, dir: dir}, nil + return &Segment{SegmentFile: f, i: k, v: v, dir: dir}, nil } -// CreateSegment creates a new segment k in dir. -func CreateSegment(dir string, k int) (*Segment, error) { - f, err := os.OpenFile(SegmentName(dir, k), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o666) +// OpenReadSegmentByIndex opens the segment with the given sequence (index) prefix. +// It returns error if there are multiple files with the same sequence, but e.g. +// different version. +func OpenReadSegmentByIndex(dir string, i int) (*Segment, error) { + s, err := segmentByIndex(dir, i) if err != nil { return nil, err } - return &Segment{SegmentFile: f, i: k, dir: dir}, nil + return OpenReadSegment(filepath.Join(dir, s.fName)) } // OpenReadSegment opens the segment with the given filename. func OpenReadSegment(fn string) (*Segment, error) { - k, err := strconv.Atoi(filepath.Base(fn)) + k, v, err := ParseSegmentName(filepath.Base(fn)) if err != nil { - return nil, errors.New("not a valid filename") + return nil, err } + + if !v.isSupported() { + return nil, fmt.Errorf("unsupported WAL segment version %v; supported versions %v", v, supportedSegmentVersions()) + } + f, err := os.Open(fn) if err != nil { return nil, err } - return &Segment{SegmentFile: f, i: k, dir: filepath.Dir(fn)}, nil -} - -type CompressionType string - -const ( - CompressionNone CompressionType = "none" - CompressionSnappy CompressionType = "snappy" - CompressionZstd CompressionType = "zstd" -) - -// ParseCompressionType parses the two compression-related configuration values and returns the CompressionType. If -// compression is enabled but the compressType is unrecognized, we default to Snappy compression. -func ParseCompressionType(compress bool, compressType string) CompressionType { - if compress { - if compressType == "zstd" { - return CompressionZstd - } - return CompressionSnappy - } - return CompressionNone + return &Segment{SegmentFile: f, i: k, v: v, dir: filepath.Dir(fn)}, nil } // WL is a write log that stores records in segment files. @@ -200,9 +266,10 @@ func ParseCompressionType(compress bool, compressType string) CompressionType { // safely truncated. It also ensures that torn writes never corrupt records // beyond the most recent segment. type WL struct { - dir string + dir string + opts SegmentOptions + logger *slog.Logger - segmentSize int mtx sync.RWMutex segment *Segment // Active segment. donePages int // Pages written to the segment. @@ -210,7 +277,6 @@ type WL struct { stopc chan chan struct{} actorc chan func() closed bool // To allow calling Close() more than once without blocking. - compress CompressionType compressBuf []byte zstdWriter *zstd.Encoder @@ -308,26 +374,60 @@ func newWLMetrics(w *WL, r prometheus.Registerer) *wlMetrics { return m } -// New returns a new WAL over the given directory. -func New(logger *slog.Logger, reg prometheus.Registerer, dir string, compress CompressionType) (*WL, error) { - return NewSize(logger, reg, dir, DefaultSegmentSize, compress) +// SegmentOptions defines the options for WAL segment. +type SegmentOptions struct { + // Compression to use when writing (or repairing) records in segments. + // Defaults to DefaultCompression. 
+	Compression CompressionType
+
+	// Version to use when writing (or repairing) segments.
+	// Must be one of the supported segment versions.
+	// Defaults to DefaultSegmentVersion.
+	Version SegmentVersion
+
+	// Size specifies the maximum segment size.
+	// Has to be a multiple of 32KB.
+	// Defaults to DefaultSegmentSize.
+	Size int
 }
 
-// NewSize returns a new write log over the given directory.
-// New segments are created with the specified size.
-func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress CompressionType) (*WL, error) {
-	if segmentSize%pageSize != 0 {
-		return nil, errors.New("invalid segment size")
+func (opts *SegmentOptions) validateAndDefault() error {
+	if opts.Version == 0 {
+		opts.Version = DefaultSegmentVersion
+	} else if !opts.Version.isSupported() {
+		return fmt.Errorf("invalid segment version: %v, supported: %v", opts.Version, supportedSegmentVersions())
 	}
-	if err := os.MkdirAll(dir, 0o777); err != nil {
-		return nil, fmt.Errorf("create dir: %w", err)
+	if opts.Size <= 0 {
+		opts.Size = DefaultSegmentSize
 	}
-	if logger == nil {
-		logger = promslog.NewNopLogger()
+	if opts.Size%pageSize != 0 {
+		return fmt.Errorf("invalid segment size: %v; not a multiple of %v", opts.Size, pageSize)
 	}
+	if opts.Compression == "" {
+		opts.Compression = DefaultCompression
+	} else {
+		var supported bool
+		for _, c := range SupportedCompressions() {
+			if opts.Compression == c {
+				supported = true
+				break
+			}
+		}
+		if !supported {
+			return fmt.Errorf("invalid compression: %v, supported: %v", opts.Compression, SupportedCompressions())
+		}
+	}
+	return nil
+}
+
+// New returns a new WAL over the given directory.
+func New(logger *slog.Logger, reg prometheus.Registerer, dir string, opts SegmentOptions) (*WL, error) {
+	if err := opts.validateAndDefault(); err != nil {
+		return nil, err
+	}
 	var zstdWriter *zstd.Encoder
-	if compress == CompressionZstd {
+	if opts.Compression == CompressionZstd {
 		var err error
 		zstdWriter, err = zstd.NewWriter(nil)
 		if err != nil {
@@ -335,15 +435,20 @@ func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segment
 		}
 	}
 
+	if err := os.MkdirAll(dir, 0o777); err != nil {
+		return nil, fmt.Errorf("create dir: %w", err)
+	}
+	if logger == nil {
+		logger = promslog.NewNopLogger()
+	}
 	w := &WL{
-		dir:         dir,
-		logger:      logger,
-		segmentSize: segmentSize,
-		page:        &page{},
-		actorc:      make(chan func(), 100),
-		stopc:       make(chan chan struct{}),
-		compress:    compress,
-		zstdWriter:  zstdWriter,
+		dir:        dir,
+		opts:       opts,
+		logger:     logger,
+		page:       &page{},
+		actorc:     make(chan func(), 100),
+		stopc:      make(chan chan struct{}),
+		zstdWriter: zstdWriter,
 	}
 	prefix := "prometheus_tsdb_wal_"
 	if filepath.Base(dir) == WblDirName {
@@ -351,7 +456,7 @@ func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segment
 	}
 	w.metrics = newWLMetrics(w, prometheus.WrapRegistererWithPrefix(prefix, reg))
 
-	_, last, err := Segments(w.Dir())
+	_, last, err := SegmentsRange(w.Dir())
 	if err != nil {
 		return nil, fmt.Errorf("get segment range: %w", err)
 	}
@@ -363,7 +468,7 @@ func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segment
 		writeSegmentIndex = last + 1
 	}
 
-	segment, err := CreateSegment(w.Dir(), writeSegmentIndex)
+	segment, err := CreateSegment(w.Dir(), writeSegmentIndex, w.opts.Version)
 	if err != nil {
 		return nil, err
 	}
@@ -378,6 +483,7 @@ func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segment
 }
 
 // Open an existing WAL.
+// TODO(bwplotka): It seems this is a read only WAL, safe-guard this or accept all options? func Open(logger *slog.Logger, dir string) (*WL, error) { if logger == nil { logger = promslog.NewNopLogger() @@ -387,18 +493,22 @@ func Open(logger *slog.Logger, dir string) (*WL, error) { return nil, err } - w := &WL{ - dir: dir, + return &WL{ + dir: dir, + opts: SegmentOptions{ + // TODO(bwplotka): If it's read-only it's not needed, but nothing safe-guards read-only aspect? + Compression: CompressionZstd, + Version: DefaultSegmentVersion, + Size: DefaultSegmentSize, + }, logger: logger, zstdWriter: zstdWriter, - } - - return w, nil + }, nil } // CompressionType returns if compression is enabled on this WAL. func (w *WL) CompressionType() CompressionType { - return w.compress + return w.opts.Compression } // Dir returns the directory of the WAL. @@ -440,18 +550,18 @@ func (w *WL) Repair(origErr error) error { if !errors.As(origErr, &cerr) { return fmt.Errorf("cannot handle error: %w", origErr) } - if cerr.Segment < 0 { + if cerr.SegmentIndex < 0 { return errors.New("corruption error does not specify position") } w.logger.Warn("Starting corruption repair", - "segment", cerr.Segment, "offset", cerr.Offset) + "segment", cerr.SegmentName(), "offset", cerr.Offset) // All segments behind the corruption can no longer be used. segs, err := listSegments(w.Dir()) if err != nil { return fmt.Errorf("list segments: %w", err) } - w.logger.Warn("Deleting all segments newer than corrupted segment", "segment", cerr.Segment) + w.logger.Warn("Deleting all segments newer than corrupted segment", "segment", cerr.SegmentName()) for _, s := range segs { if w.segment.i == s.index { @@ -463,26 +573,26 @@ func (w *WL) Repair(origErr error) error { return fmt.Errorf("close active segment: %w", err) } } - if s.index <= cerr.Segment { + if s.index <= cerr.SegmentIndex { continue } - if err := os.Remove(filepath.Join(w.Dir(), s.name)); err != nil { + if err := os.Remove(filepath.Join(w.Dir(), s.fName)); err != nil { return fmt.Errorf("delete segment:%v: %w", s.index, err) } } // Regardless of the corruption offset, no record reaches into the previous segment. // So we can safely repair the WAL by removing the segment and re-inserting all // its records up to the corruption. - w.logger.Warn("Rewrite corrupted segment", "segment", cerr.Segment) + w.logger.Warn("Rewrite corrupted segment", "segment", cerr.SegmentName()) - fn := SegmentName(w.Dir(), cerr.Segment) + fn := SegmentName(w.Dir(), cerr.SegmentIndex, w.opts.Version) tmpfn := fn + ".repair" if err := fileutil.Rename(fn, tmpfn); err != nil { return err } // Create a clean segment and make it the active one. - s, err := CreateSegment(w.Dir(), cerr.Segment) + s, err := CreateSegment(w.Dir(), cerr.SegmentIndex, w.opts.Version) if err != nil { return err } @@ -525,21 +635,63 @@ func (w *WL) Repair(origErr error) error { } // Explicitly close the segment we just repaired to avoid issues with Windows. - s.Close() + _ = s.Close() // We always want to start writing to a new Segment rather than an existing // Segment, which is handled by NewSize, but earlier in Repair we're deleting // all segments that come after the corrupted Segment. Recreate a new Segment here. - s, err = CreateSegment(w.Dir(), cerr.Segment+1) + s, err = CreateSegment(w.Dir(), cerr.SegmentIndex+1, w.opts.Version) if err != nil { return err } return w.setSegment(s) } -// SegmentName builds a segment name for the directory. 
-func SegmentName(dir string, i int) string {
-	return filepath.Join(dir, fmt.Sprintf("%08d", i))
+// SegmentName builds a segment filename for the directory.
+func SegmentName(dir string, i int, v SegmentVersion) string {
+	if v == SegmentFormatV1 {
+		return filepath.Join(dir, fmt.Sprintf("%08d", i))
+	}
+	return filepath.Join(dir, fmt.Sprintf("%08d-v%d", i, v))
+}
+
+func segmentByIndex(dir string, i int) (segmentRef, error) {
+	glob := filepath.Join(dir, fmt.Sprintf("%08d*", i))
+	matches, err := filepath.Glob(glob)
+	if err != nil {
+		return segmentRef{}, fmt.Errorf("failed to glob %v: %w", glob, err)
+	}
+	if len(matches) != 1 {
+		return segmentRef{}, fmt.Errorf("expected exactly one segment for index %d after globbing %v, got %v", i, glob, matches)
+	}
+	return segmentRef{
+		fName: filepath.Base(matches[0]),
+		index: i,
+	}, nil
+}
+
+// ParseSegmentName parses segment sequence and version from the segment filename.
+func ParseSegmentName(name string) (int, SegmentVersion, error) {
+	s := strings.Split(name, "-v")
+	if len(s) > 2 {
+		// Parsing is strict, changing filename syntax requires another version.
+		return 0, 0, fmt.Errorf("invalid segment filename %v; expected at most one -v separator", name)
+	}
+
+	k, err := strconv.Atoi(s[0])
+	if err != nil {
+		return 0, 0, fmt.Errorf("invalid segment filename %v; segment index is not a number: %w", name, err)
+	}
+	v := SegmentFormatV1
+	if len(s) == 1 {
+		return k, v, nil
+	}
+
+	ver, err := strconv.Atoi(s[1])
+	if err != nil {
+		return 0, 0, fmt.Errorf("invalid segment filename %v; version after -v is not a number: %w", name, err)
+	}
+	return k, SegmentVersion(ver), nil
 }
 
 // NextSegment creates the next segment and closes the previous one asynchronously.
@@ -571,7 +723,7 @@ func (w *WL) nextSegment(async bool) (int, error) {
 			return 0, err
 		}
 	}
-	next, err := CreateSegment(w.Dir(), w.segment.Index()+1)
+	next, err := CreateSegment(w.Dir(), w.segment.Index()+1, w.opts.Version)
 	if err != nil {
 		return 0, fmt.Errorf("create new segment file: %w", err)
 	}
@@ -682,7 +834,7 @@ func (t recType) String() string {
 }
 
 func (w *WL) pagesPerSegment() int {
-	return w.segmentSize / pageSize
+	return w.opts.Size / pageSize
 }
 
 // Log writes the records into the log.
@@ -716,7 +868,7 @@ func (w *WL) log(rec []byte, final bool) error {
 
 	// Compress the record before calculating if a new segment is needed.
 	compressed := false
-	if w.compress == CompressionSnappy && len(rec) > 0 {
+	if w.opts.Compression == CompressionSnappy && len(rec) > 0 {
 		// If MaxEncodedLen is less than 0 the record is too large to be compressed.
 		if len(rec) > 0 && snappy.MaxEncodedLen(len(rec)) >= 0 {
 			// The snappy library uses `len` to calculate if we need a new buffer.
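
As a side note for reviewers: the naming scheme added by SegmentName and ParseSegmentName above round-trips. A minimal sketch, not part of the patch, assuming the usual github.com/prometheus/prometheus/tsdb/wlog import path:

package main

import (
	"fmt"
	"path/filepath"

	"github.com/prometheus/prometheus/tsdb/wlog"
)

func main() {
	// v1 keeps the historical zero-padded name, v2 gains a "-v2" suffix.
	for _, v := range []wlog.SegmentVersion{wlog.SegmentFormatV1, wlog.SegmentFormatV2} {
		name := wlog.SegmentName("data/wal", 42, v)
		idx, ver, err := wlog.ParseSegmentName(filepath.Base(name))
		fmt.Println(name, idx, ver, err) // e.g. data/wal/00000042 42 1 <nil>, then data/wal/00000042-v2 42 2 <nil>
	}
}
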
@@ -729,7 +881,7 @@ func (w *WL) log(rec []byte, final bool) error { compressed = true } } - } else if w.compress == CompressionZstd && len(rec) > 0 { + } else if w.opts.Compression == CompressionZstd && len(rec) > 0 { w.compressBuf = w.zstdWriter.EncodeAll(rec, w.compressBuf[:0]) if len(w.compressBuf) < len(rec) { rec = w.compressBuf @@ -773,9 +925,9 @@ func (w *WL) log(rec []byte, final bool) error { typ = recMiddle } if compressed { - if w.compress == CompressionSnappy { + if w.opts.Compression == CompressionSnappy { typ |= snappyMask - } else if w.compress == CompressionZstd { + } else if w.opts.Compression == CompressionZstd { typ |= zstdMask } } @@ -815,7 +967,7 @@ func (w *WL) LastSegmentAndOffset() (seg, offset int, err error) { w.mtx.Lock() defer w.mtx.Unlock() - _, seg, err = Segments(w.Dir()) + _, seg, err = SegmentsRange(w.Dir()) if err != nil { return } @@ -841,7 +993,7 @@ func (w *WL) Truncate(i int) (err error) { if r.index >= i { break } - if err = os.Remove(filepath.Join(w.Dir(), r.name)); err != nil { + if err = os.Remove(filepath.Join(w.Dir(), r.fName)); err != nil { return err } } @@ -901,12 +1053,12 @@ func (w *WL) Close() (err error) { return nil } -// Segments returns the range [first, n] of currently existing segments. -// If no segments are found, first and n are -1. -func Segments(wlDir string) (first, last int, err error) { +// SegmentsRange returns the range [first, n] of currently existing segments. +// If no segments are found, first and last are -1. +func SegmentsRange(wlDir string) (first, last int, err error) { refs, err := listSegments(wlDir) if err != nil { - return 0, 0, err + return -1, -1, err } if len(refs) == 0 { return -1, -1, nil @@ -914,11 +1066,16 @@ func Segments(wlDir string) (first, last int, err error) { return refs[0].index, refs[len(refs)-1].index, nil } +// segmentRef references segment file, while providing its sequence (index) name. type segmentRef struct { - name string + fName string index int } +func (s segmentRef) String() string { + return s.fName +} + func listSegments(dir string) (refs []segmentRef, err error) { files, err := os.ReadDir(dir) if err != nil { @@ -926,18 +1083,22 @@ func listSegments(dir string) (refs []segmentRef, err error) { } for _, f := range files { fn := f.Name() - k, err := strconv.Atoi(fn) + + k, _, err := ParseSegmentName(f.Name()) if err != nil { + // Malformed segment filenames are skipped. + // TODO(bwplotka): Log? continue } - refs = append(refs, segmentRef{name: fn, index: k}) + refs = append(refs, segmentRef{fName: fn, index: k}) } slices.SortFunc(refs, func(a, b segmentRef) int { return a.index - b.index }) for i := 0; i < len(refs)-1; i++ { if refs[i].index+1 != refs[i+1].index { - return nil, errors.New("segments are not sequential") + // TODO(bwplotka): That feels like a recoverable error if it's just a gap? 
+ return nil, fmt.Errorf("segments are not sequential, found %v after %v", refs[i+1].index, refs[i].index) } } return refs, nil @@ -972,9 +1133,9 @@ func NewSegmentsRangeReader(sr ...SegmentRange) (io.ReadCloser, error) { if sgmRange.Last >= 0 && r.index > sgmRange.Last { break } - s, err := OpenReadSegment(filepath.Join(sgmRange.Dir, r.name)) + s, err := OpenReadSegment(filepath.Join(sgmRange.Dir, r.fName)) if err != nil { - return nil, fmt.Errorf("open segment:%v in dir:%v: %w", r.name, sgmRange.Dir, err) + return nil, fmt.Errorf("open segment:%v in dir:%v: %w", r.fName, sgmRange.Dir, err) } segs = append(segs, s) } diff --git a/tsdb/wlog/wlog_test.go b/tsdb/wlog/wlog_test.go index d195aaee2f..d4815015d1 100644 --- a/tsdb/wlog/wlog_test.go +++ b/tsdb/wlog/wlog_test.go @@ -25,7 +25,6 @@ import ( "github.com/prometheus/client_golang/prometheus" client_testutil "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/prometheus/common/promslog" "github.com/stretchr/testify/require" "go.uber.org/goleak" @@ -39,170 +38,179 @@ func TestMain(m *testing.M) { // TestWALRepair_ReadingError ensures that a repair is run for an error // when reading a record. func TestWALRepair_ReadingError(t *testing.T) { - for name, test := range map[string]struct { - corrSgm int // Which segment to corrupt. - corrFunc func(f *os.File) // Func that applies the corruption. - intactRecs int // Total expected records left after the repair. - }{ - "torn_last_record": { - 2, - func(f *os.File) { - _, err := f.Seek(pageSize*2, 0) - require.NoError(t, err) - _, err = f.Write([]byte{byte(recFirst)}) - require.NoError(t, err) - }, - 8, - }, - // Ensures that the page buffer is big enough to fit - // an entire page size without panicking. - // https://github.com/prometheus/tsdb/pull/414 - "bad_header": { - 1, - func(f *os.File) { - _, err := f.Seek(pageSize, 0) - require.NoError(t, err) - _, err = f.Write([]byte{byte(recPageTerm)}) - require.NoError(t, err) - }, - 4, - }, - "bad_fragment_sequence": { - 1, - func(f *os.File) { - _, err := f.Seek(pageSize, 0) - require.NoError(t, err) - _, err = f.Write([]byte{byte(recLast)}) - require.NoError(t, err) - }, - 4, - }, - "bad_fragment_flag": { - 1, - func(f *os.File) { - _, err := f.Seek(pageSize, 0) - require.NoError(t, err) - _, err = f.Write([]byte{123}) - require.NoError(t, err) - }, - 4, - }, - "bad_checksum": { - 1, - func(f *os.File) { - _, err := f.Seek(pageSize+4, 0) - require.NoError(t, err) - _, err = f.Write([]byte{0}) - require.NoError(t, err) - }, - 4, - }, - "bad_length": { - 1, - func(f *os.File) { - _, err := f.Seek(pageSize+2, 0) - require.NoError(t, err) - _, err = f.Write([]byte{0}) - require.NoError(t, err) - }, - 4, - }, - "bad_content": { - 1, - func(f *os.File) { - _, err := f.Seek(pageSize+100, 0) - require.NoError(t, err) - _, err = f.Write([]byte("beef")) - require.NoError(t, err) - }, - 4, - }, - } { - t.Run(name, func(t *testing.T) { - dir := t.TempDir() + for _, ver := range supportedSegmentVersions() { + t.Run("v=%v", func(t *testing.T) { - // We create 3 segments with 3 records each and - // then corrupt a given record in a given segment. - // As a result we want a repaired WAL with given intact records. - segSize := 3 * pageSize - w, err := NewSize(nil, nil, dir, segSize, CompressionNone) - require.NoError(t, err) + for name, test := range map[string]struct { + corrSgm int // Which segment to corrupt. + corrFunc func(f *os.File) // Func that applies the corruption. + intactRecs int // Total expected records left after the repair. 
+ }{ + "torn_last_record": { + 2, + func(f *os.File) { + _, err := f.Seek(pageSize*2, 0) + require.NoError(t, err) + _, err = f.Write([]byte{byte(recFirst)}) + require.NoError(t, err) + }, + 8, + }, + // Ensures that the page buffer is big enough to fit + // an entire page size without panicking. + // https://github.com/prometheus/tsdb/pull/414 + "bad_header": { + 1, + func(f *os.File) { + _, err := f.Seek(pageSize, 0) + require.NoError(t, err) + _, err = f.Write([]byte{byte(recPageTerm)}) + require.NoError(t, err) + }, + 4, + }, + "bad_fragment_sequence": { + 1, + func(f *os.File) { + _, err := f.Seek(pageSize, 0) + require.NoError(t, err) + _, err = f.Write([]byte{byte(recLast)}) + require.NoError(t, err) + }, + 4, + }, + "bad_fragment_flag": { + 1, + func(f *os.File) { + _, err := f.Seek(pageSize, 0) + require.NoError(t, err) + _, err = f.Write([]byte{123}) + require.NoError(t, err) + }, + 4, + }, + "bad_checksum": { + 1, + func(f *os.File) { + _, err := f.Seek(pageSize+4, 0) + require.NoError(t, err) + _, err = f.Write([]byte{0}) + require.NoError(t, err) + }, + 4, + }, + "bad_length": { + 1, + func(f *os.File) { + _, err := f.Seek(pageSize+2, 0) + require.NoError(t, err) + _, err = f.Write([]byte{0}) + require.NoError(t, err) + }, + 4, + }, + "bad_content": { + 1, + func(f *os.File) { + _, err := f.Seek(pageSize+100, 0) + require.NoError(t, err) + _, err = f.Write([]byte("beef")) + require.NoError(t, err) + }, + 4, + }, + } { + t.Run(name, func(t *testing.T) { + dir := t.TempDir() - var records [][]byte + // We create 3 segments with 3 records each and + // then corrupt a given record in a given segment. + // As a result we want a repaired WAL with given intact records. + opts := SegmentOptions{ + Size: 3 * pageSize, + Version: ver, + } - for i := 1; i <= 9; i++ { - b := make([]byte, pageSize-recordHeaderSize) - b[0] = byte(i) - records = append(records, b) - require.NoError(t, w.Log(b)) + w, err := New(nil, nil, dir, opts) + require.NoError(t, err) + + var records [][]byte + + for i := 1; i <= 9; i++ { + b := make([]byte, pageSize-recordHeaderSize) + b[0] = byte(i) + records = append(records, b) + require.NoError(t, w.Log(b)) + } + first, last, err := SegmentsRange(w.Dir()) + require.NoError(t, err) + require.Equal(t, 3, 1+last-first, "wlog creation didn't result in expected number of segments") + + require.NoError(t, w.Close()) + + f, err := os.OpenFile(SegmentName(dir, test.corrSgm, ver), os.O_RDWR, 0o666) + require.NoError(t, err) + + // Apply corruption function. + test.corrFunc(f) + + require.NoError(t, f.Close()) + + w, err = New(nil, nil, dir, opts) + require.NoError(t, err) + defer w.Close() + + first, last, err = SegmentsRange(w.Dir()) + require.NoError(t, err) + + // Backfill segments from the most recent checkpoint onwards. + for i := first; i <= last; i++ { + s, err := OpenReadSegment(SegmentName(w.Dir(), i, ver)) + require.NoError(t, err) + + sr := NewSegmentBufReader(s) + require.NoError(t, err) + r := NewReader(sr) + for r.Next() { + } + + // Close the segment so we don't break things on Windows. + s.Close() + + // No corruption in this segment. 
+ if r.Err() == nil { + continue + } + require.NoError(t, w.Repair(r.Err())) + break + } + + sr, err := NewSegmentsReader(dir) + require.NoError(t, err) + defer sr.Close() + r := NewReader(sr) + + var result [][]byte + for r.Next() { + var b []byte + result = append(result, append(b, r.Record()...)) + } + require.NoError(t, r.Err()) + require.Len(t, result, test.intactRecs, "Wrong number of intact records") + + for i, r := range result { + require.True(t, bytes.Equal(records[i], r), "record %d diverges: want %x, got %x", i, records[i][:10], r[:10]) + } + + // Make sure there is a new 0 size Segment after the corrupted Segment. + _, last, err = SegmentsRange(w.Dir()) + require.NoError(t, err) + require.Equal(t, test.corrSgm+1, last) + fi, err := os.Stat(SegmentName(dir, last, ver)) + require.NoError(t, err) + require.Equal(t, int64(0), fi.Size()) + }) } - first, last, err := Segments(w.Dir()) - require.NoError(t, err) - require.Equal(t, 3, 1+last-first, "wlog creation didn't result in expected number of segments") - - require.NoError(t, w.Close()) - - f, err := os.OpenFile(SegmentName(dir, test.corrSgm), os.O_RDWR, 0o666) - require.NoError(t, err) - - // Apply corruption function. - test.corrFunc(f) - - require.NoError(t, f.Close()) - - w, err = NewSize(nil, nil, dir, segSize, CompressionNone) - require.NoError(t, err) - defer w.Close() - - first, last, err = Segments(w.Dir()) - require.NoError(t, err) - - // Backfill segments from the most recent checkpoint onwards. - for i := first; i <= last; i++ { - s, err := OpenReadSegment(SegmentName(w.Dir(), i)) - require.NoError(t, err) - - sr := NewSegmentBufReader(s) - require.NoError(t, err) - r := NewReader(sr) - for r.Next() { - } - - // Close the segment so we don't break things on Windows. - s.Close() - - // No corruption in this segment. - if r.Err() == nil { - continue - } - require.NoError(t, w.Repair(r.Err())) - break - } - - sr, err := NewSegmentsReader(dir) - require.NoError(t, err) - defer sr.Close() - r := NewReader(sr) - - var result [][]byte - for r.Next() { - var b []byte - result = append(result, append(b, r.Record()...)) - } - require.NoError(t, r.Err()) - require.Len(t, result, test.intactRecs, "Wrong number of intact records") - - for i, r := range result { - require.True(t, bytes.Equal(records[i], r), "record %d diverges: want %x, got %x", i, records[i][:10], r[:10]) - } - - // Make sure there is a new 0 size Segment after the corrupted Segment. - _, last, err = Segments(w.Dir()) - require.NoError(t, err) - require.Equal(t, test.corrSgm+1, last) - fi, err := os.Stat(SegmentName(dir, last)) - require.NoError(t, err) - require.Equal(t, int64(0), fi.Size()) }) } } @@ -211,18 +219,18 @@ func TestWALRepair_ReadingError(t *testing.T) { // ensures that an error during reading that segment are correctly repaired before // moving to write more records to the WAL. func TestCorruptAndCarryOn(t *testing.T) { - dir := t.TempDir() - var ( - logger = promslog.NewNopLogger() - segmentSize = pageSize * 3 - recordSize = (pageSize / 3) - recordHeaderSize + dir = t.TempDir() + opts = SegmentOptions{ + Size: 3 * pageSize, + } + recordSize = (pageSize / 3) - recordHeaderSize ) // Produce a WAL with a two segments of 3 pages with 3 records each, // so when we truncate the file we're guaranteed to split a record. 
{ - w, err := NewSize(logger, nil, dir, segmentSize, CompressionNone) + w, err := New(nil, nil, dir, opts) require.NoError(t, err) for i := 0; i < 18; i++ { @@ -233,9 +241,7 @@ func TestCorruptAndCarryOn(t *testing.T) { err = w.Log(buf) require.NoError(t, err) } - - err = w.Close() - require.NoError(t, err) + require.NoError(t, w.Close()) } // Check all the segments are the correct size. @@ -250,7 +256,7 @@ func TestCorruptAndCarryOn(t *testing.T) { require.NoError(t, err) t.Log("segment", segment.index, "size", fi.Size()) - require.Equal(t, int64(segmentSize), fi.Size()) + require.Equal(t, int64(opts.Size), fi.Size()) err = f.Close() require.NoError(t, err) @@ -265,9 +271,9 @@ func TestCorruptAndCarryOn(t *testing.T) { fi, err := f.Stat() require.NoError(t, err) - require.Equal(t, int64(segmentSize), fi.Size()) + require.Equal(t, int64(opts.Size), fi.Size()) - err = f.Truncate(int64(segmentSize / 2)) + err = f.Truncate(int64(opts.Size / 2)) require.NoError(t, err) err = f.Close() @@ -293,7 +299,7 @@ func TestCorruptAndCarryOn(t *testing.T) { err = sr.Close() require.NoError(t, err) - w, err := NewSize(logger, nil, dir, segmentSize, CompressionNone) + w, err := New(nil, nil, dir, opts) require.NoError(t, err) err = w.Repair(corruptionErr) @@ -335,8 +341,7 @@ func TestCorruptAndCarryOn(t *testing.T) { // TestClose ensures that calling Close more than once doesn't panic and doesn't block. func TestClose(t *testing.T) { - dir := t.TempDir() - w, err := NewSize(nil, nil, dir, pageSize, CompressionNone) + w, err := New(nil, nil, t.TempDir(), SegmentOptions{}) require.NoError(t, err) require.NoError(t, w.Close()) require.Error(t, w.Close()) @@ -344,12 +349,14 @@ func TestClose(t *testing.T) { func TestSegmentMetric(t *testing.T) { var ( - segmentSize = pageSize - recordSize = (pageSize / 2) - recordHeaderSize + dir = t.TempDir() + opts = SegmentOptions{ + Size: pageSize, + } + recordSize = (pageSize / 2) - recordHeaderSize ) - dir := t.TempDir() - w, err := NewSize(nil, nil, dir, segmentSize, CompressionNone) + w, err := New(nil, nil, dir, opts) require.NoError(t, err) initialSegment := client_testutil.ToFloat64(w.metrics.currentSegment) @@ -368,16 +375,19 @@ func TestSegmentMetric(t *testing.T) { } func TestCompression(t *testing.T) { - bootstrap := func(compressed CompressionType) string { + createTestData := func(compressed CompressionType) string { const ( - segmentSize = pageSize - recordSize = (pageSize / 2) - recordHeaderSize - records = 100 + recordSize = (pageSize / 2) - recordHeaderSize + records = 100 ) - dirPath := t.TempDir() + dir := t.TempDir() + opts := SegmentOptions{ + Size: pageSize, + Compression: compressed, + } - w, err := NewSize(nil, nil, dirPath, segmentSize, compressed) + w, err := New(nil, nil, dir, opts) require.NoError(t, err) buf := make([]byte, recordSize) @@ -385,35 +395,24 @@ func TestCompression(t *testing.T) { require.NoError(t, w.Log(buf)) } require.NoError(t, w.Close()) - - return dirPath + return dir } - tmpDirs := make([]string, 0, 3) - defer func() { - for _, dir := range tmpDirs { - require.NoError(t, os.RemoveAll(dir)) - } - }() - - dirUnCompressed := bootstrap(CompressionNone) - tmpDirs = append(tmpDirs, dirUnCompressed) - + uncompressedDir := createTestData(CompressionNone) for _, compressionType := range []CompressionType{CompressionSnappy, CompressionZstd} { - dirCompressed := bootstrap(compressionType) - tmpDirs = append(tmpDirs, dirCompressed) + dir := createTestData(compressionType) - uncompressedSize, err := fileutil.DirSize(dirUnCompressed) + 
uncompressedSize, err := fileutil.DirSize(uncompressedDir) require.NoError(t, err) - compressedSize, err := fileutil.DirSize(dirCompressed) + compressedSize, err := fileutil.DirSize(dir) require.NoError(t, err) - require.Greater(t, float64(uncompressedSize)*0.75, float64(compressedSize), "Compressing zeroes should save at least 25%% space - uncompressedSize: %d, compressedSize: %d", uncompressedSize, compressedSize) + require.Greater(t, float64(uncompressedSize)*0.75, float64(compressedSize), + "Compressing zeroes should save at least 25%% space - uncompressedSize: %d, compressedSize: %d", uncompressedSize, compressedSize) } } func TestLogPartialWrite(t *testing.T) { - const segmentSize = pageSize * 2 record := []byte{1, 2, 3, 4, 5} tests := map[string]struct { @@ -441,9 +440,12 @@ func TestLogPartialWrite(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - dirPath := t.TempDir() + dir := t.TempDir() + opts := SegmentOptions{ + Size: pageSize * 2, + } - w, err := NewSize(nil, nil, dirPath, segmentSize, CompressionNone) + w, err := New(nil, nil, dir, opts) require.NoError(t, err) // Replace the underlying segment file with a mocked one that injects a failure. @@ -464,7 +466,7 @@ func TestLogPartialWrite(t *testing.T) { require.NoError(t, w.Close()) // Read it back. We expect no corruption. - s, err := OpenReadSegment(SegmentName(dirPath, 0)) + s, err := OpenReadSegmentByIndex(dir, 0) require.NoError(t, err) defer func() { require.NoError(t, s.Close()) }() @@ -510,11 +512,14 @@ func (f *faultySegmentFile) Write(p []byte) (int, error) { } func BenchmarkWAL_LogBatched(b *testing.B) { - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range SupportedCompressions() { b.Run(fmt.Sprintf("compress=%s", compress), func(b *testing.B) { dir := b.TempDir() + opts := SegmentOptions{ + Compression: compress, + } - w, err := New(nil, nil, dir, compress) + w, err := New(nil, nil, dir, opts) require.NoError(b, err) defer w.Close() @@ -540,11 +545,14 @@ func BenchmarkWAL_LogBatched(b *testing.B) { } func BenchmarkWAL_Log(b *testing.B) { - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range SupportedCompressions() { b.Run(fmt.Sprintf("compress=%s", compress), func(b *testing.B) { dir := b.TempDir() + opts := SegmentOptions{ + Compression: compress, + } - w, err := New(nil, nil, dir, compress) + w, err := New(nil, nil, dir, opts) require.NoError(b, err) defer w.Close() @@ -567,8 +575,54 @@ func TestUnregisterMetrics(t *testing.T) { reg := prometheus.NewRegistry() for i := 0; i < 2; i++ { - wl, err := New(promslog.NewNopLogger(), reg, t.TempDir(), CompressionNone) + wl, err := New(nil, reg, t.TempDir(), SegmentOptions{}) require.NoError(t, err) require.NoError(t, wl.Close()) } } + +func TestSegmentsVersion_ReadCompatibility(t *testing.T) { + createTestData := func(v SegmentVersion) string { + const ( + recordSize = (pageSize / 2) - recordHeaderSize + records = 100 + ) + + dir := t.TempDir() + opts := SegmentOptions{ + Version: v, + } + + w, err := New(nil, nil, dir, opts) + require.NoError(t, err) + + buf := make([]byte, recordSize) + for i := 0; i < records; i++ { + require.NoError(t, w.Log(buf)) + } + require.NoError(t, w.Close()) + return dir + } + + t.Run("unknown version", func(t *testing.T) { + dir := createTestData(91221848) + + first, endAt, err := SegmentsRange(dir) + require.NoError(t, err) + fmt.Println(first, endAt) 
+ }) + t.Run("v1 version", func(t *testing.T) { + dir := createTestData(SegmentFormatV1) + + first, endAt, err := SegmentsRange(dir) + require.NoError(t, err) + fmt.Println(first, endAt) + }) + t.Run("v2 version", func(t *testing.T) { + dir := createTestData(SegmentFormatV2) + + first, endAt, err := SegmentsRange(dir) + require.NoError(t, err) + fmt.Println(first, endAt) + }) +}
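Rough usage sketch (not part of this patch): the options-based constructor and the renamed segment helpers introduced above might be exercised from a client of the wlog package as below. The SegmentOptions fields and helper names are taken from the diff; the directory path and record payload are hypothetical, and any field left unset is assumed to fall back to the package defaults.

package main

import (
	"fmt"
	"log"
	"path/filepath"

	"github.com/prometheus/prometheus/tsdb/wlog"
)

func main() {
	dir := "data/wal" // hypothetical WAL directory

	// New takes SegmentOptions instead of a separate size and compression pair.
	w, err := wlog.New(nil, nil, dir, wlog.SegmentOptions{
		Compression: wlog.CompressionSnappy,
		Version:     wlog.DefaultSegmentVersion,
	})
	if err != nil {
		log.Fatal(err)
	}
	if err := w.Log([]byte("example record")); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}

	// SegmentsRange (formerly Segments) reports the live segment index range,
	// or -1, -1 when the directory holds no segments.
	first, last, err := wlog.SegmentsRange(dir)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("segments", first, "through", last)

	// Segment filenames carry the version for anything newer than v1.
	name := wlog.SegmentName(dir, first, wlog.DefaultSegmentVersion)
	idx, ver, err := wlog.ParseSegmentName(filepath.Base(name))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("segment", idx, "uses WAL segment version", ver)
}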