mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-25 05:34:05 -08:00
TSDB: Lock around access to labels in head under -tags dedupelabels (#14322)
* TSDB: Document what needs locking in memSeries * TSDB: Lock around access to series labels So we can modify them to reset the symbol-table. * TSDB: Make label locking conditional on build tag --------- Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
parent
9198952f7c
commit
709c5d6fc3
21
tsdb/head.go
21
tsdb/head.go
|
@ -1759,12 +1759,12 @@ type seriesHashmap struct {
|
|||
|
||||
func (m *seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
|
||||
if s, found := m.unique[hash]; found {
|
||||
if labels.Equal(s.lset, lset) {
|
||||
if labels.Equal(s.labels(), lset) {
|
||||
return s
|
||||
}
|
||||
}
|
||||
for _, s := range m.conflicts[hash] {
|
||||
if labels.Equal(s.lset, lset) {
|
||||
if labels.Equal(s.labels(), lset) {
|
||||
return s
|
||||
}
|
||||
}
|
||||
|
@ -1772,7 +1772,7 @@ func (m *seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
|
|||
}
|
||||
|
||||
func (m *seriesHashmap) set(hash uint64, s *memSeries) {
|
||||
if existing, found := m.unique[hash]; !found || labels.Equal(existing.lset, s.lset) {
|
||||
if existing, found := m.unique[hash]; !found || labels.Equal(existing.labels(), s.labels()) {
|
||||
m.unique[hash] = s
|
||||
return
|
||||
}
|
||||
|
@ -1781,7 +1781,7 @@ func (m *seriesHashmap) set(hash uint64, s *memSeries) {
|
|||
}
|
||||
l := m.conflicts[hash]
|
||||
for i, prev := range l {
|
||||
if labels.Equal(prev.lset, s.lset) {
|
||||
if labels.Equal(prev.labels(), s.labels()) {
|
||||
l[i] = s
|
||||
return
|
||||
}
|
||||
|
@ -1931,7 +1931,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
|
|||
series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
|
||||
s.hashes[hashShard].del(hash, series.ref)
|
||||
delete(s.series[refShard], series.ref)
|
||||
deletedForCallback[series.ref] = series.lset
|
||||
deletedForCallback[series.ref] = series.lset // OK to access lset; series is locked at the top of this function.
|
||||
}
|
||||
|
||||
s.iterForDeletion(check)
|
||||
|
@ -2023,7 +2023,7 @@ func (s *stripeSeries) getOrSet(hash uint64, lset labels.Labels, createSeries fu
|
|||
}
|
||||
// Setting the series in the s.hashes marks the creation of series
|
||||
// as any further calls to this methods would return that series.
|
||||
s.seriesLifecycleCallback.PostCreation(series.lset)
|
||||
s.seriesLifecycleCallback.PostCreation(series.labels())
|
||||
|
||||
i = uint64(series.ref) & uint64(s.size-1)
|
||||
|
||||
|
@ -2064,16 +2064,19 @@ func (s sample) Type() chunkenc.ValueType {
|
|||
// memSeries is the in-memory representation of a series. None of its methods
|
||||
// are goroutine safe and it is the caller's responsibility to lock it.
|
||||
type memSeries struct {
|
||||
sync.Mutex
|
||||
|
||||
// Members up to the Mutex are not changed after construction, so can be accessed without a lock.
|
||||
ref chunks.HeadSeriesRef
|
||||
lset labels.Labels
|
||||
meta *metadata.Metadata
|
||||
|
||||
// Series labels hash to use for sharding purposes. The value is always 0 when sharding has not
|
||||
// been explicitly enabled in TSDB.
|
||||
shardHash uint64
|
||||
|
||||
// Everything after here should only be accessed with the lock held.
|
||||
sync.Mutex
|
||||
|
||||
lset labels.Labels // Locking required with -tags dedupelabels, not otherwise.
|
||||
|
||||
// Immutable chunks on disk that have not yet gone into a block, in order of ascending time stamps.
|
||||
// When compaction runs, chunks get moved into a block and all pointers are shifted like so:
|
||||
//
|
||||
|
|
|
@ -554,7 +554,7 @@ func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels,
|
|||
// Ensure no empty labels have gotten through.
|
||||
e.Labels = e.Labels.WithoutEmpty()
|
||||
|
||||
err := a.head.exemplars.ValidateExemplar(s.lset, e)
|
||||
err := a.head.exemplars.ValidateExemplar(s.labels(), e)
|
||||
if err != nil {
|
||||
if errors.Is(err, storage.ErrDuplicateExemplar) || errors.Is(err, storage.ErrExemplarsDisabled) {
|
||||
// Duplicate, don't return an error but don't accept the exemplar.
|
||||
|
@ -708,7 +708,7 @@ func (a *headAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRe
|
|||
return 0, labels.EmptyLabels()
|
||||
}
|
||||
// returned labels must be suitable to pass to Append()
|
||||
return storage.SeriesRef(s.ref), s.lset
|
||||
return storage.SeriesRef(s.ref), s.labels()
|
||||
}
|
||||
|
||||
// log writes all headAppender's data to the WAL.
|
||||
|
@ -816,7 +816,7 @@ func (a *headAppender) Commit() (err error) {
|
|||
continue
|
||||
}
|
||||
// We don't instrument exemplar appends here, all is instrumented by storage.
|
||||
if err := a.head.exemplars.AddExemplar(s.lset, e.exemplar); err != nil {
|
||||
if err := a.head.exemplars.AddExemplar(s.labels(), e.exemplar); err != nil {
|
||||
if errors.Is(err, storage.ErrOutOfOrderExemplar) {
|
||||
continue
|
||||
}
|
||||
|
|
27
tsdb/head_dedupelabels.go
Normal file
27
tsdb/head_dedupelabels.go
Normal file
|
@ -0,0 +1,27 @@
|
|||
// Copyright 2024 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build dedupelabels
|
||||
|
||||
package tsdb
|
||||
|
||||
import (
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
)
|
||||
|
||||
// Helper method to access labels under lock.
|
||||
func (s *memSeries) labels() labels.Labels {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
return s.lset
|
||||
}
|
25
tsdb/head_other.go
Normal file
25
tsdb/head_other.go
Normal file
|
@ -0,0 +1,25 @@
|
|||
// Copyright 2024 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build !dedupelabels
|
||||
|
||||
package tsdb
|
||||
|
||||
import (
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
)
|
||||
|
||||
// Helper method to access labels; trivial when not using dedupelabels.
|
||||
func (s *memSeries) labels() labels.Labels {
|
||||
return s.lset
|
||||
}
|
|
@ -142,7 +142,7 @@ func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
|
|||
}
|
||||
|
||||
slices.SortFunc(series, func(a, b *memSeries) int {
|
||||
return labels.Compare(a.lset, b.lset)
|
||||
return labels.Compare(a.labels(), b.labels())
|
||||
})
|
||||
|
||||
// Convert back to list.
|
||||
|
@ -189,7 +189,7 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB
|
|||
h.head.metrics.seriesNotFound.Inc()
|
||||
return storage.ErrNotFound
|
||||
}
|
||||
builder.Assign(s.lset)
|
||||
builder.Assign(s.labels())
|
||||
|
||||
if chks == nil {
|
||||
return nil
|
||||
|
@ -259,7 +259,7 @@ func (h *headIndexReader) LabelValueFor(_ context.Context, id storage.SeriesRef,
|
|||
return "", storage.ErrNotFound
|
||||
}
|
||||
|
||||
value := memSeries.lset.Get(label)
|
||||
value := memSeries.labels().Get(label)
|
||||
if value == "" {
|
||||
return "", storage.ErrNotFound
|
||||
}
|
||||
|
@ -283,7 +283,7 @@ func (h *headIndexReader) LabelNamesFor(ctx context.Context, series index.Postin
|
|||
// when series was garbage collected after the caller got the series IDs.
|
||||
continue
|
||||
}
|
||||
memSeries.lset.Range(func(lbl labels.Label) {
|
||||
memSeries.labels().Range(func(lbl labels.Label) {
|
||||
namesMap[lbl.Name] = struct{}{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -126,7 +126,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
|||
}
|
||||
// At the moment the only possible error here is out of order exemplars, which we shouldn't see when
|
||||
// replaying the WAL, so lets just log the error if it's not that type.
|
||||
err = h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels})
|
||||
err = h.exemplars.AddExemplar(ms.labels(), exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels})
|
||||
if err != nil && errors.Is(err, storage.ErrOutOfOrderExemplar) {
|
||||
level.Warn(h.logger).Log("msg", "Unexpected error when replaying WAL on exemplar record", "err", err)
|
||||
}
|
||||
|
@ -448,7 +448,7 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*m
|
|||
) {
|
||||
level.Debug(h.logger).Log(
|
||||
"msg", "M-mapped chunks overlap on a duplicate series record",
|
||||
"series", mSeries.lset.String(),
|
||||
"series", mSeries.labels().String(),
|
||||
"oldref", mSeries.ref,
|
||||
"oldmint", mSeries.mmappedChunks[0].minTime,
|
||||
"oldmaxt", mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime,
|
||||
|
@ -932,7 +932,7 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
|
|||
|
||||
buf.PutByte(chunkSnapshotRecordTypeSeries)
|
||||
buf.PutBE64(uint64(s.ref))
|
||||
record.EncodeLabels(&buf, s.lset)
|
||||
record.EncodeLabels(&buf, s.labels())
|
||||
buf.PutBE64int64(0) // Backwards-compatibility; was chunkRange but now unused.
|
||||
|
||||
s.Lock()
|
||||
|
@ -1485,7 +1485,7 @@ Outer:
|
|||
continue
|
||||
}
|
||||
|
||||
if err := h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{
|
||||
if err := h.exemplars.AddExemplar(ms.labels(), exemplar.Exemplar{
|
||||
Labels: e.Labels,
|
||||
Value: e.V,
|
||||
Ts: e.T,
|
||||
|
|
|
@ -78,7 +78,7 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
|
|||
oh.head.metrics.seriesNotFound.Inc()
|
||||
return storage.ErrNotFound
|
||||
}
|
||||
builder.Assign(s.lset)
|
||||
builder.Assign(s.labels())
|
||||
|
||||
if chks == nil {
|
||||
return nil
|
||||
|
|
Loading…
Reference in a new issue