mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 06:17:27 -08:00
Merge remote-tracking branch 'upstream/main' into fix-conflict
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
commit
648be89822
26
.github/ISSUE_TEMPLATE/feature_request.md
vendored
26
.github/ISSUE_TEMPLATE/feature_request.md
vendored
|
@ -1,26 +0,0 @@
|
|||
---
|
||||
name: Feature request
|
||||
about: Suggest an idea for this project.
|
||||
title: ''
|
||||
labels: ''
|
||||
assignees: ''
|
||||
---
|
||||
|
||||
<!--
|
||||
|
||||
Please do *NOT* ask support questions in Github issues.
|
||||
|
||||
If your issue is not a feature request or bug report use our
|
||||
community support.
|
||||
|
||||
https://prometheus.io/community/
|
||||
|
||||
There is also commercial support available.
|
||||
|
||||
https://prometheus.io/support-training/
|
||||
|
||||
-->
|
||||
## Proposal
|
||||
**Use case. Why is this important?**
|
||||
|
||||
*“Nice to have” is not a good use case. :)*
|
23
.github/ISSUE_TEMPLATE/feature_request.yml
vendored
Normal file
23
.github/ISSUE_TEMPLATE/feature_request.yml
vendored
Normal file
|
@ -0,0 +1,23 @@
|
|||
---
|
||||
name: Feature request
|
||||
description: Suggest an idea for this project.
|
||||
body:
|
||||
- type: markdown
|
||||
attributes:
|
||||
value: >-
|
||||
Please do *NOT* ask support questions in Github issues.
|
||||
|
||||
|
||||
If your issue is not a feature request or bug report use
|
||||
our [community support](https://prometheus.io/community/).
|
||||
|
||||
|
||||
There is also [commercial
|
||||
support](https://prometheus.io/support-training/) available.
|
||||
- type: textarea
|
||||
attributes:
|
||||
label: Proposal
|
||||
description: Use case. Why is this important?
|
||||
placeholder: “Nice to have” is not a good use case. :)
|
||||
validations:
|
||||
required: true
|
2
.github/workflows/buf-lint.yml
vendored
2
.github/workflows/buf-lint.yml
vendored
|
@ -10,7 +10,7 @@ jobs:
|
|||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: bufbuild/buf-setup-action@v1.7.0
|
||||
- uses: bufbuild/buf-setup-action@v1.8.0
|
||||
- uses: bufbuild/buf-lint-action@v1
|
||||
with:
|
||||
input: 'prompb'
|
||||
|
|
2
.github/workflows/buf.yml
vendored
2
.github/workflows/buf.yml
vendored
|
@ -9,7 +9,7 @@ jobs:
|
|||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: bufbuild/buf-setup-action@v1.7.0
|
||||
- uses: bufbuild/buf-setup-action@v1.8.0
|
||||
- uses: bufbuild/buf-lint-action@v1
|
||||
with:
|
||||
input: 'prompb'
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
# Changelog
|
||||
|
||||
## 2.39.1 / 2022-10-07
|
||||
|
||||
* [BUGFIX] Rules: Fix notifier relabel changing the labels on active alerts. #11427
|
||||
|
||||
## 2.39.0 / 2022-10-05
|
||||
|
||||
* [FEATURE] **experimental** TSDB: Add support for ingesting out-of-order samples. This is configured via `out_of_order_time_window` field in the config file; check config file docs for more info. #11075
|
||||
|
|
12
README.md
12
README.md
|
@ -1,4 +1,11 @@
|
|||
# Prometheus
|
||||
<h1 align="center" style="border-bottom: none">
|
||||
<a href="//prometheus.io" target="_blank"><img alt="Prometheus" src="/documentation/images/prometheus-logo.svg"></a><br>Prometheus
|
||||
</h1>
|
||||
|
||||
<p align="center">Visit <a href="//prometheus.io" target="_blank">prometheus.io</a> for the full documentation,
|
||||
examples and guides.</p>
|
||||
|
||||
<div align="center">
|
||||
|
||||
[![CircleCI](https://circleci.com/gh/prometheus/prometheus/tree/main.svg?style=shield)][circleci]
|
||||
[![Docker Repository on Quay](https://quay.io/repository/prometheus/prometheus/status)][quay]
|
||||
|
@ -8,8 +15,7 @@
|
|||
[![Gitpod ready-to-code](https://img.shields.io/badge/Gitpod-ready--to--code-blue?logo=gitpod)](https://gitpod.io/#https://github.com/prometheus/prometheus)
|
||||
[![Fuzzing Status](https://oss-fuzz-build-logs.storage.googleapis.com/badges/prometheus.svg)](https://bugs.chromium.org/p/oss-fuzz/issues/list?sort=-opened&can=1&q=proj:prometheus)
|
||||
|
||||
Visit [prometheus.io](https://prometheus.io) for the full documentation,
|
||||
examples and guides.
|
||||
</div>
|
||||
|
||||
Prometheus, a [Cloud Native Computing Foundation](https://cncf.io/) project, is a systems and service monitoring system. It collects metrics
|
||||
from configured targets at given intervals, evaluates rule expressions,
|
||||
|
|
|
@ -73,7 +73,6 @@ import (
|
|||
"github.com/prometheus/prometheus/tsdb/agent"
|
||||
"github.com/prometheus/prometheus/util/logging"
|
||||
prom_runtime "github.com/prometheus/prometheus/util/runtime"
|
||||
"github.com/prometheus/prometheus/util/strutil"
|
||||
"github.com/prometheus/prometheus/web"
|
||||
)
|
||||
|
||||
|
@ -623,7 +622,7 @@ func main() {
|
|||
Appendable: fanoutStorage,
|
||||
Queryable: localStorage,
|
||||
QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
|
||||
NotifyFunc: sendAlerts(notifierManager, cfg.web.ExternalURL.String()),
|
||||
NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()),
|
||||
Context: ctxRule,
|
||||
ExternalURL: cfg.web.ExternalURL,
|
||||
Registerer: prometheus.DefaultRegisterer,
|
||||
|
@ -1274,36 +1273,6 @@ func computeExternalURL(u, listenAddr string) (*url.URL, error) {
|
|||
return eu, nil
|
||||
}
|
||||
|
||||
type sender interface {
|
||||
Send(alerts ...*notifier.Alert)
|
||||
}
|
||||
|
||||
// sendAlerts implements the rules.NotifyFunc for a Notifier.
|
||||
func sendAlerts(s sender, externalURL string) rules.NotifyFunc {
|
||||
return func(ctx context.Context, expr string, alerts ...*rules.Alert) {
|
||||
var res []*notifier.Alert
|
||||
|
||||
for _, alert := range alerts {
|
||||
a := ¬ifier.Alert{
|
||||
StartsAt: alert.FiredAt,
|
||||
Labels: alert.Labels,
|
||||
Annotations: alert.Annotations,
|
||||
GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
|
||||
}
|
||||
if !alert.ResolvedAt.IsZero() {
|
||||
a.EndsAt = alert.ResolvedAt
|
||||
} else {
|
||||
a.EndsAt = alert.ValidUntil
|
||||
}
|
||||
res = append(res, a)
|
||||
}
|
||||
|
||||
if len(alerts) > 0 {
|
||||
s.Send(res...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// readyStorage implements the Storage interface while allowing to set the actual
|
||||
// storage at a later point in time.
|
||||
type readyStorage struct {
|
||||
|
|
|
@ -198,7 +198,7 @@ func TestSendAlerts(t *testing.T) {
|
|||
}
|
||||
require.Equal(t, tc.exp, alerts)
|
||||
})
|
||||
sendAlerts(senderFunc, "http://localhost:9090")(context.TODO(), "up", tc.in...)
|
||||
rules.SendAlerts(senderFunc, "http://localhost:9090")(context.TODO(), "up", tc.in...)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,11 +57,6 @@ scrape_configs:
|
|||
regex: default;kubernetes;https
|
||||
|
||||
# Scrape config for nodes (kubelet).
|
||||
#
|
||||
# Rather than connecting directly to the node, the scrape is proxied though the
|
||||
# Kubernetes apiserver. This means it will work if Prometheus is running out of
|
||||
# cluster, or can't connect to nodes for some other reason (e.g. because of
|
||||
# firewalling).
|
||||
- job_name: "kubernetes-nodes"
|
||||
|
||||
# Default to scraping over https. If required, just disable this or change to
|
||||
|
|
50
documentation/images/prometheus-logo.svg
Normal file
50
documentation/images/prometheus-logo.svg
Normal file
|
@ -0,0 +1,50 @@
|
|||
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
||||
<!-- Generator: Adobe Illustrator 16.0.0, SVG Export Plug-In . SVG Version: 6.00 Build 0) -->
|
||||
|
||||
<svg
|
||||
xmlns:dc="http://purl.org/dc/elements/1.1/"
|
||||
xmlns:cc="http://creativecommons.org/ns#"
|
||||
xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
|
||||
xmlns:svg="http://www.w3.org/2000/svg"
|
||||
xmlns="http://www.w3.org/2000/svg"
|
||||
xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
|
||||
xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
|
||||
version="1.1"
|
||||
id="Layer_1"
|
||||
x="0px"
|
||||
y="0px"
|
||||
width="115.333px"
|
||||
height="114px"
|
||||
viewBox="0 0 115.333 114"
|
||||
enable-background="new 0 0 115.333 114"
|
||||
xml:space="preserve"
|
||||
sodipodi:docname="prometheus_logo_orange.svg"
|
||||
inkscape:version="0.92.1 r15371"><metadata
|
||||
id="metadata4495"><rdf:RDF><cc:Work
|
||||
rdf:about=""><dc:format>image/svg+xml</dc:format><dc:type
|
||||
rdf:resource="http://purl.org/dc/dcmitype/StillImage" /><dc:title></dc:title></cc:Work></rdf:RDF></metadata><defs
|
||||
id="defs4493" /><sodipodi:namedview
|
||||
pagecolor="#ffffff"
|
||||
bordercolor="#666666"
|
||||
borderopacity="1"
|
||||
objecttolerance="10"
|
||||
gridtolerance="10"
|
||||
guidetolerance="10"
|
||||
inkscape:pageopacity="0"
|
||||
inkscape:pageshadow="2"
|
||||
inkscape:window-width="1484"
|
||||
inkscape:window-height="886"
|
||||
id="namedview4491"
|
||||
showgrid="false"
|
||||
inkscape:zoom="5.2784901"
|
||||
inkscape:cx="60.603667"
|
||||
inkscape:cy="60.329656"
|
||||
inkscape:window-x="54"
|
||||
inkscape:window-y="7"
|
||||
inkscape:window-maximized="0"
|
||||
inkscape:current-layer="Layer_1" /><g
|
||||
id="Layer_2" /><path
|
||||
style="fill:#e6522c;fill-opacity:1"
|
||||
inkscape:connector-curvature="0"
|
||||
id="path4486"
|
||||
d="M 56.667,0.667 C 25.372,0.667 0,26.036 0,57.332 c 0,31.295 25.372,56.666 56.667,56.666 31.295,0 56.666,-25.371 56.666,-56.666 0,-31.296 -25.372,-56.665 -56.666,-56.665 z m 0,106.055 c -8.904,0 -16.123,-5.948 -16.123,-13.283 H 72.79 c 0,7.334 -7.219,13.283 -16.123,13.283 z M 83.297,89.04 H 30.034 V 79.382 H 83.298 V 89.04 Z M 83.106,74.411 H 30.186 C 30.01,74.208 29.83,74.008 29.66,73.802 24.208,67.182 22.924,63.726 21.677,60.204 c -0.021,-0.116 6.611,1.355 11.314,2.413 0,0 2.42,0.56 5.958,1.205 -3.397,-3.982 -5.414,-9.044 -5.414,-14.218 0,-11.359 8.712,-21.285 5.569,-29.308 3.059,0.249 6.331,6.456 6.552,16.161 3.252,-4.494 4.613,-12.701 4.613,-17.733 0,-5.21 3.433,-11.262 6.867,-11.469 -3.061,5.045 0.793,9.37 4.219,20.099 1.285,4.03 1.121,10.812 2.113,15.113 C 63.797,33.534 65.333,20.5 71,16 c -2.5,5.667 0.37,12.758 2.333,16.167 3.167,5.5 5.087,9.667 5.087,17.548 0,5.284 -1.951,10.259 -5.242,14.148 3.742,-0.702 6.326,-1.335 6.326,-1.335 l 12.152,-2.371 c 10e-4,-10e-4 -1.765,7.261 -8.55,14.254 z" /></svg>
|
After Width: | Height: | Size: 2.7 KiB |
2
go.mod
2
go.mod
|
@ -25,7 +25,7 @@ require (
|
|||
github.com/golang/snappy v0.0.4
|
||||
github.com/google/pprof v0.0.0-20220829040838-70bd9ae97f40
|
||||
github.com/gophercloud/gophercloud v1.0.0
|
||||
github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2
|
||||
github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0
|
||||
github.com/hashicorp/consul/api v1.15.2
|
||||
github.com/hashicorp/nomad/api v0.0.0-20220921012004-ddeeb1040edf
|
||||
|
|
4
go.sum
4
go.sum
|
@ -438,8 +438,8 @@ github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z
|
|||
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
|
||||
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
|
||||
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2 h1:uirlL/j72L93RhV4+mkWhjv0cov2I0MIgPOG9rMDr1k=
|
||||
github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2/go.mod h1:M5qHK+eWfAv8VR/265dIuEpL3fNfeC21tXXp9itM24A=
|
||||
github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6 h1:A3dhViTeFDSQcGOXuUi6ukCQSMyDtDISBp2z6OOo2YM=
|
||||
github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6/go.mod h1:M5qHK+eWfAv8VR/265dIuEpL3fNfeC21tXXp9itM24A=
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
|
||||
|
|
|
@ -507,6 +507,8 @@ func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay
|
|||
}
|
||||
alert.ValidUntil = ts.Add(4 * delta)
|
||||
anew := *alert
|
||||
// The notifier re-uses the labels slice, hence make a copy.
|
||||
anew.Labels = alert.Labels.Copy()
|
||||
alerts = append(alerts, &anew)
|
||||
}
|
||||
})
|
||||
|
|
|
@ -20,10 +20,13 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/go-kit/log"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/model/relabel"
|
||||
"github.com/prometheus/prometheus/model/timestamp"
|
||||
"github.com/prometheus/prometheus/notifier"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/promql/parser"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
|
@ -659,3 +662,53 @@ func TestQueryForStateSeries(t *testing.T) {
|
|||
testFunc(tst)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSendAlertsDontAffectActiveAlerts tests a fix for https://github.com/prometheus/prometheus/issues/11424.
|
||||
func TestSendAlertsDontAffectActiveAlerts(t *testing.T) {
|
||||
rule := NewAlertingRule(
|
||||
"TestRule",
|
||||
nil,
|
||||
time.Minute,
|
||||
labels.FromStrings("severity", "critical"),
|
||||
labels.EmptyLabels(), labels.EmptyLabels(), "", true, nil,
|
||||
)
|
||||
|
||||
// Set an active alert.
|
||||
lbls := labels.FromStrings("a1", "1")
|
||||
h := lbls.Hash()
|
||||
al := &Alert{State: StateFiring, Labels: lbls, ActiveAt: time.Now()}
|
||||
rule.active[h] = al
|
||||
|
||||
expr, err := parser.ParseExpr("foo")
|
||||
require.NoError(t, err)
|
||||
rule.vector = expr
|
||||
|
||||
// The relabel rule reproduced the bug here.
|
||||
opts := notifier.Options{
|
||||
QueueCapacity: 1,
|
||||
RelabelConfigs: []*relabel.Config{
|
||||
{
|
||||
SourceLabels: model.LabelNames{"a1"},
|
||||
Regex: relabel.MustNewRegexp("(.+)"),
|
||||
TargetLabel: "a1",
|
||||
Replacement: "bug",
|
||||
Action: "replace",
|
||||
},
|
||||
},
|
||||
}
|
||||
nm := notifier.NewManager(&opts, log.NewNopLogger())
|
||||
|
||||
f := SendAlerts(nm, "")
|
||||
notifyFunc := func(ctx context.Context, expr string, alerts ...*Alert) {
|
||||
require.Len(t, alerts, 1)
|
||||
require.Equal(t, al, alerts[0])
|
||||
f(ctx, expr, alerts...)
|
||||
}
|
||||
|
||||
rule.sendAlerts(context.Background(), time.Now(), 0, 0, notifyFunc)
|
||||
nm.Stop()
|
||||
|
||||
// The relabel rule changes a1=1 to a1=bug.
|
||||
// But the labels with the AlertingRule should not be changed.
|
||||
require.Equal(t, labels.FromStrings("a1", "1"), rule.active[h].Labels)
|
||||
}
|
||||
|
|
|
@ -35,10 +35,12 @@ import (
|
|||
"github.com/prometheus/prometheus/model/rulefmt"
|
||||
"github.com/prometheus/prometheus/model/timestamp"
|
||||
"github.com/prometheus/prometheus/model/value"
|
||||
"github.com/prometheus/prometheus/notifier"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/promql/parser"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/prometheus/util/strutil"
|
||||
)
|
||||
|
||||
// RuleHealth describes the health state of a rule.
|
||||
|
@ -1169,3 +1171,33 @@ func (m *Manager) AlertingRules() []*AlertingRule {
|
|||
|
||||
return alerts
|
||||
}
|
||||
|
||||
type Sender interface {
|
||||
Send(alerts ...*notifier.Alert)
|
||||
}
|
||||
|
||||
// SendAlerts implements the rules.NotifyFunc for a Notifier.
|
||||
func SendAlerts(s Sender, externalURL string) NotifyFunc {
|
||||
return func(ctx context.Context, expr string, alerts ...*Alert) {
|
||||
var res []*notifier.Alert
|
||||
|
||||
for _, alert := range alerts {
|
||||
a := ¬ifier.Alert{
|
||||
StartsAt: alert.FiredAt,
|
||||
Labels: alert.Labels,
|
||||
Annotations: alert.Annotations,
|
||||
GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
|
||||
}
|
||||
if !alert.ResolvedAt.IsZero() {
|
||||
a.EndsAt = alert.ResolvedAt
|
||||
} else {
|
||||
a.EndsAt = alert.ValidUntil
|
||||
}
|
||||
res = append(res, a)
|
||||
}
|
||||
|
||||
if len(alerts) > 0 {
|
||||
s.Send(res...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,7 +39,7 @@ import (
|
|||
"github.com/prometheus/prometheus/scrape"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
"github.com/prometheus/prometheus/tsdb/record"
|
||||
"github.com/prometheus/prometheus/tsdb/wal"
|
||||
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -400,7 +400,7 @@ type QueueManager struct {
|
|||
relabelConfigs []*relabel.Config
|
||||
sendExemplars bool
|
||||
sendNativeHistograms bool
|
||||
watcher *wal.Watcher
|
||||
watcher *wlog.Watcher
|
||||
metadataWatcher *MetadataWatcher
|
||||
|
||||
clientMtx sync.RWMutex
|
||||
|
@ -433,8 +433,8 @@ type QueueManager struct {
|
|||
// the WAL directory will be constructed as <dir>/wal.
|
||||
func NewQueueManager(
|
||||
metrics *queueManagerMetrics,
|
||||
watcherMetrics *wal.WatcherMetrics,
|
||||
readerMetrics *wal.LiveReaderMetrics,
|
||||
watcherMetrics *wlog.WatcherMetrics,
|
||||
readerMetrics *wlog.LiveReaderMetrics,
|
||||
logger log.Logger,
|
||||
dir string,
|
||||
samplesIn *ewmaRate,
|
||||
|
@ -484,7 +484,7 @@ func NewQueueManager(
|
|||
highestRecvTimestamp: highestRecvTimestamp,
|
||||
}
|
||||
|
||||
t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite)
|
||||
t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite)
|
||||
if t.mcfg.Send {
|
||||
t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline)
|
||||
}
|
||||
|
|
|
@ -855,7 +855,7 @@ func BenchmarkSampleSend(b *testing.B) {
|
|||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
m.Append(samples)
|
||||
m.UpdateSeriesSegment(series, i+1) // simulate what wal.Watcher.garbageCollectSeries does
|
||||
m.UpdateSeriesSegment(series, i+1) // simulate what wlog.Watcher.garbageCollectSeries does
|
||||
m.SeriesReset(i + 1)
|
||||
}
|
||||
// Do not include shutdown
|
||||
|
|
|
@ -30,7 +30,7 @@ import (
|
|||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/model/metadata"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/wal"
|
||||
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -60,8 +60,8 @@ type WriteStorage struct {
|
|||
reg prometheus.Registerer
|
||||
mtx sync.Mutex
|
||||
|
||||
watcherMetrics *wal.WatcherMetrics
|
||||
liveReaderMetrics *wal.LiveReaderMetrics
|
||||
watcherMetrics *wlog.WatcherMetrics
|
||||
liveReaderMetrics *wlog.LiveReaderMetrics
|
||||
externalLabels labels.Labels
|
||||
dir string
|
||||
queues map[string]*QueueManager
|
||||
|
@ -82,8 +82,8 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, f
|
|||
}
|
||||
rws := &WriteStorage{
|
||||
queues: make(map[string]*QueueManager),
|
||||
watcherMetrics: wal.NewWatcherMetrics(reg),
|
||||
liveReaderMetrics: wal.NewLiveReaderMetrics(reg),
|
||||
watcherMetrics: wlog.NewWatcherMetrics(reg),
|
||||
liveReaderMetrics: wlog.NewLiveReaderMetrics(reg),
|
||||
logger: logger,
|
||||
reg: reg,
|
||||
flushDeadline: flushDeadline,
|
||||
|
|
|
@ -13,7 +13,7 @@ which handles storage and querying of all Prometheus v2 data.
|
|||
|
||||
## External resources
|
||||
|
||||
* A writeup of the original design can be found [here](https://fabxc.org/blog/2017-04-10-writing-a-tsdb/).
|
||||
* A writeup of the original design can be found [here](https://web.archive.org/web/20210803115658/https://fabxc.org/tsdb/).
|
||||
* Video: [Storing 16 Bytes at Scale](https://youtu.be/b_pEevMAC3I) from [PromCon 2017](https://promcon.io/2017-munich/).
|
||||
* Compression is based on the Gorilla TSDB [white paper](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).
|
||||
|
||||
|
|
|
@ -41,7 +41,7 @@ import (
|
|||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
"github.com/prometheus/prometheus/tsdb/record"
|
||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||
"github.com/prometheus/prometheus/tsdb/wal"
|
||||
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||
)
|
||||
|
||||
var ErrUnsupported = errors.New("unsupported operation with WAL-only storage")
|
||||
|
@ -81,7 +81,7 @@ type Options struct {
|
|||
// millisecond-precision timestamps.
|
||||
func DefaultOptions() *Options {
|
||||
return &Options{
|
||||
WALSegmentSize: wal.DefaultSegmentSize,
|
||||
WALSegmentSize: wlog.DefaultSegmentSize,
|
||||
WALCompression: false,
|
||||
StripeSize: tsdb.DefaultStripeSize,
|
||||
TruncateFrequency: DefaultTruncateFrequency,
|
||||
|
@ -220,7 +220,7 @@ type DB struct {
|
|||
opts *Options
|
||||
rs *remote.Storage
|
||||
|
||||
wal *wal.WAL
|
||||
wal *wlog.WL
|
||||
locker *tsdbutil.DirLocker
|
||||
|
||||
appenderPool sync.Pool
|
||||
|
@ -255,7 +255,7 @@ func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir strin
|
|||
// remote_write expects WAL to be stored in a "wal" subdirectory of the main storage.
|
||||
dir = filepath.Join(dir, "wal")
|
||||
|
||||
w, err := wal.NewSize(l, reg, dir, opts.WALSegmentSize, opts.WALCompression)
|
||||
w, err := wlog.NewSize(l, reg, dir, opts.WALSegmentSize, opts.WALCompression)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "creating WAL")
|
||||
}
|
||||
|
@ -307,7 +307,7 @@ func validateOptions(opts *Options) *Options {
|
|||
opts = DefaultOptions()
|
||||
}
|
||||
if opts.WALSegmentSize <= 0 {
|
||||
opts.WALSegmentSize = wal.DefaultSegmentSize
|
||||
opts.WALSegmentSize = wlog.DefaultSegmentSize
|
||||
}
|
||||
|
||||
// Revert Stripesize to DefaultStripsize if Stripsize is either 0 or not a power of 2.
|
||||
|
@ -337,7 +337,7 @@ func (db *DB) replayWAL() error {
|
|||
level.Info(db.logger).Log("msg", "replaying WAL, this may take a while", "dir", db.wal.Dir())
|
||||
start := time.Now()
|
||||
|
||||
dir, startFrom, err := wal.LastCheckpoint(db.wal.Dir())
|
||||
dir, startFrom, err := wlog.LastCheckpoint(db.wal.Dir())
|
||||
if err != nil && err != record.ErrNotFound {
|
||||
return errors.Wrap(err, "find last checkpoint")
|
||||
}
|
||||
|
@ -345,7 +345,7 @@ func (db *DB) replayWAL() error {
|
|||
multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
|
||||
|
||||
if err == nil {
|
||||
sr, err := wal.NewSegmentsReader(dir)
|
||||
sr, err := wlog.NewSegmentsReader(dir)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "open checkpoint")
|
||||
}
|
||||
|
@ -357,7 +357,7 @@ func (db *DB) replayWAL() error {
|
|||
|
||||
// A corrupted checkpoint is a hard error for now and requires user
|
||||
// intervention. There's likely little data that can be recovered anyway.
|
||||
if err := db.loadWAL(wal.NewReader(sr), multiRef); err != nil {
|
||||
if err := db.loadWAL(wlog.NewReader(sr), multiRef); err != nil {
|
||||
return errors.Wrap(err, "backfill checkpoint")
|
||||
}
|
||||
startFrom++
|
||||
|
@ -365,20 +365,20 @@ func (db *DB) replayWAL() error {
|
|||
}
|
||||
|
||||
// Find the last segment.
|
||||
_, last, err := wal.Segments(db.wal.Dir())
|
||||
_, last, err := wlog.Segments(db.wal.Dir())
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "finding WAL segments")
|
||||
}
|
||||
|
||||
// Backfil segments from the most recent checkpoint onwards.
|
||||
for i := startFrom; i <= last; i++ {
|
||||
seg, err := wal.OpenReadSegment(wal.SegmentName(db.wal.Dir(), i))
|
||||
seg, err := wlog.OpenReadSegment(wlog.SegmentName(db.wal.Dir(), i))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
|
||||
}
|
||||
|
||||
sr := wal.NewSegmentBufReader(seg)
|
||||
err = db.loadWAL(wal.NewReader(sr), multiRef)
|
||||
sr := wlog.NewSegmentBufReader(seg)
|
||||
err = db.loadWAL(wlog.NewReader(sr), multiRef)
|
||||
if err := sr.Close(); err != nil {
|
||||
level.Warn(db.logger).Log("msg", "error while closing the wal segments reader", "err", err)
|
||||
}
|
||||
|
@ -394,7 +394,7 @@ func (db *DB) replayWAL() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (db *DB) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) {
|
||||
func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) {
|
||||
var (
|
||||
dec record.Decoder
|
||||
lastRef = chunks.HeadSeriesRef(db.nextRef.Load())
|
||||
|
@ -423,7 +423,7 @@ func (db *DB) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.He
|
|||
series := seriesPool.Get().([]record.RefSeries)[:0]
|
||||
series, err = dec.Series(rec, series)
|
||||
if err != nil {
|
||||
errCh <- &wal.CorruptionErr{
|
||||
errCh <- &wlog.CorruptionErr{
|
||||
Err: errors.Wrap(err, "decode series"),
|
||||
Segment: r.Segment(),
|
||||
Offset: r.Offset(),
|
||||
|
@ -435,7 +435,7 @@ func (db *DB) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.He
|
|||
samples := samplesPool.Get().([]record.RefSample)[:0]
|
||||
samples, err = dec.Samples(rec, samples)
|
||||
if err != nil {
|
||||
errCh <- &wal.CorruptionErr{
|
||||
errCh <- &wlog.CorruptionErr{
|
||||
Err: errors.Wrap(err, "decode samples"),
|
||||
Segment: r.Segment(),
|
||||
Offset: r.Offset(),
|
||||
|
@ -449,7 +449,7 @@ func (db *DB) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.He
|
|||
// stripeSeries.exemplars in the next block by using setLatestExemplar.
|
||||
continue
|
||||
default:
|
||||
errCh <- &wal.CorruptionErr{
|
||||
errCh <- &wlog.CorruptionErr{
|
||||
Err: errors.Errorf("invalid record type %v", dec.Type(rec)),
|
||||
Segment: r.Segment(),
|
||||
Offset: r.Offset(),
|
||||
|
@ -564,7 +564,7 @@ func (db *DB) truncate(mint int64) error {
|
|||
db.gc(mint)
|
||||
level.Info(db.logger).Log("msg", "series GC completed", "duration", time.Since(start))
|
||||
|
||||
first, last, err := wal.Segments(db.wal.Dir())
|
||||
first, last, err := wlog.Segments(db.wal.Dir())
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "get segment range")
|
||||
}
|
||||
|
@ -598,9 +598,9 @@ func (db *DB) truncate(mint int64) error {
|
|||
|
||||
db.metrics.checkpointCreationTotal.Inc()
|
||||
|
||||
if _, err = wal.Checkpoint(db.logger, db.wal, first, last, keep, mint); err != nil {
|
||||
if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, keep, mint); err != nil {
|
||||
db.metrics.checkpointCreationFail.Inc()
|
||||
if _, ok := errors.Cause(err).(*wal.CorruptionErr); ok {
|
||||
if _, ok := errors.Cause(err).(*wlog.CorruptionErr); ok {
|
||||
db.metrics.walCorruptionsTotal.Inc()
|
||||
}
|
||||
return errors.Wrap(err, "create checkpoint")
|
||||
|
@ -622,7 +622,7 @@ func (db *DB) truncate(mint int64) error {
|
|||
db.metrics.checkpointDeleteTotal.Inc()
|
||||
db.metrics.numWALSeriesPendingDeletion.Set(float64(len(db.deleted)))
|
||||
|
||||
if err := wal.DeleteCheckpoints(db.wal.Dir(), last); err != nil {
|
||||
if err := wlog.DeleteCheckpoints(db.wal.Dir(), last); err != nil {
|
||||
// Leftover old checkpoints do not cause problems down the line beyond
|
||||
// occupying disk space. They will just be ignored since a newer checkpoint
|
||||
// exists.
|
||||
|
@ -642,7 +642,7 @@ func (db *DB) gc(mint int64) {
|
|||
deleted := db.series.GC(mint)
|
||||
db.metrics.numActiveSeries.Sub(float64(len(deleted)))
|
||||
|
||||
_, last, _ := wal.Segments(db.wal.Dir())
|
||||
_, last, _ := wlog.Segments(db.wal.Dir())
|
||||
|
||||
// We want to keep series records for any newly deleted series
|
||||
// until we've passed the last recorded segment. This prevents
|
||||
|
|
|
@ -35,7 +35,7 @@ import (
|
|||
"github.com/prometheus/prometheus/tsdb"
|
||||
"github.com/prometheus/prometheus/tsdb/record"
|
||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||
"github.com/prometheus/prometheus/tsdb/wal"
|
||||
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
)
|
||||
|
||||
|
@ -141,7 +141,7 @@ func TestCommit(t *testing.T) {
|
|||
require.NoError(t, app.Commit())
|
||||
require.NoError(t, s.Close())
|
||||
|
||||
sr, err := wal.NewSegmentsReader(s.wal.Dir())
|
||||
sr, err := wlog.NewSegmentsReader(s.wal.Dir())
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
require.NoError(t, sr.Close())
|
||||
|
@ -149,7 +149,7 @@ func TestCommit(t *testing.T) {
|
|||
|
||||
// Read records from WAL and check for expected count of series, samples, and exemplars.
|
||||
var (
|
||||
r = wal.NewReader(sr)
|
||||
r = wlog.NewReader(sr)
|
||||
dec record.Decoder
|
||||
|
||||
walSeriesCount, walSamplesCount, walExemplarsCount int
|
||||
|
@ -211,7 +211,7 @@ func TestRollback(t *testing.T) {
|
|||
require.NoError(t, app.Commit())
|
||||
require.NoError(t, s.Close())
|
||||
|
||||
sr, err := wal.NewSegmentsReader(s.wal.Dir())
|
||||
sr, err := wlog.NewSegmentsReader(s.wal.Dir())
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
require.NoError(t, sr.Close())
|
||||
|
@ -219,7 +219,7 @@ func TestRollback(t *testing.T) {
|
|||
|
||||
// Read records from WAL and check for expected count of series and samples.
|
||||
var (
|
||||
r = wal.NewReader(sr)
|
||||
r = wlog.NewReader(sr)
|
||||
dec record.Decoder
|
||||
|
||||
walSeriesCount, walSamplesCount, walExemplarsCount int
|
||||
|
@ -534,10 +534,10 @@ func TestStorage_DuplicateExemplarsIgnored(t *testing.T) {
|
|||
|
||||
// Read back what was written to the WAL.
|
||||
var walExemplarsCount int
|
||||
sr, err := wal.NewSegmentsReader(s.wal.Dir())
|
||||
sr, err := wlog.NewSegmentsReader(s.wal.Dir())
|
||||
require.NoError(t, err)
|
||||
defer sr.Close()
|
||||
r := wal.NewReader(sr)
|
||||
r := wlog.NewReader(sr)
|
||||
|
||||
var dec record.Decoder
|
||||
for r.Next() {
|
||||
|
|
|
@ -37,7 +37,7 @@ import (
|
|||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||
"github.com/prometheus/prometheus/tsdb/wal"
|
||||
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||
)
|
||||
|
||||
// In Prometheus 2.1.0 we had a bug where the meta.json version was falsely bumped
|
||||
|
@ -489,7 +489,7 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
|
|||
return filepath.Join(dir, ulid.String())
|
||||
}
|
||||
|
||||
func createHead(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir string) *Head {
|
||||
func createHead(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir string) *Head {
|
||||
opts := DefaultHeadOptions()
|
||||
opts.ChunkDirRoot = chunkDir
|
||||
head, err := NewHead(nil, nil, w, nil, opts, nil)
|
||||
|
@ -530,7 +530,7 @@ func createHead(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir str
|
|||
return head
|
||||
}
|
||||
|
||||
func createHeadWithOOOSamples(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir string, oooSampleFrequency int) *Head {
|
||||
func createHeadWithOOOSamples(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir string, oooSampleFrequency int) *Head {
|
||||
opts := DefaultHeadOptions()
|
||||
opts.ChunkDirRoot = chunkDir
|
||||
opts.OutOfOrderTimeWindow.Store(10000000000)
|
||||
|
|
44
tsdb/db.go
44
tsdb/db.go
|
@ -45,7 +45,7 @@ import (
|
|||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||
_ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minium Go version is met.
|
||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||
"github.com/prometheus/prometheus/tsdb/wal"
|
||||
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -70,7 +70,7 @@ var ErrNotReady = errors.New("TSDB not ready")
|
|||
// millisecond precision timestamps.
|
||||
func DefaultOptions() *Options {
|
||||
return &Options{
|
||||
WALSegmentSize: wal.DefaultSegmentSize,
|
||||
WALSegmentSize: wlog.DefaultSegmentSize,
|
||||
MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize,
|
||||
RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond),
|
||||
MinBlockDuration: DefaultBlockDuration,
|
||||
|
@ -393,14 +393,14 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
|
|||
if len(blockReaders) > 0 {
|
||||
maxBlockTime = blockReaders[len(blockReaders)-1].Meta().MaxTime
|
||||
}
|
||||
w, err := wal.Open(db.logger, filepath.Join(db.dir, "wal"))
|
||||
w, err := wlog.Open(db.logger, filepath.Join(db.dir, "wal"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var wbl *wal.WAL
|
||||
wblDir := filepath.Join(db.dir, wal.WblDirName)
|
||||
var wbl *wlog.WL
|
||||
wblDir := filepath.Join(db.dir, wlog.WblDirName)
|
||||
if _, err := os.Stat(wblDir); !os.IsNotExist(err) {
|
||||
wbl, err = wal.Open(db.logger, wblDir)
|
||||
wbl, err = wlog.Open(db.logger, wblDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -477,14 +477,14 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
|
|||
if err := head.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
w, err := wal.Open(db.logger, filepath.Join(db.dir, "wal"))
|
||||
w, err := wlog.Open(db.logger, filepath.Join(db.dir, "wal"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var wbl *wal.WAL
|
||||
wblDir := filepath.Join(db.dir, wal.WblDirName)
|
||||
var wbl *wlog.WL
|
||||
wblDir := filepath.Join(db.dir, wlog.WblDirName)
|
||||
if _, err := os.Stat(wblDir); !os.IsNotExist(err) {
|
||||
wbl, err = wal.Open(db.logger, wblDir)
|
||||
wbl, err = wlog.Open(db.logger, wblDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -681,7 +681,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
|||
}
|
||||
|
||||
walDir := filepath.Join(dir, "wal")
|
||||
wblDir := filepath.Join(dir, wal.WblDirName)
|
||||
wblDir := filepath.Join(dir, wlog.WblDirName)
|
||||
|
||||
// Migrate old WAL if one exists.
|
||||
if err := MigrateWAL(l, walDir); err != nil {
|
||||
|
@ -743,15 +743,15 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
|||
}
|
||||
db.compactCancel = cancel
|
||||
|
||||
var wlog, wblog *wal.WAL
|
||||
segmentSize := wal.DefaultSegmentSize
|
||||
var wal, wbl *wlog.WL
|
||||
segmentSize := wlog.DefaultSegmentSize
|
||||
// Wal is enabled.
|
||||
if opts.WALSegmentSize >= 0 {
|
||||
// Wal is set to a custom size.
|
||||
if opts.WALSegmentSize > 0 {
|
||||
segmentSize = opts.WALSegmentSize
|
||||
}
|
||||
wlog, err = wal.NewSize(l, r, walDir, segmentSize, opts.WALCompression)
|
||||
wal, err = wlog.NewSize(l, r, walDir, segmentSize, opts.WALCompression)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -761,7 +761,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
|||
return nil, err
|
||||
}
|
||||
if opts.OutOfOrderTimeWindow > 0 || wblSize > 0 {
|
||||
wblog, err = wal.NewSize(l, r, wblDir, segmentSize, opts.WALCompression)
|
||||
wbl, err = wlog.NewSize(l, r, wblDir, segmentSize, opts.WALCompression)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -786,7 +786,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
|||
// We only override this flag if isolation is disabled at DB level. We use the default otherwise.
|
||||
headOpts.IsolationDisabled = opts.IsolationDisabled
|
||||
}
|
||||
db.head, err = NewHead(r, l, wlog, wblog, headOpts, stats.Head)
|
||||
db.head, err = NewHead(r, l, wal, wbl, headOpts, stats.Head)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -818,12 +818,12 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
|||
isOOOErr := isErrLoadOOOWal(initErr)
|
||||
if isOOOErr {
|
||||
level.Warn(db.logger).Log("msg", "Encountered OOO WAL read error, attempting repair", "err", initErr)
|
||||
if err := wblog.Repair(initErr); err != nil {
|
||||
if err := wbl.Repair(initErr); err != nil {
|
||||
return nil, errors.Wrap(err, "repair corrupted OOO WAL")
|
||||
}
|
||||
} else {
|
||||
level.Warn(db.logger).Log("msg", "Encountered WAL read error, attempting repair", "err", initErr)
|
||||
if err := wlog.Repair(initErr); err != nil {
|
||||
if err := wal.Repair(initErr); err != nil {
|
||||
return nil, errors.Wrap(err, "repair corrupted WAL")
|
||||
}
|
||||
}
|
||||
|
@ -952,19 +952,19 @@ func (db *DB) ApplyConfig(conf *config.Config) error {
|
|||
}
|
||||
|
||||
// Create WBL if it was not present and if OOO is enabled with WAL enabled.
|
||||
var wblog *wal.WAL
|
||||
var wblog *wlog.WL
|
||||
var err error
|
||||
if db.head.wbl != nil {
|
||||
// The existing WBL from the disk might have been replayed while OOO was disabled.
|
||||
wblog = db.head.wbl
|
||||
} else if !db.oooWasEnabled.Load() && oooTimeWindow > 0 && db.opts.WALSegmentSize >= 0 {
|
||||
segmentSize := wal.DefaultSegmentSize
|
||||
segmentSize := wlog.DefaultSegmentSize
|
||||
// Wal is set to a custom size.
|
||||
if db.opts.WALSegmentSize > 0 {
|
||||
segmentSize = db.opts.WALSegmentSize
|
||||
}
|
||||
oooWalDir := filepath.Join(db.dir, wal.WblDirName)
|
||||
wblog, err = wal.NewSize(db.logger, db.registerer, oooWalDir, segmentSize, db.opts.WALCompression)
|
||||
oooWalDir := filepath.Join(db.dir, wlog.WblDirName)
|
||||
wblog, err = wlog.NewSize(db.logger, db.registerer, oooWalDir, segmentSize, db.opts.WALCompression)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -52,7 +52,7 @@ import (
|
|||
"github.com/prometheus/prometheus/tsdb/record"
|
||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||
"github.com/prometheus/prometheus/tsdb/wal"
|
||||
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
)
|
||||
|
||||
|
@ -244,7 +244,7 @@ func TestNoPanicAfterWALCorruption(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
f, err := os.OpenFile(path.Join(db.Dir(), "wal", walFiles[0].Name()), os.O_RDWR, 0o666)
|
||||
require.NoError(t, err)
|
||||
r := wal.NewReader(bufio.NewReader(f))
|
||||
r := wlog.NewReader(bufio.NewReader(f))
|
||||
require.True(t, r.Next(), "reading the series record")
|
||||
require.True(t, r.Next(), "reading the first sample record")
|
||||
// Write an invalid record header to corrupt everything after the first wal sample.
|
||||
|
@ -1515,9 +1515,9 @@ func TestSizeRetention(t *testing.T) {
|
|||
require.Equal(t, expSize, actSize, "registered size doesn't match actual disk size")
|
||||
|
||||
// Create a WAL checkpoint, and compare sizes.
|
||||
first, last, err := wal.Segments(db.Head().wal.Dir())
|
||||
first, last, err := wlog.Segments(db.Head().wal.Dir())
|
||||
require.NoError(t, err)
|
||||
_, err = wal.Checkpoint(log.NewNopLogger(), db.Head().wal, first, last-1, func(x chunks.HeadSeriesRef) bool { return false }, 0)
|
||||
_, err = wlog.Checkpoint(log.NewNopLogger(), db.Head().wal, first, last-1, func(x chunks.HeadSeriesRef) bool { return false }, 0)
|
||||
require.NoError(t, err)
|
||||
blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics.
|
||||
walSize, err = db.Head().wal.Size()
|
||||
|
@ -1923,7 +1923,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
|
|||
dir := t.TempDir()
|
||||
|
||||
require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777))
|
||||
w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
|
||||
w, err := wlog.New(nil, nil, path.Join(dir, "wal"), false)
|
||||
require.NoError(t, err)
|
||||
|
||||
var enc record.Encoder
|
||||
|
@ -1965,7 +1965,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
|
|||
createBlock(t, dir, genSeries(1, 1, 1000, 6000))
|
||||
|
||||
require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777))
|
||||
w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
|
||||
w, err := wlog.New(nil, nil, path.Join(dir, "wal"), false)
|
||||
require.NoError(t, err)
|
||||
|
||||
var enc record.Encoder
|
||||
|
@ -2365,7 +2365,7 @@ func TestDBReadOnly(t *testing.T) {
|
|||
}
|
||||
|
||||
// Add head to test DBReadOnly WAL reading capabilities.
|
||||
w, err := wal.New(logger, nil, filepath.Join(dbDir, "wal"), true)
|
||||
w, err := wlog.New(logger, nil, filepath.Join(dbDir, "wal"), true)
|
||||
require.NoError(t, err)
|
||||
h := createHead(t, w, genSeries(1, 1, 16, 18), dbDir)
|
||||
require.NoError(t, h.Close())
|
||||
|
@ -3131,7 +3131,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) {
|
|||
require.NoError(t, app.Commit())
|
||||
|
||||
// Check the existing WAL files.
|
||||
first, last, err := wal.Segments(db.head.wal.Dir())
|
||||
first, last, err := wlog.Segments(db.head.wal.Dir())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, first)
|
||||
require.Equal(t, 60, last)
|
||||
|
@ -3146,14 +3146,14 @@ func TestOneCheckpointPerCompactCall(t *testing.T) {
|
|||
require.Equal(t, 58.0, prom_testutil.ToFloat64(db.head.metrics.headTruncateTotal))
|
||||
|
||||
// The compaction should have only truncated first 2/3 of WAL (while also rotating the files).
|
||||
first, last, err = wal.Segments(db.head.wal.Dir())
|
||||
first, last, err = wlog.Segments(db.head.wal.Dir())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 40, first)
|
||||
require.Equal(t, 61, last)
|
||||
|
||||
// The first checkpoint would be for first 2/3rd of WAL, hence till 39.
|
||||
// That should be the last checkpoint.
|
||||
_, cno, err := wal.LastCheckpoint(db.head.wal.Dir())
|
||||
_, cno, err := wlog.LastCheckpoint(db.head.wal.Dir())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 39, cno)
|
||||
|
||||
|
@ -3189,7 +3189,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) {
|
|||
require.Equal(t, newBlockMaxt, db.head.MinTime())
|
||||
|
||||
// Another WAL file was rotated.
|
||||
first, last, err = wal.Segments(db.head.wal.Dir())
|
||||
first, last, err = wlog.Segments(db.head.wal.Dir())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 40, first)
|
||||
require.Equal(t, 62, last)
|
||||
|
@ -3202,14 +3202,14 @@ func TestOneCheckpointPerCompactCall(t *testing.T) {
|
|||
require.Equal(t, 59, len(db.Blocks()))
|
||||
|
||||
// The compaction should have only truncated first 2/3 of WAL (while also rotating the files).
|
||||
first, last, err = wal.Segments(db.head.wal.Dir())
|
||||
first, last, err = wlog.Segments(db.head.wal.Dir())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 55, first)
|
||||
require.Equal(t, 63, last)
|
||||
|
||||
// The first checkpoint would be for first 2/3rd of WAL, hence till 54.
|
||||
// That should be the last checkpoint.
|
||||
_, cno, err = wal.LastCheckpoint(db.head.wal.Dir())
|
||||
_, cno, err = wlog.LastCheckpoint(db.head.wal.Dir())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 54, cno)
|
||||
}
|
||||
|
@ -3657,9 +3657,9 @@ func TestOOOWALWrite(t *testing.T) {
|
|||
}
|
||||
|
||||
getRecords := func(walDir string) []interface{} {
|
||||
sr, err := wal.NewSegmentsReader(walDir)
|
||||
sr, err := wlog.NewSegmentsReader(walDir)
|
||||
require.NoError(t, err)
|
||||
r := wal.NewReader(sr)
|
||||
r := wlog.NewReader(sr)
|
||||
defer func() {
|
||||
require.NoError(t, sr.Close())
|
||||
}()
|
||||
|
@ -3696,7 +3696,7 @@ func TestOOOWALWrite(t *testing.T) {
|
|||
require.Equal(t, inOrderRecords, actRecs)
|
||||
|
||||
// The OOO WAL.
|
||||
actRecs = getRecords(path.Join(dir, wal.WblDirName))
|
||||
actRecs = getRecords(path.Join(dir, wlog.WblDirName))
|
||||
require.Equal(t, oooRecords, actRecs)
|
||||
}
|
||||
|
||||
|
@ -3890,16 +3890,16 @@ func TestMetadataCheckpointingOnlyKeepsLatestEntry(t *testing.T) {
|
|||
require.NoError(t, app.Commit())
|
||||
|
||||
// Let's create a checkpoint.
|
||||
first, last, err := wal.Segments(w.Dir())
|
||||
first, last, err := wlog.Segments(w.Dir())
|
||||
require.NoError(t, err)
|
||||
keep := func(id chunks.HeadSeriesRef) bool {
|
||||
return id != 3
|
||||
}
|
||||
_, err = wal.Checkpoint(log.NewNopLogger(), w, first, last-1, keep, 0)
|
||||
_, err = wlog.Checkpoint(log.NewNopLogger(), w, first, last-1, keep, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Confirm there's been a checkpoint.
|
||||
cdir, _, err := wal.LastCheckpoint(w.Dir())
|
||||
cdir, _, err := wlog.LastCheckpoint(w.Dir())
|
||||
require.NoError(t, err)
|
||||
|
||||
// Read in checkpoint and WAL.
|
||||
|
@ -4647,7 +4647,7 @@ func TestOOODisabled(t *testing.T) {
|
|||
"number of ooo/oob samples mismatch")
|
||||
|
||||
// Verifying that no OOO artifacts were generated.
|
||||
_, err = os.ReadDir(path.Join(db.Dir(), wal.WblDirName))
|
||||
_, err = os.ReadDir(path.Join(db.Dir(), wlog.WblDirName))
|
||||
require.True(t, os.IsNotExist(err))
|
||||
|
||||
ms, created, err := db.head.getOrCreate(s1.Hash(), s1)
|
||||
|
@ -4812,12 +4812,12 @@ func TestWBLAndMmapReplay(t *testing.T) {
|
|||
resetMmapToOriginal() // We neet to reset because new duplicate chunks can be written above.
|
||||
|
||||
// Removing m-map markers in WBL by rewriting it.
|
||||
newWbl, err := wal.New(log.NewNopLogger(), nil, filepath.Join(t.TempDir(), "new_wbl"), false)
|
||||
newWbl, err := wlog.New(log.NewNopLogger(), nil, filepath.Join(t.TempDir(), "new_wbl"), false)
|
||||
require.NoError(t, err)
|
||||
sr, err := wal.NewSegmentsReader(originalWblDir)
|
||||
sr, err := wlog.NewSegmentsReader(originalWblDir)
|
||||
require.NoError(t, err)
|
||||
var dec record.Decoder
|
||||
r, markers, addedRecs := wal.NewReader(sr), 0, 0
|
||||
r, markers, addedRecs := wlog.NewReader(sr), 0, 0
|
||||
for r.Next() {
|
||||
rec := r.Record()
|
||||
if dec.Type(rec) == record.MmapMarkers {
|
||||
|
|
42
tsdb/head.go
42
tsdb/head.go
|
@ -42,7 +42,7 @@ import (
|
|||
"github.com/prometheus/prometheus/tsdb/record"
|
||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||
"github.com/prometheus/prometheus/tsdb/wal"
|
||||
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -76,7 +76,7 @@ type Head struct {
|
|||
|
||||
metrics *headMetrics
|
||||
opts *HeadOptions
|
||||
wal, wbl *wal.WAL
|
||||
wal, wbl *wlog.WL
|
||||
exemplarMetrics *ExemplarMetrics
|
||||
exemplars ExemplarStorage
|
||||
logger log.Logger
|
||||
|
@ -191,7 +191,7 @@ type SeriesLifecycleCallback interface {
|
|||
}
|
||||
|
||||
// NewHead opens the head block in dir.
|
||||
func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wal.WAL, opts *HeadOptions, stats *HeadStats) (*Head, error) {
|
||||
func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wlog.WL, opts *HeadOptions, stats *HeadStats) (*Head, error) {
|
||||
var err error
|
||||
if l == nil {
|
||||
l = log.NewNopLogger()
|
||||
|
@ -612,13 +612,13 @@ func (h *Head) Init(minValidTime int64) error {
|
|||
|
||||
checkpointReplayStart := time.Now()
|
||||
// Backfill the checkpoint first if it exists.
|
||||
dir, startFrom, err := wal.LastCheckpoint(h.wal.Dir())
|
||||
dir, startFrom, err := wlog.LastCheckpoint(h.wal.Dir())
|
||||
if err != nil && err != record.ErrNotFound {
|
||||
return errors.Wrap(err, "find last checkpoint")
|
||||
}
|
||||
|
||||
// Find the last segment.
|
||||
_, endAt, e := wal.Segments(h.wal.Dir())
|
||||
_, endAt, e := wlog.Segments(h.wal.Dir())
|
||||
if e != nil {
|
||||
return errors.Wrap(e, "finding WAL segments")
|
||||
}
|
||||
|
@ -627,7 +627,7 @@ func (h *Head) Init(minValidTime int64) error {
|
|||
|
||||
multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
|
||||
if err == nil && startFrom >= snapIdx {
|
||||
sr, err := wal.NewSegmentsReader(dir)
|
||||
sr, err := wlog.NewSegmentsReader(dir)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "open checkpoint")
|
||||
}
|
||||
|
@ -639,7 +639,7 @@ func (h *Head) Init(minValidTime int64) error {
|
|||
|
||||
// A corrupted checkpoint is a hard error for now and requires user
|
||||
// intervention. There's likely little data that can be recovered anyway.
|
||||
if err := h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks); err != nil {
|
||||
if err := h.loadWAL(wlog.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks); err != nil {
|
||||
return errors.Wrap(err, "backfill checkpoint")
|
||||
}
|
||||
h.updateWALReplayStatusRead(startFrom)
|
||||
|
@ -655,7 +655,7 @@ func (h *Head) Init(minValidTime int64) error {
|
|||
}
|
||||
// Backfill segments from the most recent checkpoint onwards.
|
||||
for i := startFrom; i <= endAt; i++ {
|
||||
s, err := wal.OpenReadSegment(wal.SegmentName(h.wal.Dir(), i))
|
||||
s, err := wlog.OpenReadSegment(wlog.SegmentName(h.wal.Dir(), i))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
|
||||
}
|
||||
|
@ -664,7 +664,7 @@ func (h *Head) Init(minValidTime int64) error {
|
|||
if i == snapIdx {
|
||||
offset = snapOffset
|
||||
}
|
||||
sr, err := wal.NewSegmentBufReaderWithOffset(offset, s)
|
||||
sr, err := wlog.NewSegmentBufReaderWithOffset(offset, s)
|
||||
if errors.Cause(err) == io.EOF {
|
||||
// File does not exist.
|
||||
continue
|
||||
|
@ -672,7 +672,7 @@ func (h *Head) Init(minValidTime int64) error {
|
|||
if err != nil {
|
||||
return errors.Wrapf(err, "segment reader (offset=%d)", offset)
|
||||
}
|
||||
err = h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks)
|
||||
err = h.loadWAL(wlog.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks)
|
||||
if err := sr.Close(); err != nil {
|
||||
level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err)
|
||||
}
|
||||
|
@ -687,20 +687,20 @@ func (h *Head) Init(minValidTime int64) error {
|
|||
wblReplayStart := time.Now()
|
||||
if h.wbl != nil {
|
||||
// Replay OOO WAL.
|
||||
startFrom, endAt, e = wal.Segments(h.wbl.Dir())
|
||||
startFrom, endAt, e = wlog.Segments(h.wbl.Dir())
|
||||
if e != nil {
|
||||
return errors.Wrap(e, "finding OOO WAL segments")
|
||||
}
|
||||
h.startWALReplayStatus(startFrom, endAt)
|
||||
|
||||
for i := startFrom; i <= endAt; i++ {
|
||||
s, err := wal.OpenReadSegment(wal.SegmentName(h.wbl.Dir(), i))
|
||||
s, err := wlog.OpenReadSegment(wlog.SegmentName(h.wbl.Dir(), i))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, fmt.Sprintf("open WBL segment: %d", i))
|
||||
}
|
||||
|
||||
sr := wal.NewSegmentBufReader(s)
|
||||
err = h.loadWBL(wal.NewReader(sr), multiRef, lastMmapRef)
|
||||
sr := wlog.NewSegmentBufReader(s)
|
||||
err = h.loadWBL(wlog.NewReader(sr), multiRef, lastMmapRef)
|
||||
if err := sr.Close(); err != nil {
|
||||
level.Warn(h.logger).Log("msg", "Error while closing the wbl segments reader", "err", err)
|
||||
}
|
||||
|
@ -850,7 +850,7 @@ func (h *Head) removeCorruptedMmappedChunks(err error) (map[chunks.HeadSeriesRef
|
|||
return mmappedChunks, oooMmappedChunks, lastRef, nil
|
||||
}
|
||||
|
||||
func (h *Head) ApplyConfig(cfg *config.Config, wbl *wal.WAL) {
|
||||
func (h *Head) ApplyConfig(cfg *config.Config, wbl *wlog.WL) {
|
||||
oooTimeWindow := int64(0)
|
||||
if cfg.StorageConfig.TSDBConfig != nil {
|
||||
oooTimeWindow = cfg.StorageConfig.TSDBConfig.OutOfOrderTimeWindow
|
||||
|
@ -882,7 +882,7 @@ func (h *Head) ApplyConfig(cfg *config.Config, wbl *wal.WAL) {
|
|||
|
||||
// SetOutOfOrderTimeWindow updates the out of order related parameters.
|
||||
// If the Head already has a WBL set, then the wbl will be ignored.
|
||||
func (h *Head) SetOutOfOrderTimeWindow(oooTimeWindow int64, wbl *wal.WAL) {
|
||||
func (h *Head) SetOutOfOrderTimeWindow(oooTimeWindow int64, wbl *wlog.WL) {
|
||||
if oooTimeWindow > 0 && h.wbl == nil {
|
||||
h.wbl = wbl
|
||||
}
|
||||
|
@ -1115,7 +1115,7 @@ func (h *Head) truncateWAL(mint int64) error {
|
|||
start := time.Now()
|
||||
h.lastWALTruncationTime.Store(mint)
|
||||
|
||||
first, last, err := wal.Segments(h.wal.Dir())
|
||||
first, last, err := wlog.Segments(h.wal.Dir())
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "get segment range")
|
||||
}
|
||||
|
@ -1147,9 +1147,9 @@ func (h *Head) truncateWAL(mint int64) error {
|
|||
return ok
|
||||
}
|
||||
h.metrics.checkpointCreationTotal.Inc()
|
||||
if _, err = wal.Checkpoint(h.logger, h.wal, first, last, keep, mint); err != nil {
|
||||
if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, keep, mint); err != nil {
|
||||
h.metrics.checkpointCreationFail.Inc()
|
||||
if _, ok := errors.Cause(err).(*wal.CorruptionErr); ok {
|
||||
if _, ok := errors.Cause(err).(*wlog.CorruptionErr); ok {
|
||||
h.metrics.walCorruptionsTotal.Inc()
|
||||
}
|
||||
return errors.Wrap(err, "create checkpoint")
|
||||
|
@ -1172,7 +1172,7 @@ func (h *Head) truncateWAL(mint int64) error {
|
|||
h.deletedMtx.Unlock()
|
||||
|
||||
h.metrics.checkpointDeleteTotal.Inc()
|
||||
if err := wal.DeleteCheckpoints(h.wal.Dir(), last); err != nil {
|
||||
if err := wlog.DeleteCheckpoints(h.wal.Dir(), last); err != nil {
|
||||
// Leftover old checkpoints do not cause problems down the line beyond
|
||||
// occupying disk space.
|
||||
// They will just be ignored since a higher checkpoint exists.
|
||||
|
@ -1415,7 +1415,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
|
|||
h.tombstones.TruncateBefore(mint)
|
||||
|
||||
if h.wal != nil {
|
||||
_, last, _ := wal.Segments(h.wal.Dir())
|
||||
_, last, _ := wlog.Segments(h.wal.Dir())
|
||||
h.deletedMtx.Lock()
|
||||
// Keep series records until we're past segment 'last'
|
||||
// because the WAL will still have samples records with
|
||||
|
|
|
@ -48,12 +48,12 @@ import (
|
|||
"github.com/prometheus/prometheus/tsdb/record"
|
||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||
"github.com/prometheus/prometheus/tsdb/wal"
|
||||
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||
)
|
||||
|
||||
func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) (*Head, *wal.WAL) {
|
||||
func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) (*Head, *wlog.WL) {
|
||||
dir := t.TempDir()
|
||||
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL)
|
||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL)
|
||||
require.NoError(t, err)
|
||||
|
||||
opts := DefaultHeadOptions()
|
||||
|
@ -66,14 +66,14 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) (
|
|||
opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds())
|
||||
}
|
||||
|
||||
h, err := NewHead(nil, nil, wlog, nil, opts, nil)
|
||||
h, err := NewHead(nil, nil, wal, nil, opts, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_ chunks.HeadSeriesRef, _ chunks.ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
|
||||
return nil
|
||||
}))
|
||||
|
||||
return h, wlog
|
||||
return h, wal
|
||||
}
|
||||
|
||||
func BenchmarkCreateSeries(b *testing.B) {
|
||||
|
@ -91,7 +91,7 @@ func BenchmarkCreateSeries(b *testing.B) {
|
|||
}
|
||||
}
|
||||
|
||||
func populateTestWAL(t testing.TB, w *wal.WAL, recs []interface{}) {
|
||||
func populateTestWAL(t testing.TB, w *wlog.WL, recs []interface{}) {
|
||||
var enc record.Encoder
|
||||
for _, r := range recs {
|
||||
switch v := r.(type) {
|
||||
|
@ -108,12 +108,12 @@ func populateTestWAL(t testing.TB, w *wal.WAL, recs []interface{}) {
|
|||
}
|
||||
|
||||
func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
|
||||
sr, err := wal.NewSegmentsReader(dir)
|
||||
sr, err := wlog.NewSegmentsReader(dir)
|
||||
require.NoError(t, err)
|
||||
defer sr.Close()
|
||||
|
||||
var dec record.Decoder
|
||||
r := wal.NewReader(sr)
|
||||
r := wlog.NewReader(sr)
|
||||
|
||||
for r.Next() {
|
||||
rec := r.Record()
|
||||
|
@ -192,7 +192,7 @@ func BenchmarkLoadWAL(b *testing.B) {
|
|||
func(b *testing.B) {
|
||||
dir := b.TempDir()
|
||||
|
||||
w, err := wal.New(nil, nil, dir, false)
|
||||
w, err := wlog.New(nil, nil, dir, false)
|
||||
require.NoError(b, err)
|
||||
|
||||
// Write series.
|
||||
|
@ -574,7 +574,7 @@ func TestHead_WALMultiRef(t *testing.T) {
|
|||
require.NotEqual(t, ref1, ref2, "Refs are the same")
|
||||
require.NoError(t, head.Close())
|
||||
|
||||
w, err = wal.New(nil, nil, w.Dir(), false)
|
||||
w, err = wlog.New(nil, nil, w.Dir(), false)
|
||||
require.NoError(t, err)
|
||||
|
||||
opts := DefaultHeadOptions()
|
||||
|
@ -882,7 +882,7 @@ func TestHeadDeleteSimple(t *testing.T) {
|
|||
require.NoError(t, app.Commit())
|
||||
|
||||
// Compare the samples for both heads - before and after the reloadBlocks.
|
||||
reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reloadBlocks.
|
||||
reloadedW, err := wlog.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reloadBlocks.
|
||||
require.NoError(t, err)
|
||||
opts := DefaultHeadOptions()
|
||||
opts.ChunkRange = 1000
|
||||
|
@ -1003,7 +1003,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
|
|||
require.NoError(t, hb.Close())
|
||||
|
||||
// Confirm there's been a checkpoint.
|
||||
cdir, _, err := wal.LastCheckpoint(w.Dir())
|
||||
cdir, _, err := wlog.LastCheckpoint(w.Dir())
|
||||
require.NoError(t, err)
|
||||
// Read in checkpoint and WAL.
|
||||
recs := readTestWAL(t, cdir)
|
||||
|
@ -1650,7 +1650,7 @@ func TestWalRepair_DecodingError(t *testing.T) {
|
|||
|
||||
// Fill the wal and corrupt it.
|
||||
{
|
||||
w, err := wal.New(nil, nil, filepath.Join(dir, "wal"), compress)
|
||||
w, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), compress)
|
||||
require.NoError(t, err)
|
||||
|
||||
for i := 1; i <= test.totalRecs; i++ {
|
||||
|
@ -1671,7 +1671,7 @@ func TestWalRepair_DecodingError(t *testing.T) {
|
|||
initErr := h.Init(math.MinInt64)
|
||||
|
||||
err = errors.Cause(initErr) // So that we can pick up errors even if wrapped.
|
||||
_, corrErr := err.(*wal.CorruptionErr)
|
||||
_, corrErr := err.(*wlog.CorruptionErr)
|
||||
require.True(t, corrErr, "reading the wal didn't return corruption error")
|
||||
require.NoError(t, h.Close()) // Head will close the wal as well.
|
||||
}
|
||||
|
@ -1688,10 +1688,10 @@ func TestWalRepair_DecodingError(t *testing.T) {
|
|||
|
||||
// Read the wal content after the repair.
|
||||
{
|
||||
sr, err := wal.NewSegmentsReader(filepath.Join(dir, "wal"))
|
||||
sr, err := wlog.NewSegmentsReader(filepath.Join(dir, "wal"))
|
||||
require.NoError(t, err)
|
||||
defer sr.Close()
|
||||
r := wal.NewReader(sr)
|
||||
r := wlog.NewReader(sr)
|
||||
|
||||
var actRec int
|
||||
for r.Next() {
|
||||
|
@ -1713,7 +1713,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
|
|||
walDir := filepath.Join(dir, "wal")
|
||||
// Fill the chunk segments and corrupt it.
|
||||
{
|
||||
w, err := wal.New(nil, nil, walDir, false)
|
||||
w, err := wlog.New(nil, nil, walDir, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
opts := DefaultHeadOptions()
|
||||
|
@ -1775,7 +1775,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestNewWalSegmentOnTruncate(t *testing.T) {
|
||||
h, wlog := newTestHead(t, 1000, false, false)
|
||||
h, wal := newTestHead(t, 1000, false, false)
|
||||
defer func() {
|
||||
require.NoError(t, h.Close())
|
||||
}()
|
||||
|
@ -1787,19 +1787,19 @@ func TestNewWalSegmentOnTruncate(t *testing.T) {
|
|||
}
|
||||
|
||||
add(0)
|
||||
_, last, err := wal.Segments(wlog.Dir())
|
||||
_, last, err := wlog.Segments(wal.Dir())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, last)
|
||||
|
||||
add(1)
|
||||
require.NoError(t, h.Truncate(1))
|
||||
_, last, err = wal.Segments(wlog.Dir())
|
||||
_, last, err = wlog.Segments(wal.Dir())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, last)
|
||||
|
||||
add(2)
|
||||
require.NoError(t, h.Truncate(2))
|
||||
_, last, err = wal.Segments(wlog.Dir())
|
||||
_, last, err = wlog.Segments(wal.Dir())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, last)
|
||||
}
|
||||
|
@ -1954,12 +1954,12 @@ func TestMemSeriesIsolation(t *testing.T) {
|
|||
i = addSamples(hb)
|
||||
require.NoError(t, hb.Close())
|
||||
|
||||
wlog, err := wal.NewSize(nil, nil, w.Dir(), 32768, false)
|
||||
wal, err := wlog.NewSize(nil, nil, w.Dir(), 32768, false)
|
||||
require.NoError(t, err)
|
||||
opts := DefaultHeadOptions()
|
||||
opts.ChunkRange = 1000
|
||||
opts.ChunkDirRoot = wlog.Dir()
|
||||
hb, err = NewHead(nil, nil, wlog, nil, opts, nil)
|
||||
opts.ChunkDirRoot = wal.Dir()
|
||||
hb, err = NewHead(nil, nil, wal, nil, opts, nil)
|
||||
defer func() { require.NoError(t, hb.Close()) }()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, hb.Init(0))
|
||||
|
@ -3021,7 +3021,7 @@ func TestChunkSnapshot(t *testing.T) {
|
|||
}
|
||||
|
||||
openHeadAndCheckReplay := func() {
|
||||
w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
||||
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
||||
require.NoError(t, err)
|
||||
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
||||
require.NoError(t, err)
|
||||
|
@ -3228,7 +3228,7 @@ func TestSnapshotError(t *testing.T) {
|
|||
require.NoError(t, f.Close())
|
||||
|
||||
// Create new Head which should replay this snapshot.
|
||||
w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
||||
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
||||
require.NoError(t, err)
|
||||
// Testing https://github.com/prometheus/prometheus/issues/9437 with the registry.
|
||||
head, err = NewHead(prometheus.NewRegistry(), nil, w, nil, head.opts, nil)
|
||||
|
@ -3613,7 +3613,7 @@ loop:
|
|||
// Tests https://github.com/prometheus/prometheus/issues/9725.
|
||||
func TestChunkSnapshotReplayBug(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Write few series records and samples such that the series references are not in order in the WAL
|
||||
|
@ -3640,10 +3640,10 @@ func TestChunkSnapshotReplayBug(t *testing.T) {
|
|||
|
||||
rec := enc.Series([]record.RefSeries{seriesRec}, buf)
|
||||
buf = rec[:0]
|
||||
require.NoError(t, wlog.Log(rec))
|
||||
require.NoError(t, wal.Log(rec))
|
||||
rec = enc.Samples([]record.RefSample{samplesRec}, buf)
|
||||
buf = rec[:0]
|
||||
require.NoError(t, wlog.Log(rec))
|
||||
require.NoError(t, wal.Log(rec))
|
||||
}
|
||||
|
||||
// Write a corrupt snapshot to fail the replay on startup.
|
||||
|
@ -3657,7 +3657,7 @@ func TestChunkSnapshotReplayBug(t *testing.T) {
|
|||
opts := DefaultHeadOptions()
|
||||
opts.ChunkDirRoot = dir
|
||||
opts.EnableMemorySnapshotOnShutdown = true
|
||||
head, err := NewHead(nil, nil, wlog, nil, opts, nil)
|
||||
head, err := NewHead(nil, nil, wal, nil, opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, head.Init(math.MinInt64))
|
||||
defer func() {
|
||||
|
@ -3680,7 +3680,7 @@ func TestChunkSnapshotReplayBug(t *testing.T) {
|
|||
|
||||
func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
||||
wlTemp, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Write a snapshot with .tmp suffix. This used to fail taking any further snapshots or replay of snapshots.
|
||||
|
@ -3691,7 +3691,7 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
|
|||
opts := DefaultHeadOptions()
|
||||
opts.ChunkDirRoot = dir
|
||||
opts.EnableMemorySnapshotOnShutdown = true
|
||||
head, err := NewHead(nil, nil, wlog, nil, opts, nil)
|
||||
head, err := NewHead(nil, nil, wlTemp, nil, opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, head.Init(math.MinInt64))
|
||||
|
||||
|
@ -3718,9 +3718,9 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
|
|||
// TODO(codesome): Needs test for ooo WAL repair.
|
||||
func TestOOOWalReplay(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
||||
require.NoError(t, err)
|
||||
oooWlog, err := wal.NewSize(nil, nil, filepath.Join(dir, wal.WblDirName), 32768, true)
|
||||
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
opts := DefaultHeadOptions()
|
||||
|
@ -3728,7 +3728,7 @@ func TestOOOWalReplay(t *testing.T) {
|
|||
opts.ChunkDirRoot = dir
|
||||
opts.OutOfOrderTimeWindow.Store(30 * time.Minute.Milliseconds())
|
||||
|
||||
h, err := NewHead(nil, nil, wlog, oooWlog, opts, nil)
|
||||
h, err := NewHead(nil, nil, wal, oooWlog, opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, h.Init(0))
|
||||
|
||||
|
@ -3765,11 +3765,11 @@ func TestOOOWalReplay(t *testing.T) {
|
|||
|
||||
// Restart head.
|
||||
require.NoError(t, h.Close())
|
||||
wlog, err = wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
||||
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
||||
require.NoError(t, err)
|
||||
oooWlog, err = wal.NewSize(nil, nil, filepath.Join(dir, wal.WblDirName), 32768, true)
|
||||
oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true)
|
||||
require.NoError(t, err)
|
||||
h, err = NewHead(nil, nil, wlog, oooWlog, opts, nil)
|
||||
h, err = NewHead(nil, nil, wal, oooWlog, opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, h.Init(0)) // Replay happens here.
|
||||
|
||||
|
@ -3802,9 +3802,9 @@ func TestOOOWalReplay(t *testing.T) {
|
|||
// TestOOOMmapReplay checks the replay at a low level.
|
||||
func TestOOOMmapReplay(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
||||
require.NoError(t, err)
|
||||
oooWlog, err := wal.NewSize(nil, nil, filepath.Join(dir, wal.WblDirName), 32768, true)
|
||||
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
opts := DefaultHeadOptions()
|
||||
|
@ -3813,7 +3813,7 @@ func TestOOOMmapReplay(t *testing.T) {
|
|||
opts.OutOfOrderCapMax.Store(30)
|
||||
opts.OutOfOrderTimeWindow.Store(1000 * time.Minute.Milliseconds())
|
||||
|
||||
h, err := NewHead(nil, nil, wlog, oooWlog, opts, nil)
|
||||
h, err := NewHead(nil, nil, wal, oooWlog, opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, h.Init(0))
|
||||
|
||||
|
@ -3853,11 +3853,11 @@ func TestOOOMmapReplay(t *testing.T) {
|
|||
// Restart head.
|
||||
require.NoError(t, h.Close())
|
||||
|
||||
wlog, err = wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
||||
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
||||
require.NoError(t, err)
|
||||
oooWlog, err = wal.NewSize(nil, nil, filepath.Join(dir, wal.WblDirName), 32768, true)
|
||||
oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true)
|
||||
require.NoError(t, err)
|
||||
h, err = NewHead(nil, nil, wlog, oooWlog, opts, nil)
|
||||
h, err = NewHead(nil, nil, wal, oooWlog, opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, h.Init(0)) // Replay happens here.
|
||||
|
||||
|
@ -3927,9 +3927,9 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
|
|||
|
||||
require.NoError(t, h.Close())
|
||||
|
||||
wlog, err := wal.NewSize(nil, nil, filepath.Join(h.opts.ChunkDirRoot, "wal"), 32768, false)
|
||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(h.opts.ChunkDirRoot, "wal"), 32768, false)
|
||||
require.NoError(t, err)
|
||||
h, err = NewHead(nil, nil, wlog, nil, h.opts, nil)
|
||||
h, err = NewHead(nil, nil, wal, nil, h.opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, h.Init(0))
|
||||
|
||||
|
@ -3962,7 +3962,7 @@ func (c *unsupportedChunk) Encoding() chunkenc.Encoding {
|
|||
// Tests https://github.com/prometheus/prometheus/issues/10277.
|
||||
func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false)
|
||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
opts := DefaultHeadOptions()
|
||||
|
@ -3971,7 +3971,7 @@ func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) {
|
|||
opts.EnableExemplarStorage = true
|
||||
opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars)
|
||||
|
||||
h, err := NewHead(nil, nil, wlog, nil, opts, nil)
|
||||
h, err := NewHead(nil, nil, wal, nil, opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, h.Init(0))
|
||||
|
||||
|
@ -3995,7 +3995,7 @@ func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) {
|
|||
addChunks()
|
||||
|
||||
require.NoError(t, h.Close())
|
||||
wlog, err = wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false)
|
||||
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
mmapFilePath := filepath.Join(dir, "chunks_head", "000001")
|
||||
|
@ -4005,7 +4005,7 @@ func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.NoError(t, f.Close())
|
||||
|
||||
h, err = NewHead(nil, nil, wlog, nil, opts, nil)
|
||||
h, err = NewHead(nil, nil, wal, nil, opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, h.Init(0))
|
||||
|
||||
|
@ -4021,7 +4021,7 @@ func TestReplayAfterMmapReplayError(t *testing.T) {
|
|||
var err error
|
||||
|
||||
openHead := func() {
|
||||
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false)
|
||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
opts := DefaultHeadOptions()
|
||||
|
@ -4030,7 +4030,7 @@ func TestReplayAfterMmapReplayError(t *testing.T) {
|
|||
opts.EnableMemorySnapshotOnShutdown = true
|
||||
opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars)
|
||||
|
||||
h, err = NewHead(nil, nil, wlog, nil, opts, nil)
|
||||
h, err = NewHead(nil, nil, wal, nil, opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, h.Init(0))
|
||||
}
|
||||
|
@ -4228,9 +4228,9 @@ func generateBigTestHistograms(n int) []*histogram.Histogram {
|
|||
|
||||
func TestOOOAppendWithNoSeries(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
||||
require.NoError(t, err)
|
||||
oooWlog, err := wal.NewSize(nil, nil, filepath.Join(dir, wal.WblDirName), 32768, true)
|
||||
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
opts := DefaultHeadOptions()
|
||||
|
@ -4238,7 +4238,7 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
|
|||
opts.OutOfOrderCapMax.Store(30)
|
||||
opts.OutOfOrderTimeWindow.Store(120 * time.Minute.Milliseconds())
|
||||
|
||||
h, err := NewHead(nil, nil, wlog, oooWlog, opts, nil)
|
||||
h, err := NewHead(nil, nil, wal, oooWlog, opts, nil)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, h.Close())
|
||||
|
@ -4309,16 +4309,16 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
|
|||
|
||||
func TestHeadMinOOOTimeUpdate(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
||||
require.NoError(t, err)
|
||||
oooWlog, err := wal.NewSize(nil, nil, filepath.Join(dir, wal.WblDirName), 32768, true)
|
||||
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
opts := DefaultHeadOptions()
|
||||
opts.ChunkDirRoot = dir
|
||||
opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds())
|
||||
|
||||
h, err := NewHead(nil, nil, wlog, oooWlog, opts, nil)
|
||||
h, err := NewHead(nil, nil, wal, oooWlog, opts, nil)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, h.Close())
|
||||
|
|
|
@ -39,10 +39,10 @@ import (
|
|||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||
"github.com/prometheus/prometheus/tsdb/record"
|
||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||
"github.com/prometheus/prometheus/tsdb/wal"
|
||||
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||
)
|
||||
|
||||
func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) {
|
||||
func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) {
|
||||
// Track number of samples that referenced a series we don't know about
|
||||
// for error reporting.
|
||||
var unknownRefs atomic.Uint64
|
||||
|
@ -99,7 +99,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
|||
|
||||
defer func() {
|
||||
// For CorruptionErr ensure to terminate all workers before exiting.
|
||||
_, ok := err.(*wal.CorruptionErr)
|
||||
_, ok := err.(*wlog.CorruptionErr)
|
||||
if ok || seriesCreationErr != nil {
|
||||
for i := 0; i < n; i++ {
|
||||
processors[i].closeAndDrain()
|
||||
|
@ -156,7 +156,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
|||
series := seriesPool.Get().([]record.RefSeries)[:0]
|
||||
series, err = dec.Series(rec, series)
|
||||
if err != nil {
|
||||
decodeErr = &wal.CorruptionErr{
|
||||
decodeErr = &wlog.CorruptionErr{
|
||||
Err: errors.Wrap(err, "decode series"),
|
||||
Segment: r.Segment(),
|
||||
Offset: r.Offset(),
|
||||
|
@ -168,7 +168,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
|||
samples := samplesPool.Get().([]record.RefSample)[:0]
|
||||
samples, err = dec.Samples(rec, samples)
|
||||
if err != nil {
|
||||
decodeErr = &wal.CorruptionErr{
|
||||
decodeErr = &wlog.CorruptionErr{
|
||||
Err: errors.Wrap(err, "decode samples"),
|
||||
Segment: r.Segment(),
|
||||
Offset: r.Offset(),
|
||||
|
@ -180,7 +180,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
|||
tstones := tstonesPool.Get().([]tombstones.Stone)[:0]
|
||||
tstones, err = dec.Tombstones(rec, tstones)
|
||||
if err != nil {
|
||||
decodeErr = &wal.CorruptionErr{
|
||||
decodeErr = &wlog.CorruptionErr{
|
||||
Err: errors.Wrap(err, "decode tombstones"),
|
||||
Segment: r.Segment(),
|
||||
Offset: r.Offset(),
|
||||
|
@ -192,7 +192,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
|||
exemplars := exemplarsPool.Get().([]record.RefExemplar)[:0]
|
||||
exemplars, err = dec.Exemplars(rec, exemplars)
|
||||
if err != nil {
|
||||
decodeErr = &wal.CorruptionErr{
|
||||
decodeErr = &wlog.CorruptionErr{
|
||||
Err: errors.Wrap(err, "decode exemplars"),
|
||||
Segment: r.Segment(),
|
||||
Offset: r.Offset(),
|
||||
|
@ -216,7 +216,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
|||
meta := metadataPool.Get().([]record.RefMetadata)[:0]
|
||||
meta, err := dec.Metadata(rec, meta)
|
||||
if err != nil {
|
||||
decodeErr = &wal.CorruptionErr{
|
||||
decodeErr = &wlog.CorruptionErr{
|
||||
Err: errors.Wrap(err, "decode metadata"),
|
||||
Segment: r.Segment(),
|
||||
Offset: r.Offset(),
|
||||
|
@ -596,7 +596,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
|
|||
return unknownRefs, unknownHistogramRefs, mmapOverlappingChunks
|
||||
}
|
||||
|
||||
func (h *Head) loadWBL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
|
||||
func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
|
||||
// Track number of samples, m-map markers, that referenced a series we don't know about
|
||||
// for error reporting.
|
||||
var unknownRefs, mmapMarkerUnknownRefs atomic.Uint64
|
||||
|
@ -628,7 +628,7 @@ func (h *Head) loadWBL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
|||
defer func() {
|
||||
// For CorruptionErr ensure to terminate all workers before exiting.
|
||||
// We also wrap it to identify OOO WBL corruption.
|
||||
_, ok := err.(*wal.CorruptionErr)
|
||||
_, ok := err.(*wlog.CorruptionErr)
|
||||
if ok {
|
||||
err = &errLoadWbl{err: err}
|
||||
for i := 0; i < n; i++ {
|
||||
|
@ -658,7 +658,7 @@ func (h *Head) loadWBL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
|||
samples := samplesPool.Get().([]record.RefSample)[:0]
|
||||
samples, err = dec.Samples(rec, samples)
|
||||
if err != nil {
|
||||
decodeErr = &wal.CorruptionErr{
|
||||
decodeErr = &wlog.CorruptionErr{
|
||||
Err: errors.Wrap(err, "decode samples"),
|
||||
Segment: r.Segment(),
|
||||
Offset: r.Offset(),
|
||||
|
@ -670,7 +670,7 @@ func (h *Head) loadWBL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
|||
markers := markersPool.Get().([]record.RefMmapMarker)[:0]
|
||||
markers, err = dec.MmapMarkers(rec, markers)
|
||||
if err != nil {
|
||||
decodeErr = &wal.CorruptionErr{
|
||||
decodeErr = &wlog.CorruptionErr{
|
||||
Err: errors.Wrap(err, "decode mmap markers"),
|
||||
Segment: r.Segment(),
|
||||
Offset: r.Offset(),
|
||||
|
@ -1046,7 +1046,7 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
|
|||
if err := os.MkdirAll(cpdirtmp, 0o777); err != nil {
|
||||
return stats, errors.Wrap(err, "create chunk snapshot dir")
|
||||
}
|
||||
cp, err := wal.New(nil, nil, cpdirtmp, h.wal.CompressionEnabled())
|
||||
cp, err := wlog.New(nil, nil, cpdirtmp, h.wal.CompressionEnabled())
|
||||
if err != nil {
|
||||
return stats, errors.Wrap(err, "open chunk snapshot")
|
||||
}
|
||||
|
@ -1285,7 +1285,7 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
|
|||
}
|
||||
|
||||
start := time.Now()
|
||||
sr, err := wal.NewSegmentsReader(dir)
|
||||
sr, err := wlog.NewSegmentsReader(dir)
|
||||
if err != nil {
|
||||
return snapIdx, snapOffset, nil, errors.Wrap(err, "open chunk snapshot")
|
||||
}
|
||||
|
@ -1356,7 +1356,7 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
|
|||
}(i, recordChan)
|
||||
}
|
||||
|
||||
r := wal.NewReader(sr)
|
||||
r := wlog.NewReader(sr)
|
||||
var loopErr error
|
||||
Outer:
|
||||
for r.Next() {
|
||||
|
|
|
@ -37,7 +37,7 @@ import (
|
|||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||
"github.com/prometheus/prometheus/tsdb/record"
|
||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||
"github.com/prometheus/prometheus/tsdb/wal"
|
||||
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||
)
|
||||
|
||||
// WALEntryType indicates what data a WAL entry contains.
|
||||
|
@ -89,7 +89,7 @@ func newWalMetrics(r prometheus.Registerer) *walMetrics {
|
|||
// WAL is a write ahead log that can log new series labels and samples.
|
||||
// It must be completely read before new entries are logged.
|
||||
//
|
||||
// DEPRECATED: use wal pkg combined with the record codex instead.
|
||||
// DEPRECATED: use wlog pkg combined with the record codex instead.
|
||||
type WAL interface {
|
||||
Reader() WALReader
|
||||
LogSeries([]record.RefSeries) error
|
||||
|
@ -146,7 +146,7 @@ func newCRC32() hash.Hash32 {
|
|||
|
||||
// SegmentWAL is a write ahead log for series data.
|
||||
//
|
||||
// DEPRECATED: use wal pkg combined with the record coders instead.
|
||||
// DEPRECATED: use wlog pkg combined with the record coders instead.
|
||||
type SegmentWAL struct {
|
||||
mtx sync.Mutex
|
||||
metrics *walMetrics
|
||||
|
@ -1229,7 +1229,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) {
|
|||
if err := os.RemoveAll(tmpdir); err != nil {
|
||||
return errors.Wrap(err, "cleanup replacement dir")
|
||||
}
|
||||
repl, err := wal.New(logger, nil, tmpdir, false)
|
||||
repl, err := wlog.New(logger, nil, tmpdir, false)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "open new WAL")
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ import (
|
|||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
"github.com/prometheus/prometheus/tsdb/record"
|
||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||
"github.com/prometheus/prometheus/tsdb/wal"
|
||||
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||
)
|
||||
|
||||
func TestSegmentWAL_cut(t *testing.T) {
|
||||
|
@ -450,7 +450,7 @@ func TestMigrateWAL_Empty(t *testing.T) {
|
|||
wdir := path.Join(dir, "wal")
|
||||
|
||||
// Initialize empty WAL.
|
||||
w, err := wal.New(nil, nil, wdir, false)
|
||||
w, err := wlog.New(nil, nil, wdir, false)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, w.Close())
|
||||
|
||||
|
@ -493,7 +493,7 @@ func TestMigrateWAL_Fuzz(t *testing.T) {
|
|||
// Perform migration.
|
||||
require.NoError(t, MigrateWAL(nil, wdir))
|
||||
|
||||
w, err := wal.New(nil, nil, wdir, false)
|
||||
w, err := wlog.New(nil, nil, wdir, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
// We can properly write some new data after migration.
|
||||
|
@ -505,10 +505,10 @@ func TestMigrateWAL_Fuzz(t *testing.T) {
|
|||
require.NoError(t, w.Close())
|
||||
|
||||
// Read back all data.
|
||||
sr, err := wal.NewSegmentsReader(wdir)
|
||||
sr, err := wlog.NewSegmentsReader(wdir)
|
||||
require.NoError(t, err)
|
||||
|
||||
r := wal.NewReader(sr)
|
||||
r := wlog.NewReader(sr)
|
||||
var res []interface{}
|
||||
var dec record.Decoder
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
package wlog
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
@ -93,7 +93,7 @@ const checkpointPrefix = "checkpoint."
|
|||
// segmented format as the original WAL itself.
|
||||
// This makes it easy to read it through the WAL package and concatenate
|
||||
// it with the original WAL.
|
||||
func Checkpoint(logger log.Logger, w *WAL, from, to int, keep func(id chunks.HeadSeriesRef) bool, mint int64) (*CheckpointStats, error) {
|
||||
func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef) bool, mint int64) (*CheckpointStats, error) {
|
||||
stats := &CheckpointStats{}
|
||||
var sgmReader io.ReadCloser
|
||||
|
|
@ -12,7 +12,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
package wlog
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
@ -301,7 +301,7 @@ func TestCheckpoint(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
|
||||
// Create a new wal with invalid data.
|
||||
// Create a new wlog with invalid data.
|
||||
dir := t.TempDir()
|
||||
w, err := NewSize(nil, nil, dir, 64*1024, false)
|
||||
require.NoError(t, err)
|
||||
|
@ -318,17 +318,17 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.NoError(t, f.Close())
|
||||
|
||||
// Run the checkpoint and since the wal contains corrupt data this should return an error.
|
||||
// Run the checkpoint and since the wlog contains corrupt data this should return an error.
|
||||
_, err = Checkpoint(log.NewNopLogger(), w, 0, 1, nil, 0)
|
||||
require.Error(t, err)
|
||||
|
||||
// Walk the wal dir to make sure there are no tmp folder left behind after the error.
|
||||
// Walk the wlog dir to make sure there are no tmp folder left behind after the error.
|
||||
err = filepath.Walk(w.Dir(), func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "access err %q: %v", path, err)
|
||||
}
|
||||
if info.IsDir() && strings.HasSuffix(info.Name(), ".tmp") {
|
||||
return fmt.Errorf("wal dir contains temporary folder:%s", info.Name())
|
||||
return fmt.Errorf("wlog dir contains temporary folder:%s", info.Name())
|
||||
}
|
||||
return nil
|
||||
})
|
|
@ -12,7 +12,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
package wlog
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
|
@ -12,7 +12,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
package wlog
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
|
@ -12,7 +12,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
package wlog
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
@ -240,7 +240,7 @@ func TestReader_Live(t *testing.T) {
|
|||
|
||||
const fuzzLen = 500
|
||||
|
||||
func generateRandomEntries(w *WAL, records chan []byte) error {
|
||||
func generateRandomEntries(w *WL, records chan []byte) error {
|
||||
var recs [][]byte
|
||||
for i := 0; i < fuzzLen; i++ {
|
||||
var sz int64
|
|
@ -11,7 +11,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
package wlog
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
@ -304,7 +304,7 @@ func (w *Watcher) firstAndLast() (int, int, error) {
|
|||
return refs[0], refs[len(refs)-1], nil
|
||||
}
|
||||
|
||||
// Copied from tsdb/wal/wal.go so we do not have to open a WAL.
|
||||
// Copied from tsdb/wlog/wlog.go so we do not have to open a WAL.
|
||||
// Plan is to move WAL watcher to TSDB and dedupe these implementations.
|
||||
func (w *Watcher) segments(dir string) ([]int, error) {
|
||||
files, err := os.ReadDir(dir)
|
|
@ -10,7 +10,7 @@
|
|||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
package wal
|
||||
package wlog
|
||||
|
||||
import (
|
||||
"fmt"
|
|
@ -12,7 +12,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
package wlog
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
|
@ -133,7 +133,7 @@ func OpenWriteSegment(logger log.Logger, dir string, k int) (*Segment, error) {
|
|||
// If it was torn mid-record, a full read (which the caller should do anyway
|
||||
// to ensure integrity) will detect it as a corruption by the end.
|
||||
if d := stat.Size() % pageSize; d != 0 {
|
||||
level.Warn(logger).Log("msg", "Last page of the wal is torn, filling it with zeros", "segment", segName)
|
||||
level.Warn(logger).Log("msg", "Last page of the wlog is torn, filling it with zeros", "segment", segName)
|
||||
if _, err := f.Write(make([]byte, pageSize-d)); err != nil {
|
||||
f.Close()
|
||||
return nil, errors.Wrap(err, "zero-pad torn page")
|
||||
|
@ -164,7 +164,7 @@ func OpenReadSegment(fn string) (*Segment, error) {
|
|||
return &Segment{SegmentFile: f, i: k, dir: filepath.Dir(fn)}, nil
|
||||
}
|
||||
|
||||
// WAL is a write ahead log that stores records in segment files.
|
||||
// WL is a write log that stores records in segment files.
|
||||
// It must be read from start to end once before logging new data.
|
||||
// If an error occurs during read, the repair procedure must be called
|
||||
// before it's safe to do further writes.
|
||||
|
@ -174,7 +174,7 @@ func OpenReadSegment(fn string) (*Segment, error) {
|
|||
// Records are never split across segments to allow full segments to be
|
||||
// safely truncated. It also ensures that torn writes never corrupt records
|
||||
// beyond the most recent segment.
|
||||
type WAL struct {
|
||||
type WL struct {
|
||||
dir string
|
||||
logger log.Logger
|
||||
segmentSize int
|
||||
|
@ -188,10 +188,10 @@ type WAL struct {
|
|||
compress bool
|
||||
snappyBuf []byte
|
||||
|
||||
metrics *walMetrics
|
||||
metrics *wlMetrics
|
||||
}
|
||||
|
||||
type walMetrics struct {
|
||||
type wlMetrics struct {
|
||||
fsyncDuration prometheus.Summary
|
||||
pageFlushes prometheus.Counter
|
||||
pageCompletions prometheus.Counter
|
||||
|
@ -201,12 +201,12 @@ type walMetrics struct {
|
|||
writesFailed prometheus.Counter
|
||||
}
|
||||
|
||||
func newWALMetrics(r prometheus.Registerer) *walMetrics {
|
||||
m := &walMetrics{}
|
||||
func newWLMetrics(r prometheus.Registerer) *wlMetrics {
|
||||
m := &wlMetrics{}
|
||||
|
||||
m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
|
||||
Name: "fsync_duration_seconds",
|
||||
Help: "Duration of WAL fsync.",
|
||||
Help: "Duration of write log fsync.",
|
||||
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
|
||||
})
|
||||
m.pageFlushes = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
|
@ -219,19 +219,19 @@ func newWALMetrics(r prometheus.Registerer) *walMetrics {
|
|||
})
|
||||
m.truncateFail = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "truncations_failed_total",
|
||||
Help: "Total number of WAL truncations that failed.",
|
||||
Help: "Total number of write log truncations that failed.",
|
||||
})
|
||||
m.truncateTotal = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "truncations_total",
|
||||
Help: "Total number of WAL truncations attempted.",
|
||||
Help: "Total number of write log truncations attempted.",
|
||||
})
|
||||
m.currentSegment = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "segment_current",
|
||||
Help: "WAL segment index that TSDB is currently writing to.",
|
||||
Help: "Write log segment index that TSDB is currently writing to.",
|
||||
})
|
||||
m.writesFailed = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "writes_failed_total",
|
||||
Help: "Total number of WAL writes that failed.",
|
||||
Help: "Total number of write log writes that failed.",
|
||||
})
|
||||
|
||||
if r != nil {
|
||||
|
@ -250,13 +250,13 @@ func newWALMetrics(r prometheus.Registerer) *walMetrics {
|
|||
}
|
||||
|
||||
// New returns a new WAL over the given directory.
|
||||
func New(logger log.Logger, reg prometheus.Registerer, dir string, compress bool) (*WAL, error) {
|
||||
func New(logger log.Logger, reg prometheus.Registerer, dir string, compress bool) (*WL, error) {
|
||||
return NewSize(logger, reg, dir, DefaultSegmentSize, compress)
|
||||
}
|
||||
|
||||
// NewSize returns a new WAL over the given directory.
|
||||
// NewSize returns a new write log over the given directory.
|
||||
// New segments are created with the specified size.
|
||||
func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress bool) (*WAL, error) {
|
||||
func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress bool) (*WL, error) {
|
||||
if segmentSize%pageSize != 0 {
|
||||
return nil, errors.New("invalid segment size")
|
||||
}
|
||||
|
@ -266,7 +266,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
|
|||
if logger == nil {
|
||||
logger = log.NewNopLogger()
|
||||
}
|
||||
w := &WAL{
|
||||
w := &WL{
|
||||
dir: dir,
|
||||
logger: logger,
|
||||
segmentSize: segmentSize,
|
||||
|
@ -277,9 +277,9 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
|
|||
}
|
||||
prefix := "prometheus_tsdb_wal_"
|
||||
if filepath.Base(dir) == WblDirName {
|
||||
prefix = "prometheus_tsdb_out_of_order_wal_"
|
||||
prefix = "prometheus_tsdb_out_of_order_wbl_"
|
||||
}
|
||||
w.metrics = newWALMetrics(prometheus.WrapRegistererWithPrefix(prefix, reg))
|
||||
w.metrics = newWLMetrics(prometheus.WrapRegistererWithPrefix(prefix, reg))
|
||||
|
||||
_, last, err := Segments(w.Dir())
|
||||
if err != nil {
|
||||
|
@ -308,11 +308,11 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
|
|||
}
|
||||
|
||||
// Open an existing WAL.
|
||||
func Open(logger log.Logger, dir string) (*WAL, error) {
|
||||
func Open(logger log.Logger, dir string) (*WL, error) {
|
||||
if logger == nil {
|
||||
logger = log.NewNopLogger()
|
||||
}
|
||||
w := &WAL{
|
||||
w := &WL{
|
||||
dir: dir,
|
||||
logger: logger,
|
||||
}
|
||||
|
@ -321,16 +321,16 @@ func Open(logger log.Logger, dir string) (*WAL, error) {
|
|||
}
|
||||
|
||||
// CompressionEnabled returns if compression is enabled on this WAL.
|
||||
func (w *WAL) CompressionEnabled() bool {
|
||||
func (w *WL) CompressionEnabled() bool {
|
||||
return w.compress
|
||||
}
|
||||
|
||||
// Dir returns the directory of the WAL.
|
||||
func (w *WAL) Dir() string {
|
||||
func (w *WL) Dir() string {
|
||||
return w.dir
|
||||
}
|
||||
|
||||
func (w *WAL) run() {
|
||||
func (w *WL) run() {
|
||||
Loop:
|
||||
for {
|
||||
select {
|
||||
|
@ -350,7 +350,7 @@ Loop:
|
|||
|
||||
// Repair attempts to repair the WAL based on the error.
|
||||
// It discards all data after the corruption.
|
||||
func (w *WAL) Repair(origErr error) error {
|
||||
func (w *WL) Repair(origErr error) error {
|
||||
// We could probably have a mode that only discards torn records right around
|
||||
// the corruption to preserve as data much as possible.
|
||||
// But that's not generally applicable if the records have any kind of causality.
|
||||
|
@ -466,7 +466,7 @@ func SegmentName(dir string, i int) string {
|
|||
|
||||
// NextSegment creates the next segment and closes the previous one asynchronously.
|
||||
// It returns the file number of the new file.
|
||||
func (w *WAL) NextSegment() (int, error) {
|
||||
func (w *WL) NextSegment() (int, error) {
|
||||
w.mtx.Lock()
|
||||
defer w.mtx.Unlock()
|
||||
return w.nextSegment(true)
|
||||
|
@ -474,7 +474,7 @@ func (w *WAL) NextSegment() (int, error) {
|
|||
|
||||
// NextSegmentSync creates the next segment and closes the previous one in sync.
|
||||
// It returns the file number of the new file.
|
||||
func (w *WAL) NextSegmentSync() (int, error) {
|
||||
func (w *WL) NextSegmentSync() (int, error) {
|
||||
w.mtx.Lock()
|
||||
defer w.mtx.Unlock()
|
||||
return w.nextSegment(false)
|
||||
|
@ -482,9 +482,9 @@ func (w *WAL) NextSegmentSync() (int, error) {
|
|||
|
||||
// nextSegment creates the next segment and closes the previous one.
|
||||
// It returns the file number of the new file.
|
||||
func (w *WAL) nextSegment(async bool) (int, error) {
|
||||
func (w *WL) nextSegment(async bool) (int, error) {
|
||||
if w.closed {
|
||||
return 0, errors.New("wal is closed")
|
||||
return 0, errors.New("wlog is closed")
|
||||
}
|
||||
|
||||
// Only flush the current page if it actually holds data.
|
||||
|
@ -519,7 +519,7 @@ func (w *WAL) nextSegment(async bool) (int, error) {
|
|||
return next.Index(), nil
|
||||
}
|
||||
|
||||
func (w *WAL) setSegment(segment *Segment) error {
|
||||
func (w *WL) setSegment(segment *Segment) error {
|
||||
w.segment = segment
|
||||
|
||||
// Correctly initialize donePages.
|
||||
|
@ -535,7 +535,7 @@ func (w *WAL) setSegment(segment *Segment) error {
|
|||
// flushPage writes the new contents of the page to disk. If no more records will fit into
|
||||
// the page, the remaining bytes will be set to zero and a new page will be started.
|
||||
// If clear is true, this is enforced regardless of how many bytes are left in the page.
|
||||
func (w *WAL) flushPage(clear bool) error {
|
||||
func (w *WL) flushPage(clear bool) error {
|
||||
w.metrics.pageFlushes.Inc()
|
||||
|
||||
p := w.page
|
||||
|
@ -601,13 +601,13 @@ func (t recType) String() string {
|
|||
}
|
||||
}
|
||||
|
||||
func (w *WAL) pagesPerSegment() int {
|
||||
func (w *WL) pagesPerSegment() int {
|
||||
return w.segmentSize / pageSize
|
||||
}
|
||||
|
||||
// Log writes the records into the log.
|
||||
// Multiple records can be passed at once to reduce writes and increase throughput.
|
||||
func (w *WAL) Log(recs ...[]byte) error {
|
||||
func (w *WL) Log(recs ...[]byte) error {
|
||||
w.mtx.Lock()
|
||||
defer w.mtx.Unlock()
|
||||
// Callers could just implement their own list record format but adding
|
||||
|
@ -625,7 +625,7 @@ func (w *WAL) Log(recs ...[]byte) error {
|
|||
// - the final record of a batch
|
||||
// - the record is bigger than the page size
|
||||
// - the current page is full.
|
||||
func (w *WAL) log(rec []byte, final bool) error {
|
||||
func (w *WL) log(rec []byte, final bool) error {
|
||||
// When the last page flush failed the page will remain full.
|
||||
// When the page is full, need to flush it before trying to add more records to it.
|
||||
if w.page.full() {
|
||||
|
@ -721,7 +721,7 @@ func (w *WAL) log(rec []byte, final bool) error {
|
|||
|
||||
// LastSegmentAndOffset returns the last segment number of the WAL
|
||||
// and the offset in that file upto which the segment has been filled.
|
||||
func (w *WAL) LastSegmentAndOffset() (seg, offset int, err error) {
|
||||
func (w *WL) LastSegmentAndOffset() (seg, offset int, err error) {
|
||||
w.mtx.Lock()
|
||||
defer w.mtx.Unlock()
|
||||
|
||||
|
@ -736,7 +736,7 @@ func (w *WAL) LastSegmentAndOffset() (seg, offset int, err error) {
|
|||
}
|
||||
|
||||
// Truncate drops all segments before i.
|
||||
func (w *WAL) Truncate(i int) (err error) {
|
||||
func (w *WL) Truncate(i int) (err error) {
|
||||
w.metrics.truncateTotal.Inc()
|
||||
defer func() {
|
||||
if err != nil {
|
||||
|
@ -758,27 +758,27 @@ func (w *WAL) Truncate(i int) (err error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (w *WAL) fsync(f *Segment) error {
|
||||
func (w *WL) fsync(f *Segment) error {
|
||||
start := time.Now()
|
||||
err := f.Sync()
|
||||
w.metrics.fsyncDuration.Observe(time.Since(start).Seconds())
|
||||
return err
|
||||
}
|
||||
|
||||
// Sync forces a file sync on the current wal segment. This function is meant
|
||||
// Sync forces a file sync on the current write log segment. This function is meant
|
||||
// to be used only on tests due to different behaviour on Operating Systems
|
||||
// like windows and linux
|
||||
func (w *WAL) Sync() error {
|
||||
func (w *WL) Sync() error {
|
||||
return w.fsync(w.segment)
|
||||
}
|
||||
|
||||
// Close flushes all writes and closes active segment.
|
||||
func (w *WAL) Close() (err error) {
|
||||
func (w *WL) Close() (err error) {
|
||||
w.mtx.Lock()
|
||||
defer w.mtx.Unlock()
|
||||
|
||||
if w.closed {
|
||||
return errors.New("wal already closed")
|
||||
return errors.New("wlog already closed")
|
||||
}
|
||||
|
||||
if w.segment == nil {
|
||||
|
@ -811,8 +811,8 @@ func (w *WAL) Close() (err error) {
|
|||
|
||||
// Segments returns the range [first, n] of currently existing segments.
|
||||
// If no segments are found, first and n are -1.
|
||||
func Segments(walDir string) (first, last int, err error) {
|
||||
refs, err := listSegments(walDir)
|
||||
func Segments(wlDir string) (first, last int, err error) {
|
||||
refs, err := listSegments(wlDir)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
@ -979,8 +979,8 @@ func (r *segmentBufReader) Read(b []byte) (n int, err error) {
|
|||
return n, nil
|
||||
}
|
||||
|
||||
// Computing size of the WAL.
|
||||
// Size computes the size of the write log.
|
||||
// We do this by adding the sizes of all the files under the WAL dir.
|
||||
func (w *WAL) Size() (int64, error) {
|
||||
func (w *WL) Size() (int64, error) {
|
||||
return fileutil.DirSize(w.Dir())
|
||||
}
|
|
@ -12,7 +12,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
package wlog
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
@ -137,7 +137,7 @@ func TestWALRepair_ReadingError(t *testing.T) {
|
|||
}
|
||||
first, last, err := Segments(w.Dir())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 3, 1+last-first, "wal creation didn't result in expected number of segments")
|
||||
require.Equal(t, 3, 1+last-first, "wlog creation didn't result in expected number of segments")
|
||||
|
||||
require.NoError(t, w.Close())
|
||||
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"name": "@prometheus-io/codemirror-promql",
|
||||
"version": "0.39.0",
|
||||
"version": "0.39.1",
|
||||
"description": "a CodeMirror mode for the PromQL language",
|
||||
"types": "dist/esm/index.d.ts",
|
||||
"module": "dist/esm/index.js",
|
||||
|
@ -29,7 +29,7 @@
|
|||
},
|
||||
"homepage": "https://github.com/prometheus/prometheus/blob/main/web/ui/module/codemirror-promql/README.md",
|
||||
"dependencies": {
|
||||
"@prometheus-io/lezer-promql": "^0.39.0",
|
||||
"@prometheus-io/lezer-promql": "^0.39.1",
|
||||
"lru-cache": "^6.0.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"name": "@prometheus-io/lezer-promql",
|
||||
"version": "0.39.0",
|
||||
"version": "0.39.1",
|
||||
"description": "lezer-based PromQL grammar",
|
||||
"main": "index.cjs",
|
||||
"type": "module",
|
||||
|
|
14
web/ui/package-lock.json
generated
14
web/ui/package-lock.json
generated
|
@ -28,10 +28,10 @@
|
|||
},
|
||||
"module/codemirror-promql": {
|
||||
"name": "@prometheus-io/codemirror-promql",
|
||||
"version": "0.39.0",
|
||||
"version": "0.39.1",
|
||||
"license": "Apache-2.0",
|
||||
"dependencies": {
|
||||
"@prometheus-io/lezer-promql": "^0.39.0",
|
||||
"@prometheus-io/lezer-promql": "^0.39.1",
|
||||
"lru-cache": "^6.0.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
@ -61,7 +61,7 @@
|
|||
},
|
||||
"module/lezer-promql": {
|
||||
"name": "@prometheus-io/lezer-promql",
|
||||
"version": "0.39.0",
|
||||
"version": "0.39.1",
|
||||
"license": "Apache-2.0",
|
||||
"devDependencies": {
|
||||
"@lezer/generator": "^1.1.1",
|
||||
|
@ -17625,7 +17625,7 @@
|
|||
},
|
||||
"react-app": {
|
||||
"name": "@prometheus-io/app",
|
||||
"version": "0.39.0",
|
||||
"version": "0.39.1",
|
||||
"dependencies": {
|
||||
"@codemirror/autocomplete": "^6.2.0",
|
||||
"@codemirror/commands": "^6.1.0",
|
||||
|
@ -17643,7 +17643,7 @@
|
|||
"@lezer/lr": "^1.2.3",
|
||||
"@nexucis/fuzzy": "^0.4.1",
|
||||
"@nexucis/kvsearch": "^0.8.1",
|
||||
"@prometheus-io/codemirror-promql": "^0.39.0",
|
||||
"@prometheus-io/codemirror-promql": "^0.39.1",
|
||||
"bootstrap": "^4.6.2",
|
||||
"css.escape": "^1.5.1",
|
||||
"downshift": "^6.1.11",
|
||||
|
@ -19883,7 +19883,7 @@
|
|||
"@lezer/lr": "^1.2.3",
|
||||
"@nexucis/fuzzy": "^0.4.1",
|
||||
"@nexucis/kvsearch": "^0.8.1",
|
||||
"@prometheus-io/codemirror-promql": "^0.39.0",
|
||||
"@prometheus-io/codemirror-promql": "^0.39.1",
|
||||
"@testing-library/react-hooks": "^7.0.2",
|
||||
"@types/enzyme": "^3.10.12",
|
||||
"@types/flot": "0.0.32",
|
||||
|
@ -19935,7 +19935,7 @@
|
|||
"@lezer/common": "^1.0.1",
|
||||
"@lezer/highlight": "^1.1.0",
|
||||
"@lezer/lr": "^1.2.3",
|
||||
"@prometheus-io/lezer-promql": "^0.39.0",
|
||||
"@prometheus-io/lezer-promql": "^0.39.1",
|
||||
"@types/lru-cache": "^5.1.1",
|
||||
"isomorphic-fetch": "^3.0.0",
|
||||
"lru-cache": "^6.0.0",
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"name": "@prometheus-io/app",
|
||||
"version": "0.39.0",
|
||||
"version": "0.39.1",
|
||||
"private": true,
|
||||
"dependencies": {
|
||||
"@codemirror/autocomplete": "^6.2.0",
|
||||
|
@ -19,7 +19,7 @@
|
|||
"@lezer/common": "^1.0.1",
|
||||
"@nexucis/fuzzy": "^0.4.1",
|
||||
"@nexucis/kvsearch": "^0.8.1",
|
||||
"@prometheus-io/codemirror-promql": "^0.39.0",
|
||||
"@prometheus-io/codemirror-promql": "^0.39.1",
|
||||
"bootstrap": "^4.6.2",
|
||||
"css.escape": "^1.5.1",
|
||||
"downshift": "^6.1.11",
|
||||
|
|
Loading…
Reference in a new issue