diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index ad03fa4766..c93523681a 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -16,6 +16,7 @@ package tsdb import ( "errors" "fmt" + "maps" "math" "os" "path/filepath" @@ -50,13 +51,33 @@ type histogramRecord struct { fh *histogram.FloatHistogram } +type seriesRefSet struct { + refs map[chunks.HeadSeriesRef]struct{} + mtx sync.Mutex +} + +func (s *seriesRefSet) merge(other map[chunks.HeadSeriesRef]struct{}) { + s.mtx.Lock() + defer s.mtx.Unlock() + maps.Copy(s.refs, other) +} + +func (s *seriesRefSet) count() int { + s.mtx.Lock() + defer s.mtx.Unlock() + return len(s.refs) +} + func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) { - // Track number of samples that referenced a series we don't know about + // Track number of missing series records that were referenced by other records. + unknownSeriesRefs := &seriesRefSet{refs: make(map[chunks.HeadSeriesRef]struct{}), mtx: sync.Mutex{}} + // Track number of different records that referenced a series we don't know about // for error reporting. - var unknownRefs atomic.Uint64 + var unknownSampleRefs atomic.Uint64 var unknownExemplarRefs atomic.Uint64 var unknownHistogramRefs atomic.Uint64 var unknownMetadataRefs atomic.Uint64 + var unknownTombstoneRefs atomic.Uint64 // Track number of series records that had overlapping m-map chunks. var mmapOverlappingChunks atomic.Uint64 @@ -91,8 +112,9 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch processors[i].setup() go func(wp *walSubsetProcessor) { - unknown, unknownHistograms, overlapping := wp.processWALSamples(h, mmappedChunks, oooMmappedChunks) - unknownRefs.Add(unknown) + missingSeries, unknownSamples, unknownHistograms, overlapping := wp.processWALSamples(h, mmappedChunks, oooMmappedChunks) + unknownSeriesRefs.merge(missingSeries) + unknownSampleRefs.Add(unknownSamples) mmapOverlappingChunks.Add(overlapping) unknownHistogramRefs.Add(unknownHistograms) wg.Done() @@ -102,12 +124,14 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch wg.Add(1) exemplarsInput = make(chan record.RefExemplar, 300) go func(input <-chan record.RefExemplar) { + missingSeries := make(map[chunks.HeadSeriesRef]struct{}) var err error defer wg.Done() for e := range input { ms := h.series.getByID(e.Ref) if ms == nil { unknownExemplarRefs.Inc() + missingSeries[e.Ref] = struct{}{} continue } @@ -121,6 +145,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch h.logger.Warn("Unexpected error when replaying WAL on exemplar record", "err", err) } } + unknownSeriesRefs.merge(missingSeries) }(exemplarsInput) go func() { @@ -221,6 +246,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch }() // The records are always replayed from the oldest to the newest. + missingSeries := make(map[chunks.HeadSeriesRef]struct{}) Outer: for d := range decoded { switch v := d.(type) { @@ -286,7 +312,8 @@ Outer: continue } if m := h.series.getByID(chunks.HeadSeriesRef(s.Ref)); m == nil { - unknownRefs.Inc() + unknownTombstoneRefs.Inc() + missingSeries[chunks.HeadSeriesRef(s.Ref)] = struct{}{} continue } h.tombstones.AddInterval(s.Ref, itv) @@ -375,6 +402,7 @@ Outer: s := h.series.getByID(m.Ref) if s == nil { unknownMetadataRefs.Inc() + missingSeries[m.Ref] = struct{}{} continue } s.meta = &metadata.Metadata{ @@ -388,6 +416,7 @@ Outer: panic(fmt.Errorf("unexpected decoded type: %T", d)) } } + unknownSeriesRefs.merge(missingSeries) if decodeErr != nil { return decodeErr @@ -410,13 +439,15 @@ Outer: return fmt.Errorf("read records: %w", err) } - if unknownRefs.Load()+unknownExemplarRefs.Load()+unknownHistogramRefs.Load()+unknownMetadataRefs.Load() > 0 { + if unknownSampleRefs.Load()+unknownExemplarRefs.Load()+unknownHistogramRefs.Load()+unknownMetadataRefs.Load()+unknownTombstoneRefs.Load() > 0 { h.logger.Warn( "Unknown series references", - "samples", unknownRefs.Load(), + "series", unknownSeriesRefs.count(), + "samples", unknownSampleRefs.Load(), "exemplars", unknownExemplarRefs.Load(), "histograms", unknownHistogramRefs.Load(), "metadata", unknownMetadataRefs.Load(), + "tombstones", unknownTombstoneRefs.Load(), ) } if count := mmapOverlappingChunks.Load(); count > 0 { @@ -547,10 +578,13 @@ func (wp *walSubsetProcessor) reuseHistogramBuf() []histogramRecord { // processWALSamples adds the samples it receives to the head and passes // the buffer received to an output channel for reuse. // Samples before the minValidTime timestamp are discarded. -func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (unknownRefs, unknownHistogramRefs, mmapOverlappingChunks uint64) { +func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (map[chunks.HeadSeriesRef]struct{}, uint64, uint64, uint64) { defer close(wp.output) defer close(wp.histogramsOutput) + missingSeries := make(map[chunks.HeadSeriesRef]struct{}) + var unknownSampleRefs, unknownHistogramRefs, mmapOverlappingChunks uint64 + minValidTime := h.minValidTime.Load() mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) appendChunkOpts := chunkOpts{ @@ -572,7 +606,8 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp for _, s := range in.samples { ms := h.series.getByID(s.Ref) if ms == nil { - unknownRefs++ + unknownSampleRefs++ + missingSeries[s.Ref] = struct{}{} continue } if s.T <= ms.mmMaxTime { @@ -602,6 +637,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp ms := h.series.getByID(s.ref) if ms == nil { unknownHistogramRefs++ + missingSeries[s.ref] = struct{}{} continue } if s.t <= ms.mmMaxTime { @@ -632,13 +668,15 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp } h.updateMinMaxTime(mint, maxt) - return unknownRefs, unknownHistogramRefs, mmapOverlappingChunks + return missingSeries, unknownSampleRefs, unknownHistogramRefs, mmapOverlappingChunks } func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) { - // Track number of samples, histogram samples, m-map markers, that referenced a series we don't know about + // Track number of missing series records that were referenced by other records. + unknownSeriesRefs := &seriesRefSet{refs: make(map[chunks.HeadSeriesRef]struct{}), mtx: sync.Mutex{}} + // Track number of samples, histogram samples, and m-map markers that referenced a series we don't know about // for error reporting. - var unknownRefs, unknownHistogramRefs, mmapMarkerUnknownRefs atomic.Uint64 + var unknownSampleRefs, unknownHistogramRefs, mmapMarkerUnknownRefs atomic.Uint64 lastSeq, lastOff := lastMmapRef.Unpack() // Start workers that each process samples for a partition of the series ID space. @@ -672,8 +710,9 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch processors[i].setup() go func(wp *wblSubsetProcessor) { - unknown, unknownHistograms := wp.processWBLSamples(h) - unknownRefs.Add(unknown) + missingSeries, unknownSamples, unknownHistograms := wp.processWBLSamples(h) + unknownSeriesRefs.merge(missingSeries) + unknownSampleRefs.Add(unknownSamples) unknownHistogramRefs.Add(unknownHistograms) wg.Done() }(&processors[i]) @@ -741,6 +780,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch }() // The records are always replayed from the oldest to the newest. + missingSeries := make(map[chunks.HeadSeriesRef]struct{}) for d := range decodedCh { switch v := d.(type) { case []record.RefSample: @@ -793,6 +833,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch ms := h.series.getByID(rm.Ref) if ms == nil { mmapMarkerUnknownRefs.Inc() + missingSeries[rm.Ref] = struct{}{} continue } idx := uint64(ms.ref) % uint64(concurrency) @@ -866,6 +907,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch panic(fmt.Errorf("unexpected decodedCh type: %T", d)) } } + unknownSeriesRefs.merge(missingSeries) if decodeErr != nil { return decodeErr @@ -881,9 +923,16 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch return fmt.Errorf("read records: %w", err) } - if unknownRefs.Load() > 0 || mmapMarkerUnknownRefs.Load() > 0 { - h.logger.Warn("Unknown series references for ooo WAL replay", "samples", unknownRefs.Load(), "mmap_markers", mmapMarkerUnknownRefs.Load()) + if unknownSampleRefs.Load()+unknownHistogramRefs.Load()+mmapMarkerUnknownRefs.Load() > 0 { + h.logger.Warn( + "Unknown series references for ooo WAL replay", + "series", unknownSeriesRefs.count(), + "samples", unknownSampleRefs.Load(), + "histograms", unknownHistogramRefs.Load(), + "mmap_markers", mmapMarkerUnknownRefs.Load(), + ) } + return nil } @@ -951,10 +1000,13 @@ func (wp *wblSubsetProcessor) reuseHistogramBuf() []histogramRecord { // processWBLSamples adds the samples it receives to the head and passes // the buffer received to an output channel for reuse. -func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHistogramRefs uint64) { +func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (map[chunks.HeadSeriesRef]struct{}, uint64, uint64) { defer close(wp.output) defer close(wp.histogramsOutput) + missingSeries := make(map[chunks.HeadSeriesRef]struct{}) + var unknownSampleRefs, unknownHistogramRefs uint64 + oooCapMax := h.opts.OutOfOrderCapMax.Load() // We don't check for minValidTime for ooo samples. mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) @@ -971,7 +1023,8 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHi for _, s := range in.samples { ms := h.series.getByID(s.Ref) if ms == nil { - unknownRefs++ + unknownSampleRefs++ + missingSeries[s.Ref] = struct{}{} continue } ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger) @@ -996,6 +1049,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHi ms := h.series.getByID(s.ref) if ms == nil { unknownHistogramRefs++ + missingSeries[s.ref] = struct{}{} continue } var chunkCreated bool @@ -1026,7 +1080,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHi h.updateMinOOOMaxOOOTime(mint, maxt) - return unknownRefs, unknownHistogramRefs + return missingSeries, unknownSampleRefs, unknownHistogramRefs } const (