// Copyright 2021 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package tsdb import ( "errors" "fmt" "math" "os" "path/filepath" "strconv" "strings" "sync" "time" "github.com/go-kit/log/level" "go.uber.org/atomic" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/encoding" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/wlog" "github.com/prometheus/prometheus/util/zeropool" ) // histogramRecord combines both RefHistogramSample and RefFloatHistogramSample // to simplify the WAL replay. type histogramRecord struct { ref chunks.HeadSeriesRef t int64 h *histogram.Histogram fh *histogram.FloatHistogram } func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) { // Track number of samples that referenced a series we don't know about // for error reporting. var unknownRefs atomic.Uint64 var unknownExemplarRefs atomic.Uint64 var unknownHistogramRefs atomic.Uint64 var unknownMetadataRefs atomic.Uint64 // Track number of series records that had overlapping m-map chunks. var mmapOverlappingChunks atomic.Uint64 // Start workers that each process samples for a partition of the series ID space. var ( wg sync.WaitGroup concurrency = h.opts.WALReplayConcurrency processors = make([]walSubsetProcessor, concurrency) exemplarsInput chan record.RefExemplar shards = make([][]record.RefSample, concurrency) histogramShards = make([][]histogramRecord, concurrency) decoded = make(chan interface{}, 10) decodeErr, seriesCreationErr error seriesPool zeropool.Pool[[]record.RefSeries] samplesPool zeropool.Pool[[]record.RefSample] tstonesPool zeropool.Pool[[]tombstones.Stone] exemplarsPool zeropool.Pool[[]record.RefExemplar] histogramsPool zeropool.Pool[[]record.RefHistogramSample] floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample] metadataPool zeropool.Pool[[]record.RefMetadata] ) defer func() { // For CorruptionErr ensure to terminate all workers before exiting. _, ok := err.(*wlog.CorruptionErr) if ok || seriesCreationErr != nil { for i := 0; i < concurrency; i++ { processors[i].closeAndDrain() } close(exemplarsInput) wg.Wait() } }() wg.Add(concurrency) for i := 0; i < concurrency; i++ { processors[i].setup() go func(wp *walSubsetProcessor) { unknown, unknownHistograms, overlapping := wp.processWALSamples(h, mmappedChunks, oooMmappedChunks) unknownRefs.Add(unknown) mmapOverlappingChunks.Add(overlapping) unknownHistogramRefs.Add(unknownHistograms) wg.Done() }(&processors[i]) } wg.Add(1) exemplarsInput = make(chan record.RefExemplar, 300) go func(input <-chan record.RefExemplar) { var err error defer wg.Done() for e := range input { ms := h.series.getByID(e.Ref) if ms == nil { unknownExemplarRefs.Inc() continue } if e.T < h.minValidTime.Load() { continue } // At the moment the only possible error here is out of order exemplars, which we shouldn't see when // replaying the WAL, so lets just log the error if it's not that type. err = h.exemplars.AddExemplar(ms.labels(), exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels}) if err != nil && errors.Is(err, storage.ErrOutOfOrderExemplar) { level.Warn(h.logger).Log("msg", "Unexpected error when replaying WAL on exemplar record", "err", err) } } }(exemplarsInput) go func() { defer close(decoded) var err error dec := record.NewDecoder(syms) for r.Next() { rec := r.Record() switch dec.Type(rec) { case record.Series: series := seriesPool.Get()[:0] series, err = dec.Series(rec, series) if err != nil { decodeErr = &wlog.CorruptionErr{ Err: fmt.Errorf("decode series: %w", err), Segment: r.Segment(), Offset: r.Offset(), } return } decoded <- series case record.Samples: samples := samplesPool.Get()[:0] samples, err = dec.Samples(rec, samples) if err != nil { decodeErr = &wlog.CorruptionErr{ Err: fmt.Errorf("decode samples: %w", err), Segment: r.Segment(), Offset: r.Offset(), } return } decoded <- samples case record.Tombstones: tstones := tstonesPool.Get()[:0] tstones, err = dec.Tombstones(rec, tstones) if err != nil { decodeErr = &wlog.CorruptionErr{ Err: fmt.Errorf("decode tombstones: %w", err), Segment: r.Segment(), Offset: r.Offset(), } return } decoded <- tstones case record.Exemplars: exemplars := exemplarsPool.Get()[:0] exemplars, err = dec.Exemplars(rec, exemplars) if err != nil { decodeErr = &wlog.CorruptionErr{ Err: fmt.Errorf("decode exemplars: %w", err), Segment: r.Segment(), Offset: r.Offset(), } return } decoded <- exemplars case record.HistogramSamples: hists := histogramsPool.Get()[:0] hists, err = dec.HistogramSamples(rec, hists) if err != nil { decodeErr = &wlog.CorruptionErr{ Err: fmt.Errorf("decode histograms: %w", err), Segment: r.Segment(), Offset: r.Offset(), } return } decoded <- hists case record.FloatHistogramSamples: hists := floatHistogramsPool.Get()[:0] hists, err = dec.FloatHistogramSamples(rec, hists) if err != nil { decodeErr = &wlog.CorruptionErr{ Err: fmt.Errorf("decode float histograms: %w", err), Segment: r.Segment(), Offset: r.Offset(), } return } decoded <- hists case record.Metadata: meta := metadataPool.Get()[:0] meta, err := dec.Metadata(rec, meta) if err != nil { decodeErr = &wlog.CorruptionErr{ Err: fmt.Errorf("decode metadata: %w", err), Segment: r.Segment(), Offset: r.Offset(), } return } decoded <- meta default: // Noop. } } }() // The records are always replayed from the oldest to the newest. Outer: for d := range decoded { switch v := d.(type) { case []record.RefSeries: for _, walSeries := range v { mSeries, created, err := h.getOrCreateWithID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels) if err != nil { seriesCreationErr = err break Outer } if chunks.HeadSeriesRef(h.lastSeriesID.Load()) < walSeries.Ref { h.lastSeriesID.Store(uint64(walSeries.Ref)) } if !created { multiRef[walSeries.Ref] = mSeries.ref } idx := uint64(mSeries.ref) % uint64(concurrency) processors[idx].input <- walSubsetProcessorInputItem{walSeriesRef: walSeries.Ref, existingSeries: mSeries} } seriesPool.Put(v) case []record.RefSample: samples := v minValidTime := h.minValidTime.Load() // We split up the samples into chunks of 5000 samples or less. // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise // cause thousands of very large in flight buffers occupying large amounts // of unused memory. for len(samples) > 0 { m := 5000 if len(samples) < m { m = len(samples) } for i := 0; i < concurrency; i++ { if shards[i] == nil { shards[i] = processors[i].reuseBuf() } } for _, sam := range samples[:m] { if sam.T < minValidTime { continue // Before minValidTime: discard. } if r, ok := multiRef[sam.Ref]; ok { sam.Ref = r } mod := uint64(sam.Ref) % uint64(concurrency) shards[mod] = append(shards[mod], sam) } for i := 0; i < concurrency; i++ { if len(shards[i]) > 0 { processors[i].input <- walSubsetProcessorInputItem{samples: shards[i]} shards[i] = nil } } samples = samples[m:] } samplesPool.Put(v) case []tombstones.Stone: for _, s := range v { for _, itv := range s.Intervals { if itv.Maxt < h.minValidTime.Load() { continue } if m := h.series.getByID(chunks.HeadSeriesRef(s.Ref)); m == nil { unknownRefs.Inc() continue } h.tombstones.AddInterval(s.Ref, itv) } } tstonesPool.Put(v) case []record.RefExemplar: for _, e := range v { exemplarsInput <- e } exemplarsPool.Put(v) case []record.RefHistogramSample: samples := v minValidTime := h.minValidTime.Load() // We split up the samples into chunks of 5000 samples or less. // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise // cause thousands of very large in flight buffers occupying large amounts // of unused memory. for len(samples) > 0 { m := 5000 if len(samples) < m { m = len(samples) } for i := 0; i < concurrency; i++ { if histogramShards[i] == nil { histogramShards[i] = processors[i].reuseHistogramBuf() } } for _, sam := range samples[:m] { if sam.T < minValidTime { continue // Before minValidTime: discard. } if r, ok := multiRef[sam.Ref]; ok { sam.Ref = r } mod := uint64(sam.Ref) % uint64(concurrency) histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H}) } for i := 0; i < concurrency; i++ { if len(histogramShards[i]) > 0 { processors[i].input <- walSubsetProcessorInputItem{histogramSamples: histogramShards[i]} histogramShards[i] = nil } } samples = samples[m:] } histogramsPool.Put(v) case []record.RefFloatHistogramSample: samples := v minValidTime := h.minValidTime.Load() // We split up the samples into chunks of 5000 samples or less. // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise // cause thousands of very large in flight buffers occupying large amounts // of unused memory. for len(samples) > 0 { m := 5000 if len(samples) < m { m = len(samples) } for i := 0; i < concurrency; i++ { if histogramShards[i] == nil { histogramShards[i] = processors[i].reuseHistogramBuf() } } for _, sam := range samples[:m] { if sam.T < minValidTime { continue // Before minValidTime: discard. } if r, ok := multiRef[sam.Ref]; ok { sam.Ref = r } mod := uint64(sam.Ref) % uint64(concurrency) histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH}) } for i := 0; i < concurrency; i++ { if len(histogramShards[i]) > 0 { processors[i].input <- walSubsetProcessorInputItem{histogramSamples: histogramShards[i]} histogramShards[i] = nil } } samples = samples[m:] } floatHistogramsPool.Put(v) case []record.RefMetadata: for _, m := range v { s := h.series.getByID(m.Ref) if s == nil { unknownMetadataRefs.Inc() continue } s.meta = &metadata.Metadata{ Type: record.ToMetricType(m.Type), Unit: m.Unit, Help: m.Help, } } metadataPool.Put(v) default: panic(fmt.Errorf("unexpected decoded type: %T", d)) } } if decodeErr != nil { return decodeErr } if seriesCreationErr != nil { // Drain the channel to unblock the goroutine. for range decoded { } return seriesCreationErr } // Signal termination to each worker and wait for it to close its output channel. for i := 0; i < concurrency; i++ { processors[i].closeAndDrain() } close(exemplarsInput) wg.Wait() if err := r.Err(); err != nil { return fmt.Errorf("read records: %w", err) } if unknownRefs.Load()+unknownExemplarRefs.Load()+unknownHistogramRefs.Load()+unknownMetadataRefs.Load() > 0 { level.Warn(h.logger).Log( "msg", "Unknown series references", "samples", unknownRefs.Load(), "exemplars", unknownExemplarRefs.Load(), "histograms", unknownHistogramRefs.Load(), "metadata", unknownMetadataRefs.Load(), ) } if count := mmapOverlappingChunks.Load(); count > 0 { level.Info(h.logger).Log("msg", "Overlapping m-map chunks on duplicate series records", "count", count) } return nil } func minInt64() int64 { return math.MinInt64 } // resetSeriesWithMMappedChunks is only used during the WAL replay. func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*mmappedChunk, walSeriesRef chunks.HeadSeriesRef) (overlapped bool) { if mSeries.ref != walSeriesRef { // Checking if the new m-mapped chunks overlap with the already existing ones. if len(mSeries.mmappedChunks) > 0 && len(mmc) > 0 { if overlapsClosedInterval( mSeries.mmappedChunks[0].minTime, mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime, mmc[0].minTime, mmc[len(mmc)-1].maxTime, ) { level.Debug(h.logger).Log( "msg", "M-mapped chunks overlap on a duplicate series record", "series", mSeries.labels().String(), "oldref", mSeries.ref, "oldmint", mSeries.mmappedChunks[0].minTime, "oldmaxt", mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime, "newref", walSeriesRef, "newmint", mmc[0].minTime, "newmaxt", mmc[len(mmc)-1].maxTime, ) overlapped = true } } } h.metrics.chunksCreated.Add(float64(len(mmc) + len(oooMmc))) h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks))) h.metrics.chunks.Add(float64(len(mmc) + len(oooMmc) - len(mSeries.mmappedChunks))) if mSeries.ooo != nil { h.metrics.chunksRemoved.Add(float64(len(mSeries.ooo.oooMmappedChunks))) h.metrics.chunks.Sub(float64(len(mSeries.ooo.oooMmappedChunks))) } mSeries.mmappedChunks = mmc if len(oooMmc) == 0 { mSeries.ooo = nil } else { if mSeries.ooo == nil { mSeries.ooo = &memSeriesOOOFields{} } *mSeries.ooo = memSeriesOOOFields{oooMmappedChunks: oooMmc} } // Cache the last mmapped chunk time, so we can skip calling append() for samples it will reject. if len(mmc) == 0 { mSeries.shardHashOrMemoryMappedMaxTime = uint64(minInt64()) } else { mmMaxTime := mmc[len(mmc)-1].maxTime mSeries.shardHashOrMemoryMappedMaxTime = uint64(mmMaxTime) h.updateMinMaxTime(mmc[0].minTime, mmMaxTime) } if len(oooMmc) != 0 { // Mint and maxt can be in any chunk, they are not sorted. mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) for _, ch := range oooMmc { if ch.minTime < mint { mint = ch.minTime } if ch.maxTime > maxt { maxt = ch.maxTime } } h.updateMinOOOMaxOOOTime(mint, maxt) } // Any samples replayed till now would already be compacted. Resetting the head chunk. mSeries.nextAt = 0 mSeries.headChunks = nil mSeries.app = nil return } type walSubsetProcessor struct { input chan walSubsetProcessorInputItem output chan []record.RefSample histogramsOutput chan []histogramRecord } type walSubsetProcessorInputItem struct { samples []record.RefSample histogramSamples []histogramRecord existingSeries *memSeries walSeriesRef chunks.HeadSeriesRef } func (wp *walSubsetProcessor) setup() { wp.input = make(chan walSubsetProcessorInputItem, 300) wp.output = make(chan []record.RefSample, 300) wp.histogramsOutput = make(chan []histogramRecord, 300) } func (wp *walSubsetProcessor) closeAndDrain() { close(wp.input) for range wp.output { } for range wp.histogramsOutput { } } // If there is a buffer in the output chan, return it for reuse, otherwise return nil. func (wp *walSubsetProcessor) reuseBuf() []record.RefSample { select { case buf := <-wp.output: return buf[:0] default: } return nil } // If there is a buffer in the output chan, return it for reuse, otherwise return nil. func (wp *walSubsetProcessor) reuseHistogramBuf() []histogramRecord { select { case buf := <-wp.histogramsOutput: return buf[:0] default: } return nil } // processWALSamples adds the samples it receives to the head and passes // the buffer received to an output channel for reuse. // Samples before the minValidTime timestamp are discarded. func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (unknownRefs, unknownHistogramRefs, mmapOverlappingChunks uint64) { defer close(wp.output) defer close(wp.histogramsOutput) minValidTime := h.minValidTime.Load() mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) appendChunkOpts := chunkOpts{ chunkDiskMapper: h.chunkDiskMapper, chunkRange: h.chunkRange.Load(), samplesPerChunk: h.opts.SamplesPerChunk, } for in := range wp.input { if in.existingSeries != nil { mmc := mmappedChunks[in.walSeriesRef] oooMmc := oooMmappedChunks[in.walSeriesRef] if h.resetSeriesWithMMappedChunks(in.existingSeries, mmc, oooMmc, in.walSeriesRef) { mmapOverlappingChunks++ } continue } for _, s := range in.samples { ms := h.series.getByID(s.Ref) if ms == nil { unknownRefs++ continue } if s.T <= ms.mmMaxTime() { continue } if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() _ = ms.mmapChunks(h.chunkDiskMapper) } if s.T > maxt { maxt = s.T } if s.T < mint { mint = s.T } } select { case wp.output <- in.samples: default: } for _, s := range in.histogramSamples { if s.t < minValidTime { continue } ms := h.series.getByID(s.ref) if ms == nil { unknownHistogramRefs++ continue } if s.t <= ms.mmMaxTime() { continue } var chunkCreated bool if s.h != nil { _, chunkCreated = ms.appendHistogram(s.t, s.h, 0, appendChunkOpts) } else { _, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, appendChunkOpts) } if chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() } if s.t > maxt { maxt = s.t } if s.t < mint { mint = s.t } } select { case wp.histogramsOutput <- in.histogramSamples: default: } } h.updateMinMaxTime(mint, maxt) return unknownRefs, unknownHistogramRefs, mmapOverlappingChunks } func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) { // Track number of samples, m-map markers, that referenced a series we don't know about // for error reporting. var unknownRefs, mmapMarkerUnknownRefs atomic.Uint64 lastSeq, lastOff := lastMmapRef.Unpack() // Start workers that each process samples for a partition of the series ID space. var ( wg sync.WaitGroup concurrency = h.opts.WALReplayConcurrency processors = make([]wblSubsetProcessor, concurrency) dec = record.NewDecoder(syms) shards = make([][]record.RefSample, concurrency) decodedCh = make(chan interface{}, 10) decodeErr error samplesPool = sync.Pool{ New: func() interface{} { return []record.RefSample{} }, } markersPool = sync.Pool{ New: func() interface{} { return []record.RefMmapMarker{} }, } ) defer func() { // For CorruptionErr ensure to terminate all workers before exiting. // We also wrap it to identify OOO WBL corruption. _, ok := err.(*wlog.CorruptionErr) if ok { err = &errLoadWbl{err: err} for i := 0; i < concurrency; i++ { processors[i].closeAndDrain() } wg.Wait() } }() wg.Add(concurrency) for i := 0; i < concurrency; i++ { processors[i].setup() go func(wp *wblSubsetProcessor) { unknown := wp.processWBLSamples(h) unknownRefs.Add(unknown) wg.Done() }(&processors[i]) } go func() { defer close(decodedCh) for r.Next() { rec := r.Record() switch dec.Type(rec) { case record.Samples: samples := samplesPool.Get().([]record.RefSample)[:0] samples, err = dec.Samples(rec, samples) if err != nil { decodeErr = &wlog.CorruptionErr{ Err: fmt.Errorf("decode samples: %w", err), Segment: r.Segment(), Offset: r.Offset(), } return } decodedCh <- samples case record.MmapMarkers: markers := markersPool.Get().([]record.RefMmapMarker)[:0] markers, err = dec.MmapMarkers(rec, markers) if err != nil { decodeErr = &wlog.CorruptionErr{ Err: fmt.Errorf("decode mmap markers: %w", err), Segment: r.Segment(), Offset: r.Offset(), } return } decodedCh <- markers default: // Noop. } } }() // The records are always replayed from the oldest to the newest. for d := range decodedCh { switch v := d.(type) { case []record.RefSample: samples := v // We split up the samples into parts of 5000 samples or less. // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise // cause thousands of very large in flight buffers occupying large amounts // of unused memory. for len(samples) > 0 { m := 5000 if len(samples) < m { m = len(samples) } for i := 0; i < concurrency; i++ { if shards[i] == nil { shards[i] = processors[i].reuseBuf() } } for _, sam := range samples[:m] { if r, ok := multiRef[sam.Ref]; ok { sam.Ref = r } mod := uint64(sam.Ref) % uint64(concurrency) shards[mod] = append(shards[mod], sam) } for i := 0; i < concurrency; i++ { if len(shards[i]) > 0 { processors[i].input <- wblSubsetProcessorInputItem{samples: shards[i]} shards[i] = nil } } samples = samples[m:] } samplesPool.Put(d) case []record.RefMmapMarker: markers := v for _, rm := range markers { seq, off := rm.MmapRef.Unpack() if seq > lastSeq || (seq == lastSeq && off > lastOff) { // This m-map chunk from markers was not present during // the load of mmapped chunks that happened in the head // initialization. continue } if r, ok := multiRef[rm.Ref]; ok { rm.Ref = r } ms := h.series.getByID(rm.Ref) if ms == nil { mmapMarkerUnknownRefs.Inc() continue } idx := uint64(ms.ref) % uint64(concurrency) processors[idx].input <- wblSubsetProcessorInputItem{mmappedSeries: ms} } default: panic(fmt.Errorf("unexpected decodedCh type: %T", d)) } } if decodeErr != nil { return decodeErr } // Signal termination to each worker and wait for it to close its output channel. for i := 0; i < concurrency; i++ { processors[i].closeAndDrain() } wg.Wait() if err := r.Err(); err != nil { return fmt.Errorf("read records: %w", err) } if unknownRefs.Load() > 0 || mmapMarkerUnknownRefs.Load() > 0 { level.Warn(h.logger).Log("msg", "Unknown series references for ooo WAL replay", "samples", unknownRefs.Load(), "mmap_markers", mmapMarkerUnknownRefs.Load()) } return nil } type errLoadWbl struct { err error } func (e errLoadWbl) Error() string { return e.err.Error() } func (e errLoadWbl) Cause() error { return e.err } func (e errLoadWbl) Unwrap() error { return e.err } type wblSubsetProcessor struct { input chan wblSubsetProcessorInputItem output chan []record.RefSample } type wblSubsetProcessorInputItem struct { mmappedSeries *memSeries samples []record.RefSample } func (wp *wblSubsetProcessor) setup() { wp.output = make(chan []record.RefSample, 300) wp.input = make(chan wblSubsetProcessorInputItem, 300) } func (wp *wblSubsetProcessor) closeAndDrain() { close(wp.input) for range wp.output { } } // If there is a buffer in the output chan, return it for reuse, otherwise return nil. func (wp *wblSubsetProcessor) reuseBuf() []record.RefSample { select { case buf := <-wp.output: return buf[:0] default: } return nil } // processWBLSamples adds the samples it receives to the head and passes // the buffer received to an output channel for reuse. func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) { defer close(wp.output) oooCapMax := h.opts.OutOfOrderCapMax.Load() // We don't check for minValidTime for ooo samples. mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) for in := range wp.input { if in.mmappedSeries != nil && in.mmappedSeries.ooo != nil { // All samples till now have been m-mapped. Hence clear out the headChunk. // In case some samples slipped through and went into m-map chunks because of changed // chunk size parameters, we are not taking care of that here. // TODO(codesome): see if there is a way to avoid duplicate m-map chunks if // the size of ooo chunk was reduced between restart. in.mmappedSeries.ooo.oooHeadChunk = nil continue } for _, s := range in.samples { ms := h.series.getByID(s.Ref) if ms == nil { unknownRefs++ continue } ok, chunkCreated, _ := ms.insert(s.T, s.V, h.chunkDiskMapper, oooCapMax) if chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() } if ok { if s.T < mint { mint = s.T } if s.T > maxt { maxt = s.T } } } select { case wp.output <- in.samples: default: } } h.updateMinOOOMaxOOOTime(mint, maxt) return unknownRefs } const ( chunkSnapshotRecordTypeSeries uint8 = 1 chunkSnapshotRecordTypeTombstones uint8 = 2 chunkSnapshotRecordTypeExemplars uint8 = 3 ) type chunkSnapshotRecord struct { ref chunks.HeadSeriesRef lset labels.Labels mc *memChunk lastValue float64 lastHistogramValue *histogram.Histogram lastFloatHistogramValue *histogram.FloatHistogram } func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte { buf := encoding.Encbuf{B: b} buf.PutByte(chunkSnapshotRecordTypeSeries) buf.PutBE64(uint64(s.ref)) record.EncodeLabels(&buf, s.labels()) buf.PutBE64int64(0) // Backwards-compatibility; was chunkRange but now unused. s.Lock() if s.headChunks == nil { buf.PutUvarint(0) } else { enc := s.headChunks.chunk.Encoding() buf.PutUvarint(1) buf.PutBE64int64(s.headChunks.minTime) buf.PutBE64int64(s.headChunks.maxTime) buf.PutByte(byte(enc)) buf.PutUvarintBytes(s.headChunks.chunk.Bytes()) switch enc { case chunkenc.EncXOR: // Backwards compatibility for old sampleBuf which had last 4 samples. for i := 0; i < 3; i++ { buf.PutBE64int64(0) buf.PutBEFloat64(0) } buf.PutBE64int64(0) buf.PutBEFloat64(s.lastValue) case chunkenc.EncHistogram: record.EncodeHistogram(&buf, s.lastHistogramValue) default: // chunkenc.FloatHistogram. record.EncodeFloatHistogram(&buf, s.lastFloatHistogramValue) } } s.Unlock() return buf.Get() } func decodeSeriesFromChunkSnapshot(d *record.Decoder, b []byte) (csr chunkSnapshotRecord, err error) { dec := encoding.Decbuf{B: b} if flag := dec.Byte(); flag != chunkSnapshotRecordTypeSeries { return csr, fmt.Errorf("invalid record type %x", flag) } csr.ref = chunks.HeadSeriesRef(dec.Be64()) // The label set written to the disk is already sorted. // TODO: figure out why DecodeLabels calls Sort(), and perhaps remove it. csr.lset = d.DecodeLabels(&dec) _ = dec.Be64int64() // Was chunkRange but now unused. if dec.Uvarint() == 0 { return } csr.mc = &memChunk{} csr.mc.minTime = dec.Be64int64() csr.mc.maxTime = dec.Be64int64() enc := chunkenc.Encoding(dec.Byte()) // The underlying bytes gets re-used later, so make a copy. chunkBytes := dec.UvarintBytes() chunkBytesCopy := make([]byte, len(chunkBytes)) copy(chunkBytesCopy, chunkBytes) chk, err := chunkenc.FromData(enc, chunkBytesCopy) if err != nil { return csr, fmt.Errorf("chunk from data: %w", err) } csr.mc.chunk = chk switch enc { case chunkenc.EncXOR: // Backwards-compatibility for old sampleBuf which had last 4 samples. for i := 0; i < 3; i++ { _ = dec.Be64int64() _ = dec.Be64Float64() } _ = dec.Be64int64() csr.lastValue = dec.Be64Float64() case chunkenc.EncHistogram: csr.lastHistogramValue = &histogram.Histogram{} record.DecodeHistogram(&dec, csr.lastHistogramValue) default: // chunkenc.FloatHistogram. csr.lastFloatHistogramValue = &histogram.FloatHistogram{} record.DecodeFloatHistogram(&dec, csr.lastFloatHistogramValue) } err = dec.Err() if err != nil && len(dec.B) > 0 { err = fmt.Errorf("unexpected %d bytes left in entry", len(dec.B)) } return } func encodeTombstonesToSnapshotRecord(tr tombstones.Reader) ([]byte, error) { buf := encoding.Encbuf{} buf.PutByte(chunkSnapshotRecordTypeTombstones) b, err := tombstones.Encode(tr) if err != nil { return nil, fmt.Errorf("encode tombstones: %w", err) } buf.PutUvarintBytes(b) return buf.Get(), nil } func decodeTombstonesSnapshotRecord(b []byte) (tombstones.Reader, error) { dec := encoding.Decbuf{B: b} if flag := dec.Byte(); flag != chunkSnapshotRecordTypeTombstones { return nil, fmt.Errorf("invalid record type %x", flag) } tr, err := tombstones.Decode(dec.UvarintBytes()) if err != nil { return tr, fmt.Errorf("decode tombstones: %w", err) } return tr, nil } const chunkSnapshotPrefix = "chunk_snapshot." // ChunkSnapshot creates a snapshot of all the series and tombstones in the head. // It deletes the old chunk snapshots if the chunk snapshot creation is successful. // // The chunk snapshot is stored in a directory named chunk_snapshot.N.M and is written // using the WAL package. N is the last WAL segment present during snapshotting and // M is the offset in segment N upto which data was written. // // The snapshot first contains all series (each in individual records and not sorted), followed by // tombstones (a single record), and finally exemplars (>= 1 record). Exemplars are in the order they // were written to the circular buffer. func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) { if h.wal == nil { // If we are not storing any WAL, does not make sense to take a snapshot too. level.Warn(h.logger).Log("msg", "skipping chunk snapshotting as WAL is disabled") return &ChunkSnapshotStats{}, nil } h.chunkSnapshotMtx.Lock() defer h.chunkSnapshotMtx.Unlock() stats := &ChunkSnapshotStats{} wlast, woffset, err := h.wal.LastSegmentAndOffset() if err != nil && !errors.Is(err, record.ErrNotFound) { return stats, fmt.Errorf("get last wal segment and offset: %w", err) } _, cslast, csoffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot) if err != nil && !errors.Is(err, record.ErrNotFound) { return stats, fmt.Errorf("find last chunk snapshot: %w", err) } if wlast == cslast && woffset == csoffset { // Nothing has been written to the WAL/Head since the last snapshot. return stats, nil } snapshotName := chunkSnapshotDir(wlast, woffset) cpdir := filepath.Join(h.opts.ChunkDirRoot, snapshotName) cpdirtmp := cpdir + ".tmp" stats.Dir = cpdir if err := os.MkdirAll(cpdirtmp, 0o777); err != nil { return stats, fmt.Errorf("create chunk snapshot dir: %w", err) } cp, err := wlog.New(nil, nil, cpdirtmp, h.wal.CompressionType()) if err != nil { return stats, fmt.Errorf("open chunk snapshot: %w", err) } // Ensures that an early return caused by an error doesn't leave any tmp files. defer func() { cp.Close() os.RemoveAll(cpdirtmp) }() var ( buf []byte recs [][]byte ) // Add all series to the snapshot. stripeSize := h.series.size for i := 0; i < stripeSize; i++ { h.series.locks[i].RLock() for _, s := range h.series.series[i] { start := len(buf) buf = s.encodeToSnapshotRecord(buf) if len(buf[start:]) == 0 { continue // All contents discarded. } recs = append(recs, buf[start:]) // Flush records in 10 MB increments. if len(buf) > 10*1024*1024 { if err := cp.Log(recs...); err != nil { h.series.locks[i].RUnlock() return stats, fmt.Errorf("flush records: %w", err) } buf, recs = buf[:0], recs[:0] } } stats.TotalSeries += len(h.series.series[i]) h.series.locks[i].RUnlock() } // Add tombstones to the snapshot. tombstonesReader, err := h.Tombstones() if err != nil { return stats, fmt.Errorf("get tombstones: %w", err) } rec, err := encodeTombstonesToSnapshotRecord(tombstonesReader) if err != nil { return stats, fmt.Errorf("encode tombstones: %w", err) } recs = append(recs, rec) // Flush remaining series records and tombstones. if err := cp.Log(recs...); err != nil { return stats, fmt.Errorf("flush records: %w", err) } buf = buf[:0] // Add exemplars in the snapshot. // We log in batches, with each record having upto 10000 exemplars. // Assuming 100 bytes (overestimate) per exemplar, that's ~1MB. maxExemplarsPerRecord := 10000 batch := make([]record.RefExemplar, 0, maxExemplarsPerRecord) enc := record.Encoder{} flushExemplars := func() error { if len(batch) == 0 { return nil } buf = buf[:0] encbuf := encoding.Encbuf{B: buf} encbuf.PutByte(chunkSnapshotRecordTypeExemplars) enc.EncodeExemplarsIntoBuffer(batch, &encbuf) if err := cp.Log(encbuf.Get()); err != nil { return fmt.Errorf("log exemplars: %w", err) } buf, batch = buf[:0], batch[:0] return nil } err = h.exemplars.IterateExemplars(func(seriesLabels labels.Labels, e exemplar.Exemplar) error { if len(batch) >= maxExemplarsPerRecord { if err := flushExemplars(); err != nil { return fmt.Errorf("flush exemplars: %w", err) } } ms := h.series.getByHash(seriesLabels.Hash(), seriesLabels) if ms == nil { // It is possible that exemplar refers to some old series. We discard such exemplars. return nil } batch = append(batch, record.RefExemplar{ Ref: ms.ref, T: e.Ts, V: e.Value, Labels: e.Labels, }) return nil }) if err != nil { return stats, fmt.Errorf("iterate exemplars: %w", err) } // Flush remaining exemplars. if err := flushExemplars(); err != nil { return stats, fmt.Errorf("flush exemplars at the end: %w", err) } if err := cp.Close(); err != nil { return stats, fmt.Errorf("close chunk snapshot: %w", err) } if err := fileutil.Replace(cpdirtmp, cpdir); err != nil { return stats, fmt.Errorf("rename chunk snapshot directory: %w", err) } if err := DeleteChunkSnapshots(h.opts.ChunkDirRoot, wlast, woffset); err != nil { // Leftover old chunk snapshots do not cause problems down the line beyond // occupying disk space. // They will just be ignored since a higher chunk snapshot exists. level.Error(h.logger).Log("msg", "delete old chunk snapshots", "err", err) } return stats, nil } func chunkSnapshotDir(wlast, woffset int) string { return fmt.Sprintf(chunkSnapshotPrefix+"%06d.%010d", wlast, woffset) } func (h *Head) performChunkSnapshot() error { level.Info(h.logger).Log("msg", "creating chunk snapshot") startTime := time.Now() stats, err := h.ChunkSnapshot() elapsed := time.Since(startTime) if err == nil { level.Info(h.logger).Log("msg", "chunk snapshot complete", "duration", elapsed.String(), "num_series", stats.TotalSeries, "dir", stats.Dir) } if err != nil { return fmt.Errorf("chunk snapshot: %w", err) } return nil } // ChunkSnapshotStats returns stats about a created chunk snapshot. type ChunkSnapshotStats struct { TotalSeries int Dir string } // LastChunkSnapshot returns the directory name and index of the most recent chunk snapshot. // If dir does not contain any chunk snapshots, ErrNotFound is returned. func LastChunkSnapshot(dir string) (string, int, int, error) { files, err := os.ReadDir(dir) if err != nil { return "", 0, 0, err } maxIdx, maxOffset := -1, -1 maxFileName := "" for i := 0; i < len(files); i++ { fi := files[i] if !strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) { continue } if !fi.IsDir() { return "", 0, 0, fmt.Errorf("chunk snapshot %s is not a directory", fi.Name()) } splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".") if len(splits) != 2 { // Chunk snapshots is not in the right format, we do not care about it. continue } idx, err := strconv.Atoi(splits[0]) if err != nil { continue } offset, err := strconv.Atoi(splits[1]) if err != nil { continue } if idx > maxIdx || (idx == maxIdx && offset > maxOffset) { maxIdx, maxOffset = idx, offset maxFileName = filepath.Join(dir, fi.Name()) } } if maxFileName == "" { return "", 0, 0, record.ErrNotFound } return maxFileName, maxIdx, maxOffset, nil } // DeleteChunkSnapshots deletes all chunk snapshots in a directory below a given index. func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error { files, err := os.ReadDir(dir) if err != nil { return err } errs := tsdb_errors.NewMulti() for _, fi := range files { if !strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) { continue } splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".") if len(splits) != 2 { continue } idx, err := strconv.Atoi(splits[0]) if err != nil { continue } offset, err := strconv.Atoi(splits[1]) if err != nil { continue } if idx < maxIndex || (idx == maxIndex && offset < maxOffset) { if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil { errs.Add(err) } } } return errs.Err() } // loadChunkSnapshot replays the chunk snapshot and restores the Head state from it. If there was any error returned, // it is the responsibility of the caller to clear the contents of the Head. func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSeries, error) { dir, snapIdx, snapOffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot) if err != nil { if errors.Is(err, record.ErrNotFound) { return snapIdx, snapOffset, nil, nil } return snapIdx, snapOffset, nil, fmt.Errorf("find last chunk snapshot: %w", err) } start := time.Now() sr, err := wlog.NewSegmentsReader(dir) if err != nil { return snapIdx, snapOffset, nil, fmt.Errorf("open chunk snapshot: %w", err) } defer func() { if err := sr.Close(); err != nil { level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err) } }() var ( numSeries = 0 unknownRefs = int64(0) concurrency = h.opts.WALReplayConcurrency wg sync.WaitGroup recordChan = make(chan chunkSnapshotRecord, 5*concurrency) shardedRefSeries = make([]map[chunks.HeadSeriesRef]*memSeries, concurrency) errChan = make(chan error, concurrency) refSeries map[chunks.HeadSeriesRef]*memSeries exemplarBuf []record.RefExemplar syms = labels.NewSymbolTable() // New table for the whole snapshot. dec = record.NewDecoder(syms) ) wg.Add(concurrency) for i := 0; i < concurrency; i++ { go func(idx int, rc <-chan chunkSnapshotRecord) { defer wg.Done() defer func() { // If there was an error, drain the channel // to unblock the main thread. for range rc { } }() shardedRefSeries[idx] = make(map[chunks.HeadSeriesRef]*memSeries) localRefSeries := shardedRefSeries[idx] for csr := range rc { series, _, err := h.getOrCreateWithID(csr.ref, csr.lset.Hash(), csr.lset) if err != nil { errChan <- err return } localRefSeries[csr.ref] = series for { seriesID := uint64(series.ref) lastSeriesID := h.lastSeriesID.Load() if lastSeriesID >= seriesID || h.lastSeriesID.CompareAndSwap(lastSeriesID, seriesID) { break } } if csr.mc == nil { continue } series.nextAt = csr.mc.maxTime // This will create a new chunk on append. series.headChunks = csr.mc series.lastValue = csr.lastValue series.lastHistogramValue = csr.lastHistogramValue series.lastFloatHistogramValue = csr.lastFloatHistogramValue app, err := series.headChunks.chunk.Appender() if err != nil { errChan <- err return } series.app = app h.updateMinMaxTime(csr.mc.minTime, csr.mc.maxTime) } }(i, recordChan) } r := wlog.NewReader(sr) var loopErr error Outer: for r.Next() { select { case err := <-errChan: errChan <- err break Outer default: } rec := r.Record() switch rec[0] { case chunkSnapshotRecordTypeSeries: numSeries++ csr, err := decodeSeriesFromChunkSnapshot(&dec, rec) if err != nil { loopErr = fmt.Errorf("decode series record: %w", err) break Outer } recordChan <- csr case chunkSnapshotRecordTypeTombstones: tr, err := decodeTombstonesSnapshotRecord(rec) if err != nil { loopErr = fmt.Errorf("decode tombstones: %w", err) break Outer } if err = tr.Iter(func(ref storage.SeriesRef, ivs tombstones.Intervals) error { h.tombstones.AddInterval(ref, ivs...) return nil }); err != nil { loopErr = fmt.Errorf("iterate tombstones: %w", err) break Outer } case chunkSnapshotRecordTypeExemplars: // Exemplars are at the end of snapshot. So all series are loaded at this point. if len(refSeries) == 0 { close(recordChan) wg.Wait() refSeries = make(map[chunks.HeadSeriesRef]*memSeries, numSeries) for _, shard := range shardedRefSeries { for k, v := range shard { refSeries[k] = v } } } if !h.opts.EnableExemplarStorage || h.opts.MaxExemplars.Load() <= 0 { // Exemplar storage is disabled. continue Outer } decbuf := encoding.Decbuf{B: rec[1:]} exemplarBuf = exemplarBuf[:0] exemplarBuf, err = dec.ExemplarsFromBuffer(&decbuf, exemplarBuf) if err != nil { loopErr = fmt.Errorf("exemplars from buffer: %w", err) break Outer } for _, e := range exemplarBuf { ms, ok := refSeries[e.Ref] if !ok { unknownRefs++ continue } if err := h.exemplars.AddExemplar(ms.labels(), exemplar.Exemplar{ Labels: e.Labels, Value: e.V, Ts: e.T, }); err != nil { loopErr = fmt.Errorf("add exemplar: %w", err) break Outer } } default: // This is a record type we don't understand. It is either an old format from earlier versions, // or a new format and the code was rolled back to old version. loopErr = fmt.Errorf("unsupported snapshot record type 0b%b", rec[0]) break Outer } } if len(refSeries) == 0 { close(recordChan) wg.Wait() } close(errChan) merr := tsdb_errors.NewMulti() if loopErr != nil { merr.Add(fmt.Errorf("decode loop: %w", loopErr)) } for err := range errChan { merr.Add(fmt.Errorf("record processing: %w", err)) } if err := merr.Err(); err != nil { return -1, -1, nil, err } if err := r.Err(); err != nil { return -1, -1, nil, fmt.Errorf("read records: %w", err) } if len(refSeries) == 0 { // We had no exemplar record, so we have to build the map here. refSeries = make(map[chunks.HeadSeriesRef]*memSeries, numSeries) for _, shard := range shardedRefSeries { for k, v := range shard { refSeries[k] = v } } } elapsed := time.Since(start) level.Info(h.logger).Log("msg", "chunk snapshot loaded", "dir", dir, "num_series", numSeries, "duration", elapsed.String()) if unknownRefs > 0 { level.Warn(h.logger).Log("msg", "unknown series references during chunk snapshot replay", "count", unknownRefs) } return snapIdx, snapOffset, refSeries, nil }