diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md deleted file mode 100644 index cf839b560..000000000 --- a/.github/ISSUE_TEMPLATE/feature_request.md +++ /dev/null @@ -1,26 +0,0 @@ ---- -name: Feature request -about: Suggest an idea for this project. -title: '' -labels: '' -assignees: '' ---- - - -## Proposal -**Use case. Why is this important?** - -*“Nice to have” is not a good use case. :)* diff --git a/.github/ISSUE_TEMPLATE/feature_request.yml b/.github/ISSUE_TEMPLATE/feature_request.yml new file mode 100644 index 000000000..40f6f1388 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature_request.yml @@ -0,0 +1,23 @@ +--- +name: Feature request +description: Suggest an idea for this project. +body: + - type: markdown + attributes: + value: >- + Please do *NOT* ask support questions in Github issues. + + + If your issue is not a feature request or bug report use + our [community support](https://prometheus.io/community/). + + + There is also [commercial + support](https://prometheus.io/support-training/) available. + - type: textarea + attributes: + label: Proposal + description: Use case. Why is this important? + placeholder: “Nice to have” is not a good use case. :) + validations: + required: true diff --git a/.github/workflows/buf-lint.yml b/.github/workflows/buf-lint.yml index 8c2d52a98..8d85178fb 100644 --- a/.github/workflows/buf-lint.yml +++ b/.github/workflows/buf-lint.yml @@ -10,7 +10,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - - uses: bufbuild/buf-setup-action@v1.7.0 + - uses: bufbuild/buf-setup-action@v1.8.0 - uses: bufbuild/buf-lint-action@v1 with: input: 'prompb' diff --git a/.github/workflows/buf.yml b/.github/workflows/buf.yml index d87482567..175940dbc 100644 --- a/.github/workflows/buf.yml +++ b/.github/workflows/buf.yml @@ -9,7 +9,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - - uses: bufbuild/buf-setup-action@v1.7.0 + - uses: bufbuild/buf-setup-action@v1.8.0 - uses: bufbuild/buf-lint-action@v1 with: input: 'prompb' diff --git a/CHANGELOG.md b/CHANGELOG.md index c5e858b54..bb99ef410 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 2.39.1 / 2022-10-07 + +* [BUGFIX] Rules: Fix notifier relabel changing the labels on active alerts. #11427 + ## 2.39.0 / 2022-10-05 * [FEATURE] **experimental** TSDB: Add support for ingesting out-of-order samples. This is configured via `out_of_order_time_window` field in the config file; check config file docs for more info. #11075 diff --git a/README.md b/README.md index 6ca98143c..6b3f6cf01 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,11 @@ -# Prometheus +

+ Prometheus
Prometheus +

+ +

Visit prometheus.io for the full documentation, +examples and guides.

+ +
[![CircleCI](https://circleci.com/gh/prometheus/prometheus/tree/main.svg?style=shield)][circleci] [![Docker Repository on Quay](https://quay.io/repository/prometheus/prometheus/status)][quay] @@ -8,8 +15,7 @@ [![Gitpod ready-to-code](https://img.shields.io/badge/Gitpod-ready--to--code-blue?logo=gitpod)](https://gitpod.io/#https://github.com/prometheus/prometheus) [![Fuzzing Status](https://oss-fuzz-build-logs.storage.googleapis.com/badges/prometheus.svg)](https://bugs.chromium.org/p/oss-fuzz/issues/list?sort=-opened&can=1&q=proj:prometheus) -Visit [prometheus.io](https://prometheus.io) for the full documentation, -examples and guides. +
Prometheus, a [Cloud Native Computing Foundation](https://cncf.io/) project, is a systems and service monitoring system. It collects metrics from configured targets at given intervals, evaluates rule expressions, diff --git a/VERSION b/VERSION index cde8adf34..ec1282255 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.39.0 +2.39.1 diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index b18b47710..39fa5e311 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -73,7 +73,6 @@ import ( "github.com/prometheus/prometheus/tsdb/agent" "github.com/prometheus/prometheus/util/logging" prom_runtime "github.com/prometheus/prometheus/util/runtime" - "github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/web" ) @@ -623,7 +622,7 @@ func main() { Appendable: fanoutStorage, Queryable: localStorage, QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage), - NotifyFunc: sendAlerts(notifierManager, cfg.web.ExternalURL.String()), + NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()), Context: ctxRule, ExternalURL: cfg.web.ExternalURL, Registerer: prometheus.DefaultRegisterer, @@ -1274,36 +1273,6 @@ func computeExternalURL(u, listenAddr string) (*url.URL, error) { return eu, nil } -type sender interface { - Send(alerts ...*notifier.Alert) -} - -// sendAlerts implements the rules.NotifyFunc for a Notifier. -func sendAlerts(s sender, externalURL string) rules.NotifyFunc { - return func(ctx context.Context, expr string, alerts ...*rules.Alert) { - var res []*notifier.Alert - - for _, alert := range alerts { - a := ¬ifier.Alert{ - StartsAt: alert.FiredAt, - Labels: alert.Labels, - Annotations: alert.Annotations, - GeneratorURL: externalURL + strutil.TableLinkForExpression(expr), - } - if !alert.ResolvedAt.IsZero() { - a.EndsAt = alert.ResolvedAt - } else { - a.EndsAt = alert.ValidUntil - } - res = append(res, a) - } - - if len(alerts) > 0 { - s.Send(res...) - } - } -} - // readyStorage implements the Storage interface while allowing to set the actual // storage at a later point in time. type readyStorage struct { diff --git a/cmd/prometheus/main_test.go b/cmd/prometheus/main_test.go index 7dec5b9a5..9fbca5c33 100644 --- a/cmd/prometheus/main_test.go +++ b/cmd/prometheus/main_test.go @@ -198,7 +198,7 @@ func TestSendAlerts(t *testing.T) { } require.Equal(t, tc.exp, alerts) }) - sendAlerts(senderFunc, "http://localhost:9090")(context.TODO(), "up", tc.in...) + rules.SendAlerts(senderFunc, "http://localhost:9090")(context.TODO(), "up", tc.in...) }) } } diff --git a/documentation/examples/prometheus-kubernetes.yml b/documentation/examples/prometheus-kubernetes.yml index 3d3861ab4..9a6228734 100644 --- a/documentation/examples/prometheus-kubernetes.yml +++ b/documentation/examples/prometheus-kubernetes.yml @@ -57,11 +57,6 @@ scrape_configs: regex: default;kubernetes;https # Scrape config for nodes (kubelet). - # - # Rather than connecting directly to the node, the scrape is proxied though the - # Kubernetes apiserver. This means it will work if Prometheus is running out of - # cluster, or can't connect to nodes for some other reason (e.g. because of - # firewalling). - job_name: "kubernetes-nodes" # Default to scraping over https. If required, just disable this or change to diff --git a/documentation/images/prometheus-logo.svg b/documentation/images/prometheus-logo.svg new file mode 100644 index 000000000..026f9e5bc --- /dev/null +++ b/documentation/images/prometheus-logo.svg @@ -0,0 +1,50 @@ + + + +image/svg+xml diff --git a/go.mod b/go.mod index dc13caeb2..176d8b732 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/pprof v0.0.0-20220829040838-70bd9ae97f40 github.com/gophercloud/gophercloud v1.0.0 - github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2 + github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6 github.com/grpc-ecosystem/grpc-gateway v1.16.0 github.com/hashicorp/consul/api v1.15.2 github.com/hashicorp/nomad/api v0.0.0-20220921012004-ddeeb1040edf diff --git a/go.sum b/go.sum index f0821abb6..7992e9f37 100644 --- a/go.sum +++ b/go.sum @@ -438,8 +438,8 @@ github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2 h1:uirlL/j72L93RhV4+mkWhjv0cov2I0MIgPOG9rMDr1k= -github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2/go.mod h1:M5qHK+eWfAv8VR/265dIuEpL3fNfeC21tXXp9itM24A= +github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6 h1:A3dhViTeFDSQcGOXuUi6ukCQSMyDtDISBp2z6OOo2YM= +github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6/go.mod h1:M5qHK+eWfAv8VR/265dIuEpL3fNfeC21tXXp9itM24A= github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= diff --git a/rules/alerting.go b/rules/alerting.go index ccbb1b592..4cfb1fa85 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -507,6 +507,8 @@ func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay } alert.ValidUntil = ts.Add(4 * delta) anew := *alert + // The notifier re-uses the labels slice, hence make a copy. + anew.Labels = alert.Labels.Copy() alerts = append(alerts, &anew) } }) diff --git a/rules/alerting_test.go b/rules/alerting_test.go index ad577b039..4f5f5e683 100644 --- a/rules/alerting_test.go +++ b/rules/alerting_test.go @@ -20,10 +20,13 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/timestamp" + "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" @@ -659,3 +662,53 @@ func TestQueryForStateSeries(t *testing.T) { testFunc(tst) } } + +// TestSendAlertsDontAffectActiveAlerts tests a fix for https://github.com/prometheus/prometheus/issues/11424. +func TestSendAlertsDontAffectActiveAlerts(t *testing.T) { + rule := NewAlertingRule( + "TestRule", + nil, + time.Minute, + labels.FromStrings("severity", "critical"), + labels.EmptyLabels(), labels.EmptyLabels(), "", true, nil, + ) + + // Set an active alert. + lbls := labels.FromStrings("a1", "1") + h := lbls.Hash() + al := &Alert{State: StateFiring, Labels: lbls, ActiveAt: time.Now()} + rule.active[h] = al + + expr, err := parser.ParseExpr("foo") + require.NoError(t, err) + rule.vector = expr + + // The relabel rule reproduced the bug here. + opts := notifier.Options{ + QueueCapacity: 1, + RelabelConfigs: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"a1"}, + Regex: relabel.MustNewRegexp("(.+)"), + TargetLabel: "a1", + Replacement: "bug", + Action: "replace", + }, + }, + } + nm := notifier.NewManager(&opts, log.NewNopLogger()) + + f := SendAlerts(nm, "") + notifyFunc := func(ctx context.Context, expr string, alerts ...*Alert) { + require.Len(t, alerts, 1) + require.Equal(t, al, alerts[0]) + f(ctx, expr, alerts...) + } + + rule.sendAlerts(context.Background(), time.Now(), 0, 0, notifyFunc) + nm.Stop() + + // The relabel rule changes a1=1 to a1=bug. + // But the labels with the AlertingRule should not be changed. + require.Equal(t, labels.FromStrings("a1", "1"), rule.active[h].Labels) +} diff --git a/rules/manager.go b/rules/manager.go index b84534d27..ced61ea49 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -35,10 +35,12 @@ import ( "github.com/prometheus/prometheus/model/rulefmt" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/model/value" + "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/util/strutil" ) // RuleHealth describes the health state of a rule. @@ -1169,3 +1171,33 @@ func (m *Manager) AlertingRules() []*AlertingRule { return alerts } + +type Sender interface { + Send(alerts ...*notifier.Alert) +} + +// SendAlerts implements the rules.NotifyFunc for a Notifier. +func SendAlerts(s Sender, externalURL string) NotifyFunc { + return func(ctx context.Context, expr string, alerts ...*Alert) { + var res []*notifier.Alert + + for _, alert := range alerts { + a := ¬ifier.Alert{ + StartsAt: alert.FiredAt, + Labels: alert.Labels, + Annotations: alert.Annotations, + GeneratorURL: externalURL + strutil.TableLinkForExpression(expr), + } + if !alert.ResolvedAt.IsZero() { + a.EndsAt = alert.ResolvedAt + } else { + a.EndsAt = alert.ValidUntil + } + res = append(res, a) + } + + if len(alerts) > 0 { + s.Send(res...) + } + } +} diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 9e0c643da..e701cb94b 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -39,7 +39,7 @@ import ( "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/record" - "github.com/prometheus/prometheus/tsdb/wal" + "github.com/prometheus/prometheus/tsdb/wlog" ) const ( @@ -400,7 +400,7 @@ type QueueManager struct { relabelConfigs []*relabel.Config sendExemplars bool sendNativeHistograms bool - watcher *wal.Watcher + watcher *wlog.Watcher metadataWatcher *MetadataWatcher clientMtx sync.RWMutex @@ -433,8 +433,8 @@ type QueueManager struct { // the WAL directory will be constructed as /wal. func NewQueueManager( metrics *queueManagerMetrics, - watcherMetrics *wal.WatcherMetrics, - readerMetrics *wal.LiveReaderMetrics, + watcherMetrics *wlog.WatcherMetrics, + readerMetrics *wlog.LiveReaderMetrics, logger log.Logger, dir string, samplesIn *ewmaRate, @@ -484,7 +484,7 @@ func NewQueueManager( highestRecvTimestamp: highestRecvTimestamp, } - t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite) + t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite) if t.mcfg.Send { t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline) } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index dffdc84a8..86b4e4586 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -855,7 +855,7 @@ func BenchmarkSampleSend(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { m.Append(samples) - m.UpdateSeriesSegment(series, i+1) // simulate what wal.Watcher.garbageCollectSeries does + m.UpdateSeriesSegment(series, i+1) // simulate what wlog.Watcher.garbageCollectSeries does m.SeriesReset(i + 1) } // Do not include shutdown diff --git a/storage/remote/write.go b/storage/remote/write.go index 6d8240e91..2e38fb8e6 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -30,7 +30,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb/wal" + "github.com/prometheus/prometheus/tsdb/wlog" ) var ( @@ -60,8 +60,8 @@ type WriteStorage struct { reg prometheus.Registerer mtx sync.Mutex - watcherMetrics *wal.WatcherMetrics - liveReaderMetrics *wal.LiveReaderMetrics + watcherMetrics *wlog.WatcherMetrics + liveReaderMetrics *wlog.LiveReaderMetrics externalLabels labels.Labels dir string queues map[string]*QueueManager @@ -82,8 +82,8 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, f } rws := &WriteStorage{ queues: make(map[string]*QueueManager), - watcherMetrics: wal.NewWatcherMetrics(reg), - liveReaderMetrics: wal.NewLiveReaderMetrics(reg), + watcherMetrics: wlog.NewWatcherMetrics(reg), + liveReaderMetrics: wlog.NewLiveReaderMetrics(reg), logger: logger, reg: reg, flushDeadline: flushDeadline, diff --git a/tsdb/README.md b/tsdb/README.md index ad9354586..80770e8dd 100644 --- a/tsdb/README.md +++ b/tsdb/README.md @@ -13,7 +13,7 @@ which handles storage and querying of all Prometheus v2 data. ## External resources -* A writeup of the original design can be found [here](https://fabxc.org/blog/2017-04-10-writing-a-tsdb/). +* A writeup of the original design can be found [here](https://web.archive.org/web/20210803115658/https://fabxc.org/tsdb/). * Video: [Storing 16 Bytes at Scale](https://youtu.be/b_pEevMAC3I) from [PromCon 2017](https://promcon.io/2017-munich/). * Compression is based on the Gorilla TSDB [white paper](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf). diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 985e24d95..9574c4b8a 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -41,7 +41,7 @@ import ( tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tsdbutil" - "github.com/prometheus/prometheus/tsdb/wal" + "github.com/prometheus/prometheus/tsdb/wlog" ) var ErrUnsupported = errors.New("unsupported operation with WAL-only storage") @@ -81,7 +81,7 @@ type Options struct { // millisecond-precision timestamps. func DefaultOptions() *Options { return &Options{ - WALSegmentSize: wal.DefaultSegmentSize, + WALSegmentSize: wlog.DefaultSegmentSize, WALCompression: false, StripeSize: tsdb.DefaultStripeSize, TruncateFrequency: DefaultTruncateFrequency, @@ -220,7 +220,7 @@ type DB struct { opts *Options rs *remote.Storage - wal *wal.WAL + wal *wlog.WL locker *tsdbutil.DirLocker appenderPool sync.Pool @@ -255,7 +255,7 @@ func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir strin // remote_write expects WAL to be stored in a "wal" subdirectory of the main storage. dir = filepath.Join(dir, "wal") - w, err := wal.NewSize(l, reg, dir, opts.WALSegmentSize, opts.WALCompression) + w, err := wlog.NewSize(l, reg, dir, opts.WALSegmentSize, opts.WALCompression) if err != nil { return nil, errors.Wrap(err, "creating WAL") } @@ -307,7 +307,7 @@ func validateOptions(opts *Options) *Options { opts = DefaultOptions() } if opts.WALSegmentSize <= 0 { - opts.WALSegmentSize = wal.DefaultSegmentSize + opts.WALSegmentSize = wlog.DefaultSegmentSize } // Revert Stripesize to DefaultStripsize if Stripsize is either 0 or not a power of 2. @@ -337,7 +337,7 @@ func (db *DB) replayWAL() error { level.Info(db.logger).Log("msg", "replaying WAL, this may take a while", "dir", db.wal.Dir()) start := time.Now() - dir, startFrom, err := wal.LastCheckpoint(db.wal.Dir()) + dir, startFrom, err := wlog.LastCheckpoint(db.wal.Dir()) if err != nil && err != record.ErrNotFound { return errors.Wrap(err, "find last checkpoint") } @@ -345,7 +345,7 @@ func (db *DB) replayWAL() error { multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{} if err == nil { - sr, err := wal.NewSegmentsReader(dir) + sr, err := wlog.NewSegmentsReader(dir) if err != nil { return errors.Wrap(err, "open checkpoint") } @@ -357,7 +357,7 @@ func (db *DB) replayWAL() error { // A corrupted checkpoint is a hard error for now and requires user // intervention. There's likely little data that can be recovered anyway. - if err := db.loadWAL(wal.NewReader(sr), multiRef); err != nil { + if err := db.loadWAL(wlog.NewReader(sr), multiRef); err != nil { return errors.Wrap(err, "backfill checkpoint") } startFrom++ @@ -365,20 +365,20 @@ func (db *DB) replayWAL() error { } // Find the last segment. - _, last, err := wal.Segments(db.wal.Dir()) + _, last, err := wlog.Segments(db.wal.Dir()) if err != nil { return errors.Wrap(err, "finding WAL segments") } // Backfil segments from the most recent checkpoint onwards. for i := startFrom; i <= last; i++ { - seg, err := wal.OpenReadSegment(wal.SegmentName(db.wal.Dir(), i)) + seg, err := wlog.OpenReadSegment(wlog.SegmentName(db.wal.Dir(), i)) if err != nil { return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i)) } - sr := wal.NewSegmentBufReader(seg) - err = db.loadWAL(wal.NewReader(sr), multiRef) + sr := wlog.NewSegmentBufReader(seg) + err = db.loadWAL(wlog.NewReader(sr), multiRef) if err := sr.Close(); err != nil { level.Warn(db.logger).Log("msg", "error while closing the wal segments reader", "err", err) } @@ -394,7 +394,7 @@ func (db *DB) replayWAL() error { return nil } -func (db *DB) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) { +func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) { var ( dec record.Decoder lastRef = chunks.HeadSeriesRef(db.nextRef.Load()) @@ -423,7 +423,7 @@ func (db *DB) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.He series := seriesPool.Get().([]record.RefSeries)[:0] series, err = dec.Series(rec, series) if err != nil { - errCh <- &wal.CorruptionErr{ + errCh <- &wlog.CorruptionErr{ Err: errors.Wrap(err, "decode series"), Segment: r.Segment(), Offset: r.Offset(), @@ -435,7 +435,7 @@ func (db *DB) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.He samples := samplesPool.Get().([]record.RefSample)[:0] samples, err = dec.Samples(rec, samples) if err != nil { - errCh <- &wal.CorruptionErr{ + errCh <- &wlog.CorruptionErr{ Err: errors.Wrap(err, "decode samples"), Segment: r.Segment(), Offset: r.Offset(), @@ -449,7 +449,7 @@ func (db *DB) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.He // stripeSeries.exemplars in the next block by using setLatestExemplar. continue default: - errCh <- &wal.CorruptionErr{ + errCh <- &wlog.CorruptionErr{ Err: errors.Errorf("invalid record type %v", dec.Type(rec)), Segment: r.Segment(), Offset: r.Offset(), @@ -564,7 +564,7 @@ func (db *DB) truncate(mint int64) error { db.gc(mint) level.Info(db.logger).Log("msg", "series GC completed", "duration", time.Since(start)) - first, last, err := wal.Segments(db.wal.Dir()) + first, last, err := wlog.Segments(db.wal.Dir()) if err != nil { return errors.Wrap(err, "get segment range") } @@ -598,9 +598,9 @@ func (db *DB) truncate(mint int64) error { db.metrics.checkpointCreationTotal.Inc() - if _, err = wal.Checkpoint(db.logger, db.wal, first, last, keep, mint); err != nil { + if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, keep, mint); err != nil { db.metrics.checkpointCreationFail.Inc() - if _, ok := errors.Cause(err).(*wal.CorruptionErr); ok { + if _, ok := errors.Cause(err).(*wlog.CorruptionErr); ok { db.metrics.walCorruptionsTotal.Inc() } return errors.Wrap(err, "create checkpoint") @@ -622,7 +622,7 @@ func (db *DB) truncate(mint int64) error { db.metrics.checkpointDeleteTotal.Inc() db.metrics.numWALSeriesPendingDeletion.Set(float64(len(db.deleted))) - if err := wal.DeleteCheckpoints(db.wal.Dir(), last); err != nil { + if err := wlog.DeleteCheckpoints(db.wal.Dir(), last); err != nil { // Leftover old checkpoints do not cause problems down the line beyond // occupying disk space. They will just be ignored since a newer checkpoint // exists. @@ -642,7 +642,7 @@ func (db *DB) gc(mint int64) { deleted := db.series.GC(mint) db.metrics.numActiveSeries.Sub(float64(len(deleted))) - _, last, _ := wal.Segments(db.wal.Dir()) + _, last, _ := wlog.Segments(db.wal.Dir()) // We want to keep series records for any newly deleted series // until we've passed the last recorded segment. This prevents diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index 965553373..c32e901e6 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -35,7 +35,7 @@ import ( "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tsdbutil" - "github.com/prometheus/prometheus/tsdb/wal" + "github.com/prometheus/prometheus/tsdb/wlog" "github.com/prometheus/prometheus/util/testutil" ) @@ -141,7 +141,7 @@ func TestCommit(t *testing.T) { require.NoError(t, app.Commit()) require.NoError(t, s.Close()) - sr, err := wal.NewSegmentsReader(s.wal.Dir()) + sr, err := wlog.NewSegmentsReader(s.wal.Dir()) require.NoError(t, err) defer func() { require.NoError(t, sr.Close()) @@ -149,7 +149,7 @@ func TestCommit(t *testing.T) { // Read records from WAL and check for expected count of series, samples, and exemplars. var ( - r = wal.NewReader(sr) + r = wlog.NewReader(sr) dec record.Decoder walSeriesCount, walSamplesCount, walExemplarsCount int @@ -211,7 +211,7 @@ func TestRollback(t *testing.T) { require.NoError(t, app.Commit()) require.NoError(t, s.Close()) - sr, err := wal.NewSegmentsReader(s.wal.Dir()) + sr, err := wlog.NewSegmentsReader(s.wal.Dir()) require.NoError(t, err) defer func() { require.NoError(t, sr.Close()) @@ -219,7 +219,7 @@ func TestRollback(t *testing.T) { // Read records from WAL and check for expected count of series and samples. var ( - r = wal.NewReader(sr) + r = wlog.NewReader(sr) dec record.Decoder walSeriesCount, walSamplesCount, walExemplarsCount int @@ -534,10 +534,10 @@ func TestStorage_DuplicateExemplarsIgnored(t *testing.T) { // Read back what was written to the WAL. var walExemplarsCount int - sr, err := wal.NewSegmentsReader(s.wal.Dir()) + sr, err := wlog.NewSegmentsReader(s.wal.Dir()) require.NoError(t, err) defer sr.Close() - r := wal.NewReader(sr) + r := wlog.NewReader(sr) var dec record.Decoder for r.Next() { diff --git a/tsdb/block_test.go b/tsdb/block_test.go index f6823c9c2..ab8f6703b 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -37,7 +37,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/tsdbutil" - "github.com/prometheus/prometheus/tsdb/wal" + "github.com/prometheus/prometheus/tsdb/wlog" ) // In Prometheus 2.1.0 we had a bug where the meta.json version was falsely bumped @@ -489,7 +489,7 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string { return filepath.Join(dir, ulid.String()) } -func createHead(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir string) *Head { +func createHead(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir string) *Head { opts := DefaultHeadOptions() opts.ChunkDirRoot = chunkDir head, err := NewHead(nil, nil, w, nil, opts, nil) @@ -530,7 +530,7 @@ func createHead(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir str return head } -func createHeadWithOOOSamples(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir string, oooSampleFrequency int) *Head { +func createHeadWithOOOSamples(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir string, oooSampleFrequency int) *Head { opts := DefaultHeadOptions() opts.ChunkDirRoot = chunkDir opts.OutOfOrderTimeWindow.Store(10000000000) diff --git a/tsdb/db.go b/tsdb/db.go index 54ed6467a..d47206795 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -45,7 +45,7 @@ import ( "github.com/prometheus/prometheus/tsdb/fileutil" _ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minium Go version is met. "github.com/prometheus/prometheus/tsdb/tsdbutil" - "github.com/prometheus/prometheus/tsdb/wal" + "github.com/prometheus/prometheus/tsdb/wlog" ) const ( @@ -70,7 +70,7 @@ var ErrNotReady = errors.New("TSDB not ready") // millisecond precision timestamps. func DefaultOptions() *Options { return &Options{ - WALSegmentSize: wal.DefaultSegmentSize, + WALSegmentSize: wlog.DefaultSegmentSize, MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize, RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond), MinBlockDuration: DefaultBlockDuration, @@ -393,14 +393,14 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) { if len(blockReaders) > 0 { maxBlockTime = blockReaders[len(blockReaders)-1].Meta().MaxTime } - w, err := wal.Open(db.logger, filepath.Join(db.dir, "wal")) + w, err := wlog.Open(db.logger, filepath.Join(db.dir, "wal")) if err != nil { return err } - var wbl *wal.WAL - wblDir := filepath.Join(db.dir, wal.WblDirName) + var wbl *wlog.WL + wblDir := filepath.Join(db.dir, wlog.WblDirName) if _, err := os.Stat(wblDir); !os.IsNotExist(err) { - wbl, err = wal.Open(db.logger, wblDir) + wbl, err = wlog.Open(db.logger, wblDir) if err != nil { return err } @@ -477,14 +477,14 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue if err := head.Close(); err != nil { return nil, err } - w, err := wal.Open(db.logger, filepath.Join(db.dir, "wal")) + w, err := wlog.Open(db.logger, filepath.Join(db.dir, "wal")) if err != nil { return nil, err } - var wbl *wal.WAL - wblDir := filepath.Join(db.dir, wal.WblDirName) + var wbl *wlog.WL + wblDir := filepath.Join(db.dir, wlog.WblDirName) if _, err := os.Stat(wblDir); !os.IsNotExist(err) { - wbl, err = wal.Open(db.logger, wblDir) + wbl, err = wlog.Open(db.logger, wblDir) if err != nil { return nil, err } @@ -681,7 +681,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs } walDir := filepath.Join(dir, "wal") - wblDir := filepath.Join(dir, wal.WblDirName) + wblDir := filepath.Join(dir, wlog.WblDirName) // Migrate old WAL if one exists. if err := MigrateWAL(l, walDir); err != nil { @@ -743,15 +743,15 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs } db.compactCancel = cancel - var wlog, wblog *wal.WAL - segmentSize := wal.DefaultSegmentSize + var wal, wbl *wlog.WL + segmentSize := wlog.DefaultSegmentSize // Wal is enabled. if opts.WALSegmentSize >= 0 { // Wal is set to a custom size. if opts.WALSegmentSize > 0 { segmentSize = opts.WALSegmentSize } - wlog, err = wal.NewSize(l, r, walDir, segmentSize, opts.WALCompression) + wal, err = wlog.NewSize(l, r, walDir, segmentSize, opts.WALCompression) if err != nil { return nil, err } @@ -761,7 +761,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs return nil, err } if opts.OutOfOrderTimeWindow > 0 || wblSize > 0 { - wblog, err = wal.NewSize(l, r, wblDir, segmentSize, opts.WALCompression) + wbl, err = wlog.NewSize(l, r, wblDir, segmentSize, opts.WALCompression) if err != nil { return nil, err } @@ -786,7 +786,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs // We only override this flag if isolation is disabled at DB level. We use the default otherwise. headOpts.IsolationDisabled = opts.IsolationDisabled } - db.head, err = NewHead(r, l, wlog, wblog, headOpts, stats.Head) + db.head, err = NewHead(r, l, wal, wbl, headOpts, stats.Head) if err != nil { return nil, err } @@ -818,12 +818,12 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs isOOOErr := isErrLoadOOOWal(initErr) if isOOOErr { level.Warn(db.logger).Log("msg", "Encountered OOO WAL read error, attempting repair", "err", initErr) - if err := wblog.Repair(initErr); err != nil { + if err := wbl.Repair(initErr); err != nil { return nil, errors.Wrap(err, "repair corrupted OOO WAL") } } else { level.Warn(db.logger).Log("msg", "Encountered WAL read error, attempting repair", "err", initErr) - if err := wlog.Repair(initErr); err != nil { + if err := wal.Repair(initErr); err != nil { return nil, errors.Wrap(err, "repair corrupted WAL") } } @@ -952,19 +952,19 @@ func (db *DB) ApplyConfig(conf *config.Config) error { } // Create WBL if it was not present and if OOO is enabled with WAL enabled. - var wblog *wal.WAL + var wblog *wlog.WL var err error if db.head.wbl != nil { // The existing WBL from the disk might have been replayed while OOO was disabled. wblog = db.head.wbl } else if !db.oooWasEnabled.Load() && oooTimeWindow > 0 && db.opts.WALSegmentSize >= 0 { - segmentSize := wal.DefaultSegmentSize + segmentSize := wlog.DefaultSegmentSize // Wal is set to a custom size. if db.opts.WALSegmentSize > 0 { segmentSize = db.opts.WALSegmentSize } - oooWalDir := filepath.Join(db.dir, wal.WblDirName) - wblog, err = wal.NewSize(db.logger, db.registerer, oooWalDir, segmentSize, db.opts.WALCompression) + oooWalDir := filepath.Join(db.dir, wlog.WblDirName) + wblog, err = wlog.NewSize(db.logger, db.registerer, oooWalDir, segmentSize, db.opts.WALCompression) if err != nil { return err } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 2cdd0253c..83ffb5dbc 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -52,7 +52,7 @@ import ( "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/tsdbutil" - "github.com/prometheus/prometheus/tsdb/wal" + "github.com/prometheus/prometheus/tsdb/wlog" "github.com/prometheus/prometheus/util/testutil" ) @@ -244,7 +244,7 @@ func TestNoPanicAfterWALCorruption(t *testing.T) { require.NoError(t, err) f, err := os.OpenFile(path.Join(db.Dir(), "wal", walFiles[0].Name()), os.O_RDWR, 0o666) require.NoError(t, err) - r := wal.NewReader(bufio.NewReader(f)) + r := wlog.NewReader(bufio.NewReader(f)) require.True(t, r.Next(), "reading the series record") require.True(t, r.Next(), "reading the first sample record") // Write an invalid record header to corrupt everything after the first wal sample. @@ -1515,9 +1515,9 @@ func TestSizeRetention(t *testing.T) { require.Equal(t, expSize, actSize, "registered size doesn't match actual disk size") // Create a WAL checkpoint, and compare sizes. - first, last, err := wal.Segments(db.Head().wal.Dir()) + first, last, err := wlog.Segments(db.Head().wal.Dir()) require.NoError(t, err) - _, err = wal.Checkpoint(log.NewNopLogger(), db.Head().wal, first, last-1, func(x chunks.HeadSeriesRef) bool { return false }, 0) + _, err = wlog.Checkpoint(log.NewNopLogger(), db.Head().wal, first, last-1, func(x chunks.HeadSeriesRef) bool { return false }, 0) require.NoError(t, err) blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics. walSize, err = db.Head().wal.Size() @@ -1923,7 +1923,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { dir := t.TempDir() require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777)) - w, err := wal.New(nil, nil, path.Join(dir, "wal"), false) + w, err := wlog.New(nil, nil, path.Join(dir, "wal"), false) require.NoError(t, err) var enc record.Encoder @@ -1965,7 +1965,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { createBlock(t, dir, genSeries(1, 1, 1000, 6000)) require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777)) - w, err := wal.New(nil, nil, path.Join(dir, "wal"), false) + w, err := wlog.New(nil, nil, path.Join(dir, "wal"), false) require.NoError(t, err) var enc record.Encoder @@ -2365,7 +2365,7 @@ func TestDBReadOnly(t *testing.T) { } // Add head to test DBReadOnly WAL reading capabilities. - w, err := wal.New(logger, nil, filepath.Join(dbDir, "wal"), true) + w, err := wlog.New(logger, nil, filepath.Join(dbDir, "wal"), true) require.NoError(t, err) h := createHead(t, w, genSeries(1, 1, 16, 18), dbDir) require.NoError(t, h.Close()) @@ -3131,7 +3131,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) { require.NoError(t, app.Commit()) // Check the existing WAL files. - first, last, err := wal.Segments(db.head.wal.Dir()) + first, last, err := wlog.Segments(db.head.wal.Dir()) require.NoError(t, err) require.Equal(t, 0, first) require.Equal(t, 60, last) @@ -3146,14 +3146,14 @@ func TestOneCheckpointPerCompactCall(t *testing.T) { require.Equal(t, 58.0, prom_testutil.ToFloat64(db.head.metrics.headTruncateTotal)) // The compaction should have only truncated first 2/3 of WAL (while also rotating the files). - first, last, err = wal.Segments(db.head.wal.Dir()) + first, last, err = wlog.Segments(db.head.wal.Dir()) require.NoError(t, err) require.Equal(t, 40, first) require.Equal(t, 61, last) // The first checkpoint would be for first 2/3rd of WAL, hence till 39. // That should be the last checkpoint. - _, cno, err := wal.LastCheckpoint(db.head.wal.Dir()) + _, cno, err := wlog.LastCheckpoint(db.head.wal.Dir()) require.NoError(t, err) require.Equal(t, 39, cno) @@ -3189,7 +3189,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) { require.Equal(t, newBlockMaxt, db.head.MinTime()) // Another WAL file was rotated. - first, last, err = wal.Segments(db.head.wal.Dir()) + first, last, err = wlog.Segments(db.head.wal.Dir()) require.NoError(t, err) require.Equal(t, 40, first) require.Equal(t, 62, last) @@ -3202,14 +3202,14 @@ func TestOneCheckpointPerCompactCall(t *testing.T) { require.Equal(t, 59, len(db.Blocks())) // The compaction should have only truncated first 2/3 of WAL (while also rotating the files). - first, last, err = wal.Segments(db.head.wal.Dir()) + first, last, err = wlog.Segments(db.head.wal.Dir()) require.NoError(t, err) require.Equal(t, 55, first) require.Equal(t, 63, last) // The first checkpoint would be for first 2/3rd of WAL, hence till 54. // That should be the last checkpoint. - _, cno, err = wal.LastCheckpoint(db.head.wal.Dir()) + _, cno, err = wlog.LastCheckpoint(db.head.wal.Dir()) require.NoError(t, err) require.Equal(t, 54, cno) } @@ -3657,9 +3657,9 @@ func TestOOOWALWrite(t *testing.T) { } getRecords := func(walDir string) []interface{} { - sr, err := wal.NewSegmentsReader(walDir) + sr, err := wlog.NewSegmentsReader(walDir) require.NoError(t, err) - r := wal.NewReader(sr) + r := wlog.NewReader(sr) defer func() { require.NoError(t, sr.Close()) }() @@ -3696,7 +3696,7 @@ func TestOOOWALWrite(t *testing.T) { require.Equal(t, inOrderRecords, actRecs) // The OOO WAL. - actRecs = getRecords(path.Join(dir, wal.WblDirName)) + actRecs = getRecords(path.Join(dir, wlog.WblDirName)) require.Equal(t, oooRecords, actRecs) } @@ -3890,16 +3890,16 @@ func TestMetadataCheckpointingOnlyKeepsLatestEntry(t *testing.T) { require.NoError(t, app.Commit()) // Let's create a checkpoint. - first, last, err := wal.Segments(w.Dir()) + first, last, err := wlog.Segments(w.Dir()) require.NoError(t, err) keep := func(id chunks.HeadSeriesRef) bool { return id != 3 } - _, err = wal.Checkpoint(log.NewNopLogger(), w, first, last-1, keep, 0) + _, err = wlog.Checkpoint(log.NewNopLogger(), w, first, last-1, keep, 0) require.NoError(t, err) // Confirm there's been a checkpoint. - cdir, _, err := wal.LastCheckpoint(w.Dir()) + cdir, _, err := wlog.LastCheckpoint(w.Dir()) require.NoError(t, err) // Read in checkpoint and WAL. @@ -4647,7 +4647,7 @@ func TestOOODisabled(t *testing.T) { "number of ooo/oob samples mismatch") // Verifying that no OOO artifacts were generated. - _, err = os.ReadDir(path.Join(db.Dir(), wal.WblDirName)) + _, err = os.ReadDir(path.Join(db.Dir(), wlog.WblDirName)) require.True(t, os.IsNotExist(err)) ms, created, err := db.head.getOrCreate(s1.Hash(), s1) @@ -4812,12 +4812,12 @@ func TestWBLAndMmapReplay(t *testing.T) { resetMmapToOriginal() // We neet to reset because new duplicate chunks can be written above. // Removing m-map markers in WBL by rewriting it. - newWbl, err := wal.New(log.NewNopLogger(), nil, filepath.Join(t.TempDir(), "new_wbl"), false) + newWbl, err := wlog.New(log.NewNopLogger(), nil, filepath.Join(t.TempDir(), "new_wbl"), false) require.NoError(t, err) - sr, err := wal.NewSegmentsReader(originalWblDir) + sr, err := wlog.NewSegmentsReader(originalWblDir) require.NoError(t, err) var dec record.Decoder - r, markers, addedRecs := wal.NewReader(sr), 0, 0 + r, markers, addedRecs := wlog.NewReader(sr), 0, 0 for r.Next() { rec := r.Record() if dec.Type(rec) == record.MmapMarkers { diff --git a/tsdb/head.go b/tsdb/head.go index dff04e660..85baad680 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -42,7 +42,7 @@ import ( "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/tsdbutil" - "github.com/prometheus/prometheus/tsdb/wal" + "github.com/prometheus/prometheus/tsdb/wlog" ) var ( @@ -76,7 +76,7 @@ type Head struct { metrics *headMetrics opts *HeadOptions - wal, wbl *wal.WAL + wal, wbl *wlog.WL exemplarMetrics *ExemplarMetrics exemplars ExemplarStorage logger log.Logger @@ -191,7 +191,7 @@ type SeriesLifecycleCallback interface { } // NewHead opens the head block in dir. -func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wal.WAL, opts *HeadOptions, stats *HeadStats) (*Head, error) { +func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wlog.WL, opts *HeadOptions, stats *HeadStats) (*Head, error) { var err error if l == nil { l = log.NewNopLogger() @@ -612,13 +612,13 @@ func (h *Head) Init(minValidTime int64) error { checkpointReplayStart := time.Now() // Backfill the checkpoint first if it exists. - dir, startFrom, err := wal.LastCheckpoint(h.wal.Dir()) + dir, startFrom, err := wlog.LastCheckpoint(h.wal.Dir()) if err != nil && err != record.ErrNotFound { return errors.Wrap(err, "find last checkpoint") } // Find the last segment. - _, endAt, e := wal.Segments(h.wal.Dir()) + _, endAt, e := wlog.Segments(h.wal.Dir()) if e != nil { return errors.Wrap(e, "finding WAL segments") } @@ -627,7 +627,7 @@ func (h *Head) Init(minValidTime int64) error { multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{} if err == nil && startFrom >= snapIdx { - sr, err := wal.NewSegmentsReader(dir) + sr, err := wlog.NewSegmentsReader(dir) if err != nil { return errors.Wrap(err, "open checkpoint") } @@ -639,7 +639,7 @@ func (h *Head) Init(minValidTime int64) error { // A corrupted checkpoint is a hard error for now and requires user // intervention. There's likely little data that can be recovered anyway. - if err := h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks); err != nil { + if err := h.loadWAL(wlog.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks); err != nil { return errors.Wrap(err, "backfill checkpoint") } h.updateWALReplayStatusRead(startFrom) @@ -655,7 +655,7 @@ func (h *Head) Init(minValidTime int64) error { } // Backfill segments from the most recent checkpoint onwards. for i := startFrom; i <= endAt; i++ { - s, err := wal.OpenReadSegment(wal.SegmentName(h.wal.Dir(), i)) + s, err := wlog.OpenReadSegment(wlog.SegmentName(h.wal.Dir(), i)) if err != nil { return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i)) } @@ -664,7 +664,7 @@ func (h *Head) Init(minValidTime int64) error { if i == snapIdx { offset = snapOffset } - sr, err := wal.NewSegmentBufReaderWithOffset(offset, s) + sr, err := wlog.NewSegmentBufReaderWithOffset(offset, s) if errors.Cause(err) == io.EOF { // File does not exist. continue @@ -672,7 +672,7 @@ func (h *Head) Init(minValidTime int64) error { if err != nil { return errors.Wrapf(err, "segment reader (offset=%d)", offset) } - err = h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks) + err = h.loadWAL(wlog.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks) if err := sr.Close(); err != nil { level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err) } @@ -687,20 +687,20 @@ func (h *Head) Init(minValidTime int64) error { wblReplayStart := time.Now() if h.wbl != nil { // Replay OOO WAL. - startFrom, endAt, e = wal.Segments(h.wbl.Dir()) + startFrom, endAt, e = wlog.Segments(h.wbl.Dir()) if e != nil { return errors.Wrap(e, "finding OOO WAL segments") } h.startWALReplayStatus(startFrom, endAt) for i := startFrom; i <= endAt; i++ { - s, err := wal.OpenReadSegment(wal.SegmentName(h.wbl.Dir(), i)) + s, err := wlog.OpenReadSegment(wlog.SegmentName(h.wbl.Dir(), i)) if err != nil { return errors.Wrap(err, fmt.Sprintf("open WBL segment: %d", i)) } - sr := wal.NewSegmentBufReader(s) - err = h.loadWBL(wal.NewReader(sr), multiRef, lastMmapRef) + sr := wlog.NewSegmentBufReader(s) + err = h.loadWBL(wlog.NewReader(sr), multiRef, lastMmapRef) if err := sr.Close(); err != nil { level.Warn(h.logger).Log("msg", "Error while closing the wbl segments reader", "err", err) } @@ -850,7 +850,7 @@ func (h *Head) removeCorruptedMmappedChunks(err error) (map[chunks.HeadSeriesRef return mmappedChunks, oooMmappedChunks, lastRef, nil } -func (h *Head) ApplyConfig(cfg *config.Config, wbl *wal.WAL) { +func (h *Head) ApplyConfig(cfg *config.Config, wbl *wlog.WL) { oooTimeWindow := int64(0) if cfg.StorageConfig.TSDBConfig != nil { oooTimeWindow = cfg.StorageConfig.TSDBConfig.OutOfOrderTimeWindow @@ -882,7 +882,7 @@ func (h *Head) ApplyConfig(cfg *config.Config, wbl *wal.WAL) { // SetOutOfOrderTimeWindow updates the out of order related parameters. // If the Head already has a WBL set, then the wbl will be ignored. -func (h *Head) SetOutOfOrderTimeWindow(oooTimeWindow int64, wbl *wal.WAL) { +func (h *Head) SetOutOfOrderTimeWindow(oooTimeWindow int64, wbl *wlog.WL) { if oooTimeWindow > 0 && h.wbl == nil { h.wbl = wbl } @@ -1115,7 +1115,7 @@ func (h *Head) truncateWAL(mint int64) error { start := time.Now() h.lastWALTruncationTime.Store(mint) - first, last, err := wal.Segments(h.wal.Dir()) + first, last, err := wlog.Segments(h.wal.Dir()) if err != nil { return errors.Wrap(err, "get segment range") } @@ -1147,9 +1147,9 @@ func (h *Head) truncateWAL(mint int64) error { return ok } h.metrics.checkpointCreationTotal.Inc() - if _, err = wal.Checkpoint(h.logger, h.wal, first, last, keep, mint); err != nil { + if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, keep, mint); err != nil { h.metrics.checkpointCreationFail.Inc() - if _, ok := errors.Cause(err).(*wal.CorruptionErr); ok { + if _, ok := errors.Cause(err).(*wlog.CorruptionErr); ok { h.metrics.walCorruptionsTotal.Inc() } return errors.Wrap(err, "create checkpoint") @@ -1172,7 +1172,7 @@ func (h *Head) truncateWAL(mint int64) error { h.deletedMtx.Unlock() h.metrics.checkpointDeleteTotal.Inc() - if err := wal.DeleteCheckpoints(h.wal.Dir(), last); err != nil { + if err := wlog.DeleteCheckpoints(h.wal.Dir(), last); err != nil { // Leftover old checkpoints do not cause problems down the line beyond // occupying disk space. // They will just be ignored since a higher checkpoint exists. @@ -1415,7 +1415,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) { h.tombstones.TruncateBefore(mint) if h.wal != nil { - _, last, _ := wal.Segments(h.wal.Dir()) + _, last, _ := wlog.Segments(h.wal.Dir()) h.deletedMtx.Lock() // Keep series records until we're past segment 'last' // because the WAL will still have samples records with diff --git a/tsdb/head_test.go b/tsdb/head_test.go index b4e12446e..542e2303a 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -48,12 +48,12 @@ import ( "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/tsdbutil" - "github.com/prometheus/prometheus/tsdb/wal" + "github.com/prometheus/prometheus/tsdb/wlog" ) -func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) (*Head, *wal.WAL) { +func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) (*Head, *wlog.WL) { dir := t.TempDir() - wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) require.NoError(t, err) opts := DefaultHeadOptions() @@ -66,14 +66,14 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) ( opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds()) } - h, err := NewHead(nil, nil, wlog, nil, opts, nil) + h, err := NewHead(nil, nil, wal, nil, opts, nil) require.NoError(t, err) require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_ chunks.HeadSeriesRef, _ chunks.ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { return nil })) - return h, wlog + return h, wal } func BenchmarkCreateSeries(b *testing.B) { @@ -91,7 +91,7 @@ func BenchmarkCreateSeries(b *testing.B) { } } -func populateTestWAL(t testing.TB, w *wal.WAL, recs []interface{}) { +func populateTestWAL(t testing.TB, w *wlog.WL, recs []interface{}) { var enc record.Encoder for _, r := range recs { switch v := r.(type) { @@ -108,12 +108,12 @@ func populateTestWAL(t testing.TB, w *wal.WAL, recs []interface{}) { } func readTestWAL(t testing.TB, dir string) (recs []interface{}) { - sr, err := wal.NewSegmentsReader(dir) + sr, err := wlog.NewSegmentsReader(dir) require.NoError(t, err) defer sr.Close() var dec record.Decoder - r := wal.NewReader(sr) + r := wlog.NewReader(sr) for r.Next() { rec := r.Record() @@ -192,7 +192,7 @@ func BenchmarkLoadWAL(b *testing.B) { func(b *testing.B) { dir := b.TempDir() - w, err := wal.New(nil, nil, dir, false) + w, err := wlog.New(nil, nil, dir, false) require.NoError(b, err) // Write series. @@ -574,7 +574,7 @@ func TestHead_WALMultiRef(t *testing.T) { require.NotEqual(t, ref1, ref2, "Refs are the same") require.NoError(t, head.Close()) - w, err = wal.New(nil, nil, w.Dir(), false) + w, err = wlog.New(nil, nil, w.Dir(), false) require.NoError(t, err) opts := DefaultHeadOptions() @@ -882,7 +882,7 @@ func TestHeadDeleteSimple(t *testing.T) { require.NoError(t, app.Commit()) // Compare the samples for both heads - before and after the reloadBlocks. - reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reloadBlocks. + reloadedW, err := wlog.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reloadBlocks. require.NoError(t, err) opts := DefaultHeadOptions() opts.ChunkRange = 1000 @@ -1003,7 +1003,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { require.NoError(t, hb.Close()) // Confirm there's been a checkpoint. - cdir, _, err := wal.LastCheckpoint(w.Dir()) + cdir, _, err := wlog.LastCheckpoint(w.Dir()) require.NoError(t, err) // Read in checkpoint and WAL. recs := readTestWAL(t, cdir) @@ -1650,7 +1650,7 @@ func TestWalRepair_DecodingError(t *testing.T) { // Fill the wal and corrupt it. { - w, err := wal.New(nil, nil, filepath.Join(dir, "wal"), compress) + w, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), compress) require.NoError(t, err) for i := 1; i <= test.totalRecs; i++ { @@ -1671,7 +1671,7 @@ func TestWalRepair_DecodingError(t *testing.T) { initErr := h.Init(math.MinInt64) err = errors.Cause(initErr) // So that we can pick up errors even if wrapped. - _, corrErr := err.(*wal.CorruptionErr) + _, corrErr := err.(*wlog.CorruptionErr) require.True(t, corrErr, "reading the wal didn't return corruption error") require.NoError(t, h.Close()) // Head will close the wal as well. } @@ -1688,10 +1688,10 @@ func TestWalRepair_DecodingError(t *testing.T) { // Read the wal content after the repair. { - sr, err := wal.NewSegmentsReader(filepath.Join(dir, "wal")) + sr, err := wlog.NewSegmentsReader(filepath.Join(dir, "wal")) require.NoError(t, err) defer sr.Close() - r := wal.NewReader(sr) + r := wlog.NewReader(sr) var actRec int for r.Next() { @@ -1713,7 +1713,7 @@ func TestHeadReadWriterRepair(t *testing.T) { walDir := filepath.Join(dir, "wal") // Fill the chunk segments and corrupt it. { - w, err := wal.New(nil, nil, walDir, false) + w, err := wlog.New(nil, nil, walDir, false) require.NoError(t, err) opts := DefaultHeadOptions() @@ -1775,7 +1775,7 @@ func TestHeadReadWriterRepair(t *testing.T) { } func TestNewWalSegmentOnTruncate(t *testing.T) { - h, wlog := newTestHead(t, 1000, false, false) + h, wal := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -1787,19 +1787,19 @@ func TestNewWalSegmentOnTruncate(t *testing.T) { } add(0) - _, last, err := wal.Segments(wlog.Dir()) + _, last, err := wlog.Segments(wal.Dir()) require.NoError(t, err) require.Equal(t, 0, last) add(1) require.NoError(t, h.Truncate(1)) - _, last, err = wal.Segments(wlog.Dir()) + _, last, err = wlog.Segments(wal.Dir()) require.NoError(t, err) require.Equal(t, 1, last) add(2) require.NoError(t, h.Truncate(2)) - _, last, err = wal.Segments(wlog.Dir()) + _, last, err = wlog.Segments(wal.Dir()) require.NoError(t, err) require.Equal(t, 2, last) } @@ -1954,12 +1954,12 @@ func TestMemSeriesIsolation(t *testing.T) { i = addSamples(hb) require.NoError(t, hb.Close()) - wlog, err := wal.NewSize(nil, nil, w.Dir(), 32768, false) + wal, err := wlog.NewSize(nil, nil, w.Dir(), 32768, false) require.NoError(t, err) opts := DefaultHeadOptions() opts.ChunkRange = 1000 - opts.ChunkDirRoot = wlog.Dir() - hb, err = NewHead(nil, nil, wlog, nil, opts, nil) + opts.ChunkDirRoot = wal.Dir() + hb, err = NewHead(nil, nil, wal, nil, opts, nil) defer func() { require.NoError(t, hb.Close()) }() require.NoError(t, err) require.NoError(t, hb.Init(0)) @@ -3021,7 +3021,7 @@ func TestChunkSnapshot(t *testing.T) { } openHeadAndCheckReplay := func() { - w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false) + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false) require.NoError(t, err) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) @@ -3228,7 +3228,7 @@ func TestSnapshotError(t *testing.T) { require.NoError(t, f.Close()) // Create new Head which should replay this snapshot. - w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false) + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false) require.NoError(t, err) // Testing https://github.com/prometheus/prometheus/issues/9437 with the registry. head, err = NewHead(prometheus.NewRegistry(), nil, w, nil, head.opts, nil) @@ -3613,7 +3613,7 @@ loop: // Tests https://github.com/prometheus/prometheus/issues/9725. func TestChunkSnapshotReplayBug(t *testing.T) { dir := t.TempDir() - wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) require.NoError(t, err) // Write few series records and samples such that the series references are not in order in the WAL @@ -3640,10 +3640,10 @@ func TestChunkSnapshotReplayBug(t *testing.T) { rec := enc.Series([]record.RefSeries{seriesRec}, buf) buf = rec[:0] - require.NoError(t, wlog.Log(rec)) + require.NoError(t, wal.Log(rec)) rec = enc.Samples([]record.RefSample{samplesRec}, buf) buf = rec[:0] - require.NoError(t, wlog.Log(rec)) + require.NoError(t, wal.Log(rec)) } // Write a corrupt snapshot to fail the replay on startup. @@ -3657,7 +3657,7 @@ func TestChunkSnapshotReplayBug(t *testing.T) { opts := DefaultHeadOptions() opts.ChunkDirRoot = dir opts.EnableMemorySnapshotOnShutdown = true - head, err := NewHead(nil, nil, wlog, nil, opts, nil) + head, err := NewHead(nil, nil, wal, nil, opts, nil) require.NoError(t, err) require.NoError(t, head.Init(math.MinInt64)) defer func() { @@ -3680,7 +3680,7 @@ func TestChunkSnapshotReplayBug(t *testing.T) { func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) { dir := t.TempDir() - wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) + wlTemp, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) require.NoError(t, err) // Write a snapshot with .tmp suffix. This used to fail taking any further snapshots or replay of snapshots. @@ -3691,7 +3691,7 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) { opts := DefaultHeadOptions() opts.ChunkDirRoot = dir opts.EnableMemorySnapshotOnShutdown = true - head, err := NewHead(nil, nil, wlog, nil, opts, nil) + head, err := NewHead(nil, nil, wlTemp, nil, opts, nil) require.NoError(t, err) require.NoError(t, head.Init(math.MinInt64)) @@ -3718,9 +3718,9 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) { // TODO(codesome): Needs test for ooo WAL repair. func TestOOOWalReplay(t *testing.T) { dir := t.TempDir() - wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) require.NoError(t, err) - oooWlog, err := wal.NewSize(nil, nil, filepath.Join(dir, wal.WblDirName), 32768, true) + oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true) require.NoError(t, err) opts := DefaultHeadOptions() @@ -3728,7 +3728,7 @@ func TestOOOWalReplay(t *testing.T) { opts.ChunkDirRoot = dir opts.OutOfOrderTimeWindow.Store(30 * time.Minute.Milliseconds()) - h, err := NewHead(nil, nil, wlog, oooWlog, opts, nil) + h, err := NewHead(nil, nil, wal, oooWlog, opts, nil) require.NoError(t, err) require.NoError(t, h.Init(0)) @@ -3765,11 +3765,11 @@ func TestOOOWalReplay(t *testing.T) { // Restart head. require.NoError(t, h.Close()) - wlog, err = wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) + wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) require.NoError(t, err) - oooWlog, err = wal.NewSize(nil, nil, filepath.Join(dir, wal.WblDirName), 32768, true) + oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true) require.NoError(t, err) - h, err = NewHead(nil, nil, wlog, oooWlog, opts, nil) + h, err = NewHead(nil, nil, wal, oooWlog, opts, nil) require.NoError(t, err) require.NoError(t, h.Init(0)) // Replay happens here. @@ -3802,9 +3802,9 @@ func TestOOOWalReplay(t *testing.T) { // TestOOOMmapReplay checks the replay at a low level. func TestOOOMmapReplay(t *testing.T) { dir := t.TempDir() - wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) require.NoError(t, err) - oooWlog, err := wal.NewSize(nil, nil, filepath.Join(dir, wal.WblDirName), 32768, true) + oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true) require.NoError(t, err) opts := DefaultHeadOptions() @@ -3813,7 +3813,7 @@ func TestOOOMmapReplay(t *testing.T) { opts.OutOfOrderCapMax.Store(30) opts.OutOfOrderTimeWindow.Store(1000 * time.Minute.Milliseconds()) - h, err := NewHead(nil, nil, wlog, oooWlog, opts, nil) + h, err := NewHead(nil, nil, wal, oooWlog, opts, nil) require.NoError(t, err) require.NoError(t, h.Init(0)) @@ -3853,11 +3853,11 @@ func TestOOOMmapReplay(t *testing.T) { // Restart head. require.NoError(t, h.Close()) - wlog, err = wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) + wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) require.NoError(t, err) - oooWlog, err = wal.NewSize(nil, nil, filepath.Join(dir, wal.WblDirName), 32768, true) + oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true) require.NoError(t, err) - h, err = NewHead(nil, nil, wlog, oooWlog, opts, nil) + h, err = NewHead(nil, nil, wal, oooWlog, opts, nil) require.NoError(t, err) require.NoError(t, h.Init(0)) // Replay happens here. @@ -3927,9 +3927,9 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) { require.NoError(t, h.Close()) - wlog, err := wal.NewSize(nil, nil, filepath.Join(h.opts.ChunkDirRoot, "wal"), 32768, false) + wal, err := wlog.NewSize(nil, nil, filepath.Join(h.opts.ChunkDirRoot, "wal"), 32768, false) require.NoError(t, err) - h, err = NewHead(nil, nil, wlog, nil, h.opts, nil) + h, err = NewHead(nil, nil, wal, nil, h.opts, nil) require.NoError(t, err) require.NoError(t, h.Init(0)) @@ -3962,7 +3962,7 @@ func (c *unsupportedChunk) Encoding() chunkenc.Encoding { // Tests https://github.com/prometheus/prometheus/issues/10277. func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) { dir := t.TempDir() - wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false) require.NoError(t, err) opts := DefaultHeadOptions() @@ -3971,7 +3971,7 @@ func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) { opts.EnableExemplarStorage = true opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars) - h, err := NewHead(nil, nil, wlog, nil, opts, nil) + h, err := NewHead(nil, nil, wal, nil, opts, nil) require.NoError(t, err) require.NoError(t, h.Init(0)) @@ -3995,7 +3995,7 @@ func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) { addChunks() require.NoError(t, h.Close()) - wlog, err = wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false) + wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false) require.NoError(t, err) mmapFilePath := filepath.Join(dir, "chunks_head", "000001") @@ -4005,7 +4005,7 @@ func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) { require.NoError(t, err) require.NoError(t, f.Close()) - h, err = NewHead(nil, nil, wlog, nil, opts, nil) + h, err = NewHead(nil, nil, wal, nil, opts, nil) require.NoError(t, err) require.NoError(t, h.Init(0)) @@ -4021,7 +4021,7 @@ func TestReplayAfterMmapReplayError(t *testing.T) { var err error openHead := func() { - wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false) require.NoError(t, err) opts := DefaultHeadOptions() @@ -4030,7 +4030,7 @@ func TestReplayAfterMmapReplayError(t *testing.T) { opts.EnableMemorySnapshotOnShutdown = true opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars) - h, err = NewHead(nil, nil, wlog, nil, opts, nil) + h, err = NewHead(nil, nil, wal, nil, opts, nil) require.NoError(t, err) require.NoError(t, h.Init(0)) } @@ -4228,9 +4228,9 @@ func generateBigTestHistograms(n int) []*histogram.Histogram { func TestOOOAppendWithNoSeries(t *testing.T) { dir := t.TempDir() - wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) require.NoError(t, err) - oooWlog, err := wal.NewSize(nil, nil, filepath.Join(dir, wal.WblDirName), 32768, true) + oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true) require.NoError(t, err) opts := DefaultHeadOptions() @@ -4238,7 +4238,7 @@ func TestOOOAppendWithNoSeries(t *testing.T) { opts.OutOfOrderCapMax.Store(30) opts.OutOfOrderTimeWindow.Store(120 * time.Minute.Milliseconds()) - h, err := NewHead(nil, nil, wlog, oooWlog, opts, nil) + h, err := NewHead(nil, nil, wal, oooWlog, opts, nil) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, h.Close()) @@ -4309,16 +4309,16 @@ func TestOOOAppendWithNoSeries(t *testing.T) { func TestHeadMinOOOTimeUpdate(t *testing.T) { dir := t.TempDir() - wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) require.NoError(t, err) - oooWlog, err := wal.NewSize(nil, nil, filepath.Join(dir, wal.WblDirName), 32768, true) + oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true) require.NoError(t, err) opts := DefaultHeadOptions() opts.ChunkDirRoot = dir opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds()) - h, err := NewHead(nil, nil, wlog, oooWlog, opts, nil) + h, err := NewHead(nil, nil, wal, oooWlog, opts, nil) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, h.Close()) diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index e92d02085..7628461e1 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -39,10 +39,10 @@ import ( "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tombstones" - "github.com/prometheus/prometheus/tsdb/wal" + "github.com/prometheus/prometheus/tsdb/wlog" ) -func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) { +func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) { // Track number of samples that referenced a series we don't know about // for error reporting. var unknownRefs atomic.Uint64 @@ -99,7 +99,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H defer func() { // For CorruptionErr ensure to terminate all workers before exiting. - _, ok := err.(*wal.CorruptionErr) + _, ok := err.(*wlog.CorruptionErr) if ok || seriesCreationErr != nil { for i := 0; i < n; i++ { processors[i].closeAndDrain() @@ -156,7 +156,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H series := seriesPool.Get().([]record.RefSeries)[:0] series, err = dec.Series(rec, series) if err != nil { - decodeErr = &wal.CorruptionErr{ + decodeErr = &wlog.CorruptionErr{ Err: errors.Wrap(err, "decode series"), Segment: r.Segment(), Offset: r.Offset(), @@ -168,7 +168,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H samples := samplesPool.Get().([]record.RefSample)[:0] samples, err = dec.Samples(rec, samples) if err != nil { - decodeErr = &wal.CorruptionErr{ + decodeErr = &wlog.CorruptionErr{ Err: errors.Wrap(err, "decode samples"), Segment: r.Segment(), Offset: r.Offset(), @@ -180,7 +180,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H tstones := tstonesPool.Get().([]tombstones.Stone)[:0] tstones, err = dec.Tombstones(rec, tstones) if err != nil { - decodeErr = &wal.CorruptionErr{ + decodeErr = &wlog.CorruptionErr{ Err: errors.Wrap(err, "decode tombstones"), Segment: r.Segment(), Offset: r.Offset(), @@ -192,7 +192,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H exemplars := exemplarsPool.Get().([]record.RefExemplar)[:0] exemplars, err = dec.Exemplars(rec, exemplars) if err != nil { - decodeErr = &wal.CorruptionErr{ + decodeErr = &wlog.CorruptionErr{ Err: errors.Wrap(err, "decode exemplars"), Segment: r.Segment(), Offset: r.Offset(), @@ -216,7 +216,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H meta := metadataPool.Get().([]record.RefMetadata)[:0] meta, err := dec.Metadata(rec, meta) if err != nil { - decodeErr = &wal.CorruptionErr{ + decodeErr = &wlog.CorruptionErr{ Err: errors.Wrap(err, "decode metadata"), Segment: r.Segment(), Offset: r.Offset(), @@ -596,7 +596,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp return unknownRefs, unknownHistogramRefs, mmapOverlappingChunks } -func (h *Head) loadWBL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) { +func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) { // Track number of samples, m-map markers, that referenced a series we don't know about // for error reporting. var unknownRefs, mmapMarkerUnknownRefs atomic.Uint64 @@ -628,7 +628,7 @@ func (h *Head) loadWBL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H defer func() { // For CorruptionErr ensure to terminate all workers before exiting. // We also wrap it to identify OOO WBL corruption. - _, ok := err.(*wal.CorruptionErr) + _, ok := err.(*wlog.CorruptionErr) if ok { err = &errLoadWbl{err: err} for i := 0; i < n; i++ { @@ -658,7 +658,7 @@ func (h *Head) loadWBL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H samples := samplesPool.Get().([]record.RefSample)[:0] samples, err = dec.Samples(rec, samples) if err != nil { - decodeErr = &wal.CorruptionErr{ + decodeErr = &wlog.CorruptionErr{ Err: errors.Wrap(err, "decode samples"), Segment: r.Segment(), Offset: r.Offset(), @@ -670,7 +670,7 @@ func (h *Head) loadWBL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H markers := markersPool.Get().([]record.RefMmapMarker)[:0] markers, err = dec.MmapMarkers(rec, markers) if err != nil { - decodeErr = &wal.CorruptionErr{ + decodeErr = &wlog.CorruptionErr{ Err: errors.Wrap(err, "decode mmap markers"), Segment: r.Segment(), Offset: r.Offset(), @@ -1046,7 +1046,7 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) { if err := os.MkdirAll(cpdirtmp, 0o777); err != nil { return stats, errors.Wrap(err, "create chunk snapshot dir") } - cp, err := wal.New(nil, nil, cpdirtmp, h.wal.CompressionEnabled()) + cp, err := wlog.New(nil, nil, cpdirtmp, h.wal.CompressionEnabled()) if err != nil { return stats, errors.Wrap(err, "open chunk snapshot") } @@ -1285,7 +1285,7 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie } start := time.Now() - sr, err := wal.NewSegmentsReader(dir) + sr, err := wlog.NewSegmentsReader(dir) if err != nil { return snapIdx, snapOffset, nil, errors.Wrap(err, "open chunk snapshot") } @@ -1356,7 +1356,7 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie }(i, recordChan) } - r := wal.NewReader(sr) + r := wlog.NewReader(sr) var loopErr error Outer: for r.Next() { diff --git a/tsdb/wal.go b/tsdb/wal.go index 615903c63..03043c781 100644 --- a/tsdb/wal.go +++ b/tsdb/wal.go @@ -37,7 +37,7 @@ import ( "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tombstones" - "github.com/prometheus/prometheus/tsdb/wal" + "github.com/prometheus/prometheus/tsdb/wlog" ) // WALEntryType indicates what data a WAL entry contains. @@ -89,7 +89,7 @@ func newWalMetrics(r prometheus.Registerer) *walMetrics { // WAL is a write ahead log that can log new series labels and samples. // It must be completely read before new entries are logged. // -// DEPRECATED: use wal pkg combined with the record codex instead. +// DEPRECATED: use wlog pkg combined with the record codex instead. type WAL interface { Reader() WALReader LogSeries([]record.RefSeries) error @@ -146,7 +146,7 @@ func newCRC32() hash.Hash32 { // SegmentWAL is a write ahead log for series data. // -// DEPRECATED: use wal pkg combined with the record coders instead. +// DEPRECATED: use wlog pkg combined with the record coders instead. type SegmentWAL struct { mtx sync.Mutex metrics *walMetrics @@ -1229,7 +1229,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) { if err := os.RemoveAll(tmpdir); err != nil { return errors.Wrap(err, "cleanup replacement dir") } - repl, err := wal.New(logger, nil, tmpdir, false) + repl, err := wlog.New(logger, nil, tmpdir, false) if err != nil { return errors.Wrap(err, "open new WAL") } diff --git a/tsdb/wal_test.go b/tsdb/wal_test.go index 325e65a92..da242b875 100644 --- a/tsdb/wal_test.go +++ b/tsdb/wal_test.go @@ -34,7 +34,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tombstones" - "github.com/prometheus/prometheus/tsdb/wal" + "github.com/prometheus/prometheus/tsdb/wlog" ) func TestSegmentWAL_cut(t *testing.T) { @@ -450,7 +450,7 @@ func TestMigrateWAL_Empty(t *testing.T) { wdir := path.Join(dir, "wal") // Initialize empty WAL. - w, err := wal.New(nil, nil, wdir, false) + w, err := wlog.New(nil, nil, wdir, false) require.NoError(t, err) require.NoError(t, w.Close()) @@ -493,7 +493,7 @@ func TestMigrateWAL_Fuzz(t *testing.T) { // Perform migration. require.NoError(t, MigrateWAL(nil, wdir)) - w, err := wal.New(nil, nil, wdir, false) + w, err := wlog.New(nil, nil, wdir, false) require.NoError(t, err) // We can properly write some new data after migration. @@ -505,10 +505,10 @@ func TestMigrateWAL_Fuzz(t *testing.T) { require.NoError(t, w.Close()) // Read back all data. - sr, err := wal.NewSegmentsReader(wdir) + sr, err := wlog.NewSegmentsReader(wdir) require.NoError(t, err) - r := wal.NewReader(sr) + r := wlog.NewReader(sr) var res []interface{} var dec record.Decoder diff --git a/tsdb/wal/checkpoint.go b/tsdb/wlog/checkpoint.go similarity index 98% rename from tsdb/wal/checkpoint.go rename to tsdb/wlog/checkpoint.go index d892d720a..42b03f48f 100644 --- a/tsdb/wal/checkpoint.go +++ b/tsdb/wlog/checkpoint.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package wal +package wlog import ( "fmt" @@ -93,7 +93,7 @@ const checkpointPrefix = "checkpoint." // segmented format as the original WAL itself. // This makes it easy to read it through the WAL package and concatenate // it with the original WAL. -func Checkpoint(logger log.Logger, w *WAL, from, to int, keep func(id chunks.HeadSeriesRef) bool, mint int64) (*CheckpointStats, error) { +func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef) bool, mint int64) (*CheckpointStats, error) { stats := &CheckpointStats{} var sgmReader io.ReadCloser diff --git a/tsdb/wal/checkpoint_test.go b/tsdb/wlog/checkpoint_test.go similarity index 97% rename from tsdb/wal/checkpoint_test.go rename to tsdb/wlog/checkpoint_test.go index 920c45af3..22f577efd 100644 --- a/tsdb/wal/checkpoint_test.go +++ b/tsdb/wlog/checkpoint_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package wal +package wlog import ( "fmt" @@ -301,7 +301,7 @@ func TestCheckpoint(t *testing.T) { } func TestCheckpointNoTmpFolderAfterError(t *testing.T) { - // Create a new wal with invalid data. + // Create a new wlog with invalid data. dir := t.TempDir() w, err := NewSize(nil, nil, dir, 64*1024, false) require.NoError(t, err) @@ -318,17 +318,17 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) { require.NoError(t, err) require.NoError(t, f.Close()) - // Run the checkpoint and since the wal contains corrupt data this should return an error. + // Run the checkpoint and since the wlog contains corrupt data this should return an error. _, err = Checkpoint(log.NewNopLogger(), w, 0, 1, nil, 0) require.Error(t, err) - // Walk the wal dir to make sure there are no tmp folder left behind after the error. + // Walk the wlog dir to make sure there are no tmp folder left behind after the error. err = filepath.Walk(w.Dir(), func(path string, info os.FileInfo, err error) error { if err != nil { return errors.Wrapf(err, "access err %q: %v", path, err) } if info.IsDir() && strings.HasSuffix(info.Name(), ".tmp") { - return fmt.Errorf("wal dir contains temporary folder:%s", info.Name()) + return fmt.Errorf("wlog dir contains temporary folder:%s", info.Name()) } return nil }) diff --git a/tsdb/wal/live_reader.go b/tsdb/wlog/live_reader.go similarity index 99% rename from tsdb/wal/live_reader.go rename to tsdb/wlog/live_reader.go index f09d149aa..fd949a963 100644 --- a/tsdb/wal/live_reader.go +++ b/tsdb/wlog/live_reader.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package wal +package wlog import ( "encoding/binary" diff --git a/tsdb/wal/reader.go b/tsdb/wlog/reader.go similarity index 99% rename from tsdb/wal/reader.go rename to tsdb/wlog/reader.go index 7612f8775..e2b50d4b2 100644 --- a/tsdb/wal/reader.go +++ b/tsdb/wlog/reader.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package wal +package wlog import ( "encoding/binary" diff --git a/tsdb/wal/reader_test.go b/tsdb/wlog/reader_test.go similarity index 99% rename from tsdb/wal/reader_test.go rename to tsdb/wlog/reader_test.go index 191e54636..97d251b3a 100644 --- a/tsdb/wal/reader_test.go +++ b/tsdb/wlog/reader_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package wal +package wlog import ( "bytes" @@ -240,7 +240,7 @@ func TestReader_Live(t *testing.T) { const fuzzLen = 500 -func generateRandomEntries(w *WAL, records chan []byte) error { +func generateRandomEntries(w *WL, records chan []byte) error { var recs [][]byte for i := 0; i < fuzzLen; i++ { var sz int64 diff --git a/tsdb/wal/watcher.go b/tsdb/wlog/watcher.go similarity index 99% rename from tsdb/wal/watcher.go rename to tsdb/wlog/watcher.go index 1cf8f5311..5d7c84d34 100644 --- a/tsdb/wal/watcher.go +++ b/tsdb/wlog/watcher.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package wal +package wlog import ( "fmt" @@ -304,7 +304,7 @@ func (w *Watcher) firstAndLast() (int, int, error) { return refs[0], refs[len(refs)-1], nil } -// Copied from tsdb/wal/wal.go so we do not have to open a WAL. +// Copied from tsdb/wlog/wlog.go so we do not have to open a WAL. // Plan is to move WAL watcher to TSDB and dedupe these implementations. func (w *Watcher) segments(dir string) ([]int, error) { files, err := os.ReadDir(dir) diff --git a/tsdb/wal/watcher_test.go b/tsdb/wlog/watcher_test.go similarity index 99% rename from tsdb/wal/watcher_test.go rename to tsdb/wlog/watcher_test.go index 7a70c1756..545cc338e 100644 --- a/tsdb/wal/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -10,7 +10,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package wal +package wlog import ( "fmt" diff --git a/tsdb/wal/wal.go b/tsdb/wlog/wlog.go similarity index 92% rename from tsdb/wal/wal.go rename to tsdb/wlog/wlog.go index 191b09ed9..5ae308d4e 100644 --- a/tsdb/wal/wal.go +++ b/tsdb/wlog/wlog.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package wal +package wlog import ( "bufio" @@ -133,7 +133,7 @@ func OpenWriteSegment(logger log.Logger, dir string, k int) (*Segment, error) { // If it was torn mid-record, a full read (which the caller should do anyway // to ensure integrity) will detect it as a corruption by the end. if d := stat.Size() % pageSize; d != 0 { - level.Warn(logger).Log("msg", "Last page of the wal is torn, filling it with zeros", "segment", segName) + level.Warn(logger).Log("msg", "Last page of the wlog is torn, filling it with zeros", "segment", segName) if _, err := f.Write(make([]byte, pageSize-d)); err != nil { f.Close() return nil, errors.Wrap(err, "zero-pad torn page") @@ -164,7 +164,7 @@ func OpenReadSegment(fn string) (*Segment, error) { return &Segment{SegmentFile: f, i: k, dir: filepath.Dir(fn)}, nil } -// WAL is a write ahead log that stores records in segment files. +// WL is a write log that stores records in segment files. // It must be read from start to end once before logging new data. // If an error occurs during read, the repair procedure must be called // before it's safe to do further writes. @@ -174,7 +174,7 @@ func OpenReadSegment(fn string) (*Segment, error) { // Records are never split across segments to allow full segments to be // safely truncated. It also ensures that torn writes never corrupt records // beyond the most recent segment. -type WAL struct { +type WL struct { dir string logger log.Logger segmentSize int @@ -188,10 +188,10 @@ type WAL struct { compress bool snappyBuf []byte - metrics *walMetrics + metrics *wlMetrics } -type walMetrics struct { +type wlMetrics struct { fsyncDuration prometheus.Summary pageFlushes prometheus.Counter pageCompletions prometheus.Counter @@ -201,12 +201,12 @@ type walMetrics struct { writesFailed prometheus.Counter } -func newWALMetrics(r prometheus.Registerer) *walMetrics { - m := &walMetrics{} +func newWLMetrics(r prometheus.Registerer) *wlMetrics { + m := &wlMetrics{} m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ Name: "fsync_duration_seconds", - Help: "Duration of WAL fsync.", + Help: "Duration of write log fsync.", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }) m.pageFlushes = prometheus.NewCounter(prometheus.CounterOpts{ @@ -219,19 +219,19 @@ func newWALMetrics(r prometheus.Registerer) *walMetrics { }) m.truncateFail = prometheus.NewCounter(prometheus.CounterOpts{ Name: "truncations_failed_total", - Help: "Total number of WAL truncations that failed.", + Help: "Total number of write log truncations that failed.", }) m.truncateTotal = prometheus.NewCounter(prometheus.CounterOpts{ Name: "truncations_total", - Help: "Total number of WAL truncations attempted.", + Help: "Total number of write log truncations attempted.", }) m.currentSegment = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "segment_current", - Help: "WAL segment index that TSDB is currently writing to.", + Help: "Write log segment index that TSDB is currently writing to.", }) m.writesFailed = prometheus.NewCounter(prometheus.CounterOpts{ Name: "writes_failed_total", - Help: "Total number of WAL writes that failed.", + Help: "Total number of write log writes that failed.", }) if r != nil { @@ -250,13 +250,13 @@ func newWALMetrics(r prometheus.Registerer) *walMetrics { } // New returns a new WAL over the given directory. -func New(logger log.Logger, reg prometheus.Registerer, dir string, compress bool) (*WAL, error) { +func New(logger log.Logger, reg prometheus.Registerer, dir string, compress bool) (*WL, error) { return NewSize(logger, reg, dir, DefaultSegmentSize, compress) } -// NewSize returns a new WAL over the given directory. +// NewSize returns a new write log over the given directory. // New segments are created with the specified size. -func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress bool) (*WAL, error) { +func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress bool) (*WL, error) { if segmentSize%pageSize != 0 { return nil, errors.New("invalid segment size") } @@ -266,7 +266,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi if logger == nil { logger = log.NewNopLogger() } - w := &WAL{ + w := &WL{ dir: dir, logger: logger, segmentSize: segmentSize, @@ -277,9 +277,9 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi } prefix := "prometheus_tsdb_wal_" if filepath.Base(dir) == WblDirName { - prefix = "prometheus_tsdb_out_of_order_wal_" + prefix = "prometheus_tsdb_out_of_order_wbl_" } - w.metrics = newWALMetrics(prometheus.WrapRegistererWithPrefix(prefix, reg)) + w.metrics = newWLMetrics(prometheus.WrapRegistererWithPrefix(prefix, reg)) _, last, err := Segments(w.Dir()) if err != nil { @@ -308,11 +308,11 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi } // Open an existing WAL. -func Open(logger log.Logger, dir string) (*WAL, error) { +func Open(logger log.Logger, dir string) (*WL, error) { if logger == nil { logger = log.NewNopLogger() } - w := &WAL{ + w := &WL{ dir: dir, logger: logger, } @@ -321,16 +321,16 @@ func Open(logger log.Logger, dir string) (*WAL, error) { } // CompressionEnabled returns if compression is enabled on this WAL. -func (w *WAL) CompressionEnabled() bool { +func (w *WL) CompressionEnabled() bool { return w.compress } // Dir returns the directory of the WAL. -func (w *WAL) Dir() string { +func (w *WL) Dir() string { return w.dir } -func (w *WAL) run() { +func (w *WL) run() { Loop: for { select { @@ -350,7 +350,7 @@ Loop: // Repair attempts to repair the WAL based on the error. // It discards all data after the corruption. -func (w *WAL) Repair(origErr error) error { +func (w *WL) Repair(origErr error) error { // We could probably have a mode that only discards torn records right around // the corruption to preserve as data much as possible. // But that's not generally applicable if the records have any kind of causality. @@ -466,7 +466,7 @@ func SegmentName(dir string, i int) string { // NextSegment creates the next segment and closes the previous one asynchronously. // It returns the file number of the new file. -func (w *WAL) NextSegment() (int, error) { +func (w *WL) NextSegment() (int, error) { w.mtx.Lock() defer w.mtx.Unlock() return w.nextSegment(true) @@ -474,7 +474,7 @@ func (w *WAL) NextSegment() (int, error) { // NextSegmentSync creates the next segment and closes the previous one in sync. // It returns the file number of the new file. -func (w *WAL) NextSegmentSync() (int, error) { +func (w *WL) NextSegmentSync() (int, error) { w.mtx.Lock() defer w.mtx.Unlock() return w.nextSegment(false) @@ -482,9 +482,9 @@ func (w *WAL) NextSegmentSync() (int, error) { // nextSegment creates the next segment and closes the previous one. // It returns the file number of the new file. -func (w *WAL) nextSegment(async bool) (int, error) { +func (w *WL) nextSegment(async bool) (int, error) { if w.closed { - return 0, errors.New("wal is closed") + return 0, errors.New("wlog is closed") } // Only flush the current page if it actually holds data. @@ -519,7 +519,7 @@ func (w *WAL) nextSegment(async bool) (int, error) { return next.Index(), nil } -func (w *WAL) setSegment(segment *Segment) error { +func (w *WL) setSegment(segment *Segment) error { w.segment = segment // Correctly initialize donePages. @@ -535,7 +535,7 @@ func (w *WAL) setSegment(segment *Segment) error { // flushPage writes the new contents of the page to disk. If no more records will fit into // the page, the remaining bytes will be set to zero and a new page will be started. // If clear is true, this is enforced regardless of how many bytes are left in the page. -func (w *WAL) flushPage(clear bool) error { +func (w *WL) flushPage(clear bool) error { w.metrics.pageFlushes.Inc() p := w.page @@ -601,13 +601,13 @@ func (t recType) String() string { } } -func (w *WAL) pagesPerSegment() int { +func (w *WL) pagesPerSegment() int { return w.segmentSize / pageSize } // Log writes the records into the log. // Multiple records can be passed at once to reduce writes and increase throughput. -func (w *WAL) Log(recs ...[]byte) error { +func (w *WL) Log(recs ...[]byte) error { w.mtx.Lock() defer w.mtx.Unlock() // Callers could just implement their own list record format but adding @@ -625,7 +625,7 @@ func (w *WAL) Log(recs ...[]byte) error { // - the final record of a batch // - the record is bigger than the page size // - the current page is full. -func (w *WAL) log(rec []byte, final bool) error { +func (w *WL) log(rec []byte, final bool) error { // When the last page flush failed the page will remain full. // When the page is full, need to flush it before trying to add more records to it. if w.page.full() { @@ -721,7 +721,7 @@ func (w *WAL) log(rec []byte, final bool) error { // LastSegmentAndOffset returns the last segment number of the WAL // and the offset in that file upto which the segment has been filled. -func (w *WAL) LastSegmentAndOffset() (seg, offset int, err error) { +func (w *WL) LastSegmentAndOffset() (seg, offset int, err error) { w.mtx.Lock() defer w.mtx.Unlock() @@ -736,7 +736,7 @@ func (w *WAL) LastSegmentAndOffset() (seg, offset int, err error) { } // Truncate drops all segments before i. -func (w *WAL) Truncate(i int) (err error) { +func (w *WL) Truncate(i int) (err error) { w.metrics.truncateTotal.Inc() defer func() { if err != nil { @@ -758,27 +758,27 @@ func (w *WAL) Truncate(i int) (err error) { return nil } -func (w *WAL) fsync(f *Segment) error { +func (w *WL) fsync(f *Segment) error { start := time.Now() err := f.Sync() w.metrics.fsyncDuration.Observe(time.Since(start).Seconds()) return err } -// Sync forces a file sync on the current wal segment. This function is meant +// Sync forces a file sync on the current write log segment. This function is meant // to be used only on tests due to different behaviour on Operating Systems // like windows and linux -func (w *WAL) Sync() error { +func (w *WL) Sync() error { return w.fsync(w.segment) } // Close flushes all writes and closes active segment. -func (w *WAL) Close() (err error) { +func (w *WL) Close() (err error) { w.mtx.Lock() defer w.mtx.Unlock() if w.closed { - return errors.New("wal already closed") + return errors.New("wlog already closed") } if w.segment == nil { @@ -811,8 +811,8 @@ func (w *WAL) Close() (err error) { // Segments returns the range [first, n] of currently existing segments. // If no segments are found, first and n are -1. -func Segments(walDir string) (first, last int, err error) { - refs, err := listSegments(walDir) +func Segments(wlDir string) (first, last int, err error) { + refs, err := listSegments(wlDir) if err != nil { return 0, 0, err } @@ -979,8 +979,8 @@ func (r *segmentBufReader) Read(b []byte) (n int, err error) { return n, nil } -// Computing size of the WAL. +// Size computes the size of the write log. // We do this by adding the sizes of all the files under the WAL dir. -func (w *WAL) Size() (int64, error) { +func (w *WL) Size() (int64, error) { return fileutil.DirSize(w.Dir()) } diff --git a/tsdb/wal/wal_test.go b/tsdb/wlog/wlog_test.go similarity index 99% rename from tsdb/wal/wal_test.go rename to tsdb/wlog/wlog_test.go index 55cd6caa1..ed8a9df2e 100644 --- a/tsdb/wal/wal_test.go +++ b/tsdb/wlog/wlog_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package wal +package wlog import ( "bytes" @@ -137,7 +137,7 @@ func TestWALRepair_ReadingError(t *testing.T) { } first, last, err := Segments(w.Dir()) require.NoError(t, err) - require.Equal(t, 3, 1+last-first, "wal creation didn't result in expected number of segments") + require.Equal(t, 3, 1+last-first, "wlog creation didn't result in expected number of segments") require.NoError(t, w.Close()) diff --git a/web/ui/module/codemirror-promql/package.json b/web/ui/module/codemirror-promql/package.json index e5e11fc21..544a10698 100644 --- a/web/ui/module/codemirror-promql/package.json +++ b/web/ui/module/codemirror-promql/package.json @@ -1,6 +1,6 @@ { "name": "@prometheus-io/codemirror-promql", - "version": "0.39.0", + "version": "0.39.1", "description": "a CodeMirror mode for the PromQL language", "types": "dist/esm/index.d.ts", "module": "dist/esm/index.js", @@ -29,7 +29,7 @@ }, "homepage": "https://github.com/prometheus/prometheus/blob/main/web/ui/module/codemirror-promql/README.md", "dependencies": { - "@prometheus-io/lezer-promql": "^0.39.0", + "@prometheus-io/lezer-promql": "^0.39.1", "lru-cache": "^6.0.0" }, "devDependencies": { diff --git a/web/ui/module/lezer-promql/package.json b/web/ui/module/lezer-promql/package.json index 5df04782e..8d0bdea45 100644 --- a/web/ui/module/lezer-promql/package.json +++ b/web/ui/module/lezer-promql/package.json @@ -1,6 +1,6 @@ { "name": "@prometheus-io/lezer-promql", - "version": "0.39.0", + "version": "0.39.1", "description": "lezer-based PromQL grammar", "main": "index.cjs", "type": "module", diff --git a/web/ui/package-lock.json b/web/ui/package-lock.json index d58ca378d..e75ca42cf 100644 --- a/web/ui/package-lock.json +++ b/web/ui/package-lock.json @@ -28,10 +28,10 @@ }, "module/codemirror-promql": { "name": "@prometheus-io/codemirror-promql", - "version": "0.39.0", + "version": "0.39.1", "license": "Apache-2.0", "dependencies": { - "@prometheus-io/lezer-promql": "^0.39.0", + "@prometheus-io/lezer-promql": "^0.39.1", "lru-cache": "^6.0.0" }, "devDependencies": { @@ -61,7 +61,7 @@ }, "module/lezer-promql": { "name": "@prometheus-io/lezer-promql", - "version": "0.39.0", + "version": "0.39.1", "license": "Apache-2.0", "devDependencies": { "@lezer/generator": "^1.1.1", @@ -17625,7 +17625,7 @@ }, "react-app": { "name": "@prometheus-io/app", - "version": "0.39.0", + "version": "0.39.1", "dependencies": { "@codemirror/autocomplete": "^6.2.0", "@codemirror/commands": "^6.1.0", @@ -17643,7 +17643,7 @@ "@lezer/lr": "^1.2.3", "@nexucis/fuzzy": "^0.4.1", "@nexucis/kvsearch": "^0.8.1", - "@prometheus-io/codemirror-promql": "^0.39.0", + "@prometheus-io/codemirror-promql": "^0.39.1", "bootstrap": "^4.6.2", "css.escape": "^1.5.1", "downshift": "^6.1.11", @@ -19883,7 +19883,7 @@ "@lezer/lr": "^1.2.3", "@nexucis/fuzzy": "^0.4.1", "@nexucis/kvsearch": "^0.8.1", - "@prometheus-io/codemirror-promql": "^0.39.0", + "@prometheus-io/codemirror-promql": "^0.39.1", "@testing-library/react-hooks": "^7.0.2", "@types/enzyme": "^3.10.12", "@types/flot": "0.0.32", @@ -19935,7 +19935,7 @@ "@lezer/common": "^1.0.1", "@lezer/highlight": "^1.1.0", "@lezer/lr": "^1.2.3", - "@prometheus-io/lezer-promql": "^0.39.0", + "@prometheus-io/lezer-promql": "^0.39.1", "@types/lru-cache": "^5.1.1", "isomorphic-fetch": "^3.0.0", "lru-cache": "^6.0.0", diff --git a/web/ui/react-app/package.json b/web/ui/react-app/package.json index 8e80ca253..8e7b103a0 100644 --- a/web/ui/react-app/package.json +++ b/web/ui/react-app/package.json @@ -1,6 +1,6 @@ { "name": "@prometheus-io/app", - "version": "0.39.0", + "version": "0.39.1", "private": true, "dependencies": { "@codemirror/autocomplete": "^6.2.0", @@ -19,7 +19,7 @@ "@lezer/common": "^1.0.1", "@nexucis/fuzzy": "^0.4.1", "@nexucis/kvsearch": "^0.8.1", - "@prometheus-io/codemirror-promql": "^0.39.0", + "@prometheus-io/codemirror-promql": "^0.39.1", "bootstrap": "^4.6.2", "css.escape": "^1.5.1", "downshift": "^6.1.11",