mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-10 07:34:04 -08:00
6dcbd653e9
This avoids situations where metrics are scraped before the data they are trying to look at is initialized. Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
2223 lines
72 KiB
Go
2223 lines
72 KiB
Go
// Copyright 2017 The Prometheus Authors
|
||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
// you may not use this file except in compliance with the License.
|
||
// You may obtain a copy of the License at
|
||
//
|
||
// http://www.apache.org/licenses/LICENSE-2.0
|
||
//
|
||
// Unless required by applicable law or agreed to in writing, software
|
||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
// See the License for the specific language governing permissions and
|
||
// limitations under the License.
|
||
|
||
package tsdb
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"io"
|
||
"math"
|
||
"path/filepath"
|
||
"runtime"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/go-kit/log"
|
||
"github.com/go-kit/log/level"
|
||
"github.com/oklog/ulid"
|
||
"github.com/pkg/errors"
|
||
"go.uber.org/atomic"
|
||
|
||
"github.com/prometheus/client_golang/prometheus"
|
||
|
||
"github.com/prometheus/prometheus/config"
|
||
"github.com/prometheus/prometheus/model/exemplar"
|
||
"github.com/prometheus/prometheus/model/histogram"
|
||
"github.com/prometheus/prometheus/model/labels"
|
||
"github.com/prometheus/prometheus/model/metadata"
|
||
"github.com/prometheus/prometheus/storage"
|
||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||
"github.com/prometheus/prometheus/tsdb/index"
|
||
"github.com/prometheus/prometheus/tsdb/record"
|
||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||
"github.com/prometheus/prometheus/tsdb/wlog"
|
||
"github.com/prometheus/prometheus/util/zeropool"
|
||
)
|
||
|
||
// Errors reported by appenders.
var (
	// ErrInvalidSample is the error reported for an appended sample that is
	// not valid and therefore can't be ingested.
	ErrInvalidSample = errors.New("invalid sample")

	// ErrInvalidExemplar is the error reported for an appended exemplar that
	// is not valid and therefore can't be ingested.
	ErrInvalidExemplar = errors.New("invalid exemplar")

	// ErrAppenderClosed is the error reported when an appender has already
	// been successfully rolled back or committed.
	ErrAppenderClosed = errors.New("appender closed")
)

// Package-level defaults picked up by DefaultHeadOptions and NewHead.
var (
	// defaultIsolationDisabled is true if isolation is disabled by default.
	defaultIsolationDisabled = false

	// defaultWALReplayConcurrency is the WAL replay concurrency applied when
	// no positive value is configured. It is the GOMAXPROCS setting observed
	// at package initialization.
	defaultWALReplayConcurrency = runtime.GOMAXPROCS(0)
)
|
||
|
||
// Head handles reads and writes of time series data within a time window.
|
||
type Head struct {
|
||
chunkRange atomic.Int64
|
||
numSeries atomic.Uint64
|
||
minOOOTime, maxOOOTime atomic.Int64 // TODO(jesusvazquez) These should be updated after garbage collection.
|
||
minTime, maxTime atomic.Int64 // Current min and max of the samples included in the head. TODO(jesusvazquez) Ensure these are properly tracked.
|
||
minValidTime atomic.Int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
|
||
lastWALTruncationTime atomic.Int64
|
||
lastMemoryTruncationTime atomic.Int64
|
||
lastSeriesID atomic.Uint64
|
||
// All the ooo m-map chunks should be after this. This is used to truncate old ooo m-map chunks.
|
||
// This should be typecasted to chunks.ChunkDiskMapperRef after loading.
|
||
minOOOMmapRef atomic.Uint64
|
||
|
||
metrics *headMetrics
|
||
opts *HeadOptions
|
||
wal, wbl *wlog.WL
|
||
exemplarMetrics *ExemplarMetrics
|
||
exemplars ExemplarStorage
|
||
logger log.Logger
|
||
appendPool zeropool.Pool[[]record.RefSample]
|
||
exemplarsPool zeropool.Pool[[]exemplarWithSeriesRef]
|
||
histogramsPool zeropool.Pool[[]record.RefHistogramSample]
|
||
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
|
||
metadataPool zeropool.Pool[[]record.RefMetadata]
|
||
seriesPool zeropool.Pool[[]*memSeries]
|
||
bytesPool zeropool.Pool[[]byte]
|
||
memChunkPool sync.Pool
|
||
|
||
// All series addressable by their ID or hash.
|
||
series *stripeSeries
|
||
|
||
deletedMtx sync.Mutex
|
||
deleted map[chunks.HeadSeriesRef]int // Deleted series, and what WAL segment they must be kept until.
|
||
|
||
// TODO(codesome): Extend MemPostings to return only OOOPostings, Set OOOStatus, ... Like an additional map of ooo postings.
|
||
postings *index.MemPostings // Postings lists for terms.
|
||
|
||
tombstones *tombstones.MemTombstones
|
||
|
||
iso *isolation
|
||
|
||
cardinalityMutex sync.Mutex
|
||
cardinalityCache *index.PostingsStats // Posting stats cache which will expire after 30sec.
|
||
lastPostingsStatsCall time.Duration // Last posting stats call (PostingsCardinalityStats()) time for caching.
|
||
|
||
// chunkDiskMapper is used to write and read Head chunks to/from disk.
|
||
chunkDiskMapper *chunks.ChunkDiskMapper
|
||
|
||
chunkSnapshotMtx sync.Mutex
|
||
|
||
closedMtx sync.Mutex
|
||
closed bool
|
||
|
||
stats *HeadStats
|
||
reg prometheus.Registerer
|
||
|
||
writeNotified wlog.WriteNotified
|
||
|
||
memTruncationInProcess atomic.Bool
|
||
}
|
||
|
||
type ExemplarStorage interface {
|
||
storage.ExemplarQueryable
|
||
AddExemplar(labels.Labels, exemplar.Exemplar) error
|
||
ValidateExemplar(labels.Labels, exemplar.Exemplar) error
|
||
IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error
|
||
}
|
||
|
||
// HeadOptions are parameters for the Head block.
|
||
type HeadOptions struct {
|
||
// Runtime reloadable option. At the top of the struct for 32 bit OS:
|
||
// https://pkg.go.dev/sync/atomic#pkg-note-BUG
|
||
MaxExemplars atomic.Int64
|
||
|
||
OutOfOrderTimeWindow atomic.Int64
|
||
OutOfOrderCapMax atomic.Int64
|
||
|
||
// EnableNativeHistograms enables the ingestion of native histograms.
|
||
EnableNativeHistograms atomic.Bool
|
||
|
||
ChunkRange int64
|
||
// ChunkDirRoot is the parent directory of the chunks directory.
|
||
ChunkDirRoot string
|
||
ChunkPool chunkenc.Pool
|
||
ChunkWriteBufferSize int
|
||
ChunkWriteQueueSize int
|
||
|
||
SamplesPerChunk int
|
||
|
||
// StripeSize sets the number of entries in the hash map, it must be a power of 2.
|
||
// A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
|
||
// A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series.
|
||
StripeSize int
|
||
SeriesCallback SeriesLifecycleCallback
|
||
EnableExemplarStorage bool
|
||
EnableMemorySnapshotOnShutdown bool
|
||
|
||
IsolationDisabled bool
|
||
|
||
// Maximum number of CPUs that can simultaneously processes WAL replay.
|
||
// The default value is GOMAXPROCS.
|
||
// If it is set to a negative value or zero, the default value is used.
|
||
WALReplayConcurrency int
|
||
}
|
||
|
||
// Defaults applied by DefaultHeadOptions.
const (
	// DefaultOutOfOrderCapMax is the default maximum size of an in-memory
	// out-of-order chunk.
	DefaultOutOfOrderCapMax int64 = 32

	// DefaultSamplesPerChunk provides a default target number of samples per
	// chunk.
	DefaultSamplesPerChunk = 120
)
|
||
|
||
func DefaultHeadOptions() *HeadOptions {
|
||
ho := &HeadOptions{
|
||
ChunkRange: DefaultBlockDuration,
|
||
ChunkDirRoot: "",
|
||
ChunkPool: chunkenc.NewPool(),
|
||
ChunkWriteBufferSize: chunks.DefaultWriteBufferSize,
|
||
ChunkWriteQueueSize: chunks.DefaultWriteQueueSize,
|
||
SamplesPerChunk: DefaultSamplesPerChunk,
|
||
StripeSize: DefaultStripeSize,
|
||
SeriesCallback: &noopSeriesLifecycleCallback{},
|
||
IsolationDisabled: defaultIsolationDisabled,
|
||
WALReplayConcurrency: defaultWALReplayConcurrency,
|
||
}
|
||
ho.OutOfOrderCapMax.Store(DefaultOutOfOrderCapMax)
|
||
return ho
|
||
}
|
||
|
||
// SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series.
|
||
// It is always a no-op in Prometheus and mainly meant for external users who import TSDB.
|
||
// All the callbacks should be safe to be called concurrently.
|
||
// It is up to the user to implement soft or hard consistency by making the callbacks
|
||
// atomic or non-atomic. Atomic callbacks can cause degradation performance.
|
||
type SeriesLifecycleCallback interface {
|
||
// PreCreation is called before creating a series to indicate if the series can be created.
|
||
// A non nil error means the series should not be created.
|
||
PreCreation(labels.Labels) error
|
||
// PostCreation is called after creating a series to indicate a creation of series.
|
||
PostCreation(labels.Labels)
|
||
// PostDeletion is called after deletion of series.
|
||
PostDeletion(map[chunks.HeadSeriesRef]labels.Labels)
|
||
}
|
||
|
||
// NewHead opens the head block in dir.
|
||
func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wlog.WL, opts *HeadOptions, stats *HeadStats) (*Head, error) {
|
||
var err error
|
||
if l == nil {
|
||
l = log.NewNopLogger()
|
||
}
|
||
|
||
if opts.OutOfOrderTimeWindow.Load() < 0 {
|
||
opts.OutOfOrderTimeWindow.Store(0)
|
||
}
|
||
|
||
// Time window can be set on runtime. So the capMin and capMax should be valid
|
||
// even if ooo is not enabled yet.
|
||
capMax := opts.OutOfOrderCapMax.Load()
|
||
if capMax <= 0 || capMax > 255 {
|
||
return nil, errors.Errorf("OOOCapMax of %d is invalid. must be > 0 and <= 255", capMax)
|
||
}
|
||
|
||
if opts.ChunkRange < 1 {
|
||
return nil, errors.Errorf("invalid chunk range %d", opts.ChunkRange)
|
||
}
|
||
if opts.SeriesCallback == nil {
|
||
opts.SeriesCallback = &noopSeriesLifecycleCallback{}
|
||
}
|
||
|
||
if stats == nil {
|
||
stats = NewHeadStats()
|
||
}
|
||
|
||
if !opts.EnableExemplarStorage {
|
||
opts.MaxExemplars.Store(0)
|
||
}
|
||
|
||
h := &Head{
|
||
wal: wal,
|
||
wbl: wbl,
|
||
logger: l,
|
||
opts: opts,
|
||
memChunkPool: sync.Pool{
|
||
New: func() interface{} {
|
||
return &memChunk{}
|
||
},
|
||
},
|
||
stats: stats,
|
||
reg: r,
|
||
}
|
||
if err := h.resetInMemoryState(); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
if opts.ChunkPool == nil {
|
||
opts.ChunkPool = chunkenc.NewPool()
|
||
}
|
||
|
||
if opts.WALReplayConcurrency <= 0 {
|
||
opts.WALReplayConcurrency = defaultWALReplayConcurrency
|
||
}
|
||
|
||
h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(
|
||
r,
|
||
mmappedChunksDir(opts.ChunkDirRoot),
|
||
opts.ChunkPool,
|
||
opts.ChunkWriteBufferSize,
|
||
opts.ChunkWriteQueueSize,
|
||
)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
h.metrics = newHeadMetrics(h, r)
|
||
|
||
return h, nil
|
||
}
|
||
|
||
func (h *Head) resetInMemoryState() error {
|
||
var err error
|
||
var em *ExemplarMetrics
|
||
if h.exemplars != nil {
|
||
ce, ok := h.exemplars.(*CircularExemplarStorage)
|
||
if ok {
|
||
em = ce.metrics
|
||
}
|
||
}
|
||
if em == nil {
|
||
em = NewExemplarMetrics(h.reg)
|
||
}
|
||
es, err := NewCircularExemplarStorage(h.opts.MaxExemplars.Load(), em)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
h.iso = newIsolation(h.opts.IsolationDisabled)
|
||
|
||
h.exemplarMetrics = em
|
||
h.exemplars = es
|
||
h.series = newStripeSeries(h.opts.StripeSize, h.opts.SeriesCallback)
|
||
h.postings = index.NewUnorderedMemPostings()
|
||
h.tombstones = tombstones.NewMemTombstones()
|
||
h.deleted = map[chunks.HeadSeriesRef]int{}
|
||
h.chunkRange.Store(h.opts.ChunkRange)
|
||
h.minTime.Store(math.MaxInt64)
|
||
h.maxTime.Store(math.MinInt64)
|
||
h.minOOOTime.Store(math.MaxInt64)
|
||
h.maxOOOTime.Store(math.MinInt64)
|
||
h.lastWALTruncationTime.Store(math.MinInt64)
|
||
h.lastMemoryTruncationTime.Store(math.MinInt64)
|
||
return nil
|
||
}
|
||
|
||
type headMetrics struct {
|
||
activeAppenders prometheus.Gauge
|
||
series prometheus.GaugeFunc
|
||
seriesCreated prometheus.Counter
|
||
seriesRemoved prometheus.Counter
|
||
seriesNotFound prometheus.Counter
|
||
chunks prometheus.Gauge
|
||
chunksCreated prometheus.Counter
|
||
chunksRemoved prometheus.Counter
|
||
gcDuration prometheus.Summary
|
||
samplesAppended *prometheus.CounterVec
|
||
outOfOrderSamplesAppended prometheus.Counter
|
||
outOfBoundSamples *prometheus.CounterVec
|
||
outOfOrderSamples *prometheus.CounterVec
|
||
tooOldSamples *prometheus.CounterVec
|
||
walTruncateDuration prometheus.Summary
|
||
walCorruptionsTotal prometheus.Counter
|
||
dataTotalReplayDuration prometheus.Gauge
|
||
headTruncateFail prometheus.Counter
|
||
headTruncateTotal prometheus.Counter
|
||
checkpointDeleteFail prometheus.Counter
|
||
checkpointDeleteTotal prometheus.Counter
|
||
checkpointCreationFail prometheus.Counter
|
||
checkpointCreationTotal prometheus.Counter
|
||
mmapChunkCorruptionTotal prometheus.Counter
|
||
snapshotReplayErrorTotal prometheus.Counter // Will be either 0 or 1.
|
||
oooHistogram prometheus.Histogram
|
||
mmapChunksTotal prometheus.Counter
|
||
}
|
||
|
||
// Sample type names. The sample counter vectors in headMetrics declare a
// "type" label.
// NOTE(review): presumably these are the values used for that label — confirm at the call sites.
const (
	// sampleMetricTypeFloat is the type name for float samples.
	sampleMetricTypeFloat = "float"
	// sampleMetricTypeHistogram is the type name for histogram samples.
	sampleMetricTypeHistogram = "histogram"
)
|
||
|
||
func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
|
||
m := &headMetrics{
|
||
activeAppenders: prometheus.NewGauge(prometheus.GaugeOpts{
|
||
Name: "prometheus_tsdb_head_active_appenders",
|
||
Help: "Number of currently active appender transactions",
|
||
}),
|
||
series: prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||
Name: "prometheus_tsdb_head_series",
|
||
Help: "Total number of series in the head block.",
|
||
}, func() float64 {
|
||
return float64(h.NumSeries())
|
||
}),
|
||
seriesCreated: prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_head_series_created_total",
|
||
Help: "Total number of series created in the head",
|
||
}),
|
||
seriesRemoved: prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_head_series_removed_total",
|
||
Help: "Total number of series removed in the head",
|
||
}),
|
||
seriesNotFound: prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_head_series_not_found_total",
|
||
Help: "Total number of requests for series that were not found.",
|
||
}),
|
||
chunks: prometheus.NewGauge(prometheus.GaugeOpts{
|
||
Name: "prometheus_tsdb_head_chunks",
|
||
Help: "Total number of chunks in the head block.",
|
||
}),
|
||
chunksCreated: prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_head_chunks_created_total",
|
||
Help: "Total number of chunks created in the head",
|
||
}),
|
||
chunksRemoved: prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_head_chunks_removed_total",
|
||
Help: "Total number of chunks removed in the head",
|
||
}),
|
||
gcDuration: prometheus.NewSummary(prometheus.SummaryOpts{
|
||
Name: "prometheus_tsdb_head_gc_duration_seconds",
|
||
Help: "Runtime of garbage collection in the head block.",
|
||
}),
|
||
walTruncateDuration: prometheus.NewSummary(prometheus.SummaryOpts{
|
||
Name: "prometheus_tsdb_wal_truncate_duration_seconds",
|
||
Help: "Duration of WAL truncation.",
|
||
}),
|
||
walCorruptionsTotal: prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_wal_corruptions_total",
|
||
Help: "Total number of WAL corruptions.",
|
||
}),
|
||
dataTotalReplayDuration: prometheus.NewGauge(prometheus.GaugeOpts{
|
||
Name: "prometheus_tsdb_data_replay_duration_seconds",
|
||
Help: "Time taken to replay the data on disk.",
|
||
}),
|
||
samplesAppended: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_head_samples_appended_total",
|
||
Help: "Total number of appended samples.",
|
||
}, []string{"type"}),
|
||
outOfOrderSamplesAppended: prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_head_out_of_order_samples_appended_total",
|
||
Help: "Total number of appended out of order samples.",
|
||
}),
|
||
outOfBoundSamples: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_out_of_bound_samples_total",
|
||
Help: "Total number of out of bound samples ingestion failed attempts with out of order support disabled.",
|
||
}, []string{"type"}),
|
||
outOfOrderSamples: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_out_of_order_samples_total",
|
||
Help: "Total number of out of order samples ingestion failed attempts due to out of order being disabled.",
|
||
}, []string{"type"}),
|
||
tooOldSamples: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_too_old_samples_total",
|
||
Help: "Total number of out of order samples ingestion failed attempts with out of support enabled, but sample outside of time window.",
|
||
}, []string{"type"}),
|
||
headTruncateFail: prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_head_truncations_failed_total",
|
||
Help: "Total number of head truncations that failed.",
|
||
}),
|
||
headTruncateTotal: prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_head_truncations_total",
|
||
Help: "Total number of head truncations attempted.",
|
||
}),
|
||
checkpointDeleteFail: prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_checkpoint_deletions_failed_total",
|
||
Help: "Total number of checkpoint deletions that failed.",
|
||
}),
|
||
checkpointDeleteTotal: prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_checkpoint_deletions_total",
|
||
Help: "Total number of checkpoint deletions attempted.",
|
||
}),
|
||
checkpointCreationFail: prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_checkpoint_creations_failed_total",
|
||
Help: "Total number of checkpoint creations that failed.",
|
||
}),
|
||
checkpointCreationTotal: prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_checkpoint_creations_total",
|
||
Help: "Total number of checkpoint creations attempted.",
|
||
}),
|
||
mmapChunkCorruptionTotal: prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_mmap_chunk_corruptions_total",
|
||
Help: "Total number of memory-mapped chunk corruptions.",
|
||
}),
|
||
snapshotReplayErrorTotal: prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_snapshot_replay_error_total",
|
||
Help: "Total number snapshot replays that failed.",
|
||
}),
|
||
oooHistogram: prometheus.NewHistogram(prometheus.HistogramOpts{
|
||
Name: "prometheus_tsdb_sample_ooo_delta",
|
||
Help: "Delta in seconds by which a sample is considered out of order (reported regardless of OOO time window and whether sample is accepted or not).",
|
||
Buckets: []float64{
|
||
60 * 10, // 10 min
|
||
60 * 30, // 30 min
|
||
60 * 60, // 60 min
|
||
60 * 60 * 2, // 2h
|
||
60 * 60 * 3, // 3h
|
||
60 * 60 * 6, // 6h
|
||
60 * 60 * 12, // 12h
|
||
},
|
||
}),
|
||
mmapChunksTotal: prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_mmap_chunks_total",
|
||
Help: "Total number of chunks that were memory-mapped.",
|
||
}),
|
||
}
|
||
|
||
if r != nil {
|
||
r.MustRegister(
|
||
m.activeAppenders,
|
||
m.series,
|
||
m.chunks,
|
||
m.chunksCreated,
|
||
m.chunksRemoved,
|
||
m.seriesCreated,
|
||
m.seriesRemoved,
|
||
m.seriesNotFound,
|
||
m.gcDuration,
|
||
m.walTruncateDuration,
|
||
m.walCorruptionsTotal,
|
||
m.dataTotalReplayDuration,
|
||
m.samplesAppended,
|
||
m.outOfOrderSamplesAppended,
|
||
m.outOfBoundSamples,
|
||
m.outOfOrderSamples,
|
||
m.tooOldSamples,
|
||
m.headTruncateFail,
|
||
m.headTruncateTotal,
|
||
m.checkpointDeleteFail,
|
||
m.checkpointDeleteTotal,
|
||
m.checkpointCreationFail,
|
||
m.checkpointCreationTotal,
|
||
m.mmapChunksTotal,
|
||
m.mmapChunkCorruptionTotal,
|
||
m.snapshotReplayErrorTotal,
|
||
// Metrics bound to functions and not needed in tests
|
||
// can be created and registered on the spot.
|
||
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||
Name: "prometheus_tsdb_head_max_time",
|
||
Help: "Maximum timestamp of the head block. The unit is decided by the library consumer.",
|
||
}, func() float64 {
|
||
return float64(h.MaxTime())
|
||
}),
|
||
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||
Name: "prometheus_tsdb_head_min_time",
|
||
Help: "Minimum time bound of the head block. The unit is decided by the library consumer.",
|
||
}, func() float64 {
|
||
return float64(h.MinTime())
|
||
}),
|
||
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||
Name: "prometheus_tsdb_isolation_low_watermark",
|
||
Help: "The lowest TSDB append ID that is still referenced.",
|
||
}, func() float64 {
|
||
return float64(h.iso.lowWatermark())
|
||
}),
|
||
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||
Name: "prometheus_tsdb_isolation_high_watermark",
|
||
Help: "The highest TSDB append ID that has been given out.",
|
||
}, func() float64 {
|
||
return float64(h.iso.lastAppendID())
|
||
}),
|
||
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||
Name: "prometheus_tsdb_head_chunks_storage_size_bytes",
|
||
Help: "Size of the chunks_head directory.",
|
||
}, func() float64 {
|
||
val, err := h.chunkDiskMapper.Size()
|
||
if err != nil {
|
||
level.Error(h.logger).Log("msg", "Failed to calculate size of \"chunks_head\" dir",
|
||
"err", err.Error())
|
||
}
|
||
return float64(val)
|
||
}),
|
||
)
|
||
}
|
||
return m
|
||
}
|
||
|
||
// mmappedChunksDir returns the path of the "chunks_head" directory directly
// under dir.
func mmappedChunksDir(dir string) string {
	const headChunksDirName = "chunks_head"
	return filepath.Join(dir, headChunksDirName)
}
|
||
|
||
// HeadStats are the statistics for the head component of the DB.
|
||
type HeadStats struct {
|
||
WALReplayStatus *WALReplayStatus
|
||
}
|
||
|
||
// NewHeadStats returns a new HeadStats object.
|
||
func NewHeadStats() *HeadStats {
|
||
return &HeadStats{
|
||
WALReplayStatus: &WALReplayStatus{},
|
||
}
|
||
}
|
||
|
||
// WALReplayStatus contains status information about the WAL replay.
// The embedded RWMutex guards the fields; GetWALReplayStatus reads them under
// the read lock.
type WALReplayStatus struct {
	sync.RWMutex

	Min     int
	Max     int
	Current int
}
|
||
|
||
// GetWALReplayStatus returns the WAL replay status information.
|
||
func (s *WALReplayStatus) GetWALReplayStatus() WALReplayStatus {
|
||
s.RLock()
|
||
defer s.RUnlock()
|
||
|
||
return WALReplayStatus{
|
||
Min: s.Min,
|
||
Max: s.Max,
|
||
Current: s.Current,
|
||
}
|
||
}
|
||
|
||
// cardinalityCacheExpirationTime is 30 seconds, the expiry the Head documents
// for its postings stats cache.
const cardinalityCacheExpirationTime = 30 * time.Second
|
||
|
||
// Init loads data from the write ahead log and prepares the head for writes.
|
||
// It should be called before using an appender so that it
|
||
// limits the ingested samples to the head min valid time.
|
||
func (h *Head) Init(minValidTime int64) error {
|
||
h.minValidTime.Store(minValidTime)
|
||
defer func() {
|
||
h.postings.EnsureOrder(h.opts.WALReplayConcurrency)
|
||
}()
|
||
defer h.gc() // After loading the wal remove the obsolete data from the head.
|
||
defer func() {
|
||
// Loading of m-mapped chunks and snapshot can make the mint of the Head
|
||
// to go below minValidTime.
|
||
if h.MinTime() < h.minValidTime.Load() {
|
||
h.minTime.Store(h.minValidTime.Load())
|
||
}
|
||
}()
|
||
|
||
level.Info(h.logger).Log("msg", "Replaying on-disk memory mappable chunks if any")
|
||
start := time.Now()
|
||
|
||
snapIdx, snapOffset := -1, 0
|
||
refSeries := make(map[chunks.HeadSeriesRef]*memSeries)
|
||
|
||
snapshotLoaded := false
|
||
if h.opts.EnableMemorySnapshotOnShutdown {
|
||
level.Info(h.logger).Log("msg", "Chunk snapshot is enabled, replaying from the snapshot")
|
||
// If there are any WAL files, there should be at least one WAL file with an index that is current or newer
|
||
// than the snapshot index. If the WAL index is behind the snapshot index somehow, the snapshot is assumed
|
||
// to be outdated.
|
||
loadSnapshot := true
|
||
if h.wal != nil {
|
||
_, endAt, err := wlog.Segments(h.wal.Dir())
|
||
if err != nil {
|
||
return errors.Wrap(err, "finding WAL segments")
|
||
}
|
||
|
||
_, idx, _, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
|
||
if err != nil && err != record.ErrNotFound {
|
||
level.Error(h.logger).Log("msg", "Could not find last snapshot", "err", err)
|
||
}
|
||
|
||
if err == nil && endAt < idx {
|
||
loadSnapshot = false
|
||
level.Warn(h.logger).Log("msg", "Last WAL file is behind snapshot, removing snapshots")
|
||
if err := DeleteChunkSnapshots(h.opts.ChunkDirRoot, math.MaxInt, math.MaxInt); err != nil {
|
||
level.Error(h.logger).Log("msg", "Error while deleting snapshot directories", "err", err)
|
||
}
|
||
}
|
||
}
|
||
if loadSnapshot {
|
||
var err error
|
||
snapIdx, snapOffset, refSeries, err = h.loadChunkSnapshot()
|
||
if err == nil {
|
||
snapshotLoaded = true
|
||
level.Info(h.logger).Log("msg", "Chunk snapshot loading time", "duration", time.Since(start).String())
|
||
}
|
||
if err != nil {
|
||
snapIdx, snapOffset = -1, 0
|
||
refSeries = make(map[chunks.HeadSeriesRef]*memSeries)
|
||
|
||
h.metrics.snapshotReplayErrorTotal.Inc()
|
||
level.Error(h.logger).Log("msg", "Failed to load chunk snapshot", "err", err)
|
||
// We clear the partially loaded data to replay fresh from the WAL.
|
||
if err := h.resetInMemoryState(); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
mmapChunkReplayStart := time.Now()
|
||
var (
|
||
mmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk
|
||
oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk
|
||
lastMmapRef chunks.ChunkDiskMapperRef
|
||
err error
|
||
)
|
||
if snapshotLoaded || h.wal != nil {
|
||
// If snapshot was not loaded and if there is no WAL, then m-map chunks will be discarded
|
||
// anyway. So we only load m-map chunks when it won't be discarded.
|
||
mmappedChunks, oooMmappedChunks, lastMmapRef, err = h.loadMmappedChunks(refSeries)
|
||
if err != nil {
|
||
// TODO(codesome): clear out all m-map chunks here for refSeries.
|
||
level.Error(h.logger).Log("msg", "Loading on-disk chunks failed", "err", err)
|
||
if _, ok := errors.Cause(err).(*chunks.CorruptionErr); ok {
|
||
h.metrics.mmapChunkCorruptionTotal.Inc()
|
||
}
|
||
|
||
// Discard snapshot data since we need to replay the WAL for the missed m-map chunks data.
|
||
snapIdx, snapOffset = -1, 0
|
||
|
||
// If this fails, data will be recovered from WAL.
|
||
// Hence we wont lose any data (given WAL is not corrupt).
|
||
mmappedChunks, oooMmappedChunks, lastMmapRef, err = h.removeCorruptedMmappedChunks(err)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
}
|
||
level.Info(h.logger).Log("msg", "On-disk memory mappable chunks replay completed", "duration", time.Since(mmapChunkReplayStart).String())
|
||
}
|
||
|
||
if h.wal == nil {
|
||
level.Info(h.logger).Log("msg", "WAL not found")
|
||
return nil
|
||
}
|
||
|
||
level.Info(h.logger).Log("msg", "Replaying WAL, this may take a while")
|
||
|
||
checkpointReplayStart := time.Now()
|
||
// Backfill the checkpoint first if it exists.
|
||
dir, startFrom, err := wlog.LastCheckpoint(h.wal.Dir())
|
||
if err != nil && err != record.ErrNotFound {
|
||
return errors.Wrap(err, "find last checkpoint")
|
||
}
|
||
|
||
// Find the last segment.
|
||
_, endAt, e := wlog.Segments(h.wal.Dir())
|
||
if e != nil {
|
||
return errors.Wrap(e, "finding WAL segments")
|
||
}
|
||
|
||
h.startWALReplayStatus(startFrom, endAt)
|
||
|
||
multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
|
||
if err == nil && startFrom >= snapIdx {
|
||
sr, err := wlog.NewSegmentsReader(dir)
|
||
if err != nil {
|
||
return errors.Wrap(err, "open checkpoint")
|
||
}
|
||
defer func() {
|
||
if err := sr.Close(); err != nil {
|
||
level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err)
|
||
}
|
||
}()
|
||
|
||
// A corrupted checkpoint is a hard error for now and requires user
|
||
// intervention. There's likely little data that can be recovered anyway.
|
||
if err := h.loadWAL(wlog.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks); err != nil {
|
||
return errors.Wrap(err, "backfill checkpoint")
|
||
}
|
||
h.updateWALReplayStatusRead(startFrom)
|
||
startFrom++
|
||
level.Info(h.logger).Log("msg", "WAL checkpoint loaded")
|
||
}
|
||
checkpointReplayDuration := time.Since(checkpointReplayStart)
|
||
|
||
walReplayStart := time.Now()
|
||
|
||
if snapIdx > startFrom {
|
||
startFrom = snapIdx
|
||
}
|
||
// Backfill segments from the most recent checkpoint onwards.
|
||
for i := startFrom; i <= endAt; i++ {
|
||
s, err := wlog.OpenReadSegment(wlog.SegmentName(h.wal.Dir(), i))
|
||
if err != nil {
|
||
return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
|
||
}
|
||
|
||
offset := 0
|
||
if i == snapIdx {
|
||
offset = snapOffset
|
||
}
|
||
sr, err := wlog.NewSegmentBufReaderWithOffset(offset, s)
|
||
if errors.Is(err, io.EOF) {
|
||
// File does not exist.
|
||
continue
|
||
}
|
||
if err != nil {
|
||
return errors.Wrapf(err, "segment reader (offset=%d)", offset)
|
||
}
|
||
err = h.loadWAL(wlog.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks)
|
||
if err := sr.Close(); err != nil {
|
||
level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err)
|
||
}
|
||
if err != nil {
|
||
return err
|
||
}
|
||
level.Info(h.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", endAt)
|
||
h.updateWALReplayStatusRead(i)
|
||
}
|
||
walReplayDuration := time.Since(walReplayStart)
|
||
|
||
wblReplayStart := time.Now()
|
||
if h.wbl != nil {
|
||
// Replay OOO WAL.
|
||
startFrom, endAt, e = wlog.Segments(h.wbl.Dir())
|
||
if e != nil {
|
||
return errors.Wrap(e, "finding OOO WAL segments")
|
||
}
|
||
h.startWALReplayStatus(startFrom, endAt)
|
||
|
||
for i := startFrom; i <= endAt; i++ {
|
||
s, err := wlog.OpenReadSegment(wlog.SegmentName(h.wbl.Dir(), i))
|
||
if err != nil {
|
||
return errors.Wrap(err, fmt.Sprintf("open WBL segment: %d", i))
|
||
}
|
||
|
||
sr := wlog.NewSegmentBufReader(s)
|
||
err = h.loadWBL(wlog.NewReader(sr), multiRef, lastMmapRef)
|
||
if err := sr.Close(); err != nil {
|
||
level.Warn(h.logger).Log("msg", "Error while closing the wbl segments reader", "err", err)
|
||
}
|
||
if err != nil {
|
||
return err
|
||
}
|
||
level.Info(h.logger).Log("msg", "WBL segment loaded", "segment", i, "maxSegment", endAt)
|
||
h.updateWALReplayStatusRead(i)
|
||
}
|
||
}
|
||
|
||
wblReplayDuration := time.Since(wblReplayStart)
|
||
|
||
totalReplayDuration := time.Since(start)
|
||
h.metrics.dataTotalReplayDuration.Set(totalReplayDuration.Seconds())
|
||
level.Info(h.logger).Log(
|
||
"msg", "WAL replay completed",
|
||
"checkpoint_replay_duration", checkpointReplayDuration.String(),
|
||
"wal_replay_duration", walReplayDuration.String(),
|
||
"wbl_replay_duration", wblReplayDuration.String(),
|
||
"total_replay_duration", totalReplayDuration.String(),
|
||
)
|
||
|
||
return nil
|
||
}
|
||
|
||
func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries) (map[chunks.HeadSeriesRef][]*mmappedChunk, map[chunks.HeadSeriesRef][]*mmappedChunk, chunks.ChunkDiskMapperRef, error) {
|
||
mmappedChunks := map[chunks.HeadSeriesRef][]*mmappedChunk{}
|
||
oooMmappedChunks := map[chunks.HeadSeriesRef][]*mmappedChunk{}
|
||
var lastRef, secondLastRef chunks.ChunkDiskMapperRef
|
||
if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef chunks.HeadSeriesRef, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding, isOOO bool) error {
|
||
secondLastRef = lastRef
|
||
lastRef = chunkRef
|
||
if !isOOO && maxt < h.minValidTime.Load() {
|
||
return nil
|
||
}
|
||
|
||
// We ignore any chunk that doesn't have a valid encoding
|
||
if !chunkenc.IsValidEncoding(encoding) {
|
||
return nil
|
||
}
|
||
|
||
ms, ok := refSeries[seriesRef]
|
||
|
||
if isOOO {
|
||
if !ok {
|
||
oooMmappedChunks[seriesRef] = append(oooMmappedChunks[seriesRef], &mmappedChunk{
|
||
ref: chunkRef,
|
||
minTime: mint,
|
||
maxTime: maxt,
|
||
numSamples: numSamples,
|
||
})
|
||
return nil
|
||
}
|
||
|
||
h.metrics.chunks.Inc()
|
||
h.metrics.chunksCreated.Inc()
|
||
|
||
if ms.ooo == nil {
|
||
ms.ooo = &memSeriesOOOFields{}
|
||
}
|
||
|
||
ms.ooo.oooMmappedChunks = append(ms.ooo.oooMmappedChunks, &mmappedChunk{
|
||
ref: chunkRef,
|
||
minTime: mint,
|
||
maxTime: maxt,
|
||
numSamples: numSamples,
|
||
})
|
||
|
||
h.updateMinOOOMaxOOOTime(mint, maxt)
|
||
return nil
|
||
}
|
||
|
||
if !ok {
|
||
slice := mmappedChunks[seriesRef]
|
||
if len(slice) > 0 && slice[len(slice)-1].maxTime >= mint {
|
||
h.metrics.mmapChunkCorruptionTotal.Inc()
|
||
return errors.Errorf("out of sequence m-mapped chunk for series ref %d, last chunk: [%d, %d], new: [%d, %d]",
|
||
seriesRef, slice[len(slice)-1].minTime, slice[len(slice)-1].maxTime, mint, maxt)
|
||
}
|
||
slice = append(slice, &mmappedChunk{
|
||
ref: chunkRef,
|
||
minTime: mint,
|
||
maxTime: maxt,
|
||
numSamples: numSamples,
|
||
})
|
||
mmappedChunks[seriesRef] = slice
|
||
return nil
|
||
}
|
||
|
||
if len(ms.mmappedChunks) > 0 && ms.mmappedChunks[len(ms.mmappedChunks)-1].maxTime >= mint {
|
||
h.metrics.mmapChunkCorruptionTotal.Inc()
|
||
return errors.Errorf("out of sequence m-mapped chunk for series ref %d, last chunk: [%d, %d], new: [%d, %d]",
|
||
seriesRef, ms.mmappedChunks[len(ms.mmappedChunks)-1].minTime, ms.mmappedChunks[len(ms.mmappedChunks)-1].maxTime,
|
||
mint, maxt)
|
||
}
|
||
|
||
h.metrics.chunks.Inc()
|
||
h.metrics.chunksCreated.Inc()
|
||
ms.mmappedChunks = append(ms.mmappedChunks, &mmappedChunk{
|
||
ref: chunkRef,
|
||
minTime: mint,
|
||
maxTime: maxt,
|
||
numSamples: numSamples,
|
||
})
|
||
h.updateMinMaxTime(mint, maxt)
|
||
if ms.headChunks != nil && maxt >= ms.headChunks.minTime {
|
||
// The head chunk was completed and was m-mapped after taking the snapshot.
|
||
// Hence remove this chunk.
|
||
ms.nextAt = 0
|
||
ms.headChunks = nil
|
||
ms.app = nil
|
||
}
|
||
return nil
|
||
}); err != nil {
|
||
// secondLastRef because the lastRef caused an error.
|
||
return nil, nil, secondLastRef, errors.Wrap(err, "iterate on on-disk chunks")
|
||
}
|
||
return mmappedChunks, oooMmappedChunks, lastRef, nil
|
||
}
|
||
|
||
// removeCorruptedMmappedChunks attempts to delete the corrupted mmapped chunks and if it fails, it clears all the previously
|
||
// loaded mmapped chunks.
|
||
func (h *Head) removeCorruptedMmappedChunks(err error) (map[chunks.HeadSeriesRef][]*mmappedChunk, map[chunks.HeadSeriesRef][]*mmappedChunk, chunks.ChunkDiskMapperRef, error) {
|
||
level.Info(h.logger).Log("msg", "Deleting mmapped chunk files")
|
||
// We never want to preserve the in-memory series from snapshots if we are repairing m-map chunks.
|
||
if err := h.resetInMemoryState(); err != nil {
|
||
return map[chunks.HeadSeriesRef][]*mmappedChunk{}, map[chunks.HeadSeriesRef][]*mmappedChunk{}, 0, err
|
||
}
|
||
|
||
level.Info(h.logger).Log("msg", "Deleting mmapped chunk files")
|
||
|
||
if err := h.chunkDiskMapper.DeleteCorrupted(err); err != nil {
|
||
level.Info(h.logger).Log("msg", "Deletion of corrupted mmap chunk files failed, discarding chunk files completely", "err", err)
|
||
if err := h.chunkDiskMapper.Truncate(math.MaxUint32); err != nil {
|
||
level.Error(h.logger).Log("msg", "Deletion of all mmap chunk files failed", "err", err)
|
||
}
|
||
return map[chunks.HeadSeriesRef][]*mmappedChunk{}, map[chunks.HeadSeriesRef][]*mmappedChunk{}, 0, nil
|
||
}
|
||
|
||
level.Info(h.logger).Log("msg", "Deletion of mmap chunk files successful, reattempting m-mapping the on-disk chunks")
|
||
mmappedChunks, oooMmappedChunks, lastRef, err := h.loadMmappedChunks(make(map[chunks.HeadSeriesRef]*memSeries))
|
||
if err != nil {
|
||
level.Error(h.logger).Log("msg", "Loading on-disk chunks failed, discarding chunk files completely", "err", err)
|
||
if err := h.chunkDiskMapper.Truncate(math.MaxUint32); err != nil {
|
||
level.Error(h.logger).Log("msg", "Deletion of all mmap chunk files failed after failed loading", "err", err)
|
||
}
|
||
mmappedChunks = map[chunks.HeadSeriesRef][]*mmappedChunk{}
|
||
}
|
||
|
||
return mmappedChunks, oooMmappedChunks, lastRef, nil
|
||
}
|
||
|
||
func (h *Head) ApplyConfig(cfg *config.Config, wbl *wlog.WL) {
|
||
oooTimeWindow := int64(0)
|
||
if cfg.StorageConfig.TSDBConfig != nil {
|
||
oooTimeWindow = cfg.StorageConfig.TSDBConfig.OutOfOrderTimeWindow
|
||
}
|
||
if oooTimeWindow < 0 {
|
||
oooTimeWindow = 0
|
||
}
|
||
|
||
h.SetOutOfOrderTimeWindow(oooTimeWindow, wbl)
|
||
|
||
if !h.opts.EnableExemplarStorage {
|
||
return
|
||
}
|
||
|
||
// Head uses opts.MaxExemplars in combination with opts.EnableExemplarStorage
|
||
// to decide if it should pass exemplars along to its exemplar storage, so we
|
||
// need to update opts.MaxExemplars here.
|
||
prevSize := h.opts.MaxExemplars.Load()
|
||
h.opts.MaxExemplars.Store(cfg.StorageConfig.ExemplarsConfig.MaxExemplars)
|
||
newSize := h.opts.MaxExemplars.Load()
|
||
|
||
if prevSize == newSize {
|
||
return
|
||
}
|
||
|
||
migrated := h.exemplars.(*CircularExemplarStorage).Resize(newSize)
|
||
level.Info(h.logger).Log("msg", "Exemplar storage resized", "from", prevSize, "to", newSize, "migrated", migrated)
|
||
}
|
||
|
||
// SetOutOfOrderTimeWindow updates the out of order related parameters.
|
||
// If the Head already has a WBL set, then the wbl will be ignored.
|
||
func (h *Head) SetOutOfOrderTimeWindow(oooTimeWindow int64, wbl *wlog.WL) {
|
||
if oooTimeWindow > 0 && h.wbl == nil {
|
||
h.wbl = wbl
|
||
}
|
||
|
||
h.opts.OutOfOrderTimeWindow.Store(oooTimeWindow)
|
||
}
|
||
|
||
// EnableNativeHistograms enables the native histogram feature.
|
||
func (h *Head) EnableNativeHistograms() {
|
||
h.opts.EnableNativeHistograms.Store(true)
|
||
}
|
||
|
||
// DisableNativeHistograms disables the native histogram feature.
|
||
func (h *Head) DisableNativeHistograms() {
|
||
h.opts.EnableNativeHistograms.Store(false)
|
||
}
|
||
|
||
// PostingsCardinalityStats returns highest cardinality stats by label and value names.
|
||
func (h *Head) PostingsCardinalityStats(statsByLabelName string, limit int) *index.PostingsStats {
|
||
h.cardinalityMutex.Lock()
|
||
defer h.cardinalityMutex.Unlock()
|
||
currentTime := time.Duration(time.Now().Unix()) * time.Second
|
||
seconds := currentTime - h.lastPostingsStatsCall
|
||
if seconds > cardinalityCacheExpirationTime {
|
||
h.cardinalityCache = nil
|
||
}
|
||
if h.cardinalityCache != nil {
|
||
return h.cardinalityCache
|
||
}
|
||
h.cardinalityCache = h.postings.Stats(statsByLabelName, limit)
|
||
h.lastPostingsStatsCall = time.Duration(time.Now().Unix()) * time.Second
|
||
|
||
return h.cardinalityCache
|
||
}
|
||
|
||
func (h *Head) updateMinMaxTime(mint, maxt int64) {
|
||
for {
|
||
lt := h.MinTime()
|
||
if mint >= lt {
|
||
break
|
||
}
|
||
if h.minTime.CompareAndSwap(lt, mint) {
|
||
break
|
||
}
|
||
}
|
||
for {
|
||
ht := h.MaxTime()
|
||
if maxt <= ht {
|
||
break
|
||
}
|
||
if h.maxTime.CompareAndSwap(ht, maxt) {
|
||
break
|
||
}
|
||
}
|
||
}
|
||
|
||
func (h *Head) updateMinOOOMaxOOOTime(mint, maxt int64) {
|
||
for {
|
||
lt := h.MinOOOTime()
|
||
if mint >= lt {
|
||
break
|
||
}
|
||
if h.minOOOTime.CompareAndSwap(lt, mint) {
|
||
break
|
||
}
|
||
}
|
||
for {
|
||
ht := h.MaxOOOTime()
|
||
if maxt <= ht {
|
||
break
|
||
}
|
||
if h.maxOOOTime.CompareAndSwap(ht, maxt) {
|
||
break
|
||
}
|
||
}
|
||
}
|
||
|
||
// SetMinValidTime sets the minimum timestamp the head can ingest.
|
||
func (h *Head) SetMinValidTime(minValidTime int64) {
|
||
h.minValidTime.Store(minValidTime)
|
||
}
|
||
|
||
// Truncate removes old data before mint from the head and WAL.
|
||
func (h *Head) Truncate(mint int64) (err error) {
|
||
initialize := h.MinTime() == math.MaxInt64
|
||
if err := h.truncateMemory(mint); err != nil {
|
||
return err
|
||
}
|
||
if initialize {
|
||
return nil
|
||
}
|
||
return h.truncateWAL(mint)
|
||
}
|
||
|
||
// OverlapsClosedInterval returns true if the head overlaps [mint, maxt].
|
||
func (h *Head) OverlapsClosedInterval(mint, maxt int64) bool {
|
||
return h.MinTime() <= maxt && mint <= h.MaxTime()
|
||
}
|
||
|
||
// truncateMemory removes old data before mint from the head.
|
||
func (h *Head) truncateMemory(mint int64) (err error) {
|
||
h.chunkSnapshotMtx.Lock()
|
||
defer h.chunkSnapshotMtx.Unlock()
|
||
|
||
defer func() {
|
||
if err != nil {
|
||
h.metrics.headTruncateFail.Inc()
|
||
}
|
||
}()
|
||
|
||
initialize := h.MinTime() == math.MaxInt64
|
||
|
||
if h.MinTime() >= mint && !initialize {
|
||
return nil
|
||
}
|
||
|
||
// The order of these two Store() should not be changed,
|
||
// i.e. truncation time is set before in-process boolean.
|
||
h.lastMemoryTruncationTime.Store(mint)
|
||
h.memTruncationInProcess.Store(true)
|
||
defer h.memTruncationInProcess.Store(false)
|
||
|
||
// We wait for pending queries to end that overlap with this truncation.
|
||
if !initialize {
|
||
h.WaitForPendingReadersInTimeRange(h.MinTime(), mint)
|
||
}
|
||
|
||
h.minTime.Store(mint)
|
||
h.minValidTime.Store(mint)
|
||
|
||
// Ensure that max time is at least as high as min time.
|
||
for h.MaxTime() < mint {
|
||
h.maxTime.CompareAndSwap(h.MaxTime(), mint)
|
||
}
|
||
|
||
// This was an initial call to Truncate after loading blocks on startup.
|
||
// We haven't read back the WAL yet, so do not attempt to truncate it.
|
||
if initialize {
|
||
return nil
|
||
}
|
||
|
||
h.metrics.headTruncateTotal.Inc()
|
||
return h.truncateSeriesAndChunkDiskMapper("truncateMemory")
|
||
}
|
||
|
||
// WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying.
|
||
// The query timeout limits the max wait time of this function implicitly.
|
||
// The mint is inclusive and maxt is the truncation time hence exclusive.
|
||
func (h *Head) WaitForPendingReadersInTimeRange(mint, maxt int64) {
|
||
maxt-- // Making it inclusive before checking overlaps.
|
||
overlaps := func() bool {
|
||
o := false
|
||
h.iso.TraverseOpenReads(func(s *isolationState) bool {
|
||
if s.mint <= maxt && mint <= s.maxt {
|
||
// Overlaps with the truncation range.
|
||
o = true
|
||
return false
|
||
}
|
||
return true
|
||
})
|
||
return o
|
||
}
|
||
for overlaps() {
|
||
time.Sleep(500 * time.Millisecond)
|
||
}
|
||
}
|
||
|
||
// WaitForAppendersOverlapping waits for appends overlapping maxt to finish.
|
||
func (h *Head) WaitForAppendersOverlapping(maxt int64) {
|
||
for maxt >= h.iso.lowestAppendTime() {
|
||
time.Sleep(500 * time.Millisecond)
|
||
}
|
||
}
|
||
|
||
// IsQuerierCollidingWithTruncation returns if the current querier needs to be closed and if a new querier
|
||
// has to be created. In the latter case, the method also returns the new mint to be used for creating the
|
||
// new range head and the new querier. This methods helps preventing races with the truncation of in-memory data.
|
||
//
|
||
// NOTE: The querier should already be taken before calling this.
|
||
func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64) (shouldClose, getNew bool, newMint int64) {
|
||
if !h.memTruncationInProcess.Load() {
|
||
return false, false, 0
|
||
}
|
||
// Head truncation is in process. It also means that the block that was
|
||
// created for this truncation range is also available.
|
||
// Check if we took a querier that overlaps with this truncation.
|
||
memTruncTime := h.lastMemoryTruncationTime.Load()
|
||
if querierMaxt < memTruncTime {
|
||
// Head compaction has happened and this time range is being truncated.
|
||
// This query doesn't overlap with the Head any longer.
|
||
// We should close this querier to avoid races and the data would be
|
||
// available with the blocks below.
|
||
// Cases:
|
||
// 1. |------truncation------|
|
||
// |---query---|
|
||
// 2. |------truncation------|
|
||
// |---query---|
|
||
return true, false, 0
|
||
}
|
||
if querierMint < memTruncTime {
|
||
// The truncation time is not same as head mint that we saw above but the
|
||
// query still overlaps with the Head.
|
||
// The truncation started after we got the querier. So it is not safe
|
||
// to use this querier and/or might block truncation. We should get
|
||
// a new querier for the new Head range while remaining will be available
|
||
// in the blocks below.
|
||
// Case:
|
||
// |------truncation------|
|
||
// |----query----|
|
||
// Turns into
|
||
// |------truncation------|
|
||
// |---qu---|
|
||
return true, true, memTruncTime
|
||
}
|
||
|
||
// Other case is this, which is a no-op
|
||
// |------truncation------|
|
||
// |---query---|
|
||
return false, false, 0
|
||
}
|
||
|
||
// truncateWAL removes old data before mint from the WAL.
|
||
func (h *Head) truncateWAL(mint int64) error {
|
||
h.chunkSnapshotMtx.Lock()
|
||
defer h.chunkSnapshotMtx.Unlock()
|
||
|
||
if h.wal == nil || mint <= h.lastWALTruncationTime.Load() {
|
||
return nil
|
||
}
|
||
start := time.Now()
|
||
h.lastWALTruncationTime.Store(mint)
|
||
|
||
first, last, err := wlog.Segments(h.wal.Dir())
|
||
if err != nil {
|
||
return errors.Wrap(err, "get segment range")
|
||
}
|
||
// Start a new segment, so low ingestion volume TSDB don't have more WAL than
|
||
// needed.
|
||
if _, err := h.wal.NextSegment(); err != nil {
|
||
return errors.Wrap(err, "next segment")
|
||
}
|
||
last-- // Never consider last segment for checkpoint.
|
||
if last < 0 {
|
||
return nil // no segments yet.
|
||
}
|
||
// The lower two thirds of segments should contain mostly obsolete samples.
|
||
// If we have less than two segments, it's not worth checkpointing yet.
|
||
// With the default 2h blocks, this will keeping up to around 3h worth
|
||
// of WAL segments.
|
||
last = first + (last-first)*2/3
|
||
if last <= first {
|
||
return nil
|
||
}
|
||
|
||
keep := func(id chunks.HeadSeriesRef) bool {
|
||
if h.series.getByID(id) != nil {
|
||
return true
|
||
}
|
||
h.deletedMtx.Lock()
|
||
keepUntil, ok := h.deleted[id]
|
||
h.deletedMtx.Unlock()
|
||
return ok && keepUntil > last
|
||
}
|
||
h.metrics.checkpointCreationTotal.Inc()
|
||
if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, keep, mint); err != nil {
|
||
h.metrics.checkpointCreationFail.Inc()
|
||
if _, ok := errors.Cause(err).(*wlog.CorruptionErr); ok {
|
||
h.metrics.walCorruptionsTotal.Inc()
|
||
}
|
||
return errors.Wrap(err, "create checkpoint")
|
||
}
|
||
if err := h.wal.Truncate(last + 1); err != nil {
|
||
// If truncating fails, we'll just try again at the next checkpoint.
|
||
// Leftover segments will just be ignored in the future if there's a checkpoint
|
||
// that supersedes them.
|
||
level.Error(h.logger).Log("msg", "truncating segments failed", "err", err)
|
||
}
|
||
|
||
// The checkpoint is written and segments before it is truncated, so we no
|
||
// longer need to track deleted series that are before it.
|
||
h.deletedMtx.Lock()
|
||
for ref, segment := range h.deleted {
|
||
if segment <= last {
|
||
delete(h.deleted, ref)
|
||
}
|
||
}
|
||
h.deletedMtx.Unlock()
|
||
|
||
h.metrics.checkpointDeleteTotal.Inc()
|
||
if err := wlog.DeleteCheckpoints(h.wal.Dir(), last); err != nil {
|
||
// Leftover old checkpoints do not cause problems down the line beyond
|
||
// occupying disk space.
|
||
// They will just be ignored since a higher checkpoint exists.
|
||
level.Error(h.logger).Log("msg", "delete old checkpoints", "err", err)
|
||
h.metrics.checkpointDeleteFail.Inc()
|
||
}
|
||
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
|
||
|
||
level.Info(h.logger).Log("msg", "WAL checkpoint complete",
|
||
"first", first, "last", last, "duration", time.Since(start))
|
||
|
||
return nil
|
||
}
|
||
|
||
// truncateOOO
|
||
// - truncates the OOO WBL files whose index is strictly less than lastWBLFile.
|
||
// - garbage collects all the m-map chunks from the memory that are less than or equal to minOOOMmapRef
|
||
// and then deletes the series that do not have any data anymore.
|
||
func (h *Head) truncateOOO(lastWBLFile int, minOOOMmapRef chunks.ChunkDiskMapperRef) error {
|
||
curMinOOOMmapRef := chunks.ChunkDiskMapperRef(h.minOOOMmapRef.Load())
|
||
if minOOOMmapRef.GreaterThan(curMinOOOMmapRef) {
|
||
h.minOOOMmapRef.Store(uint64(minOOOMmapRef))
|
||
if err := h.truncateSeriesAndChunkDiskMapper("truncateOOO"); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
if h.wbl == nil {
|
||
return nil
|
||
}
|
||
|
||
return h.wbl.Truncate(lastWBLFile)
|
||
}
|
||
|
||
// truncateSeriesAndChunkDiskMapper is a helper function for truncateMemory and truncateOOO.
|
||
// It runs GC on the Head and truncates the ChunkDiskMapper accordingly.
|
||
func (h *Head) truncateSeriesAndChunkDiskMapper(caller string) error {
|
||
start := time.Now()
|
||
headMaxt := h.MaxTime()
|
||
actualMint, minOOOTime, minMmapFile := h.gc()
|
||
level.Info(h.logger).Log("msg", "Head GC completed", "caller", caller, "duration", time.Since(start))
|
||
h.metrics.gcDuration.Observe(time.Since(start).Seconds())
|
||
|
||
if actualMint > h.minTime.Load() {
|
||
// The actual mint of the head is higher than the one asked to truncate.
|
||
appendableMinValidTime := h.appendableMinValidTime()
|
||
if actualMint < appendableMinValidTime {
|
||
h.minTime.Store(actualMint)
|
||
h.minValidTime.Store(actualMint)
|
||
} else {
|
||
// The actual min time is in the appendable window.
|
||
// So we set the mint to the appendableMinValidTime.
|
||
h.minTime.Store(appendableMinValidTime)
|
||
h.minValidTime.Store(appendableMinValidTime)
|
||
}
|
||
}
|
||
if headMaxt-h.opts.OutOfOrderTimeWindow.Load() < minOOOTime {
|
||
// The allowed OOO window is lower than the min OOO time seen during GC.
|
||
// So it is possible that some OOO sample was inserted that was less that minOOOTime.
|
||
// So we play safe and set it to the min that was possible.
|
||
minOOOTime = headMaxt - h.opts.OutOfOrderTimeWindow.Load()
|
||
}
|
||
h.minOOOTime.Store(minOOOTime)
|
||
|
||
// Truncate the chunk m-mapper.
|
||
if err := h.chunkDiskMapper.Truncate(uint32(minMmapFile)); err != nil {
|
||
return errors.Wrap(err, "truncate chunks.HeadReadWriter by file number")
|
||
}
|
||
return nil
|
||
}
|
||
|
||
type Stats struct {
|
||
NumSeries uint64
|
||
MinTime, MaxTime int64
|
||
IndexPostingStats *index.PostingsStats
|
||
}
|
||
|
||
// Stats returns important current HEAD statistics. Note that it is expensive to
|
||
// calculate these.
|
||
func (h *Head) Stats(statsByLabelName string, limit int) *Stats {
|
||
return &Stats{
|
||
NumSeries: h.NumSeries(),
|
||
MaxTime: h.MaxTime(),
|
||
MinTime: h.MinTime(),
|
||
IndexPostingStats: h.PostingsCardinalityStats(statsByLabelName, limit),
|
||
}
|
||
}
|
||
|
||
// RangeHead allows querying Head via an IndexReader, ChunkReader and tombstones.Reader
|
||
// but only within a restricted range. Used for queries and compactions.
|
||
type RangeHead struct {
|
||
head *Head
|
||
mint, maxt int64
|
||
|
||
isolationOff bool
|
||
}
|
||
|
||
// NewRangeHead returns a *RangeHead.
|
||
// There are no restrictions on mint/maxt.
|
||
func NewRangeHead(head *Head, mint, maxt int64) *RangeHead {
|
||
return &RangeHead{
|
||
head: head,
|
||
mint: mint,
|
||
maxt: maxt,
|
||
}
|
||
}
|
||
|
||
// NewRangeHeadWithIsolationDisabled returns a *RangeHead that does not create an isolationState.
|
||
func NewRangeHeadWithIsolationDisabled(head *Head, mint, maxt int64) *RangeHead {
|
||
rh := NewRangeHead(head, mint, maxt)
|
||
rh.isolationOff = true
|
||
return rh
|
||
}
|
||
|
||
func (h *RangeHead) Index() (IndexReader, error) {
|
||
return h.head.indexRange(h.mint, h.maxt), nil
|
||
}
|
||
|
||
func (h *RangeHead) Chunks() (ChunkReader, error) {
|
||
var isoState *isolationState
|
||
if !h.isolationOff {
|
||
isoState = h.head.iso.State(h.mint, h.maxt)
|
||
}
|
||
return h.head.chunksRange(h.mint, h.maxt, isoState)
|
||
}
|
||
|
||
func (h *RangeHead) Tombstones() (tombstones.Reader, error) {
|
||
return h.head.tombstones, nil
|
||
}
|
||
|
||
func (h *RangeHead) MinTime() int64 {
|
||
return h.mint
|
||
}
|
||
|
||
// MaxTime returns the max time of actual data fetch-able from the head.
|
||
// This controls the chunks time range which is closed [b.MinTime, b.MaxTime].
|
||
func (h *RangeHead) MaxTime() int64 {
|
||
return h.maxt
|
||
}
|
||
|
||
// BlockMaxTime returns the max time of the potential block created from this head.
|
||
// It's different to MaxTime as we need to add +1 millisecond to block maxt because block
|
||
// intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes.
|
||
func (h *RangeHead) BlockMaxTime() int64 {
|
||
return h.MaxTime() + 1
|
||
}
|
||
|
||
func (h *RangeHead) NumSeries() uint64 {
|
||
return h.head.NumSeries()
|
||
}
|
||
|
||
func (h *RangeHead) Meta() BlockMeta {
|
||
return BlockMeta{
|
||
MinTime: h.MinTime(),
|
||
MaxTime: h.MaxTime(),
|
||
ULID: h.head.Meta().ULID,
|
||
Stats: BlockStats{
|
||
NumSeries: h.NumSeries(),
|
||
},
|
||
}
|
||
}
|
||
|
||
// String returns an human readable representation of the range head. It's important to
|
||
// keep this function in order to avoid the struct dump when the head is stringified in
|
||
// errors or logs.
|
||
func (h *RangeHead) String() string {
|
||
return fmt.Sprintf("range head (mint: %d, maxt: %d)", h.MinTime(), h.MaxTime())
|
||
}
|
||
|
||
// Delete all samples in the range of [mint, maxt] for series that satisfy the given
|
||
// label matchers.
|
||
func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error {
|
||
// Do not delete anything beyond the currently valid range.
|
||
mint, maxt = clampInterval(mint, maxt, h.MinTime(), h.MaxTime())
|
||
|
||
ir := h.indexRange(mint, maxt)
|
||
|
||
p, err := PostingsForMatchers(ctx, ir, ms...)
|
||
if err != nil {
|
||
return errors.Wrap(err, "select series")
|
||
}
|
||
|
||
var stones []tombstones.Stone
|
||
for p.Next() {
|
||
if err := ctx.Err(); err != nil {
|
||
return errors.Wrap(err, "select series")
|
||
}
|
||
|
||
series := h.series.getByID(chunks.HeadSeriesRef(p.At()))
|
||
if series == nil {
|
||
level.Debug(h.logger).Log("msg", "Series not found in Head.Delete")
|
||
continue
|
||
}
|
||
|
||
series.RLock()
|
||
t0, t1 := series.minTime(), series.maxTime()
|
||
series.RUnlock()
|
||
if t0 == math.MinInt64 || t1 == math.MinInt64 {
|
||
continue
|
||
}
|
||
// Delete only until the current values and not beyond.
|
||
t0, t1 = clampInterval(mint, maxt, t0, t1)
|
||
stones = append(stones, tombstones.Stone{Ref: p.At(), Intervals: tombstones.Intervals{{Mint: t0, Maxt: t1}}})
|
||
}
|
||
if p.Err() != nil {
|
||
return p.Err()
|
||
}
|
||
if ctx.Err() != nil {
|
||
return errors.Wrap(err, "select series")
|
||
}
|
||
|
||
if h.wal != nil {
|
||
var enc record.Encoder
|
||
if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
for _, s := range stones {
|
||
h.tombstones.AddInterval(s.Ref, s.Intervals[0])
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// gc removes data before the minimum timestamp from the head.
|
||
// It returns
|
||
// * The actual min times of the chunks present in the Head.
|
||
// * The min OOO time seen during the GC.
|
||
// * Min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series.
|
||
func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
|
||
// Only data strictly lower than this timestamp must be deleted.
|
||
mint := h.MinTime()
|
||
// Only ooo m-map chunks strictly lower than or equal to this ref
|
||
// must be deleted.
|
||
minOOOMmapRef := chunks.ChunkDiskMapperRef(h.minOOOMmapRef.Load())
|
||
|
||
// Drop old chunks and remember series IDs and hashes if they can be
|
||
// deleted entirely.
|
||
deleted, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef)
|
||
seriesRemoved := len(deleted)
|
||
|
||
h.metrics.seriesRemoved.Add(float64(seriesRemoved))
|
||
h.metrics.chunksRemoved.Add(float64(chunksRemoved))
|
||
h.metrics.chunks.Sub(float64(chunksRemoved))
|
||
h.numSeries.Sub(uint64(seriesRemoved))
|
||
|
||
// Remove deleted series IDs from the postings lists.
|
||
h.postings.Delete(deleted)
|
||
|
||
// Remove tombstones referring to the deleted series.
|
||
h.tombstones.DeleteTombstones(deleted)
|
||
h.tombstones.TruncateBefore(mint)
|
||
|
||
if h.wal != nil {
|
||
_, last, _ := wlog.Segments(h.wal.Dir())
|
||
h.deletedMtx.Lock()
|
||
// Keep series records until we're past segment 'last'
|
||
// because the WAL will still have samples records with
|
||
// this ref ID. If we didn't keep these series records then
|
||
// on start up when we replay the WAL, or any other code
|
||
// that reads the WAL, wouldn't be able to use those
|
||
// samples since we would have no labels for that ref ID.
|
||
for ref := range deleted {
|
||
h.deleted[chunks.HeadSeriesRef(ref)] = last
|
||
}
|
||
h.deletedMtx.Unlock()
|
||
}
|
||
|
||
return actualInOrderMint, minOOOTime, minMmapFile
|
||
}
|
||
|
||
// Tombstones returns a new reader over the head's tombstones
|
||
func (h *Head) Tombstones() (tombstones.Reader, error) {
|
||
return h.tombstones, nil
|
||
}
|
||
|
||
// NumSeries returns the number of active series in the head.
|
||
func (h *Head) NumSeries() uint64 {
|
||
return h.numSeries.Load()
|
||
}
|
||
|
||
// Meta returns meta information about the head.
|
||
// The head is dynamic so will return dynamic results.
|
||
func (h *Head) Meta() BlockMeta {
|
||
var id [16]byte
|
||
copy(id[:], "______head______")
|
||
return BlockMeta{
|
||
MinTime: h.MinTime(),
|
||
MaxTime: h.MaxTime(),
|
||
ULID: ulid.ULID(id),
|
||
Stats: BlockStats{
|
||
NumSeries: h.NumSeries(),
|
||
},
|
||
}
|
||
}
|
||
|
||
// MinTime returns the lowest time bound on visible data in the head.
|
||
func (h *Head) MinTime() int64 {
|
||
return h.minTime.Load()
|
||
}
|
||
|
||
// MaxTime returns the highest timestamp seen in data of the head.
|
||
func (h *Head) MaxTime() int64 {
|
||
return h.maxTime.Load()
|
||
}
|
||
|
||
// MinOOOTime returns the lowest time bound on visible data in the out of order
|
||
// head.
|
||
func (h *Head) MinOOOTime() int64 {
|
||
return h.minOOOTime.Load()
|
||
}
|
||
|
||
// MaxOOOTime returns the highest timestamp on visible data in the out of order
|
||
// head.
|
||
func (h *Head) MaxOOOTime() int64 {
|
||
return h.maxOOOTime.Load()
|
||
}
|
||
|
||
// compactable returns whether the head has a compactable range.
|
||
// The head has a compactable range when the head time range is 1.5 times the chunk range.
|
||
// The 0.5 acts as a buffer of the appendable window.
|
||
func (h *Head) compactable() bool {
|
||
return h.MaxTime()-h.MinTime() > h.chunkRange.Load()/2*3
|
||
}
|
||
|
||
// Close flushes the WAL and closes the head.
|
||
// It also takes a snapshot of in-memory chunks if enabled.
|
||
func (h *Head) Close() error {
|
||
h.closedMtx.Lock()
|
||
defer h.closedMtx.Unlock()
|
||
h.closed = true
|
||
|
||
// mmap all but last chunk in case we're performing snapshot since that only
|
||
// takes samples from most recent head chunk.
|
||
h.mmapHeadChunks()
|
||
|
||
errs := tsdb_errors.NewMulti(h.chunkDiskMapper.Close())
|
||
if errs.Err() == nil && h.opts.EnableMemorySnapshotOnShutdown {
|
||
errs.Add(h.performChunkSnapshot())
|
||
}
|
||
if h.wal != nil {
|
||
errs.Add(h.wal.Close())
|
||
}
|
||
if h.wbl != nil {
|
||
errs.Add(h.wbl.Close())
|
||
}
|
||
if errs.Err() == nil && h.opts.EnableMemorySnapshotOnShutdown {
|
||
errs.Add(h.performChunkSnapshot())
|
||
}
|
||
return errs.Err()
|
||
}
|
||
|
||
// String returns an human readable representation of the TSDB head. It's important to
|
||
// keep this function in order to avoid the struct dump when the head is stringified in
|
||
// errors or logs.
|
||
func (h *Head) String() string {
|
||
return "head"
|
||
}
|
||
|
||
func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, error) {
|
||
// Just using `getOrCreateWithID` below would be semantically sufficient, but we'd create
|
||
// a new series on every sample inserted via Add(), which causes allocations
|
||
// and makes our series IDs rather random and harder to compress in postings.
|
||
s := h.series.getByHash(hash, lset)
|
||
if s != nil {
|
||
return s, false, nil
|
||
}
|
||
|
||
// Optimistically assume that we are the first one to create the series.
|
||
id := chunks.HeadSeriesRef(h.lastSeriesID.Inc())
|
||
|
||
return h.getOrCreateWithID(id, hash, lset)
|
||
}
|
||
|
||
func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
|
||
s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
|
||
return newMemSeries(lset, id, h.opts.IsolationDisabled)
|
||
})
|
||
if err != nil {
|
||
return nil, false, err
|
||
}
|
||
if !created {
|
||
return s, false, nil
|
||
}
|
||
|
||
h.metrics.seriesCreated.Inc()
|
||
h.numSeries.Inc()
|
||
|
||
h.postings.Add(storage.SeriesRef(id), lset)
|
||
return s, true, nil
|
||
}
|
||
|
||
// mmapHeadChunks will iterate all memSeries stored on Head and call mmapHeadChunks() on each of them.
|
||
//
|
||
// There are two types of chunks that store samples for each memSeries:
|
||
// A) Head chunk - stored on Go heap, when new samples are appended they go there.
|
||
// B) M-mapped chunks - memory mapped chunks, kernel manages the memory for us on-demand, these chunks
|
||
//
|
||
// are read-only.
|
||
//
|
||
// Calling mmapHeadChunks() will iterate all memSeries and m-mmap all chunks that should be m-mapped.
|
||
// The m-mapping operation is needs to be serialised and so it goes via central lock.
|
||
// If there are multiple concurrent memSeries that need to m-map some chunk then they can block each-other.
|
||
//
|
||
// To minimise the effect of locking on TSDB operations m-mapping is serialised and done away from
|
||
// sample append path, since waiting on a lock inside an append would lock the entire memSeries for
|
||
// (potentially) a long time, since that could eventually delay next scrape and/or cause query timeouts.
|
||
func (h *Head) mmapHeadChunks() {
|
||
var count int
|
||
for i := 0; i < h.series.size; i++ {
|
||
h.series.locks[i].RLock()
|
||
for _, all := range h.series.hashes[i] {
|
||
for _, series := range all {
|
||
series.Lock()
|
||
count += series.mmapChunks(h.chunkDiskMapper)
|
||
series.Unlock()
|
||
}
|
||
}
|
||
h.series.locks[i].RUnlock()
|
||
}
|
||
h.metrics.mmapChunksTotal.Add(float64(count))
|
||
}
|
||
|
||
// seriesHashmap is a simple hashmap for memSeries by their label set. It is built
|
||
// on top of a regular hashmap and holds a slice of series to resolve hash collisions.
|
||
// Its methods require the hash to be submitted with it to avoid re-computations throughout
|
||
// the code.
|
||
type seriesHashmap map[uint64][]*memSeries
|
||
|
||
func (m seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
|
||
for _, s := range m[hash] {
|
||
if labels.Equal(s.lset, lset) {
|
||
return s
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (m seriesHashmap) set(hash uint64, s *memSeries) {
|
||
l := m[hash]
|
||
for i, prev := range l {
|
||
if labels.Equal(prev.lset, s.lset) {
|
||
l[i] = s
|
||
return
|
||
}
|
||
}
|
||
m[hash] = append(l, s)
|
||
}
|
||
|
||
func (m seriesHashmap) del(hash uint64, lset labels.Labels) {
|
||
var rem []*memSeries
|
||
for _, s := range m[hash] {
|
||
if !labels.Equal(s.lset, lset) {
|
||
rem = append(rem, s)
|
||
}
|
||
}
|
||
if len(rem) == 0 {
|
||
delete(m, hash)
|
||
} else {
|
||
m[hash] = rem
|
||
}
|
||
}
|
||
|
||
// DefaultStripeSize is the default number of entries to allocate in the stripeSeries hash map.
const DefaultStripeSize = 1 << 14
|
||
|
||
// stripeSeries holds series by HeadSeriesRef ("ID") and also by hash of their labels.
|
||
// ID-based lookups via getByID() are preferred over getByHash() for performance reasons.
|
||
// It locks modulo ranges of IDs and hashes to reduce lock contention.
|
||
// The locks are padded to not be on the same cache line. Filling the padded space
|
||
// with the maps was profiled to be slower – likely due to the additional pointer
|
||
// dereferences.
|
||
type stripeSeries struct {
|
||
size int
|
||
series []map[chunks.HeadSeriesRef]*memSeries // Sharded by ref. A series ref is the value of `size` when the series was being newly added.
|
||
hashes []seriesHashmap // Sharded by label hash.
|
||
locks []stripeLock // Sharded by ref for series access, by label hash for hashes access.
|
||
seriesLifecycleCallback SeriesLifecycleCallback
|
||
}
|
||
|
||
// stripeLock is the RWMutex guarding a single stripe of stripeSeries.
type stripeLock struct {
	sync.RWMutex
	// Padding to avoid multiple locks being on the same cache line.
	// NOTE(review): 40 bytes presumably rounds the RWMutex up to a 64-byte
	// cache line on 64-bit platforms — confirm before changing.
	_ [40]byte
}
|
||
|
||
func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *stripeSeries {
|
||
s := &stripeSeries{
|
||
size: stripeSize,
|
||
series: make([]map[chunks.HeadSeriesRef]*memSeries, stripeSize),
|
||
hashes: make([]seriesHashmap, stripeSize),
|
||
locks: make([]stripeLock, stripeSize),
|
||
seriesLifecycleCallback: seriesCallback,
|
||
}
|
||
|
||
for i := range s.series {
|
||
s.series[i] = map[chunks.HeadSeriesRef]*memSeries{}
|
||
}
|
||
for i := range s.hashes {
|
||
s.hashes[i] = seriesHashmap{}
|
||
}
|
||
return s
|
||
}
|
||
|
||
// gc garbage collects old chunks that are strictly before mint and removes
|
||
// series entirely that have no chunks left.
|
||
// note: returning map[chunks.HeadSeriesRef]struct{} would be more accurate,
|
||
// but the returned map goes into postings.Delete() which expects a map[storage.SeriesRef]struct
|
||
// and there's no easy way to cast maps.
|
||
// minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series.
|
||
func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ int, _, _ int64, minMmapFile int) {
|
||
var (
|
||
deleted = map[storage.SeriesRef]struct{}{}
|
||
rmChunks = 0
|
||
actualMint int64 = math.MaxInt64
|
||
minOOOTime int64 = math.MaxInt64
|
||
deletedFromPrevStripe = 0
|
||
)
|
||
minMmapFile = math.MaxInt32
|
||
// Run through all series and truncate old chunks. Mark those with no
|
||
// chunks left as deleted and store their ID.
|
||
for i := 0; i < s.size; i++ {
|
||
deletedForCallback := make(map[chunks.HeadSeriesRef]labels.Labels, deletedFromPrevStripe)
|
||
s.locks[i].Lock()
|
||
|
||
for hash, all := range s.hashes[i] {
|
||
for _, series := range all {
|
||
series.Lock()
|
||
rmChunks += series.truncateChunksBefore(mint, minOOOMmapRef)
|
||
|
||
if len(series.mmappedChunks) > 0 {
|
||
seq, _ := series.mmappedChunks[0].ref.Unpack()
|
||
if seq < minMmapFile {
|
||
minMmapFile = seq
|
||
}
|
||
}
|
||
if series.ooo != nil && len(series.ooo.oooMmappedChunks) > 0 {
|
||
seq, _ := series.ooo.oooMmappedChunks[0].ref.Unpack()
|
||
if seq < minMmapFile {
|
||
minMmapFile = seq
|
||
}
|
||
for _, ch := range series.ooo.oooMmappedChunks {
|
||
if ch.minTime < minOOOTime {
|
||
minOOOTime = ch.minTime
|
||
}
|
||
}
|
||
}
|
||
if series.ooo != nil && series.ooo.oooHeadChunk != nil {
|
||
if series.ooo.oooHeadChunk.minTime < minOOOTime {
|
||
minOOOTime = series.ooo.oooHeadChunk.minTime
|
||
}
|
||
}
|
||
if len(series.mmappedChunks) > 0 || series.headChunks != nil || series.pendingCommit ||
|
||
(series.ooo != nil && (len(series.ooo.oooMmappedChunks) > 0 || series.ooo.oooHeadChunk != nil)) {
|
||
seriesMint := series.minTime()
|
||
if seriesMint < actualMint {
|
||
actualMint = seriesMint
|
||
}
|
||
series.Unlock()
|
||
continue
|
||
}
|
||
|
||
// The series is gone entirely. We need to keep the series lock
|
||
// and make sure we have acquired the stripe locks for hash and ID of the
|
||
// series alike.
|
||
// If we don't hold them all, there's a very small chance that a series receives
|
||
// samples again while we are half-way into deleting it.
|
||
j := int(series.ref) & (s.size - 1)
|
||
|
||
if i != j {
|
||
s.locks[j].Lock()
|
||
}
|
||
|
||
deleted[storage.SeriesRef(series.ref)] = struct{}{}
|
||
s.hashes[i].del(hash, series.lset)
|
||
delete(s.series[j], series.ref)
|
||
deletedForCallback[series.ref] = series.lset
|
||
|
||
if i != j {
|
||
s.locks[j].Unlock()
|
||
}
|
||
|
||
series.Unlock()
|
||
}
|
||
}
|
||
|
||
s.locks[i].Unlock()
|
||
|
||
s.seriesLifecycleCallback.PostDeletion(deletedForCallback)
|
||
deletedFromPrevStripe = len(deletedForCallback)
|
||
}
|
||
|
||
if actualMint == math.MaxInt64 {
|
||
actualMint = mint
|
||
}
|
||
|
||
return deleted, rmChunks, actualMint, minOOOTime, minMmapFile
|
||
}
|
||
|
||
func (s *stripeSeries) getByID(id chunks.HeadSeriesRef) *memSeries {
|
||
i := uint64(id) & uint64(s.size-1)
|
||
|
||
s.locks[i].RLock()
|
||
series := s.series[i][id]
|
||
s.locks[i].RUnlock()
|
||
|
||
return series
|
||
}
|
||
|
||
func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries {
|
||
i := hash & uint64(s.size-1)
|
||
|
||
s.locks[i].RLock()
|
||
series := s.hashes[i].get(hash, lset)
|
||
s.locks[i].RUnlock()
|
||
|
||
return series
|
||
}
|
||
|
||
func (s *stripeSeries) getOrSet(hash uint64, lset labels.Labels, createSeries func() *memSeries) (*memSeries, bool, error) {
|
||
// PreCreation is called here to avoid calling it inside the lock.
|
||
// It is not necessary to call it just before creating a series,
|
||
// rather it gives a 'hint' whether to create a series or not.
|
||
preCreationErr := s.seriesLifecycleCallback.PreCreation(lset)
|
||
|
||
// Create the series, unless the PreCreation() callback as failed.
|
||
// If failed, we'll not allow to create a new series anyway.
|
||
var series *memSeries
|
||
if preCreationErr == nil {
|
||
series = createSeries()
|
||
}
|
||
|
||
i := hash & uint64(s.size-1)
|
||
s.locks[i].Lock()
|
||
|
||
if prev := s.hashes[i].get(hash, lset); prev != nil {
|
||
s.locks[i].Unlock()
|
||
return prev, false, nil
|
||
}
|
||
if preCreationErr == nil {
|
||
s.hashes[i].set(hash, series)
|
||
}
|
||
s.locks[i].Unlock()
|
||
|
||
if preCreationErr != nil {
|
||
// The callback prevented creation of series.
|
||
return nil, false, preCreationErr
|
||
}
|
||
// Setting the series in the s.hashes marks the creation of series
|
||
// as any further calls to this methods would return that series.
|
||
s.seriesLifecycleCallback.PostCreation(series.lset)
|
||
|
||
i = uint64(series.ref) & uint64(s.size-1)
|
||
|
||
s.locks[i].Lock()
|
||
s.series[i][series.ref] = series
|
||
s.locks[i].Unlock()
|
||
|
||
return series, true, nil
|
||
}
|
||
|
||
type sample struct {
|
||
t int64
|
||
f float64
|
||
h *histogram.Histogram
|
||
fh *histogram.FloatHistogram
|
||
}
|
||
|
||
func newSample(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample {
|
||
return sample{t, v, h, fh}
|
||
}
|
||
|
||
func (s sample) T() int64 { return s.t }
|
||
func (s sample) F() float64 { return s.f }
|
||
func (s sample) H() *histogram.Histogram { return s.h }
|
||
func (s sample) FH() *histogram.FloatHistogram { return s.fh }
|
||
|
||
func (s sample) Type() chunkenc.ValueType {
|
||
switch {
|
||
case s.h != nil:
|
||
return chunkenc.ValHistogram
|
||
case s.fh != nil:
|
||
return chunkenc.ValFloatHistogram
|
||
default:
|
||
return chunkenc.ValFloat
|
||
}
|
||
}
|
||
|
||
// memSeries is the in-memory representation of a series. None of its methods
|
||
// are goroutine safe and it is the caller's responsibility to lock it.
|
||
type memSeries struct {
|
||
sync.RWMutex
|
||
|
||
ref chunks.HeadSeriesRef
|
||
lset labels.Labels
|
||
meta *metadata.Metadata
|
||
|
||
// Immutable chunks on disk that have not yet gone into a block, in order of ascending time stamps.
|
||
// When compaction runs, chunks get moved into a block and all pointers are shifted like so:
|
||
//
|
||
// /------- let's say these 2 chunks get stored into a block
|
||
// | |
|
||
// before compaction: mmappedChunks=[p5,p6,p7,p8,p9] firstChunkID=5
|
||
// after compaction: mmappedChunks=[p7,p8,p9] firstChunkID=7
|
||
//
|
||
// pN is the pointer to the mmappedChunk referered to by HeadChunkID=N
|
||
mmappedChunks []*mmappedChunk
|
||
// Most recent chunks in memory that are still being built or waiting to be mmapped.
|
||
// This is a linked list, headChunks points to the most recent chunk, headChunks.next points
|
||
// to older chunk and so on.
|
||
headChunks *memChunk
|
||
firstChunkID chunks.HeadChunkID // HeadChunkID for mmappedChunks[0]
|
||
|
||
ooo *memSeriesOOOFields
|
||
|
||
mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay.
|
||
|
||
nextAt int64 // Timestamp at which to cut the next chunk.
|
||
histogramChunkHasComputedEndTime bool // True if nextAt has been predicted for the current histograms chunk; false otherwise.
|
||
|
||
// We keep the last value here (in addition to appending it to the chunk) so we can check for duplicates.
|
||
lastValue float64
|
||
|
||
// We keep the last histogram value here (in addition to appending it to the chunk) so we can check for duplicates.
|
||
lastHistogramValue *histogram.Histogram
|
||
lastFloatHistogramValue *histogram.FloatHistogram
|
||
|
||
// Current appender for the head chunk. Set when a new head chunk is cut.
|
||
// It is nil only if headChunks is nil. E.g. if there was an appender that created a new series, but rolled back the commit
|
||
// (the first sample would create a headChunk, hence appender, but rollback skipped it while the Append() call would create a series).
|
||
app chunkenc.Appender
|
||
|
||
// txs is nil if isolation is disabled.
|
||
txs *txRing
|
||
|
||
pendingCommit bool // Whether there are samples waiting to be committed to this series.
|
||
}
|
||
|
||
// memSeriesOOOFields contains the fields required by memSeries
|
||
// to handle out-of-order data.
|
||
type memSeriesOOOFields struct {
|
||
oooMmappedChunks []*mmappedChunk // Immutable chunks on disk containing OOO samples.
|
||
oooHeadChunk *oooHeadChunk // Most recent chunk for ooo samples in memory that's still being built.
|
||
firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0].
|
||
}
|
||
|
||
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, isolationDisabled bool) *memSeries {
|
||
s := &memSeries{
|
||
lset: lset,
|
||
ref: id,
|
||
nextAt: math.MinInt64,
|
||
}
|
||
if !isolationDisabled {
|
||
s.txs = newTxRing(4)
|
||
}
|
||
return s
|
||
}
|
||
|
||
func (s *memSeries) minTime() int64 {
|
||
if len(s.mmappedChunks) > 0 {
|
||
return s.mmappedChunks[0].minTime
|
||
}
|
||
if s.headChunks != nil {
|
||
return s.headChunks.oldest().minTime
|
||
}
|
||
return math.MinInt64
|
||
}
|
||
|
||
func (s *memSeries) maxTime() int64 {
|
||
// The highest timestamps will always be in the regular (non-OOO) chunks, even if OOO is enabled.
|
||
if s.headChunks != nil {
|
||
return s.headChunks.maxTime
|
||
}
|
||
if len(s.mmappedChunks) > 0 {
|
||
return s.mmappedChunks[len(s.mmappedChunks)-1].maxTime
|
||
}
|
||
return math.MinInt64
|
||
}
|
||
|
||
// truncateChunksBefore removes all chunks from the series that
|
||
// have no timestamp at or after mint.
|
||
// Chunk IDs remain unchanged.
|
||
func (s *memSeries) truncateChunksBefore(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) int {
|
||
var removedInOrder int
|
||
if s.headChunks != nil {
|
||
var i int
|
||
var nextChk *memChunk
|
||
chk := s.headChunks
|
||
for chk != nil {
|
||
if chk.maxTime < mint {
|
||
// If any head chunk is truncated, we can truncate all mmapped chunks.
|
||
removedInOrder = chk.len() + len(s.mmappedChunks)
|
||
s.firstChunkID += chunks.HeadChunkID(removedInOrder)
|
||
if i == 0 {
|
||
// This is the first chunk on the list so we need to remove the entire list.
|
||
s.headChunks = nil
|
||
} else {
|
||
// This is NOT the first chunk, unlink it from parent.
|
||
nextChk.prev = nil
|
||
}
|
||
s.mmappedChunks = nil
|
||
break
|
||
}
|
||
nextChk = chk
|
||
chk = chk.prev
|
||
i++
|
||
}
|
||
}
|
||
if len(s.mmappedChunks) > 0 {
|
||
for i, c := range s.mmappedChunks {
|
||
if c.maxTime >= mint {
|
||
break
|
||
}
|
||
removedInOrder = i + 1
|
||
}
|
||
s.mmappedChunks = append(s.mmappedChunks[:0], s.mmappedChunks[removedInOrder:]...)
|
||
s.firstChunkID += chunks.HeadChunkID(removedInOrder)
|
||
}
|
||
|
||
var removedOOO int
|
||
if s.ooo != nil && len(s.ooo.oooMmappedChunks) > 0 {
|
||
for i, c := range s.ooo.oooMmappedChunks {
|
||
if c.ref.GreaterThan(minOOOMmapRef) {
|
||
break
|
||
}
|
||
removedOOO = i + 1
|
||
}
|
||
s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks[:0], s.ooo.oooMmappedChunks[removedOOO:]...)
|
||
s.ooo.firstOOOChunkID += chunks.HeadChunkID(removedOOO)
|
||
|
||
if len(s.ooo.oooMmappedChunks) == 0 && s.ooo.oooHeadChunk == nil {
|
||
s.ooo = nil
|
||
}
|
||
}
|
||
|
||
return removedInOrder + removedOOO
|
||
}
|
||
|
||
// cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after
|
||
// acquiring lock.
|
||
func (s *memSeries) cleanupAppendIDsBelow(bound uint64) {
|
||
if s.txs != nil {
|
||
s.txs.cleanupAppendIDsBelow(bound)
|
||
}
|
||
}
|
||
|
||
type memChunk struct {
|
||
chunk chunkenc.Chunk
|
||
minTime, maxTime int64
|
||
prev *memChunk // Link to the previous element on the list.
|
||
}
|
||
|
||
// len returns the length of memChunk list, including the element it was called on.
|
||
func (mc *memChunk) len() (count int) {
|
||
elem := mc
|
||
for elem != nil {
|
||
count++
|
||
elem = elem.prev
|
||
}
|
||
return count
|
||
}
|
||
|
||
// oldest returns the oldest element on the list.
|
||
// For single element list this will be the same memChunk oldest() was called on.
|
||
func (mc *memChunk) oldest() (elem *memChunk) {
|
||
elem = mc
|
||
for elem.prev != nil {
|
||
elem = elem.prev
|
||
}
|
||
return elem
|
||
}
|
||
|
||
// atOffset returns a memChunk that's Nth element on the linked list.
|
||
func (mc *memChunk) atOffset(offset int) (elem *memChunk) {
|
||
if offset == 0 {
|
||
return mc
|
||
}
|
||
if offset < 0 {
|
||
return nil
|
||
}
|
||
|
||
var i int
|
||
elem = mc
|
||
for i < offset {
|
||
i++
|
||
elem = elem.prev
|
||
if elem == nil {
|
||
break
|
||
}
|
||
}
|
||
|
||
return elem
|
||
}
|
||
|
||
type oooHeadChunk struct {
|
||
chunk *OOOChunk
|
||
minTime, maxTime int64 // can probably be removed and pulled out of the chunk instead
|
||
}
|
||
|
||
// OverlapsClosedInterval returns true if the chunk overlaps [mint, maxt].
|
||
func (mc *oooHeadChunk) OverlapsClosedInterval(mint, maxt int64) bool {
|
||
return overlapsClosedInterval(mc.minTime, mc.maxTime, mint, maxt)
|
||
}
|
||
|
||
// OverlapsClosedInterval returns true if the chunk overlaps [mint, maxt].
|
||
func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool {
|
||
return overlapsClosedInterval(mc.minTime, mc.maxTime, mint, maxt)
|
||
}
|
||
|
||
// overlapsClosedInterval reports whether the closed intervals [mint1, maxt1]
// and [mint2, maxt2] share at least one point. Intervals that merely touch at
// an endpoint count as overlapping.
func overlapsClosedInterval(mint1, maxt1, mint2, maxt2 int64) bool {
	// The intervals are disjoint exactly when one ends before the other starts.
	return !(maxt2 < mint1 || maxt1 < mint2)
}
|
||
|
||
// mmappedChunk describes a head chunk on disk that has been mmapped
|
||
type mmappedChunk struct {
|
||
ref chunks.ChunkDiskMapperRef
|
||
numSamples uint16
|
||
minTime, maxTime int64
|
||
}
|
||
|
||
// Returns true if the chunk overlaps [mint, maxt].
|
||
func (mc *mmappedChunk) OverlapsClosedInterval(mint, maxt int64) bool {
|
||
return overlapsClosedInterval(mc.minTime, mc.maxTime, mint, maxt)
|
||
}
|
||
|
||
type noopSeriesLifecycleCallback struct{}
|
||
|
||
func (noopSeriesLifecycleCallback) PreCreation(labels.Labels) error { return nil }
|
||
func (noopSeriesLifecycleCallback) PostCreation(labels.Labels) {}
|
||
func (noopSeriesLifecycleCallback) PostDeletion(map[chunks.HeadSeriesRef]labels.Labels) {}
|
||
|
||
func (h *Head) Size() int64 {
|
||
var walSize, wblSize int64
|
||
if h.wal != nil {
|
||
walSize, _ = h.wal.Size()
|
||
}
|
||
if h.wbl != nil {
|
||
wblSize, _ = h.wbl.Size()
|
||
}
|
||
cdmSize, _ := h.chunkDiskMapper.Size()
|
||
return walSize + wblSize + cdmSize
|
||
}
|
||
|
||
func (h *RangeHead) Size() int64 {
|
||
return h.head.Size()
|
||
}
|
||
|
||
func (h *Head) startWALReplayStatus(startFrom, last int) {
|
||
h.stats.WALReplayStatus.Lock()
|
||
defer h.stats.WALReplayStatus.Unlock()
|
||
|
||
h.stats.WALReplayStatus.Min = startFrom
|
||
h.stats.WALReplayStatus.Max = last
|
||
h.stats.WALReplayStatus.Current = startFrom
|
||
}
|
||
|
||
func (h *Head) updateWALReplayStatusRead(current int) {
|
||
h.stats.WALReplayStatus.Lock()
|
||
defer h.stats.WALReplayStatus.Unlock()
|
||
|
||
h.stats.WALReplayStatus.Current = current
|
||
}
|