Bartlomiej Plotka 2025-03-05 13:12:48 +00:00 committed by GitHub
commit fd2b68bdb6
26 changed files with 920 additions and 506 deletions


@ -73,7 +73,7 @@ import (
"github.com/prometheus/prometheus/tracing"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/agent"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/compression"
"github.com/prometheus/prometheus/util/documentcli"
"github.com/prometheus/prometheus/util/logging"
"github.com/prometheus/prometheus/util/notifications"
@ -287,6 +287,20 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
return nil
}
// parseCompressionType combines the two compression-related configuration values into the compression.Type to use. If
// compression is enabled but the type flag is set to None, the flags contradict each other and an error is returned; if compression is disabled, the type flag is ignored and None is returned.
func parseCompressionType(compress bool, compressType compression.Type) (compression.Type, error) {
if compress {
if compressType == compression.None {
return compression.None, errors.New("got --storage.tsdb.wal-compression=true, " +
"but --storage.*.wal-compression-type was set to None; " +
"set --storage.tsdb.wal-compression=false and remove --storage.*.wal-compression-type for no compression")
}
return compressType, nil
}
return compression.None, nil
}
func main() {
if os.Getenv("DEBUG") != "" {
runtime.SetBlockProfileRate(20)
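The helper above replaces the removed wlog.ParseCompressionType (see further down in this diff), folding the legacy boolean flag plus the newer type flag into a single compression.Type. A minimal, self-contained sketch of its semantics, assuming compression.Type is a string alias with None/Snappy/Zstd constants (the exact string values are assumptions):

```go
package main

import (
	"errors"
	"fmt"
)

// Assumed shape of util/compression: Type aliases string.
type Type = string

const (
	None   Type = "none"
	Snappy Type = "snappy"
	Zstd   Type = "zstd"
)

// Local mirror of the parseCompressionType helper from the diff above.
func parseCompressionType(compress bool, compressType Type) (Type, error) {
	if compress {
		if compressType == None {
			return None, errors.New("compression enabled but compression type set to none")
		}
		return compressType, nil
	}
	return None, nil
}

func main() {
	fmt.Println(parseCompressionType(true, Snappy)) // "snappy" <nil>
	fmt.Println(parseCompressionType(true, Zstd))   // "zstd" <nil>
	fmt.Println(parseCompressionType(false, Zstd))  // "none" <nil>: type flag is ignored
	fmt.Println(parseCompressionType(true, None))   // "none" + error: contradictory flags
}
```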
@ -425,11 +439,15 @@ func main() {
serverOnlyFlag(a, "storage.tsdb.allow-overlapping-compaction", "Allow compaction of overlapping blocks. If set to false, TSDB stops vertical compaction and leaves overlapping blocks there. The use case is to let another component handle the compaction of overlapping blocks.").
Default("true").Hidden().BoolVar(&cfg.tsdb.EnableOverlappingCompaction)
serverOnlyFlag(a, "storage.tsdb.wal-compression", "Compress the tsdb WAL.").
Hidden().Default("true").BoolVar(&cfg.tsdb.WALCompression)
var (
tsdbWALCompression bool
tsdbWALCompressionType string
)
serverOnlyFlag(a, "storage.tsdb.wal-compression", "Compress the tsdb WAL. If false, the --storage.tsdb.wal-compression-type flag is ignored.").
Hidden().Default("true").BoolVar(&tsdbWALCompression)
serverOnlyFlag(a, "storage.tsdb.wal-compression-type", "Compression algorithm for the tsdb WAL.").
Hidden().Default(string(wlog.CompressionSnappy)).EnumVar(&cfg.tsdb.WALCompressionType, string(wlog.CompressionSnappy), string(wlog.CompressionZstd))
serverOnlyFlag(a, "storage.tsdb.wal-compression-type", "Compression algorithm for the tsdb WAL, used when --storage.tsdb.wal-compression is true.").
Hidden().Default(compression.Snappy).EnumVar(&tsdbWALCompressionType, compression.Types()...)
serverOnlyFlag(a, "storage.tsdb.head-chunks-write-queue-size", "Size of the queue through which head chunks are written to the disk to be m-mapped, 0 disables the queue completely. Experimental.").
Default("0").IntVar(&cfg.tsdb.HeadChunksWriteQueueSize)
@ -447,11 +465,15 @@ func main() {
"Size at which to split WAL segment files. Example: 100MB").
Hidden().PlaceHolder("<bytes>").BytesVar(&cfg.agent.WALSegmentSize)
agentOnlyFlag(a, "storage.agent.wal-compression", "Compress the agent WAL.").
Default("true").BoolVar(&cfg.agent.WALCompression)
var (
agentWALCompression bool
agentWALCompressionType string
)
agentOnlyFlag(a, "storage.agent.wal-compression", "Compress the agent WAL. If false, the --storage.agent.wal-compression-type flag is ignored.").
Default("true").BoolVar(&agentWALCompression)
agentOnlyFlag(a, "storage.agent.wal-compression-type", "Compression algorithm for the agent WAL.").
Hidden().Default(string(wlog.CompressionSnappy)).EnumVar(&cfg.agent.WALCompressionType, string(wlog.CompressionSnappy), string(wlog.CompressionZstd))
agentOnlyFlag(a, "storage.agent.wal-compression-type", "Compression algorithm for the agent WAL, used when --storage.agent.wal-compression is true.").
Hidden().Default(compression.Snappy).EnumVar(&agentWALCompressionType, compression.Types()...)
agentOnlyFlag(a, "storage.agent.wal-truncate-frequency",
"The frequency at which to truncate the WAL and remove old data.").
@ -669,6 +691,18 @@ func main() {
logger.Warn("The --storage.tsdb.delayed-compaction.max-percent should have a value between 1 and 100. Using default", "default", tsdb.DefaultCompactionDelayMaxPercent)
cfg.tsdb.CompactionDelayMaxPercent = tsdb.DefaultCompactionDelayMaxPercent
}
cfg.tsdb.WALCompressionType, err = parseCompressionType(tsdbWALCompression, tsdbWALCompressionType)
if err != nil {
logger.Error(err.Error())
os.Exit(2)
}
} else {
cfg.agent.WALCompressionType, err = parseCompressionType(agentWALCompression, agentWALCompressionType)
if err != nil {
logger.Error(err.Error())
os.Exit(2)
}
}
noStepSubqueryInterval := &safePromQLNoStepSubqueryInterval{}
@ -1257,7 +1291,7 @@ func main() {
"NoLockfile", cfg.tsdb.NoLockfile,
"RetentionDuration", cfg.tsdb.RetentionDuration,
"WALSegmentSize", cfg.tsdb.WALSegmentSize,
"WALCompression", cfg.tsdb.WALCompression,
"WALCompressionType", cfg.tsdb.WALCompressionType,
)
startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000)
@ -1308,7 +1342,7 @@ func main() {
logger.Info("Agent WAL storage started")
logger.Debug("Agent WAL storage options",
"WALSegmentSize", cfg.agent.WALSegmentSize,
"WALCompression", cfg.agent.WALCompression,
"WALCompressionType", cfg.agent.WALCompressionType,
"StripeSize", cfg.agent.StripeSize,
"TruncateFrequency", cfg.agent.TruncateFrequency,
"MinWALTime", cfg.agent.MinWALTime,
@ -1781,8 +1815,7 @@ type tsdbOptions struct {
RetentionDuration model.Duration
MaxBytes units.Base2Bytes
NoLockfile bool
WALCompression bool
WALCompressionType string
WALCompressionType compression.Type
HeadChunksWriteQueueSize int
SamplesPerChunk int
StripeSize int
@ -1806,7 +1839,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond),
MaxBytes: int64(opts.MaxBytes),
NoLockfile: opts.NoLockfile,
WALCompression: wlog.ParseCompressionType(opts.WALCompression, opts.WALCompressionType),
WALCompression: opts.WALCompressionType,
HeadChunksWriteQueueSize: opts.HeadChunksWriteQueueSize,
SamplesPerChunk: opts.SamplesPerChunk,
StripeSize: opts.StripeSize,
@ -1828,8 +1861,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
// as agent.Option fields are unit agnostic (time).
type agentOptions struct {
WALSegmentSize units.Base2Bytes
WALCompression bool
WALCompressionType string
WALCompressionType compression.Type
StripeSize int
TruncateFrequency model.Duration
MinWALTime, MaxWALTime model.Duration
@ -1843,7 +1875,7 @@ func (opts agentOptions) ToAgentOptions(outOfOrderTimeWindow int64) agent.Option
}
return agent.Options{
WALSegmentSize: int(opts.WALSegmentSize),
WALCompression: wlog.ParseCompressionType(opts.WALCompression, opts.WALCompressionType),
WALCompression: opts.WALCompressionType,
StripeSize: opts.StripeSize,
TruncateFrequency: time.Duration(opts.TruncateFrequency),
MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)),


@ -23,11 +23,11 @@ import (
"os"
"time"
"github.com/golang/snappy"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/util/compression"
"github.com/prometheus/prometheus/util/fmtutil"
)
@ -116,7 +116,12 @@ func parseAndPushMetrics(client *remote.Client, data []byte, labels map[string]s
}
// Compress the request body with snappy.
compressed := snappy.Encode(nil, raw)
compressed, _, err := compression.Encode(compression.Snappy, raw, nil)
if err != nil {
fmt.Fprintln(os.Stderr, " FAILED:", err)
return false
}
_, err = client.Store(context.Background(), compressed, 0)
if err != nil {
fmt.Fprintln(os.Stderr, " FAILED:", err)
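promtool now routes through the shared package instead of calling snappy.Encode directly. A round-trip sketch based purely on the call shapes visible in this diff (Encode returns three values, with the second discarded at every call site here; a nil buffer asks the package to allocate):

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/prometheus/prometheus/util/compression"
)

func main() {
	raw := []byte("a remote-write payload")

	// Same shape as the promtool call above: nil buffer, second return ignored.
	compressed, _, err := compression.Encode(compression.Snappy, raw, nil)
	if err != nil {
		panic(err)
	}

	// Decode mirrors Encode, as used by the write handler further down.
	decoded, err := compression.Decode(compression.Snappy, compressed, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(bytes.Equal(raw, decoded)) // true
}
```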


@ -42,7 +42,7 @@ The Prometheus monitoring server
| <code class="text-nowrap">--storage.tsdb.no-lockfile</code> | Do not create lockfile in data directory. Use with server mode only. | `false` |
| <code class="text-nowrap">--storage.tsdb.head-chunks-write-queue-size</code> | Size of the queue through which head chunks are written to the disk to be m-mapped, 0 disables the queue completely. Experimental. Use with server mode only. | `0` |
| <code class="text-nowrap">--storage.agent.path</code> | Base path for metrics storage. Use with agent mode only. | `data-agent/` |
| <code class="text-nowrap">--storage.agent.wal-compression</code> | Compress the agent WAL. Use with agent mode only. | `true` |
| <code class="text-nowrap">--storage.agent.wal-compression</code> | Compress the agent WAL. If false, the --storage.agent.wal-compression-type flag is ignored. Use with agent mode only. | `true` |
| <code class="text-nowrap">--storage.agent.retention.min-time</code> | Minimum age samples may be before being considered for deletion when the WAL is truncated. Use with agent mode only. | |
| <code class="text-nowrap">--storage.agent.retention.max-time</code> | Maximum age samples may be before being forcibly deleted when the WAL is truncated. Use with agent mode only. | |
| <code class="text-nowrap">--storage.agent.no-lockfile</code> | Do not create lockfile in data directory. Use with agent mode only. | `false` |


@ -42,6 +42,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote/azuread"
"github.com/prometheus/prometheus/storage/remote/googleiam"
"github.com/prometheus/prometheus/util/compression"
)
const (
@ -53,17 +54,6 @@ const (
appProtoContentType = "application/x-protobuf"
)
// Compression represents the encoding. Currently remote storage supports only
// one, but we experiment with more, thus leaving the compression scaffolding
// for now.
// NOTE(bwplotka): Keeping it public, as a non-stable help for importers to use.
type Compression string
const (
// SnappyBlockCompression represents https://github.com/google/snappy/blob/2c94e11145f0b7b184b831577c93e5a41c4c0346/format_description.txt
SnappyBlockCompression Compression = "snappy"
)
var (
// UserAgent represents Prometheus version to use for user agent header.
UserAgent = version.PrometheusUserAgent()
@ -130,7 +120,7 @@ type Client struct {
readQueriesDuration prometheus.ObserverVec
writeProtoMsg config.RemoteWriteProtoMsg
writeCompression Compression // Not exposed by ClientConfig for now.
writeCompression compression.Type // Not exposed by ClientConfig for now.
}
// ClientConfig configures a client.
@ -232,7 +222,7 @@ func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) {
retryOnRateLimit: conf.RetryOnRateLimit,
timeout: time.Duration(conf.Timeout),
writeProtoMsg: writeProtoMsg,
writeCompression: SnappyBlockCompression,
writeCompression: compression.Snappy,
}, nil
}
@ -269,7 +259,7 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) (WriteRespo
return WriteResponseStats{}, err
}
httpReq.Header.Add("Content-Encoding", string(c.writeCompression))
httpReq.Header.Add("Content-Encoding", c.writeCompression)
httpReq.Header.Set("Content-Type", remoteWriteContentTypeHeaders[c.writeProtoMsg])
httpReq.Header.Set("User-Agent", UserAgent)
if c.writeProtoMsg == config.RemoteWriteProtoMsgV1 {
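Note that dropping the string(...) conversion around the Content-Encoding header value (and in the tests below) can only compile if compression.Type is a type alias of string rather than a defined type like the removed remote.Compression was. A sketch of the distinction this relies on:

```go
package main

import "fmt"

type Alias = string // assumed shape of compression.Type
type Defined string // the removed remote.Compression was a defined type

func takesString(s string) { fmt.Println(s) }

func main() {
	var a Alias = "snappy"
	takesString(a) // fine: an alias is identical to string

	var d Defined = "snappy"
	takesString(string(d)) // a defined type needs the explicit conversion
}
```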


@ -24,7 +24,6 @@ import (
"time"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
@ -45,6 +44,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/compression"
)
const (
@ -421,7 +421,7 @@ type QueueManager struct {
clientMtx sync.RWMutex
storeClient WriteClient
protoMsg config.RemoteWriteProtoMsg
enc Compression
compr compression.Type
seriesMtx sync.Mutex // Covers seriesLabels, seriesMetadata, droppedSeries and builder.
seriesLabels map[chunks.HeadSeriesRef]labels.Labels
@ -512,7 +512,7 @@ func NewQueueManager(
highestRecvTimestamp: highestRecvTimestamp,
protoMsg: protoMsg,
enc: SnappyBlockCompression, // Hardcoded for now, but scaffolding exists for likely future use.
compr: compression.Snappy, // Hardcoded for now, but scaffolding exists for likely future use.
}
walMetadata := false
@ -574,7 +574,7 @@ func (t *QueueManager) AppendWatcherMetadata(ctx context.Context, metadata []scr
func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error {
// Build the WriteRequest with no samples (v1 flow).
req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil, t.enc)
req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil, t.compr)
if err != nil {
return err
}
@ -1502,7 +1502,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
pBuf = proto.NewBuffer(nil)
pBufRaw []byte
buf []byte
encBuf = compression.NewSyncEncodeBuffer()
)
// TODO(@tpaschalis) Should we also raise the max if we have WAL metadata?
if s.qm.sendExemplars {
@ -1534,7 +1534,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
}
defer stop()
sendBatch := func(batch []timeSeries, protoMsg config.RemoteWriteProtoMsg, enc Compression, timer bool) {
sendBatch := func(batch []timeSeries, protoMsg config.RemoteWriteProtoMsg, compr compression.Type, timer bool) {
switch protoMsg {
case config.RemoteWriteProtoMsgV1:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
@ -1543,11 +1543,11 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
s.qm.logger.Debug("runShard timer ticked, sending buffered data", "samples", nPendingSamples,
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
}
_ = s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf, enc)
_ = s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, encBuf, compr)
case config.RemoteWriteProtoMsgV2:
nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := populateV2TimeSeries(&symbolTable, batch, pendingDataV2, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
_ = s.sendV2Samples(ctx, pendingDataV2[:n], symbolTable.Symbols(), nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, &pBufRaw, &buf, enc)
_ = s.sendV2Samples(ctx, pendingDataV2[:n], symbolTable.Symbols(), nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, &pBufRaw, encBuf, compr)
symbolTable.Reset()
}
}
@ -1576,7 +1576,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
return
}
sendBatch(batch, s.qm.protoMsg, s.qm.enc, false)
sendBatch(batch, s.qm.protoMsg, s.qm.compr, false)
// TODO(bwplotka): Previously the return was between populate and send.
// Consider this when DRY-ing https://github.com/prometheus/prometheus/issues/14409
queue.ReturnForReuse(batch)
@ -1587,7 +1587,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
case <-timer.C:
batch := queue.Batch()
if len(batch) > 0 {
sendBatch(batch, s.qm.protoMsg, s.qm.enc, true)
sendBatch(batch, s.qm.protoMsg, s.qm.compr, true)
}
queue.ReturnForReuse(batch)
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@ -1636,18 +1636,18 @@ func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sen
return nPendingSamples, nPendingExemplars, nPendingHistograms
}
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte, enc Compression) error {
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf compression.EncodeBuffer, compr compression.Type) error {
begin := time.Now()
rs, err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf, enc)
rs, err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf, compr)
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, 0, rs, time.Since(begin))
return err
}
// TODO(bwplotka): DRY this (have one logic for both v1 and v2).
// See https://github.com/prometheus/prometheus/issues/14409
func (s *shards) sendV2Samples(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, enc Compression) error {
func (s *shards) sendV2Samples(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *[]byte, buf compression.EncodeBuffer, compr compression.Type) error {
begin := time.Now()
rs, err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf, enc)
rs, err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf, compr)
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, metadataCount, rs, time.Since(begin))
return err
}
@ -1689,9 +1689,9 @@ func (s *shards) updateMetrics(_ context.Context, err error, sampleCount, exempl
}
// sendSamplesWithBackoff to the remote storage with backoff for recoverable errors.
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte, enc Compression) (WriteResponseStats, error) {
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf compression.EncodeBuffer, compr compression.Type) (WriteResponseStats, error) {
// Build the WriteRequest with no metadata.
req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, buf, nil, enc)
req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, nil, buf, compr)
s.qm.buildRequestLimitTimestamp.Store(lowest)
if err != nil {
// Failing to build the write request is non-recoverable, since it will
@ -1700,7 +1700,6 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
}
reqSize := len(req)
*buf = req
// Since we retry writes via attemptStore and sendWriteRequestWithBackoff we need
// to track the total amount of accepted data across the various attempts.
@ -1720,20 +1719,20 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
lowest := s.qm.buildRequestLimitTimestamp.Load()
if isSampleOld(currentTime, time.Duration(s.qm.cfg.SampleAgeLimit), lowest) {
// This will filter out old samples during retries.
req, _, lowest, err := buildWriteRequest(
req2, _, lowest, err := buildWriteRequest(
s.qm.logger,
samples,
nil,
pBuf,
buf,
isTimeSeriesOldFilter(s.qm.metrics, currentTime, time.Duration(s.qm.cfg.SampleAgeLimit)),
enc,
buf,
compr,
)
s.qm.buildRequestLimitTimestamp.Store(lowest)
if err != nil {
return err
}
*buf = req
req = req2
}
ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
@ -1761,7 +1760,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
s.qm.metrics.metadataTotal.Add(float64(metadataCount))
// Technically for v1, we will likely have empty response stats, but for
// newer Receivers this might not be the case, so use it on a best-effort basis.
rs, err := s.qm.client().Store(ctx, *buf, try)
rs, err := s.qm.client().Store(ctx, req, try)
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
// TODO(bwplotka): Revisit this once we have Receivers doing retriable partial error
// so far we don't have those, so it's ok to potentially skew statistics.
@ -1803,9 +1802,9 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
}
// sendV2SamplesWithBackoff to the remote storage with backoff for recoverable errors.
func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, enc Compression) (WriteResponseStats, error) {
func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *[]byte, buf compression.EncodeBuffer, compr compression.Type) (WriteResponseStats, error) {
// Build the WriteRequest with no metadata.
req, highest, lowest, err := buildV2WriteRequest(s.qm.logger, samples, labels, pBuf, buf, nil, enc)
req, highest, lowest, err := buildV2WriteRequest(s.qm.logger, samples, labels, pBuf, nil, buf, compr)
s.qm.buildRequestLimitTimestamp.Store(lowest)
if err != nil {
// Failing to build the write request is non-recoverable, since it will
@ -1814,7 +1813,6 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2
}
reqSize := len(req)
*buf = req
// Since we retry writes via attemptStore and sendWriteRequestWithBackoff we need
// to track the total amount of accepted data across the various attempts.
@ -1834,20 +1832,20 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2
lowest := s.qm.buildRequestLimitTimestamp.Load()
if isSampleOld(currentTime, time.Duration(s.qm.cfg.SampleAgeLimit), lowest) {
// This will filter out old samples during retries.
req, _, lowest, err := buildV2WriteRequest(
req2, _, lowest, err := buildV2WriteRequest(
s.qm.logger,
samples,
labels,
pBuf,
buf,
isV2TimeSeriesOldFilter(s.qm.metrics, currentTime, time.Duration(s.qm.cfg.SampleAgeLimit)),
enc,
buf,
compr,
)
s.qm.buildRequestLimitTimestamp.Store(lowest)
if err != nil {
return err
}
*buf = req
req = req2
}
ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
@ -1873,7 +1871,7 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2
s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
s.qm.metrics.metadataTotal.Add(float64(metadataCount))
rs, err := s.qm.client().Store(ctx, *buf, try)
rs, err := s.qm.client().Store(ctx, req, try)
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
// TODO(bwplotka): Revisit this once we have Receivers doing retriable partial error
// so far we don't have those, so it's ok to potentially skew statistics.
@ -2114,21 +2112,7 @@ func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeri
return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms
}
func compressPayload(tmpbuf *[]byte, inp []byte, enc Compression) (compressed []byte, _ error) {
switch enc {
case SnappyBlockCompression:
compressed = snappy.Encode(*tmpbuf, inp)
if n := snappy.MaxEncodedLen(len(inp)); n > len(*tmpbuf) {
// grow the buffer for the next time
*tmpbuf = make([]byte, n)
}
return compressed, nil
default:
return compressed, fmt.Errorf("unknown compression scheme [%v]", enc)
}
}
func buildWriteRequest(logger *slog.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf *[]byte, filter func(prompb.TimeSeries) bool, enc Compression) (compressed []byte, highest, lowest int64, _ error) {
func buildWriteRequest(logger *slog.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, filter func(prompb.TimeSeries) bool, buf compression.EncodeBuffer, compr compression.Type) (compressed []byte, highest, lowest int64, _ error) {
highest, lowest, timeSeries,
droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter)
@ -2151,22 +2135,14 @@ func buildWriteRequest(logger *slog.Logger, timeSeries []prompb.TimeSeries, meta
return nil, highest, lowest, err
}
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
*buf = (*buf)[0:cap(*buf)]
} else {
buf = &[]byte{}
}
compressed, err = compressPayload(buf, pBuf.Bytes(), enc)
compressed, _, err = compression.Encode(compr, pBuf.Bytes(), buf)
if err != nil {
return nil, highest, lowest, err
}
return compressed, highest, lowest, nil
}
func buildV2WriteRequest(logger *slog.Logger, samples []writev2.TimeSeries, labels []string, pBuf, buf *[]byte, filter func(writev2.TimeSeries) bool, enc Compression) (compressed []byte, highest, lowest int64, _ error) {
func buildV2WriteRequest(logger *slog.Logger, samples []writev2.TimeSeries, labels []string, pBuf *[]byte, filter func(writev2.TimeSeries) bool, buf compression.EncodeBuffer, compr compression.Type) (compressed []byte, highest, lowest int64, _ error) {
highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms := buildV2TimeSeries(samples, filter)
if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 {
@ -2188,15 +2164,7 @@ func buildV2WriteRequest(logger *slog.Logger, samples []writev2.TimeSeries, labe
}
*pBuf = data
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
*buf = (*buf)[0:cap(*buf)]
} else {
buf = &[]byte{}
}
compressed, err = compressPayload(buf, data, enc)
compressed, _, err = compression.Encode(compr, *pBuf, buf)
if err != nil {
return nil, highest, lowest, err
}
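The hand-rolled buffer growth that buildWriteRequest and buildV2WriteRequest used to do (resizing a *[]byte so snappy could reuse it) is now owned by compression.EncodeBuffer. A sketch of the reuse pattern the call sites imply: one buffer per goroutine (hence NewSyncEncodeBuffer in runShard), and an output slice that is presumably only valid until the next Encode with the same buffer, which is why the retry paths above re-encode into req2 instead of keeping the old bytes:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/util/compression"
)

func main() {
	// One buffer per shard goroutine, as in runShard above.
	encBuf := compression.NewSyncEncodeBuffer()

	for i := 0; i < 3; i++ {
		payload := []byte(fmt.Sprintf("write request %d", i))
		// Reuses encBuf's backing storage instead of allocating per call.
		compressed, _, err := compression.Encode(compression.Snappy, payload, encBuf)
		if err != nil {
			panic(err)
		}
		fmt.Println(len(compressed)) // use compressed before the next Encode
	}
}
```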


@ -30,7 +30,6 @@ import (
"time"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/google/go-cmp/cmp"
"github.com/prometheus/client_golang/prometheus"
client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
@ -39,6 +38,8 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/prometheus/prometheus/util/compression"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
@ -963,7 +964,6 @@ type TestWriteClient struct {
receivedMetadata map[string][]prompb.MetricMetadata
writesReceived int
mtx sync.Mutex
buf []byte
protoMsg config.RemoteWriteProtoMsg
injectedErrs []error
currErr int
@ -1119,13 +1119,8 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) (WriteResp
if c.returnError != nil {
return WriteResponseStats{}, c.returnError
}
// nil buffers are ok for snappy, ignore cast error.
if c.buf != nil {
c.buf = c.buf[:cap(c.buf)]
}
reqBuf, err := snappy.Decode(c.buf, req)
c.buf = reqBuf
reqBuf, err := compression.Decode(compression.Snappy, req, nil)
if err != nil {
return WriteResponseStats{}, err
}
@ -1858,7 +1853,7 @@ func createDummyTimeSeries(instances int) []timeSeries {
func BenchmarkBuildWriteRequest(b *testing.B) {
noopLogger := promslog.NewNopLogger()
bench := func(b *testing.B, batch []timeSeries) {
buff := make([]byte, 0)
cEnc := compression.NewSyncEncodeBuffer()
seriesBuff := make([]prompb.TimeSeries, len(batch))
for i := range seriesBuff {
seriesBuff[i].Samples = []prompb.Sample{{}}
@ -1869,7 +1864,7 @@ func BenchmarkBuildWriteRequest(b *testing.B) {
totalSize := 0
for i := 0; i < b.N; i++ {
populateTimeSeries(batch, seriesBuff, true, true)
req, _, _, err := buildWriteRequest(noopLogger, seriesBuff, nil, pBuf, &buff, nil, "snappy")
req, _, _, err := buildWriteRequest(noopLogger, seriesBuff, nil, pBuf, nil, cEnc, compression.Snappy)
if err != nil {
b.Fatal(err)
}
@ -1899,7 +1894,7 @@ func BenchmarkBuildV2WriteRequest(b *testing.B) {
noopLogger := promslog.NewNopLogger()
bench := func(b *testing.B, batch []timeSeries) {
symbolTable := writev2.NewSymbolTable()
buff := make([]byte, 0)
cEnc := compression.NewSyncEncodeBuffer()
seriesBuff := make([]writev2.TimeSeries, len(batch))
for i := range seriesBuff {
seriesBuff[i].Samples = []writev2.Sample{{}}
@ -1910,7 +1905,7 @@ func BenchmarkBuildV2WriteRequest(b *testing.B) {
totalSize := 0
for i := 0; i < b.N; i++ {
populateV2TimeSeries(&symbolTable, batch, seriesBuff, true, true)
req, _, _, err := buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.Symbols(), &pBuf, &buff, nil, "snappy")
req, _, _, err := buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.Symbols(), &pBuf, nil, cEnc, compression.Snappy)
if err != nil {
b.Fatal(err)
}


@ -24,11 +24,12 @@ import (
"time"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/util/compression"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
@ -150,8 +151,8 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Don't break yolo 1.0 clients if not needed. This is similar to what we did
// before 2.0: https://github.com/prometheus/prometheus/blob/d78253319daa62c8f28ed47e40bafcad2dd8b586/storage/remote/write_handler.go#L62
// We could give http.StatusUnsupportedMediaType, but let's assume snappy by default.
} else if enc != string(SnappyBlockCompression) {
err := fmt.Errorf("%v encoding (compression) is not accepted by this server; only %v is acceptable", enc, SnappyBlockCompression)
} else if strings.ToLower(enc) != compression.Snappy {
err := fmt.Errorf("%v encoding (compression) is not accepted by this server; only %v is acceptable", enc, compression.Snappy)
h.logger.Error("Error decoding remote write request", "err", err)
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
}
@ -164,7 +165,7 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
decompressed, err := snappy.Decode(nil, body)
decompressed, err := compression.Decode(compression.Snappy, body, nil)
if err != nil {
// TODO(bwplotka): Add more context to responded error?
h.logger.Error("Error decompressing remote write request", "err", err.Error())
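The added strings.ToLower makes the Content-Encoding comparison case-insensitive, in line with RFC 9110's treatment of content-coding names; an empty header still falls back to assuming snappy for pre-2.0 clients. A small sketch of the resulting accept logic (the local constant stands in for compression.Snappy):

```go
package main

import (
	"fmt"
	"strings"
)

const snappy = "snappy" // stands in for compression.Snappy

// acceptable mirrors the handler's header check above: empty means "assume
// snappy" for old clients; anything else must case-insensitively equal snappy.
func acceptable(contentEncoding string) bool {
	return contentEncoding == "" || strings.ToLower(contentEncoding) == snappy
}

func main() {
	fmt.Println(acceptable(""), acceptable("SNAPPY"), acceptable("zstd"))
	// true true false
}
```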


@ -31,6 +31,8 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/util/compression"
"github.com/prometheus/common/promslog"
"github.com/prometheus/prometheus/config"
@ -60,7 +62,7 @@ func TestRemoteWriteHandlerHeadersHandling_V1Message(t *testing.T) {
name: "correct PRW 1.0 headers",
reqHeaders: map[string]string{
"Content-Type": remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV1],
"Content-Encoding": string(SnappyBlockCompression),
"Content-Encoding": compression.Snappy,
RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue,
},
expectedCode: http.StatusNoContent,
@ -69,7 +71,7 @@ func TestRemoteWriteHandlerHeadersHandling_V1Message(t *testing.T) {
name: "missing remote write version",
reqHeaders: map[string]string{
"Content-Type": remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV1],
"Content-Encoding": string(SnappyBlockCompression),
"Content-Encoding": compression.Snappy,
},
expectedCode: http.StatusNoContent,
},
@ -81,7 +83,7 @@ func TestRemoteWriteHandlerHeadersHandling_V1Message(t *testing.T) {
{
name: "missing content-type",
reqHeaders: map[string]string{
"Content-Encoding": string(SnappyBlockCompression),
"Content-Encoding": compression.Snappy,
RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue,
},
expectedCode: http.StatusNoContent,
@ -98,7 +100,7 @@ func TestRemoteWriteHandlerHeadersHandling_V1Message(t *testing.T) {
name: "wrong content-type",
reqHeaders: map[string]string{
"Content-Type": "yolo",
"Content-Encoding": string(SnappyBlockCompression),
"Content-Encoding": compression.Snappy,
RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue,
},
expectedCode: http.StatusUnsupportedMediaType,
@ -107,7 +109,7 @@ func TestRemoteWriteHandlerHeadersHandling_V1Message(t *testing.T) {
name: "wrong content-type2",
reqHeaders: map[string]string{
"Content-Type": appProtoContentType + ";proto=yolo",
"Content-Encoding": string(SnappyBlockCompression),
"Content-Encoding": compression.Snappy,
RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue,
},
expectedCode: http.StatusUnsupportedMediaType,
@ -157,7 +159,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) {
name: "correct PRW 2.0 headers",
reqHeaders: map[string]string{
"Content-Type": remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV2],
"Content-Encoding": string(SnappyBlockCompression),
"Content-Encoding": compression.Snappy,
RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue,
},
expectedCode: http.StatusNoContent,
@ -166,7 +168,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) {
name: "missing remote write version",
reqHeaders: map[string]string{
"Content-Type": remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV2],
"Content-Encoding": string(SnappyBlockCompression),
"Content-Encoding": compression.Snappy,
},
expectedCode: http.StatusNoContent, // We don't check for now.
},
@ -178,7 +180,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) {
{
name: "missing content-type",
reqHeaders: map[string]string{
"Content-Encoding": string(SnappyBlockCompression),
"Content-Encoding": compression.Snappy,
RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue,
},
// This only gives 415, because we explicitly only support 2.0. If we supported both
@ -199,7 +201,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) {
name: "wrong content-type",
reqHeaders: map[string]string{
"Content-Type": "yolo",
"Content-Encoding": string(SnappyBlockCompression),
"Content-Encoding": compression.Snappy,
RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue,
},
expectedCode: http.StatusUnsupportedMediaType,
@ -208,7 +210,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) {
name: "wrong content-type2",
reqHeaders: map[string]string{
"Content-Type": appProtoContentType + ";proto=yolo",
"Content-Encoding": string(SnappyBlockCompression),
"Content-Encoding": compression.Snappy,
RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue,
},
expectedCode: http.StatusUnsupportedMediaType,
@ -445,7 +447,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
require.NoError(t, err)
req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV2])
req.Header.Set("Content-Encoding", string(SnappyBlockCompression))
req.Header.Set("Content-Encoding", compression.Snappy)
req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)
appendable := &mockAppendable{
@ -715,7 +717,7 @@ func TestCommitErr_V2Message(t *testing.T) {
require.NoError(t, err)
req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV2])
req.Header.Set("Content-Encoding", string(SnappyBlockCompression))
req.Header.Set("Content-Encoding", compression.Snappy)
req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)
appendable := &mockAppendable{commitErr: errors.New("commit error")}


@ -41,6 +41,7 @@ import (
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/compression"
"github.com/prometheus/prometheus/util/zeropool"
)
@ -66,7 +67,7 @@ type Options struct {
WALSegmentSize int
// WALCompression configures the compression type to use on records in the WAL.
WALCompression wlog.CompressionType
WALCompression compression.Type
// StripeSize is the size (power of 2) in entries of the series hash map. Reducing the size will save memory but impact performance.
StripeSize int
@ -90,7 +91,7 @@ type Options struct {
func DefaultOptions() *Options {
return &Options{
WALSegmentSize: wlog.DefaultSegmentSize,
WALCompression: wlog.CompressionNone,
WALCompression: compression.None,
StripeSize: tsdb.DefaultStripeSize,
TruncateFrequency: DefaultTruncateFrequency,
MinWALTime: DefaultMinWALTime,
@ -337,7 +338,7 @@ func validateOptions(opts *Options) *Options {
}
if opts.WALCompression == "" {
opts.WALCompression = wlog.CompressionNone
opts.WALCompression = compression.None
}
// Revert StripeSize to DefaultStripeSize if StripeSize is either 0 or not a power of 2.


@ -42,7 +42,7 @@ import (
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/compression"
)
func TestSplitByRange(t *testing.T) {
@ -1447,7 +1447,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
func TestHeadCompactionWithHistograms(t *testing.T) {
for _, floatTest := range []bool{true, false} {
t.Run(fmt.Sprintf("float=%t", floatTest), func(t *testing.T) {
head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
require.NoError(t, head.Init(0))
t.Cleanup(func() {
require.NoError(t, head.Close())
@ -1627,11 +1627,11 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
c.numBuckets,
),
func(t *testing.T) {
oldHead, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
oldHead, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
t.Cleanup(func() {
require.NoError(t, oldHead.Close())
})
sparseHead, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
sparseHead, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
t.Cleanup(func() {
require.NoError(t, sparseHead.Close())
})


@ -46,6 +46,7 @@ import (
_ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minimum Go version is met.
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/compression"
)
const (
@ -80,7 +81,7 @@ func DefaultOptions() *Options {
MaxBlockDuration: DefaultBlockDuration,
NoLockfile: false,
SamplesPerChunk: DefaultSamplesPerChunk,
WALCompression: wlog.CompressionNone,
WALCompression: compression.None,
StripeSize: DefaultStripeSize,
HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize,
IsolationDisabled: defaultIsolationDisabled,
@ -124,7 +125,7 @@ type Options struct {
NoLockfile bool
// WALCompression configures the compression type to use on records in the WAL.
WALCompression wlog.CompressionType
WALCompression compression.Type
// Maximum number of CPUs that can simultaneously processes WAL replay.
// If it is <=0, then GOMAXPROCS is used.


@ -37,6 +37,8 @@ import (
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
@ -45,17 +47,13 @@ import (
"go.uber.org/atomic"
"go.uber.org/goleak"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/fileutil"
@ -65,6 +63,7 @@ import (
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/annotations"
"github.com/prometheus/prometheus/util/compression"
"github.com/prometheus/prometheus/util/testutil"
)
@ -1962,7 +1961,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
dir := t.TempDir()
require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777))
w, err := wlog.New(nil, nil, path.Join(dir, "wal"), wlog.CompressionNone)
w, err := wlog.New(nil, nil, path.Join(dir, "wal"), compression.None)
require.NoError(t, err)
var enc record.Encoder
@ -2006,7 +2005,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
createBlock(t, dir, genSeries(1, 1, 1000, 6000))
require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777))
w, err := wlog.New(nil, nil, path.Join(dir, "wal"), wlog.CompressionNone)
w, err := wlog.New(nil, nil, path.Join(dir, "wal"), compression.None)
require.NoError(t, err)
var enc record.Encoder
@ -2407,7 +2406,7 @@ func TestDBReadOnly(t *testing.T) {
}
// Add head to test DBReadOnly WAL reading capabilities.
w, err := wlog.New(logger, nil, filepath.Join(dbDir, "wal"), wlog.CompressionSnappy)
w, err := wlog.New(logger, nil, filepath.Join(dbDir, "wal"), compression.Snappy)
require.NoError(t, err)
h := createHead(t, w, genSeries(1, 1, 16, 18), dbDir)
require.NoError(t, h.Close())
@ -3058,7 +3057,7 @@ func TestCompactHead(t *testing.T) {
NoLockfile: true,
MinBlockDuration: int64(time.Hour * 2 / time.Millisecond),
MaxBlockDuration: int64(time.Hour * 2 / time.Millisecond),
WALCompression: wlog.CompressionSnappy,
WALCompression: compression.Snappy,
}
db, err := Open(dbDir, promslog.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg, nil)
@ -4662,7 +4661,7 @@ func TestMetadataCheckpointingOnlyKeepsLatestEntry(t *testing.T) {
ctx := context.Background()
numSamples := 10000
hb, w := newTestHead(t, int64(numSamples)*10, wlog.CompressionNone, false)
hb, w := newTestHead(t, int64(numSamples)*10, compression.None, false)
// Add some series so we can append metadata to them.
app := hb.Appender(ctx)
@ -7019,7 +7018,7 @@ func testWBLAndMmapReplay(t *testing.T, scenario sampleTypeScenario) {
resetMmapToOriginal() // We need to reset because new duplicate chunks can be written above.
// Removing m-map markers in WBL by rewriting it.
newWbl, err := wlog.New(promslog.NewNopLogger(), nil, filepath.Join(t.TempDir(), "new_wbl"), wlog.CompressionNone)
newWbl, err := wlog.New(promslog.NewNopLogger(), nil, filepath.Join(t.TempDir(), "new_wbl"), compression.None)
require.NoError(t, err)
sr, err := wlog.NewSegmentsReader(originalWblDir)
require.NoError(t, err)


@ -29,7 +29,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/compression"
)
func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
@ -132,7 +132,7 @@ func BenchmarkHead_WalCommit(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
h, w := newTestHead(b, 10000, wlog.CompressionNone, false)
h, w := newTestHead(b, 10000, compression.None, false)
b.Cleanup(func() {
if h != nil {
h.Close()


@ -38,6 +38,8 @@ import (
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"github.com/prometheus/prometheus/util/compression"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
@ -68,11 +70,11 @@ func newTestHeadDefaultOptions(chunkRange int64, oooEnabled bool) *HeadOptions {
return opts
}
func newTestHead(t testing.TB, chunkRange int64, compressWAL wlog.CompressionType, oooEnabled bool) (*Head, *wlog.WL) {
func newTestHead(t testing.TB, chunkRange int64, compressWAL compression.Type, oooEnabled bool) (*Head, *wlog.WL) {
return newTestHeadWithOptions(t, compressWAL, newTestHeadDefaultOptions(chunkRange, oooEnabled))
}
func newTestHeadWithOptions(t testing.TB, compressWAL wlog.CompressionType, opts *HeadOptions) (*Head, *wlog.WL) {
func newTestHeadWithOptions(t testing.TB, compressWAL compression.Type, opts *HeadOptions) (*Head, *wlog.WL) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL)
require.NoError(t, err)
@ -92,7 +94,7 @@ func newTestHeadWithOptions(t testing.TB, compressWAL wlog.CompressionType, opts
func BenchmarkCreateSeries(b *testing.B) {
series := genSeries(b.N, 10, 0, 0)
h, _ := newTestHead(b, 10000, wlog.CompressionNone, false)
h, _ := newTestHead(b, 10000, compression.None, false)
b.Cleanup(func() {
require.NoError(b, h.Close())
})
@ -113,7 +115,7 @@ func BenchmarkHeadAppender_Append_Commit_ExistingSeries(b *testing.B) {
b.Run(fmt.Sprintf("%d series", seriesCount), func(b *testing.B) {
for _, samplesPerAppend := range []int64{1, 2, 5, 100} {
b.Run(fmt.Sprintf("%d samples per append", samplesPerAppend), func(b *testing.B) {
h, _ := newTestHead(b, 10000, wlog.CompressionNone, false)
h, _ := newTestHead(b, 10000, compression.None, false)
b.Cleanup(func() { require.NoError(b, h.Close()) })
ts := int64(1000)
@ -294,11 +296,11 @@ func BenchmarkLoadWLs(b *testing.B) {
func(b *testing.B) {
dir := b.TempDir()
wal, err := wlog.New(nil, nil, dir, wlog.CompressionNone)
wal, err := wlog.New(nil, nil, dir, compression.None)
require.NoError(b, err)
var wbl *wlog.WL
if c.oooSeriesPct != 0 {
wbl, err = wlog.New(nil, nil, dir, wlog.CompressionNone)
wbl, err = wlog.New(nil, nil, dir, compression.None)
require.NoError(b, err)
}
@ -459,11 +461,11 @@ func BenchmarkLoadRealWLs(b *testing.B) {
dir := b.TempDir()
require.NoError(b, fileutil.CopyDirs(srcDir, dir))
wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone)
wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), compression.None)
require.NoError(b, err)
b.Cleanup(func() { wal.Close() })
wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone)
wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), compression.None)
require.NoError(b, err)
b.Cleanup(func() { wbl.Close() })
b.StartTimer()
@ -484,7 +486,7 @@ func BenchmarkLoadRealWLs(b *testing.B) {
// While appending the samples to the head it concurrently queries them from multiple goroutines and verifies that the
// returned results are correct.
func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
defer func() {
require.NoError(t, head.Close())
}()
@ -674,7 +676,7 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
}
func TestHead_ReadWAL(t *testing.T) {
for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
entries := []interface{}{
[]record.RefSeries{
@ -756,7 +758,7 @@ func TestHead_ReadWAL(t *testing.T) {
}
func TestHead_WALMultiRef(t *testing.T) {
head, w := newTestHead(t, 1000, wlog.CompressionNone, false)
head, w := newTestHead(t, 1000, compression.None, false)
require.NoError(t, head.Init(0))
@ -791,7 +793,7 @@ func TestHead_WALMultiRef(t *testing.T) {
require.NotEqual(t, ref1, ref2, "Refs are the same")
require.NoError(t, head.Close())
w, err = wlog.New(nil, nil, w.Dir(), wlog.CompressionNone)
w, err = wlog.New(nil, nil, w.Dir(), compression.None)
require.NoError(t, err)
opts := DefaultHeadOptions()
@ -816,7 +818,7 @@ func TestHead_WALMultiRef(t *testing.T) {
}
func TestHead_ActiveAppenders(t *testing.T) {
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
defer head.Close()
require.NoError(t, head.Init(0))
@ -849,7 +851,7 @@ func TestHead_ActiveAppenders(t *testing.T) {
}
func TestHead_UnknownWALRecord(t *testing.T) {
head, w := newTestHead(t, 1000, wlog.CompressionNone, false)
head, w := newTestHead(t, 1000, compression.None, false)
w.Log([]byte{255, 42})
require.NoError(t, head.Init(0))
require.NoError(t, head.Close())
@ -861,7 +863,7 @@ func BenchmarkHead_Truncate(b *testing.B) {
const total = 1e6
prepare := func(b *testing.B, churn int) *Head {
h, _ := newTestHead(b, 1000, wlog.CompressionNone, false)
h, _ := newTestHead(b, 1000, compression.None, false)
b.Cleanup(func() {
require.NoError(b, h.Close())
})
@ -930,7 +932,7 @@ func BenchmarkHead_Truncate(b *testing.B) {
}
func TestHead_Truncate(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
h, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -1240,7 +1242,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) {
}
func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
entries := []interface{}{
[]record.RefSeries{
@ -1320,7 +1322,7 @@ func TestHeadDeleteSimple(t *testing.T) {
},
}
for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
for _, c := range cases {
head, w := newTestHead(t, 1000, compress, false)
@ -1402,7 +1404,7 @@ func TestHeadDeleteSimple(t *testing.T) {
}
func TestDeleteUntilCurMax(t *testing.T) {
hb, _ := newTestHead(t, 1000000, wlog.CompressionNone, false)
hb, _ := newTestHead(t, 1000000, compression.None, false)
defer func() {
require.NoError(t, hb.Close())
}()
@ -1455,7 +1457,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
numSamples := 10000
// Enough samples to cause a checkpoint.
hb, w := newTestHead(t, int64(numSamples)*10, wlog.CompressionNone, false)
hb, w := newTestHead(t, int64(numSamples)*10, compression.None, false)
for i := 0; i < numSamples; i++ {
app := hb.Appender(context.Background())
@ -1547,7 +1549,7 @@ func TestDelete_e2e(t *testing.T) {
seriesMap[labels.New(l...).String()] = []chunks.Sample{}
}
hb, _ := newTestHead(t, 100000, wlog.CompressionNone, false)
hb, _ := newTestHead(t, 100000, compression.None, false)
defer func() {
require.NoError(t, hb.Close())
}()
@ -1915,7 +1917,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
func TestGCChunkAccess(t *testing.T) {
// Put a chunk, select it. GC it and then access it.
const chunkRange = 1000
h, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false)
h, _ := newTestHead(t, chunkRange, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -1974,7 +1976,7 @@ func TestGCChunkAccess(t *testing.T) {
func TestGCSeriesAccess(t *testing.T) {
// Put a series, select it. GC it and then access it.
const chunkRange = 1000
h, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false)
h, _ := newTestHead(t, chunkRange, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -2033,7 +2035,7 @@ func TestGCSeriesAccess(t *testing.T) {
}
func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
h, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -2063,7 +2065,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
}
func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
h, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -2094,7 +2096,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
}
func TestHead_LogRollback(t *testing.T) {
for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
h, w := newTestHead(t, 1000, compress, false)
defer func() {
@ -2118,7 +2120,7 @@ func TestHead_LogRollback(t *testing.T) {
}
func TestHead_ReturnsSortedLabelValues(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
h, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -2182,7 +2184,7 @@ func TestWalRepair_DecodingError(t *testing.T) {
5,
},
} {
for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("%s,compress=%s", name, compress), func(t *testing.T) {
dir := t.TempDir()
@ -2256,9 +2258,9 @@ func TestWblRepair_DecodingError(t *testing.T) {
// Fill the wbl and corrupt it.
{
wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone)
wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), compression.None)
require.NoError(t, err)
wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone)
wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), compression.None)
require.NoError(t, err)
for i := 1; i <= totalRecs; i++ {
@ -2322,7 +2324,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
walDir := filepath.Join(dir, "wal")
// Fill the chunk segments and corrupt it.
{
w, err := wlog.New(nil, nil, walDir, wlog.CompressionNone)
w, err := wlog.New(nil, nil, walDir, compression.None)
require.NoError(t, err)
opts := DefaultHeadOptions()
@ -2391,7 +2393,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
}
func TestNewWalSegmentOnTruncate(t *testing.T) {
h, wal := newTestHead(t, 1000, wlog.CompressionNone, false)
h, wal := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -2421,7 +2423,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) {
}
func TestAddDuplicateLabelName(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
h, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -2504,7 +2506,7 @@ func TestMemSeriesIsolation(t *testing.T) {
}
// Test isolation without restart of Head.
hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
hb, _ := newTestHead(t, 1000, compression.None, false)
i := addSamples(hb)
testIsolation(hb, i)
@ -2566,11 +2568,11 @@ func TestMemSeriesIsolation(t *testing.T) {
require.NoError(t, hb.Close())
// Test isolation with restart of Head. This is to verify the num samples of chunks after m-map chunk replay.
hb, w := newTestHead(t, 1000, wlog.CompressionNone, false)
hb, w := newTestHead(t, 1000, compression.None, false)
i = addSamples(hb)
require.NoError(t, hb.Close())
wal, err := wlog.NewSize(nil, nil, w.Dir(), 32768, wlog.CompressionNone)
wal, err := wlog.NewSize(nil, nil, w.Dir(), 32768, compression.None)
require.NoError(t, err)
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
@ -2619,7 +2621,7 @@ func TestIsolationRollback(t *testing.T) {
}
// Rollback after a failed append and test if the low watermark has progressed anyway.
hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
hb, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, hb.Close())
}()
@ -2650,7 +2652,7 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) {
t.Skip("skipping test since tsdb isolation is disabled")
}
hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
hb, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, hb.Close())
}()
@ -2687,7 +2689,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
t.Skip("skipping test since tsdb isolation is disabled")
}
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
h, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -2718,7 +2720,7 @@ func TestIsolationWithoutAdd(t *testing.T) {
t.Skip("skipping test since tsdb isolation is disabled")
}
hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
hb, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, hb.Close())
}()
@ -2843,7 +2845,7 @@ func testOutOfOrderSamplesMetric(t *testing.T, scenario sampleTypeScenario, opti
}
func testHeadSeriesChunkRace(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
h, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -2878,7 +2880,7 @@ func testHeadSeriesChunkRace(t *testing.T) {
}
func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) {
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, head.Close())
}()
@ -2939,7 +2941,7 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) {
}
func TestHeadLabelValuesWithMatchers(t *testing.T) {
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
t.Cleanup(func() { require.NoError(t, head.Close()) })
ctx := context.Background()
@ -3015,7 +3017,7 @@ func TestHeadLabelValuesWithMatchers(t *testing.T) {
}
func TestHeadLabelNamesWithMatchers(t *testing.T) {
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, head.Close())
}()
@ -3085,7 +3087,7 @@ func TestHeadLabelNamesWithMatchers(t *testing.T) {
func TestHeadShardedPostings(t *testing.T) {
headOpts := newTestHeadDefaultOptions(1000, false)
headOpts.EnableSharding = true
head, _ := newTestHeadWithOptions(t, wlog.CompressionNone, headOpts)
head, _ := newTestHeadWithOptions(t, compression.None, headOpts)
defer func() {
require.NoError(t, head.Close())
}()
@ -3148,7 +3150,7 @@ func TestHeadShardedPostings(t *testing.T) {
}
func TestErrReuseAppender(t *testing.T) {
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, head.Close())
}()
@ -3184,7 +3186,7 @@ func TestErrReuseAppender(t *testing.T) {
func TestHeadMintAfterTruncation(t *testing.T) {
chunkRange := int64(2000)
head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false)
head, _ := newTestHead(t, chunkRange, compression.None, false)
app := head.Appender(context.Background())
_, err := app.Append(0, labels.FromStrings("a", "b"), 100, 100)
@ -3218,7 +3220,7 @@ func TestHeadMintAfterTruncation(t *testing.T) {
func TestHeadExemplars(t *testing.T) {
chunkRange := int64(2000)
head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false)
head, _ := newTestHead(t, chunkRange, compression.None, false)
app := head.Appender(context.Background())
l := labels.FromStrings("trace_id", "123")
@ -3240,7 +3242,7 @@ func TestHeadExemplars(t *testing.T) {
func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) {
chunkRange := int64(2000)
head, _ := newTestHead(b, chunkRange, wlog.CompressionNone, false)
head, _ := newTestHead(b, chunkRange, compression.None, false)
b.Cleanup(func() { require.NoError(b, head.Close()) })
ctx := context.Background()
@ -3685,7 +3687,7 @@ func TestAppendHistogram(t *testing.T) {
l := labels.FromStrings("a", "b")
for _, numHistograms := range []int{1, 10, 150, 200, 250, 300} {
t.Run(strconv.Itoa(numHistograms), func(t *testing.T) {
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
@ -3789,7 +3791,7 @@ func TestAppendHistogram(t *testing.T) {
}
func TestHistogramInWALAndMmapChunk(t *testing.T) {
head, _ := newTestHead(t, 3000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 3000, compression.None, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
@ -3940,7 +3942,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
// Restart head.
require.NoError(t, head.Close())
startHead := func() {
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None)
require.NoError(t, err)
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
require.NoError(t, err)
@ -3969,7 +3971,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
}
func TestChunkSnapshot(t *testing.T) {
head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false)
head, _ := newTestHead(t, 120*4, compression.None, false)
defer func() {
head.opts.EnableMemorySnapshotOnShutdown = false
require.NoError(t, head.Close())
@ -4062,7 +4064,7 @@ func TestChunkSnapshot(t *testing.T) {
}
openHeadAndCheckReplay := func() {
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None)
require.NoError(t, err)
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
require.NoError(t, err)
@ -4256,7 +4258,7 @@ func TestChunkSnapshot(t *testing.T) {
}
func TestSnapshotError(t *testing.T) {
head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false)
head, _ := newTestHead(t, 120*4, compression.None, false)
defer func() {
head.opts.EnableMemorySnapshotOnShutdown = false
require.NoError(t, head.Close())
@ -4316,7 +4318,7 @@ func TestSnapshotError(t *testing.T) {
require.NoError(t, f.Close())
// Create new Head which should replay this snapshot.
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None)
require.NoError(t, err)
// Testing https://github.com/prometheus/prometheus/issues/9437 with the registry.
head, err = NewHead(prometheus.NewRegistry(), nil, w, nil, head.opts, nil)
@ -4345,7 +4347,7 @@ func TestSnapshotError(t *testing.T) {
opts := head.opts
opts.SeriesCallback = c
w, err = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
w, err = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None)
require.NoError(t, err)
head, err = NewHead(prometheus.NewRegistry(), nil, w, nil, head.opts, nil)
require.NoError(t, err)
@ -4367,7 +4369,7 @@ func TestSnapshotError(t *testing.T) {
func TestHistogramMetrics(t *testing.T) {
numHistograms := 10
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
@ -4397,7 +4399,7 @@ func TestHistogramMetrics(t *testing.T) {
require.Equal(t, float64(expHSamples), prom_testutil.ToFloat64(head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram)))
require.NoError(t, head.Close())
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None)
require.NoError(t, err)
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
require.NoError(t, err)
@ -4419,7 +4421,7 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) {
t.Helper()
l := labels.FromStrings("a", "b")
numHistograms := 20
head, _ := newTestHead(t, 100000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 100000, compression.None, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
@ -4571,7 +4573,7 @@ func TestHistogramCounterResetHeader(t *testing.T) {
for _, floatHisto := range []bool{true} { // FIXME
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
l := labels.FromStrings("a", "b")
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
@ -4692,7 +4694,7 @@ func TestOOOHistogramCounterResetHeaders(t *testing.T) {
for _, floatHisto := range []bool{true, false} {
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
l := labels.FromStrings("a", "b")
head, _ := newTestHead(t, 1000, wlog.CompressionNone, true)
head, _ := newTestHead(t, 1000, compression.None, true)
head.opts.OutOfOrderCapMax.Store(5)
head.opts.EnableOOONativeHistograms.Store(true)
@ -5003,7 +5005,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
// Tests https://github.com/prometheus/prometheus/issues/9725.
func TestChunkSnapshotReplayBug(t *testing.T) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
// Write few series records and samples such that the series references are not in order in the WAL
@ -5070,7 +5072,7 @@ func TestChunkSnapshotReplayBug(t *testing.T) {
func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
dir := t.TempDir()
wlTemp, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
wlTemp, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
// Write a snapshot with .tmp suffix. This used to fail taking any further snapshots or replay of snapshots.
@ -5115,9 +5117,9 @@ func TestWBLReplay(t *testing.T) {
func testWBLReplay(t *testing.T, scenario sampleTypeScenario) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy)
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy)
require.NoError(t, err)
opts := DefaultHeadOptions()
@ -5163,9 +5165,9 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) {
// Restart head.
require.NoError(t, h.Close())
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy)
oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy)
require.NoError(t, err)
h, err = NewHead(nil, nil, wal, oooWlog, opts, nil)
require.NoError(t, err)
@ -5209,9 +5211,9 @@ func TestOOOMmapReplay(t *testing.T) {
func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy)
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy)
require.NoError(t, err)
opts := DefaultHeadOptions()
@ -5261,9 +5263,9 @@ func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) {
// Restart head.
require.NoError(t, h.Close())
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy)
oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy)
require.NoError(t, err)
h, err = NewHead(nil, nil, wal, oooWlog, opts, nil)
require.NoError(t, err)
@ -5292,7 +5294,7 @@ func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) {
}
func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
h, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -5336,7 +5338,7 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
require.NoError(t, h.Close())
wal, err := wlog.NewSize(nil, nil, filepath.Join(h.opts.ChunkDirRoot, "wal"), 32768, wlog.CompressionNone)
wal, err := wlog.NewSize(nil, nil, filepath.Join(h.opts.ChunkDirRoot, "wal"), 32768, compression.None)
require.NoError(t, err)
h, err = NewHead(nil, nil, wal, nil, h.opts, nil)
require.NoError(t, err)
@ -5371,7 +5373,7 @@ func (c *unsupportedChunk) Encoding() chunkenc.Encoding {
// Tests https://github.com/prometheus/prometheus/issues/10277.
func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone)
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None)
require.NoError(t, err)
opts := DefaultHeadOptions()
@ -5404,7 +5406,7 @@ func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) {
addChunks()
require.NoError(t, h.Close())
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone)
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None)
require.NoError(t, err)
mmapFilePath := filepath.Join(dir, "chunks_head", "000001")
@ -5430,7 +5432,7 @@ func TestReplayAfterMmapReplayError(t *testing.T) {
var err error
openHead := func() {
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone)
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None)
require.NoError(t, err)
opts := DefaultHeadOptions()
@ -5513,9 +5515,9 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error)) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy)
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy)
require.NoError(t, err)
opts := DefaultHeadOptions()
@ -5606,9 +5608,9 @@ func TestHeadMinOOOTimeUpdate(t *testing.T) {
func testHeadMinOOOTimeUpdate(t *testing.T, scenario sampleTypeScenario) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy)
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy)
require.NoError(t, err)
opts := DefaultHeadOptions()
@ -5653,7 +5655,7 @@ func testHeadMinOOOTimeUpdate(t *testing.T, scenario sampleTypeScenario) {
func TestGaugeHistogramWALAndChunkHeader(t *testing.T) {
l := labels.FromStrings("a", "b")
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
@ -5718,7 +5720,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) {
require.NoError(t, head.Close())
require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot)))
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None)
require.NoError(t, err)
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
require.NoError(t, err)
@ -5729,7 +5731,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) {
func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) {
l := labels.FromStrings("a", "b")
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
@ -5794,7 +5796,7 @@ func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) {
require.NoError(t, head.Close())
require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot)))
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None)
require.NoError(t, err)
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
require.NoError(t, err)
@ -5804,7 +5806,7 @@ func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) {
}
func TestSnapshotAheadOfWALError(t *testing.T) {
head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false)
head, _ := newTestHead(t, 120*4, compression.None, false)
head.opts.EnableMemorySnapshotOnShutdown = true
// Add a sample to fill WAL.
app := head.Appender(context.Background())
@ -5827,7 +5829,7 @@ func TestSnapshotAheadOfWALError(t *testing.T) {
// to keep using the same snapshot directory instead of a random one.
require.NoError(t, os.RemoveAll(head.wal.Dir()))
head.opts.EnableMemorySnapshotOnShutdown = false
w, _ := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
w, _ := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None)
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
require.NoError(t, err)
// Add a sample to fill WAL.
@ -5846,7 +5848,7 @@ func TestSnapshotAheadOfWALError(t *testing.T) {
// Create new Head which should detect the incorrect index and delete the snapshot.
head.opts.EnableMemorySnapshotOnShutdown = true
w, _ = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
w, _ = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None)
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
require.NoError(t, err)
require.NoError(t, head.Init(math.MinInt64))
@ -5865,7 +5867,7 @@ func BenchmarkCuttingHeadHistogramChunks(b *testing.B) {
)
samples := histogram.GenerateBigTestHistograms(numSamples, numBuckets)
h, _ := newTestHead(b, DefaultBlockDuration, wlog.CompressionNone, false)
h, _ := newTestHead(b, DefaultBlockDuration, compression.None, false)
defer func() {
require.NoError(b, h.Close())
}()
@ -5982,7 +5984,7 @@ func TestCuttingNewHeadChunks(t *testing.T) {
}
for testName, tc := range testCases {
t.Run(testName, func(t *testing.T) {
h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
h, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -6050,7 +6052,7 @@ func TestHeadDetectsDuplicateSampleAtSizeLimit(t *testing.T) {
numSamples := 1000
baseTS := int64(1695209650)
h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
h, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -6117,7 +6119,7 @@ func TestWALSampleAndExemplarOrder(t *testing.T) {
for testName, tc := range testcases {
t.Run(testName, func(t *testing.T) {
h, w := newTestHead(t, 1000, wlog.CompressionNone, false)
h, w := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -6154,7 +6156,7 @@ func TestWALSampleAndExemplarOrder(t *testing.T) {
// `signal SIGSEGV: segmentation violation code=0x1 addr=0x20 pc=0xbb03d1`
// panic, that we have seen in the wild once.
func TestHeadCompactionWhileAppendAndCommitExemplar(t *testing.T) {
h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
h, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
app := h.Appender(context.Background())
lbls := labels.FromStrings("foo", "bar")
ref, err := app.Append(0, lbls, 1, 1)
@ -6267,7 +6269,7 @@ func TestPostingsCardinalityStats(t *testing.T) {
}
func TestHeadAppender_AppendFloatWithSameTimestampAsPreviousHistogram(t *testing.T) {
head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
t.Cleanup(func() { head.Close() })
ls := labels.FromStrings(labels.MetricName, "test")
@ -6483,7 +6485,7 @@ func TestHeadAppender_AppendCT(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
h, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -6521,7 +6523,7 @@ func TestHeadCompactableDoesNotCompactEmptyHead(t *testing.T) {
// was compactable using default values for min and max times, `Head.compactable()`
// would return true which is incorrect. This test verifies that we short-circuit
// the check when the head has not yet had any samples added.
head, _ := newTestHead(t, 1, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1, compression.None, false)
defer func() {
require.NoError(t, head.Close())
}()
@ -6563,7 +6565,7 @@ func TestHeadAppendHistogramAndCommitConcurrency(t *testing.T) {
}
func testHeadAppendHistogramAndCommitConcurrency(t *testing.T, appendFn func(storage.Appender, int) error) {
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, head.Close())
}()


@ -128,11 +128,10 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
var err error
dec := record.NewDecoder(syms)
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
switch dec.Type(r.Record()) {
case record.Series:
series := h.wlReplaySeriesPool.Get()[:0]
series, err = dec.Series(rec, series)
series, err = dec.Series(r.Record(), series)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: fmt.Errorf("decode series: %w", err),
@ -144,7 +143,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
decoded <- series
case record.Samples:
samples := h.wlReplaySamplesPool.Get()[:0]
samples, err = dec.Samples(rec, samples)
samples, err = dec.Samples(r.Record(), samples)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: fmt.Errorf("decode samples: %w", err),
@ -156,7 +155,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
decoded <- samples
case record.Tombstones:
tstones := h.wlReplaytStonesPool.Get()[:0]
tstones, err = dec.Tombstones(rec, tstones)
tstones, err = dec.Tombstones(r.Record(), tstones)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: fmt.Errorf("decode tombstones: %w", err),
@ -168,7 +167,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
decoded <- tstones
case record.Exemplars:
exemplars := h.wlReplayExemplarsPool.Get()[:0]
exemplars, err = dec.Exemplars(rec, exemplars)
exemplars, err = dec.Exemplars(r.Record(), exemplars)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: fmt.Errorf("decode exemplars: %w", err),
@ -180,7 +179,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
decoded <- exemplars
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
hists := h.wlReplayHistogramsPool.Get()[:0]
hists, err = dec.HistogramSamples(rec, hists)
hists, err = dec.HistogramSamples(r.Record(), hists)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: fmt.Errorf("decode histograms: %w", err),
@ -192,7 +191,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
decoded <- hists
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
hists := h.wlReplayFloatHistogramsPool.Get()[:0]
hists, err = dec.FloatHistogramSamples(rec, hists)
hists, err = dec.FloatHistogramSamples(r.Record(), hists)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: fmt.Errorf("decode float histograms: %w", err),
@ -204,7 +203,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
decoded <- hists
case record.Metadata:
meta := h.wlReplayMetadataPool.Get()[:0]
meta, err := dec.Metadata(rec, meta)
meta, err := dec.Metadata(r.Record(), meta)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: fmt.Errorf("decode metadata: %w", err),


@ -28,7 +28,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/compression"
)
type chunkInterval struct {
@ -300,7 +300,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
for perm, intervals := range permutations {
for _, headChunk := range []bool{false, true} {
t.Run(fmt.Sprintf("name=%s, permutation=%d, headChunk=%t", tc.name, perm, headChunk), func(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, true)
h, _ := newTestHead(t, 1000, compression.None, true)
defer func() {
require.NoError(t, h.Close())
}()
@ -388,7 +388,7 @@ func TestOOOHeadChunkReader_LabelValues(t *testing.T) {
//nolint:revive // unexported-return.
func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenario) {
chunkRange := int64(2000)
head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, true)
head, _ := newTestHead(t, chunkRange, compression.None, true)
head.opts.EnableOOONativeHistograms.Store(true)
t.Cleanup(func() { require.NoError(t, head.Close()) })


@ -23,14 +23,14 @@ import (
"strings"
"testing"
"github.com/stretchr/testify/require"
"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/util/compression"
"github.com/prometheus/prometheus/util/testutil"
)
@ -170,7 +170,7 @@ func TestCheckpoint(t *testing.T) {
}
}
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range compression.Types() {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
dir := t.TempDir()
@ -385,7 +385,7 @@ func TestCheckpoint(t *testing.T) {
func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
// Create a new wlog with invalid data.
dir := t.TempDir()
w, err := NewSize(nil, nil, dir, 64*1024, CompressionNone)
w, err := NewSize(nil, nil, dir, 64*1024, compression.None)
require.NoError(t, err)
var enc record.Encoder
require.NoError(t, w.Log(enc.Series([]record.RefSeries{


@ -22,9 +22,9 @@ import (
"io"
"log/slog"
"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/util/compression"
)
// LiveReaderMetrics holds all metrics exposed by the LiveReader.
@ -51,14 +51,11 @@ func NewLiveReaderMetrics(reg prometheus.Registerer) *LiveReaderMetrics {
// NewLiveReader returns a new live reader.
func NewLiveReader(logger *slog.Logger, metrics *LiveReaderMetrics, r io.Reader) *LiveReader {
// Calling zstd.NewReader with a nil io.Reader and no options cannot return an error.
zstdReader, _ := zstd.NewReader(nil)
lr := &LiveReader{
logger: logger,
rdr: r,
zstdReader: zstdReader,
metrics: metrics,
logger: logger,
rdr: r,
decBuf: compression.NewSyncDecodeBuffer(),
metrics: metrics,
// Until we understand how they come about, make readers permissive
// to records spanning pages.
@ -72,12 +69,13 @@ func NewLiveReader(logger *slog.Logger, metrics *LiveReaderMetrics, r io.Reader)
// that are still in the process of being written, and returns records as soon
// as they can be read.
type LiveReader struct {
logger *slog.Logger
rdr io.Reader
err error
rec []byte
compressBuf []byte
zstdReader *zstd.Decoder
logger *slog.Logger
rdr io.Reader
err error
rec []byte
precomprBuf []byte
decBuf compression.DecodeBuffer
hdr [recordHeaderSize]byte
buf [pageSize]byte
readIndex int // Index in buf to start at for next read.
@ -195,39 +193,28 @@ func (r *LiveReader) buildRecord() (bool, error) {
rt := recTypeFromHeader(r.hdr[0])
if rt == recFirst || rt == recFull {
r.rec = r.rec[:0]
r.compressBuf = r.compressBuf[:0]
r.precomprBuf = r.precomprBuf[:0]
}
isSnappyCompressed := r.hdr[0]&snappyMask == snappyMask
isZstdCompressed := r.hdr[0]&zstdMask == zstdMask
if isSnappyCompressed || isZstdCompressed {
r.compressBuf = append(r.compressBuf, temp...)
} else {
r.rec = append(r.rec, temp...)
// TODO(bwplotka): Handle unknown compressions.
compr := compression.None
if r.hdr[0]&snappyMask == snappyMask {
compr = compression.Snappy
} else if r.hdr[0]&zstdMask == zstdMask {
compr = compression.Zstd
}
r.precomprBuf = append(r.precomprBuf, temp...)
if err := validateRecord(rt, r.index); err != nil {
r.index = 0
return false, err
}
if rt == recLast || rt == recFull {
r.index = 0
if isSnappyCompressed && len(r.compressBuf) > 0 {
// The snappy library uses `len` to calculate if we need a new buffer.
// In order to allocate as few buffers as possible make the length
// equal to the capacity.
r.rec = r.rec[:cap(r.rec)]
r.rec, err = snappy.Decode(r.rec, r.compressBuf)
if err != nil {
return false, err
}
} else if isZstdCompressed && len(r.compressBuf) > 0 {
r.rec, err = r.zstdReader.DecodeAll(r.compressBuf, r.rec[:0])
if err != nil {
return false, err
}
r.rec, err = compression.Decode(compr, r.precomprBuf, r.decBuf)
if err != nil {
return false, err
}
return true, nil
}


@ -21,17 +21,17 @@ import (
"hash/crc32"
"io"
"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/prometheus/prometheus/util/compression"
)
// Reader reads WAL records from an io.Reader.
type Reader struct {
rdr io.Reader
err error
rec []byte
compressBuf []byte
zstdReader *zstd.Decoder
rdr io.Reader
err error
rec []byte
precomprBuf []byte
decBuf compression.DecodeBuffer
buf [pageSize]byte
total int64 // Total bytes processed.
curRecTyp recType // Used for checking that the last record is not torn.
@ -39,15 +39,13 @@ type Reader struct {
// NewReader returns a new reader.
func NewReader(r io.Reader) *Reader {
// Calling zstd.NewReader with a nil io.Reader and no options cannot return an error.
zstdReader, _ := zstd.NewReader(nil)
return &Reader{rdr: r, zstdReader: zstdReader}
return &Reader{rdr: r, decBuf: compression.NewSyncDecodeBuffer()}
}
// Next advances the reader to the next records and returns true if it exists.
// It must not be called again after it returned false.
func (r *Reader) Next() bool {
err := r.next()
err := r.nextNew()
if err != nil && errors.Is(err, io.EOF) {
// The last WAL segment record shouldn't be torn(should be full or last).
// The last record would be torn after a crash just before
@ -61,14 +59,13 @@ func (r *Reader) Next() bool {
return r.err == nil
}
func (r *Reader) next() (err error) {
func (r *Reader) nextNew() (err error) {
// We have to use r.buf since allocating byte arrays here fails escape
// analysis and ends up on the heap, even though it seemingly should not.
hdr := r.buf[:recordHeaderSize]
buf := r.buf[recordHeaderSize:]
r.rec = r.rec[:0]
r.compressBuf = r.compressBuf[:0]
r.precomprBuf = r.precomprBuf[:0]
i := 0
for {
@ -77,8 +74,13 @@ func (r *Reader) next() (err error) {
}
r.total++
r.curRecTyp = recTypeFromHeader(hdr[0])
isSnappyCompressed := hdr[0]&snappyMask == snappyMask
isZstdCompressed := hdr[0]&zstdMask == zstdMask
compr := compression.None
if hdr[0]&snappyMask == snappyMask {
compr = compression.Snappy
} else if hdr[0]&zstdMask == zstdMask {
compr = compression.Zstd
}
// Gobble up zero bytes.
if r.curRecTyp == recPageTerm {
@ -133,29 +135,14 @@ func (r *Reader) next() (err error) {
if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc {
return fmt.Errorf("unexpected checksum %x, expected %x", c, crc)
}
if isSnappyCompressed || isZstdCompressed {
r.compressBuf = append(r.compressBuf, buf[:length]...)
} else {
r.rec = append(r.rec, buf[:length]...)
}
if err := validateRecord(r.curRecTyp, i); err != nil {
return err
}
r.precomprBuf = append(r.precomprBuf, buf[:length]...)
if r.curRecTyp == recLast || r.curRecTyp == recFull {
if isSnappyCompressed && len(r.compressBuf) > 0 {
// The snappy library uses `len` to calculate if we need a new buffer.
// In order to allocate as few buffers as possible make the length
// equal to the capacity.
r.rec = r.rec[:cap(r.rec)]
r.rec, err = snappy.Decode(r.rec, r.compressBuf)
return err
} else if isZstdCompressed && len(r.compressBuf) > 0 {
r.rec, err = r.zstdReader.DecodeAll(r.compressBuf, r.rec[:0])
return err
}
return nil
r.rec, err = compression.Decode(compr, r.precomprBuf, r.decBuf)
return err
}
// Only increment i for non-zero records since we use it


@ -29,11 +29,11 @@ import (
"testing"
"time"
"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"
"github.com/prometheus/common/promslog"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/util/compression"
)
type reader interface {
@ -315,7 +315,7 @@ func allSegments(dir string) (io.ReadCloser, error) {
func TestReaderFuzz(t *testing.T) {
for name, fn := range readerConstructors {
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range compression.Types() {
t.Run(fmt.Sprintf("%s,compress=%s", name, compress), func(t *testing.T) {
dir := t.TempDir()
@ -354,7 +354,7 @@ func TestReaderFuzz(t *testing.T) {
func TestReaderFuzz_Live(t *testing.T) {
logger := promslog.NewNopLogger()
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range compression.Types() {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
dir := t.TempDir()
@ -444,7 +444,7 @@ func TestLiveReaderCorrupt_ShortFile(t *testing.T) {
logger := promslog.NewNopLogger()
dir := t.TempDir()
w, err := NewSize(nil, nil, dir, pageSize, CompressionNone)
w, err := NewSize(nil, nil, dir, pageSize, compression.None)
require.NoError(t, err)
rec := make([]byte, pageSize-recordHeaderSize)
@ -484,7 +484,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) {
logger := promslog.NewNopLogger()
dir := t.TempDir()
w, err := NewSize(nil, nil, dir, pageSize*2, CompressionNone)
w, err := NewSize(nil, nil, dir, pageSize*2, compression.None)
require.NoError(t, err)
rec := make([]byte, pageSize-recordHeaderSize)
@ -531,7 +531,7 @@ func TestReaderData(t *testing.T) {
for name, fn := range readerConstructors {
t.Run(name, func(t *testing.T) {
w, err := New(nil, nil, dir, CompressionSnappy)
w, err := New(nil, nil, dir, compression.Snappy)
require.NoError(t, err)
sr, err := allSegments(dir)


@ -31,6 +31,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/util/compression"
)
var (
@ -142,7 +143,7 @@ func TestTailSamples(t *testing.T) {
const samplesCount = 250
const exemplarsCount = 25
const histogramsCount = 50
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range compression.Types() {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
now := time.Now()
@ -290,7 +291,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
const seriesCount = 10
const samplesCount = 250
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range compression.Types() {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
dir := t.TempDir()
wdir := path.Join(dir, "wal")
@ -358,7 +359,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
const seriesCount = 10
const samplesCount = 250
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range compression.Types() {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
dir := t.TempDir()
@ -446,7 +447,7 @@ func TestReadCheckpoint(t *testing.T) {
const seriesCount = 10
const samplesCount = 250
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range compression.Types() {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
dir := t.TempDir()
@ -519,7 +520,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
const seriesCount = 20
const samplesCount = 300
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range compression.Types() {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
dir := t.TempDir()
@ -590,11 +591,11 @@ func TestCheckpointSeriesReset(t *testing.T) {
const seriesCount = 20
const samplesCount = 350
testCases := []struct {
compress CompressionType
compress compression.Type
segments int
}{
{compress: CompressionNone, segments: 14},
{compress: CompressionSnappy, segments: 13},
{compress: compression.None, segments: 14},
{compress: compression.Snappy, segments: 13},
}
for _, tc := range testCases {
@ -681,8 +682,8 @@ func TestRun_StartupTime(t *testing.T) {
const seriesCount = 20
const samplesCount = 300
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(string(compress), func(t *testing.T) {
for _, compress := range compression.Types() {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
dir := t.TempDir()
wdir := path.Join(dir, "wal")
@ -774,8 +775,8 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
const seriesCount = 10
const samplesCount = 50
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(string(compress), func(t *testing.T) {
for _, compress := range compression.Types() {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
dir := t.TempDir()
wdir := path.Join(dir, "wal")


@ -29,12 +29,12 @@ import (
"sync"
"time"
"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/promslog"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/util/compression"
)
const (
@ -169,26 +169,6 @@ func OpenReadSegment(fn string) (*Segment, error) {
return &Segment{SegmentFile: f, i: k, dir: filepath.Dir(fn)}, nil
}
type CompressionType string
const (
CompressionNone CompressionType = "none"
CompressionSnappy CompressionType = "snappy"
CompressionZstd CompressionType = "zstd"
)
// ParseCompressionType parses the two compression-related configuration values and returns the CompressionType. If
// compression is enabled but the compressType is unrecognized, we default to Snappy compression.
func ParseCompressionType(compress bool, compressType string) CompressionType {
if compress {
if compressType == "zstd" {
return CompressionZstd
}
return CompressionSnappy
}
return CompressionNone
}
// WL is a write log that stores records in segment files.
// It must be read from start to end once before logging new data.
// If an error occurs during read, the repair procedure must be called
@ -210,9 +190,8 @@ type WL struct {
stopc chan chan struct{}
actorc chan func()
closed bool // To allow calling Close() more than once without blocking.
compress CompressionType
compressBuf []byte
zstdWriter *zstd.Encoder
compress compression.Type
cEnc compression.EncodeBuffer
WriteNotified WriteNotified
@ -220,14 +199,17 @@ type WL struct {
}
type wlMetrics struct {
fsyncDuration prometheus.Summary
pageFlushes prometheus.Counter
pageCompletions prometheus.Counter
truncateFail prometheus.Counter
truncateTotal prometheus.Counter
currentSegment prometheus.Gauge
writesFailed prometheus.Counter
walFileSize prometheus.GaugeFunc
fsyncDuration prometheus.Summary
pageFlushes prometheus.Counter
pageCompletions prometheus.Counter
truncateFail prometheus.Counter
truncateTotal prometheus.Counter
currentSegment prometheus.Gauge
writesFailed prometheus.Counter
walFileSize prometheus.GaugeFunc
recordPartWrites prometheus.Counter
recordPartBytes prometheus.Counter
recordBytesSaved *prometheus.CounterVec
r prometheus.Registerer
}
@ -244,78 +226,78 @@ func (w *wlMetrics) Unregister() {
w.r.Unregister(w.currentSegment)
w.r.Unregister(w.writesFailed)
w.r.Unregister(w.walFileSize)
w.r.Unregister(w.recordPartWrites)
w.r.Unregister(w.recordPartBytes)
w.r.Unregister(w.recordBytesSaved)
}
func newWLMetrics(w *WL, r prometheus.Registerer) *wlMetrics {
m := &wlMetrics{
return &wlMetrics{
r: r,
fsyncDuration: promauto.With(r).NewSummary(prometheus.SummaryOpts{
Name: "fsync_duration_seconds",
Help: "Duration of write log fsync.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}),
pageFlushes: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "page_flushes_total",
Help: "Total number of page flushes.",
}),
pageCompletions: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "completed_pages_total",
Help: "Total number of completed pages.",
}),
truncateFail: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "truncations_failed_total",
Help: "Total number of write log truncations that failed.",
}),
truncateTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "truncations_total",
Help: "Total number of write log truncations attempted.",
}),
currentSegment: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "segment_current",
Help: "Write log segment index that TSDB is currently writing to.",
}),
writesFailed: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "writes_failed_total",
Help: "Total number of write log writes that failed.",
}),
walFileSize: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{
Name: "storage_size_bytes",
Help: "Size of the write log directory.",
}, func() float64 {
val, err := w.Size()
if err != nil {
w.logger.Error("Failed to calculate size of \"wal\" dir", "err", err.Error())
}
return float64(val)
}),
recordPartWrites: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "record_part_writes_total",
Help: "Total number of record parts written before flushing.",
}),
recordPartBytes: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "record_parts_bytes_written_total",
Help: "Total number of record part bytes written before flushing, including" +
" CRC and compression headers.",
}),
recordBytesSaved: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "record_bytes_saved_total",
Help: "Total number of bytes saved by the optional record compression." +
" Use this metric to learn about the effectiveness compression.",
}, []string{"compression"}),
}
m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "fsync_duration_seconds",
Help: "Duration of write log fsync.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
})
m.pageFlushes = prometheus.NewCounter(prometheus.CounterOpts{
Name: "page_flushes_total",
Help: "Total number of page flushes.",
})
m.pageCompletions = prometheus.NewCounter(prometheus.CounterOpts{
Name: "completed_pages_total",
Help: "Total number of completed pages.",
})
m.truncateFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "truncations_failed_total",
Help: "Total number of write log truncations that failed.",
})
m.truncateTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "truncations_total",
Help: "Total number of write log truncations attempted.",
})
m.currentSegment = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "segment_current",
Help: "Write log segment index that TSDB is currently writing to.",
})
m.writesFailed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "writes_failed_total",
Help: "Total number of write log writes that failed.",
})
m.walFileSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "storage_size_bytes",
Help: "Size of the write log directory.",
}, func() float64 {
val, err := w.Size()
if err != nil {
w.logger.Error("Failed to calculate size of \"wal\" dir",
"err", err.Error())
}
return float64(val)
})
if r != nil {
r.MustRegister(
m.fsyncDuration,
m.pageFlushes,
m.pageCompletions,
m.truncateFail,
m.truncateTotal,
m.currentSegment,
m.writesFailed,
m.walFileSize,
)
}
return m
}
// New returns a new WAL over the given directory.
func New(logger *slog.Logger, reg prometheus.Registerer, dir string, compress CompressionType) (*WL, error) {
func New(logger *slog.Logger, reg prometheus.Registerer, dir string, compress compression.Type) (*WL, error) {
return NewSize(logger, reg, dir, DefaultSegmentSize, compress)
}
// NewSize returns a new write log over the given directory.
// New segments are created with the specified size.
func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress CompressionType) (*WL, error) {
func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress compression.Type) (*WL, error) {
if segmentSize%pageSize != 0 {
return nil, errors.New("invalid segment size")
}
@ -326,15 +308,6 @@ func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segment
logger = promslog.NewNopLogger()
}
var zstdWriter *zstd.Encoder
if compress == CompressionZstd {
var err error
zstdWriter, err = zstd.NewWriter(nil)
if err != nil {
return nil, err
}
}
w := &WL{
dir: dir,
logger: logger,
@ -343,7 +316,7 @@ func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segment
actorc: make(chan func(), 100),
stopc: make(chan chan struct{}),
compress: compress,
zstdWriter: zstdWriter,
cEnc: compression.NewSyncEncodeBuffer(),
}
prefix := "prometheus_tsdb_wal_"
if filepath.Base(dir) == WblDirName {
@ -382,22 +355,16 @@ func Open(logger *slog.Logger, dir string) (*WL, error) {
if logger == nil {
logger = promslog.NewNopLogger()
}
zstdWriter, err := zstd.NewWriter(nil)
if err != nil {
return nil, err
}
w := &WL{
dir: dir,
logger: logger,
zstdWriter: zstdWriter,
dir: dir,
logger: logger,
}
return w, nil
}
// CompressionType returns if compression is enabled on this WAL.
func (w *WL) CompressionType() CompressionType {
func (w *WL) CompressionType() compression.Type {
return w.compress
}
@ -715,26 +682,22 @@ func (w *WL) log(rec []byte, final bool) error {
}
// Compress the record before calculating if a new segment is needed.
compressed := false
if w.compress == CompressionSnappy && len(rec) > 0 {
// If MaxEncodedLen is less than 0 the record is too large to be compressed.
if len(rec) > 0 && snappy.MaxEncodedLen(len(rec)) >= 0 {
// The snappy library uses `len` to calculate if we need a new buffer.
// In order to allocate as few buffers as possible make the length
// equal to the capacity.
w.compressBuf = w.compressBuf[:cap(w.compressBuf)]
w.compressBuf = snappy.Encode(w.compressBuf, rec)
if len(w.compressBuf) < len(rec) {
rec = w.compressBuf
compressed = true
}
}
} else if w.compress == CompressionZstd && len(rec) > 0 {
w.compressBuf = w.zstdWriter.EncodeAll(rec, w.compressBuf[:0])
if len(w.compressBuf) < len(rec) {
rec = w.compressBuf
compressed = true
enc, compressed, err := compression.Encode(w.compress, rec, w.cEnc)
if err != nil {
return err
}
if compressed {
savedBytes := len(rec) - len(enc)
// Even if compression was applied, skip it if there is no benefit in
// the WAL record size (we have the choice). For small records, e.g. snappy
// compression can yield larger records than the uncompressed input.
if savedBytes <= 0 {
enc = rec
compressed = false
savedBytes = 0
}
w.metrics.recordBytesSaved.WithLabelValues(w.compress).Add(float64(savedBytes))
}
// If the record is too big to fit within the active page in the current
@ -743,7 +706,7 @@ func (w *WL) log(rec []byte, final bool) error {
left := w.page.remaining() - recordHeaderSize // Free space in the active page.
left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // Free pages in the active segment.
if len(rec) > left {
if len(enc) > left {
if _, err := w.nextSegment(true); err != nil {
return err
}
@ -751,32 +714,36 @@ func (w *WL) log(rec []byte, final bool) error {
// Populate as many pages as necessary to fit the record.
// Be careful to always do one pass to ensure we write zero-length records.
for i := 0; i == 0 || len(rec) > 0; i++ {
for i := 0; i == 0 || len(enc) > 0; i++ {
p := w.page
// Find how much of the record we can fit into the page.
var (
l = min(len(rec), (pageSize-p.alloc)-recordHeaderSize)
part = rec[:l]
l = min(len(enc), (pageSize-p.alloc)-recordHeaderSize)
part = enc[:l]
buf = p.buf[p.alloc:]
typ recType
)
switch {
case i == 0 && len(part) == len(rec):
case i == 0 && len(part) == len(enc):
typ = recFull
case len(part) == len(rec):
case len(part) == len(enc):
typ = recLast
case i == 0:
typ = recFirst
default:
typ = recMiddle
}
if compressed {
if w.compress == CompressionSnappy {
switch w.compress {
case compression.Snappy:
typ |= snappyMask
} else if w.compress == CompressionZstd {
case compression.Zstd:
typ |= zstdMask
default:
return fmt.Errorf("unsupported compression type: %v", w.compress)
}
}
@ -788,6 +755,9 @@ func (w *WL) log(rec []byte, final bool) error {
copy(buf[recordHeaderSize:], part)
p.alloc += len(part) + recordHeaderSize
w.metrics.recordPartWrites.Inc()
w.metrics.recordPartBytes.Add(float64(len(part) + recordHeaderSize))
if w.page.full() {
if err := w.flushPage(true); err != nil {
// TODO When the flushing fails at this point and the record has not been
@ -796,7 +766,7 @@ func (w *WL) log(rec []byte, final bool) error {
return err
}
}
rec = rec[l:]
enc = enc[l:]
}
// If it's the final record of the batch and the page is not empty, flush it.
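To make the bytes-saved accounting above concrete, here is a minimal, hypothetical sketch of the skip-if-no-benefit decision: an input of 100 bytes whose snappy encoding grew to 120 bytes is logged uncompressed and counts zero saved bytes.

package main

import "fmt"

func main() {
	rec := make([]byte, 100) // Hypothetical uncompressed record.
	enc := make([]byte, 120) // Hypothetical snappy output that grew.
	compressed := true

	savedBytes := len(rec) - len(enc) // -20: compression did not pay off.
	if savedBytes <= 0 {
		enc = rec // Fall back to logging the uncompressed record.
		compressed = false
		savedBytes = 0
	}
	fmt.Println(len(enc), compressed, savedBytes) // 100 false 0
}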


@ -30,6 +30,7 @@ import (
"go.uber.org/goleak"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/util/compression"
)
func TestMain(m *testing.M) {
@ -125,7 +126,7 @@ func TestWALRepair_ReadingError(t *testing.T) {
// then corrupt a given record in a given segment.
// As a result we want a repaired WAL with given intact records.
segSize := 3 * pageSize
w, err := NewSize(nil, nil, dir, segSize, CompressionNone)
w, err := NewSize(nil, nil, dir, segSize, compression.None)
require.NoError(t, err)
var records [][]byte
@ -150,7 +151,7 @@ func TestWALRepair_ReadingError(t *testing.T) {
require.NoError(t, f.Close())
w, err = NewSize(nil, nil, dir, segSize, CompressionNone)
w, err = NewSize(nil, nil, dir, segSize, compression.None)
require.NoError(t, err)
defer w.Close()
@ -222,7 +223,7 @@ func TestCorruptAndCarryOn(t *testing.T) {
// Produce a WAL with a two segments of 3 pages with 3 records each,
// so when we truncate the file we're guaranteed to split a record.
{
w, err := NewSize(logger, nil, dir, segmentSize, CompressionNone)
w, err := NewSize(logger, nil, dir, segmentSize, compression.None)
require.NoError(t, err)
for i := 0; i < 18; i++ {
@ -293,7 +294,7 @@ func TestCorruptAndCarryOn(t *testing.T) {
err = sr.Close()
require.NoError(t, err)
w, err := NewSize(logger, nil, dir, segmentSize, CompressionNone)
w, err := NewSize(logger, nil, dir, segmentSize, compression.None)
require.NoError(t, err)
err = w.Repair(corruptionErr)
@ -336,7 +337,7 @@ func TestCorruptAndCarryOn(t *testing.T) {
// TestClose ensures that calling Close more than once doesn't panic and doesn't block.
func TestClose(t *testing.T) {
dir := t.TempDir()
w, err := NewSize(nil, nil, dir, pageSize, CompressionNone)
w, err := NewSize(nil, nil, dir, pageSize, compression.None)
require.NoError(t, err)
require.NoError(t, w.Close())
require.Error(t, w.Close())
@ -349,7 +350,7 @@ func TestSegmentMetric(t *testing.T) {
)
dir := t.TempDir()
w, err := NewSize(nil, nil, dir, segmentSize, CompressionNone)
w, err := NewSize(nil, nil, dir, segmentSize, compression.None)
require.NoError(t, err)
initialSegment := client_testutil.ToFloat64(w.metrics.currentSegment)
@ -368,7 +369,7 @@ func TestSegmentMetric(t *testing.T) {
}
func TestCompression(t *testing.T) {
bootstrap := func(compressed CompressionType) string {
bootstrap := func(compressed compression.Type) string {
const (
segmentSize = pageSize
recordSize = (pageSize / 2) - recordHeaderSize
@ -396,10 +397,10 @@ func TestCompression(t *testing.T) {
}
}()
dirUnCompressed := bootstrap(CompressionNone)
dirUnCompressed := bootstrap(compression.None)
tmpDirs = append(tmpDirs, dirUnCompressed)
for _, compressionType := range []CompressionType{CompressionSnappy, CompressionZstd} {
for _, compressionType := range []compression.Type{compression.Snappy, compression.Zstd} {
dirCompressed := bootstrap(compressionType)
tmpDirs = append(tmpDirs, dirCompressed)
@ -443,7 +444,7 @@ func TestLogPartialWrite(t *testing.T) {
t.Run(testName, func(t *testing.T) {
dirPath := t.TempDir()
w, err := NewSize(nil, nil, dirPath, segmentSize, CompressionNone)
w, err := NewSize(nil, nil, dirPath, segmentSize, compression.None)
require.NoError(t, err)
// Replace the underlying segment file with a mocked one that injects a failure.
@ -510,7 +511,7 @@ func (f *faultySegmentFile) Write(p []byte) (int, error) {
}
func BenchmarkWAL_LogBatched(b *testing.B) {
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range compression.Types() {
b.Run(fmt.Sprintf("compress=%s", compress), func(b *testing.B) {
dir := b.TempDir()
@ -540,7 +541,7 @@ func BenchmarkWAL_LogBatched(b *testing.B) {
}
func BenchmarkWAL_Log(b *testing.B) {
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range compression.Types() {
b.Run(fmt.Sprintf("compress=%s", compress), func(b *testing.B) {
dir := b.TempDir()
@ -567,7 +568,7 @@ func TestUnregisterMetrics(t *testing.T) {
reg := prometheus.NewRegistry()
for i := 0; i < 2; i++ {
wl, err := New(promslog.NewNopLogger(), reg, t.TempDir(), CompressionNone)
wl, err := New(promslog.NewNopLogger(), reg, t.TempDir(), compression.None)
require.NoError(t, err)
require.NoError(t, wl.Close())
}

util/compression/buffers.go (new file)

@ -0,0 +1,148 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package compression
import (
"sync"
"github.com/klauspost/compress/zstd"
"github.com/prometheus/prometheus/util/zeropool"
)
type EncodeBuffer interface {
zstdEncBuf() *zstd.Encoder
get() []byte
put([]byte)
}
type syncEBuffer struct {
onceZstd sync.Once
w *zstd.Encoder
buf []byte
}
// NewSyncEncodeBuffer returns a synchronous buffer that can only be used
// by one encoding goroutine at a time. Notably, the encoded byte slice returned
// by Encode is valid only until the next Encode call.
func NewSyncEncodeBuffer() EncodeBuffer {
return &syncEBuffer{}
}
func (b *syncEBuffer) zstdEncBuf() *zstd.Encoder {
b.onceZstd.Do(func() {
// Without params this never returns an error.
b.w, _ = zstd.NewWriter(nil)
})
return b.w
}
func (b *syncEBuffer) get() []byte {
return b.buf
}
func (b *syncEBuffer) put(buf []byte) {
b.buf = buf
}
type concurrentEBuffer struct {
onceZstd sync.Once
w *zstd.Encoder
pool zeropool.Pool[[]byte]
}
// NewConcurrentEncodeBuffer returns a buffer that can be used concurrently.
// NOTE: For Zstd compression, a concurrency limit equal to GOMAXPROCS is implied.
func NewConcurrentEncodeBuffer() EncodeBuffer {
return &concurrentEBuffer{}
}
func (b *concurrentEBuffer) zstdEncBuf() *zstd.Encoder {
b.onceZstd.Do(func() {
// Without params this never returns an error.
b.w, _ = zstd.NewWriter(nil)
})
return b.w
}
func (b *concurrentEBuffer) get() []byte {
return b.pool.Get()
}
func (b *concurrentEBuffer) put(buf []byte) {
b.pool.Put(buf)
}
type DecodeBuffer interface {
zstdDecBuf() *zstd.Decoder
get() []byte
put([]byte)
}
type syncDBuffer struct {
onceZstd sync.Once
r *zstd.Decoder
buf []byte
}
// NewSyncDecodeBuffer returns a synchronous buffer that can only be used
// by one decoding goroutine at a time. Notably, the decoded byte slice returned
// by Decode is valid only until the next Decode call.
func NewSyncDecodeBuffer() DecodeBuffer {
return &syncDBuffer{}
}
func (b *syncDBuffer) zstdDecBuf() *zstd.Decoder {
b.onceZstd.Do(func() {
// Without params this never returns an error.
b.r, _ = zstd.NewReader(nil)
})
return b.r
}
func (b *syncDBuffer) get() []byte {
return b.buf
}
func (b *syncDBuffer) put(buf []byte) {
b.buf = buf
}
type concurrentDBuffer struct {
onceZstd sync.Once
r *zstd.Decoder
pool zeropool.Pool[[]byte]
}
// NewConcurrentDecodeBuffer returns a buffer that can be used concurrently.
// NOTE: For Zstd compression, a concurrency limit equal to GOMAXPROCS is implied.
func NewConcurrentDecodeBuffer() DecodeBuffer {
return &concurrentDBuffer{}
}
func (b *concurrentDBuffer) zstdDecBuf() *zstd.Decoder {
b.onceZstd.Do(func() {
// Without params this never returns an error.
b.r, _ = zstd.NewReader(nil)
})
return b.r
}
func (b *concurrentDBuffer) get() []byte {
return b.pool.Get()
}
func (b *concurrentDBuffer) put(buf []byte) {
b.pool.Put(buf)
}
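For orientation, a minimal sketch of choosing a buffer flavor, assuming the import path introduced by this commit; the synchronous variants are what the WL writer and the WAL readers above use, while the pooled variants exist for multi-goroutine callers:

package main

import "github.com/prometheus/prometheus/util/compression"

func main() {
	// Synchronous: one goroutine at a time; the slice returned by
	// Encode/Decode stays valid only until the next call on the buffer.
	syncEnc := compression.NewSyncEncodeBuffer()
	syncDec := compression.NewSyncDecodeBuffer()

	// Pooled: safe to share across goroutines; zstd additionally implies
	// a concurrency limit of roughly GOMAXPROCS.
	concEnc := compression.NewConcurrentEncodeBuffer()
	concDec := compression.NewConcurrentDecodeBuffer()

	_, _, _, _ = syncEnc, syncDec, concEnc, concDec
}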


@ -0,0 +1,125 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package compression

import (
	"errors"
	"fmt"

	"github.com/golang/snappy"
)

// Type represents a valid compression type supported by this package.
type Type = string

const (
	// None represents the no-compression case; it is the default when Type
	// is empty.
	None Type = "none"
	// Snappy represents the snappy block format.
	Snappy Type = "snappy"
	// Zstd represents the "speed" mode of Zstd (Zstandard,
	// https://facebook.github.io/zstd/). This is roughly equivalent to the
	// default Zstandard mode (level 3).
	Zstd Type = "zstd"
)

// Types returns all supported compression types.
func Types() []Type { return []Type{None, Snappy, Zstd} }
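
// normalize is an illustrative sketch, not part of the original code: since
// an empty Type behaves like None, callers can map the zero value to the
// explicit default before comparing or logging it. The helper name is
// hypothetical.
func normalize(t Type) Type {
	if t == "" {
		return None
	}
	return t
}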

// Encode returns the encoded form of src for the given compression type,
// along with a boolean indicating whether compression was actually performed.
// Encode skips compression for the None type, and may also skip it when src
// is too large (e.g. for the Snappy block format).
//
// The buf argument allows passing various buffer implementations that make
// encoding more efficient. See NewSyncEncodeBuffer and
// NewConcurrentEncodeBuffer for details. For non-zstd compression types, it
// is valid to pass a nil buf.
//
// Encode is concurrency-safe; however, note the concurrency limits of the
// buffer you choose.
func Encode(t Type, src []byte, buf EncodeBuffer) (ret []byte, compressed bool, err error) {
if len(src) == 0 || t == "" || t == None {
return src, false, nil
}
if t == Snappy {
		// If MaxEncodedLen is less than 0, the record is too large to be compressed.
if snappy.MaxEncodedLen(len(src)) < 0 {
return src, false, nil
}
var b []byte
if buf != nil {
b = buf.get()
defer func() {
buf.put(ret)
}()
}
		// The snappy library uses len to decide whether a new buffer is
		// needed. To allocate as few buffers as possible, make the length
		// equal to the capacity.
b = b[:cap(b)]
return snappy.Encode(b, src), true, nil
}
if t == Zstd {
if buf == nil {
return nil, false, errors.New("zstd requested but EncodeBuffer was not provided")
}
b := buf.get()
defer func() {
buf.put(ret)
}()
return buf.zstdEncBuf().EncodeAll(src, b[:0]), true, nil
}
return nil, false, fmt.Errorf("unsupported compression type: %s", t)
}
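
// frameRecord is an illustrative sketch, not part of the original code: it
// shows a caller using the compressed indicator to frame a record, falling
// back to the raw bytes when compression was skipped. The one-byte prefix is
// a hypothetical framing, not a format used by this package.
func frameRecord(t Type, rec []byte, buf EncodeBuffer) ([]byte, error) {
	enc, compressed, err := Encode(t, rec, buf)
	if err != nil {
		return nil, err
	}
	prefix := byte(0) // 0 = stored uncompressed.
	if compressed {
		prefix = 1
	}
	return append([]byte{prefix}, enc...), nil
}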

// Decode returns the decoded form of src for the given compression type.
//
// The buf argument allows passing various buffer implementations that make
// decoding more efficient. See NewSyncDecodeBuffer and
// NewConcurrentDecodeBuffer for details. For non-zstd compression types, it
// is valid to pass a nil buf.
//
// Decode is concurrency-safe; however, note the concurrency limits of the
// buffer you choose.
func Decode(t Type, src []byte, buf DecodeBuffer) (ret []byte, err error) {
if len(src) == 0 || t == "" || t == None {
return src, nil
}
if t == Snappy {
var b []byte
if buf != nil {
b = buf.get()
defer func() {
// Save/put reusable return buffer.
buf.put(ret)
}()
}
		// The snappy library uses len to decide whether a new buffer is
		// needed. To allocate as few buffers as possible, make the length
		// equal to the capacity.
b = b[:cap(b)]
return snappy.Decode(b, src)
}
if t == Zstd {
if buf == nil {
return nil, errors.New("zstd requested but DecodeBuffer was not provided")
}
b := buf.get()
defer func() {
// Save/put reusable return buffer.
buf.put(ret)
}()
return buf.zstdDecBuf().DecodeAll(src, b[:0])
}
return nil, fmt.Errorf("unsupported compression type: %s", t)
}
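
// roundTrip is an illustrative sketch, not part of the original code: a full
// encode/decode cycle with throwaway sync buffers, e.g. for spot-checking a
// configured compression type. The helper name is hypothetical.
func roundTrip(t Type, src []byte) ([]byte, error) {
	enc, _, err := Encode(t, src, NewSyncEncodeBuffer())
	if err != nil {
		return nil, err
	}
	return Decode(t, enc, NewSyncDecodeBuffer())
}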

View file

@@ -0,0 +1,200 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package compression

import (
	"errors"
	"fmt"
	"testing"

	"github.com/stretchr/testify/require"
)

const compressible = `ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa
fsfsdfsfddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa
ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa
ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa
ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa
ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa
ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa2
ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa12
ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa1
ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa121
ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa
ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa
ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa
ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa
ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa
ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa324
ddddddddddsfpgjsdoadjgfpajdspfgjasfjapddddddddddaaaaaaaa145
`
func TestEncodeDecode(t *testing.T) {
for _, tcase := range []struct {
name string
src string
types []Type
encBuf EncodeBuffer
decBuf DecodeBuffer
expectCompression bool
expectEncErr error
expectDecErr error
}{
{
name: "empty src; no buffers",
types: Types(),
src: "",
expectCompression: false,
},
{
name: "empty src; sync buffers",
types: Types(),
encBuf: NewSyncEncodeBuffer(), decBuf: NewSyncDecodeBuffer(),
src: "",
expectCompression: false,
},
{
name: "empty src; concurrent buffers",
types: Types(),
encBuf: NewConcurrentEncodeBuffer(), decBuf: NewConcurrentDecodeBuffer(),
src: "",
expectCompression: false,
},
{
name: "no buffers",
types: []Type{None},
src: compressible,
expectCompression: false,
},
{
name: "no buffers",
types: []Type{Snappy},
src: compressible,
expectCompression: true,
},
{
name: "no buffers",
types: []Type{Zstd},
src: compressible,
expectEncErr: errors.New("zstd requested but EncodeBuffer was not provided"),
},
{
name: "sync buffers",
types: []Type{None},
encBuf: NewSyncEncodeBuffer(), decBuf: NewSyncDecodeBuffer(),
src: compressible,
expectCompression: false,
},
{
name: "sync buffers",
			types: Types()[1:], // All types except None.
encBuf: NewSyncEncodeBuffer(), decBuf: NewSyncDecodeBuffer(),
src: compressible,
expectCompression: true,
},
{
name: "concurrent buffers",
types: []Type{None},
encBuf: NewConcurrentEncodeBuffer(), decBuf: NewConcurrentDecodeBuffer(),
src: compressible,
expectCompression: false,
},
{
name: "concurrent buffers",
			types: Types()[1:], // All types except None.
encBuf: NewConcurrentEncodeBuffer(), decBuf: NewConcurrentDecodeBuffer(),
src: compressible,
expectCompression: true,
},
} {
require.NotEmpty(t, tcase.types, "must specify at least one type")
for _, typ := range tcase.types {
t.Run(fmt.Sprintf("case=%v/type=%v", tcase.name, typ), func(t *testing.T) {
res, compressed, err := Encode(typ, []byte(tcase.src), tcase.encBuf)
if tcase.expectEncErr != nil {
require.ErrorContains(t, err, tcase.expectEncErr.Error())
return
}
require.NoError(t, err)
require.Equal(t, tcase.expectCompression, compressed)
if compressed {
require.Less(t, len(res), len(tcase.src))
}
// Decode back.
got, err := Decode(typ, res, tcase.decBuf)
if tcase.expectDecErr != nil {
require.ErrorContains(t, err, tcase.expectDecErr.Error())
return
}
require.NoError(t, err)
require.Equal(t, tcase.src, string(got))
})
}
}
}
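
// TestSyncBufferReuse is an illustrative sketch, not part of the original
// code: it demonstrates that bytes returned by Encode with a sync buffer must
// be copied if they need to survive a subsequent Encode call.
func TestSyncBufferReuse(t *testing.T) {
	buf := NewSyncEncodeBuffer()
	first, _, err := Encode(Snappy, []byte(compressible), buf)
	require.NoError(t, err)
	// Snapshot the result; the next Encode may overwrite first's backing array.
	snapshot := append([]byte(nil), first...)
	_, _, err = Encode(Snappy, []byte(compressible), buf)
	require.NoError(t, err)
	got, err := Decode(Snappy, snapshot, NewSyncDecodeBuffer())
	require.NoError(t, err)
	require.Equal(t, compressible, string(got))
}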
/*
export bench=encode-v1 && go test ./util/compression/... \
-run '^$' -bench '^BenchmarkEncode' \
-benchtime 5s -count 6 -cpu 2 -timeout 999m \
| tee ${bench}.txt
*/
func BenchmarkEncode(b *testing.B) {
	for _, typ := range Types() {
		b.Run(fmt.Sprintf("type=%v", typ), func(b *testing.B) {
			// Create the buffer before the timed loop so its one-time
			// allocation does not skew the measurement.
			buf := NewSyncEncodeBuffer()
			compressible := []byte(compressible)
			b.ReportAllocs()
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				res, _, err := Encode(typ, compressible, buf)
				require.NoError(b, err)
				b.ReportMetric(float64(len(res)), "B")
			}
		})
	}
}
/*
export bench=decode-v1 && go test ./util/compression/... \
-run '^$' -bench '^BenchmarkDecode' \
-benchtime 5s -count 6 -cpu 2 -timeout 999m \
| tee ${bench}.txt
*/
func BenchmarkDecode(b *testing.B) {
	for _, typ := range Types() {
		b.Run(fmt.Sprintf("type=%v", typ), func(b *testing.B) {
			// Create the buffer and the encoded input before the timed loop
			// so setup costs do not skew the measurement.
			buf := NewSyncDecodeBuffer()
			res, _, err := Encode(typ, []byte(compressible), NewConcurrentEncodeBuffer())
			require.NoError(b, err)
			b.ReportAllocs()
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				_, err := Decode(typ, res, buf)
				require.NoError(b, err)
			}
		})
	}
}