diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index 7ae656c680..e01b95eeb3 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -353,8 +353,10 @@ func checkConfig(agentMode bool, filename string) ([]string, error) { } for _, scfg := range cfg.ScrapeConfigs { - if err := checkFileExists(scfg.HTTPClientConfig.BearerTokenFile); err != nil { - return nil, errors.Wrapf(err, "error checking bearer token file %q", scfg.HTTPClientConfig.BearerTokenFile) + if scfg.HTTPClientConfig.Authorization != nil { + if err := checkFileExists(scfg.HTTPClientConfig.Authorization.CredentialsFile); err != nil { + return nil, errors.Wrapf(err, "error checking authorization credentials or bearer token file %q", scfg.HTTPClientConfig.Authorization.CredentialsFile) + } } if err := checkTLSConfig(scfg.HTTPClientConfig.TLSConfig); err != nil { diff --git a/cmd/promtool/main_test.go b/cmd/promtool/main_test.go index 1a8a470601..59a15b7a7d 100644 --- a/cmd/promtool/main_test.go +++ b/cmd/promtool/main_test.go @@ -203,3 +203,33 @@ func TestCheckTargetConfig(t *testing.T) { }) } } + +func TestAuthorizationConfig(t *testing.T) { + cases := []struct { + name string + file string + err string + }{ + { + name: "authorization_credentials_file.bad", + file: "authorization_credentials_file.bad.yml", + err: "error checking authorization credentials or bearer token file", + }, + { + name: "authorization_credentials_file.good", + file: "authorization_credentials_file.good.yml", + err: "", + }, + } + + for _, test := range cases { + t.Run(test.name, func(t *testing.T) { + _, err := checkConfig(false, "testdata/"+test.file) + if test.err != "" { + require.Contains(t, err.Error(), test.err, "Expected error to contain %q, got %q", test.err, err.Error()) + return + } + require.NoError(t, err) + }) + } +} diff --git a/cmd/promtool/testdata/authorization_credentials_file.bad.yml b/cmd/promtool/testdata/authorization_credentials_file.bad.yml new file mode 100644 index 0000000000..854b8c2293 --- /dev/null +++ b/cmd/promtool/testdata/authorization_credentials_file.bad.yml @@ -0,0 +1,4 @@ +scrape_configs: + - job_name: test + authorization: + credentials_file: "/random/file/which/does/not/exist.yml" diff --git a/cmd/promtool/testdata/authorization_credentials_file.good.yml b/cmd/promtool/testdata/authorization_credentials_file.good.yml new file mode 100644 index 0000000000..f6a5a76500 --- /dev/null +++ b/cmd/promtool/testdata/authorization_credentials_file.good.yml @@ -0,0 +1,4 @@ +scrape_configs: + - job_name: test + authorization: + credentials_file: "." 
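Context for the promtool hunk above: the old check inspected the deprecated BearerTokenFile field, while the newer config schema maps bearer tokens into an optional authorization block, so the check now validates Authorization.CredentialsFile and must first guard against that block being absent. Below is a minimal, self-contained Go sketch of the guard; the authorizationConfig type and this checkFileExists body are simplified stand-ins for the real promtool/config types, and fmt.Errorf with %w replaces the errors.Wrapf call used upstream.

package main

import (
    "fmt"
    "os"
)

// authorizationConfig is a stand-in for the HTTP client config's
// Authorization block; only the field exercised by the check is modeled.
type authorizationConfig struct {
    CredentialsFile string
}

// checkFileExists mirrors promtool's helper: an empty path passes,
// otherwise the file must be stat-able.
func checkFileExists(fn string) error {
    if fn == "" {
        return nil
    }
    _, err := os.Stat(fn)
    return err
}

// checkAuthorization shows the guard added in the hunk: the credentials
// file is only inspected when an authorization block is configured.
func checkAuthorization(auth *authorizationConfig) error {
    if auth != nil {
        if err := checkFileExists(auth.CredentialsFile); err != nil {
            return fmt.Errorf("error checking authorization credentials or bearer token file %q: %w", auth.CredentialsFile, err)
        }
    }
    return nil
}

func main() {
    fmt.Println(checkAuthorization(nil))                                                    // <nil>: no block, nothing to check
    fmt.Println(checkAuthorization(&authorizationConfig{CredentialsFile: "/no/such/file"})) // stat error, as in the .bad.yml test case
}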
diff --git a/go.mod b/go.mod index 6b58e238c1..6fd98e1568 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,13 @@ module github.com/prometheus/prometheus go 1.14 require ( - github.com/Azure/azure-sdk-for-go v58.3.0+incompatible + github.com/Azure/azure-sdk-for-go v59.4.0+incompatible github.com/Azure/go-autorest/autorest v0.11.22 github.com/Azure/go-autorest/autorest/adal v0.9.17 github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a - github.com/aws/aws-sdk-go v1.42.10 + github.com/aws/aws-sdk-go v1.42.15 github.com/cespare/xxhash/v2 v2.1.2 github.com/containerd/containerd v1.5.7 // indirect github.com/dennwc/varint v1.0.0 @@ -23,7 +23,7 @@ require ( github.com/fsnotify/fsnotify v1.5.1 github.com/go-kit/log v0.2.0 github.com/go-logfmt/logfmt v0.5.1 - github.com/go-openapi/strfmt v0.21.0 + github.com/go-openapi/strfmt v0.21.1 github.com/go-zookeeper/zk v1.0.2 github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v0.0.4 diff --git a/go.sum b/go.sum index 824edb52c3..0fedb5291a 100644 --- a/go.sum +++ b/go.sum @@ -51,8 +51,8 @@ collectd.org v0.3.0/go.mod h1:A/8DzQBkF6abtvrT2j/AU/4tiBgJWYyh0y/oB/4MlWE= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go v41.3.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= -github.com/Azure/azure-sdk-for-go v58.3.0+incompatible h1:lb9OWePNuJMiibdxg9XvdbiOldR0Yifge37L4LoOxIs= -github.com/Azure/azure-sdk-for-go v58.3.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= +github.com/Azure/azure-sdk-for-go v59.4.0+incompatible h1:gDA8odnngdNd3KYHL2NoK1j9vpWBgEnFSjKKLpkC8Aw= +github.com/Azure/azure-sdk-for-go v59.4.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= @@ -187,8 +187,8 @@ github.com/aws/aws-sdk-go v1.30.12/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZve github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48= github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= github.com/aws/aws-sdk-go v1.40.11/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= -github.com/aws/aws-sdk-go v1.42.10 h1:PW9G/hnsuKttbFtOcgNKD0vQrp4yfNrtACA+X0p9mjM= -github.com/aws/aws-sdk-go v1.42.10/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= +github.com/aws/aws-sdk-go v1.42.15 h1:RcUChuF7KzrrTqx9LAzJbLBX00LkUY7cH9T1VdxNdqk= +github.com/aws/aws-sdk-go v1.42.15/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= github.com/benbjohnson/immutable v0.2.1/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI= github.com/benbjohnson/tmpl v1.0.0/go.mod h1:igT620JFIi44B6awvU9IsDhR77IXWtFigTLil/RPdps= @@ -547,8 +547,8 @@ github.com/go-openapi/strfmt v0.19.5/go.mod h1:eftuHTlB/dI8Uq8JJOyRlieZf+WkkxUuk github.com/go-openapi/strfmt v0.19.11/go.mod 
h1:UukAYgTaQfqJuAFlNxxMWNvMYiwiXtLsF2VwmoFtbtc= github.com/go-openapi/strfmt v0.20.0/go.mod h1:UukAYgTaQfqJuAFlNxxMWNvMYiwiXtLsF2VwmoFtbtc= github.com/go-openapi/strfmt v0.20.1/go.mod h1:43urheQI9dNtE5lTZQfuFJvjYJKPrxicATpEfZwHUNk= -github.com/go-openapi/strfmt v0.21.0 h1:hX2qEZKmYks+t0hKeb4VTJpUm2UYsdL3+DCid5swxIs= -github.com/go-openapi/strfmt v0.21.0/go.mod h1:ZRQ409bWMj+SOgXofQAGTIo2Ebu72Gs+WaRADcS5iNg= +github.com/go-openapi/strfmt v0.21.1 h1:G6s2t5V5kGCHLVbSdZ/6lI8Wm4OzoPFkc3/cjAsKQrM= +github.com/go-openapi/strfmt v0.21.1/go.mod h1:I/XVKeLc5+MM5oPNN7P6urMOpuLXEcNrCX/rPGuWb0k= github.com/go-openapi/swag v0.0.0-20160704191624-1d0bd113de87/go.mod h1:DXUve3Dpr1UfpPtxFw+EFuQ41HhCWZfha5jSVRG7C7I= github.com/go-openapi/swag v0.17.0/go.mod h1:AByQ+nYG6gQg71GINrmuDXCPWdL640yX49/kXLo40Tg= github.com/go-openapi/swag v0.18.0/go.mod h1:AByQ+nYG6gQg71GINrmuDXCPWdL640yX49/kXLo40Tg= @@ -1351,8 +1351,8 @@ go.mongodb.org/mongo-driver v1.4.3/go.mod h1:WcMNYLx/IlOxLe6JRJiv2uXuCz6zBLndR4S go.mongodb.org/mongo-driver v1.4.4/go.mod h1:WcMNYLx/IlOxLe6JRJiv2uXuCz6zBLndR4SoGjYphSc= go.mongodb.org/mongo-driver v1.4.6/go.mod h1:WcMNYLx/IlOxLe6JRJiv2uXuCz6zBLndR4SoGjYphSc= go.mongodb.org/mongo-driver v1.5.1/go.mod h1:gRXCHX4Jo7J0IJ1oDQyUxF7jfy19UfxniMS4xxMmUqw= -go.mongodb.org/mongo-driver v1.7.3 h1:G4l/eYY9VrQAK/AUgkV0koQKzQnyddnWxrd/Etf0jIs= -go.mongodb.org/mongo-driver v1.7.3/go.mod h1:NqaYOwnXWr5Pm7AOpO5QFxKJ503nbMse/R79oO62zWg= +go.mongodb.org/mongo-driver v1.7.5 h1:ny3p0reEpgsR2cfA5cjgwFZg3Cv/ofFh/8jbhGtz9VI= +go.mongodb.org/mongo-driver v1.7.5/go.mod h1:VXEWRZ6URJIkUq2SCAyapmhH0ZLRBP+FT4xhp5Zvxng= go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1/go.mod h1:SNgMg+EgDFwmvSmLRTNKC5fegJjB7v23qTQ0XLGUNHk= go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index e4c30bfc72..6df0db2d49 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -16,9 +16,11 @@ package agent import ( "context" "fmt" + "math" "path/filepath" "sync" "time" + "unicode/utf8" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -94,6 +96,8 @@ type dbMetrics struct { numActiveSeries prometheus.Gauge numWALSeriesPendingDeletion prometheus.Gauge totalAppendedSamples prometheus.Counter + totalAppendedExemplars prometheus.Counter + totalOutOfOrderSamples prometheus.Counter walTruncateDuration prometheus.Summary walCorruptionsTotal prometheus.Counter walTotalReplayDuration prometheus.Gauge @@ -120,6 +124,16 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics { Help: "Total number of samples appended to the storage", }) + m.totalAppendedExemplars = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_agent_exemplars_appended_total", + Help: "Total number of exemplars appended to the storage", + }) + + m.totalOutOfOrderSamples = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_agent_out_of_order_samples_total", + Help: "Total number of out-of-order samples that failed to be ingested.", + }) + m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{ Name: "prometheus_agent_truncate_duration_seconds", Help: "Duration of WAL truncation.", @@ -160,6 +174,8 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics { m.numActiveSeries, m.numWALSeriesPendingDeletion, m.totalAppendedSamples, + m.totalAppendedExemplars, + m.totalOutOfOrderSamples, m.walTruncateDuration, m.walCorruptionsTotal, m.walTotalReplayDuration, @@ -181,6 +197,15 @@ func 
(m *dbMetrics) Unregister() { m.numActiveSeries, m.numWALSeriesPendingDeletion, m.totalAppendedSamples, + m.totalAppendedExemplars, + m.totalOutOfOrderSamples, + m.walTruncateDuration, + m.walCorruptionsTotal, + m.walTotalReplayDuration, + m.checkpointDeleteFail, + m.checkpointDeleteTotal, + m.checkpointCreationFail, + m.checkpointCreationTotal, } for _, c := range cs { m.r.Unregister(c) } @@ -258,9 +283,10 @@ func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir strin db.appenderPool.New = func() interface{} { return &appender{ - DB: db, - pendingSeries: make([]record.RefSeries, 0, 100), - pendingSamples: make([]record.RefSample, 0, 100), + DB: db, + pendingSeries: make([]record.RefSeries, 0, 100), + pendingSamples: make([]record.RefSample, 0, 100), + pendingExemplars: make([]record.RefExemplar, 0, 10), } } @@ -412,11 +438,8 @@ func (db *DB) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.He return } decoded <- samples - case record.Tombstones: - // We don't care about tombstones - continue - case record.Exemplars: - // We don't care about exemplars + case record.Tombstones, record.Exemplars: + // We don't care about tombstones or exemplars during replay. continue default: errCh <- &wal.CorruptionErr{ @@ -666,82 +689,114 @@ func (db *DB) Close() error { type appender struct { *DB - pendingSeries []record.RefSeries - pendingSamples []record.RefSample + pendingSeries []record.RefSeries + pendingSamples []record.RefSample + pendingExemplars []record.RefExemplar + + // Pointers to the series referenced by each element of pendingSamples. + // Series lock is not held on elements. + sampleSeries []*memSeries } func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { - if ref == 0 { - r, err := a.Add(l, t, v) - return storage.SeriesRef(r), err - } - return ref, a.AddFast(chunks.HeadSeriesRef(ref), t, v) -} + // Series references and chunk references are identical for agent mode. + headRef := chunks.HeadSeriesRef(ref) -func (a *appender) Add(l labels.Labels, t int64, v float64) (chunks.HeadSeriesRef, error) { - hash := l.Hash() - series := a.series.GetByHash(hash, l) - if series != nil { - return series.ref, a.AddFast(series.ref, t, v) - } - - // Ensure no empty or duplicate labels have gotten through. This mirrors the - // equivalent validation code in the TSDB's headAppender. - l = l.WithoutEmpty() - if len(l) == 0 { - return 0, errors.Wrap(tsdb.ErrInvalidSample, "empty labelset") - } - - if lbl, dup := l.HasDuplicateLabelNames(); dup { - return 0, errors.Wrap(tsdb.ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, lbl)) - } - - ref := chunks.HeadSeriesRef(a.nextRef.Inc()) - series = &memSeries{ref: ref, lset: l, lastTs: t} - - a.pendingSeries = append(a.pendingSeries, record.RefSeries{ - Ref: ref, - Labels: l, - }) - a.pendingSamples = append(a.pendingSamples, record.RefSample{ - Ref: ref, - T: t, - V: v, - }) - - a.series.Set(hash, series) - - a.metrics.numActiveSeries.Inc() - a.metrics.totalAppendedSamples.Inc() - - return series.ref, nil -} - -func (a *appender) AddFast(ref chunks.HeadSeriesRef, t int64, v float64) error { - series := a.series.GetByID(ref) + series := a.series.GetByID(headRef) if series == nil { - return storage.ErrNotFound + // Ensure no empty or duplicate labels have gotten through. This mirrors the + // equivalent validation code in the TSDB's headAppender. 
+ l = l.WithoutEmpty() + if len(l) == 0 { + return 0, errors.Wrap(tsdb.ErrInvalidSample, "empty labelset") + } + + if lbl, dup := l.HasDuplicateLabelNames(); dup { + return 0, errors.Wrap(tsdb.ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, lbl)) + } + + var created bool + series, created = a.getOrCreate(l) + if created { + a.pendingSeries = append(a.pendingSeries, record.RefSeries{ + Ref: series.ref, + Labels: l, + }) + + a.metrics.numActiveSeries.Inc() + } } + series.Lock() defer series.Unlock() - // Update last recorded timestamp. Used by Storage.gc to determine if a - // series is dead. - series.lastTs = t + if t < series.lastTs { + a.metrics.totalOutOfOrderSamples.Inc() + return 0, storage.ErrOutOfOrderSample + } + // NOTE: always modify pendingSamples and sampleSeries together a.pendingSamples = append(a.pendingSamples, record.RefSample{ - Ref: ref, + Ref: series.ref, T: t, V: v, }) + a.sampleSeries = append(a.sampleSeries, series) a.metrics.totalAppendedSamples.Inc() - return nil + return storage.SeriesRef(series.ref), nil +} + +func (a *appender) getOrCreate(l labels.Labels) (series *memSeries, created bool) { + hash := l.Hash() + + series = a.series.GetByHash(hash, l) + if series != nil { + return series, false + } + + ref := chunks.HeadSeriesRef(a.nextRef.Inc()) + series = &memSeries{ref: ref, lset: l, lastTs: math.MinInt64} + a.series.Set(hash, series) + return series, true } func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { - // remote_write doesn't support exemplars yet, so do nothing here. - return 0, nil + // Series references and chunk references are identical for agent mode. + headRef := chunks.HeadSeriesRef(ref) + + s := a.series.GetByID(headRef) + if s == nil { + return 0, fmt.Errorf("unknown series ref when trying to add exemplar: %d", ref) + } + + // Ensure no empty labels have gotten through. + e.Labels = e.Labels.WithoutEmpty() + + if lbl, dup := e.Labels.HasDuplicateLabelNames(); dup { + return 0, errors.Wrap(tsdb.ErrInvalidExemplar, fmt.Sprintf(`label name "%s" is not unique`, lbl)) + } + + // Exemplar label length does not include chars involved in text rendering such as quotes, + // equals sign, or commas. See definition of const ExemplarMaxLabelSetLength. 
+ labelSetLen := 0 + for _, l := range e.Labels { + labelSetLen += utf8.RuneCountInString(l.Name) + labelSetLen += utf8.RuneCountInString(l.Value) + + if labelSetLen > exemplar.ExemplarMaxLabelSetLength { + return 0, storage.ErrExemplarLabelLength + } + } + + a.pendingExemplars = append(a.pendingExemplars, record.RefExemplar{ + Ref: s.ref, + T: e.Ts, + V: e.Value, + Labels: e.Labels, + }) + + return storage.SeriesRef(s.ref), nil } func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram) (storage.SeriesRef, error) { @@ -773,6 +828,22 @@ func (a *appender) Commit() error { buf = buf[:0] } + if len(a.pendingExemplars) > 0 { + buf = encoder.Exemplars(a.pendingExemplars, buf) + if err := a.wal.Log(buf); err != nil { + return err + } + buf = buf[:0] + } + + var series *memSeries + for i, s := range a.pendingSamples { + series = a.sampleSeries[i] + if !series.updateTimestamp(s.T) { + a.metrics.totalOutOfOrderSamples.Inc() + } + } + //nolint:staticcheck a.bufPool.Put(buf) return a.Rollback() @@ -781,6 +852,8 @@ func (a *appender) Rollback() error { a.pendingSeries = a.pendingSeries[:0] a.pendingSamples = a.pendingSamples[:0] + a.pendingExemplars = a.pendingExemplars[:0] + a.sampleSeries = a.sampleSeries[:0] a.appenderPool.Put(a) return nil } diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index 4a196180db..1e5ea11180 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -15,8 +15,8 @@ package agent import ( "context" + "path/filepath" "strconv" - "sync" "testing" "time" @@ -26,25 +26,70 @@ import ( dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/tsdb/wal" "github.com/prometheus/prometheus/util/testutil" ) -func TestUnsupported(t *testing.T) { - promAgentDir := t.TempDir() +func TestDB_InvalidSeries(t *testing.T) { + s := createTestAgentDB(t, nil, DefaultOptions()) + defer s.Close() - opts := DefaultOptions() - logger := log.NewNopLogger() + app := s.Appender(context.Background()) - s, err := Open(logger, prometheus.NewRegistry(), nil, promAgentDir, opts) + t.Run("Samples", func(t *testing.T) { + _, err := app.Append(0, labels.Labels{}, 0, 0) + require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject empty labels") + + _, err = app.Append(0, labels.Labels{{Name: "a", Value: "1"}, {Name: "a", Value: "2"}}, 0, 0) + require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject duplicate labels") + }) + + t.Run("Exemplars", func(t *testing.T) { + sRef, err := app.Append(0, labels.Labels{{Name: "a", Value: "1"}}, 0, 0) + require.NoError(t, err, "should not reject valid series") + + _, err = app.AppendExemplar(0, nil, exemplar.Exemplar{}) + require.EqualError(t, err, "unknown series ref when trying to add exemplar: 0") + + e := exemplar.Exemplar{Labels: labels.Labels{{Name: "a", Value: "1"}, {Name: "a", Value: "2"}}} + _, err = app.AppendExemplar(sRef, nil, e) + require.ErrorIs(t, err, tsdb.ErrInvalidExemplar, "should reject duplicate labels") + + e = exemplar.Exemplar{Labels: labels.Labels{{Name: "a_somewhat_long_trace_id", Value: 
"nYJSNtFrFTY37VR7mHzEE/LIDt7cdAQcuOzFajgmLDAdBSRHYPDzrxhMA4zz7el8naI/AoXFv9/e/G0vcETcIoNUi3OieeLfaIRQci2oa"}}} + _, err = app.AppendExemplar(sRef, nil, e) + require.ErrorIs(t, err, storage.ErrExemplarLabelLength, "should reject too long label length") + + // Inverse check + e = exemplar.Exemplar{Labels: labels.Labels{{Name: "a", Value: "1"}}, Value: 20, Ts: 10, HasTs: true} + _, err = app.AppendExemplar(sRef, nil, e) + require.NoError(t, err, "should not reject valid exemplars") + }) +} + +func createTestAgentDB(t *testing.T, reg prometheus.Registerer, opts *Options) *DB { + t.Helper() + + dbDir := t.TempDir() + rs := remote.NewStorage(log.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil) + t.Cleanup(func() { + require.NoError(t, rs.Close()) + }) + + db, err := Open(log.NewNopLogger(), reg, rs, dbDir, opts) require.NoError(t, err) - defer func() { - require.NoError(t, s.Close()) - }() + return db +} + +func TestUnsupportedFunctions(t *testing.T) { + s := createTestAgentDB(t, nil, DefaultOptions()) + defer s.Close() t.Run("Querier", func(t *testing.T) { _, err := s.Querier(context.TODO(), 0, 0) @@ -68,93 +113,74 @@ func TestCommit(t *testing.T) { numSeries = 8 ) - promAgentDir := t.TempDir() + s := createTestAgentDB(t, nil, DefaultOptions()) + app := s.Appender(context.TODO()) lbls := labelsForTest(t.Name(), numSeries) - opts := DefaultOptions() - logger := log.NewNopLogger() - reg := prometheus.NewRegistry() - remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) - defer func(rs *remote.Storage) { - require.NoError(t, rs.Close()) - }(remoteStorage) - - s, err := Open(logger, reg, remoteStorage, promAgentDir, opts) - require.NoError(t, err) - - a := s.Appender(context.TODO()) - for _, l := range lbls { lset := labels.New(l...) for i := 0; i < numDatapoints; i++ { sample := tsdbutil.GenerateSamples(0, 1) - _, err := a.Append(0, lset, sample[0].T(), sample[0].V()) + ref, err := app.Append(0, lset, sample[0].T(), sample[0].V()) + require.NoError(t, err) + + e := exemplar.Exemplar{ + Labels: lset, + Ts: sample[0].T(), + Value: sample[0].V(), + HasTs: true, + } + _, err = app.AppendExemplar(ref, lset, e) require.NoError(t, err) } } - require.NoError(t, a.Commit()) + require.NoError(t, app.Commit()) require.NoError(t, s.Close()) - // Read records from WAL and check for expected count of series and samples. - walSeriesCount := 0 - walSamplesCount := 0 - - reg = prometheus.NewRegistry() - remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) - defer func() { - require.NoError(t, remoteStorage.Close()) - }() - - s1, err := Open(logger, nil, remoteStorage, promAgentDir, opts) + sr, err := wal.NewSegmentsReader(s.wal.Dir()) require.NoError(t, err) defer func() { - require.NoError(t, s1.Close()) + require.NoError(t, sr.Close()) }() - var dec record.Decoder + // Read records from WAL and check for expected count of series, samples, and exemplars. 
+ var ( + r = wal.NewReader(sr) + dec record.Decoder - if err == nil { - sr, err := wal.NewSegmentsReader(s1.wal.Dir()) - require.NoError(t, err) - defer func() { - require.NoError(t, sr.Close()) - }() + walSeriesCount, walSamplesCount, walExemplarsCount int + ) + for r.Next() { + rec := r.Record() + switch dec.Type(rec) { + case record.Series: + var series []record.RefSeries + series, err = dec.Series(rec, series) + require.NoError(t, err) + walSeriesCount += len(series) - r := wal.NewReader(sr) - seriesPool := sync.Pool{ - New: func() interface{} { - return []record.RefSeries{} - }, - } - samplesPool := sync.Pool{ - New: func() interface{} { - return []record.RefSample{} - }, - } + case record.Samples: + var samples []record.RefSample + samples, err = dec.Samples(rec, samples) + require.NoError(t, err) + walSamplesCount += len(samples) - for r.Next() { - rec := r.Record() - switch dec.Type(rec) { - case record.Series: - series := seriesPool.Get().([]record.RefSeries)[:0] - series, _ = dec.Series(rec, series) - walSeriesCount += len(series) - case record.Samples: - samples := samplesPool.Get().([]record.RefSample)[:0] - samples, _ = dec.Samples(rec, samples) - walSamplesCount += len(samples) - default: - } + case record.Exemplars: + var exemplars []record.RefExemplar + exemplars, err = dec.Exemplars(rec, exemplars) + require.NoError(t, err) + walExemplarsCount += len(exemplars) + + default: } } - // Retrieved series count from WAL should match the count of series been added to the WAL. - require.Equal(t, walSeriesCount, numSeries) - - // Retrieved samples count from WAL should match the count of samples been added to the WAL. - require.Equal(t, walSamplesCount, numSeries*numDatapoints) + // Check that the WAL contained the same number of committed series/samples/exemplars. + require.Equal(t, numSeries, walSeriesCount, "unexpected number of series") + require.Equal(t, numSeries*numDatapoints, walSamplesCount, "unexpected number of samples") + require.Equal(t, numSeries*numDatapoints, walExemplarsCount, "unexpected number of exemplars") } func TestRollback(t *testing.T) { @@ -163,93 +189,68 @@ func TestRollback(t *testing.T) { numSeries = 8 ) - promAgentDir := t.TempDir() + s := createTestAgentDB(t, nil, DefaultOptions()) + app := s.Appender(context.TODO()) lbls := labelsForTest(t.Name(), numSeries) - opts := DefaultOptions() - logger := log.NewNopLogger() - reg := prometheus.NewRegistry() - remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) - defer func(rs *remote.Storage) { - require.NoError(t, rs.Close()) - }(remoteStorage) - - s, err := Open(logger, reg, remoteStorage, promAgentDir, opts) - require.NoError(t, err) - - a := s.Appender(context.TODO()) - for _, l := range lbls { lset := labels.New(l...) for i := 0; i < numDatapoints; i++ { sample := tsdbutil.GenerateSamples(0, 1) - _, err := a.Append(0, lset, sample[0].T(), sample[0].V()) + _, err := app.Append(0, lset, sample[0].T(), sample[0].V()) require.NoError(t, err) } } - require.NoError(t, a.Rollback()) + // Do a rollback, which should clear uncommitted data. A follow-up call to + // commit should persist nothing to the WAL. + require.NoError(t, app.Rollback()) + require.NoError(t, app.Commit()) require.NoError(t, s.Close()) - // Read records from WAL and check for expected count of series and samples. 
- walSeriesCount := 0 - walSamplesCount := 0 - - reg = prometheus.NewRegistry() - remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) - defer func() { - require.NoError(t, remoteStorage.Close()) - }() - - s1, err := Open(logger, nil, remoteStorage, promAgentDir, opts) + sr, err := wal.NewSegmentsReader(s.wal.Dir()) require.NoError(t, err) defer func() { - require.NoError(t, s1.Close()) + require.NoError(t, sr.Close()) }() - var dec record.Decoder + // Read records from WAL and check for expected count of series and samples. + var ( + r = wal.NewReader(sr) + dec record.Decoder - if err == nil { - sr, err := wal.NewSegmentsReader(s1.wal.Dir()) - require.NoError(t, err) - defer func() { - require.NoError(t, sr.Close()) - }() + walSeriesCount, walSamplesCount, walExemplarsCount int + ) + for r.Next() { + rec := r.Record() + switch dec.Type(rec) { + case record.Series: + var series []record.RefSeries + series, err = dec.Series(rec, series) + require.NoError(t, err) + walSeriesCount += len(series) - r := wal.NewReader(sr) - seriesPool := sync.Pool{ - New: func() interface{} { - return []record.RefSeries{} - }, - } - samplesPool := sync.Pool{ - New: func() interface{} { - return []record.RefSample{} - }, - } + case record.Samples: + var samples []record.RefSample + samples, err = dec.Samples(rec, samples) + require.NoError(t, err) + walSamplesCount += len(samples) - for r.Next() { - rec := r.Record() - switch dec.Type(rec) { - case record.Series: - series := seriesPool.Get().([]record.RefSeries)[:0] - series, _ = dec.Series(rec, series) - walSeriesCount += len(series) - case record.Samples: - samples := samplesPool.Get().([]record.RefSample)[:0] - samples, _ = dec.Samples(rec, samples) - walSamplesCount += len(samples) - default: - } + case record.Exemplars: + var exemplars []record.RefExemplar + exemplars, err = dec.Exemplars(rec, exemplars) + require.NoError(t, err) + walExemplarsCount += len(exemplars) + + default: } } - // Retrieved series count from WAL should be zero. - require.Equal(t, walSeriesCount, 0) - - // Retrieved samples count from WAL should be zero. - require.Equal(t, walSamplesCount, 0) + // Check that the rollback ensured nothing got stored. + require.Equal(t, 0, walSeriesCount, "series should not have been written to WAL") + require.Equal(t, 0, walSamplesCount, "samples should not have been written to WAL") + require.Equal(t, 0, walExemplarsCount, "exemplars should not have been written to WAL") } func TestFullTruncateWAL(t *testing.T) { @@ -259,34 +260,25 @@ func TestFullTruncateWAL(t *testing.T) { lastTs = 500 ) - promAgentDir := t.TempDir() - - lbls := labelsForTest(t.Name(), numSeries) + reg := prometheus.NewRegistry() opts := DefaultOptions() opts.TruncateFrequency = time.Minute * 2 - logger := log.NewNopLogger() - reg := prometheus.NewRegistry() - remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) - defer func() { - require.NoError(t, remoteStorage.Close()) - }() - s, err := Open(logger, reg, remoteStorage, promAgentDir, opts) - require.NoError(t, err) + s := createTestAgentDB(t, reg, opts) defer func() { require.NoError(t, s.Close()) }() + app := s.Appender(context.TODO()) - a := s.Appender(context.TODO()) - + lbls := labelsForTest(t.Name(), numSeries) for _, l := range lbls { lset := labels.New(l...) 
for i := 0; i < numDatapoints; i++ { - _, err := a.Append(0, lset, int64(lastTs), 0) + _, err := app.Append(0, lset, int64(lastTs), 0) require.NoError(t, err) } - require.NoError(t, a.Commit()) + require.NoError(t, app.Commit()) } // Truncate WAL with mint to GC all the samples. @@ -302,52 +294,40 @@ func TestPartialTruncateWAL(t *testing.T) { numSeries = 800 ) - promAgentDir := t.TempDir() - opts := DefaultOptions() opts.TruncateFrequency = time.Minute * 2 - logger := log.NewNopLogger() - reg := prometheus.NewRegistry() - remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) - defer func() { - require.NoError(t, remoteStorage.Close()) - }() - s, err := Open(logger, reg, remoteStorage, promAgentDir, opts) - require.NoError(t, err) + reg := prometheus.NewRegistry() + s := createTestAgentDB(t, reg, opts) defer func() { require.NoError(t, s.Close()) }() - - a := s.Appender(context.TODO()) - - var lastTs int64 + app := s.Appender(context.TODO()) // Create first batch of 800 series with 1000 data-points with a fixed lastTs as 500. - lastTs = 500 + var lastTs int64 = 500 lbls := labelsForTest(t.Name()+"batch-1", numSeries) for _, l := range lbls { lset := labels.New(l...) for i := 0; i < numDatapoints; i++ { - _, err := a.Append(0, lset, lastTs, 0) + _, err := app.Append(0, lset, lastTs, 0) require.NoError(t, err) } - require.NoError(t, a.Commit()) + require.NoError(t, app.Commit()) } // Create second batch of 800 series with 1000 data-points with a fixed lastTs as 600. lastTs = 600 - lbls = labelsForTest(t.Name()+"batch-2", numSeries) for _, l := range lbls { lset := labels.New(l...) for i := 0; i < numDatapoints; i++ { - _, err := a.Append(0, lset, lastTs, 0) + _, err := app.Append(0, lset, lastTs, 0) require.NoError(t, err) } - require.NoError(t, a.Commit()) + require.NoError(t, app.Commit()) } // Truncate WAL with mint to GC only the first batch of 800 series and retaining 2nd batch of 800 series. @@ -364,53 +344,41 @@ func TestWALReplay(t *testing.T) { lastTs = 500 ) - promAgentDir := t.TempDir() + s := createTestAgentDB(t, nil, DefaultOptions()) + app := s.Appender(context.TODO()) lbls := labelsForTest(t.Name(), numSeries) - opts := DefaultOptions() - - logger := log.NewNopLogger() - reg := prometheus.NewRegistry() - remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) - defer func() { - require.NoError(t, remoteStorage.Close()) - }() - - s, err := Open(logger, reg, remoteStorage, promAgentDir, opts) - require.NoError(t, err) - - a := s.Appender(context.TODO()) - for _, l := range lbls { lset := labels.New(l...) for i := 0; i < numDatapoints; i++ { - _, err := a.Append(0, lset, lastTs, 0) + _, err := app.Append(0, lset, lastTs, 0) require.NoError(t, err) } } - require.NoError(t, a.Commit()) + require.NoError(t, app.Commit()) require.NoError(t, s.Close()) - restartOpts := DefaultOptions() - restartLogger := log.NewNopLogger() - restartReg := prometheus.NewRegistry() + // Hack: s.wal.Dir() is the /wal subdirectory of the original storage path. + // We need the original directory so we can recreate the storage for replay. + storageDir := filepath.Dir(s.wal.Dir()) - // Open a new DB with the same WAL to check that series from the previous DB - // get replayed. 
- replayDB, err := Open(restartLogger, restartReg, nil, promAgentDir, restartOpts) - require.NoError(t, err) + reg := prometheus.NewRegistry() + replayStorage, err := Open(s.logger, reg, nil, storageDir, s.opts) + if err != nil { + t.Fatalf("unable to create storage for the agent: %v", err) + } defer func() { - require.NoError(t, replayDB.Close()) + require.NoError(t, replayStorage.Close()) }() // Check if all the series are retrieved back from the WAL. - m := gatherFamily(t, restartReg, "prometheus_agent_active_series") + m := gatherFamily(t, reg, "prometheus_agent_active_series") require.Equal(t, float64(numSeries), m.Metric[0].Gauge.GetValue(), "agent wal replay mismatch of active series count") // Check if lastTs of the samples retrieved from the WAL is retained. - metrics := replayDB.series.series + metrics := replayStorage.series.series for i := 0; i < len(metrics); i++ { mp := metrics[i] for _, v := range mp { diff --git a/tsdb/agent/series.go b/tsdb/agent/series.go index 73fcb60099..f30ff96200 100644 --- a/tsdb/agent/series.go +++ b/tsdb/agent/series.go @@ -24,11 +24,26 @@ import ( type memSeries struct { sync.Mutex - ref chunks.HeadSeriesRef - lset labels.Labels + ref chunks.HeadSeriesRef + lset labels.Labels + + // Last recorded timestamp. Used by Storage.gc to determine if a series is + // stale. lastTs int64 } +// updateTimestamp obtains the lock on m and attempts to update lastTs. +// It fails if newTs < lastTs. +func (m *memSeries) updateTimestamp(newTs int64) bool { + m.Lock() + defer m.Unlock() + if newTs >= m.lastTs { + m.lastTs = newTs + return true + } + return false +} + // seriesHashmap is a simple hashmap for memSeries by their label set. // It is built on top of a regular hashmap and holds a slice of series to // resolve hash collisions. 
Its methods require the hash to be submitted diff --git a/web/ui/module/codemirror-promql/package.json b/web/ui/module/codemirror-promql/package.json index e400a4df73..6acc255c97 100644 --- a/web/ui/module/codemirror-promql/package.json +++ b/web/ui/module/codemirror-promql/package.json @@ -46,7 +46,7 @@ "@types/chai": "^4.2.22", "@types/lru-cache": "^5.1.0", "@types/mocha": "^9.0.0", - "@types/node": "^16.11.9", + "@types/node": "^16.11.10", "@typescript-eslint/eslint-plugin": "^5.3.1", "@typescript-eslint/parser": "^5.3.1", "chai": "^4.2.0", diff --git a/web/ui/package-lock.json b/web/ui/package-lock.json index a4f6408924..8e39eb51db 100644 --- a/web/ui/package-lock.json +++ b/web/ui/package-lock.json @@ -32,7 +32,7 @@ "@types/chai": "^4.2.22", "@types/lru-cache": "^5.1.0", "@types/mocha": "^9.0.0", - "@types/node": "^16.11.9", + "@types/node": "^16.11.10", "@typescript-eslint/eslint-plugin": "^5.3.1", "@typescript-eslint/parser": "^5.3.1", "chai": "^4.2.0", @@ -1775,9 +1775,9 @@ "license": "MIT" }, "node_modules/@types/node": { - "version": "16.11.9", - "resolved": "https://registry.npmjs.org/@types/node/-/node-16.11.9.tgz", - "integrity": "sha512-MKmdASMf3LtPzwLyRrFjtFFZ48cMf8jmX5VRYrDQiJa8Ybu5VAmkqBWqKU8fdCwD8ysw4mQ9nrEHvzg6gunR7A==", + "version": "16.11.10", + "resolved": "https://registry.npmjs.org/@types/node/-/node-16.11.10.tgz", + "integrity": "sha512-3aRnHa1KlOEEhJ6+CvyHKK5vE9BcLGjtUpwvqYLRvYNQKMfabu3BwfJaA/SLW8dxe28LsNDjtHwePTuzn3gmOA==", "dev": true }, "node_modules/@types/prop-types": { @@ -7252,7 +7252,7 @@ "@types/flot": "0.0.32", "@types/jest": "^27.0.3", "@types/jquery": "^3.5.9", - "@types/node": "^16.11.9", + "@types/node": "^16.11.10", "@types/react": "^17.0.36", "@types/react-copy-to-clipboard": "^5.0.2", "@types/react-dom": "^17.0.11", @@ -27808,9 +27808,9 @@ "dev": true }, "@types/node": { - "version": "16.11.9", - "resolved": "https://registry.npmjs.org/@types/node/-/node-16.11.9.tgz", - "integrity": "sha512-MKmdASMf3LtPzwLyRrFjtFFZ48cMf8jmX5VRYrDQiJa8Ybu5VAmkqBWqKU8fdCwD8ysw4mQ9nrEHvzg6gunR7A==", + "version": "16.11.10", + "resolved": "https://registry.npmjs.org/@types/node/-/node-16.11.10.tgz", + "integrity": "sha512-3aRnHa1KlOEEhJ6+CvyHKK5vE9BcLGjtUpwvqYLRvYNQKMfabu3BwfJaA/SLW8dxe28LsNDjtHwePTuzn3gmOA==", "dev": true }, "@types/prop-types": { @@ -28413,7 +28413,7 @@ "@types/chai": "^4.2.22", "@types/lru-cache": "^5.1.0", "@types/mocha": "^9.0.0", - "@types/node": "^16.11.9", + "@types/node": "^16.11.10", "@typescript-eslint/eslint-plugin": "^5.3.1", "@typescript-eslint/parser": "^5.3.1", "chai": "^4.2.0", @@ -29645,7 +29645,7 @@ "@types/flot": "0.0.32", "@types/jest": "^27.0.3", "@types/jquery": "^3.5.9", - "@types/node": "^16.11.9", + "@types/node": "^16.11.10", "@types/react": "^17.0.36", "@types/react-copy-to-clipboard": "^5.0.2", "@types/react-dom": "^17.0.11", diff --git a/web/ui/react-app/package.json b/web/ui/react-app/package.json index 0a7254be62..2e5b2c5d63 100644 --- a/web/ui/react-app/package.json +++ b/web/ui/react-app/package.json @@ -69,7 +69,7 @@ "@types/flot": "0.0.32", "@types/jest": "^27.0.3", "@types/jquery": "^3.5.9", - "@types/node": "^16.11.9", + "@types/node": "^16.11.10", "@types/react": "^17.0.36", "@types/react-copy-to-clipboard": "^5.0.2", "@types/react-dom": "^17.0.11", diff --git a/web/ui/react-app/public/index.html b/web/ui/react-app/public/index.html index eac493853c..a3f7b7a850 100755 --- a/web/ui/react-app/public/index.html +++ b/web/ui/react-app/public/index.html @@ -15,10 +15,11 @@ It will render a "Consoles" link in the 
navbar when it is non-empty. - PROMETHEUS_AGENT_MODE is replaced by a boolean indicating if Prometheus is running in agent mode. If true, it will disable querying capabilities in the UI and generally adapt the UI to the agent mode. + It has to be represented as a string, because booleans can be mangled to !1 in production builds. -->
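Note on the agent appender rework in tsdb/agent/db.go and tsdb/agent/series.go above: Append no longer writes lastTs directly; Commit walks pendingSamples alongside sampleSeries and advances each series' last-seen timestamp through memSeries.updateTimestamp, counting regressions in the new out-of-order metric. A standalone Go sketch of that compare-and-advance pattern follows; the memSeries here is reduced to the two members the pattern needs, so treat it as an illustration rather than the full agent type.

package main

import (
    "fmt"
    "math"
    "sync"
)

// memSeries carries only the mutex and timestamp; the real agent type also
// holds the series ref and label set.
type memSeries struct {
    sync.Mutex
    lastTs int64
}

// updateTimestamp takes the series lock, advances lastTs only when the new
// timestamp is not older, and reports whether it advanced.
func (m *memSeries) updateTimestamp(newTs int64) bool {
    m.Lock()
    defer m.Unlock()
    if newTs >= m.lastTs {
        m.lastTs = newTs
        return true
    }
    return false
}

func main() {
    // New series start at math.MinInt64 so the first sample always succeeds.
    s := &memSeries{lastTs: math.MinInt64}
    fmt.Println(s.updateTimestamp(100)) // true
    fmt.Println(s.updateTimestamp(50))  // false: counted as out-of-order in Commit
}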
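Similarly, the exemplar validation added to AppendExemplar bounds the combined rune length (not byte length) of all exemplar label names and values. A self-contained sketch of that check follows; the label type, the error value, and the 128-rune limit standing in for exemplar.ExemplarMaxLabelSetLength are assumptions of this sketch rather than the upstream definitions.

package main

import (
    "errors"
    "fmt"
    "unicode/utf8"
)

// maxLabelSetLength stands in for exemplar.ExemplarMaxLabelSetLength; the
// exact upstream value is assumed here.
const maxLabelSetLength = 128

type label struct{ Name, Value string }

var errExemplarLabelLength = errors.New("exemplar label set exceeds maximum length")

// validateExemplarLabels sums rune counts of names and values; characters
// used only for text rendering (quotes, '=', ',') are deliberately excluded.
func validateExemplarLabels(lbls []label) error {
    total := 0
    for _, l := range lbls {
        total += utf8.RuneCountInString(l.Name)
        total += utf8.RuneCountInString(l.Value)
        if total > maxLabelSetLength {
            return errExemplarLabelLength
        }
    }
    return nil
}

func main() {
    fmt.Println(validateExemplarLabels([]label{{"trace_id", "abc123"}})) // <nil>
    long := make([]byte, 200)
    for i := range long {
        long[i] = 'x'
    }
    fmt.Println(validateExemplarLabels([]label{{"trace_id", string(long)}})) // error
}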