mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-15 07:47:31 -08:00
c9b85afd93
Adds `WALReplayConcurrency` as an option on tsdb `Options` and `HeadOptions`. If it is not set or set <=0, then `GOMAXPROCS` is used, which matches the previous behaviour. Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
1566 lines
43 KiB
Go
1566 lines
43 KiB
Go
// Copyright 2021 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package tsdb
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"os"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/go-kit/log/level"
|
|
"github.com/pkg/errors"
|
|
"go.uber.org/atomic"
|
|
|
|
"github.com/prometheus/prometheus/model/exemplar"
|
|
"github.com/prometheus/prometheus/model/histogram"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/model/metadata"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
|
"github.com/prometheus/prometheus/tsdb/encoding"
|
|
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
|
"github.com/prometheus/prometheus/tsdb/fileutil"
|
|
"github.com/prometheus/prometheus/tsdb/record"
|
|
"github.com/prometheus/prometheus/tsdb/tombstones"
|
|
"github.com/prometheus/prometheus/tsdb/wlog"
|
|
)
|
|
|
|
// histogramRecord combines both RefHistogramSample and RefFloatHistogramSample
|
|
// to simplify the WAL replay.
|
|
type histogramRecord struct {
|
|
ref chunks.HeadSeriesRef
|
|
t int64
|
|
h *histogram.Histogram
|
|
fh *histogram.FloatHistogram
|
|
}
|
|
|
|
func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) {
|
|
// Track number of samples that referenced a series we don't know about
|
|
// for error reporting.
|
|
var unknownRefs atomic.Uint64
|
|
var unknownExemplarRefs atomic.Uint64
|
|
var unknownHistogramRefs atomic.Uint64
|
|
var unknownMetadataRefs atomic.Uint64
|
|
// Track number of series records that had overlapping m-map chunks.
|
|
var mmapOverlappingChunks atomic.Uint64
|
|
|
|
// Start workers that each process samples for a partition of the series ID space.
|
|
var (
|
|
wg sync.WaitGroup
|
|
concurrency = h.opts.WALReplayConcurrency
|
|
processors = make([]walSubsetProcessor, concurrency)
|
|
exemplarsInput chan record.RefExemplar
|
|
|
|
dec record.Decoder
|
|
shards = make([][]record.RefSample, concurrency)
|
|
histogramShards = make([][]histogramRecord, concurrency)
|
|
|
|
decoded = make(chan interface{}, 10)
|
|
decodeErr, seriesCreationErr error
|
|
seriesPool = sync.Pool{
|
|
New: func() interface{} {
|
|
return []record.RefSeries{}
|
|
},
|
|
}
|
|
samplesPool = sync.Pool{
|
|
New: func() interface{} {
|
|
return []record.RefSample{}
|
|
},
|
|
}
|
|
tstonesPool = sync.Pool{
|
|
New: func() interface{} {
|
|
return []tombstones.Stone{}
|
|
},
|
|
}
|
|
exemplarsPool = sync.Pool{
|
|
New: func() interface{} {
|
|
return []record.RefExemplar{}
|
|
},
|
|
}
|
|
histogramsPool = sync.Pool{
|
|
New: func() interface{} {
|
|
return []record.RefHistogramSample{}
|
|
},
|
|
}
|
|
floatHistogramsPool = sync.Pool{
|
|
New: func() interface{} {
|
|
return []record.RefFloatHistogramSample{}
|
|
},
|
|
}
|
|
metadataPool = sync.Pool{
|
|
New: func() interface{} {
|
|
return []record.RefMetadata{}
|
|
},
|
|
}
|
|
)
|
|
|
|
defer func() {
|
|
// For CorruptionErr ensure to terminate all workers before exiting.
|
|
_, ok := err.(*wlog.CorruptionErr)
|
|
if ok || seriesCreationErr != nil {
|
|
for i := 0; i < concurrency; i++ {
|
|
processors[i].closeAndDrain()
|
|
}
|
|
close(exemplarsInput)
|
|
wg.Wait()
|
|
}
|
|
}()
|
|
|
|
wg.Add(concurrency)
|
|
for i := 0; i < concurrency; i++ {
|
|
processors[i].setup()
|
|
|
|
go func(wp *walSubsetProcessor) {
|
|
unknown, unknownHistograms, overlapping := wp.processWALSamples(h, mmappedChunks, oooMmappedChunks)
|
|
unknownRefs.Add(unknown)
|
|
mmapOverlappingChunks.Add(overlapping)
|
|
unknownHistogramRefs.Add(unknownHistograms)
|
|
wg.Done()
|
|
}(&processors[i])
|
|
}
|
|
|
|
wg.Add(1)
|
|
exemplarsInput = make(chan record.RefExemplar, 300)
|
|
go func(input <-chan record.RefExemplar) {
|
|
var err error
|
|
defer wg.Done()
|
|
for e := range input {
|
|
ms := h.series.getByID(e.Ref)
|
|
if ms == nil {
|
|
unknownExemplarRefs.Inc()
|
|
continue
|
|
}
|
|
|
|
if e.T < h.minValidTime.Load() {
|
|
continue
|
|
}
|
|
// At the moment the only possible error here is out of order exemplars, which we shouldn't see when
|
|
// replaying the WAL, so lets just log the error if it's not that type.
|
|
err = h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels})
|
|
if err != nil && err == storage.ErrOutOfOrderExemplar {
|
|
level.Warn(h.logger).Log("msg", "Unexpected error when replaying WAL on exemplar record", "err", err)
|
|
}
|
|
}
|
|
}(exemplarsInput)
|
|
|
|
go func() {
|
|
defer close(decoded)
|
|
var err error
|
|
for r.Next() {
|
|
rec := r.Record()
|
|
switch dec.Type(rec) {
|
|
case record.Series:
|
|
series := seriesPool.Get().([]record.RefSeries)[:0]
|
|
series, err = dec.Series(rec, series)
|
|
if err != nil {
|
|
decodeErr = &wlog.CorruptionErr{
|
|
Err: errors.Wrap(err, "decode series"),
|
|
Segment: r.Segment(),
|
|
Offset: r.Offset(),
|
|
}
|
|
return
|
|
}
|
|
decoded <- series
|
|
case record.Samples:
|
|
samples := samplesPool.Get().([]record.RefSample)[:0]
|
|
samples, err = dec.Samples(rec, samples)
|
|
if err != nil {
|
|
decodeErr = &wlog.CorruptionErr{
|
|
Err: errors.Wrap(err, "decode samples"),
|
|
Segment: r.Segment(),
|
|
Offset: r.Offset(),
|
|
}
|
|
return
|
|
}
|
|
decoded <- samples
|
|
case record.Tombstones:
|
|
tstones := tstonesPool.Get().([]tombstones.Stone)[:0]
|
|
tstones, err = dec.Tombstones(rec, tstones)
|
|
if err != nil {
|
|
decodeErr = &wlog.CorruptionErr{
|
|
Err: errors.Wrap(err, "decode tombstones"),
|
|
Segment: r.Segment(),
|
|
Offset: r.Offset(),
|
|
}
|
|
return
|
|
}
|
|
decoded <- tstones
|
|
case record.Exemplars:
|
|
exemplars := exemplarsPool.Get().([]record.RefExemplar)[:0]
|
|
exemplars, err = dec.Exemplars(rec, exemplars)
|
|
if err != nil {
|
|
decodeErr = &wlog.CorruptionErr{
|
|
Err: errors.Wrap(err, "decode exemplars"),
|
|
Segment: r.Segment(),
|
|
Offset: r.Offset(),
|
|
}
|
|
return
|
|
}
|
|
decoded <- exemplars
|
|
case record.HistogramSamples:
|
|
hists := histogramsPool.Get().([]record.RefHistogramSample)[:0]
|
|
hists, err = dec.HistogramSamples(rec, hists)
|
|
if err != nil {
|
|
decodeErr = &wlog.CorruptionErr{
|
|
Err: errors.Wrap(err, "decode histograms"),
|
|
Segment: r.Segment(),
|
|
Offset: r.Offset(),
|
|
}
|
|
return
|
|
}
|
|
decoded <- hists
|
|
case record.FloatHistogramSamples:
|
|
hists := floatHistogramsPool.Get().([]record.RefFloatHistogramSample)[:0]
|
|
hists, err = dec.FloatHistogramSamples(rec, hists)
|
|
if err != nil {
|
|
decodeErr = &wlog.CorruptionErr{
|
|
Err: errors.Wrap(err, "decode float histograms"),
|
|
Segment: r.Segment(),
|
|
Offset: r.Offset(),
|
|
}
|
|
return
|
|
}
|
|
decoded <- hists
|
|
case record.Metadata:
|
|
meta := metadataPool.Get().([]record.RefMetadata)[:0]
|
|
meta, err := dec.Metadata(rec, meta)
|
|
if err != nil {
|
|
decodeErr = &wlog.CorruptionErr{
|
|
Err: errors.Wrap(err, "decode metadata"),
|
|
Segment: r.Segment(),
|
|
Offset: r.Offset(),
|
|
}
|
|
return
|
|
}
|
|
decoded <- meta
|
|
default:
|
|
// Noop.
|
|
}
|
|
}
|
|
}()
|
|
|
|
// The records are always replayed from the oldest to the newest.
|
|
Outer:
|
|
for d := range decoded {
|
|
switch v := d.(type) {
|
|
case []record.RefSeries:
|
|
for _, walSeries := range v {
|
|
mSeries, created, err := h.getOrCreateWithID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels)
|
|
if err != nil {
|
|
seriesCreationErr = err
|
|
break Outer
|
|
}
|
|
|
|
if chunks.HeadSeriesRef(h.lastSeriesID.Load()) < walSeries.Ref {
|
|
h.lastSeriesID.Store(uint64(walSeries.Ref))
|
|
}
|
|
if !created {
|
|
multiRef[walSeries.Ref] = mSeries.ref
|
|
}
|
|
|
|
idx := uint64(mSeries.ref) % uint64(concurrency)
|
|
processors[idx].input <- walSubsetProcessorInputItem{walSeriesRef: walSeries.Ref, existingSeries: mSeries}
|
|
}
|
|
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
|
|
seriesPool.Put(v)
|
|
case []record.RefSample:
|
|
samples := v
|
|
minValidTime := h.minValidTime.Load()
|
|
// We split up the samples into chunks of 5000 samples or less.
|
|
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
|
|
// cause thousands of very large in flight buffers occupying large amounts
|
|
// of unused memory.
|
|
for len(samples) > 0 {
|
|
m := 5000
|
|
if len(samples) < m {
|
|
m = len(samples)
|
|
}
|
|
for i := 0; i < concurrency; i++ {
|
|
if shards[i] == nil {
|
|
shards[i] = processors[i].reuseBuf()
|
|
}
|
|
}
|
|
for _, sam := range samples[:m] {
|
|
if sam.T < minValidTime {
|
|
continue // Before minValidTime: discard.
|
|
}
|
|
if r, ok := multiRef[sam.Ref]; ok {
|
|
sam.Ref = r
|
|
}
|
|
mod := uint64(sam.Ref) % uint64(concurrency)
|
|
shards[mod] = append(shards[mod], sam)
|
|
}
|
|
for i := 0; i < concurrency; i++ {
|
|
if len(shards[i]) > 0 {
|
|
processors[i].input <- walSubsetProcessorInputItem{samples: shards[i]}
|
|
shards[i] = nil
|
|
}
|
|
}
|
|
samples = samples[m:]
|
|
}
|
|
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
|
|
samplesPool.Put(v)
|
|
case []tombstones.Stone:
|
|
for _, s := range v {
|
|
for _, itv := range s.Intervals {
|
|
if itv.Maxt < h.minValidTime.Load() {
|
|
continue
|
|
}
|
|
if m := h.series.getByID(chunks.HeadSeriesRef(s.Ref)); m == nil {
|
|
unknownRefs.Inc()
|
|
continue
|
|
}
|
|
h.tombstones.AddInterval(storage.SeriesRef(s.Ref), itv)
|
|
}
|
|
}
|
|
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
|
|
tstonesPool.Put(v)
|
|
case []record.RefExemplar:
|
|
for _, e := range v {
|
|
exemplarsInput <- e
|
|
}
|
|
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
|
|
exemplarsPool.Put(v)
|
|
case []record.RefHistogramSample:
|
|
samples := v
|
|
minValidTime := h.minValidTime.Load()
|
|
// We split up the samples into chunks of 5000 samples or less.
|
|
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
|
|
// cause thousands of very large in flight buffers occupying large amounts
|
|
// of unused memory.
|
|
for len(samples) > 0 {
|
|
m := 5000
|
|
if len(samples) < m {
|
|
m = len(samples)
|
|
}
|
|
for i := 0; i < concurrency; i++ {
|
|
if histogramShards[i] == nil {
|
|
histogramShards[i] = processors[i].reuseHistogramBuf()
|
|
}
|
|
}
|
|
for _, sam := range samples[:m] {
|
|
if sam.T < minValidTime {
|
|
continue // Before minValidTime: discard.
|
|
}
|
|
if r, ok := multiRef[sam.Ref]; ok {
|
|
sam.Ref = r
|
|
}
|
|
mod := uint64(sam.Ref) % uint64(concurrency)
|
|
histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H})
|
|
}
|
|
for i := 0; i < concurrency; i++ {
|
|
if len(histogramShards[i]) > 0 {
|
|
processors[i].input <- walSubsetProcessorInputItem{histogramSamples: histogramShards[i]}
|
|
histogramShards[i] = nil
|
|
}
|
|
}
|
|
samples = samples[m:]
|
|
}
|
|
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
|
|
histogramsPool.Put(v)
|
|
case []record.RefFloatHistogramSample:
|
|
samples := v
|
|
minValidTime := h.minValidTime.Load()
|
|
// We split up the samples into chunks of 5000 samples or less.
|
|
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
|
|
// cause thousands of very large in flight buffers occupying large amounts
|
|
// of unused memory.
|
|
for len(samples) > 0 {
|
|
m := 5000
|
|
if len(samples) < m {
|
|
m = len(samples)
|
|
}
|
|
for i := 0; i < concurrency; i++ {
|
|
if histogramShards[i] == nil {
|
|
histogramShards[i] = processors[i].reuseHistogramBuf()
|
|
}
|
|
}
|
|
for _, sam := range samples[:m] {
|
|
if sam.T < minValidTime {
|
|
continue // Before minValidTime: discard.
|
|
}
|
|
if r, ok := multiRef[sam.Ref]; ok {
|
|
sam.Ref = r
|
|
}
|
|
mod := uint64(sam.Ref) % uint64(concurrency)
|
|
histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH})
|
|
}
|
|
for i := 0; i < concurrency; i++ {
|
|
if len(histogramShards[i]) > 0 {
|
|
processors[i].input <- walSubsetProcessorInputItem{histogramSamples: histogramShards[i]}
|
|
histogramShards[i] = nil
|
|
}
|
|
}
|
|
samples = samples[m:]
|
|
}
|
|
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
|
|
floatHistogramsPool.Put(v)
|
|
case []record.RefMetadata:
|
|
for _, m := range v {
|
|
s := h.series.getByID(chunks.HeadSeriesRef(m.Ref))
|
|
if s == nil {
|
|
unknownMetadataRefs.Inc()
|
|
continue
|
|
}
|
|
s.meta = &metadata.Metadata{
|
|
Type: record.ToTextparseMetricType(m.Type),
|
|
Unit: m.Unit,
|
|
Help: m.Help,
|
|
}
|
|
}
|
|
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
|
|
metadataPool.Put(v)
|
|
default:
|
|
panic(fmt.Errorf("unexpected decoded type: %T", d))
|
|
}
|
|
}
|
|
|
|
if decodeErr != nil {
|
|
return decodeErr
|
|
}
|
|
if seriesCreationErr != nil {
|
|
// Drain the channel to unblock the goroutine.
|
|
for range decoded {
|
|
}
|
|
return seriesCreationErr
|
|
}
|
|
|
|
// Signal termination to each worker and wait for it to close its output channel.
|
|
for i := 0; i < concurrency; i++ {
|
|
processors[i].closeAndDrain()
|
|
}
|
|
close(exemplarsInput)
|
|
wg.Wait()
|
|
|
|
if r.Err() != nil {
|
|
return errors.Wrap(r.Err(), "read records")
|
|
}
|
|
|
|
if unknownRefs.Load()+unknownExemplarRefs.Load()+unknownHistogramRefs.Load()+unknownMetadataRefs.Load() > 0 {
|
|
level.Warn(h.logger).Log(
|
|
"msg", "Unknown series references",
|
|
"samples", unknownRefs.Load(),
|
|
"exemplars", unknownExemplarRefs.Load(),
|
|
"histograms", unknownHistogramRefs.Load(),
|
|
"metadata", unknownMetadataRefs.Load(),
|
|
)
|
|
}
|
|
if count := mmapOverlappingChunks.Load(); count > 0 {
|
|
level.Info(h.logger).Log("msg", "Overlapping m-map chunks on duplicate series records", "count", count)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// resetSeriesWithMMappedChunks is only used during the WAL replay.
|
|
func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*mmappedChunk, walSeriesRef chunks.HeadSeriesRef) (overlapped bool) {
|
|
if mSeries.ref != walSeriesRef {
|
|
// Checking if the new m-mapped chunks overlap with the already existing ones.
|
|
if len(mSeries.mmappedChunks) > 0 && len(mmc) > 0 {
|
|
if overlapsClosedInterval(
|
|
mSeries.mmappedChunks[0].minTime,
|
|
mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime,
|
|
mmc[0].minTime,
|
|
mmc[len(mmc)-1].maxTime,
|
|
) {
|
|
level.Debug(h.logger).Log(
|
|
"msg", "M-mapped chunks overlap on a duplicate series record",
|
|
"series", mSeries.lset.String(),
|
|
"oldref", mSeries.ref,
|
|
"oldmint", mSeries.mmappedChunks[0].minTime,
|
|
"oldmaxt", mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime,
|
|
"newref", walSeriesRef,
|
|
"newmint", mmc[0].minTime,
|
|
"newmaxt", mmc[len(mmc)-1].maxTime,
|
|
)
|
|
overlapped = true
|
|
}
|
|
}
|
|
}
|
|
|
|
h.metrics.chunksCreated.Add(float64(len(mmc) + len(oooMmc)))
|
|
h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks)))
|
|
h.metrics.chunks.Add(float64(len(mmc) + len(oooMmc) - len(mSeries.mmappedChunks)))
|
|
|
|
if mSeries.ooo != nil {
|
|
h.metrics.chunksRemoved.Add(float64(len(mSeries.ooo.oooMmappedChunks)))
|
|
h.metrics.chunks.Sub(float64(len(mSeries.ooo.oooMmappedChunks)))
|
|
}
|
|
|
|
mSeries.mmappedChunks = mmc
|
|
if len(oooMmc) == 0 {
|
|
mSeries.ooo = nil
|
|
} else {
|
|
if mSeries.ooo == nil {
|
|
mSeries.ooo = &memSeriesOOOFields{}
|
|
}
|
|
*mSeries.ooo = memSeriesOOOFields{oooMmappedChunks: oooMmc}
|
|
}
|
|
// Cache the last mmapped chunk time, so we can skip calling append() for samples it will reject.
|
|
if len(mmc) == 0 {
|
|
mSeries.mmMaxTime = math.MinInt64
|
|
} else {
|
|
mSeries.mmMaxTime = mmc[len(mmc)-1].maxTime
|
|
h.updateMinMaxTime(mmc[0].minTime, mSeries.mmMaxTime)
|
|
}
|
|
if len(oooMmc) != 0 {
|
|
// Mint and maxt can be in any chunk, they are not sorted.
|
|
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
|
|
for _, ch := range oooMmc {
|
|
if ch.minTime < mint {
|
|
mint = ch.minTime
|
|
}
|
|
if ch.maxTime > maxt {
|
|
maxt = ch.maxTime
|
|
}
|
|
}
|
|
h.updateMinOOOMaxOOOTime(mint, maxt)
|
|
}
|
|
|
|
// Any samples replayed till now would already be compacted. Resetting the head chunk.
|
|
mSeries.nextAt = 0
|
|
mSeries.headChunk = nil
|
|
mSeries.app = nil
|
|
return
|
|
}
|
|
|
|
type walSubsetProcessor struct {
|
|
input chan walSubsetProcessorInputItem
|
|
output chan []record.RefSample
|
|
histogramsOutput chan []histogramRecord
|
|
}
|
|
|
|
type walSubsetProcessorInputItem struct {
|
|
samples []record.RefSample
|
|
histogramSamples []histogramRecord
|
|
existingSeries *memSeries
|
|
walSeriesRef chunks.HeadSeriesRef
|
|
}
|
|
|
|
func (wp *walSubsetProcessor) setup() {
|
|
wp.input = make(chan walSubsetProcessorInputItem, 300)
|
|
wp.output = make(chan []record.RefSample, 300)
|
|
wp.histogramsOutput = make(chan []histogramRecord, 300)
|
|
}
|
|
|
|
func (wp *walSubsetProcessor) closeAndDrain() {
|
|
close(wp.input)
|
|
for range wp.output {
|
|
}
|
|
for range wp.histogramsOutput {
|
|
}
|
|
}
|
|
|
|
// If there is a buffer in the output chan, return it for reuse, otherwise return nil.
|
|
func (wp *walSubsetProcessor) reuseBuf() []record.RefSample {
|
|
select {
|
|
case buf := <-wp.output:
|
|
return buf[:0]
|
|
default:
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// If there is a buffer in the output chan, return it for reuse, otherwise return nil.
|
|
func (wp *walSubsetProcessor) reuseHistogramBuf() []histogramRecord {
|
|
select {
|
|
case buf := <-wp.histogramsOutput:
|
|
return buf[:0]
|
|
default:
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// processWALSamples adds the samples it receives to the head and passes
|
|
// the buffer received to an output channel for reuse.
|
|
// Samples before the minValidTime timestamp are discarded.
|
|
func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (unknownRefs, unknownHistogramRefs, mmapOverlappingChunks uint64) {
|
|
defer close(wp.output)
|
|
defer close(wp.histogramsOutput)
|
|
|
|
minValidTime := h.minValidTime.Load()
|
|
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
|
|
chunkRange := h.chunkRange.Load()
|
|
|
|
for in := range wp.input {
|
|
if in.existingSeries != nil {
|
|
mmc := mmappedChunks[in.walSeriesRef]
|
|
oooMmc := oooMmappedChunks[in.walSeriesRef]
|
|
if h.resetSeriesWithMMappedChunks(in.existingSeries, mmc, oooMmc, in.walSeriesRef) {
|
|
mmapOverlappingChunks++
|
|
}
|
|
continue
|
|
}
|
|
|
|
for _, s := range in.samples {
|
|
ms := h.series.getByID(s.Ref)
|
|
if ms == nil {
|
|
unknownRefs++
|
|
continue
|
|
}
|
|
if s.T <= ms.mmMaxTime {
|
|
continue
|
|
}
|
|
if s.T <= ms.mmMaxTime {
|
|
continue
|
|
}
|
|
if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper, chunkRange); chunkCreated {
|
|
h.metrics.chunksCreated.Inc()
|
|
h.metrics.chunks.Inc()
|
|
}
|
|
if s.T > maxt {
|
|
maxt = s.T
|
|
}
|
|
if s.T < mint {
|
|
mint = s.T
|
|
}
|
|
}
|
|
select {
|
|
case wp.output <- in.samples:
|
|
default:
|
|
}
|
|
|
|
for _, s := range in.histogramSamples {
|
|
if s.t < minValidTime {
|
|
continue
|
|
}
|
|
ms := h.series.getByID(s.ref)
|
|
if ms == nil {
|
|
unknownHistogramRefs++
|
|
continue
|
|
}
|
|
if s.t <= ms.mmMaxTime {
|
|
continue
|
|
}
|
|
var chunkCreated bool
|
|
if s.h != nil {
|
|
_, chunkCreated = ms.appendHistogram(s.t, s.h, 0, h.chunkDiskMapper, chunkRange)
|
|
} else {
|
|
_, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, h.chunkDiskMapper, chunkRange)
|
|
}
|
|
if chunkCreated {
|
|
h.metrics.chunksCreated.Inc()
|
|
h.metrics.chunks.Inc()
|
|
}
|
|
if s.t > maxt {
|
|
maxt = s.t
|
|
}
|
|
if s.t < mint {
|
|
mint = s.t
|
|
}
|
|
}
|
|
|
|
select {
|
|
case wp.histogramsOutput <- in.histogramSamples:
|
|
default:
|
|
}
|
|
}
|
|
h.updateMinMaxTime(mint, maxt)
|
|
|
|
return unknownRefs, unknownHistogramRefs, mmapOverlappingChunks
|
|
}
|
|
|
|
func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
|
|
// Track number of samples, m-map markers, that referenced a series we don't know about
|
|
// for error reporting.
|
|
var unknownRefs, mmapMarkerUnknownRefs atomic.Uint64
|
|
|
|
lastSeq, lastOff := lastMmapRef.Unpack()
|
|
// Start workers that each process samples for a partition of the series ID space.
|
|
var (
|
|
wg sync.WaitGroup
|
|
concurrency = h.opts.WALReplayConcurrency
|
|
processors = make([]wblSubsetProcessor, concurrency)
|
|
|
|
dec record.Decoder
|
|
shards = make([][]record.RefSample, concurrency)
|
|
|
|
decodedCh = make(chan interface{}, 10)
|
|
decodeErr error
|
|
samplesPool = sync.Pool{
|
|
New: func() interface{} {
|
|
return []record.RefSample{}
|
|
},
|
|
}
|
|
markersPool = sync.Pool{
|
|
New: func() interface{} {
|
|
return []record.RefMmapMarker{}
|
|
},
|
|
}
|
|
)
|
|
|
|
defer func() {
|
|
// For CorruptionErr ensure to terminate all workers before exiting.
|
|
// We also wrap it to identify OOO WBL corruption.
|
|
_, ok := err.(*wlog.CorruptionErr)
|
|
if ok {
|
|
err = &errLoadWbl{err: err}
|
|
for i := 0; i < concurrency; i++ {
|
|
processors[i].closeAndDrain()
|
|
}
|
|
wg.Wait()
|
|
}
|
|
}()
|
|
|
|
wg.Add(concurrency)
|
|
for i := 0; i < concurrency; i++ {
|
|
processors[i].setup()
|
|
|
|
go func(wp *wblSubsetProcessor) {
|
|
unknown := wp.processWBLSamples(h)
|
|
unknownRefs.Add(unknown)
|
|
wg.Done()
|
|
}(&processors[i])
|
|
}
|
|
|
|
go func() {
|
|
defer close(decodedCh)
|
|
for r.Next() {
|
|
rec := r.Record()
|
|
switch dec.Type(rec) {
|
|
case record.Samples:
|
|
samples := samplesPool.Get().([]record.RefSample)[:0]
|
|
samples, err = dec.Samples(rec, samples)
|
|
if err != nil {
|
|
decodeErr = &wlog.CorruptionErr{
|
|
Err: errors.Wrap(err, "decode samples"),
|
|
Segment: r.Segment(),
|
|
Offset: r.Offset(),
|
|
}
|
|
return
|
|
}
|
|
decodedCh <- samples
|
|
case record.MmapMarkers:
|
|
markers := markersPool.Get().([]record.RefMmapMarker)[:0]
|
|
markers, err = dec.MmapMarkers(rec, markers)
|
|
if err != nil {
|
|
decodeErr = &wlog.CorruptionErr{
|
|
Err: errors.Wrap(err, "decode mmap markers"),
|
|
Segment: r.Segment(),
|
|
Offset: r.Offset(),
|
|
}
|
|
return
|
|
}
|
|
decodedCh <- markers
|
|
default:
|
|
// Noop.
|
|
}
|
|
}
|
|
}()
|
|
|
|
// The records are always replayed from the oldest to the newest.
|
|
for d := range decodedCh {
|
|
switch v := d.(type) {
|
|
case []record.RefSample:
|
|
samples := v
|
|
// We split up the samples into parts of 5000 samples or less.
|
|
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
|
|
// cause thousands of very large in flight buffers occupying large amounts
|
|
// of unused memory.
|
|
for len(samples) > 0 {
|
|
m := 5000
|
|
if len(samples) < m {
|
|
m = len(samples)
|
|
}
|
|
for i := 0; i < concurrency; i++ {
|
|
shards[i] = processors[i].reuseBuf()
|
|
}
|
|
for _, sam := range samples[:m] {
|
|
if r, ok := multiRef[sam.Ref]; ok {
|
|
sam.Ref = r
|
|
}
|
|
mod := uint64(sam.Ref) % uint64(concurrency)
|
|
shards[mod] = append(shards[mod], sam)
|
|
}
|
|
for i := 0; i < concurrency; i++ {
|
|
processors[i].input <- shards[i]
|
|
}
|
|
samples = samples[m:]
|
|
}
|
|
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
|
|
samplesPool.Put(d)
|
|
case []record.RefMmapMarker:
|
|
markers := v
|
|
for _, rm := range markers {
|
|
seq, off := rm.MmapRef.Unpack()
|
|
if seq > lastSeq || (seq == lastSeq && off > lastOff) {
|
|
// This m-map chunk from markers was not present during
|
|
// the load of mmapped chunks that happened in the head
|
|
// initialization.
|
|
continue
|
|
}
|
|
|
|
if r, ok := multiRef[rm.Ref]; ok {
|
|
rm.Ref = r
|
|
}
|
|
|
|
ms := h.series.getByID(rm.Ref)
|
|
if ms == nil {
|
|
mmapMarkerUnknownRefs.Inc()
|
|
continue
|
|
}
|
|
idx := uint64(ms.ref) % uint64(concurrency)
|
|
// It is possible that some old sample is being processed in processWALSamples that
|
|
// could cause race below. So we wait for the goroutine to empty input the buffer and finish
|
|
// processing all old samples after emptying the buffer.
|
|
processors[idx].waitUntilIdle()
|
|
// Lock the subset so we can modify the series object
|
|
processors[idx].mx.Lock()
|
|
|
|
// All samples till now have been m-mapped. Hence clear out the headChunk.
|
|
// In case some samples slipped through and went into m-map chunks because of changed
|
|
// chunk size parameters, we are not taking care of that here.
|
|
// TODO(codesome): see if there is a way to avoid duplicate m-map chunks if
|
|
// the size of ooo chunk was reduced between restart.
|
|
if ms.ooo != nil {
|
|
ms.ooo.oooHeadChunk = nil
|
|
}
|
|
|
|
processors[idx].mx.Unlock()
|
|
}
|
|
default:
|
|
panic(fmt.Errorf("unexpected decodedCh type: %T", d))
|
|
}
|
|
}
|
|
|
|
if decodeErr != nil {
|
|
return decodeErr
|
|
}
|
|
|
|
// Signal termination to each worker and wait for it to close its output channel.
|
|
for i := 0; i < concurrency; i++ {
|
|
processors[i].closeAndDrain()
|
|
}
|
|
wg.Wait()
|
|
|
|
if r.Err() != nil {
|
|
return errors.Wrap(r.Err(), "read records")
|
|
}
|
|
|
|
if unknownRefs.Load() > 0 || mmapMarkerUnknownRefs.Load() > 0 {
|
|
level.Warn(h.logger).Log("msg", "Unknown series references for ooo WAL replay", "samples", unknownRefs.Load(), "mmap_markers", mmapMarkerUnknownRefs.Load())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type errLoadWbl struct {
|
|
err error
|
|
}
|
|
|
|
func (e errLoadWbl) Error() string {
|
|
return e.err.Error()
|
|
}
|
|
|
|
// To support errors.Cause().
|
|
func (e errLoadWbl) Cause() error {
|
|
return e.err
|
|
}
|
|
|
|
// To support errors.Unwrap().
|
|
func (e errLoadWbl) Unwrap() error {
|
|
return e.err
|
|
}
|
|
|
|
// isErrLoadOOOWal returns a boolean if the error is errLoadWbl.
|
|
func isErrLoadOOOWal(err error) bool {
|
|
_, ok := err.(*errLoadWbl)
|
|
return ok
|
|
}
|
|
|
|
type wblSubsetProcessor struct {
|
|
mx sync.Mutex // Take this lock while modifying series in the subset.
|
|
input chan []record.RefSample
|
|
output chan []record.RefSample
|
|
}
|
|
|
|
func (wp *wblSubsetProcessor) setup() {
|
|
wp.output = make(chan []record.RefSample, 300)
|
|
wp.input = make(chan []record.RefSample, 300)
|
|
}
|
|
|
|
func (wp *wblSubsetProcessor) closeAndDrain() {
|
|
close(wp.input)
|
|
for range wp.output {
|
|
}
|
|
}
|
|
|
|
// If there is a buffer in the output chan, return it for reuse, otherwise return nil.
|
|
func (wp *wblSubsetProcessor) reuseBuf() []record.RefSample {
|
|
select {
|
|
case buf := <-wp.output:
|
|
return buf[:0]
|
|
default:
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// processWBLSamples adds the samples it receives to the head and passes
|
|
// the buffer received to an output channel for reuse.
|
|
// Samples before the minValidTime timestamp are discarded.
|
|
func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) {
|
|
defer close(wp.output)
|
|
|
|
oooCapMax := h.opts.OutOfOrderCapMax.Load()
|
|
// We don't check for minValidTime for ooo samples.
|
|
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
|
|
for samples := range wp.input {
|
|
wp.mx.Lock()
|
|
for _, s := range samples {
|
|
ms := h.series.getByID(s.Ref)
|
|
if ms == nil {
|
|
unknownRefs++
|
|
continue
|
|
}
|
|
ok, chunkCreated, _ := ms.insert(s.T, s.V, h.chunkDiskMapper, oooCapMax)
|
|
if chunkCreated {
|
|
h.metrics.chunksCreated.Inc()
|
|
h.metrics.chunks.Inc()
|
|
}
|
|
if ok {
|
|
if s.T < mint {
|
|
mint = s.T
|
|
}
|
|
if s.T > maxt {
|
|
maxt = s.T
|
|
}
|
|
}
|
|
}
|
|
wp.mx.Unlock()
|
|
|
|
}
|
|
|
|
h.updateMinOOOMaxOOOTime(mint, maxt)
|
|
|
|
return unknownRefs
|
|
}
|
|
|
|
func (wp *wblSubsetProcessor) waitUntilIdle() {
|
|
select {
|
|
case <-wp.output: // Allow output side to drain to avoid deadlock.
|
|
default:
|
|
}
|
|
wp.input <- []record.RefSample{}
|
|
for len(wp.input) != 0 {
|
|
time.Sleep(10 * time.Microsecond)
|
|
select {
|
|
case <-wp.output: // Allow output side to drain to avoid deadlock.
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
const (
|
|
chunkSnapshotRecordTypeSeries uint8 = 1
|
|
chunkSnapshotRecordTypeTombstones uint8 = 2
|
|
chunkSnapshotRecordTypeExemplars uint8 = 3
|
|
)
|
|
|
|
type chunkSnapshotRecord struct {
|
|
ref chunks.HeadSeriesRef
|
|
lset labels.Labels
|
|
mc *memChunk
|
|
lastValue float64
|
|
}
|
|
|
|
func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
|
|
buf := encoding.Encbuf{B: b}
|
|
|
|
buf.PutByte(chunkSnapshotRecordTypeSeries)
|
|
buf.PutBE64(uint64(s.ref))
|
|
record.EncodeLabels(&buf, s.lset)
|
|
buf.PutBE64int64(0) // Backwards-compatibility; was chunkRange but now unused.
|
|
|
|
s.Lock()
|
|
if s.headChunk == nil {
|
|
buf.PutUvarint(0)
|
|
} else {
|
|
buf.PutUvarint(1)
|
|
buf.PutBE64int64(s.headChunk.minTime)
|
|
buf.PutBE64int64(s.headChunk.maxTime)
|
|
buf.PutByte(byte(s.headChunk.chunk.Encoding()))
|
|
buf.PutUvarintBytes(s.headChunk.chunk.Bytes())
|
|
// Backwards compatibility for old sampleBuf which had last 4 samples.
|
|
for i := 0; i < 3; i++ {
|
|
buf.PutBE64int64(0)
|
|
buf.PutBEFloat64(0)
|
|
}
|
|
buf.PutBE64int64(0)
|
|
buf.PutBEFloat64(s.lastValue)
|
|
}
|
|
s.Unlock()
|
|
|
|
return buf.Get()
|
|
}
|
|
|
|
func decodeSeriesFromChunkSnapshot(d *record.Decoder, b []byte) (csr chunkSnapshotRecord, err error) {
|
|
dec := encoding.Decbuf{B: b}
|
|
|
|
if flag := dec.Byte(); flag != chunkSnapshotRecordTypeSeries {
|
|
return csr, errors.Errorf("invalid record type %x", flag)
|
|
}
|
|
|
|
csr.ref = chunks.HeadSeriesRef(dec.Be64())
|
|
// The label set written to the disk is already sorted.
|
|
// TODO: figure out why DecodeLabels calls Sort(), and perhaps remove it.
|
|
csr.lset = d.DecodeLabels(&dec)
|
|
|
|
_ = dec.Be64int64() // Was chunkRange but now unused.
|
|
if dec.Uvarint() == 0 {
|
|
return
|
|
}
|
|
|
|
csr.mc = &memChunk{}
|
|
csr.mc.minTime = dec.Be64int64()
|
|
csr.mc.maxTime = dec.Be64int64()
|
|
enc := chunkenc.Encoding(dec.Byte())
|
|
|
|
// The underlying bytes gets re-used later, so make a copy.
|
|
chunkBytes := dec.UvarintBytes()
|
|
chunkBytesCopy := make([]byte, len(chunkBytes))
|
|
copy(chunkBytesCopy, chunkBytes)
|
|
|
|
chk, err := chunkenc.FromData(enc, chunkBytesCopy)
|
|
if err != nil {
|
|
return csr, errors.Wrap(err, "chunk from data")
|
|
}
|
|
csr.mc.chunk = chk
|
|
|
|
// Backwards-compatibility for old sampleBuf which had last 4 samples.
|
|
for i := 0; i < 3; i++ {
|
|
_ = dec.Be64int64()
|
|
_ = dec.Be64Float64()
|
|
}
|
|
_ = dec.Be64int64()
|
|
csr.lastValue = dec.Be64Float64()
|
|
|
|
err = dec.Err()
|
|
if err != nil && len(dec.B) > 0 {
|
|
err = errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func encodeTombstonesToSnapshotRecord(tr tombstones.Reader) ([]byte, error) {
|
|
buf := encoding.Encbuf{}
|
|
|
|
buf.PutByte(chunkSnapshotRecordTypeTombstones)
|
|
b, err := tombstones.Encode(tr)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "encode tombstones")
|
|
}
|
|
buf.PutUvarintBytes(b)
|
|
|
|
return buf.Get(), nil
|
|
}
|
|
|
|
func decodeTombstonesSnapshotRecord(b []byte) (tombstones.Reader, error) {
|
|
dec := encoding.Decbuf{B: b}
|
|
|
|
if flag := dec.Byte(); flag != chunkSnapshotRecordTypeTombstones {
|
|
return nil, errors.Errorf("invalid record type %x", flag)
|
|
}
|
|
|
|
tr, err := tombstones.Decode(dec.UvarintBytes())
|
|
return tr, errors.Wrap(err, "decode tombstones")
|
|
}
|
|
|
|
const chunkSnapshotPrefix = "chunk_snapshot."
|
|
|
|
// ChunkSnapshot creates a snapshot of all the series and tombstones in the head.
|
|
// It deletes the old chunk snapshots if the chunk snapshot creation is successful.
|
|
//
|
|
// The chunk snapshot is stored in a directory named chunk_snapshot.N.M and is written
|
|
// using the WAL package. N is the last WAL segment present during snapshotting and
|
|
// M is the offset in segment N upto which data was written.
|
|
//
|
|
// The snapshot first contains all series (each in individual records and not sorted), followed by
|
|
// tombstones (a single record), and finally exemplars (>= 1 record). Exemplars are in the order they
|
|
// were written to the circular buffer.
|
|
func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
|
|
if h.wal == nil {
|
|
// If we are not storing any WAL, does not make sense to take a snapshot too.
|
|
level.Warn(h.logger).Log("msg", "skipping chunk snapshotting as WAL is disabled")
|
|
return &ChunkSnapshotStats{}, nil
|
|
}
|
|
h.chunkSnapshotMtx.Lock()
|
|
defer h.chunkSnapshotMtx.Unlock()
|
|
|
|
stats := &ChunkSnapshotStats{}
|
|
|
|
wlast, woffset, err := h.wal.LastSegmentAndOffset()
|
|
if err != nil && err != record.ErrNotFound {
|
|
return stats, errors.Wrap(err, "get last wal segment and offset")
|
|
}
|
|
|
|
_, cslast, csoffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
|
|
if err != nil && err != record.ErrNotFound {
|
|
return stats, errors.Wrap(err, "find last chunk snapshot")
|
|
}
|
|
|
|
if wlast == cslast && woffset == csoffset {
|
|
// Nothing has been written to the WAL/Head since the last snapshot.
|
|
return stats, nil
|
|
}
|
|
|
|
snapshotName := chunkSnapshotDir(wlast, woffset)
|
|
|
|
cpdir := filepath.Join(h.opts.ChunkDirRoot, snapshotName)
|
|
cpdirtmp := cpdir + ".tmp"
|
|
stats.Dir = cpdir
|
|
|
|
if err := os.MkdirAll(cpdirtmp, 0o777); err != nil {
|
|
return stats, errors.Wrap(err, "create chunk snapshot dir")
|
|
}
|
|
cp, err := wlog.New(nil, nil, cpdirtmp, h.wal.CompressionEnabled())
|
|
if err != nil {
|
|
return stats, errors.Wrap(err, "open chunk snapshot")
|
|
}
|
|
|
|
// Ensures that an early return caused by an error doesn't leave any tmp files.
|
|
defer func() {
|
|
cp.Close()
|
|
os.RemoveAll(cpdirtmp)
|
|
}()
|
|
|
|
var (
|
|
buf []byte
|
|
recs [][]byte
|
|
)
|
|
// Add all series to the snapshot.
|
|
stripeSize := h.series.size
|
|
for i := 0; i < stripeSize; i++ {
|
|
h.series.locks[i].RLock()
|
|
|
|
for _, s := range h.series.series[i] {
|
|
start := len(buf)
|
|
buf = s.encodeToSnapshotRecord(buf)
|
|
if len(buf[start:]) == 0 {
|
|
continue // All contents discarded.
|
|
}
|
|
recs = append(recs, buf[start:])
|
|
// Flush records in 10 MB increments.
|
|
if len(buf) > 10*1024*1024 {
|
|
if err := cp.Log(recs...); err != nil {
|
|
h.series.locks[i].RUnlock()
|
|
return stats, errors.Wrap(err, "flush records")
|
|
}
|
|
buf, recs = buf[:0], recs[:0]
|
|
}
|
|
}
|
|
stats.TotalSeries += len(h.series.series[i])
|
|
|
|
h.series.locks[i].RUnlock()
|
|
}
|
|
|
|
// Add tombstones to the snapshot.
|
|
tombstonesReader, err := h.Tombstones()
|
|
if err != nil {
|
|
return stats, errors.Wrap(err, "get tombstones")
|
|
}
|
|
rec, err := encodeTombstonesToSnapshotRecord(tombstonesReader)
|
|
if err != nil {
|
|
return stats, errors.Wrap(err, "encode tombstones")
|
|
}
|
|
recs = append(recs, rec)
|
|
// Flush remaining series records and tombstones.
|
|
if err := cp.Log(recs...); err != nil {
|
|
return stats, errors.Wrap(err, "flush records")
|
|
}
|
|
buf = buf[:0]
|
|
|
|
// Add exemplars in the snapshot.
|
|
// We log in batches, with each record having upto 10000 exemplars.
|
|
// Assuming 100 bytes (overestimate) per exemplar, that's ~1MB.
|
|
maxExemplarsPerRecord := 10000
|
|
batch := make([]record.RefExemplar, 0, maxExemplarsPerRecord)
|
|
enc := record.Encoder{}
|
|
flushExemplars := func() error {
|
|
if len(batch) == 0 {
|
|
return nil
|
|
}
|
|
buf = buf[:0]
|
|
encbuf := encoding.Encbuf{B: buf}
|
|
encbuf.PutByte(chunkSnapshotRecordTypeExemplars)
|
|
enc.EncodeExemplarsIntoBuffer(batch, &encbuf)
|
|
if err := cp.Log(encbuf.Get()); err != nil {
|
|
return errors.Wrap(err, "log exemplars")
|
|
}
|
|
buf, batch = buf[:0], batch[:0]
|
|
return nil
|
|
}
|
|
err = h.exemplars.IterateExemplars(func(seriesLabels labels.Labels, e exemplar.Exemplar) error {
|
|
if len(batch) >= maxExemplarsPerRecord {
|
|
if err := flushExemplars(); err != nil {
|
|
return errors.Wrap(err, "flush exemplars")
|
|
}
|
|
}
|
|
|
|
ms := h.series.getByHash(seriesLabels.Hash(), seriesLabels)
|
|
if ms == nil {
|
|
// It is possible that exemplar refers to some old series. We discard such exemplars.
|
|
return nil
|
|
}
|
|
batch = append(batch, record.RefExemplar{
|
|
Ref: ms.ref,
|
|
T: e.Ts,
|
|
V: e.Value,
|
|
Labels: e.Labels,
|
|
})
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return stats, errors.Wrap(err, "iterate exemplars")
|
|
}
|
|
|
|
// Flush remaining exemplars.
|
|
if err := flushExemplars(); err != nil {
|
|
return stats, errors.Wrap(err, "flush exemplars at the end")
|
|
}
|
|
|
|
if err := cp.Close(); err != nil {
|
|
return stats, errors.Wrap(err, "close chunk snapshot")
|
|
}
|
|
if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
|
|
return stats, errors.Wrap(err, "rename chunk snapshot directory")
|
|
}
|
|
|
|
if err := DeleteChunkSnapshots(h.opts.ChunkDirRoot, wlast, woffset); err != nil {
|
|
// Leftover old chunk snapshots do not cause problems down the line beyond
|
|
// occupying disk space.
|
|
// They will just be ignored since a higher chunk snapshot exists.
|
|
level.Error(h.logger).Log("msg", "delete old chunk snapshots", "err", err)
|
|
}
|
|
return stats, nil
|
|
}
|
|
|
|
func chunkSnapshotDir(wlast, woffset int) string {
|
|
return fmt.Sprintf(chunkSnapshotPrefix+"%06d.%010d", wlast, woffset)
|
|
}
|
|
|
|
func (h *Head) performChunkSnapshot() error {
|
|
level.Info(h.logger).Log("msg", "creating chunk snapshot")
|
|
startTime := time.Now()
|
|
stats, err := h.ChunkSnapshot()
|
|
elapsed := time.Since(startTime)
|
|
if err == nil {
|
|
level.Info(h.logger).Log("msg", "chunk snapshot complete", "duration", elapsed.String(), "num_series", stats.TotalSeries, "dir", stats.Dir)
|
|
}
|
|
return errors.Wrap(err, "chunk snapshot")
|
|
}
|
|
|
|
// ChunkSnapshotStats returns stats about a created chunk snapshot.
|
|
type ChunkSnapshotStats struct {
|
|
TotalSeries int
|
|
Dir string
|
|
}
|
|
|
|
// LastChunkSnapshot returns the directory name and index of the most recent chunk snapshot.
|
|
// If dir does not contain any chunk snapshots, ErrNotFound is returned.
|
|
func LastChunkSnapshot(dir string) (string, int, int, error) {
|
|
files, err := os.ReadDir(dir)
|
|
if err != nil {
|
|
return "", 0, 0, err
|
|
}
|
|
maxIdx, maxOffset := -1, -1
|
|
maxFileName := ""
|
|
for i := 0; i < len(files); i++ {
|
|
fi := files[i]
|
|
|
|
if !strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) {
|
|
continue
|
|
}
|
|
if !fi.IsDir() {
|
|
return "", 0, 0, errors.Errorf("chunk snapshot %s is not a directory", fi.Name())
|
|
}
|
|
|
|
splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".")
|
|
if len(splits) != 2 {
|
|
// Chunk snapshots is not in the right format, we do not care about it.
|
|
continue
|
|
}
|
|
|
|
idx, err := strconv.Atoi(splits[0])
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
offset, err := strconv.Atoi(splits[1])
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
if idx > maxIdx || (idx == maxIdx && offset > maxOffset) {
|
|
maxIdx, maxOffset = idx, offset
|
|
maxFileName = filepath.Join(dir, fi.Name())
|
|
}
|
|
}
|
|
if maxFileName == "" {
|
|
return "", 0, 0, record.ErrNotFound
|
|
}
|
|
return maxFileName, maxIdx, maxOffset, nil
|
|
}
|
|
|
|
// DeleteChunkSnapshots deletes all chunk snapshots in a directory below a given index.
|
|
func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error {
|
|
files, err := os.ReadDir(dir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
errs := tsdb_errors.NewMulti()
|
|
for _, fi := range files {
|
|
if !strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) {
|
|
continue
|
|
}
|
|
|
|
splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".")
|
|
if len(splits) != 2 {
|
|
continue
|
|
}
|
|
|
|
idx, err := strconv.Atoi(splits[0])
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
offset, err := strconv.Atoi(splits[1])
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
if idx < maxIndex || (idx == maxIndex && offset < maxOffset) {
|
|
if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {
|
|
errs.Add(err)
|
|
}
|
|
}
|
|
|
|
}
|
|
return errs.Err()
|
|
}
|
|
|
|
// loadChunkSnapshot replays the chunk snapshot and restores the Head state from it. If there was any error returned,
|
|
// it is the responsibility of the caller to clear the contents of the Head.
|
|
func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSeries, error) {
|
|
dir, snapIdx, snapOffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
|
|
if err != nil {
|
|
if err == record.ErrNotFound {
|
|
return snapIdx, snapOffset, nil, nil
|
|
}
|
|
return snapIdx, snapOffset, nil, errors.Wrap(err, "find last chunk snapshot")
|
|
}
|
|
|
|
start := time.Now()
|
|
sr, err := wlog.NewSegmentsReader(dir)
|
|
if err != nil {
|
|
return snapIdx, snapOffset, nil, errors.Wrap(err, "open chunk snapshot")
|
|
}
|
|
defer func() {
|
|
if err := sr.Close(); err != nil {
|
|
level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err)
|
|
}
|
|
}()
|
|
|
|
var (
|
|
numSeries = 0
|
|
unknownRefs = int64(0)
|
|
concurrency = h.opts.WALReplayConcurrency
|
|
wg sync.WaitGroup
|
|
recordChan = make(chan chunkSnapshotRecord, 5*concurrency)
|
|
shardedRefSeries = make([]map[chunks.HeadSeriesRef]*memSeries, concurrency)
|
|
errChan = make(chan error, concurrency)
|
|
refSeries map[chunks.HeadSeriesRef]*memSeries
|
|
exemplarBuf []record.RefExemplar
|
|
dec record.Decoder
|
|
)
|
|
|
|
wg.Add(concurrency)
|
|
for i := 0; i < concurrency; i++ {
|
|
go func(idx int, rc <-chan chunkSnapshotRecord) {
|
|
defer wg.Done()
|
|
defer func() {
|
|
// If there was an error, drain the channel
|
|
// to unblock the main thread.
|
|
for range rc {
|
|
}
|
|
}()
|
|
|
|
shardedRefSeries[idx] = make(map[chunks.HeadSeriesRef]*memSeries)
|
|
localRefSeries := shardedRefSeries[idx]
|
|
|
|
for csr := range rc {
|
|
series, _, err := h.getOrCreateWithID(csr.ref, csr.lset.Hash(), csr.lset)
|
|
if err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
localRefSeries[csr.ref] = series
|
|
for {
|
|
seriesID := uint64(series.ref)
|
|
lastSeriesID := h.lastSeriesID.Load()
|
|
if lastSeriesID >= seriesID || h.lastSeriesID.CompareAndSwap(lastSeriesID, seriesID) {
|
|
break
|
|
}
|
|
}
|
|
|
|
if csr.mc == nil {
|
|
continue
|
|
}
|
|
series.nextAt = csr.mc.maxTime // This will create a new chunk on append.
|
|
series.headChunk = csr.mc
|
|
series.lastValue = csr.lastValue
|
|
|
|
app, err := series.headChunk.chunk.Appender()
|
|
if err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
series.app = app
|
|
|
|
h.updateMinMaxTime(csr.mc.minTime, csr.mc.maxTime)
|
|
}
|
|
}(i, recordChan)
|
|
}
|
|
|
|
r := wlog.NewReader(sr)
|
|
var loopErr error
|
|
Outer:
|
|
for r.Next() {
|
|
select {
|
|
case err := <-errChan:
|
|
errChan <- err
|
|
break Outer
|
|
default:
|
|
}
|
|
|
|
rec := r.Record()
|
|
switch rec[0] {
|
|
case chunkSnapshotRecordTypeSeries:
|
|
numSeries++
|
|
csr, err := decodeSeriesFromChunkSnapshot(&dec, rec)
|
|
if err != nil {
|
|
loopErr = errors.Wrap(err, "decode series record")
|
|
break Outer
|
|
}
|
|
recordChan <- csr
|
|
|
|
case chunkSnapshotRecordTypeTombstones:
|
|
tr, err := decodeTombstonesSnapshotRecord(rec)
|
|
if err != nil {
|
|
loopErr = errors.Wrap(err, "decode tombstones")
|
|
break Outer
|
|
}
|
|
|
|
if err = tr.Iter(func(ref storage.SeriesRef, ivs tombstones.Intervals) error {
|
|
h.tombstones.AddInterval(ref, ivs...)
|
|
return nil
|
|
}); err != nil {
|
|
loopErr = errors.Wrap(err, "iterate tombstones")
|
|
break Outer
|
|
}
|
|
|
|
case chunkSnapshotRecordTypeExemplars:
|
|
// Exemplars are at the end of snapshot. So all series are loaded at this point.
|
|
if len(refSeries) == 0 {
|
|
close(recordChan)
|
|
wg.Wait()
|
|
|
|
refSeries = make(map[chunks.HeadSeriesRef]*memSeries, numSeries)
|
|
for _, shard := range shardedRefSeries {
|
|
for k, v := range shard {
|
|
refSeries[k] = v
|
|
}
|
|
}
|
|
}
|
|
|
|
if !h.opts.EnableExemplarStorage || h.opts.MaxExemplars.Load() <= 0 {
|
|
// Exemplar storage is disabled.
|
|
continue Outer
|
|
}
|
|
|
|
decbuf := encoding.Decbuf{B: rec[1:]}
|
|
|
|
exemplarBuf = exemplarBuf[:0]
|
|
exemplarBuf, err = dec.ExemplarsFromBuffer(&decbuf, exemplarBuf)
|
|
if err != nil {
|
|
loopErr = errors.Wrap(err, "exemplars from buffer")
|
|
break Outer
|
|
}
|
|
|
|
for _, e := range exemplarBuf {
|
|
ms, ok := refSeries[e.Ref]
|
|
if !ok {
|
|
unknownRefs++
|
|
continue
|
|
}
|
|
|
|
if err := h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{
|
|
Labels: e.Labels,
|
|
Value: e.V,
|
|
Ts: e.T,
|
|
}); err != nil {
|
|
loopErr = errors.Wrap(err, "add exemplar")
|
|
break Outer
|
|
}
|
|
}
|
|
|
|
default:
|
|
// This is a record type we don't understand. It is either and old format from earlier versions,
|
|
// or a new format and the code was rolled back to old version.
|
|
loopErr = errors.Errorf("unsuported snapshot record type 0b%b", rec[0])
|
|
break Outer
|
|
}
|
|
}
|
|
if len(refSeries) == 0 {
|
|
close(recordChan)
|
|
wg.Wait()
|
|
}
|
|
|
|
close(errChan)
|
|
merr := tsdb_errors.NewMulti(errors.Wrap(loopErr, "decode loop"))
|
|
for err := range errChan {
|
|
merr.Add(errors.Wrap(err, "record processing"))
|
|
}
|
|
if err := merr.Err(); err != nil {
|
|
return -1, -1, nil, err
|
|
}
|
|
|
|
if r.Err() != nil {
|
|
return -1, -1, nil, errors.Wrap(r.Err(), "read records")
|
|
}
|
|
|
|
if len(refSeries) == 0 {
|
|
// We had no exemplar record, so we have to build the map here.
|
|
refSeries = make(map[chunks.HeadSeriesRef]*memSeries, numSeries)
|
|
for _, shard := range shardedRefSeries {
|
|
for k, v := range shard {
|
|
refSeries[k] = v
|
|
}
|
|
}
|
|
}
|
|
|
|
elapsed := time.Since(start)
|
|
level.Info(h.logger).Log("msg", "chunk snapshot loaded", "dir", dir, "num_series", numSeries, "duration", elapsed.String())
|
|
if unknownRefs > 0 {
|
|
level.Warn(h.logger).Log("msg", "unknown series references during chunk snapshot replay", "count", unknownRefs)
|
|
}
|
|
|
|
return snapIdx, snapOffset, refSeries, nil
|
|
}
|