diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 4559d51837..ca1e46967b 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -73,7 +73,7 @@ import ( "github.com/prometheus/prometheus/tracing" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/agent" - "github.com/prometheus/prometheus/tsdb/wlog" + "github.com/prometheus/prometheus/util/compression" "github.com/prometheus/prometheus/util/documentcli" "github.com/prometheus/prometheus/util/logging" "github.com/prometheus/prometheus/util/notifications" @@ -287,6 +287,20 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error { return nil } +// parseCompressionType combines the two compression-related flag values into a single compression.Type. If compress is false, +// compression.None is returned and compressType is ignored; if compress is true, compressType is returned as-is unless it is compression.None, which is an error. +func parseCompressionType(compress bool, compressType compression.Type) (compression.Type, error) { + if compress { + if compressType == compression.None { + return compression.None, errors.New("got --storage.tsdb.wal-compression=true, " + + "but --storage.*.wal-compression-type was set to None; " + + "set --storage.tsdb.wal-compression=false and remove --storage.*.wal-compression-type for no compression") + } + return compressType, nil + } + return compression.None, nil +} + func main() { if os.Getenv("DEBUG") != "" { runtime.SetBlockProfileRate(20) @@ -425,11 +439,15 @@ func main() { serverOnlyFlag(a, "storage.tsdb.allow-overlapping-compaction", "Allow compaction of overlapping blocks. If set to false, TSDB stops vertical compaction and leaves overlapping blocks there. The use case is to let another component handle the compaction of overlapping blocks."). Default("true").Hidden().BoolVar(&cfg.tsdb.EnableOverlappingCompaction) - serverOnlyFlag(a, "storage.tsdb.wal-compression", "Compress the tsdb WAL."). 
- Hidden().Default("true").BoolVar(&cfg.tsdb.WALCompression) + var ( + tsdbWALCompression bool + tsdbWALCompressionType string + ) + serverOnlyFlag(a, "storage.tsdb.wal-compression", "Compress the tsdb WAL. If false, the --storage.tsdb.wal-compression-type flag is ignored."). + Hidden().Default("true").BoolVar(&tsdbWALCompression) - serverOnlyFlag(a, "storage.tsdb.wal-compression-type", "Compression algorithm for the tsdb WAL."). - Hidden().Default(string(wlog.CompressionSnappy)).EnumVar(&cfg.tsdb.WALCompressionType, string(wlog.CompressionSnappy), string(wlog.CompressionZstd)) + serverOnlyFlag(a, "storage.tsdb.wal-compression-type", "Compression algorithm for the tsdb WAL, used when --storage.tsdb.wal-compression is true."). + Hidden().Default(compression.Snappy).EnumVar(&tsdbWALCompressionType, compression.Types()...) serverOnlyFlag(a, "storage.tsdb.head-chunks-write-queue-size", "Size of the queue through which head chunks are written to the disk to be m-mapped, 0 disables the queue completely. Experimental."). Default("0").IntVar(&cfg.tsdb.HeadChunksWriteQueueSize) @@ -447,11 +465,15 @@ func main() { "Size at which to split WAL segment files. Example: 100MB"). Hidden().PlaceHolder("").BytesVar(&cfg.agent.WALSegmentSize) - agentOnlyFlag(a, "storage.agent.wal-compression", "Compress the agent WAL."). - Default("true").BoolVar(&cfg.agent.WALCompression) + var ( + agentWALCompression bool + agentWALCompressionType string + ) + agentOnlyFlag(a, "storage.agent.wal-compression", "Compress the agent WAL. If false, the --storage.agent.wal-compression-type flag is ignored."). + Default("true").BoolVar(&agentWALCompression) - agentOnlyFlag(a, "storage.agent.wal-compression-type", "Compression algorithm for the agent WAL."). 
- Hidden().Default(string(wlog.CompressionSnappy)).EnumVar(&cfg.agent.WALCompressionType, string(wlog.CompressionSnappy), string(wlog.CompressionZstd)) + agentOnlyFlag(a, "storage.agent.wal-compression-type", "Compression algorithm for the agent WAL, used when --storage.agent.wal-compression is true."). + Hidden().Default(compression.Snappy).EnumVar(&agentWALCompressionType, compression.Types()...) agentOnlyFlag(a, "storage.agent.wal-truncate-frequency", "The frequency at which to truncate the WAL and remove old data."). @@ -669,6 +691,18 @@ func main() { logger.Warn("The --storage.tsdb.delayed-compaction.max-percent should have a value between 1 and 100. Using default", "default", tsdb.DefaultCompactionDelayMaxPercent) cfg.tsdb.CompactionDelayMaxPercent = tsdb.DefaultCompactionDelayMaxPercent } + + cfg.tsdb.WALCompressionType, err = parseCompressionType(tsdbWALCompression, tsdbWALCompressionType) + if err != nil { + logger.Error(err.Error()) + os.Exit(2) + } + } else { + cfg.agent.WALCompressionType, err = parseCompressionType(agentWALCompression, agentWALCompressionType) + if err != nil { + logger.Error(err.Error()) + os.Exit(2) + } } noStepSubqueryInterval := &safePromQLNoStepSubqueryInterval{} @@ -1257,7 +1291,7 @@ func main() { "NoLockfile", cfg.tsdb.NoLockfile, "RetentionDuration", cfg.tsdb.RetentionDuration, "WALSegmentSize", cfg.tsdb.WALSegmentSize, - "WALCompression", cfg.tsdb.WALCompression, + "WALCompressionType", cfg.tsdb.WALCompressionType, ) startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000) @@ -1308,7 +1342,7 @@ func main() { logger.Info("Agent WAL storage started") logger.Debug("Agent WAL storage options", "WALSegmentSize", cfg.agent.WALSegmentSize, - "WALCompression", cfg.agent.WALCompression, + "WALCompressionType", cfg.agent.WALCompressionType, "StripeSize", cfg.agent.StripeSize, "TruncateFrequency", cfg.agent.TruncateFrequency, "MinWALTime", cfg.agent.MinWALTime, @@ -1781,8 +1815,7 @@ type tsdbOptions struct 
{ RetentionDuration model.Duration MaxBytes units.Base2Bytes NoLockfile bool - WALCompression bool - WALCompressionType string + WALCompressionType compression.Type HeadChunksWriteQueueSize int SamplesPerChunk int StripeSize int @@ -1806,7 +1839,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond), MaxBytes: int64(opts.MaxBytes), NoLockfile: opts.NoLockfile, - WALCompression: wlog.ParseCompressionType(opts.WALCompression, opts.WALCompressionType), + WALCompression: opts.WALCompressionType, HeadChunksWriteQueueSize: opts.HeadChunksWriteQueueSize, SamplesPerChunk: opts.SamplesPerChunk, StripeSize: opts.StripeSize, @@ -1828,8 +1861,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { // as agent.Option fields are unit agnostic (time). type agentOptions struct { WALSegmentSize units.Base2Bytes - WALCompression bool - WALCompressionType string + WALCompressionType compression.Type StripeSize int TruncateFrequency model.Duration MinWALTime, MaxWALTime model.Duration @@ -1843,7 +1875,7 @@ func (opts agentOptions) ToAgentOptions(outOfOrderTimeWindow int64) agent.Option } return agent.Options{ WALSegmentSize: int(opts.WALSegmentSize), - WALCompression: wlog.ParseCompressionType(opts.WALCompression, opts.WALCompressionType), + WALCompression: opts.WALCompressionType, StripeSize: opts.StripeSize, TruncateFrequency: time.Duration(opts.TruncateFrequency), MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)), diff --git a/cmd/promtool/metrics.go b/cmd/promtool/metrics.go index 4c91d1d6fe..e900ad32e2 100644 --- a/cmd/promtool/metrics.go +++ b/cmd/promtool/metrics.go @@ -23,11 +23,11 @@ import ( "os" "time" - "github.com/golang/snappy" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/util/compression" "github.com/prometheus/prometheus/util/fmtutil" ) 
@@ -116,7 +116,12 @@ func parseAndPushMetrics(client *remote.Client, data []byte, labels map[string]s } // Encode the request body into snappy encoding. - compressed := snappy.Encode(nil, raw) + compressed, _, err := compression.Encode(compression.Snappy, raw, nil) + if err != nil { + fmt.Fprintln(os.Stderr, " FAILED:", err) + return false + } + _, err = client.Store(context.Background(), compressed, 0) if err != nil { fmt.Fprintln(os.Stderr, " FAILED:", err) diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index 9b4ec8b736..0f58ff4b18 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -42,7 +42,7 @@ The Prometheus monitoring server | --storage.tsdb.no-lockfile | Do not create lockfile in data directory. Use with server mode only. | `false` | | --storage.tsdb.head-chunks-write-queue-size | Size of the queue through which head chunks are written to the disk to be m-mapped, 0 disables the queue completely. Experimental. Use with server mode only. | `0` | | --storage.agent.path | Base path for metrics storage. Use with agent mode only. | `data-agent/` | -| --storage.agent.wal-compression | Compress the agent WAL. Use with agent mode only. | `true` | +| --storage.agent.wal-compression | Compress the agent WAL. If false, the --storage.agent.wal-compression-type flag is ignored. Use with agent mode only. | `true` | | --storage.agent.retention.min-time | Minimum age samples may be before being considered for deletion when the WAL is truncated Use with agent mode only. | | | --storage.agent.retention.max-time | Maximum age samples may be before being forcibly deleted when the WAL is truncated Use with agent mode only. | | | --storage.agent.no-lockfile | Do not create lockfile in data directory. Use with agent mode only. 
| `false` | diff --git a/storage/remote/client.go b/storage/remote/client.go index aadf15307c..b25d64529b 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -42,6 +42,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/remote/azuread" "github.com/prometheus/prometheus/storage/remote/googleiam" + "github.com/prometheus/prometheus/util/compression" ) const ( @@ -53,17 +54,6 @@ const ( appProtoContentType = "application/x-protobuf" ) -// Compression represents the encoding. Currently remote storage supports only -// one, but we experiment with more, thus leaving the compression scaffolding -// for now. -// NOTE(bwplotka): Keeping it public, as a non-stable help for importers to use. -type Compression string - -const ( - // SnappyBlockCompression represents https://github.com/google/snappy/blob/2c94e11145f0b7b184b831577c93e5a41c4c0346/format_description.txt - SnappyBlockCompression Compression = "snappy" -) - var ( // UserAgent represents Prometheus version to use for user agent header. UserAgent = version.PrometheusUserAgent() @@ -130,7 +120,7 @@ type Client struct { readQueriesDuration prometheus.ObserverVec writeProtoMsg config.RemoteWriteProtoMsg - writeCompression Compression // Not exposed by ClientConfig for now. + writeCompression compression.Type // Not exposed by ClientConfig for now. } // ClientConfig configures a client. 
@@ -232,7 +222,7 @@ func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) { retryOnRateLimit: conf.RetryOnRateLimit, timeout: time.Duration(conf.Timeout), writeProtoMsg: writeProtoMsg, - writeCompression: SnappyBlockCompression, + writeCompression: compression.Snappy, }, nil } @@ -269,7 +259,7 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) (WriteRespo return WriteResponseStats{}, err } - httpReq.Header.Add("Content-Encoding", string(c.writeCompression)) + httpReq.Header.Add("Content-Encoding", c.writeCompression) httpReq.Header.Set("Content-Type", remoteWriteContentTypeHeaders[c.writeProtoMsg]) httpReq.Header.Set("User-Agent", UserAgent) if c.writeProtoMsg == config.RemoteWriteProtoMsgV1 { diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index b274707bff..751007c5b2 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -24,7 +24,6 @@ import ( "time" "github.com/gogo/protobuf/proto" - "github.com/golang/snappy" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/common/promslog" @@ -45,6 +44,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/wlog" + "github.com/prometheus/prometheus/util/compression" ) const ( @@ -421,7 +421,7 @@ type QueueManager struct { clientMtx sync.RWMutex storeClient WriteClient protoMsg config.RemoteWriteProtoMsg - enc Compression + compr compression.Type seriesMtx sync.Mutex // Covers seriesLabels, seriesMetadata, droppedSeries and builder. seriesLabels map[chunks.HeadSeriesRef]labels.Labels @@ -512,7 +512,7 @@ func NewQueueManager( highestRecvTimestamp: highestRecvTimestamp, protoMsg: protoMsg, - enc: SnappyBlockCompression, // Hardcoded for now, but scaffolding exists for likely future use. 
+ compr: compression.Snappy, // Hardcoded for now, but scaffolding exists for likely future use. } walMetadata := false @@ -574,7 +574,7 @@ func (t *QueueManager) AppendWatcherMetadata(ctx context.Context, metadata []scr func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error { // Build the WriteRequest with no samples (v1 flow). - req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil, t.enc) + req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil, t.compr) if err != nil { return err } @@ -1502,7 +1502,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { pBuf = proto.NewBuffer(nil) pBufRaw []byte - buf []byte + encBuf = compression.NewSyncEncodeBuffer() ) // TODO(@tpaschalis) Should we also raise the max if we have WAL metadata? if s.qm.sendExemplars { @@ -1534,7 +1534,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { } defer stop() - sendBatch := func(batch []timeSeries, protoMsg config.RemoteWriteProtoMsg, enc Compression, timer bool) { + sendBatch := func(batch []timeSeries, protoMsg config.RemoteWriteProtoMsg, compr compression.Type, timer bool) { switch protoMsg { case config.RemoteWriteProtoMsgV1: nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms) @@ -1543,11 +1543,11 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { s.qm.logger.Debug("runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms) } - _ = s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf, enc) + _ = s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, encBuf, compr) case config.RemoteWriteProtoMsgV2: 
nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := populateV2TimeSeries(&symbolTable, batch, pendingDataV2, s.qm.sendExemplars, s.qm.sendNativeHistograms) n := nPendingSamples + nPendingExemplars + nPendingHistograms - _ = s.sendV2Samples(ctx, pendingDataV2[:n], symbolTable.Symbols(), nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, &pBufRaw, &buf, enc) + _ = s.sendV2Samples(ctx, pendingDataV2[:n], symbolTable.Symbols(), nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, &pBufRaw, encBuf, compr) symbolTable.Reset() } } @@ -1576,7 +1576,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { return } - sendBatch(batch, s.qm.protoMsg, s.qm.enc, false) + sendBatch(batch, s.qm.protoMsg, s.qm.compr, false) // TODO(bwplotka): Previously the return was between popular and send. // Consider this when DRY-ing https://github.com/prometheus/prometheus/issues/14409 queue.ReturnForReuse(batch) @@ -1587,7 +1587,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { case <-timer.C: batch := queue.Batch() if len(batch) > 0 { - sendBatch(batch, s.qm.protoMsg, s.qm.enc, true) + sendBatch(batch, s.qm.protoMsg, s.qm.compr, true) } queue.ReturnForReuse(batch) timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) @@ -1636,18 +1636,18 @@ func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sen return nPendingSamples, nPendingExemplars, nPendingHistograms } -func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte, enc Compression) error { +func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf compression.EncodeBuffer, compr compression.Type) error { begin := time.Now() - rs, err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, 
exemplarCount, histogramCount, 0, pBuf, buf, enc) + rs, err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf, compr) s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, 0, rs, time.Since(begin)) return err } // TODO(bwplotka): DRY this (have one logic for both v1 and v2). // See https://github.com/prometheus/prometheus/issues/14409 -func (s *shards) sendV2Samples(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, enc Compression) error { +func (s *shards) sendV2Samples(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *[]byte, buf compression.EncodeBuffer, compr compression.Type) error { begin := time.Now() - rs, err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf, enc) + rs, err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf, compr) s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, metadataCount, rs, time.Since(begin)) return err } @@ -1689,9 +1689,9 @@ func (s *shards) updateMetrics(_ context.Context, err error, sampleCount, exempl } // sendSamplesWithBackoff to the remote storage with backoff for recoverable errors. -func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte, enc Compression) (WriteResponseStats, error) { +func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf compression.EncodeBuffer, compr compression.Type) (WriteResponseStats, error) { // Build the WriteRequest with no metadata. 
- req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, buf, nil, enc) + req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, nil, buf, compr) s.qm.buildRequestLimitTimestamp.Store(lowest) if err != nil { // Failing to build the write request is non-recoverable, since it will @@ -1700,7 +1700,6 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti } reqSize := len(req) - *buf = req // Since we retry writes via attemptStore and sendWriteRequestWithBackoff we need // to track the total amount of accepted data across the various attempts. @@ -1720,20 +1719,20 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti lowest := s.qm.buildRequestLimitTimestamp.Load() if isSampleOld(currentTime, time.Duration(s.qm.cfg.SampleAgeLimit), lowest) { // This will filter out old samples during retries. - req, _, lowest, err := buildWriteRequest( + req2, _, lowest, err := buildWriteRequest( s.qm.logger, samples, nil, pBuf, - buf, isTimeSeriesOldFilter(s.qm.metrics, currentTime, time.Duration(s.qm.cfg.SampleAgeLimit)), - enc, + buf, + compr, ) s.qm.buildRequestLimitTimestamp.Store(lowest) if err != nil { return err } - *buf = req + req = req2 } ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch") @@ -1761,7 +1760,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti s.qm.metrics.metadataTotal.Add(float64(metadataCount)) // Technically for v1, we will likely have empty response stats, but for // newer Receivers this might be not, so used it in a best effort. - rs, err := s.qm.client().Store(ctx, *buf, try) + rs, err := s.qm.client().Store(ctx, req, try) s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds()) // TODO(bwplotka): Revisit this once we have Receivers doing retriable partial error // so far we don't have those, so it's ok to potentially skew statistics. 
@@ -1803,9 +1802,9 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti } // sendV2SamplesWithBackoff to the remote storage with backoff for recoverable errors. -func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, enc Compression) (WriteResponseStats, error) { +func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *[]byte, buf compression.EncodeBuffer, compr compression.Type) (WriteResponseStats, error) { // Build the WriteRequest with no metadata. - req, highest, lowest, err := buildV2WriteRequest(s.qm.logger, samples, labels, pBuf, buf, nil, enc) + req, highest, lowest, err := buildV2WriteRequest(s.qm.logger, samples, labels, pBuf, nil, buf, compr) s.qm.buildRequestLimitTimestamp.Store(lowest) if err != nil { // Failing to build the write request is non-recoverable, since it will @@ -1814,7 +1813,6 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2 } reqSize := len(req) - *buf = req // Since we retry writes via attemptStore and sendWriteRequestWithBackoff we need // to track the total amount of accepted data across the various attempts. @@ -1834,20 +1832,20 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2 lowest := s.qm.buildRequestLimitTimestamp.Load() if isSampleOld(currentTime, time.Duration(s.qm.cfg.SampleAgeLimit), lowest) { // This will filter out old samples during retries. 
- req, _, lowest, err := buildV2WriteRequest( + req2, _, lowest, err := buildV2WriteRequest( s.qm.logger, samples, labels, pBuf, - buf, isV2TimeSeriesOldFilter(s.qm.metrics, currentTime, time.Duration(s.qm.cfg.SampleAgeLimit)), - enc, + buf, + compr, ) s.qm.buildRequestLimitTimestamp.Store(lowest) if err != nil { return err } - *buf = req + req = req2 } ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch") @@ -1873,7 +1871,7 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2 s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount)) s.qm.metrics.histogramsTotal.Add(float64(histogramCount)) s.qm.metrics.metadataTotal.Add(float64(metadataCount)) - rs, err := s.qm.client().Store(ctx, *buf, try) + rs, err := s.qm.client().Store(ctx, req, try) s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds()) // TODO(bwplotka): Revisit this once we have Receivers doing retriable partial error // so far we don't have those, so it's ok to potentially skew statistics. 
@@ -2114,21 +2112,7 @@ func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeri return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms } -func compressPayload(tmpbuf *[]byte, inp []byte, enc Compression) (compressed []byte, _ error) { - switch enc { - case SnappyBlockCompression: - compressed = snappy.Encode(*tmpbuf, inp) - if n := snappy.MaxEncodedLen(len(inp)); n > len(*tmpbuf) { - // grow the buffer for the next time - *tmpbuf = make([]byte, n) - } - return compressed, nil - default: - return compressed, fmt.Errorf("unknown compression scheme [%v]", enc) - } -} - -func buildWriteRequest(logger *slog.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf *[]byte, filter func(prompb.TimeSeries) bool, enc Compression) (compressed []byte, highest, lowest int64, _ error) { +func buildWriteRequest(logger *slog.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, filter func(prompb.TimeSeries) bool, buf compression.EncodeBuffer, compr compression.Type) (compressed []byte, highest, lowest int64, _ error) { highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter) @@ -2151,22 +2135,14 @@ func buildWriteRequest(logger *slog.Logger, timeSeries []prompb.TimeSeries, meta return nil, highest, lowest, err } - // snappy uses len() to see if it needs to allocate a new slice. Make the - // buffer as long as possible. 
- if buf != nil { - *buf = (*buf)[0:cap(*buf)] - } else { - buf = &[]byte{} - } - - compressed, err = compressPayload(buf, pBuf.Bytes(), enc) + compressed, _, err = compression.Encode(compr, pBuf.Bytes(), buf) if err != nil { return nil, highest, lowest, err } return compressed, highest, lowest, nil } -func buildV2WriteRequest(logger *slog.Logger, samples []writev2.TimeSeries, labels []string, pBuf, buf *[]byte, filter func(writev2.TimeSeries) bool, enc Compression) (compressed []byte, highest, lowest int64, _ error) { +func buildV2WriteRequest(logger *slog.Logger, samples []writev2.TimeSeries, labels []string, pBuf *[]byte, filter func(writev2.TimeSeries) bool, buf compression.EncodeBuffer, compr compression.Type) (compressed []byte, highest, lowest int64, _ error) { highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms := buildV2TimeSeries(samples, filter) if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 { @@ -2188,15 +2164,7 @@ func buildV2WriteRequest(logger *slog.Logger, samples []writev2.TimeSeries, labe } *pBuf = data - // snappy uses len() to see if it needs to allocate a new slice. Make the - // buffer as long as possible. 
- if buf != nil { - *buf = (*buf)[0:cap(*buf)] - } else { - buf = &[]byte{} - } - - compressed, err = compressPayload(buf, data, enc) + compressed, _, err = compression.Encode(compr, *pBuf, buf) if err != nil { return nil, highest, lowest, err } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 7143059edf..785700f091 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -30,7 +30,6 @@ import ( "time" "github.com/gogo/protobuf/proto" - "github.com/golang/snappy" "github.com/google/go-cmp/cmp" "github.com/prometheus/client_golang/prometheus" client_testutil "github.com/prometheus/client_golang/prometheus/testutil" @@ -39,6 +38,8 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" + "github.com/prometheus/prometheus/util/compression" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -963,7 +964,6 @@ type TestWriteClient struct { receivedMetadata map[string][]prompb.MetricMetadata writesReceived int mtx sync.Mutex - buf []byte protoMsg config.RemoteWriteProtoMsg injectedErrs []error currErr int @@ -1119,13 +1119,8 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) (WriteResp if c.returnError != nil { return WriteResponseStats{}, c.returnError } - // nil buffers are ok for snappy, ignore cast error. 
- if c.buf != nil { - c.buf = c.buf[:cap(c.buf)] - } - reqBuf, err := snappy.Decode(c.buf, req) - c.buf = reqBuf + reqBuf, err := compression.Decode(compression.Snappy, req, nil) if err != nil { return WriteResponseStats{}, err } @@ -1858,7 +1853,7 @@ func createDummyTimeSeries(instances int) []timeSeries { func BenchmarkBuildWriteRequest(b *testing.B) { noopLogger := promslog.NewNopLogger() bench := func(b *testing.B, batch []timeSeries) { - buff := make([]byte, 0) + cEnc := compression.NewSyncEncodeBuffer() seriesBuff := make([]prompb.TimeSeries, len(batch)) for i := range seriesBuff { seriesBuff[i].Samples = []prompb.Sample{{}} @@ -1869,7 +1864,7 @@ func BenchmarkBuildWriteRequest(b *testing.B) { totalSize := 0 for i := 0; i < b.N; i++ { populateTimeSeries(batch, seriesBuff, true, true) - req, _, _, err := buildWriteRequest(noopLogger, seriesBuff, nil, pBuf, &buff, nil, "snappy") + req, _, _, err := buildWriteRequest(noopLogger, seriesBuff, nil, pBuf, nil, cEnc, compression.Snappy) if err != nil { b.Fatal(err) } @@ -1899,7 +1894,7 @@ func BenchmarkBuildV2WriteRequest(b *testing.B) { noopLogger := promslog.NewNopLogger() bench := func(b *testing.B, batch []timeSeries) { symbolTable := writev2.NewSymbolTable() - buff := make([]byte, 0) + cEnc := compression.NewSyncEncodeBuffer() seriesBuff := make([]writev2.TimeSeries, len(batch)) for i := range seriesBuff { seriesBuff[i].Samples = []writev2.Sample{{}} @@ -1910,7 +1905,7 @@ func BenchmarkBuildV2WriteRequest(b *testing.B) { totalSize := 0 for i := 0; i < b.N; i++ { populateV2TimeSeries(&symbolTable, batch, seriesBuff, true, true) - req, _, _, err := buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.Symbols(), &pBuf, &buff, nil, "snappy") + req, _, _, err := buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.Symbols(), &pBuf, nil, cEnc, "snappy") if err != nil { b.Fatal(err) } diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 21e3693e50..74d1e9c63c 100644 --- 
a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -24,11 +24,12 @@ import ( "time" "github.com/gogo/protobuf/proto" - "github.com/golang/snappy" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/util/compression" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" @@ -150,8 +151,8 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Don't break yolo 1.0 clients if not needed. This is similar to what we did // before 2.0: https://github.com/prometheus/prometheus/blob/d78253319daa62c8f28ed47e40bafcad2dd8b586/storage/remote/write_handler.go#L62 // We could give http.StatusUnsupportedMediaType, but let's assume snappy by default. - } else if enc != string(SnappyBlockCompression) { - err := fmt.Errorf("%v encoding (compression) is not accepted by this server; only %v is acceptable", enc, SnappyBlockCompression) + } else if strings.ToLower(enc) != compression.Snappy { + err := fmt.Errorf("%v encoding (compression) is not accepted by this server; only %v is acceptable", enc, compression.Snappy) h.logger.Error("Error decoding remote write request", "err", err) http.Error(w, err.Error(), http.StatusUnsupportedMediaType) } @@ -164,7 +165,7 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - decompressed, err := snappy.Decode(nil, body) + decompressed, err := compression.Decode(compression.Snappy, body, nil) if err != nil { // TODO(bwplotka): Add more context to responded error? 
h.logger.Error("Error decompressing remote write request", "err", err.Error()) diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 5bf6f5632e..492a1c82a0 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -31,6 +31,8 @@ import ( "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/util/compression" + "github.com/prometheus/common/promslog" "github.com/prometheus/prometheus/config" @@ -60,7 +62,7 @@ func TestRemoteWriteHandlerHeadersHandling_V1Message(t *testing.T) { name: "correct PRW 1.0 headers", reqHeaders: map[string]string{ "Content-Type": remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV1], - "Content-Encoding": string(SnappyBlockCompression), + "Content-Encoding": compression.Snappy, RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue, }, expectedCode: http.StatusNoContent, @@ -69,7 +71,7 @@ func TestRemoteWriteHandlerHeadersHandling_V1Message(t *testing.T) { name: "missing remote write version", reqHeaders: map[string]string{ "Content-Type": remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV1], - "Content-Encoding": string(SnappyBlockCompression), + "Content-Encoding": compression.Snappy, }, expectedCode: http.StatusNoContent, }, @@ -81,7 +83,7 @@ func TestRemoteWriteHandlerHeadersHandling_V1Message(t *testing.T) { { name: "missing content-type", reqHeaders: map[string]string{ - "Content-Encoding": string(SnappyBlockCompression), + "Content-Encoding": compression.Snappy, RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue, }, expectedCode: http.StatusNoContent, @@ -98,7 +100,7 @@ func TestRemoteWriteHandlerHeadersHandling_V1Message(t *testing.T) { name: "wrong content-type", reqHeaders: map[string]string{ "Content-Type": "yolo", - "Content-Encoding": string(SnappyBlockCompression), + "Content-Encoding": compression.Snappy, RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue, }, 
expectedCode: http.StatusUnsupportedMediaType, @@ -107,7 +109,7 @@ func TestRemoteWriteHandlerHeadersHandling_V1Message(t *testing.T) { name: "wrong content-type2", reqHeaders: map[string]string{ "Content-Type": appProtoContentType + ";proto=yolo", - "Content-Encoding": string(SnappyBlockCompression), + "Content-Encoding": compression.Snappy, RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue, }, expectedCode: http.StatusUnsupportedMediaType, @@ -157,7 +159,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) { name: "correct PRW 2.0 headers", reqHeaders: map[string]string{ "Content-Type": remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV2], - "Content-Encoding": string(SnappyBlockCompression), + "Content-Encoding": compression.Snappy, RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue, }, expectedCode: http.StatusNoContent, @@ -166,7 +168,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) { name: "missing remote write version", reqHeaders: map[string]string{ "Content-Type": remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV2], - "Content-Encoding": string(SnappyBlockCompression), + "Content-Encoding": compression.Snappy, }, expectedCode: http.StatusNoContent, // We don't check for now. }, @@ -178,7 +180,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) { { name: "missing content-type", reqHeaders: map[string]string{ - "Content-Encoding": string(SnappyBlockCompression), + "Content-Encoding": compression.Snappy, RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue, }, // This only gives 415, because we explicitly only support 2.0. 
If we supported both @@ -199,7 +201,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) { name: "wrong content-type", reqHeaders: map[string]string{ "Content-Type": "yolo", - "Content-Encoding": string(SnappyBlockCompression), + "Content-Encoding": compression.Snappy, RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue, }, expectedCode: http.StatusUnsupportedMediaType, @@ -208,7 +210,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) { name: "wrong content-type2", reqHeaders: map[string]string{ "Content-Type": appProtoContentType + ";proto=yolo", - "Content-Encoding": string(SnappyBlockCompression), + "Content-Encoding": compression.Snappy, RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue, }, expectedCode: http.StatusUnsupportedMediaType, @@ -445,7 +447,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { require.NoError(t, err) req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV2]) - req.Header.Set("Content-Encoding", string(SnappyBlockCompression)) + req.Header.Set("Content-Encoding", compression.Snappy) req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) appendable := &mockAppendable{ @@ -715,7 +717,7 @@ func TestCommitErr_V2Message(t *testing.T) { require.NoError(t, err) req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV2]) - req.Header.Set("Content-Encoding", string(SnappyBlockCompression)) + req.Header.Set("Content-Encoding", compression.Snappy) req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) appendable := &mockAppendable{commitErr: errors.New("commit error")} diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index cf4a977288..3d444e805c 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -41,6 +41,7 @@ import ( "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/tsdb/wlog" + 
"github.com/prometheus/prometheus/util/compression" "github.com/prometheus/prometheus/util/zeropool" ) @@ -66,7 +67,7 @@ type Options struct { WALSegmentSize int // WALCompression configures the compression type to use on records in the WAL. - WALCompression wlog.CompressionType + WALCompression compression.Type // StripeSize is the size (power of 2) in entries of the series hash map. Reducing the size will save memory but impact performance. StripeSize int @@ -90,7 +91,7 @@ type Options struct { func DefaultOptions() *Options { return &Options{ WALSegmentSize: wlog.DefaultSegmentSize, - WALCompression: wlog.CompressionNone, + WALCompression: compression.None, StripeSize: tsdb.DefaultStripeSize, TruncateFrequency: DefaultTruncateFrequency, MinWALTime: DefaultMinWALTime, @@ -337,7 +338,7 @@ func validateOptions(opts *Options) *Options { } if opts.WALCompression == "" { - opts.WALCompression = wlog.CompressionNone + opts.WALCompression = compression.None } // Revert StripeSize to DefaultStripeSize if StripeSize is either 0 or not a power of 2. 
diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 6e3db15eb4..9385d4f7fc 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -42,7 +42,7 @@ import ( "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/tsdbutil" - "github.com/prometheus/prometheus/tsdb/wlog" + "github.com/prometheus/prometheus/util/compression" ) func TestSplitByRange(t *testing.T) { @@ -1447,7 +1447,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { func TestHeadCompactionWithHistograms(t *testing.T) { for _, floatTest := range []bool{true, false} { t.Run(fmt.Sprintf("float=%t", floatTest), func(t *testing.T) { - head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false) require.NoError(t, head.Init(0)) t.Cleanup(func() { require.NoError(t, head.Close()) @@ -1627,11 +1627,11 @@ func TestSparseHistogramSpaceSavings(t *testing.T) { c.numBuckets, ), func(t *testing.T) { - oldHead, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + oldHead, _ := newTestHead(t, DefaultBlockDuration, compression.None, false) t.Cleanup(func() { require.NoError(t, oldHead.Close()) }) - sparseHead, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + sparseHead, _ := newTestHead(t, DefaultBlockDuration, compression.None, false) t.Cleanup(func() { require.NoError(t, sparseHead.Close()) }) diff --git a/tsdb/db.go b/tsdb/db.go index 9ab150c5b4..228168169f 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -46,6 +46,7 @@ import ( _ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minimum Go version is met. 
"github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/tsdb/wlog" + "github.com/prometheus/prometheus/util/compression" ) const ( @@ -80,7 +81,7 @@ func DefaultOptions() *Options { MaxBlockDuration: DefaultBlockDuration, NoLockfile: false, SamplesPerChunk: DefaultSamplesPerChunk, - WALCompression: wlog.CompressionNone, + WALCompression: compression.None, StripeSize: DefaultStripeSize, HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize, IsolationDisabled: defaultIsolationDisabled, @@ -124,7 +125,7 @@ type Options struct { NoLockfile bool // WALCompression configures the compression type to use on records in the WAL. - WALCompression wlog.CompressionType + WALCompression compression.Type // Maximum number of CPUs that can simultaneously processes WAL replay. // If it is <=0, then GOMAXPROCS is used. diff --git a/tsdb/db_test.go b/tsdb/db_test.go index ab2ece9e3b..3fdd5f752a 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -37,6 +37,8 @@ import ( "testing" "time" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" @@ -45,17 +47,13 @@ import ( "go.uber.org/atomic" "go.uber.org/goleak" - "github.com/prometheus/prometheus/prompb" - "github.com/prometheus/prometheus/storage/remote" - - "github.com/gogo/protobuf/proto" - "github.com/golang/snappy" - "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/fileutil" @@ -65,6 +63,7 @@ import ( 
"github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/tsdb/wlog" "github.com/prometheus/prometheus/util/annotations" + "github.com/prometheus/prometheus/util/compression" "github.com/prometheus/prometheus/util/testutil" ) @@ -1962,7 +1961,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { dir := t.TempDir() require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777)) - w, err := wlog.New(nil, nil, path.Join(dir, "wal"), wlog.CompressionNone) + w, err := wlog.New(nil, nil, path.Join(dir, "wal"), compression.None) require.NoError(t, err) var enc record.Encoder @@ -2006,7 +2005,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { createBlock(t, dir, genSeries(1, 1, 1000, 6000)) require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777)) - w, err := wlog.New(nil, nil, path.Join(dir, "wal"), wlog.CompressionNone) + w, err := wlog.New(nil, nil, path.Join(dir, "wal"), compression.None) require.NoError(t, err) var enc record.Encoder @@ -2407,7 +2406,7 @@ func TestDBReadOnly(t *testing.T) { } // Add head to test DBReadOnly WAL reading capabilities. 
- w, err := wlog.New(logger, nil, filepath.Join(dbDir, "wal"), wlog.CompressionSnappy) + w, err := wlog.New(logger, nil, filepath.Join(dbDir, "wal"), compression.Snappy) require.NoError(t, err) h := createHead(t, w, genSeries(1, 1, 16, 18), dbDir) require.NoError(t, h.Close()) @@ -3058,7 +3057,7 @@ func TestCompactHead(t *testing.T) { NoLockfile: true, MinBlockDuration: int64(time.Hour * 2 / time.Millisecond), MaxBlockDuration: int64(time.Hour * 2 / time.Millisecond), - WALCompression: wlog.CompressionSnappy, + WALCompression: compression.Snappy, } db, err := Open(dbDir, promslog.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg, nil) @@ -4662,7 +4661,7 @@ func TestMetadataCheckpointingOnlyKeepsLatestEntry(t *testing.T) { ctx := context.Background() numSamples := 10000 - hb, w := newTestHead(t, int64(numSamples)*10, wlog.CompressionNone, false) + hb, w := newTestHead(t, int64(numSamples)*10, compression.None, false) // Add some series so we can append metadata to them. app := hb.Appender(ctx) @@ -7019,7 +7018,7 @@ func testWBLAndMmapReplay(t *testing.T, scenario sampleTypeScenario) { resetMmapToOriginal() // We neet to reset because new duplicate chunks can be written above. // Removing m-map markers in WBL by rewriting it. 
- newWbl, err := wlog.New(promslog.NewNopLogger(), nil, filepath.Join(t.TempDir(), "new_wbl"), wlog.CompressionNone) + newWbl, err := wlog.New(promslog.NewNopLogger(), nil, filepath.Join(t.TempDir(), "new_wbl"), compression.None) require.NoError(t, err) sr, err := wlog.NewSegmentsReader(originalWblDir) require.NoError(t, err) diff --git a/tsdb/head_bench_test.go b/tsdb/head_bench_test.go index dc682602b1..0ffc75abaf 100644 --- a/tsdb/head_bench_test.go +++ b/tsdb/head_bench_test.go @@ -29,7 +29,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunks" - "github.com/prometheus/prometheus/tsdb/wlog" + "github.com/prometheus/prometheus/util/compression" ) func BenchmarkHeadStripeSeriesCreate(b *testing.B) { @@ -132,7 +132,7 @@ func BenchmarkHead_WalCommit(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() - h, w := newTestHead(b, 10000, wlog.CompressionNone, false) + h, w := newTestHead(b, 10000, compression.None, false) b.Cleanup(func() { if h != nil { h.Close() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 065e5ff008..b6ae81391e 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -38,6 +38,8 @@ import ( "go.uber.org/atomic" "golang.org/x/sync/errgroup" + "github.com/prometheus/prometheus/util/compression" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" @@ -68,11 +70,11 @@ func newTestHeadDefaultOptions(chunkRange int64, oooEnabled bool) *HeadOptions { return opts } -func newTestHead(t testing.TB, chunkRange int64, compressWAL wlog.CompressionType, oooEnabled bool) (*Head, *wlog.WL) { +func newTestHead(t testing.TB, chunkRange int64, compressWAL compression.Type, oooEnabled bool) (*Head, *wlog.WL) { return newTestHeadWithOptions(t, compressWAL, newTestHeadDefaultOptions(chunkRange, oooEnabled)) } -func newTestHeadWithOptions(t testing.TB, compressWAL 
wlog.CompressionType, opts *HeadOptions) (*Head, *wlog.WL) { +func newTestHeadWithOptions(t testing.TB, compressWAL compression.Type, opts *HeadOptions) (*Head, *wlog.WL) { dir := t.TempDir() wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) require.NoError(t, err) @@ -92,7 +94,7 @@ func newTestHeadWithOptions(t testing.TB, compressWAL wlog.CompressionType, opts func BenchmarkCreateSeries(b *testing.B) { series := genSeries(b.N, 10, 0, 0) - h, _ := newTestHead(b, 10000, wlog.CompressionNone, false) + h, _ := newTestHead(b, 10000, compression.None, false) b.Cleanup(func() { require.NoError(b, h.Close()) }) @@ -113,7 +115,7 @@ func BenchmarkHeadAppender_Append_Commit_ExistingSeries(b *testing.B) { b.Run(fmt.Sprintf("%d series", seriesCount), func(b *testing.B) { for _, samplesPerAppend := range []int64{1, 2, 5, 100} { b.Run(fmt.Sprintf("%d samples per append", samplesPerAppend), func(b *testing.B) { - h, _ := newTestHead(b, 10000, wlog.CompressionNone, false) + h, _ := newTestHead(b, 10000, compression.None, false) b.Cleanup(func() { require.NoError(b, h.Close()) }) ts := int64(1000) @@ -294,11 +296,11 @@ func BenchmarkLoadWLs(b *testing.B) { func(b *testing.B) { dir := b.TempDir() - wal, err := wlog.New(nil, nil, dir, wlog.CompressionNone) + wal, err := wlog.New(nil, nil, dir, compression.None) require.NoError(b, err) var wbl *wlog.WL if c.oooSeriesPct != 0 { - wbl, err = wlog.New(nil, nil, dir, wlog.CompressionNone) + wbl, err = wlog.New(nil, nil, dir, compression.None) require.NoError(b, err) } @@ -459,11 +461,11 @@ func BenchmarkLoadRealWLs(b *testing.B) { dir := b.TempDir() require.NoError(b, fileutil.CopyDirs(srcDir, dir)) - wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone) + wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), compression.None) require.NoError(b, err) b.Cleanup(func() { wal.Close() }) - wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone) + wbl, err 
:= wlog.New(nil, nil, filepath.Join(dir, "wbl"), compression.None) require.NoError(b, err) b.Cleanup(func() { wbl.Close() }) b.StartTimer() @@ -484,7 +486,7 @@ func BenchmarkLoadRealWLs(b *testing.B) { // While appending the samples to the head it concurrently queries them from multiple go routines and verifies that the // returned results are correct. func TestHead_HighConcurrencyReadAndWrite(t *testing.T) { - head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false) defer func() { require.NoError(t, head.Close()) }() @@ -674,7 +676,7 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) { } func TestHead_ReadWAL(t *testing.T) { - for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { entries := []interface{}{ []record.RefSeries{ @@ -756,7 +758,7 @@ func TestHead_ReadWAL(t *testing.T) { } func TestHead_WALMultiRef(t *testing.T) { - head, w := newTestHead(t, 1000, wlog.CompressionNone, false) + head, w := newTestHead(t, 1000, compression.None, false) require.NoError(t, head.Init(0)) @@ -791,7 +793,7 @@ func TestHead_WALMultiRef(t *testing.T) { require.NotEqual(t, ref1, ref2, "Refs are the same") require.NoError(t, head.Close()) - w, err = wlog.New(nil, nil, w.Dir(), wlog.CompressionNone) + w, err = wlog.New(nil, nil, w.Dir(), compression.None) require.NoError(t, err) opts := DefaultHeadOptions() @@ -816,7 +818,7 @@ func TestHead_WALMultiRef(t *testing.T) { } func TestHead_ActiveAppenders(t *testing.T) { - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) defer head.Close() require.NoError(t, head.Init(0)) @@ -849,7 +851,7 @@ func TestHead_ActiveAppenders(t *testing.T) { } 
func TestHead_UnknownWALRecord(t *testing.T) { - head, w := newTestHead(t, 1000, wlog.CompressionNone, false) + head, w := newTestHead(t, 1000, compression.None, false) w.Log([]byte{255, 42}) require.NoError(t, head.Init(0)) require.NoError(t, head.Close()) @@ -861,7 +863,7 @@ func BenchmarkHead_Truncate(b *testing.B) { const total = 1e6 prepare := func(b *testing.B, churn int) *Head { - h, _ := newTestHead(b, 1000, wlog.CompressionNone, false) + h, _ := newTestHead(b, 1000, compression.None, false) b.Cleanup(func() { require.NoError(b, h.Close()) }) @@ -930,7 +932,7 @@ func BenchmarkHead_Truncate(b *testing.B) { } func TestHead_Truncate(t *testing.T) { - h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + h, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -1240,7 +1242,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) { } func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { - for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { entries := []interface{}{ []record.RefSeries{ @@ -1320,7 +1322,7 @@ func TestHeadDeleteSimple(t *testing.T) { }, } - for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { for _, c := range cases { head, w := newTestHead(t, 1000, compress, false) @@ -1402,7 +1404,7 @@ func TestHeadDeleteSimple(t *testing.T) { } func TestDeleteUntilCurMax(t *testing.T) { - hb, _ := newTestHead(t, 1000000, wlog.CompressionNone, false) + hb, _ := newTestHead(t, 1000000, compression.None, false) defer func() { 
require.NoError(t, hb.Close()) }() @@ -1455,7 +1457,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { numSamples := 10000 // Enough samples to cause a checkpoint. - hb, w := newTestHead(t, int64(numSamples)*10, wlog.CompressionNone, false) + hb, w := newTestHead(t, int64(numSamples)*10, compression.None, false) for i := 0; i < numSamples; i++ { app := hb.Appender(context.Background()) @@ -1547,7 +1549,7 @@ func TestDelete_e2e(t *testing.T) { seriesMap[labels.New(l...).String()] = []chunks.Sample{} } - hb, _ := newTestHead(t, 100000, wlog.CompressionNone, false) + hb, _ := newTestHead(t, 100000, compression.None, false) defer func() { require.NoError(t, hb.Close()) }() @@ -1915,7 +1917,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { func TestGCChunkAccess(t *testing.T) { // Put a chunk, select it. GC it and then access it. const chunkRange = 1000 - h, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false) + h, _ := newTestHead(t, chunkRange, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -1974,7 +1976,7 @@ func TestGCChunkAccess(t *testing.T) { func TestGCSeriesAccess(t *testing.T) { // Put a series, select it. GC it and then access it. 
const chunkRange = 1000 - h, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false) + h, _ := newTestHead(t, chunkRange, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -2033,7 +2035,7 @@ func TestGCSeriesAccess(t *testing.T) { } func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { - h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + h, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -2063,7 +2065,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { } func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { - h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + h, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -2094,7 +2096,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { } func TestHead_LogRollback(t *testing.T) { - for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { h, w := newTestHead(t, 1000, compress, false) defer func() { @@ -2118,7 +2120,7 @@ func TestHead_LogRollback(t *testing.T) { } func TestHead_ReturnsSortedLabelValues(t *testing.T) { - h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + h, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -2182,7 +2184,7 @@ func TestWalRepair_DecodingError(t *testing.T) { 5, }, } { - for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { t.Run(fmt.Sprintf("%s,compress=%s", name, compress), func(t *testing.T) { dir := t.TempDir() @@ -2256,9 +2258,9 @@ func 
TestWblRepair_DecodingError(t *testing.T) { // Fill the wbl and corrupt it. { - wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone) + wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), compression.None) require.NoError(t, err) - wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone) + wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), compression.None) require.NoError(t, err) for i := 1; i <= totalRecs; i++ { @@ -2322,7 +2324,7 @@ func TestHeadReadWriterRepair(t *testing.T) { walDir := filepath.Join(dir, "wal") // Fill the chunk segments and corrupt it. { - w, err := wlog.New(nil, nil, walDir, wlog.CompressionNone) + w, err := wlog.New(nil, nil, walDir, compression.None) require.NoError(t, err) opts := DefaultHeadOptions() @@ -2391,7 +2393,7 @@ func TestHeadReadWriterRepair(t *testing.T) { } func TestNewWalSegmentOnTruncate(t *testing.T) { - h, wal := newTestHead(t, 1000, wlog.CompressionNone, false) + h, wal := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -2421,7 +2423,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) { } func TestAddDuplicateLabelName(t *testing.T) { - h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + h, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -2504,7 +2506,7 @@ func TestMemSeriesIsolation(t *testing.T) { } // Test isolation without restart of Head. - hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + hb, _ := newTestHead(t, 1000, compression.None, false) i := addSamples(hb) testIsolation(hb, i) @@ -2566,11 +2568,11 @@ func TestMemSeriesIsolation(t *testing.T) { require.NoError(t, hb.Close()) // Test isolation with restart of Head. This is to verify the num samples of chunks after m-map chunk replay. 
- hb, w := newTestHead(t, 1000, wlog.CompressionNone, false) + hb, w := newTestHead(t, 1000, compression.None, false) i = addSamples(hb) require.NoError(t, hb.Close()) - wal, err := wlog.NewSize(nil, nil, w.Dir(), 32768, wlog.CompressionNone) + wal, err := wlog.NewSize(nil, nil, w.Dir(), 32768, compression.None) require.NoError(t, err) opts := DefaultHeadOptions() opts.ChunkRange = 1000 @@ -2619,7 +2621,7 @@ func TestIsolationRollback(t *testing.T) { } // Rollback after a failed append and test if the low watermark has progressed anyway. - hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + hb, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, hb.Close()) }() @@ -2650,7 +2652,7 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) { t.Skip("skipping test since tsdb isolation is disabled") } - hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + hb, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, hb.Close()) }() @@ -2687,7 +2689,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) { t.Skip("skipping test since tsdb isolation is disabled") } - h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + h, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -2718,7 +2720,7 @@ func TestIsolationWithoutAdd(t *testing.T) { t.Skip("skipping test since tsdb isolation is disabled") } - hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + hb, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, hb.Close()) }() @@ -2843,7 +2845,7 @@ func testOutOfOrderSamplesMetric(t *testing.T, scenario sampleTypeScenario, opti } func testHeadSeriesChunkRace(t *testing.T) { - h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + h, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -2878,7 +2880,7 @@ func testHeadSeriesChunkRace(t *testing.T) { } func 
TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) { - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, head.Close()) }() @@ -2939,7 +2941,7 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) { } func TestHeadLabelValuesWithMatchers(t *testing.T) { - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) ctx := context.Background() @@ -3015,7 +3017,7 @@ func TestHeadLabelValuesWithMatchers(t *testing.T) { } func TestHeadLabelNamesWithMatchers(t *testing.T) { - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, head.Close()) }() @@ -3085,7 +3087,7 @@ func TestHeadLabelNamesWithMatchers(t *testing.T) { func TestHeadShardedPostings(t *testing.T) { headOpts := newTestHeadDefaultOptions(1000, false) headOpts.EnableSharding = true - head, _ := newTestHeadWithOptions(t, wlog.CompressionNone, headOpts) + head, _ := newTestHeadWithOptions(t, compression.None, headOpts) defer func() { require.NoError(t, head.Close()) }() @@ -3148,7 +3150,7 @@ func TestHeadShardedPostings(t *testing.T) { } func TestErrReuseAppender(t *testing.T) { - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, head.Close()) }() @@ -3184,7 +3186,7 @@ func TestErrReuseAppender(t *testing.T) { func TestHeadMintAfterTruncation(t *testing.T) { chunkRange := int64(2000) - head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false) + head, _ := newTestHead(t, chunkRange, compression.None, false) app := head.Appender(context.Background()) _, err := app.Append(0, labels.FromStrings("a", "b"), 100, 100) @@ -3218,7 +3220,7 @@ func TestHeadMintAfterTruncation(t 
*testing.T) { func TestHeadExemplars(t *testing.T) { chunkRange := int64(2000) - head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false) + head, _ := newTestHead(t, chunkRange, compression.None, false) app := head.Appender(context.Background()) l := labels.FromStrings("trace_id", "123") @@ -3240,7 +3242,7 @@ func TestHeadExemplars(t *testing.T) { func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) { chunkRange := int64(2000) - head, _ := newTestHead(b, chunkRange, wlog.CompressionNone, false) + head, _ := newTestHead(b, chunkRange, compression.None, false) b.Cleanup(func() { require.NoError(b, head.Close()) }) ctx := context.Background() @@ -3685,7 +3687,7 @@ func TestAppendHistogram(t *testing.T) { l := labels.FromStrings("a", "b") for _, numHistograms := range []int{1, 10, 150, 200, 250, 300} { t.Run(strconv.Itoa(numHistograms), func(t *testing.T) { - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -3789,7 +3791,7 @@ func TestAppendHistogram(t *testing.T) { } func TestHistogramInWALAndMmapChunk(t *testing.T) { - head, _ := newTestHead(t, 3000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 3000, compression.None, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -3940,7 +3942,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { // Restart head. 
require.NoError(t, head.Close()) startHead := func() { - w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None) require.NoError(t, err) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) @@ -3969,7 +3971,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { } func TestChunkSnapshot(t *testing.T) { - head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false) + head, _ := newTestHead(t, 120*4, compression.None, false) defer func() { head.opts.EnableMemorySnapshotOnShutdown = false require.NoError(t, head.Close()) @@ -4062,7 +4064,7 @@ func TestChunkSnapshot(t *testing.T) { } openHeadAndCheckReplay := func() { - w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None) require.NoError(t, err) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) @@ -4256,7 +4258,7 @@ func TestChunkSnapshot(t *testing.T) { } func TestSnapshotError(t *testing.T) { - head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false) + head, _ := newTestHead(t, 120*4, compression.None, false) defer func() { head.opts.EnableMemorySnapshotOnShutdown = false require.NoError(t, head.Close()) @@ -4316,7 +4318,7 @@ func TestSnapshotError(t *testing.T) { require.NoError(t, f.Close()) // Create new Head which should replay this snapshot. - w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None) require.NoError(t, err) // Testing https://github.com/prometheus/prometheus/issues/9437 with the registry. 
head, err = NewHead(prometheus.NewRegistry(), nil, w, nil, head.opts, nil) @@ -4345,7 +4347,7 @@ func TestSnapshotError(t *testing.T) { opts := head.opts opts.SeriesCallback = c - w, err = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) + w, err = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None) require.NoError(t, err) head, err = NewHead(prometheus.NewRegistry(), nil, w, nil, head.opts, nil) require.NoError(t, err) @@ -4367,7 +4369,7 @@ func TestSnapshotError(t *testing.T) { func TestHistogramMetrics(t *testing.T) { numHistograms := 10 - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -4397,7 +4399,7 @@ func TestHistogramMetrics(t *testing.T) { require.Equal(t, float64(expHSamples), prom_testutil.ToFloat64(head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram))) require.NoError(t, head.Close()) - w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None) require.NoError(t, err) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) @@ -4419,7 +4421,7 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) { t.Helper() l := labels.FromStrings("a", "b") numHistograms := 20 - head, _ := newTestHead(t, 100000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 100000, compression.None, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -4571,7 +4573,7 @@ func TestHistogramCounterResetHeader(t *testing.T) { for _, floatHisto := range []bool{true} { // FIXME t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) { l := labels.FromStrings("a", "b") - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) 
@@ -4692,7 +4694,7 @@ func TestOOOHistogramCounterResetHeaders(t *testing.T) { for _, floatHisto := range []bool{true, false} { t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) { l := labels.FromStrings("a", "b") - head, _ := newTestHead(t, 1000, wlog.CompressionNone, true) + head, _ := newTestHead(t, 1000, compression.None, true) head.opts.OutOfOrderCapMax.Store(5) head.opts.EnableOOONativeHistograms.Store(true) @@ -5003,7 +5005,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { // Tests https://github.com/prometheus/prometheus/issues/9725. func TestChunkSnapshotReplayBug(t *testing.T) { dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) require.NoError(t, err) // Write few series records and samples such that the series references are not in order in the WAL @@ -5070,7 +5072,7 @@ func TestChunkSnapshotReplayBug(t *testing.T) { func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) { dir := t.TempDir() - wlTemp, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) + wlTemp, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) require.NoError(t, err) // Write a snapshot with .tmp suffix. This used to fail taking any further snapshots or replay of snapshots. 
@@ -5115,9 +5117,9 @@ func TestWBLReplay(t *testing.T) { func testWBLReplay(t *testing.T, scenario sampleTypeScenario) { dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) require.NoError(t, err) - oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy) + oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy) require.NoError(t, err) opts := DefaultHeadOptions() @@ -5163,9 +5165,9 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) { // Restart head. require.NoError(t, h.Close()) - wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) + wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) require.NoError(t, err) - oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy) + oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy) require.NoError(t, err) h, err = NewHead(nil, nil, wal, oooWlog, opts, nil) require.NoError(t, err) @@ -5209,9 +5211,9 @@ func TestOOOMmapReplay(t *testing.T) { func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) { dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) require.NoError(t, err) - oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy) + oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy) require.NoError(t, err) opts := DefaultHeadOptions() @@ -5261,9 +5263,9 @@ func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) { // Restart 
head. require.NoError(t, h.Close()) - wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) + wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) require.NoError(t, err) - oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy) + oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy) require.NoError(t, err) h, err = NewHead(nil, nil, wal, oooWlog, opts, nil) require.NoError(t, err) @@ -5292,7 +5294,7 @@ func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) { } func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) { - h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + h, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -5336,7 +5338,7 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) { require.NoError(t, h.Close()) - wal, err := wlog.NewSize(nil, nil, filepath.Join(h.opts.ChunkDirRoot, "wal"), 32768, wlog.CompressionNone) + wal, err := wlog.NewSize(nil, nil, filepath.Join(h.opts.ChunkDirRoot, "wal"), 32768, compression.None) require.NoError(t, err) h, err = NewHead(nil, nil, wal, nil, h.opts, nil) require.NoError(t, err) @@ -5371,7 +5373,7 @@ func (c *unsupportedChunk) Encoding() chunkenc.Encoding { // Tests https://github.com/prometheus/prometheus/issues/10277. 
func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) { dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None) require.NoError(t, err) opts := DefaultHeadOptions() @@ -5404,7 +5406,7 @@ func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) { addChunks() require.NoError(t, h.Close()) - wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone) + wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None) require.NoError(t, err) mmapFilePath := filepath.Join(dir, "chunks_head", "000001") @@ -5430,7 +5432,7 @@ func TestReplayAfterMmapReplayError(t *testing.T) { var err error openHead := func() { - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None) require.NoError(t, err) opts := DefaultHeadOptions() @@ -5513,9 +5515,9 @@ func TestOOOAppendWithNoSeries(t *testing.T) { func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error)) { dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) require.NoError(t, err) - oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy) + oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy) require.NoError(t, err) opts := DefaultHeadOptions() @@ -5606,9 +5608,9 @@ func TestHeadMinOOOTimeUpdate(t *testing.T) { func testHeadMinOOOTimeUpdate(t *testing.T, scenario sampleTypeScenario) { dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 
32768, wlog.CompressionSnappy) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) require.NoError(t, err) - oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy) + oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy) require.NoError(t, err) opts := DefaultHeadOptions() @@ -5653,7 +5655,7 @@ func testHeadMinOOOTimeUpdate(t *testing.T, scenario sampleTypeScenario) { func TestGaugeHistogramWALAndChunkHeader(t *testing.T) { l := labels.FromStrings("a", "b") - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -5718,7 +5720,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) { require.NoError(t, head.Close()) require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot))) - w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None) require.NoError(t, err) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) @@ -5729,7 +5731,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) { func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) { l := labels.FromStrings("a", "b") - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -5794,7 +5796,7 @@ func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) { require.NoError(t, head.Close()) require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot))) - w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None) require.NoError(t, err) head, err = NewHead(nil, nil, w, nil, head.opts, 
nil) require.NoError(t, err) @@ -5804,7 +5806,7 @@ func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) { } func TestSnapshotAheadOfWALError(t *testing.T) { - head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false) + head, _ := newTestHead(t, 120*4, compression.None, false) head.opts.EnableMemorySnapshotOnShutdown = true // Add a sample to fill WAL. app := head.Appender(context.Background()) @@ -5827,7 +5829,7 @@ func TestSnapshotAheadOfWALError(t *testing.T) { // to keep using the same snapshot directory instead of a random one. require.NoError(t, os.RemoveAll(head.wal.Dir())) head.opts.EnableMemorySnapshotOnShutdown = false - w, _ := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) + w, _ := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) // Add a sample to fill WAL. @@ -5846,7 +5848,7 @@ func TestSnapshotAheadOfWALError(t *testing.T) { // Create new Head which should detect the incorrect index and delete the snapshot. 
head.opts.EnableMemorySnapshotOnShutdown = true - w, _ = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) + w, _ = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) require.NoError(t, head.Init(math.MinInt64)) @@ -5865,7 +5867,7 @@ func BenchmarkCuttingHeadHistogramChunks(b *testing.B) { ) samples := histogram.GenerateBigTestHistograms(numSamples, numBuckets) - h, _ := newTestHead(b, DefaultBlockDuration, wlog.CompressionNone, false) + h, _ := newTestHead(b, DefaultBlockDuration, compression.None, false) defer func() { require.NoError(b, h.Close()) }() @@ -5982,7 +5984,7 @@ func TestCuttingNewHeadChunks(t *testing.T) { } for testName, tc := range testCases { t.Run(testName, func(t *testing.T) { - h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + h, _ := newTestHead(t, DefaultBlockDuration, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -6050,7 +6052,7 @@ func TestHeadDetectsDuplicateSampleAtSizeLimit(t *testing.T) { numSamples := 1000 baseTS := int64(1695209650) - h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + h, _ := newTestHead(t, DefaultBlockDuration, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -6117,7 +6119,7 @@ func TestWALSampleAndExemplarOrder(t *testing.T) { for testName, tc := range testcases { t.Run(testName, func(t *testing.T) { - h, w := newTestHead(t, 1000, wlog.CompressionNone, false) + h, w := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -6154,7 +6156,7 @@ func TestWALSampleAndExemplarOrder(t *testing.T) { // `signal SIGSEGV: segmentation violation code=0x1 addr=0x20 pc=0xbb03d1` // panic, that we have seen in the wild once. 
func TestHeadCompactionWhileAppendAndCommitExemplar(t *testing.T) { - h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + h, _ := newTestHead(t, DefaultBlockDuration, compression.None, false) app := h.Appender(context.Background()) lbls := labels.FromStrings("foo", "bar") ref, err := app.Append(0, lbls, 1, 1) @@ -6267,7 +6269,7 @@ func TestPostingsCardinalityStats(t *testing.T) { } func TestHeadAppender_AppendFloatWithSameTimestampAsPreviousHistogram(t *testing.T) { - head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false) t.Cleanup(func() { head.Close() }) ls := labels.FromStrings(labels.MetricName, "test") @@ -6483,7 +6485,7 @@ func TestHeadAppender_AppendCT(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + h, _ := newTestHead(t, DefaultBlockDuration, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -6521,7 +6523,7 @@ func TestHeadCompactableDoesNotCompactEmptyHead(t *testing.T) { // was compactable using default values for min and max times, `Head.compactable()` // would return true which is incorrect. This test verifies that we short-circuit // the check when the head has not yet had any samples added. 
- head, _ := newTestHead(t, 1, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1, compression.None, false) defer func() { require.NoError(t, head.Close()) }() @@ -6563,7 +6565,7 @@ func TestHeadAppendHistogramAndCommitConcurrency(t *testing.T) { } func testHeadAppendHistogramAndCommitConcurrency(t *testing.T, appendFn func(storage.Appender, int) error) { - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, head.Close()) }() diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index ad03fa4766..89631a6bde 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -128,11 +128,10 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch var err error dec := record.NewDecoder(syms) for r.Next() { - rec := r.Record() - switch dec.Type(rec) { + switch dec.Type(r.Record()) { case record.Series: series := h.wlReplaySeriesPool.Get()[:0] - series, err = dec.Series(rec, series) + series, err = dec.Series(r.Record(), series) if err != nil { decodeErr = &wlog.CorruptionErr{ Err: fmt.Errorf("decode series: %w", err), @@ -144,7 +143,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch decoded <- series case record.Samples: samples := h.wlReplaySamplesPool.Get()[:0] - samples, err = dec.Samples(rec, samples) + samples, err = dec.Samples(r.Record(), samples) if err != nil { decodeErr = &wlog.CorruptionErr{ Err: fmt.Errorf("decode samples: %w", err), @@ -156,7 +155,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch decoded <- samples case record.Tombstones: tstones := h.wlReplaytStonesPool.Get()[:0] - tstones, err = dec.Tombstones(rec, tstones) + tstones, err = dec.Tombstones(r.Record(), tstones) if err != nil { decodeErr = &wlog.CorruptionErr{ Err: fmt.Errorf("decode tombstones: %w", err), @@ -168,7 +167,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef 
map[ch decoded <- tstones case record.Exemplars: exemplars := h.wlReplayExemplarsPool.Get()[:0] - exemplars, err = dec.Exemplars(rec, exemplars) + exemplars, err = dec.Exemplars(r.Record(), exemplars) if err != nil { decodeErr = &wlog.CorruptionErr{ Err: fmt.Errorf("decode exemplars: %w", err), @@ -180,7 +179,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch decoded <- exemplars case record.HistogramSamples, record.CustomBucketsHistogramSamples: hists := h.wlReplayHistogramsPool.Get()[:0] - hists, err = dec.HistogramSamples(rec, hists) + hists, err = dec.HistogramSamples(r.Record(), hists) if err != nil { decodeErr = &wlog.CorruptionErr{ Err: fmt.Errorf("decode histograms: %w", err), @@ -192,7 +191,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch decoded <- hists case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples: hists := h.wlReplayFloatHistogramsPool.Get()[:0] - hists, err = dec.FloatHistogramSamples(rec, hists) + hists, err = dec.FloatHistogramSamples(r.Record(), hists) if err != nil { decodeErr = &wlog.CorruptionErr{ Err: fmt.Errorf("decode float histograms: %w", err), @@ -204,7 +203,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch decoded <- hists case record.Metadata: meta := h.wlReplayMetadataPool.Get()[:0] - meta, err := dec.Metadata(rec, meta) + meta, err := dec.Metadata(r.Record(), meta) if err != nil { decodeErr = &wlog.CorruptionErr{ Err: fmt.Errorf("decode metadata: %w", err), diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index adbd3278ba..1645595a98 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -28,7 +28,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" - "github.com/prometheus/prometheus/tsdb/wlog" + "github.com/prometheus/prometheus/util/compression" ) type 
chunkInterval struct { @@ -300,7 +300,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { for perm, intervals := range permutations { for _, headChunk := range []bool{false, true} { t.Run(fmt.Sprintf("name=%s, permutation=%d, headChunk=%t", tc.name, perm, headChunk), func(t *testing.T) { - h, _ := newTestHead(t, 1000, wlog.CompressionNone, true) + h, _ := newTestHead(t, 1000, compression.None, true) defer func() { require.NoError(t, h.Close()) }() @@ -388,7 +388,7 @@ func TestOOOHeadChunkReader_LabelValues(t *testing.T) { //nolint:revive // unexported-return. func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenario) { chunkRange := int64(2000) - head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, true) + head, _ := newTestHead(t, chunkRange, compression.None, true) head.opts.EnableOOONativeHistograms.Store(true) t.Cleanup(func() { require.NoError(t, head.Close()) }) diff --git a/tsdb/wlog/checkpoint_test.go b/tsdb/wlog/checkpoint_test.go index a052de9258..1c1b44da29 100644 --- a/tsdb/wlog/checkpoint_test.go +++ b/tsdb/wlog/checkpoint_test.go @@ -23,14 +23,14 @@ import ( "strings" "testing" - "github.com/stretchr/testify/require" - "github.com/prometheus/common/promslog" + "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/util/compression" "github.com/prometheus/prometheus/util/testutil" ) @@ -170,7 +170,7 @@ func TestCheckpoint(t *testing.T) { } } - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range compression.Types() { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { dir := t.TempDir() @@ -385,7 +385,7 @@ func TestCheckpoint(t *testing.T) { func TestCheckpointNoTmpFolderAfterError(t *testing.T) { // Create a new wlog with 
invalid data. dir := t.TempDir() - w, err := NewSize(nil, nil, dir, 64*1024, CompressionNone) + w, err := NewSize(nil, nil, dir, 64*1024, compression.None) require.NoError(t, err) var enc record.Encoder require.NoError(t, w.Log(enc.Series([]record.RefSeries{ diff --git a/tsdb/wlog/live_reader.go b/tsdb/wlog/live_reader.go index a017d362d1..88898958af 100644 --- a/tsdb/wlog/live_reader.go +++ b/tsdb/wlog/live_reader.go @@ -22,9 +22,9 @@ import ( "io" "log/slog" - "github.com/golang/snappy" - "github.com/klauspost/compress/zstd" "github.com/prometheus/client_golang/prometheus" + + "github.com/prometheus/prometheus/util/compression" ) // LiveReaderMetrics holds all metrics exposed by the LiveReader. @@ -51,14 +51,11 @@ func NewLiveReaderMetrics(reg prometheus.Registerer) *LiveReaderMetrics { // NewLiveReader returns a new live reader. func NewLiveReader(logger *slog.Logger, metrics *LiveReaderMetrics, r io.Reader) *LiveReader { - // Calling zstd.NewReader with a nil io.Reader and no options cannot return an error. - zstdReader, _ := zstd.NewReader(nil) - lr := &LiveReader{ - logger: logger, - rdr: r, - zstdReader: zstdReader, - metrics: metrics, + logger: logger, + rdr: r, + decBuf: compression.NewSyncDecodeBuffer(), + metrics: metrics, // Until we understand how they come about, make readers permissive // to records spanning pages. @@ -72,12 +69,13 @@ func NewLiveReader(logger *slog.Logger, metrics *LiveReaderMetrics, r io.Reader) // that are still in the process of being written, and returns records as soon // as they can be read. type LiveReader struct { - logger *slog.Logger - rdr io.Reader - err error - rec []byte - compressBuf []byte - zstdReader *zstd.Decoder + logger *slog.Logger + rdr io.Reader + err error + rec []byte + + precomprBuf []byte + decBuf compression.DecodeBuffer hdr [recordHeaderSize]byte buf [pageSize]byte readIndex int // Index in buf to start at for next read. 
@@ -195,39 +193,28 @@ func (r *LiveReader) buildRecord() (bool, error) { rt := recTypeFromHeader(r.hdr[0]) if rt == recFirst || rt == recFull { - r.rec = r.rec[:0] - r.compressBuf = r.compressBuf[:0] + r.precomprBuf = r.precomprBuf[:0] } - isSnappyCompressed := r.hdr[0]&snappyMask == snappyMask - isZstdCompressed := r.hdr[0]&zstdMask == zstdMask - - if isSnappyCompressed || isZstdCompressed { - r.compressBuf = append(r.compressBuf, temp...) - } else { - r.rec = append(r.rec, temp...) + // TODO(bwplotka): Handle unknown compressions. + compr := compression.None + if r.hdr[0]&snappyMask == snappyMask { + compr = compression.Snappy + } else if r.hdr[0]&zstdMask == zstdMask { + compr = compression.Zstd } + r.precomprBuf = append(r.precomprBuf, temp...) + if err := validateRecord(rt, r.index); err != nil { r.index = 0 return false, err } if rt == recLast || rt == recFull { r.index = 0 - if isSnappyCompressed && len(r.compressBuf) > 0 { - // The snappy library uses `len` to calculate if we need a new buffer. - // In order to allocate as few buffers as possible make the length - // equal to the capacity. - r.rec = r.rec[:cap(r.rec)] - r.rec, err = snappy.Decode(r.rec, r.compressBuf) - if err != nil { - return false, err - } - } else if isZstdCompressed && len(r.compressBuf) > 0 { - r.rec, err = r.zstdReader.DecodeAll(r.compressBuf, r.rec[:0]) - if err != nil { - return false, err - } + r.rec, err = compression.Decode(compr, r.precomprBuf, r.decBuf) + if err != nil { + return false, err } return true, nil } diff --git a/tsdb/wlog/reader.go b/tsdb/wlog/reader.go index a744b0cc4b..c559d85b89 100644 --- a/tsdb/wlog/reader.go +++ b/tsdb/wlog/reader.go @@ -21,17 +21,17 @@ import ( "hash/crc32" "io" - "github.com/golang/snappy" - "github.com/klauspost/compress/zstd" + "github.com/prometheus/prometheus/util/compression" ) // Reader reads WAL records from an io.Reader. 
type Reader struct { - rdr io.Reader - err error - rec []byte - compressBuf []byte - zstdReader *zstd.Decoder + rdr io.Reader + err error + rec []byte + + precomprBuf []byte + decBuf compression.DecodeBuffer buf [pageSize]byte total int64 // Total bytes processed. curRecTyp recType // Used for checking that the last record is not torn. @@ -39,15 +39,13 @@ type Reader struct { // NewReader returns a new reader. func NewReader(r io.Reader) *Reader { - // Calling zstd.NewReader with a nil io.Reader and no options cannot return an error. - zstdReader, _ := zstd.NewReader(nil) - return &Reader{rdr: r, zstdReader: zstdReader} + return &Reader{rdr: r, decBuf: compression.NewSyncDecodeBuffer()} } // Next advances the reader to the next records and returns true if it exists. // It must not be called again after it returned false. func (r *Reader) Next() bool { - err := r.next() + err := r.nextNew() if err != nil && errors.Is(err, io.EOF) { // The last WAL segment record shouldn't be torn(should be full or last). // The last record would be torn after a crash just before @@ -61,14 +59,13 @@ func (r *Reader) Next() bool { return r.err == nil } -func (r *Reader) next() (err error) { +func (r *Reader) nextNew() (err error) { // We have to use r.buf since allocating byte arrays here fails escape // analysis and ends up on the heap, even though it seemingly should not. hdr := r.buf[:recordHeaderSize] buf := r.buf[recordHeaderSize:] - r.rec = r.rec[:0] - r.compressBuf = r.compressBuf[:0] + r.precomprBuf = r.precomprBuf[:0] i := 0 for { @@ -77,8 +74,13 @@ func (r *Reader) next() (err error) { } r.total++ r.curRecTyp = recTypeFromHeader(hdr[0]) - isSnappyCompressed := hdr[0]&snappyMask == snappyMask - isZstdCompressed := hdr[0]&zstdMask == zstdMask + + compr := compression.None + if hdr[0]&snappyMask == snappyMask { + compr = compression.Snappy + } else if hdr[0]&zstdMask == zstdMask { + compr = compression.Zstd + } // Gobble up zero bytes. 
if r.curRecTyp == recPageTerm { @@ -133,29 +135,14 @@ func (r *Reader) next() (err error) { if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc { return fmt.Errorf("unexpected checksum %x, expected %x", c, crc) } - - if isSnappyCompressed || isZstdCompressed { - r.compressBuf = append(r.compressBuf, buf[:length]...) - } else { - r.rec = append(r.rec, buf[:length]...) - } - if err := validateRecord(r.curRecTyp, i); err != nil { return err } + + r.precomprBuf = append(r.precomprBuf, buf[:length]...) if r.curRecTyp == recLast || r.curRecTyp == recFull { - if isSnappyCompressed && len(r.compressBuf) > 0 { - // The snappy library uses `len` to calculate if we need a new buffer. - // In order to allocate as few buffers as possible make the length - // equal to the capacity. - r.rec = r.rec[:cap(r.rec)] - r.rec, err = snappy.Decode(r.rec, r.compressBuf) - return err - } else if isZstdCompressed && len(r.compressBuf) > 0 { - r.rec, err = r.zstdReader.DecodeAll(r.compressBuf, r.rec[:0]) - return err - } - return nil + r.rec, err = compression.Decode(compr, r.precomprBuf, r.decBuf) + return err } // Only increment i for non-zero records since we use it diff --git a/tsdb/wlog/reader_test.go b/tsdb/wlog/reader_test.go index 2ac63cbf15..0cbe882844 100644 --- a/tsdb/wlog/reader_test.go +++ b/tsdb/wlog/reader_test.go @@ -29,11 +29,11 @@ import ( "testing" "time" + "github.com/prometheus/common/promslog" "github.com/stretchr/testify/require" - "github.com/prometheus/common/promslog" - tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/util/compression" ) type reader interface { @@ -315,7 +315,7 @@ func allSegments(dir string) (io.ReadCloser, error) { func TestReaderFuzz(t *testing.T) { for name, fn := range readerConstructors { - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range compression.Types() { t.Run(fmt.Sprintf("%s,compress=%s", name, compress), 
func(t *testing.T) { dir := t.TempDir() @@ -354,7 +354,7 @@ func TestReaderFuzz(t *testing.T) { func TestReaderFuzz_Live(t *testing.T) { logger := promslog.NewNopLogger() - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range compression.Types() { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { dir := t.TempDir() @@ -444,7 +444,7 @@ func TestLiveReaderCorrupt_ShortFile(t *testing.T) { logger := promslog.NewNopLogger() dir := t.TempDir() - w, err := NewSize(nil, nil, dir, pageSize, CompressionNone) + w, err := NewSize(nil, nil, dir, pageSize, compression.None) require.NoError(t, err) rec := make([]byte, pageSize-recordHeaderSize) @@ -484,7 +484,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) { logger := promslog.NewNopLogger() dir := t.TempDir() - w, err := NewSize(nil, nil, dir, pageSize*2, CompressionNone) + w, err := NewSize(nil, nil, dir, pageSize*2, compression.None) require.NoError(t, err) rec := make([]byte, pageSize-recordHeaderSize) @@ -531,7 +531,7 @@ func TestReaderData(t *testing.T) { for name, fn := range readerConstructors { t.Run(name, func(t *testing.T) { - w, err := New(nil, nil, dir, CompressionSnappy) + w, err := New(nil, nil, dir, compression.Snappy) require.NoError(t, err) sr, err := allSegments(dir) diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index 786912704e..77bb823375 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -31,6 +31,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/util/compression" ) var ( @@ -142,7 +143,7 @@ func TestTailSamples(t *testing.T) { const samplesCount = 250 const exemplarsCount = 25 const histogramsCount = 50 - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress 
:= range compression.Types() { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { now := time.Now() @@ -290,7 +291,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) { const seriesCount = 10 const samplesCount = 250 - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range compression.Types() { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { dir := t.TempDir() wdir := path.Join(dir, "wal") @@ -358,7 +359,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { const seriesCount = 10 const samplesCount = 250 - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range compression.Types() { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { dir := t.TempDir() @@ -446,7 +447,7 @@ func TestReadCheckpoint(t *testing.T) { const seriesCount = 10 const samplesCount = 250 - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range compression.Types() { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { dir := t.TempDir() @@ -519,7 +520,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) { const seriesCount = 20 const samplesCount = 300 - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range compression.Types() { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { dir := t.TempDir() @@ -590,11 +591,11 @@ func TestCheckpointSeriesReset(t *testing.T) { const seriesCount = 20 const samplesCount = 350 testCases := []struct { - compress CompressionType + compress compression.Type segments int }{ - {compress: CompressionNone, segments: 14}, - {compress: CompressionSnappy, segments: 13}, + {compress: compression.None, segments: 14}, + {compress: compression.Snappy, segments: 13}, } for _, tc := range testCases { @@ -681,8 +682,8 @@ func 
TestRun_StartupTime(t *testing.T) { const seriesCount = 20 const samplesCount = 300 - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { - t.Run(string(compress), func(t *testing.T) { + for _, compress := range compression.Types() { + t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { dir := t.TempDir() wdir := path.Join(dir, "wal") @@ -774,8 +775,8 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) { const seriesCount = 10 const samplesCount = 50 - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { - t.Run(string(compress), func(t *testing.T) { + for _, compress := range compression.Types() { + t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { dir := t.TempDir() wdir := path.Join(dir, "wal") diff --git a/tsdb/wlog/wlog.go b/tsdb/wlog/wlog.go index 54c257d61a..406a111103 100644 --- a/tsdb/wlog/wlog.go +++ b/tsdb/wlog/wlog.go @@ -29,12 +29,12 @@ import ( "sync" "time" - "github.com/golang/snappy" - "github.com/klauspost/compress/zstd" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/promslog" "github.com/prometheus/prometheus/tsdb/fileutil" + "github.com/prometheus/prometheus/util/compression" ) const ( @@ -169,26 +169,6 @@ func OpenReadSegment(fn string) (*Segment, error) { return &Segment{SegmentFile: f, i: k, dir: filepath.Dir(fn)}, nil } -type CompressionType string - -const ( - CompressionNone CompressionType = "none" - CompressionSnappy CompressionType = "snappy" - CompressionZstd CompressionType = "zstd" -) - -// ParseCompressionType parses the two compression-related configuration values and returns the CompressionType. If -// compression is enabled but the compressType is unrecognized, we default to Snappy compression. 
-func ParseCompressionType(compress bool, compressType string) CompressionType { - if compress { - if compressType == "zstd" { - return CompressionZstd - } - return CompressionSnappy - } - return CompressionNone -} - // WL is a write log that stores records in segment files. // It must be read from start to end once before logging new data. // If an error occurs during read, the repair procedure must be called @@ -210,9 +190,8 @@ type WL struct { stopc chan chan struct{} actorc chan func() closed bool // To allow calling Close() more than once without blocking. - compress CompressionType - compressBuf []byte - zstdWriter *zstd.Encoder + compress compression.Type + cEnc compression.EncodeBuffer WriteNotified WriteNotified @@ -220,14 +199,17 @@ type WL struct { } type wlMetrics struct { - fsyncDuration prometheus.Summary - pageFlushes prometheus.Counter - pageCompletions prometheus.Counter - truncateFail prometheus.Counter - truncateTotal prometheus.Counter - currentSegment prometheus.Gauge - writesFailed prometheus.Counter - walFileSize prometheus.GaugeFunc + fsyncDuration prometheus.Summary + pageFlushes prometheus.Counter + pageCompletions prometheus.Counter + truncateFail prometheus.Counter + truncateTotal prometheus.Counter + currentSegment prometheus.Gauge + writesFailed prometheus.Counter + walFileSize prometheus.GaugeFunc + recordPartWrites prometheus.Counter + recordPartBytes prometheus.Counter + recordBytesSaved *prometheus.CounterVec r prometheus.Registerer } @@ -244,78 +226,78 @@ func (w *wlMetrics) Unregister() { w.r.Unregister(w.currentSegment) w.r.Unregister(w.writesFailed) w.r.Unregister(w.walFileSize) + w.r.Unregister(w.recordPartWrites) + w.r.Unregister(w.recordPartBytes) + w.r.Unregister(w.recordBytesSaved) } func newWLMetrics(w *WL, r prometheus.Registerer) *wlMetrics { - m := &wlMetrics{ + return &wlMetrics{ r: r, + fsyncDuration: promauto.With(r).NewSummary(prometheus.SummaryOpts{ + Name: "fsync_duration_seconds", + Help: "Duration of write log 
fsync.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }), + pageFlushes: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "page_flushes_total", + Help: "Total number of page flushes.", + }), + pageCompletions: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "completed_pages_total", + Help: "Total number of completed pages.", + }), + truncateFail: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "truncations_failed_total", + Help: "Total number of write log truncations that failed.", + }), + truncateTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "truncations_total", + Help: "Total number of write log truncations attempted.", + }), + currentSegment: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Name: "segment_current", + Help: "Write log segment index that TSDB is currently writing to.", + }), + writesFailed: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "writes_failed_total", + Help: "Total number of write log writes that failed.", + }), + walFileSize: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{ + Name: "storage_size_bytes", + Help: "Size of the write log directory.", + }, func() float64 { + val, err := w.Size() + if err != nil { + w.logger.Error("Failed to calculate size of \"wal\" dir", "err", err.Error()) + } + return float64(val) + }), + recordPartWrites: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "record_part_writes_total", + Help: "Total number of record parts written before flushing.", + }), + recordPartBytes: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "record_parts_bytes_written_total", + Help: "Total number of record part bytes written before flushing, including" + + " CRC and compression headers.", + }), + recordBytesSaved: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "record_bytes_saved_total", + Help: "Total number of bytes saved by the optional record compression." 
+ + " Use this metric to learn about the effectiveness compression.", + }, []string{"compression"}), } - - m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "fsync_duration_seconds", - Help: "Duration of write log fsync.", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - }) - m.pageFlushes = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "page_flushes_total", - Help: "Total number of page flushes.", - }) - m.pageCompletions = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "completed_pages_total", - Help: "Total number of completed pages.", - }) - m.truncateFail = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "truncations_failed_total", - Help: "Total number of write log truncations that failed.", - }) - m.truncateTotal = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "truncations_total", - Help: "Total number of write log truncations attempted.", - }) - m.currentSegment = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "segment_current", - Help: "Write log segment index that TSDB is currently writing to.", - }) - m.writesFailed = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "writes_failed_total", - Help: "Total number of write log writes that failed.", - }) - m.walFileSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "storage_size_bytes", - Help: "Size of the write log directory.", - }, func() float64 { - val, err := w.Size() - if err != nil { - w.logger.Error("Failed to calculate size of \"wal\" dir", - "err", err.Error()) - } - return float64(val) - }) - - if r != nil { - r.MustRegister( - m.fsyncDuration, - m.pageFlushes, - m.pageCompletions, - m.truncateFail, - m.truncateTotal, - m.currentSegment, - m.writesFailed, - m.walFileSize, - ) - } - - return m } // New returns a new WAL over the given directory. 
-func New(logger *slog.Logger, reg prometheus.Registerer, dir string, compress CompressionType) (*WL, error) { +func New(logger *slog.Logger, reg prometheus.Registerer, dir string, compress compression.Type) (*WL, error) { return NewSize(logger, reg, dir, DefaultSegmentSize, compress) } // NewSize returns a new write log over the given directory. // New segments are created with the specified size. -func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress CompressionType) (*WL, error) { +func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress compression.Type) (*WL, error) { if segmentSize%pageSize != 0 { return nil, errors.New("invalid segment size") } @@ -326,15 +308,6 @@ func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segment logger = promslog.NewNopLogger() } - var zstdWriter *zstd.Encoder - if compress == CompressionZstd { - var err error - zstdWriter, err = zstd.NewWriter(nil) - if err != nil { - return nil, err - } - } - w := &WL{ dir: dir, logger: logger, @@ -343,7 +316,7 @@ func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segment actorc: make(chan func(), 100), stopc: make(chan chan struct{}), compress: compress, - zstdWriter: zstdWriter, + cEnc: compression.NewSyncEncodeBuffer(), } prefix := "prometheus_tsdb_wal_" if filepath.Base(dir) == WblDirName { @@ -382,22 +355,16 @@ func Open(logger *slog.Logger, dir string) (*WL, error) { if logger == nil { logger = promslog.NewNopLogger() } - zstdWriter, err := zstd.NewWriter(nil) - if err != nil { - return nil, err - } w := &WL{ - dir: dir, - logger: logger, - zstdWriter: zstdWriter, + dir: dir, + logger: logger, } - return w, nil } // CompressionType returns if compression is enabled on this WAL. 
-func (w *WL) CompressionType() CompressionType { +func (w *WL) CompressionType() compression.Type { return w.compress } @@ -715,26 +682,22 @@ func (w *WL) log(rec []byte, final bool) error { } // Compress the record before calculating if a new segment is needed. - compressed := false - if w.compress == CompressionSnappy && len(rec) > 0 { - // If MaxEncodedLen is less than 0 the record is too large to be compressed. - if len(rec) > 0 && snappy.MaxEncodedLen(len(rec)) >= 0 { - // The snappy library uses `len` to calculate if we need a new buffer. - // In order to allocate as few buffers as possible make the length - // equal to the capacity. - w.compressBuf = w.compressBuf[:cap(w.compressBuf)] - w.compressBuf = snappy.Encode(w.compressBuf, rec) - if len(w.compressBuf) < len(rec) { - rec = w.compressBuf - compressed = true - } - } - } else if w.compress == CompressionZstd && len(rec) > 0 { - w.compressBuf = w.zstdWriter.EncodeAll(rec, w.compressBuf[:0]) - if len(w.compressBuf) < len(rec) { - rec = w.compressBuf - compressed = true + enc, compressed, err := compression.Encode(w.compress, rec, w.cEnc) + if err != nil { + return err + } + if compressed { + savedBytes := len(rec) - len(enc) + + // Even if the compression was applied, skip it, if there's no benefit + // in the WAL record size (we have a choice). For small records e.g. snappy + // compression can yield larger records than the uncompressed. + if savedBytes <= 0 { + enc = rec + compressed = false + savedBytes = 0 } + w.metrics.recordBytesSaved.WithLabelValues(w.compress).Add(float64(savedBytes)) } // If the record is too big to fit within the active page in the current @@ -743,7 +706,7 @@ func (w *WL) log(rec []byte, final bool) error { left := w.page.remaining() - recordHeaderSize // Free space in the active page. left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // Free pages in the active segment. 
- if len(rec) > left { + if len(enc) > left { if _, err := w.nextSegment(true); err != nil { return err } @@ -751,32 +714,36 @@ func (w *WL) log(rec []byte, final bool) error { // Populate as many pages as necessary to fit the record. // Be careful to always do one pass to ensure we write zero-length records. - for i := 0; i == 0 || len(rec) > 0; i++ { + for i := 0; i == 0 || len(enc) > 0; i++ { p := w.page // Find how much of the record we can fit into the page. var ( - l = min(len(rec), (pageSize-p.alloc)-recordHeaderSize) - part = rec[:l] + l = min(len(enc), (pageSize-p.alloc)-recordHeaderSize) + part = enc[:l] buf = p.buf[p.alloc:] typ recType ) switch { - case i == 0 && len(part) == len(rec): + case i == 0 && len(part) == len(enc): typ = recFull - case len(part) == len(rec): + case len(part) == len(enc): typ = recLast case i == 0: typ = recFirst default: typ = recMiddle } + if compressed { - if w.compress == CompressionSnappy { + switch w.compress { + case compression.Snappy: typ |= snappyMask - } else if w.compress == CompressionZstd { + case compression.Zstd: typ |= zstdMask + default: + return fmt.Errorf("unsupported compression type: %v", w.compress) } } @@ -788,6 +755,9 @@ func (w *WL) log(rec []byte, final bool) error { copy(buf[recordHeaderSize:], part) p.alloc += len(part) + recordHeaderSize + w.metrics.recordPartWrites.Inc() + w.metrics.recordPartBytes.Add(float64(len(part) + recordHeaderSize)) + if w.page.full() { if err := w.flushPage(true); err != nil { // TODO When the flushing fails at this point and the record has not been @@ -796,7 +766,7 @@ func (w *WL) log(rec []byte, final bool) error { return err } } - rec = rec[l:] + enc = enc[l:] } // If it's the final record of the batch and the page is not empty, flush it. 
diff --git a/tsdb/wlog/wlog_test.go b/tsdb/wlog/wlog_test.go index d195aaee2f..2e73542508 100644 --- a/tsdb/wlog/wlog_test.go +++ b/tsdb/wlog/wlog_test.go @@ -30,6 +30,7 @@ import ( "go.uber.org/goleak" "github.com/prometheus/prometheus/tsdb/fileutil" + "github.com/prometheus/prometheus/util/compression" ) func TestMain(m *testing.M) { @@ -125,7 +126,7 @@ func TestWALRepair_ReadingError(t *testing.T) { // then corrupt a given record in a given segment. // As a result we want a repaired WAL with given intact records. segSize := 3 * pageSize - w, err := NewSize(nil, nil, dir, segSize, CompressionNone) + w, err := NewSize(nil, nil, dir, segSize, compression.None) require.NoError(t, err) var records [][]byte @@ -150,7 +151,7 @@ func TestWALRepair_ReadingError(t *testing.T) { require.NoError(t, f.Close()) - w, err = NewSize(nil, nil, dir, segSize, CompressionNone) + w, err = NewSize(nil, nil, dir, segSize, compression.None) require.NoError(t, err) defer w.Close() @@ -222,7 +223,7 @@ func TestCorruptAndCarryOn(t *testing.T) { // Produce a WAL with a two segments of 3 pages with 3 records each, // so when we truncate the file we're guaranteed to split a record. { - w, err := NewSize(logger, nil, dir, segmentSize, CompressionNone) + w, err := NewSize(logger, nil, dir, segmentSize, compression.None) require.NoError(t, err) for i := 0; i < 18; i++ { @@ -293,7 +294,7 @@ func TestCorruptAndCarryOn(t *testing.T) { err = sr.Close() require.NoError(t, err) - w, err := NewSize(logger, nil, dir, segmentSize, CompressionNone) + w, err := NewSize(logger, nil, dir, segmentSize, compression.None) require.NoError(t, err) err = w.Repair(corruptionErr) @@ -336,7 +337,7 @@ func TestCorruptAndCarryOn(t *testing.T) { // TestClose ensures that calling Close more than once doesn't panic and doesn't block. 
func TestClose(t *testing.T) { dir := t.TempDir() - w, err := NewSize(nil, nil, dir, pageSize, CompressionNone) + w, err := NewSize(nil, nil, dir, pageSize, compression.None) require.NoError(t, err) require.NoError(t, w.Close()) require.Error(t, w.Close()) @@ -349,7 +350,7 @@ func TestSegmentMetric(t *testing.T) { ) dir := t.TempDir() - w, err := NewSize(nil, nil, dir, segmentSize, CompressionNone) + w, err := NewSize(nil, nil, dir, segmentSize, compression.None) require.NoError(t, err) initialSegment := client_testutil.ToFloat64(w.metrics.currentSegment) @@ -368,7 +369,7 @@ func TestSegmentMetric(t *testing.T) { } func TestCompression(t *testing.T) { - bootstrap := func(compressed CompressionType) string { + bootstrap := func(compressed compression.Type) string { const ( segmentSize = pageSize recordSize = (pageSize / 2) - recordHeaderSize @@ -396,10 +397,10 @@ func TestCompression(t *testing.T) { } }() - dirUnCompressed := bootstrap(CompressionNone) + dirUnCompressed := bootstrap(compression.None) tmpDirs = append(tmpDirs, dirUnCompressed) - for _, compressionType := range []CompressionType{CompressionSnappy, CompressionZstd} { + for _, compressionType := range []compression.Type{compression.Snappy, compression.Zstd} { dirCompressed := bootstrap(compressionType) tmpDirs = append(tmpDirs, dirCompressed) @@ -443,7 +444,7 @@ func TestLogPartialWrite(t *testing.T) { t.Run(testName, func(t *testing.T) { dirPath := t.TempDir() - w, err := NewSize(nil, nil, dirPath, segmentSize, CompressionNone) + w, err := NewSize(nil, nil, dirPath, segmentSize, compression.None) require.NoError(t, err) // Replace the underlying segment file with a mocked one that injects a failure. 
@@ -510,7 +511,7 @@ func (f *faultySegmentFile) Write(p []byte) (int, error) { } func BenchmarkWAL_LogBatched(b *testing.B) { - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range compression.Types() { b.Run(fmt.Sprintf("compress=%s", compress), func(b *testing.B) { dir := b.TempDir() @@ -540,7 +541,7 @@ func BenchmarkWAL_LogBatched(b *testing.B) { } func BenchmarkWAL_Log(b *testing.B) { - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range compression.Types() { b.Run(fmt.Sprintf("compress=%s", compress), func(b *testing.B) { dir := b.TempDir() @@ -567,7 +568,7 @@ func TestUnregisterMetrics(t *testing.T) { reg := prometheus.NewRegistry() for i := 0; i < 2; i++ { - wl, err := New(promslog.NewNopLogger(), reg, t.TempDir(), CompressionNone) + wl, err := New(promslog.NewNopLogger(), reg, t.TempDir(), compression.None) require.NoError(t, err) require.NoError(t, wl.Close()) } diff --git a/util/compression/buffers.go b/util/compression/buffers.go new file mode 100644 index 0000000000..f3a5799fed --- /dev/null +++ b/util/compression/buffers.go @@ -0,0 +1,148 @@ +// Copyright 2025 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package compression + +import ( + "sync" + + "github.com/klauspost/compress/zstd" + + "github.com/prometheus/prometheus/util/zeropool" +) + +type EncodeBuffer interface { + zstdEncBuf() *zstd.Encoder + get() []byte + put([]byte) +} + +type syncEBuffer struct { + onceZstd sync.Once + w *zstd.Encoder + buf []byte +} + +// NewSyncEncodeBuffer returns a synchronous buffer that can only be used +// on one encoding goroutine at once. Notably, the encoded byte slice returned +// by Encode is valid only until the next Encode call. +func NewSyncEncodeBuffer() EncodeBuffer { + return &syncEBuffer{} +} + +func (b *syncEBuffer) zstdEncBuf() *zstd.Encoder { + b.onceZstd.Do(func() { + // Without params this never returns error. + b.w, _ = zstd.NewWriter(nil) + }) + return b.w +} + +func (b *syncEBuffer) get() []byte { + return b.buf +} + +func (b *syncEBuffer) put(buf []byte) { + b.buf = buf +} + +type concurrentEBuffer struct { + onceZstd sync.Once + w *zstd.Encoder + pool zeropool.Pool[[]byte] +} + +// NewConcurrentEncodeBuffer returns a buffer that can be used concurrently. +// NOTE: For Zstd compression a concurrency limit, equal to GOMAXPROCS is implied. +func NewConcurrentEncodeBuffer() EncodeBuffer { + return &concurrentEBuffer{} +} + +func (b *concurrentEBuffer) zstdEncBuf() *zstd.Encoder { + b.onceZstd.Do(func() { + // Without params this never returns error. + b.w, _ = zstd.NewWriter(nil) + }) + return b.w +} + +func (b *concurrentEBuffer) get() []byte { + return b.pool.Get() +} + +func (b *concurrentEBuffer) put(buf []byte) { + b.pool.Put(buf) +} + +type DecodeBuffer interface { + zstdDecBuf() *zstd.Decoder + get() []byte + put([]byte) +} + +type syncDBuffer struct { + onceZstd sync.Once + r *zstd.Decoder + buf []byte +} + +// NewSyncDecodeBuffer returns a synchronous buffer that can only be used +// on one decoding goroutine at once. Notably, the decoded byte slice returned +// by Decode is valid only until the next Decode call.
+func NewSyncDecodeBuffer() DecodeBuffer { + return &syncDBuffer{} +} + +func (b *syncDBuffer) zstdDecBuf() *zstd.Decoder { + b.onceZstd.Do(func() { + // Without params this never returns error. + b.r, _ = zstd.NewReader(nil) + }) + return b.r +} + +func (b *syncDBuffer) get() []byte { + return b.buf +} + +func (b *syncDBuffer) put(buf []byte) { + b.buf = buf +} + +type concurrentDBuffer struct { + onceZstd sync.Once + r *zstd.Decoder + pool zeropool.Pool[[]byte] +} + +// NewConcurrentDecodeBuffer returns a buffer that can be used concurrently. +// NOTE: For Zstd compression a concurrency limit, equal to GOMAXPROCS is implied. +func NewConcurrentDecodeBuffer() DecodeBuffer { + return &concurrentDBuffer{} +} + +func (b *concurrentDBuffer) zstdDecBuf() *zstd.Decoder { + b.onceZstd.Do(func() { + // Without params this never returns error. + b.r, _ = zstd.NewReader(nil) + }) + return b.r +} + +func (b *concurrentDBuffer) get() []byte { + return b.pool.Get() +} + +func (b *concurrentDBuffer) put(buf []byte) { + b.pool.Put(buf) +} diff --git a/util/compression/compression.go b/util/compression/compression.go new file mode 100644 index 0000000000..526093428c --- /dev/null +++ b/util/compression/compression.go @@ -0,0 +1,125 @@ +// Copyright 2025 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package compression + +import ( + "errors" + "fmt" + + "github.com/golang/snappy" +) + +// Type represents a valid compression type supported by this package. +type Type = string + +const ( + // None represents no compression case. + // None is the default when Type is empty. + None Type = "none" + // Snappy represents snappy block format. + Snappy Type = "snappy" + // Zstd represents "speed" mode of Zstd (Zstandard https://facebook.github.io/zstd/). + // This is roughly equivalent to the default Zstandard mode (level 3). + Zstd Type = "zstd" +) + +func Types() []Type { return []Type{None, Snappy, Zstd} } + +// Encode returns the encoded form of src for the given compression type. It also +// returns an indicator of whether compression was performed. Encode may skip +// compressing for None type, but also when src is too large e.g. for Snappy block format. +// +// The buf allows passing various buffer implementations that make encoding more +// efficient. See NewSyncEncodeBuffer and NewConcurrentEncodeBuffer for further +// details. For non-zstd compression types, it is valid to pass nil buf. +// +// Encode is concurrency-safe, however note the concurrency limits for the +// buffer of your choice. +func Encode(t Type, src []byte, buf EncodeBuffer) (ret []byte, compressed bool, err error) { + if len(src) == 0 || t == "" || t == None { + return src, false, nil + } + if t == Snappy { + // If MaxEncodedLen is less than 0 the record is too large to be compressed. + if snappy.MaxEncodedLen(len(src)) < 0 { + return src, false, nil + } + var b []byte + if buf != nil { + b = buf.get() + defer func() { + buf.put(ret) + }() + } + + // The snappy library uses `len` to calculate if we need a new buffer. + // In order to allocate as few buffers as possible make the length + // equal to the capacity.
+ b = b[:cap(b)] + return snappy.Encode(b, src), true, nil + } + if t == Zstd { + if buf == nil { + return nil, false, errors.New("zstd requested but EncodeBuffer was not provided") + } + b := buf.get() + defer func() { + buf.put(ret) + }() + + return buf.zstdEncBuf().EncodeAll(src, b[:0]), true, nil + } + return nil, false, fmt.Errorf("unsupported compression type: %s", t) +} + +// Decode returns the decoded form of src for the given compression type. +// +// The buf allows passing various buffer implementations that make decoding more +// efficient. See NewSyncDecodeBuffer and NewConcurrentDecodeBuffer for further +// details. For non-zstd compression types, it is valid to pass nil buf. +// +// Decode is concurrency-safe, however note the concurrency limits for the +// buffer of your choice. +func Decode(t Type, src []byte, buf DecodeBuffer) (ret []byte, err error) { + if len(src) == 0 || t == "" || t == None { + return src, nil + } + if t == Snappy { + var b []byte + if buf != nil { + b = buf.get() + defer func() { + // Save/put reusable return buffer. + buf.put(ret) + }() + } + // The snappy library uses `len` to calculate if we need a new buffer. + // In order to allocate as few buffers as possible make the length + // equal to the capacity. + b = b[:cap(b)] + return snappy.Decode(b, src) + } + if t == Zstd { + if buf == nil { + return nil, errors.New("zstd requested but DecodeBuffer was not provided") + } + b := buf.get() + defer func() { + // Save/put reusable return buffer. 
+ buf.put(ret) + }() + return buf.zstdDecBuf().DecodeAll(src, b[:0]) + } + return nil, fmt.Errorf("unsupported compression type: %s", t) +} diff --git a/util/compression/compression_test.go b/util/compression/compression_test.go new file mode 100644 index 0000000000..8ef01ad5d1 --- /dev/null +++ b/util/compression/compression_test.go @@ -0,0 +1,200 @@ +// Copyright 2025 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compression + +import ( + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +const compressible = `ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa +fsfsdfsfddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa +ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa +ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa +ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa +ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa +ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa2 +ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa12 +ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa1 +ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa121 +ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa +ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa +ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa +ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa +ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa 
+ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa324 +ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa145 +` + +func TestEncodeDecode(t *testing.T) { + for _, tcase := range []struct { + name string + + src string + types []Type + encBuf EncodeBuffer + decBuf DecodeBuffer + expectCompression bool + expectEncErr error + expectDecErr error + }{ + { + name: "empty src; no buffers", + types: Types(), + src: "", + expectCompression: false, + }, + { + name: "empty src; sync buffers", + types: Types(), + encBuf: NewSyncEncodeBuffer(), decBuf: NewSyncDecodeBuffer(), + src: "", + expectCompression: false, + }, + { + name: "empty src; concurrent buffers", + types: Types(), + encBuf: NewConcurrentEncodeBuffer(), decBuf: NewConcurrentDecodeBuffer(), + src: "", + expectCompression: false, + }, + { + name: "no buffers", + types: []Type{None}, + src: compressible, + expectCompression: false, + }, + { + name: "no buffers", + types: []Type{Snappy}, + src: compressible, + expectCompression: true, + }, + { + name: "no buffers", + types: []Type{Zstd}, + src: compressible, + expectEncErr: errors.New("zstd requested but EncodeBuffer was not provided"), + }, + { + name: "sync buffers", + types: []Type{None}, + encBuf: NewSyncEncodeBuffer(), decBuf: NewSyncDecodeBuffer(), + src: compressible, + expectCompression: false, + }, + { + name: "sync buffers", + types: Types()[1:], // All but none + encBuf: NewSyncEncodeBuffer(), decBuf: NewSyncDecodeBuffer(), + src: compressible, + expectCompression: true, + }, + { + name: "concurrent buffers", + types: []Type{None}, + encBuf: NewConcurrentEncodeBuffer(), decBuf: NewConcurrentDecodeBuffer(), + src: compressible, + expectCompression: false, + }, + { + name: "concurrent buffers", + types: Types()[1:], // All but none + encBuf: NewConcurrentEncodeBuffer(), decBuf: NewConcurrentDecodeBuffer(), + src: compressible, + expectCompression: true, + }, + } { + require.NotEmpty(t, tcase.types, "must specify at least one type") + for _, typ 
:= range tcase.types { + t.Run(fmt.Sprintf("case=%v/type=%v", tcase.name, typ), func(t *testing.T) { + res, compressed, err := Encode(typ, []byte(tcase.src), tcase.encBuf) + if tcase.expectEncErr != nil { + require.ErrorContains(t, err, tcase.expectEncErr.Error()) + return + } + require.NoError(t, err) + require.Equal(t, tcase.expectCompression, compressed) + + if compressed { + require.Less(t, len(res), len(tcase.src)) + } + + // Decode back. + got, err := Decode(typ, res, tcase.decBuf) + if tcase.expectDecErr != nil { + require.ErrorContains(t, err, tcase.expectDecErr.Error()) + return + } + require.NoError(t, err) + require.Equal(t, tcase.src, string(got)) + }) + } + } +} + +/* + export bench=encode-v1 && go test ./util/compression/... \ + -run '^$' -bench '^BenchmarkEncode' \ + -benchtime 5s -count 6 -cpu 2 -timeout 999m \ + | tee ${bench}.txt +*/ +func BenchmarkEncode(b *testing.B) { + for _, typ := range Types() { + b.Run(fmt.Sprintf("type=%v", typ), func(b *testing.B) { + var buf EncodeBuffer + compressible := []byte(compressible) + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if buf == nil { + buf = NewSyncEncodeBuffer() + } + res, _, err := Encode(typ, compressible, buf) + require.NoError(b, err) + b.ReportMetric(float64(len(res)), "B") + } + }) + } +} + +/* + export bench=decode-v1 && go test ./util/compression/... \ + -run '^$' -bench '^BenchmarkDecode' \ + -benchtime 5s -count 6 -cpu 2 -timeout 999m \ + | tee ${bench}.txt +*/ +func BenchmarkDecode(b *testing.B) { + for _, typ := range Types() { + b.Run(fmt.Sprintf("type=%v", typ), func(b *testing.B) { + var buf DecodeBuffer + res, _, err := Encode(typ, []byte(compressible), NewConcurrentEncodeBuffer()) + require.NoError(b, err) + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if buf == nil { + buf = NewSyncDecodeBuffer() + } + _, err := Decode(typ, res, buf) + require.NoError(b, err) + } + }) + } +}