mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-13 06:47:28 -08:00
3c80963e81
Currently memSeries holds a single head chunk in memory and a slice of mmapped chunks. When append() is called on memSeries, it might decide that a new headChunk is needed for the given append() call. If that happens, it will first mmap the existing head chunk, and only after that will it create a new empty headChunk and continue appending our sample to it. Since appending samples uses a write lock on memSeries, no other read or write can happen until the append is completed. When an append() must create a new head chunk, the whole memSeries is blocked until mmapping of the existing head chunk finishes. Mmapping itself uses a lock because it needs to be serialised, which means that the more chunks we have to mmap, the longer each chunk might wait to be mmapped. If there are enough chunks that require mmapping, some memSeries will be locked for long enough that it will start affecting queries and scrapes. Queries might time out, since by default they have a 2-minute timeout. Scrapes will be blocked inside the append() call, which means there will be a gap between samples. This will first affect range queries or calls using rate() and similar functions, since the time range requested in the query might contain too few samples to calculate anything. To avoid this we need to remove mmapping from the append path, since mmapping is blocking. But this means that when we cut a new head chunk we need to keep the old one around, so we can mmap it later. This change makes memSeries.headChunk a linked list: memSeries.headChunk still points to the 'open' head chunk that receives new samples, while older, yet-to-be-mmapped chunks are linked to it. Mmapping is done on a schedule by iterating over all memSeries one by one. Thanks to this we control when mmapping is done, since we trigger it manually, which reduces the risk that it will have to compete for mmap locks with other chunks. Signed-off-by: Łukasz Mierzwa <l.mierzwa@gmail.com>
1560 lines
43 KiB
Go
1560 lines
43 KiB
Go
// Copyright 2021 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
// nolint:revive // Many legitimately empty blocks in this file.
|
|
package tsdb
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"os"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/go-kit/log/level"
|
|
"github.com/pkg/errors"
|
|
"go.uber.org/atomic"
|
|
|
|
"github.com/prometheus/prometheus/model/exemplar"
|
|
"github.com/prometheus/prometheus/model/histogram"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/model/metadata"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
|
"github.com/prometheus/prometheus/tsdb/encoding"
|
|
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
|
"github.com/prometheus/prometheus/tsdb/fileutil"
|
|
"github.com/prometheus/prometheus/tsdb/record"
|
|
"github.com/prometheus/prometheus/tsdb/tombstones"
|
|
"github.com/prometheus/prometheus/tsdb/wlog"
|
|
"github.com/prometheus/prometheus/util/zeropool"
|
|
)
|
|
|
|
// histogramRecord combines both RefHistogramSample and RefFloatHistogramSample
// to simplify the WAL replay.
//
// loadWAL sets exactly one of h (integer histogram records) or fh (float
// histogram records); processWALSamples dispatches on whether h is nil.
type histogramRecord struct {
	// Series the sample belongs to, already remapped through multiRef by loadWAL.
	ref chunks.HeadSeriesRef
	// Sample timestamp.
	t int64
	// Integer histogram value; nil for float histogram samples.
	h *histogram.Histogram
	// Float histogram value; only used when h is nil.
	fh *histogram.FloatHistogram
}
|
|
|
|
// loadWAL replays the WAL records read from r into the head.
//
// Records are decoded on a dedicated goroutine and dispatched here in WAL
// order (oldest to newest):
//   - series records are created or matched to an existing series, and then
//     handed (with their m-mapped chunks) to the worker that owns the series;
//   - float and histogram samples are sharded by series ref across
//     WALReplayConcurrency walSubsetProcessor workers;
//   - tombstones and metadata are applied inline;
//   - exemplars are forwarded to a single exemplar goroutine.
//
// Whenever a WAL series record resolves to a series that already existed,
// multiRef maps the WAL ref to that series' ref, and later sample refs are
// remapped through it. Unknown series references and overlapping m-map chunks
// are only logged. Decode failures are returned as *wlog.CorruptionErr.
func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) {
	// Track number of samples that referenced a series we don't know about
	// for error reporting.
	var unknownRefs atomic.Uint64
	var unknownExemplarRefs atomic.Uint64
	var unknownHistogramRefs atomic.Uint64
	var unknownMetadataRefs atomic.Uint64
	// Track number of series records that had overlapping m-map chunks.
	var mmapOverlappingChunks atomic.Uint64

	// Start workers that each process samples for a partition of the series ID space.
	var (
		wg             sync.WaitGroup
		concurrency    = h.opts.WALReplayConcurrency
		processors     = make([]walSubsetProcessor, concurrency)
		exemplarsInput chan record.RefExemplar

		dec             record.Decoder
		shards          = make([][]record.RefSample, concurrency)
		histogramShards = make([][]histogramRecord, concurrency)

		decoded                      = make(chan interface{}, 10)
		decodeErr, seriesCreationErr error

		seriesPool          zeropool.Pool[[]record.RefSeries]
		samplesPool         zeropool.Pool[[]record.RefSample]
		tstonesPool         zeropool.Pool[[]tombstones.Stone]
		exemplarsPool       zeropool.Pool[[]record.RefExemplar]
		histogramsPool      zeropool.Pool[[]record.RefHistogramSample]
		floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
		metadataPool        zeropool.Pool[[]record.RefMetadata]
	)

	defer func() {
		// For CorruptionErr ensure to terminate all workers before exiting.
		_, ok := err.(*wlog.CorruptionErr)
		if ok || seriesCreationErr != nil {
			for i := 0; i < concurrency; i++ {
				processors[i].closeAndDrain()
			}
			close(exemplarsInput)
			wg.Wait()
		}
	}()

	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		processors[i].setup()

		go func(wp *walSubsetProcessor) {
			unknown, unknownHistograms, overlapping := wp.processWALSamples(h, mmappedChunks, oooMmappedChunks)
			unknownRefs.Add(unknown)
			mmapOverlappingChunks.Add(overlapping)
			unknownHistogramRefs.Add(unknownHistograms)
			wg.Done()
		}(&processors[i])
	}

	wg.Add(1)
	exemplarsInput = make(chan record.RefExemplar, 300)
	go func(input <-chan record.RefExemplar) {
		var err error
		defer wg.Done()
		for e := range input {
			ms := h.series.getByID(e.Ref)
			if ms == nil {
				unknownExemplarRefs.Inc()
				continue
			}

			if e.T < h.minValidTime.Load() {
				continue
			}
			// At the moment the only possible error here is out of order exemplars, which we shouldn't see when
			// replaying the WAL, so log it as unexpected if it does show up.
			// NOTE(review): any other error returned by AddExemplar is currently dropped here.
			err = h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels})
			if err != nil && err == storage.ErrOutOfOrderExemplar {
				level.Warn(h.logger).Log("msg", "Unexpected error when replaying WAL on exemplar record", "err", err)
			}
		}
	}(exemplarsInput)

	// Decoder goroutine. It writes decodeErr before closing decoded, so
	// decodeErr may only be read after decoded has been fully drained.
	go func() {
		defer close(decoded)
		var err error
		for r.Next() {
			rec := r.Record()
			switch dec.Type(rec) {
			case record.Series:
				series := seriesPool.Get()[:0]
				series, err = dec.Series(rec, series)
				if err != nil {
					decodeErr = &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode series"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decoded <- series
			case record.Samples:
				samples := samplesPool.Get()[:0]
				samples, err = dec.Samples(rec, samples)
				if err != nil {
					decodeErr = &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode samples"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decoded <- samples
			case record.Tombstones:
				tstones := tstonesPool.Get()[:0]
				tstones, err = dec.Tombstones(rec, tstones)
				if err != nil {
					decodeErr = &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode tombstones"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decoded <- tstones
			case record.Exemplars:
				exemplars := exemplarsPool.Get()[:0]
				exemplars, err = dec.Exemplars(rec, exemplars)
				if err != nil {
					decodeErr = &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode exemplars"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decoded <- exemplars
			case record.HistogramSamples:
				hists := histogramsPool.Get()[:0]
				hists, err = dec.HistogramSamples(rec, hists)
				if err != nil {
					decodeErr = &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode histograms"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decoded <- hists
			case record.FloatHistogramSamples:
				hists := floatHistogramsPool.Get()[:0]
				hists, err = dec.FloatHistogramSamples(rec, hists)
				if err != nil {
					decodeErr = &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode float histograms"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decoded <- hists
			case record.Metadata:
				meta := metadataPool.Get()[:0]
				// Assign (not redeclare) err, consistent with the other cases.
				meta, err = dec.Metadata(rec, meta)
				if err != nil {
					decodeErr = &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode metadata"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decoded <- meta
			default:
				// Noop.
			}
		}
	}()

	// The records are always replayed from the oldest to the newest.
Outer:
	for d := range decoded {
		switch v := d.(type) {
		case []record.RefSeries:
			for _, walSeries := range v {
				mSeries, created, err := h.getOrCreateWithID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels)
				if err != nil {
					seriesCreationErr = err
					break Outer
				}

				if chunks.HeadSeriesRef(h.lastSeriesID.Load()) < walSeries.Ref {
					h.lastSeriesID.Store(uint64(walSeries.Ref))
				}
				if !created {
					multiRef[walSeries.Ref] = mSeries.ref
				}

				idx := uint64(mSeries.ref) % uint64(concurrency)
				processors[idx].input <- walSubsetProcessorInputItem{walSeriesRef: walSeries.Ref, existingSeries: mSeries}
			}
			seriesPool.Put(v)
		case []record.RefSample:
			samples := v
			minValidTime := h.minValidTime.Load()
			// We split up the samples into chunks of 5000 samples or less.
			// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
			// cause thousands of very large in flight buffers occupying large amounts
			// of unused memory.
			for len(samples) > 0 {
				m := 5000
				if len(samples) < m {
					m = len(samples)
				}
				for i := 0; i < concurrency; i++ {
					if shards[i] == nil {
						shards[i] = processors[i].reuseBuf()
					}
				}
				for _, sam := range samples[:m] {
					if sam.T < minValidTime {
						continue // Before minValidTime: discard.
					}
					if r, ok := multiRef[sam.Ref]; ok {
						sam.Ref = r
					}
					mod := uint64(sam.Ref) % uint64(concurrency)
					shards[mod] = append(shards[mod], sam)
				}
				for i := 0; i < concurrency; i++ {
					if len(shards[i]) > 0 {
						processors[i].input <- walSubsetProcessorInputItem{samples: shards[i]}
						shards[i] = nil
					}
				}
				samples = samples[m:]
			}
			samplesPool.Put(v)
		case []tombstones.Stone:
			for _, s := range v {
				for _, itv := range s.Intervals {
					if itv.Maxt < h.minValidTime.Load() {
						continue
					}
					if m := h.series.getByID(chunks.HeadSeriesRef(s.Ref)); m == nil {
						unknownRefs.Inc()
						continue
					}
					h.tombstones.AddInterval(s.Ref, itv)
				}
			}
			tstonesPool.Put(v)
		case []record.RefExemplar:
			for _, e := range v {
				exemplarsInput <- e
			}
			exemplarsPool.Put(v)
		case []record.RefHistogramSample:
			samples := v
			minValidTime := h.minValidTime.Load()
			// We split up the samples into chunks of 5000 samples or less.
			// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
			// cause thousands of very large in flight buffers occupying large amounts
			// of unused memory.
			for len(samples) > 0 {
				m := 5000
				if len(samples) < m {
					m = len(samples)
				}
				for i := 0; i < concurrency; i++ {
					if histogramShards[i] == nil {
						histogramShards[i] = processors[i].reuseHistogramBuf()
					}
				}
				for _, sam := range samples[:m] {
					if sam.T < minValidTime {
						continue // Before minValidTime: discard.
					}
					if r, ok := multiRef[sam.Ref]; ok {
						sam.Ref = r
					}
					mod := uint64(sam.Ref) % uint64(concurrency)
					histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H})
				}
				for i := 0; i < concurrency; i++ {
					if len(histogramShards[i]) > 0 {
						processors[i].input <- walSubsetProcessorInputItem{histogramSamples: histogramShards[i]}
						histogramShards[i] = nil
					}
				}
				samples = samples[m:]
			}
			histogramsPool.Put(v)
		case []record.RefFloatHistogramSample:
			samples := v
			minValidTime := h.minValidTime.Load()
			// We split up the samples into chunks of 5000 samples or less.
			// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
			// cause thousands of very large in flight buffers occupying large amounts
			// of unused memory.
			for len(samples) > 0 {
				m := 5000
				if len(samples) < m {
					m = len(samples)
				}
				for i := 0; i < concurrency; i++ {
					if histogramShards[i] == nil {
						histogramShards[i] = processors[i].reuseHistogramBuf()
					}
				}
				for _, sam := range samples[:m] {
					if sam.T < minValidTime {
						continue // Before minValidTime: discard.
					}
					if r, ok := multiRef[sam.Ref]; ok {
						sam.Ref = r
					}
					mod := uint64(sam.Ref) % uint64(concurrency)
					histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH})
				}
				for i := 0; i < concurrency; i++ {
					if len(histogramShards[i]) > 0 {
						processors[i].input <- walSubsetProcessorInputItem{histogramSamples: histogramShards[i]}
						histogramShards[i] = nil
					}
				}
				samples = samples[m:]
			}
			floatHistogramsPool.Put(v)
		case []record.RefMetadata:
			for _, m := range v {
				s := h.series.getByID(m.Ref)
				if s == nil {
					unknownMetadataRefs.Inc()
					continue
				}
				s.meta = &metadata.Metadata{
					Type: record.ToTextparseMetricType(m.Type),
					Unit: m.Unit,
					Help: m.Help,
				}
			}
			metadataPool.Put(v)
		default:
			panic(fmt.Errorf("unexpected decoded type: %T", d))
		}
	}

	if seriesCreationErr != nil {
		// We left the loop early and the decoder goroutine may still be running.
		// Drain the channel to unblock it. Do this before anything reads
		// decodeErr, which the goroutine may still be writing.
		for range decoded {
		}
		return seriesCreationErr
	}
	// decoded has been closed, so the decoder goroutine is done with decodeErr.
	if decodeErr != nil {
		return decodeErr
	}

	// Signal termination to each worker and wait for it to close its output channel.
	for i := 0; i < concurrency; i++ {
		processors[i].closeAndDrain()
	}
	close(exemplarsInput)
	wg.Wait()

	if r.Err() != nil {
		return errors.Wrap(r.Err(), "read records")
	}

	if unknownRefs.Load()+unknownExemplarRefs.Load()+unknownHistogramRefs.Load()+unknownMetadataRefs.Load() > 0 {
		level.Warn(h.logger).Log(
			"msg", "Unknown series references",
			"samples", unknownRefs.Load(),
			"exemplars", unknownExemplarRefs.Load(),
			"histograms", unknownHistogramRefs.Load(),
			"metadata", unknownMetadataRefs.Load(),
		)
	}
	if count := mmapOverlappingChunks.Load(); count > 0 {
		level.Info(h.logger).Log("msg", "Overlapping m-map chunks on duplicate series records", "count", count)
	}
	return nil
}
|
|
|
|
// resetSeriesWithMMappedChunks is only used during the WAL replay.
//
// It replaces the series' in-order m-mapped chunks with mmc and its
// out-of-order m-mapped chunks with oooMmc. It adjusts the chunk metrics and
// the head's time bounds accordingly, and drops any in-memory head chunks.
// When walSeriesRef differs from mSeries.ref (a duplicate series record), it
// reports whether mmc overlaps the chunks the series already had.
func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*mmappedChunk, walSeriesRef chunks.HeadSeriesRef) (overlapped bool) {
	previous := mSeries.mmappedChunks

	// Only a duplicate series record (different ref) can overlap existing chunks.
	if mSeries.ref != walSeriesRef && len(previous) > 0 && len(mmc) > 0 {
		oldFirst, oldLast := previous[0], previous[len(previous)-1]
		newFirst, newLast := mmc[0], mmc[len(mmc)-1]
		if overlapsClosedInterval(oldFirst.minTime, oldLast.maxTime, newFirst.minTime, newLast.maxTime) {
			level.Debug(h.logger).Log(
				"msg", "M-mapped chunks overlap on a duplicate series record",
				"series", mSeries.lset.String(),
				"oldref", mSeries.ref,
				"oldmint", oldFirst.minTime,
				"oldmaxt", oldLast.maxTime,
				"newref", walSeriesRef,
				"newmint", newFirst.minTime,
				"newmaxt", newLast.maxTime,
			)
			overlapped = true
		}
	}

	// Account for the chunks being swapped in and out.
	incoming := len(mmc) + len(oooMmc)
	h.metrics.chunksCreated.Add(float64(incoming))
	h.metrics.chunksRemoved.Add(float64(len(previous)))
	h.metrics.chunks.Add(float64(incoming - len(previous)))
	if mSeries.ooo != nil {
		droppedOOO := float64(len(mSeries.ooo.oooMmappedChunks))
		h.metrics.chunksRemoved.Add(droppedOOO)
		h.metrics.chunks.Sub(droppedOOO)
	}

	mSeries.mmappedChunks = mmc
	switch {
	case len(oooMmc) == 0:
		mSeries.ooo = nil
	case mSeries.ooo == nil:
		mSeries.ooo = &memSeriesOOOFields{oooMmappedChunks: oooMmc}
	default:
		*mSeries.ooo = memSeriesOOOFields{oooMmappedChunks: oooMmc}
	}

	// Cache the last mmapped chunk time, so we can skip calling append() for samples it will reject.
	if n := len(mmc); n > 0 {
		mSeries.mmMaxTime = mmc[n-1].maxTime
		h.updateMinMaxTime(mmc[0].minTime, mSeries.mmMaxTime)
	} else {
		mSeries.mmMaxTime = math.MinInt64
	}

	if len(oooMmc) > 0 {
		// Out-of-order chunks are not sorted by time, so every chunk must be inspected.
		lowest, highest := int64(math.MaxInt64), int64(math.MinInt64)
		for _, c := range oooMmc {
			if c.minTime < lowest {
				lowest = c.minTime
			}
			if c.maxTime > highest {
				highest = c.maxTime
			}
		}
		h.updateMinOOOMaxOOOTime(lowest, highest)
	}

	// Any samples replayed till now would already be compacted. Resetting the head chunk.
	mSeries.nextAt = 0
	mSeries.headChunks = nil
	mSeries.app = nil
	return overlapped
}
|
|
|
|
// walSubsetProcessor replays WAL data for one shard of the series ref space.
// loadWAL routes work to processor (ref % WALReplayConcurrency).
type walSubsetProcessor struct {
	// Work items: series resets or batches of samples.
	input chan walSubsetProcessorInputItem
	// Processed float sample buffers handed back to loadWAL for reuse.
	output chan []record.RefSample
	// Processed histogram sample buffers handed back to loadWAL for reuse.
	histogramsOutput chan []histogramRecord
}
|
|
|
|
// walSubsetProcessorInputItem is one unit of work for a walSubsetProcessor.
// loadWAL sends either a series record (existingSeries and walSeriesRef set)
// or a batch of samples (samples or histogramSamples set).
type walSubsetProcessorInputItem struct {
	samples          []record.RefSample
	histogramSamples []histogramRecord
	// When non-nil, the item is a series reset and the sample fields are ignored.
	existingSeries *memSeries
	// Ref as written in the WAL series record; it may differ from existingSeries.ref.
	walSeriesRef chunks.HeadSeriesRef
}
|
|
|
|
func (wp *walSubsetProcessor) setup() {
|
|
wp.input = make(chan walSubsetProcessorInputItem, 300)
|
|
wp.output = make(chan []record.RefSample, 300)
|
|
wp.histogramsOutput = make(chan []histogramRecord, 300)
|
|
}
|
|
|
|
func (wp *walSubsetProcessor) closeAndDrain() {
|
|
close(wp.input)
|
|
for range wp.output {
|
|
}
|
|
for range wp.histogramsOutput {
|
|
}
|
|
}
|
|
|
|
// If there is a buffer in the output chan, return it for reuse, otherwise return nil.
|
|
func (wp *walSubsetProcessor) reuseBuf() []record.RefSample {
|
|
select {
|
|
case buf := <-wp.output:
|
|
return buf[:0]
|
|
default:
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// If there is a buffer in the output chan, return it for reuse, otherwise return nil.
|
|
func (wp *walSubsetProcessor) reuseHistogramBuf() []histogramRecord {
|
|
select {
|
|
case buf := <-wp.histogramsOutput:
|
|
return buf[:0]
|
|
default:
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// processWALSamples adds the samples it receives to the head and passes
// the buffer received to an output channel for reuse.
//
// Input items carrying existingSeries reset that series with its m-mapped
// chunks (see resetSeriesWithMMappedChunks). Float samples arrive already
// filtered by minValidTime in loadWAL; histogram samples before minValidTime
// are discarded here. Samples not newer than the series' last m-mapped chunk
// are skipped.
//
// Newly cut head chunks are m-mapped right away, for both floats and
// histograms. Appends no longer m-map, so replayed chunks would otherwise
// accumulate in the series' in-memory headChunks list.
//
// After the input channel is closed, it updates the head's min/max time
// with the range of processed samples. It returns the number of float and
// histogram samples with unknown series refs, and the number of series
// records whose m-mapped chunks overlapped existing ones.
func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (unknownRefs, unknownHistogramRefs, mmapOverlappingChunks uint64) {
	defer close(wp.output)
	defer close(wp.histogramsOutput)

	minValidTime := h.minValidTime.Load()
	mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
	appendChunkOpts := chunkOpts{
		chunkDiskMapper: h.chunkDiskMapper,
		chunkRange:      h.chunkRange.Load(),
		samplesPerChunk: h.opts.SamplesPerChunk,
	}

	for in := range wp.input {
		if in.existingSeries != nil {
			mmc := mmappedChunks[in.walSeriesRef]
			oooMmc := oooMmappedChunks[in.walSeriesRef]
			if h.resetSeriesWithMMappedChunks(in.existingSeries, mmc, oooMmc, in.walSeriesRef) {
				mmapOverlappingChunks++
			}
			continue
		}

		for _, s := range in.samples {
			ms := h.series.getByID(s.Ref)
			if ms == nil {
				unknownRefs++
				continue
			}
			// Skip samples that are not newer than the last m-mapped chunk.
			if s.T <= ms.mmMaxTime {
				continue
			}
			if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated {
				h.metrics.chunksCreated.Inc()
				h.metrics.chunks.Inc()
				_ = ms.mmapChunks(h.chunkDiskMapper)
			}
			if s.T > maxt {
				maxt = s.T
			}
			if s.T < mint {
				mint = s.T
			}
		}
		select {
		case wp.output <- in.samples:
		default:
		}

		for _, s := range in.histogramSamples {
			if s.t < minValidTime {
				continue
			}
			ms := h.series.getByID(s.ref)
			if ms == nil {
				unknownHistogramRefs++
				continue
			}
			// Skip samples that are not newer than the last m-mapped chunk.
			if s.t <= ms.mmMaxTime {
				continue
			}
			var chunkCreated bool
			if s.h != nil {
				_, chunkCreated = ms.appendHistogram(s.t, s.h, 0, appendChunkOpts)
			} else {
				_, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, appendChunkOpts)
			}
			if chunkCreated {
				h.metrics.chunksCreated.Inc()
				h.metrics.chunks.Inc()
				// Same as for float samples: keep the headChunks list short during replay.
				_ = ms.mmapChunks(h.chunkDiskMapper)
			}
			if s.t > maxt {
				maxt = s.t
			}
			if s.t < mint {
				mint = s.t
			}
		}

		select {
		case wp.histogramsOutput <- in.histogramSamples:
		default:
		}
	}
	h.updateMinMaxTime(mint, maxt)

	return unknownRefs, unknownHistogramRefs, mmapOverlappingChunks
}
|
|
|
|
// loadWBL replays the out-of-order (WBL) records read from r into the head.
//
// Sample records are remapped through multiRef, sharded by series ref across
// WALReplayConcurrency wblSubsetProcessor workers, and inserted as
// out-of-order samples.
//
// An m-map marker at or before lastMmapRef clears the series' out-of-order
// head chunk, because those samples were already loaded from m-mapped chunks.
// Markers after lastMmapRef are skipped.
//
// Corruption errors are wrapped in *errLoadWbl so callers can identify WBL
// corruption (see isErrLoadOOOWal). Unknown series references are only logged.
func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
	// Track number of samples, m-map markers, that referenced a series we don't know about
	// for error reporting.
	var unknownRefs, mmapMarkerUnknownRefs atomic.Uint64

	lastSeq, lastOff := lastMmapRef.Unpack()
	// Start workers that each process samples for a partition of the series ID space.
	var (
		wg          sync.WaitGroup
		concurrency = h.opts.WALReplayConcurrency
		processors  = make([]wblSubsetProcessor, concurrency)

		dec    record.Decoder
		shards = make([][]record.RefSample, concurrency)

		decodedCh   = make(chan interface{}, 10)
		decodeErr   error
		samplesPool = sync.Pool{
			New: func() interface{} {
				return []record.RefSample{}
			},
		}
		markersPool = sync.Pool{
			New: func() interface{} {
				return []record.RefMmapMarker{}
			},
		}
	)

	defer func() {
		// For CorruptionErr ensure to terminate all workers before exiting.
		// We also wrap it to identify OOO WBL corruption.
		_, ok := err.(*wlog.CorruptionErr)
		if ok {
			err = &errLoadWbl{err: err}
			for i := 0; i < concurrency; i++ {
				processors[i].closeAndDrain()
			}
			wg.Wait()
		}
	}()

	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		processors[i].setup()

		go func(wp *wblSubsetProcessor) {
			unknown := wp.processWBLSamples(h)
			unknownRefs.Add(unknown)
			wg.Done()
		}(&processors[i])
	}

	go func() {
		defer close(decodedCh)
		// Goroutine-local error: this goroutine must not write the function's
		// named result err, which the deferred cleanup above reads and rewrites.
		var err error
		for r.Next() {
			rec := r.Record()
			switch dec.Type(rec) {
			case record.Samples:
				samples := samplesPool.Get().([]record.RefSample)[:0]
				samples, err = dec.Samples(rec, samples)
				if err != nil {
					decodeErr = &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode samples"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decodedCh <- samples
			case record.MmapMarkers:
				markers := markersPool.Get().([]record.RefMmapMarker)[:0]
				markers, err = dec.MmapMarkers(rec, markers)
				if err != nil {
					decodeErr = &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode mmap markers"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decodedCh <- markers
			default:
				// Noop.
			}
		}
	}()

	// The records are always replayed from the oldest to the newest.
	for d := range decodedCh {
		switch v := d.(type) {
		case []record.RefSample:
			samples := v
			// We split up the samples into parts of 5000 samples or less.
			// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
			// cause thousands of very large in flight buffers occupying large amounts
			// of unused memory.
			for len(samples) > 0 {
				m := 5000
				if len(samples) < m {
					m = len(samples)
				}
				for i := 0; i < concurrency; i++ {
					shards[i] = processors[i].reuseBuf()
				}
				for _, sam := range samples[:m] {
					if r, ok := multiRef[sam.Ref]; ok {
						sam.Ref = r
					}
					mod := uint64(sam.Ref) % uint64(concurrency)
					shards[mod] = append(shards[mod], sam)
				}
				for i := 0; i < concurrency; i++ {
					processors[i].input <- shards[i]
				}
				samples = samples[m:]
			}
			samplesPool.Put(d)
		case []record.RefMmapMarker:
			markers := v
			for _, rm := range markers {
				seq, off := rm.MmapRef.Unpack()
				if seq > lastSeq || (seq == lastSeq && off > lastOff) {
					// This m-map chunk from markers was not present during
					// the load of mmapped chunks that happened in the head
					// initialization.
					continue
				}

				if r, ok := multiRef[rm.Ref]; ok {
					rm.Ref = r
				}

				ms := h.series.getByID(rm.Ref)
				if ms == nil {
					mmapMarkerUnknownRefs.Inc()
					continue
				}
				idx := uint64(ms.ref) % uint64(concurrency)
				// It is possible that some old sample is being processed in processWALSamples that
				// could cause race below. So we wait for the goroutine to empty input the buffer and finish
				// processing all old samples after emptying the buffer.
				processors[idx].waitUntilIdle()
				// Lock the subset so we can modify the series object
				processors[idx].mx.Lock()

				// All samples till now have been m-mapped. Hence clear out the headChunk.
				// In case some samples slipped through and went into m-map chunks because of changed
				// chunk size parameters, we are not taking care of that here.
				// TODO(codesome): see if there is a way to avoid duplicate m-map chunks if
				// the size of ooo chunk was reduced between restart.
				if ms.ooo != nil {
					ms.ooo.oooHeadChunk = nil
				}

				processors[idx].mx.Unlock()
			}
			// Return the buffer so the decoder goroutine can reuse it, like samplesPool above.
			markersPool.Put(d)
		default:
			panic(fmt.Errorf("unexpected decodedCh type: %T", d))
		}
	}

	if decodeErr != nil {
		return decodeErr
	}

	// Signal termination to each worker and wait for it to close its output channel.
	for i := 0; i < concurrency; i++ {
		processors[i].closeAndDrain()
	}
	wg.Wait()

	if r.Err() != nil {
		return errors.Wrap(r.Err(), "read records")
	}

	if unknownRefs.Load() > 0 || mmapMarkerUnknownRefs.Load() > 0 {
		level.Warn(h.logger).Log("msg", "Unknown series references for ooo WAL replay", "samples", unknownRefs.Load(), "mmap_markers", mmapMarkerUnknownRefs.Load())
	}
	return nil
}
|
|
|
|
// errLoadWbl marks an error that occurred while replaying the out-of-order
// WAL (WBL), so callers can tell it apart from other load errors. loadWBL
// wraps WBL corruption errors in a *errLoadWbl.
type errLoadWbl struct {
	err error
}

// Error returns the message of the wrapped error.
func (w errLoadWbl) Error() string { return w.err.Error() }

// Cause exposes the wrapped error to errors.Cause().
func (w errLoadWbl) Cause() error { return w.err }

// Unwrap exposes the wrapped error to errors.Unwrap().
func (w errLoadWbl) Unwrap() error { return w.err }

// isErrLoadOOOWal reports whether err is exactly a *errLoadWbl. It does not
// look through wrapping layers.
func isErrLoadOOOWal(err error) bool {
	switch err.(type) {
	case *errLoadWbl:
		return true
	default:
		return false
	}
}
|
|
|
|
// wblSubsetProcessor replays out-of-order (WBL) samples for one shard of the
// series ref space. loadWBL routes samples to processor (ref % WALReplayConcurrency).
type wblSubsetProcessor struct {
	mx sync.Mutex // Take this lock while modifying series in the subset.
	// Batches of samples to insert.
	input chan []record.RefSample
	// Buffers handed back for reuse (drained by reuseBuf, waitUntilIdle and closeAndDrain).
	output chan []record.RefSample
}
|
|
|
|
func (wp *wblSubsetProcessor) setup() {
|
|
wp.output = make(chan []record.RefSample, 300)
|
|
wp.input = make(chan []record.RefSample, 300)
|
|
}
|
|
|
|
func (wp *wblSubsetProcessor) closeAndDrain() {
|
|
close(wp.input)
|
|
for range wp.output {
|
|
}
|
|
}
|
|
|
|
// If there is a buffer in the output chan, return it for reuse, otherwise return nil.
|
|
func (wp *wblSubsetProcessor) reuseBuf() []record.RefSample {
|
|
select {
|
|
case buf := <-wp.output:
|
|
return buf[:0]
|
|
default:
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// processWBLSamples inserts the out-of-order samples it receives into the
// head and passes the buffer received to the output channel for reuse.
//
// Each batch is applied while holding wp.mx, so loadWBL can lock the
// processor to modify series in this subset between batches. Unlike
// processWALSamples, samples are not filtered by minValidTime.
//
// After the input channel is closed, it updates the head's out-of-order
// min/max time with the range of samples for which insert reported success.
// It returns the number of samples whose series ref was unknown.
func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) {
	defer close(wp.output)

	oooCapMax := h.opts.OutOfOrderCapMax.Load()
	// We don't check for minValidTime for ooo samples.
	mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
	for samples := range wp.input {
		wp.mx.Lock()
		for _, s := range samples {
			ms := h.series.getByID(s.Ref)
			if ms == nil {
				unknownRefs++
				continue
			}
			ok, chunkCreated, _ := ms.insert(s.T, s.V, h.chunkDiskMapper, oooCapMax)
			if chunkCreated {
				h.metrics.chunksCreated.Inc()
				h.metrics.chunks.Inc()
			}
			if ok {
				if s.T < mint {
					mint = s.T
				}
				if s.T > maxt {
					maxt = s.T
				}
			}
		}
		wp.mx.Unlock()

		// Hand the buffer back so loadWBL can reuse it via reuseBuf. If the
		// output channel is full, the buffer is dropped so this worker never blocks.
		select {
		case wp.output <- samples:
		default:
		}
	}

	h.updateMinOOOMaxOOOTime(mint, maxt)

	return unknownRefs
}
|
|
|
|
// waitUntilIdle blocks until the processor's input queue is empty.
//
// It enqueues an empty batch and then polls, every 10µs, until the input
// channel has no buffered items. While waiting, it keeps pulling from the
// output channel without blocking, so the worker is never stuck handing back
// a buffer.
func (wp *wblSubsetProcessor) waitUntilIdle() {
	drainOne := func() {
		select {
		case <-wp.output: // Allow output side to drain to avoid deadlock.
		default:
		}
	}

	drainOne()
	wp.input <- []record.RefSample{}
	for len(wp.input) != 0 {
		time.Sleep(10 * time.Microsecond)
		drainOne()
	}
}
|
|
|
|
// Record type tags written as the first byte of each chunk snapshot entry.
// The values are part of the on-disk format and must not change.
const (
	chunkSnapshotRecordTypeSeries uint8 = iota + 1 // 1
	chunkSnapshotRecordTypeTombstones              // 2
	chunkSnapshotRecordTypeExemplars               // 3
)
|
|
|
|
// chunkSnapshotRecord is the decoded form of a series entry in a chunk
// snapshot, as returned by decodeSeriesFromChunkSnapshot.
type chunkSnapshotRecord struct {
	ref  chunks.HeadSeriesRef
	lset labels.Labels
	// Head chunk of the series; nil when the entry carried no head chunk.
	mc *memChunk
	// Last value, set when the head chunk is XOR-encoded.
	lastValue float64
	// Last value, set when the head chunk is an integer histogram chunk.
	lastHistogramValue *histogram.Histogram
	// Last value, set for any other (float histogram) chunk encoding.
	lastFloatHistogramValue *histogram.FloatHistogram
}
|
|
|
|
func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
|
|
buf := encoding.Encbuf{B: b}
|
|
|
|
buf.PutByte(chunkSnapshotRecordTypeSeries)
|
|
buf.PutBE64(uint64(s.ref))
|
|
record.EncodeLabels(&buf, s.lset)
|
|
buf.PutBE64int64(0) // Backwards-compatibility; was chunkRange but now unused.
|
|
|
|
s.Lock()
|
|
if s.headChunks == nil {
|
|
buf.PutUvarint(0)
|
|
} else {
|
|
enc := s.headChunks.chunk.Encoding()
|
|
buf.PutUvarint(1)
|
|
buf.PutBE64int64(s.headChunks.minTime)
|
|
buf.PutBE64int64(s.headChunks.maxTime)
|
|
buf.PutByte(byte(enc))
|
|
buf.PutUvarintBytes(s.headChunks.chunk.Bytes())
|
|
|
|
switch enc {
|
|
case chunkenc.EncXOR:
|
|
// Backwards compatibility for old sampleBuf which had last 4 samples.
|
|
for i := 0; i < 3; i++ {
|
|
buf.PutBE64int64(0)
|
|
buf.PutBEFloat64(0)
|
|
}
|
|
buf.PutBE64int64(0)
|
|
buf.PutBEFloat64(s.lastValue)
|
|
case chunkenc.EncHistogram:
|
|
record.EncodeHistogram(&buf, s.lastHistogramValue)
|
|
default: // chunkenc.FloatHistogram.
|
|
record.EncodeFloatHistogram(&buf, s.lastFloatHistogramValue)
|
|
}
|
|
}
|
|
s.Unlock()
|
|
|
|
return buf.Get()
|
|
}
|
|
|
|
func decodeSeriesFromChunkSnapshot(d *record.Decoder, b []byte) (csr chunkSnapshotRecord, err error) {
|
|
dec := encoding.Decbuf{B: b}
|
|
|
|
if flag := dec.Byte(); flag != chunkSnapshotRecordTypeSeries {
|
|
return csr, errors.Errorf("invalid record type %x", flag)
|
|
}
|
|
|
|
csr.ref = chunks.HeadSeriesRef(dec.Be64())
|
|
// The label set written to the disk is already sorted.
|
|
// TODO: figure out why DecodeLabels calls Sort(), and perhaps remove it.
|
|
csr.lset = d.DecodeLabels(&dec)
|
|
|
|
_ = dec.Be64int64() // Was chunkRange but now unused.
|
|
if dec.Uvarint() == 0 {
|
|
return
|
|
}
|
|
|
|
csr.mc = &memChunk{}
|
|
csr.mc.minTime = dec.Be64int64()
|
|
csr.mc.maxTime = dec.Be64int64()
|
|
enc := chunkenc.Encoding(dec.Byte())
|
|
|
|
// The underlying bytes gets re-used later, so make a copy.
|
|
chunkBytes := dec.UvarintBytes()
|
|
chunkBytesCopy := make([]byte, len(chunkBytes))
|
|
copy(chunkBytesCopy, chunkBytes)
|
|
|
|
chk, err := chunkenc.FromData(enc, chunkBytesCopy)
|
|
if err != nil {
|
|
return csr, errors.Wrap(err, "chunk from data")
|
|
}
|
|
csr.mc.chunk = chk
|
|
|
|
switch enc {
|
|
case chunkenc.EncXOR:
|
|
// Backwards-compatibility for old sampleBuf which had last 4 samples.
|
|
for i := 0; i < 3; i++ {
|
|
_ = dec.Be64int64()
|
|
_ = dec.Be64Float64()
|
|
}
|
|
_ = dec.Be64int64()
|
|
csr.lastValue = dec.Be64Float64()
|
|
case chunkenc.EncHistogram:
|
|
csr.lastHistogramValue = &histogram.Histogram{}
|
|
record.DecodeHistogram(&dec, csr.lastHistogramValue)
|
|
default: // chunkenc.FloatHistogram.
|
|
csr.lastFloatHistogramValue = &histogram.FloatHistogram{}
|
|
record.DecodeFloatHistogram(&dec, csr.lastFloatHistogramValue)
|
|
}
|
|
|
|
err = dec.Err()
|
|
if err != nil && len(dec.B) > 0 {
|
|
err = errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func encodeTombstonesToSnapshotRecord(tr tombstones.Reader) ([]byte, error) {
|
|
buf := encoding.Encbuf{}
|
|
|
|
buf.PutByte(chunkSnapshotRecordTypeTombstones)
|
|
b, err := tombstones.Encode(tr)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "encode tombstones")
|
|
}
|
|
buf.PutUvarintBytes(b)
|
|
|
|
return buf.Get(), nil
|
|
}
|
|
|
|
func decodeTombstonesSnapshotRecord(b []byte) (tombstones.Reader, error) {
|
|
dec := encoding.Decbuf{B: b}
|
|
|
|
if flag := dec.Byte(); flag != chunkSnapshotRecordTypeTombstones {
|
|
return nil, errors.Errorf("invalid record type %x", flag)
|
|
}
|
|
|
|
tr, err := tombstones.Decode(dec.UvarintBytes())
|
|
return tr, errors.Wrap(err, "decode tombstones")
|
|
}
|
|
|
|
const (
	// chunkSnapshotPrefix is the directory-name prefix shared by all chunk
	// snapshots. chunkSnapshotDir appends the segment and offset to it, and
	// LastChunkSnapshot / DeleteChunkSnapshots match entries against it.
	chunkSnapshotPrefix = "chunk_snapshot."
)
|
|
|
|
// ChunkSnapshot creates a snapshot of all the series and tombstones in the head.
|
|
// It deletes the old chunk snapshots if the chunk snapshot creation is successful.
|
|
//
|
|
// The chunk snapshot is stored in a directory named chunk_snapshot.N.M and is written
|
|
// using the WAL package. N is the last WAL segment present during snapshotting and
|
|
// M is the offset in segment N upto which data was written.
|
|
//
|
|
// The snapshot first contains all series (each in individual records and not sorted), followed by
|
|
// tombstones (a single record), and finally exemplars (>= 1 record). Exemplars are in the order they
|
|
// were written to the circular buffer.
|
|
func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
|
|
if h.wal == nil {
|
|
// If we are not storing any WAL, does not make sense to take a snapshot too.
|
|
level.Warn(h.logger).Log("msg", "skipping chunk snapshotting as WAL is disabled")
|
|
return &ChunkSnapshotStats{}, nil
|
|
}
|
|
h.chunkSnapshotMtx.Lock()
|
|
defer h.chunkSnapshotMtx.Unlock()
|
|
|
|
stats := &ChunkSnapshotStats{}
|
|
|
|
wlast, woffset, err := h.wal.LastSegmentAndOffset()
|
|
if err != nil && err != record.ErrNotFound {
|
|
return stats, errors.Wrap(err, "get last wal segment and offset")
|
|
}
|
|
|
|
_, cslast, csoffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
|
|
if err != nil && err != record.ErrNotFound {
|
|
return stats, errors.Wrap(err, "find last chunk snapshot")
|
|
}
|
|
|
|
if wlast == cslast && woffset == csoffset {
|
|
// Nothing has been written to the WAL/Head since the last snapshot.
|
|
return stats, nil
|
|
}
|
|
|
|
snapshotName := chunkSnapshotDir(wlast, woffset)
|
|
|
|
cpdir := filepath.Join(h.opts.ChunkDirRoot, snapshotName)
|
|
cpdirtmp := cpdir + ".tmp"
|
|
stats.Dir = cpdir
|
|
|
|
if err := os.MkdirAll(cpdirtmp, 0o777); err != nil {
|
|
return stats, errors.Wrap(err, "create chunk snapshot dir")
|
|
}
|
|
cp, err := wlog.New(nil, nil, cpdirtmp, h.wal.CompressionType())
|
|
if err != nil {
|
|
return stats, errors.Wrap(err, "open chunk snapshot")
|
|
}
|
|
|
|
// Ensures that an early return caused by an error doesn't leave any tmp files.
|
|
defer func() {
|
|
cp.Close()
|
|
os.RemoveAll(cpdirtmp)
|
|
}()
|
|
|
|
var (
|
|
buf []byte
|
|
recs [][]byte
|
|
)
|
|
// Add all series to the snapshot.
|
|
stripeSize := h.series.size
|
|
for i := 0; i < stripeSize; i++ {
|
|
h.series.locks[i].RLock()
|
|
|
|
for _, s := range h.series.series[i] {
|
|
start := len(buf)
|
|
buf = s.encodeToSnapshotRecord(buf)
|
|
if len(buf[start:]) == 0 {
|
|
continue // All contents discarded.
|
|
}
|
|
recs = append(recs, buf[start:])
|
|
// Flush records in 10 MB increments.
|
|
if len(buf) > 10*1024*1024 {
|
|
if err := cp.Log(recs...); err != nil {
|
|
h.series.locks[i].RUnlock()
|
|
return stats, errors.Wrap(err, "flush records")
|
|
}
|
|
buf, recs = buf[:0], recs[:0]
|
|
}
|
|
}
|
|
stats.TotalSeries += len(h.series.series[i])
|
|
|
|
h.series.locks[i].RUnlock()
|
|
}
|
|
|
|
// Add tombstones to the snapshot.
|
|
tombstonesReader, err := h.Tombstones()
|
|
if err != nil {
|
|
return stats, errors.Wrap(err, "get tombstones")
|
|
}
|
|
rec, err := encodeTombstonesToSnapshotRecord(tombstonesReader)
|
|
if err != nil {
|
|
return stats, errors.Wrap(err, "encode tombstones")
|
|
}
|
|
recs = append(recs, rec)
|
|
// Flush remaining series records and tombstones.
|
|
if err := cp.Log(recs...); err != nil {
|
|
return stats, errors.Wrap(err, "flush records")
|
|
}
|
|
buf = buf[:0]
|
|
|
|
// Add exemplars in the snapshot.
|
|
// We log in batches, with each record having upto 10000 exemplars.
|
|
// Assuming 100 bytes (overestimate) per exemplar, that's ~1MB.
|
|
maxExemplarsPerRecord := 10000
|
|
batch := make([]record.RefExemplar, 0, maxExemplarsPerRecord)
|
|
enc := record.Encoder{}
|
|
flushExemplars := func() error {
|
|
if len(batch) == 0 {
|
|
return nil
|
|
}
|
|
buf = buf[:0]
|
|
encbuf := encoding.Encbuf{B: buf}
|
|
encbuf.PutByte(chunkSnapshotRecordTypeExemplars)
|
|
enc.EncodeExemplarsIntoBuffer(batch, &encbuf)
|
|
if err := cp.Log(encbuf.Get()); err != nil {
|
|
return errors.Wrap(err, "log exemplars")
|
|
}
|
|
buf, batch = buf[:0], batch[:0]
|
|
return nil
|
|
}
|
|
err = h.exemplars.IterateExemplars(func(seriesLabels labels.Labels, e exemplar.Exemplar) error {
|
|
if len(batch) >= maxExemplarsPerRecord {
|
|
if err := flushExemplars(); err != nil {
|
|
return errors.Wrap(err, "flush exemplars")
|
|
}
|
|
}
|
|
|
|
ms := h.series.getByHash(seriesLabels.Hash(), seriesLabels)
|
|
if ms == nil {
|
|
// It is possible that exemplar refers to some old series. We discard such exemplars.
|
|
return nil
|
|
}
|
|
batch = append(batch, record.RefExemplar{
|
|
Ref: ms.ref,
|
|
T: e.Ts,
|
|
V: e.Value,
|
|
Labels: e.Labels,
|
|
})
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return stats, errors.Wrap(err, "iterate exemplars")
|
|
}
|
|
|
|
// Flush remaining exemplars.
|
|
if err := flushExemplars(); err != nil {
|
|
return stats, errors.Wrap(err, "flush exemplars at the end")
|
|
}
|
|
|
|
if err := cp.Close(); err != nil {
|
|
return stats, errors.Wrap(err, "close chunk snapshot")
|
|
}
|
|
if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
|
|
return stats, errors.Wrap(err, "rename chunk snapshot directory")
|
|
}
|
|
|
|
if err := DeleteChunkSnapshots(h.opts.ChunkDirRoot, wlast, woffset); err != nil {
|
|
// Leftover old chunk snapshots do not cause problems down the line beyond
|
|
// occupying disk space.
|
|
// They will just be ignored since a higher chunk snapshot exists.
|
|
level.Error(h.logger).Log("msg", "delete old chunk snapshots", "err", err)
|
|
}
|
|
return stats, nil
|
|
}
|
|
|
|
func chunkSnapshotDir(wlast, woffset int) string {
|
|
return fmt.Sprintf(chunkSnapshotPrefix+"%06d.%010d", wlast, woffset)
|
|
}
|
|
|
|
func (h *Head) performChunkSnapshot() error {
|
|
level.Info(h.logger).Log("msg", "creating chunk snapshot")
|
|
startTime := time.Now()
|
|
stats, err := h.ChunkSnapshot()
|
|
elapsed := time.Since(startTime)
|
|
if err == nil {
|
|
level.Info(h.logger).Log("msg", "chunk snapshot complete", "duration", elapsed.String(), "num_series", stats.TotalSeries, "dir", stats.Dir)
|
|
}
|
|
return errors.Wrap(err, "chunk snapshot")
|
|
}
|
|
|
|
// ChunkSnapshotStats holds stats about a created chunk snapshot.
type ChunkSnapshotStats struct {
	// TotalSeries is the number of in-memory series visited while writing the
	// snapshot.
	TotalSeries int
	// Dir is the final path of the snapshot directory.
	Dir string
}
|
|
|
|
// LastChunkSnapshot returns the directory name and index of the most recent chunk snapshot.
|
|
// If dir does not contain any chunk snapshots, ErrNotFound is returned.
|
|
func LastChunkSnapshot(dir string) (string, int, int, error) {
|
|
files, err := os.ReadDir(dir)
|
|
if err != nil {
|
|
return "", 0, 0, err
|
|
}
|
|
maxIdx, maxOffset := -1, -1
|
|
maxFileName := ""
|
|
for i := 0; i < len(files); i++ {
|
|
fi := files[i]
|
|
|
|
if !strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) {
|
|
continue
|
|
}
|
|
if !fi.IsDir() {
|
|
return "", 0, 0, errors.Errorf("chunk snapshot %s is not a directory", fi.Name())
|
|
}
|
|
|
|
splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".")
|
|
if len(splits) != 2 {
|
|
// Chunk snapshots is not in the right format, we do not care about it.
|
|
continue
|
|
}
|
|
|
|
idx, err := strconv.Atoi(splits[0])
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
offset, err := strconv.Atoi(splits[1])
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
if idx > maxIdx || (idx == maxIdx && offset > maxOffset) {
|
|
maxIdx, maxOffset = idx, offset
|
|
maxFileName = filepath.Join(dir, fi.Name())
|
|
}
|
|
}
|
|
if maxFileName == "" {
|
|
return "", 0, 0, record.ErrNotFound
|
|
}
|
|
return maxFileName, maxIdx, maxOffset, nil
|
|
}
|
|
|
|
// DeleteChunkSnapshots deletes all chunk snapshots in a directory below a given index.
|
|
func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error {
|
|
files, err := os.ReadDir(dir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
errs := tsdb_errors.NewMulti()
|
|
for _, fi := range files {
|
|
if !strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) {
|
|
continue
|
|
}
|
|
|
|
splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".")
|
|
if len(splits) != 2 {
|
|
continue
|
|
}
|
|
|
|
idx, err := strconv.Atoi(splits[0])
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
offset, err := strconv.Atoi(splits[1])
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
if idx < maxIndex || (idx == maxIndex && offset < maxOffset) {
|
|
if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {
|
|
errs.Add(err)
|
|
}
|
|
}
|
|
|
|
}
|
|
return errs.Err()
|
|
}
|
|
|
|
// loadChunkSnapshot replays the chunk snapshot and restores the Head state from it. If there was any error returned,
|
|
// it is the responsibility of the caller to clear the contents of the Head.
|
|
func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSeries, error) {
|
|
dir, snapIdx, snapOffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
|
|
if err != nil {
|
|
if err == record.ErrNotFound {
|
|
return snapIdx, snapOffset, nil, nil
|
|
}
|
|
return snapIdx, snapOffset, nil, errors.Wrap(err, "find last chunk snapshot")
|
|
}
|
|
|
|
start := time.Now()
|
|
sr, err := wlog.NewSegmentsReader(dir)
|
|
if err != nil {
|
|
return snapIdx, snapOffset, nil, errors.Wrap(err, "open chunk snapshot")
|
|
}
|
|
defer func() {
|
|
if err := sr.Close(); err != nil {
|
|
level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err)
|
|
}
|
|
}()
|
|
|
|
var (
|
|
numSeries = 0
|
|
unknownRefs = int64(0)
|
|
concurrency = h.opts.WALReplayConcurrency
|
|
wg sync.WaitGroup
|
|
recordChan = make(chan chunkSnapshotRecord, 5*concurrency)
|
|
shardedRefSeries = make([]map[chunks.HeadSeriesRef]*memSeries, concurrency)
|
|
errChan = make(chan error, concurrency)
|
|
refSeries map[chunks.HeadSeriesRef]*memSeries
|
|
exemplarBuf []record.RefExemplar
|
|
dec record.Decoder
|
|
)
|
|
|
|
wg.Add(concurrency)
|
|
for i := 0; i < concurrency; i++ {
|
|
go func(idx int, rc <-chan chunkSnapshotRecord) {
|
|
defer wg.Done()
|
|
defer func() {
|
|
// If there was an error, drain the channel
|
|
// to unblock the main thread.
|
|
for range rc {
|
|
}
|
|
}()
|
|
|
|
shardedRefSeries[idx] = make(map[chunks.HeadSeriesRef]*memSeries)
|
|
localRefSeries := shardedRefSeries[idx]
|
|
|
|
for csr := range rc {
|
|
series, _, err := h.getOrCreateWithID(csr.ref, csr.lset.Hash(), csr.lset)
|
|
if err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
localRefSeries[csr.ref] = series
|
|
for {
|
|
seriesID := uint64(series.ref)
|
|
lastSeriesID := h.lastSeriesID.Load()
|
|
if lastSeriesID >= seriesID || h.lastSeriesID.CompareAndSwap(lastSeriesID, seriesID) {
|
|
break
|
|
}
|
|
}
|
|
|
|
if csr.mc == nil {
|
|
continue
|
|
}
|
|
series.nextAt = csr.mc.maxTime // This will create a new chunk on append.
|
|
series.headChunks = csr.mc
|
|
series.lastValue = csr.lastValue
|
|
series.lastHistogramValue = csr.lastHistogramValue
|
|
series.lastFloatHistogramValue = csr.lastFloatHistogramValue
|
|
|
|
app, err := series.headChunks.chunk.Appender()
|
|
if err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
series.app = app
|
|
|
|
h.updateMinMaxTime(csr.mc.minTime, csr.mc.maxTime)
|
|
}
|
|
}(i, recordChan)
|
|
}
|
|
|
|
r := wlog.NewReader(sr)
|
|
var loopErr error
|
|
Outer:
|
|
for r.Next() {
|
|
select {
|
|
case err := <-errChan:
|
|
errChan <- err
|
|
break Outer
|
|
default:
|
|
}
|
|
|
|
rec := r.Record()
|
|
switch rec[0] {
|
|
case chunkSnapshotRecordTypeSeries:
|
|
numSeries++
|
|
csr, err := decodeSeriesFromChunkSnapshot(&dec, rec)
|
|
if err != nil {
|
|
loopErr = errors.Wrap(err, "decode series record")
|
|
break Outer
|
|
}
|
|
recordChan <- csr
|
|
|
|
case chunkSnapshotRecordTypeTombstones:
|
|
tr, err := decodeTombstonesSnapshotRecord(rec)
|
|
if err != nil {
|
|
loopErr = errors.Wrap(err, "decode tombstones")
|
|
break Outer
|
|
}
|
|
|
|
if err = tr.Iter(func(ref storage.SeriesRef, ivs tombstones.Intervals) error {
|
|
h.tombstones.AddInterval(ref, ivs...)
|
|
return nil
|
|
}); err != nil {
|
|
loopErr = errors.Wrap(err, "iterate tombstones")
|
|
break Outer
|
|
}
|
|
|
|
case chunkSnapshotRecordTypeExemplars:
|
|
// Exemplars are at the end of snapshot. So all series are loaded at this point.
|
|
if len(refSeries) == 0 {
|
|
close(recordChan)
|
|
wg.Wait()
|
|
|
|
refSeries = make(map[chunks.HeadSeriesRef]*memSeries, numSeries)
|
|
for _, shard := range shardedRefSeries {
|
|
for k, v := range shard {
|
|
refSeries[k] = v
|
|
}
|
|
}
|
|
}
|
|
|
|
if !h.opts.EnableExemplarStorage || h.opts.MaxExemplars.Load() <= 0 {
|
|
// Exemplar storage is disabled.
|
|
continue Outer
|
|
}
|
|
|
|
decbuf := encoding.Decbuf{B: rec[1:]}
|
|
|
|
exemplarBuf = exemplarBuf[:0]
|
|
exemplarBuf, err = dec.ExemplarsFromBuffer(&decbuf, exemplarBuf)
|
|
if err != nil {
|
|
loopErr = errors.Wrap(err, "exemplars from buffer")
|
|
break Outer
|
|
}
|
|
|
|
for _, e := range exemplarBuf {
|
|
ms, ok := refSeries[e.Ref]
|
|
if !ok {
|
|
unknownRefs++
|
|
continue
|
|
}
|
|
|
|
if err := h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{
|
|
Labels: e.Labels,
|
|
Value: e.V,
|
|
Ts: e.T,
|
|
}); err != nil {
|
|
loopErr = errors.Wrap(err, "add exemplar")
|
|
break Outer
|
|
}
|
|
}
|
|
|
|
default:
|
|
// This is a record type we don't understand. It is either and old format from earlier versions,
|
|
// or a new format and the code was rolled back to old version.
|
|
loopErr = errors.Errorf("unsuported snapshot record type 0b%b", rec[0])
|
|
break Outer
|
|
}
|
|
}
|
|
if len(refSeries) == 0 {
|
|
close(recordChan)
|
|
wg.Wait()
|
|
}
|
|
|
|
close(errChan)
|
|
merr := tsdb_errors.NewMulti(errors.Wrap(loopErr, "decode loop"))
|
|
for err := range errChan {
|
|
merr.Add(errors.Wrap(err, "record processing"))
|
|
}
|
|
if err := merr.Err(); err != nil {
|
|
return -1, -1, nil, err
|
|
}
|
|
|
|
if r.Err() != nil {
|
|
return -1, -1, nil, errors.Wrap(r.Err(), "read records")
|
|
}
|
|
|
|
if len(refSeries) == 0 {
|
|
// We had no exemplar record, so we have to build the map here.
|
|
refSeries = make(map[chunks.HeadSeriesRef]*memSeries, numSeries)
|
|
for _, shard := range shardedRefSeries {
|
|
for k, v := range shard {
|
|
refSeries[k] = v
|
|
}
|
|
}
|
|
}
|
|
|
|
elapsed := time.Since(start)
|
|
level.Info(h.logger).Log("msg", "chunk snapshot loaded", "dir", dir, "num_series", numSeries, "duration", elapsed.String())
|
|
if unknownRefs > 0 {
|
|
level.Warn(h.logger).Log("msg", "unknown series references during chunk snapshot replay", "count", unknownRefs)
|
|
}
|
|
|
|
return snapIdx, snapOffset, refSeries, nil
|
|
}
|