// Copyright 2021 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package tsdb import ( "fmt" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/encoding" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/fileutil" "io/ioutil" "math" "os" "path/filepath" "runtime" "strconv" "strings" "sync" "time" "github.com/go-kit/log/level" "github.com/pkg/errors" "go.uber.org/atomic" "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/wal" ) func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks map[uint64][]*mmappedChunk) (err error) { // Track number of samples that referenced a series we don't know about // for error reporting. var unknownRefs atomic.Uint64 var unknownExemplarRefs atomic.Uint64 // Track number of series records that had overlapping m-map chunks. var mmapOverlappingChunks uint64 // Start workers that each process samples for a partition of the series ID space. // They are connected through a ring of channels which ensures that all sample batches // read from the WAL are processed in order. var ( wg sync.WaitGroup n = runtime.GOMAXPROCS(0) inputs = make([]chan []record.RefSample, n) outputs = make([]chan []record.RefSample, n) exemplarsInput chan record.RefExemplar dec record.Decoder shards = make([][]record.RefSample, n) decoded = make(chan interface{}, 10) decodeErr, seriesCreationErr error seriesPool = sync.Pool{ New: func() interface{} { return []record.RefSeries{} }, } samplesPool = sync.Pool{ New: func() interface{} { return []record.RefSample{} }, } tstonesPool = sync.Pool{ New: func() interface{} { return []tombstones.Stone{} }, } exemplarsPool = sync.Pool{ New: func() interface{} { return []record.RefExemplar{} }, } ) defer func() { // For CorruptionErr ensure to terminate all workers before exiting. _, ok := err.(*wal.CorruptionErr) if ok || seriesCreationErr != nil { for i := 0; i < n; i++ { close(inputs[i]) for range outputs[i] { } } close(exemplarsInput) wg.Wait() } }() wg.Add(n) for i := 0; i < n; i++ { outputs[i] = make(chan []record.RefSample, 300) inputs[i] = make(chan []record.RefSample, 300) go func(input <-chan []record.RefSample, output chan<- []record.RefSample) { unknown := h.processWALSamples(h.minValidTime.Load(), input, output) unknownRefs.Add(unknown) wg.Done() }(inputs[i], outputs[i]) } wg.Add(1) exemplarsInput = make(chan record.RefExemplar, 300) go func(input <-chan record.RefExemplar) { var err error defer wg.Done() for e := range input { ms := h.series.getByID(e.Ref) if ms == nil { unknownExemplarRefs.Inc() continue } if e.T < h.minValidTime.Load() { continue } // At the moment the only possible error here is out of order exemplars, which we shouldn't see when // replaying the WAL, so lets just log the error if it's not that type. err = h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels}) if err != nil && err == storage.ErrOutOfOrderExemplar { level.Warn(h.logger).Log("msg", "Unexpected error when replaying WAL on exemplar record", "err", err) } } }(exemplarsInput) go func() { defer close(decoded) for r.Next() { rec := r.Record() switch dec.Type(rec) { case record.Series: series := seriesPool.Get().([]record.RefSeries)[:0] series, err = dec.Series(rec, series) if err != nil { decodeErr = &wal.CorruptionErr{ Err: errors.Wrap(err, "decode series"), Segment: r.Segment(), Offset: r.Offset(), } return } decoded <- series case record.Samples: samples := samplesPool.Get().([]record.RefSample)[:0] samples, err = dec.Samples(rec, samples) if err != nil { decodeErr = &wal.CorruptionErr{ Err: errors.Wrap(err, "decode samples"), Segment: r.Segment(), Offset: r.Offset(), } return } decoded <- samples case record.Tombstones: tstones := tstonesPool.Get().([]tombstones.Stone)[:0] tstones, err = dec.Tombstones(rec, tstones) if err != nil { decodeErr = &wal.CorruptionErr{ Err: errors.Wrap(err, "decode tombstones"), Segment: r.Segment(), Offset: r.Offset(), } return } decoded <- tstones case record.Exemplars: exemplars := exemplarsPool.Get().([]record.RefExemplar)[:0] exemplars, err = dec.Exemplars(rec, exemplars) if err != nil { decodeErr = &wal.CorruptionErr{ Err: errors.Wrap(err, "decode exemplars"), Segment: r.Segment(), Offset: r.Offset(), } return } decoded <- exemplars default: // Noop. } } }() // The records are always replayed from the oldest to the newest. Outer: for d := range decoded { switch v := d.(type) { case []record.RefSeries: for _, walSeries := range v { mSeries, created, err := h.getOrCreateWithID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels) if err != nil { seriesCreationErr = err break Outer } if h.lastSeriesID.Load() < walSeries.Ref { h.lastSeriesID.Store(walSeries.Ref) } mmc := mmappedChunks[walSeries.Ref] if created { // This is the first WAL series record for this series. h.setMMappedChunks(mSeries, mmc) continue } // There's already a different ref for this series. // A duplicate series record is only possible when the old samples were already compacted into a block. // Hence we can discard all the samples and m-mapped chunks replayed till now for this series. multiRef[walSeries.Ref] = mSeries.ref idx := mSeries.ref % uint64(n) // It is possible that some old sample is being processed in processWALSamples that // could cause race below. So we wait for the goroutine to empty input the buffer and finish // processing all old samples after emptying the buffer. select { case <-outputs[idx]: // allow output side to drain to avoid deadlock default: } inputs[idx] <- []record.RefSample{} for len(inputs[idx]) != 0 { time.Sleep(1 * time.Millisecond) select { case <-outputs[idx]: // allow output side to drain to avoid deadlock default: } } // Checking if the new m-mapped chunks overlap with the already existing ones. if len(mSeries.mmappedChunks) > 0 && len(mmc) > 0 { if overlapsClosedInterval( mSeries.mmappedChunks[0].minTime, mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime, mmc[0].minTime, mmc[len(mmc)-1].maxTime, ) { mmapOverlappingChunks++ level.Debug(h.logger).Log( "msg", "M-mapped chunks overlap on a duplicate series record", "series", mSeries.lset.String(), "oldref", mSeries.ref, "oldmint", mSeries.mmappedChunks[0].minTime, "oldmaxt", mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime, "newref", walSeries.Ref, "newmint", mmc[0].minTime, "newmaxt", mmc[len(mmc)-1].maxTime, ) } } // Replacing m-mapped chunks with the new ones (could be empty). h.setMMappedChunks(mSeries, mmc) // Any samples replayed till now would already be compacted. Resetting the head chunk. mSeries.nextAt = 0 mSeries.headChunk = nil mSeries.app = nil } //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. seriesPool.Put(v) case []record.RefSample: samples := v // We split up the samples into chunks of 5000 samples or less. // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise // cause thousands of very large in flight buffers occupying large amounts // of unused memory. for len(samples) > 0 { m := 5000 if len(samples) < m { m = len(samples) } for i := 0; i < n; i++ { var buf []record.RefSample select { case buf = <-outputs[i]: default: } shards[i] = buf[:0] } for _, sam := range samples[:m] { if r, ok := multiRef[sam.Ref]; ok { sam.Ref = r } mod := sam.Ref % uint64(n) shards[mod] = append(shards[mod], sam) } for i := 0; i < n; i++ { inputs[i] <- shards[i] } samples = samples[m:] } //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. samplesPool.Put(v) case []tombstones.Stone: for _, s := range v { for _, itv := range s.Intervals { if itv.Maxt < h.minValidTime.Load() { continue } if m := h.series.getByID(s.Ref); m == nil { unknownRefs.Inc() continue } h.tombstones.AddInterval(s.Ref, itv) } } //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. tstonesPool.Put(v) case []record.RefExemplar: for _, e := range v { exemplarsInput <- e } //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. exemplarsPool.Put(v) default: panic(fmt.Errorf("unexpected decoded type: %T", d)) } } if decodeErr != nil { return decodeErr } if seriesCreationErr != nil { // Drain the channel to unblock the goroutine. for range decoded { } return seriesCreationErr } // Signal termination to each worker and wait for it to close its output channel. for i := 0; i < n; i++ { close(inputs[i]) for range outputs[i] { } } close(exemplarsInput) wg.Wait() if r.Err() != nil { return errors.Wrap(r.Err(), "read records") } if unknownRefs.Load() > 0 || unknownExemplarRefs.Load() > 0 { level.Warn(h.logger).Log("msg", "Unknown series references", "samples", unknownRefs.Load(), "exemplars", unknownExemplarRefs.Load()) } if mmapOverlappingChunks > 0 { level.Info(h.logger).Log("msg", "Overlapping m-map chunks on duplicate series records", "count", mmapOverlappingChunks) } return nil } func (h *Head) setMMappedChunks(mSeries *memSeries, mmc []*mmappedChunk) { h.metrics.chunksCreated.Add(float64(len(mmc))) h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks))) h.metrics.chunks.Add(float64(len(mmc) - len(mSeries.mmappedChunks))) mSeries.mmappedChunks = mmc // Cache the last mmapped chunk time, so we can skip calling append() for samples it will reject. if len(mmc) == 0 { mSeries.mmMaxTime = math.MinInt64 } else { mSeries.mmMaxTime = mmc[len(mmc)-1].maxTime h.updateMinMaxTime(mmc[0].minTime, mSeries.mmMaxTime) } } // processWALSamples adds the samples it receives to the head and passes // the buffer received to an output channel for reuse. // Samples before the minValidTime timestamp are discarded. func (h *Head) processWALSamples( minValidTime int64, input <-chan []record.RefSample, output chan<- []record.RefSample, ) (unknownRefs uint64) { defer close(output) mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) for samples := range input { for _, s := range samples { if s.T < minValidTime { continue } ms := h.series.getByID(s.Ref) if ms == nil { unknownRefs++ continue } if s.T <= ms.mmMaxTime { continue } if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper); chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() } if s.T > maxt { maxt = s.T } if s.T < mint { mint = s.T } } output <- samples } h.updateMinMaxTime(mint, maxt) return unknownRefs } const ( chunkSnapshotRecordTypeSeries uint8 = 1 chunkSnapshotRecordTypeTombstones uint8 = 2 chunkSnapshotRecordTypeExemplars uint8 = 3 ) type chunkSnapshotRecord struct { ref uint64 lset labels.Labels chunkRange int64 mc *memChunk sampleBuf [4]sample } func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte { buf := encoding.Encbuf{B: b} buf.PutByte(chunkSnapshotRecordTypeSeries) buf.PutBE64(s.ref) buf.PutUvarint(len(s.lset)) for _, l := range s.lset { buf.PutUvarintStr(l.Name) buf.PutUvarintStr(l.Value) } buf.PutBE64int64(s.chunkRange) s.Lock() if s.headChunk == nil { buf.PutUvarint(0) } else { buf.PutUvarint(1) buf.PutBE64int64(s.headChunk.minTime) buf.PutBE64int64(s.headChunk.maxTime) buf.PutByte(byte(s.headChunk.chunk.Encoding())) buf.PutUvarintBytes(s.headChunk.chunk.Bytes()) // Put the sample buf. for _, smpl := range s.sampleBuf { buf.PutBE64int64(smpl.t) buf.PutBEFloat64(smpl.v) } } s.Unlock() return buf.Get() } func decodeSeriesFromChunkSnapshot(b []byte) (csr chunkSnapshotRecord, err error) { dec := encoding.Decbuf{B: b} if flag := dec.Byte(); flag != chunkSnapshotRecordTypeSeries { return csr, errors.Errorf("invalid record type %x", flag) } csr.ref = dec.Be64() // The label set written to the disk is already sorted. csr.lset = make(labels.Labels, dec.Uvarint()) for i := range csr.lset { csr.lset[i].Name = dec.UvarintStr() csr.lset[i].Value = dec.UvarintStr() } csr.chunkRange = dec.Be64int64() if dec.Uvarint() == 0 { return } csr.mc = &memChunk{} csr.mc.minTime = dec.Be64int64() csr.mc.maxTime = dec.Be64int64() enc := chunkenc.Encoding(dec.Byte()) // The underlying bytes gets re-used later, so make a copy. chunkBytes := dec.UvarintBytes() chunkBytesCopy := make([]byte, len(chunkBytes)) copy(chunkBytesCopy, chunkBytes) chk, err := chunkenc.FromData(enc, chunkBytesCopy) if err != nil { return csr, errors.Wrap(err, "chunk from data") } csr.mc.chunk = chk for i := range csr.sampleBuf { csr.sampleBuf[i].t = dec.Be64int64() csr.sampleBuf[i].v = dec.Be64Float64() } err = dec.Err() if err != nil && len(dec.B) > 0 { err = errors.Errorf("unexpected %d bytes left in entry", len(dec.B)) } return } func encodeTombstonesToSnapshotRecord(tr tombstones.Reader) ([]byte, error) { buf := encoding.Encbuf{} buf.PutByte(chunkSnapshotRecordTypeTombstones) b, err := tombstones.Encode(tr) if err != nil { return nil, errors.Wrap(err, "encode tombstones") } buf.PutUvarintBytes(b) return buf.Get(), nil } func decodeTombstonesSnapshotRecord(b []byte) (tombstones.Reader, error) { dec := encoding.Decbuf{B: b} if flag := dec.Byte(); flag != chunkSnapshotRecordTypeTombstones { return nil, errors.Errorf("invalid record type %x", flag) } tr, err := tombstones.Decode(dec.UvarintBytes()) return tr, errors.Wrap(err, "decode tombstones") } const chunkSnapshotPrefix = "chunk_snapshot." // ChunkSnapshot creates a snapshot of all the series and tombstones in the head. // It deletes the old chunk snapshots if the chunk snapshot creation is successful. // // The chunk snapshot is stored in a directory named chunk_snapshot.N.M and is written // using the WAL package. N is the last WAL segment present during snapshotting and // M is the offset in segment N upto which data was written. // // The snapshot first contains all series (each in individual records and not sorted), followed by // tombstones (a single record), and finally exemplars (>= 1 record). Exemplars are in the order they // were written to the circular buffer. func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) { if h.wal == nil { // If we are not storing any WAL, does not make sense to take a snapshot too. level.Warn(h.logger).Log("msg", "skipping chunk snapshotting as WAL is disabled") return &ChunkSnapshotStats{}, nil } h.chunkSnapshotMtx.Lock() defer h.chunkSnapshotMtx.Unlock() stats := &ChunkSnapshotStats{} wlast, woffset, err := h.wal.LastSegmentAndOffset() if err != nil && err != record.ErrNotFound { return stats, errors.Wrap(err, "get last wal segment and offset") } _, cslast, csoffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot) if err != nil && err != record.ErrNotFound { return stats, errors.Wrap(err, "find last chunk snapshot") } if wlast == cslast && woffset == csoffset { // Nothing has been written to the WAL/Head since the last snapshot. return stats, nil } snapshotName := chunkSnapshotDir(wlast, woffset) cpdir := filepath.Join(h.opts.ChunkDirRoot, snapshotName) cpdirtmp := cpdir + ".tmp" stats.Dir = cpdir if err := os.MkdirAll(cpdirtmp, 0777); err != nil { return stats, errors.Wrap(err, "create chunk snapshot dir") } cp, err := wal.New(nil, nil, cpdirtmp, h.wal.CompressionEnabled()) if err != nil { return stats, errors.Wrap(err, "open chunk snapshot") } // Ensures that an early return caused by an error doesn't leave any tmp files. defer func() { cp.Close() os.RemoveAll(cpdirtmp) }() var ( buf []byte recs [][]byte ) // Add all series to the snapshot. stripeSize := h.series.size for i := 0; i < stripeSize; i++ { h.series.locks[i].RLock() for _, s := range h.series.series[i] { start := len(buf) buf = s.encodeToSnapshotRecord(buf) if len(buf[start:]) == 0 { continue // All contents discarded. } recs = append(recs, buf[start:]) // Flush records in 10 MB increments. if len(buf) > 10*1024*1024 { if err := cp.Log(recs...); err != nil { h.series.locks[i].RUnlock() return stats, errors.Wrap(err, "flush records") } buf, recs = buf[:0], recs[:0] } } stats.TotalSeries += len(h.series.series[i]) h.series.locks[i].RUnlock() } // Add tombstones to the snapshot. tombstonesReader, err := h.Tombstones() if err != nil { return stats, errors.Wrap(err, "get tombstones") } rec, err := encodeTombstonesToSnapshotRecord(tombstonesReader) if err != nil { return stats, errors.Wrap(err, "encode tombstones") } recs = append(recs, rec) // Flush remaining series records and tombstones. if err := cp.Log(recs...); err != nil { return stats, errors.Wrap(err, "flush records") } buf = buf[:0] // Add exemplars in the snapshot. // We log in batches, with each record having upto 10000 exemplars. // Assuming 100 bytes (overestimate) per exemplar, that's ~1MB. maxExemplarsPerRecord := 10000 batch := make([]record.RefExemplar, 0, maxExemplarsPerRecord) enc := record.Encoder{} flushExemplars := func() error { if len(batch) == 0 { return nil } buf = buf[:0] encbuf := encoding.Encbuf{B: buf} encbuf.PutByte(chunkSnapshotRecordTypeExemplars) enc.EncodeExemplarsIntoBuffer(batch, &encbuf) if err := cp.Log(encbuf.Get()); err != nil { return errors.Wrap(err, "log exemplars") } buf, batch = buf[:0], batch[:0] return nil } err = h.exemplars.IterateExemplars(func(seriesLabels labels.Labels, e exemplar.Exemplar) error { if len(batch) >= maxExemplarsPerRecord { if err := flushExemplars(); err != nil { return errors.Wrap(err, "flush exemplars") } } ms := h.series.getByHash(seriesLabels.Hash(), seriesLabels) if ms == nil { // It is possible that exemplar refers to some old series. We discard such exemplars. return nil } batch = append(batch, record.RefExemplar{ Ref: ms.ref, T: e.Ts, V: e.Value, Labels: e.Labels, }) return nil }) if err != nil { return stats, errors.Wrap(err, "iterate exemplars") } // Flush remaining exemplars. if err := flushExemplars(); err != nil { return stats, errors.Wrap(err, "flush exemplars at the end") } if err := cp.Close(); err != nil { return stats, errors.Wrap(err, "close chunk snapshot") } if err := fileutil.Replace(cpdirtmp, cpdir); err != nil { return stats, errors.Wrap(err, "rename chunk snapshot directory") } if err := DeleteChunkSnapshots(h.opts.ChunkDirRoot, wlast, woffset); err != nil { // Leftover old chunk snapshots do not cause problems down the line beyond // occupying disk space. // They will just be ignored since a higher chunk snapshot exists. level.Error(h.logger).Log("msg", "delete old chunk snapshots", "err", err) } return stats, nil } func chunkSnapshotDir(wlast, woffset int) string { return fmt.Sprintf(chunkSnapshotPrefix+"%06d.%010d", wlast, woffset) } func (h *Head) performChunkSnapshot() error { level.Info(h.logger).Log("msg", "creating chunk snapshot") startTime := time.Now() stats, err := h.ChunkSnapshot() elapsed := time.Since(startTime) if err == nil { level.Info(h.logger).Log("msg", "chunk snapshot complete", "duration", elapsed.String(), "num_series", stats.TotalSeries, "dir", stats.Dir) } return errors.Wrap(err, "chunk snapshot") } // ChunkSnapshotStats returns stats about a created chunk snapshot. type ChunkSnapshotStats struct { TotalSeries int Dir string } // LastChunkSnapshot returns the directory name and index of the most recent chunk snapshot. // If dir does not contain any chunk snapshots, ErrNotFound is returned. func LastChunkSnapshot(dir string) (string, int, int, error) { files, err := ioutil.ReadDir(dir) if err != nil { return "", 0, 0, err } maxIdx, maxOffset := -1, -1 maxFileName := "" for i := 0; i < len(files); i++ { fi := files[i] if !strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) { continue } if !fi.IsDir() { return "", 0, 0, errors.Errorf("chunk snapshot %s is not a directory", fi.Name()) } splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".") if len(splits) != 2 { // Chunk snapshots is not in the right format, we do not care about it. continue } idx, err := strconv.Atoi(splits[0]) if err != nil { continue } offset, err := strconv.Atoi(splits[1]) if err != nil { continue } if idx > maxIdx || (idx == maxIdx && offset > maxOffset) { maxIdx, maxOffset = idx, offset maxFileName = filepath.Join(dir, fi.Name()) } } if maxFileName == "" { return "", 0, 0, record.ErrNotFound } return maxFileName, maxIdx, maxOffset, nil } // DeleteChunkSnapshots deletes all chunk snapshots in a directory below a given index. func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error { files, err := ioutil.ReadDir(dir) if err != nil { return err } errs := tsdb_errors.NewMulti() for _, fi := range files { if !strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) { continue } splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".") if len(splits) != 2 { continue } idx, err := strconv.Atoi(splits[0]) if err != nil { continue } offset, err := strconv.Atoi(splits[1]) if err != nil { continue } if idx < maxIndex || (idx == maxIndex && offset < maxOffset) { if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil { errs.Add(err) } } } return errs.Err() } // loadChunkSnapshot replays the chunk snapshot and restores the Head state from it. If there was any error returned, // it is the responsibility of the caller to clear the contents of the Head. func (h *Head) loadChunkSnapshot() (int, int, map[uint64]*memSeries, error) { dir, snapIdx, snapOffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot) if err != nil { if err == record.ErrNotFound { return snapIdx, snapOffset, nil, nil } return snapIdx, snapOffset, nil, errors.Wrap(err, "find last chunk snapshot") } start := time.Now() sr, err := wal.NewSegmentsReader(dir) if err != nil { return snapIdx, snapOffset, nil, errors.Wrap(err, "open chunk snapshot") } defer func() { if err := sr.Close(); err != nil { level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err) } }() var ( numSeries = 0 unknownRefs = int64(0) n = runtime.GOMAXPROCS(0) wg sync.WaitGroup recordChan = make(chan chunkSnapshotRecord, 5*n) shardedRefSeries = make([]map[uint64]*memSeries, n) errChan = make(chan error, n) refSeries map[uint64]*memSeries exemplarBuf []record.RefExemplar dec record.Decoder ) wg.Add(n) for i := 0; i < n; i++ { go func(idx int, rc <-chan chunkSnapshotRecord) { defer wg.Done() defer func() { // If there was an error, drain the channel // to unblock the main thread. for range rc { } }() shardedRefSeries[idx] = make(map[uint64]*memSeries) localRefSeries := shardedRefSeries[idx] for csr := range rc { series, _, err := h.getOrCreateWithID(csr.ref, csr.lset.Hash(), csr.lset) if err != nil { errChan <- err return } localRefSeries[csr.ref] = series if h.lastSeriesID.Load() < series.ref { h.lastSeriesID.Store(series.ref) } series.chunkRange = csr.chunkRange if csr.mc == nil { continue } series.nextAt = csr.mc.maxTime // This will create a new chunk on append. series.headChunk = csr.mc for i := range series.sampleBuf { series.sampleBuf[i].t = csr.sampleBuf[i].t series.sampleBuf[i].v = csr.sampleBuf[i].v } app, err := series.headChunk.chunk.Appender() if err != nil { errChan <- err return } series.app = app h.updateMinMaxTime(csr.mc.minTime, csr.mc.maxTime) } }(i, recordChan) } r := wal.NewReader(sr) var loopErr error Outer: for r.Next() { select { case err := <-errChan: errChan <- err break Outer default: } rec := r.Record() switch rec[0] { case chunkSnapshotRecordTypeSeries: numSeries++ csr, err := decodeSeriesFromChunkSnapshot(rec) if err != nil { loopErr = errors.Wrap(err, "decode series record") break Outer } recordChan <- csr case chunkSnapshotRecordTypeTombstones: tr, err := decodeTombstonesSnapshotRecord(rec) if err != nil { loopErr = errors.Wrap(err, "decode tombstones") break Outer } if err = tr.Iter(func(ref uint64, ivs tombstones.Intervals) error { h.tombstones.AddInterval(ref, ivs...) return nil }); err != nil { loopErr = errors.Wrap(err, "iterate tombstones") break Outer } case chunkSnapshotRecordTypeExemplars: // Exemplars are at the end of snapshot. So all series are loaded at this point. if len(refSeries) == 0 { close(recordChan) wg.Wait() refSeries = make(map[uint64]*memSeries, numSeries) for _, shard := range shardedRefSeries { for k, v := range shard { refSeries[k] = v } } } if !h.opts.EnableExemplarStorage || h.opts.MaxExemplars.Load() <= 0 { // Exemplar storage is disabled. continue Outer } decbuf := encoding.Decbuf{B: rec[1:]} exemplarBuf = exemplarBuf[:0] exemplarBuf, err = dec.ExemplarsFromBuffer(&decbuf, exemplarBuf) if err != nil { loopErr = errors.Wrap(err, "exemplars from buffer") break Outer } for _, e := range exemplarBuf { ms, ok := refSeries[e.Ref] if !ok { unknownRefs++ continue } if err := h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{ Labels: e.Labels, Value: e.V, Ts: e.T, }); err != nil { loopErr = errors.Wrap(err, "add exemplar") break Outer } } default: // This is a record type we don't understand. It is either and old format from earlier versions, // or a new format and the code was rolled back to old version. loopErr = errors.Errorf("unsuported snapshot record type 0b%b", rec[0]) break Outer } } if len(refSeries) == 0 { close(recordChan) wg.Wait() } close(errChan) merr := tsdb_errors.NewMulti(errors.Wrap(loopErr, "decode loop")) for err := range errChan { merr.Add(errors.Wrap(err, "record processing")) } if err := merr.Err(); err != nil { return -1, -1, nil, err } if r.Err() != nil { return -1, -1, nil, errors.Wrap(r.Err(), "read records") } if len(refSeries) == 0 { // We had no exemplar record, so we have to build the map here. refSeries = make(map[uint64]*memSeries, numSeries) for _, shard := range shardedRefSeries { for k, v := range shard { refSeries[k] = v } } } elapsed := time.Since(start) level.Info(h.logger).Log("msg", "chunk snapshot loaded", "dir", dir, "num_series", numSeries, "duration", elapsed.String()) if unknownRefs > 0 { level.Warn(h.logger).Log("msg", "unknown series references during chunk snapshot replay", "count", unknownRefs) } return snapIdx, snapOffset, refSeries, nil }