Mirror of https://github.com/prometheus/prometheus.git (synced 2025-03-05 20:59:13 -08:00)
Merge 2a09367bca into 7cbf749096
This commit is contained in: c8c7dc2282
@@ -16,6 +16,7 @@ package tsdb
 import (
     "errors"
     "fmt"
+    "maps"
     "math"
     "os"
     "path/filepath"
@@ -50,13 +51,33 @@ type histogramRecord struct {
     fh *histogram.FloatHistogram
 }
 
+type seriesRefSet struct {
+    refs map[chunks.HeadSeriesRef]struct{}
+    mtx  sync.Mutex
+}
+
+func (s *seriesRefSet) merge(other map[chunks.HeadSeriesRef]struct{}) {
+    s.mtx.Lock()
+    defer s.mtx.Unlock()
+    maps.Copy(s.refs, other)
+}
+
+func (s *seriesRefSet) count() int {
+    s.mtx.Lock()
+    defer s.mtx.Unlock()
+    return len(s.refs)
+}
+
 func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) {
-    // Track number of samples that referenced a series we don't know about
+    // Track number of missing series records that were referenced by other records.
+    unknownSeriesRefs := &seriesRefSet{refs: make(map[chunks.HeadSeriesRef]struct{}), mtx: sync.Mutex{}}
+    // Track number of different records that referenced a series we don't know about
     // for error reporting.
-    var unknownRefs atomic.Uint64
+    var unknownSampleRefs atomic.Uint64
     var unknownExemplarRefs atomic.Uint64
     var unknownHistogramRefs atomic.Uint64
     var unknownMetadataRefs atomic.Uint64
+    var unknownTombstoneRefs atomic.Uint64
     // Track number of series records that had overlapping m-map chunks.
     var mmapOverlappingChunks atomic.Uint64
 
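The hunk above introduces seriesRefSet so that each WAL-replay worker can record the series references it failed to resolve in a private map and fold them into one shared, deduplicated set. What follows is a minimal standalone sketch of that pattern, a hypothetical illustration that is not part of this commit; HeadSeriesRef here stands in for chunks.HeadSeriesRef.

// seriesrefset_sketch.go: hypothetical, self-contained illustration only.
package main

import (
    "fmt"
    "maps"
    "sync"
)

// HeadSeriesRef stands in for chunks.HeadSeriesRef in the real code.
type HeadSeriesRef uint64

// seriesRefSet mirrors the type added in the hunk above.
type seriesRefSet struct {
    refs map[HeadSeriesRef]struct{}
    mtx  sync.Mutex
}

func (s *seriesRefSet) merge(other map[HeadSeriesRef]struct{}) {
    s.mtx.Lock()
    defer s.mtx.Unlock()
    maps.Copy(s.refs, other) // union: duplicate refs collapse into one entry
}

func (s *seriesRefSet) count() int {
    s.mtx.Lock()
    defer s.mtx.Unlock()
    return len(s.refs)
}

func main() {
    set := &seriesRefSet{refs: make(map[HeadSeriesRef]struct{})}
    var wg sync.WaitGroup
    for w := 0; w < 4; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // Each worker collects unknown refs locally, lock-free.
            missing := make(map[HeadSeriesRef]struct{})
            for i := 0; i < 10; i++ {
                missing[HeadSeriesRef(i)] = struct{}{}
            }
            // One lock acquisition per worker, not one per record.
            set.merge(missing)
        }()
    }
    wg.Wait()
    fmt.Println("distinct unknown series:", set.count()) // prints 10, not 40
}

Under this scheme the per-record-type counters (unknownSampleRefs, unknownExemplarRefs, and so on) can be much larger than unknownSeriesRefs.count(), since many records may reference the same missing series.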
@@ -91,8 +112,9 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
         processors[i].setup()
 
         go func(wp *walSubsetProcessor) {
-            unknown, unknownHistograms, overlapping := wp.processWALSamples(h, mmappedChunks, oooMmappedChunks)
-            unknownRefs.Add(unknown)
+            missingSeries, unknownSamples, unknownHistograms, overlapping := wp.processWALSamples(h, mmappedChunks, oooMmappedChunks)
+            unknownSeriesRefs.merge(missingSeries)
+            unknownSampleRefs.Add(unknownSamples)
             mmapOverlappingChunks.Add(overlapping)
             unknownHistogramRefs.Add(unknownHistograms)
             wg.Done()
@@ -102,12 +124,14 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
     wg.Add(1)
     exemplarsInput = make(chan record.RefExemplar, 300)
     go func(input <-chan record.RefExemplar) {
+        missingSeries := make(map[chunks.HeadSeriesRef]struct{})
         var err error
         defer wg.Done()
         for e := range input {
             ms := h.series.getByID(e.Ref)
             if ms == nil {
                 unknownExemplarRefs.Inc()
+                missingSeries[e.Ref] = struct{}{}
                 continue
             }
 
@@ -121,6 +145,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
                 h.logger.Warn("Unexpected error when replaying WAL on exemplar record", "err", err)
             }
         }
+        unknownSeriesRefs.merge(missingSeries)
     }(exemplarsInput)
 
     go func() {
@@ -221,6 +246,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
     }()
 
     // The records are always replayed from the oldest to the newest.
+    missingSeries := make(map[chunks.HeadSeriesRef]struct{})
 Outer:
     for d := range decoded {
         switch v := d.(type) {
@@ -286,7 +312,8 @@ Outer:
                     continue
                 }
                 if m := h.series.getByID(chunks.HeadSeriesRef(s.Ref)); m == nil {
-                    unknownRefs.Inc()
+                    unknownTombstoneRefs.Inc()
+                    missingSeries[chunks.HeadSeriesRef(s.Ref)] = struct{}{}
                     continue
                 }
                 h.tombstones.AddInterval(s.Ref, itv)
@@ -375,6 +402,7 @@ Outer:
             s := h.series.getByID(m.Ref)
             if s == nil {
                 unknownMetadataRefs.Inc()
+                missingSeries[m.Ref] = struct{}{}
                 continue
             }
             s.meta = &metadata.Metadata{
@@ -388,6 +416,7 @@ Outer:
             panic(fmt.Errorf("unexpected decoded type: %T", d))
         }
     }
+    unknownSeriesRefs.merge(missingSeries)
 
     if decodeErr != nil {
         return decodeErr
@@ -410,13 +439,15 @@ Outer:
         return fmt.Errorf("read records: %w", err)
     }
 
-    if unknownRefs.Load()+unknownExemplarRefs.Load()+unknownHistogramRefs.Load()+unknownMetadataRefs.Load() > 0 {
+    if unknownSampleRefs.Load()+unknownExemplarRefs.Load()+unknownHistogramRefs.Load()+unknownMetadataRefs.Load()+unknownTombstoneRefs.Load() > 0 {
         h.logger.Warn(
             "Unknown series references",
-            "samples", unknownRefs.Load(),
+            "series", unknownSeriesRefs.count(),
+            "samples", unknownSampleRefs.Load(),
             "exemplars", unknownExemplarRefs.Load(),
             "histograms", unknownHistogramRefs.Load(),
             "metadata", unknownMetadataRefs.Load(),
+            "tombstones", unknownTombstoneRefs.Load(),
         )
     }
     if count := mmapOverlappingChunks.Load(); count > 0 {
@@ -547,10 +578,13 @@ func (wp *walSubsetProcessor) reuseHistogramBuf() []histogramRecord {
 // processWALSamples adds the samples it receives to the head and passes
 // the buffer received to an output channel for reuse.
 // Samples before the minValidTime timestamp are discarded.
-func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (unknownRefs, unknownHistogramRefs, mmapOverlappingChunks uint64) {
+func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (map[chunks.HeadSeriesRef]struct{}, uint64, uint64, uint64) {
     defer close(wp.output)
     defer close(wp.histogramsOutput)
 
+    missingSeries := make(map[chunks.HeadSeriesRef]struct{})
+    var unknownSampleRefs, unknownHistogramRefs, mmapOverlappingChunks uint64
+
     minValidTime := h.minValidTime.Load()
     mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
     appendChunkOpts := chunkOpts{
@@ -572,7 +606,8 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
         for _, s := range in.samples {
             ms := h.series.getByID(s.Ref)
             if ms == nil {
-                unknownRefs++
+                unknownSampleRefs++
+                missingSeries[s.Ref] = struct{}{}
                 continue
             }
             if s.T <= ms.mmMaxTime {
@@ -602,6 +637,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
             ms := h.series.getByID(s.ref)
             if ms == nil {
                 unknownHistogramRefs++
+                missingSeries[s.ref] = struct{}{}
                 continue
             }
             if s.t <= ms.mmMaxTime {
@@ -632,13 +668,15 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
     }
     h.updateMinMaxTime(mint, maxt)
 
-    return unknownRefs, unknownHistogramRefs, mmapOverlappingChunks
+    return missingSeries, unknownSampleRefs, unknownHistogramRefs, mmapOverlappingChunks
 }
 
 func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
-    // Track number of samples, histogram samples, m-map markers, that referenced a series we don't know about
+    // Track number of missing series records that were referenced by other records.
+    unknownSeriesRefs := &seriesRefSet{refs: make(map[chunks.HeadSeriesRef]struct{}), mtx: sync.Mutex{}}
+    // Track number of samples, histogram samples, and m-map markers that referenced a series we don't know about
     // for error reporting.
-    var unknownRefs, unknownHistogramRefs, mmapMarkerUnknownRefs atomic.Uint64
+    var unknownSampleRefs, unknownHistogramRefs, mmapMarkerUnknownRefs atomic.Uint64
 
     lastSeq, lastOff := lastMmapRef.Unpack()
     // Start workers that each process samples for a partition of the series ID space.
@@ -672,8 +710,9 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
         processors[i].setup()
 
         go func(wp *wblSubsetProcessor) {
-            unknown, unknownHistograms := wp.processWBLSamples(h)
-            unknownRefs.Add(unknown)
+            missingSeries, unknownSamples, unknownHistograms := wp.processWBLSamples(h)
+            unknownSeriesRefs.merge(missingSeries)
+            unknownSampleRefs.Add(unknownSamples)
             unknownHistogramRefs.Add(unknownHistograms)
             wg.Done()
         }(&processors[i])
@@ -741,6 +780,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
     }()
 
     // The records are always replayed from the oldest to the newest.
+    missingSeries := make(map[chunks.HeadSeriesRef]struct{})
     for d := range decodedCh {
         switch v := d.(type) {
         case []record.RefSample:
@@ -793,6 +833,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
                 ms := h.series.getByID(rm.Ref)
                 if ms == nil {
                     mmapMarkerUnknownRefs.Inc()
+                    missingSeries[rm.Ref] = struct{}{}
                     continue
                 }
                 idx := uint64(ms.ref) % uint64(concurrency)
@@ -866,6 +907,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
             panic(fmt.Errorf("unexpected decodedCh type: %T", d))
         }
     }
+    unknownSeriesRefs.merge(missingSeries)
 
     if decodeErr != nil {
         return decodeErr
@@ -881,9 +923,16 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
         return fmt.Errorf("read records: %w", err)
     }
 
-    if unknownRefs.Load() > 0 || mmapMarkerUnknownRefs.Load() > 0 {
-        h.logger.Warn("Unknown series references for ooo WAL replay", "samples", unknownRefs.Load(), "mmap_markers", mmapMarkerUnknownRefs.Load())
+    if unknownSampleRefs.Load()+unknownHistogramRefs.Load()+mmapMarkerUnknownRefs.Load() > 0 {
+        h.logger.Warn(
+            "Unknown series references for ooo WAL replay",
+            "series", unknownSeriesRefs.count(),
+            "samples", unknownSampleRefs.Load(),
+            "histograms", unknownHistogramRefs.Load(),
+            "mmap_markers", mmapMarkerUnknownRefs.Load(),
+        )
     }
 
     return nil
 }
 
@@ -951,10 +1000,13 @@ func (wp *wblSubsetProcessor) reuseHistogramBuf() []histogramRecord {
 
 // processWBLSamples adds the samples it receives to the head and passes
 // the buffer received to an output channel for reuse.
-func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHistogramRefs uint64) {
+func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (map[chunks.HeadSeriesRef]struct{}, uint64, uint64) {
     defer close(wp.output)
     defer close(wp.histogramsOutput)
 
+    missingSeries := make(map[chunks.HeadSeriesRef]struct{})
+    var unknownSampleRefs, unknownHistogramRefs uint64
+
     oooCapMax := h.opts.OutOfOrderCapMax.Load()
     // We don't check for minValidTime for ooo samples.
     mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
@@ -971,7 +1023,8 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHi
         for _, s := range in.samples {
             ms := h.series.getByID(s.Ref)
             if ms == nil {
-                unknownRefs++
+                unknownSampleRefs++
+                missingSeries[s.Ref] = struct{}{}
                 continue
             }
             ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger)
@@ -996,6 +1049,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHi
             ms := h.series.getByID(s.ref)
             if ms == nil {
                 unknownHistogramRefs++
+                missingSeries[s.ref] = struct{}{}
                 continue
             }
             var chunkCreated bool
@@ -1026,7 +1080,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHi
 
     h.updateMinOOOMaxOOOTime(mint, maxt)
 
-    return unknownRefs, unknownHistogramRefs
+    return missingSeries, unknownSampleRefs, unknownHistogramRefs
 }
 
 const (
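Taken together, the change set splits the former catch-all unknownRefs counter into per-record-type counters and reports the number of distinct missing series via unknownSeriesRefs.count(). Below is a hypothetical test sketch of that deduplication behaviour; it is not taken from this commit and assumes the seriesRefSet type lands in package tsdb exactly as shown above, reusing the existing chunks.HeadSeriesRef type.

package tsdb

import (
    "testing"

    "github.com/prometheus/prometheus/tsdb/chunks"
)

// Hypothetical test, not part of this commit: merging overlapping per-worker
// maps should yield the number of distinct missing series.
func TestSeriesRefSetMergeDeduplicates(t *testing.T) {
    s := &seriesRefSet{refs: make(map[chunks.HeadSeriesRef]struct{})}
    s.merge(map[chunks.HeadSeriesRef]struct{}{1: {}, 2: {}})
    s.merge(map[chunks.HeadSeriesRef]struct{}{2: {}, 3: {}})
    if got, want := s.count(), 3; got != want {
        t.Fatalf("count() = %d, want %d", got, want)
    }
}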