mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 14:27:27 -08:00
48d25e6fe7
Rather than keeping the offset of each postings list, instead keep the nth offset of the offset of the posting list. As postings list offsets have always been sorted, we can then get to the closest entry before the one we want an iterate forwards. I haven't done much tuning on the 32 number, it was chosen to try not to read through more than a 4k page of data. Switch to a bulk interface for fetching postings. Use it to avoid having to re-read parts of the posting offset table when querying lots of it. For a index with what BenchmarkHeadPostingForMatchers uses RAM for r.postings drops from 3.79MB to 80.19kB or about 48x. Bytes allocated go down by 30%, and suprisingly CPU usage drops by 4-6% for typical queries too. benchmark old ns/op new ns/op delta BenchmarkPostingsForMatchers/Block/n="1"-4 35231 36673 +4.09% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 563380 540627 -4.04% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 536782 534186 -0.48% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 533990 541550 +1.42% BenchmarkPostingsForMatchers/Block/i=~".*"-4 113374598 117969608 +4.05% BenchmarkPostingsForMatchers/Block/i=~".+"-4 146329884 139651442 -4.56% BenchmarkPostingsForMatchers/Block/i=~""-4 50346510 44961127 -10.70% BenchmarkPostingsForMatchers/Block/i!=""-4 41261550 35356165 -14.31% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 112544418 116904010 +3.87% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 112487086 116864918 +3.89% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 41094758 35457904 -13.72% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 41906372 36151473 -13.73% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 147262414 140424800 -4.64% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 28615629 27872072 -2.60% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 147117177 140462403 -4.52% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 175096826 167902298 -4.11% benchmark old allocs new allocs delta BenchmarkPostingsForMatchers/Block/n="1"-4 4 6 +50.00% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 7 11 +57.14% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 7 11 +57.14% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 15 17 +13.33% BenchmarkPostingsForMatchers/Block/i=~".*"-4 100010 100012 +0.00% BenchmarkPostingsForMatchers/Block/i=~".+"-4 200069 200040 -0.01% BenchmarkPostingsForMatchers/Block/i=~""-4 200072 200045 -0.01% BenchmarkPostingsForMatchers/Block/i!=""-4 200070 200041 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 100013 100017 +0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 100017 100023 +0.01% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 200073 200046 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 200075 200050 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 200074 200049 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 111165 111150 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 200078 200055 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 311282 311238 -0.01% benchmark old bytes new bytes delta BenchmarkPostingsForMatchers/Block/n="1"-4 264 296 +12.12% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 360 424 +17.78% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 360 424 +17.78% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 520 552 +6.15% BenchmarkPostingsForMatchers/Block/i=~".*"-4 1600461 1600482 +0.00% BenchmarkPostingsForMatchers/Block/i=~".+"-4 24900801 17259077 -30.69% BenchmarkPostingsForMatchers/Block/i=~""-4 24900836 17259151 -30.69% BenchmarkPostingsForMatchers/Block/i!=""-4 24900760 17259048 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 1600557 1600621 +0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 1600717 1600813 +0.01% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 24900856 17259176 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 24900952 17259304 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 24900993 17259333 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 3788311 3142630 -17.04% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 24901137 17259509 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 28693086 20405680 -28.88% Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
1972 lines
50 KiB
Go
1972 lines
50 KiB
Go
// Copyright 2017 The Prometheus Authors
|
||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
// you may not use this file except in compliance with the License.
|
||
// You may obtain a copy of the License at
|
||
//
|
||
// http://www.apache.org/licenses/LICENSE-2.0
|
||
//
|
||
// Unless required by applicable law or agreed to in writing, software
|
||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
// See the License for the specific language governing permissions and
|
||
// limitations under the License.
|
||
|
||
package tsdb
|
||
|
||
import (
|
||
"fmt"
|
||
"math"
|
||
"runtime"
|
||
"sort"
|
||
"strings"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"github.com/go-kit/kit/log"
|
||
"github.com/go-kit/kit/log/level"
|
||
"github.com/oklog/ulid"
|
||
"github.com/pkg/errors"
|
||
"github.com/prometheus/client_golang/prometheus"
|
||
"github.com/prometheus/prometheus/pkg/labels"
|
||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||
"github.com/prometheus/prometheus/tsdb/encoding"
|
||
"github.com/prometheus/prometheus/tsdb/index"
|
||
"github.com/prometheus/prometheus/tsdb/record"
|
||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||
"github.com/prometheus/prometheus/tsdb/wal"
|
||
)
|
||
|
||
var (
|
||
// ErrNotFound is returned if a looked up resource was not found.
|
||
ErrNotFound = errors.Errorf("not found")
|
||
|
||
// ErrOutOfOrderSample is returned if an appended sample has a
|
||
// timestamp smaller than the most recent sample.
|
||
ErrOutOfOrderSample = errors.New("out of order sample")
|
||
|
||
// ErrAmendSample is returned if an appended sample has the same timestamp
|
||
// as the most recent sample but a different value.
|
||
ErrAmendSample = errors.New("amending sample")
|
||
|
||
// ErrOutOfBounds is returned if an appended sample is out of the
|
||
// writable time range.
|
||
ErrOutOfBounds = errors.New("out of bounds")
|
||
|
||
// emptyTombstoneReader is a no-op Tombstone Reader.
|
||
// This is used by head to satisfy the Tombstones() function call.
|
||
emptyTombstoneReader = tombstones.NewMemTombstones()
|
||
)
|
||
|
||
// Head handles reads and writes of time series data within a time window.
|
||
type Head struct {
|
||
// Keep all 64bit atomically accessed variables at the top of this struct.
|
||
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for more info.
|
||
chunkRange int64
|
||
numSeries uint64
|
||
minTime, maxTime int64 // Current min and max of the samples included in the head.
|
||
minValidTime int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
|
||
lastSeriesID uint64
|
||
|
||
metrics *headMetrics
|
||
wal *wal.WAL
|
||
logger log.Logger
|
||
appendPool sync.Pool
|
||
seriesPool sync.Pool
|
||
bytesPool sync.Pool
|
||
|
||
// All series addressable by their ID or hash.
|
||
series *stripeSeries
|
||
|
||
symMtx sync.RWMutex
|
||
symbols map[string]struct{}
|
||
values map[string]stringset // label names to possible values
|
||
|
||
deletedMtx sync.Mutex
|
||
deleted map[uint64]int // Deleted series, and what WAL segment they must be kept until.
|
||
|
||
postings *index.MemPostings // postings lists for terms
|
||
|
||
cardinalityMutex sync.Mutex
|
||
cardinalityCache *index.PostingsStats // posting stats cache which will expire after 30sec
|
||
lastPostingsStatsCall time.Duration // last posting stats call (PostgingsCardinalityStats()) time for caching
|
||
}
|
||
|
||
type headMetrics struct {
|
||
activeAppenders prometheus.Gauge
|
||
series prometheus.GaugeFunc
|
||
seriesCreated prometheus.Counter
|
||
seriesRemoved prometheus.Counter
|
||
seriesNotFound prometheus.Counter
|
||
chunks prometheus.Gauge
|
||
chunksCreated prometheus.Counter
|
||
chunksRemoved prometheus.Counter
|
||
gcDuration prometheus.Summary
|
||
minTime prometheus.GaugeFunc
|
||
maxTime prometheus.GaugeFunc
|
||
samplesAppended prometheus.Counter
|
||
walTruncateDuration prometheus.Summary
|
||
walCorruptionsTotal prometheus.Counter
|
||
headTruncateFail prometheus.Counter
|
||
headTruncateTotal prometheus.Counter
|
||
checkpointDeleteFail prometheus.Counter
|
||
checkpointDeleteTotal prometheus.Counter
|
||
checkpointCreationFail prometheus.Counter
|
||
checkpointCreationTotal prometheus.Counter
|
||
}
|
||
|
||
func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
|
||
m := &headMetrics{}
|
||
|
||
m.activeAppenders = prometheus.NewGauge(prometheus.GaugeOpts{
|
||
Name: "prometheus_tsdb_head_active_appenders",
|
||
Help: "Number of currently active appender transactions",
|
||
})
|
||
m.series = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||
Name: "prometheus_tsdb_head_series",
|
||
Help: "Total number of series in the head block.",
|
||
}, func() float64 {
|
||
return float64(h.NumSeries())
|
||
})
|
||
m.seriesCreated = prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_head_series_created_total",
|
||
Help: "Total number of series created in the head",
|
||
})
|
||
m.seriesRemoved = prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_head_series_removed_total",
|
||
Help: "Total number of series removed in the head",
|
||
})
|
||
m.seriesNotFound = prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_head_series_not_found_total",
|
||
Help: "Total number of requests for series that were not found.",
|
||
})
|
||
m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{
|
||
Name: "prometheus_tsdb_head_chunks",
|
||
Help: "Total number of chunks in the head block.",
|
||
})
|
||
m.chunksCreated = prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_head_chunks_created_total",
|
||
Help: "Total number of chunks created in the head",
|
||
})
|
||
m.chunksRemoved = prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_head_chunks_removed_total",
|
||
Help: "Total number of chunks removed in the head",
|
||
})
|
||
m.gcDuration = prometheus.NewSummary(prometheus.SummaryOpts{
|
||
Name: "prometheus_tsdb_head_gc_duration_seconds",
|
||
Help: "Runtime of garbage collection in the head block.",
|
||
Objectives: map[float64]float64{},
|
||
})
|
||
m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||
Name: "prometheus_tsdb_head_max_time",
|
||
Help: "Maximum timestamp of the head block. The unit is decided by the library consumer.",
|
||
}, func() float64 {
|
||
return float64(h.MaxTime())
|
||
})
|
||
m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||
Name: "prometheus_tsdb_head_min_time",
|
||
Help: "Minimum time bound of the head block. The unit is decided by the library consumer.",
|
||
}, func() float64 {
|
||
return float64(h.MinTime())
|
||
})
|
||
m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{
|
||
Name: "prometheus_tsdb_wal_truncate_duration_seconds",
|
||
Help: "Duration of WAL truncation.",
|
||
Objectives: map[float64]float64{},
|
||
})
|
||
m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_wal_corruptions_total",
|
||
Help: "Total number of WAL corruptions.",
|
||
})
|
||
m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_head_samples_appended_total",
|
||
Help: "Total number of appended samples.",
|
||
})
|
||
m.headTruncateFail = prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_head_truncations_failed_total",
|
||
Help: "Total number of head truncations that failed.",
|
||
})
|
||
m.headTruncateTotal = prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_head_truncations_total",
|
||
Help: "Total number of head truncations attempted.",
|
||
})
|
||
m.checkpointDeleteFail = prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_checkpoint_deletions_failed_total",
|
||
Help: "Total number of checkpoint deletions that failed.",
|
||
})
|
||
m.checkpointDeleteTotal = prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_checkpoint_deletions_total",
|
||
Help: "Total number of checkpoint deletions attempted.",
|
||
})
|
||
m.checkpointCreationFail = prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_checkpoint_creations_failed_total",
|
||
Help: "Total number of checkpoint creations that failed.",
|
||
})
|
||
m.checkpointCreationTotal = prometheus.NewCounter(prometheus.CounterOpts{
|
||
Name: "prometheus_tsdb_checkpoint_creations_total",
|
||
Help: "Total number of checkpoint creations attempted.",
|
||
})
|
||
|
||
if r != nil {
|
||
r.MustRegister(
|
||
m.activeAppenders,
|
||
m.chunks,
|
||
m.chunksCreated,
|
||
m.chunksRemoved,
|
||
m.series,
|
||
m.seriesCreated,
|
||
m.seriesRemoved,
|
||
m.seriesNotFound,
|
||
m.minTime,
|
||
m.maxTime,
|
||
m.gcDuration,
|
||
m.walTruncateDuration,
|
||
m.walCorruptionsTotal,
|
||
m.samplesAppended,
|
||
m.headTruncateFail,
|
||
m.headTruncateTotal,
|
||
m.checkpointDeleteFail,
|
||
m.checkpointDeleteTotal,
|
||
m.checkpointCreationFail,
|
||
m.checkpointCreationTotal,
|
||
)
|
||
}
|
||
return m
|
||
}
|
||
|
||
const cardinalityCacheExpirationTime = time.Duration(30) * time.Second
|
||
|
||
// PostingsCardinalityStats returns top 10 highest cardinality stats By label and value names.
|
||
func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.PostingsStats {
|
||
h.cardinalityMutex.Lock()
|
||
defer h.cardinalityMutex.Unlock()
|
||
currentTime := time.Duration(time.Now().Unix()) * time.Second
|
||
seconds := currentTime - h.lastPostingsStatsCall
|
||
if seconds > cardinalityCacheExpirationTime {
|
||
h.cardinalityCache = nil
|
||
}
|
||
if h.cardinalityCache != nil {
|
||
return h.cardinalityCache
|
||
}
|
||
h.cardinalityCache = h.postings.Stats(statsByLabelName)
|
||
h.lastPostingsStatsCall = time.Duration(time.Now().Unix()) * time.Second
|
||
|
||
return h.cardinalityCache
|
||
}
|
||
|
||
// NewHead opens the head block in dir.
|
||
func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64) (*Head, error) {
|
||
if l == nil {
|
||
l = log.NewNopLogger()
|
||
}
|
||
if chunkRange < 1 {
|
||
return nil, errors.Errorf("invalid chunk range %d", chunkRange)
|
||
}
|
||
h := &Head{
|
||
wal: wal,
|
||
logger: l,
|
||
chunkRange: chunkRange,
|
||
minTime: math.MaxInt64,
|
||
maxTime: math.MinInt64,
|
||
series: newStripeSeries(),
|
||
values: map[string]stringset{},
|
||
symbols: map[string]struct{}{},
|
||
postings: index.NewUnorderedMemPostings(),
|
||
deleted: map[uint64]int{},
|
||
}
|
||
h.metrics = newHeadMetrics(h, r)
|
||
|
||
return h, nil
|
||
}
|
||
|
||
// processWALSamples adds a partition of samples it receives to the head and passes
|
||
// them on to other workers.
|
||
// Samples before the mint timestamp are discarded.
|
||
func (h *Head) processWALSamples(
|
||
minValidTime int64,
|
||
input <-chan []record.RefSample, output chan<- []record.RefSample,
|
||
) (unknownRefs uint64) {
|
||
defer close(output)
|
||
|
||
// Mitigate lock contention in getByID.
|
||
refSeries := map[uint64]*memSeries{}
|
||
|
||
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
|
||
|
||
for samples := range input {
|
||
for _, s := range samples {
|
||
if s.T < minValidTime {
|
||
continue
|
||
}
|
||
ms := refSeries[s.Ref]
|
||
if ms == nil {
|
||
ms = h.series.getByID(s.Ref)
|
||
if ms == nil {
|
||
unknownRefs++
|
||
continue
|
||
}
|
||
refSeries[s.Ref] = ms
|
||
}
|
||
_, chunkCreated := ms.append(s.T, s.V)
|
||
if chunkCreated {
|
||
h.metrics.chunksCreated.Inc()
|
||
h.metrics.chunks.Inc()
|
||
}
|
||
if s.T > maxt {
|
||
maxt = s.T
|
||
}
|
||
if s.T < mint {
|
||
mint = s.T
|
||
}
|
||
}
|
||
output <- samples
|
||
}
|
||
h.updateMinMaxTime(mint, maxt)
|
||
|
||
return unknownRefs
|
||
}
|
||
|
||
func (h *Head) updateMinMaxTime(mint, maxt int64) {
|
||
for {
|
||
lt := h.MinTime()
|
||
if mint >= lt {
|
||
break
|
||
}
|
||
if atomic.CompareAndSwapInt64(&h.minTime, lt, mint) {
|
||
break
|
||
}
|
||
}
|
||
for {
|
||
ht := h.MaxTime()
|
||
if maxt <= ht {
|
||
break
|
||
}
|
||
if atomic.CompareAndSwapInt64(&h.maxTime, ht, maxt) {
|
||
break
|
||
}
|
||
}
|
||
}
|
||
|
||
func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
|
||
// Track number of samples that referenced a series we don't know about
|
||
// for error reporting.
|
||
var unknownRefs uint64
|
||
|
||
// Start workers that each process samples for a partition of the series ID space.
|
||
// They are connected through a ring of channels which ensures that all sample batches
|
||
// read from the WAL are processed in order.
|
||
var (
|
||
wg sync.WaitGroup
|
||
n = runtime.GOMAXPROCS(0)
|
||
inputs = make([]chan []record.RefSample, n)
|
||
outputs = make([]chan []record.RefSample, n)
|
||
)
|
||
wg.Add(n)
|
||
|
||
defer func() {
|
||
// For CorruptionErr ensure to terminate all workers before exiting.
|
||
if _, ok := err.(*wal.CorruptionErr); ok {
|
||
for i := 0; i < n; i++ {
|
||
close(inputs[i])
|
||
for range outputs[i] {
|
||
}
|
||
}
|
||
wg.Wait()
|
||
}
|
||
}()
|
||
|
||
for i := 0; i < n; i++ {
|
||
outputs[i] = make(chan []record.RefSample, 300)
|
||
inputs[i] = make(chan []record.RefSample, 300)
|
||
|
||
go func(input <-chan []record.RefSample, output chan<- []record.RefSample) {
|
||
unknown := h.processWALSamples(h.minValidTime, input, output)
|
||
atomic.AddUint64(&unknownRefs, unknown)
|
||
wg.Done()
|
||
}(inputs[i], outputs[i])
|
||
}
|
||
|
||
var (
|
||
dec record.Decoder
|
||
allStones = tombstones.NewMemTombstones()
|
||
shards = make([][]record.RefSample, n)
|
||
)
|
||
defer func() {
|
||
if err := allStones.Close(); err != nil {
|
||
level.Warn(h.logger).Log("msg", "closing memTombstones during wal read", "err", err)
|
||
}
|
||
}()
|
||
|
||
var (
|
||
decoded = make(chan interface{}, 10)
|
||
errCh = make(chan error, 1)
|
||
seriesPool = sync.Pool{
|
||
New: func() interface{} {
|
||
return []record.RefSeries{}
|
||
},
|
||
}
|
||
samplesPool = sync.Pool{
|
||
New: func() interface{} {
|
||
return []record.RefSample{}
|
||
},
|
||
}
|
||
tstonesPool = sync.Pool{
|
||
New: func() interface{} {
|
||
return []tombstones.Stone{}
|
||
},
|
||
}
|
||
)
|
||
go func() {
|
||
defer close(decoded)
|
||
for r.Next() {
|
||
rec := r.Record()
|
||
switch dec.Type(rec) {
|
||
case record.Series:
|
||
series := seriesPool.Get().([]record.RefSeries)[:0]
|
||
series, err = dec.Series(rec, series)
|
||
if err != nil {
|
||
errCh <- &wal.CorruptionErr{
|
||
Err: errors.Wrap(err, "decode series"),
|
||
Segment: r.Segment(),
|
||
Offset: r.Offset(),
|
||
}
|
||
return
|
||
}
|
||
decoded <- series
|
||
case record.Samples:
|
||
samples := samplesPool.Get().([]record.RefSample)[:0]
|
||
samples, err = dec.Samples(rec, samples)
|
||
if err != nil {
|
||
errCh <- &wal.CorruptionErr{
|
||
Err: errors.Wrap(err, "decode samples"),
|
||
Segment: r.Segment(),
|
||
Offset: r.Offset(),
|
||
}
|
||
return
|
||
}
|
||
decoded <- samples
|
||
case record.Tombstones:
|
||
tstones := tstonesPool.Get().([]tombstones.Stone)[:0]
|
||
tstones, err = dec.Tombstones(rec, tstones)
|
||
if err != nil {
|
||
errCh <- &wal.CorruptionErr{
|
||
Err: errors.Wrap(err, "decode tombstones"),
|
||
Segment: r.Segment(),
|
||
Offset: r.Offset(),
|
||
}
|
||
return
|
||
}
|
||
decoded <- tstones
|
||
default:
|
||
errCh <- &wal.CorruptionErr{
|
||
Err: errors.Errorf("invalid record type %v", dec.Type(rec)),
|
||
Segment: r.Segment(),
|
||
Offset: r.Offset(),
|
||
}
|
||
return
|
||
}
|
||
}
|
||
}()
|
||
|
||
for d := range decoded {
|
||
switch v := d.(type) {
|
||
case []record.RefSeries:
|
||
for _, s := range v {
|
||
series, created := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
|
||
|
||
if !created {
|
||
// There's already a different ref for this series.
|
||
multiRef[s.Ref] = series.ref
|
||
}
|
||
|
||
if h.lastSeriesID < s.Ref {
|
||
h.lastSeriesID = s.Ref
|
||
}
|
||
}
|
||
//lint:ignore SA6002 relax staticcheck verification.
|
||
seriesPool.Put(v)
|
||
case []record.RefSample:
|
||
samples := v
|
||
// We split up the samples into chunks of 5000 samples or less.
|
||
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
|
||
// cause thousands of very large in flight buffers occupying large amounts
|
||
// of unused memory.
|
||
for len(samples) > 0 {
|
||
m := 5000
|
||
if len(samples) < m {
|
||
m = len(samples)
|
||
}
|
||
for i := 0; i < n; i++ {
|
||
var buf []record.RefSample
|
||
select {
|
||
case buf = <-outputs[i]:
|
||
default:
|
||
}
|
||
shards[i] = buf[:0]
|
||
}
|
||
for _, sam := range samples[:m] {
|
||
if r, ok := multiRef[sam.Ref]; ok {
|
||
sam.Ref = r
|
||
}
|
||
mod := sam.Ref % uint64(n)
|
||
shards[mod] = append(shards[mod], sam)
|
||
}
|
||
for i := 0; i < n; i++ {
|
||
inputs[i] <- shards[i]
|
||
}
|
||
samples = samples[m:]
|
||
}
|
||
//lint:ignore SA6002 relax staticcheck verification.
|
||
samplesPool.Put(v)
|
||
case []tombstones.Stone:
|
||
for _, s := range v {
|
||
for _, itv := range s.Intervals {
|
||
if itv.Maxt < h.minValidTime {
|
||
continue
|
||
}
|
||
if m := h.series.getByID(s.Ref); m == nil {
|
||
unknownRefs++
|
||
continue
|
||
}
|
||
allStones.AddInterval(s.Ref, itv)
|
||
}
|
||
}
|
||
//lint:ignore SA6002 relax staticcheck verification.
|
||
tstonesPool.Put(v)
|
||
default:
|
||
panic(fmt.Errorf("unexpected decoded type: %T", d))
|
||
}
|
||
}
|
||
|
||
select {
|
||
case err := <-errCh:
|
||
return err
|
||
default:
|
||
}
|
||
|
||
// Signal termination to each worker and wait for it to close its output channel.
|
||
for i := 0; i < n; i++ {
|
||
close(inputs[i])
|
||
for range outputs[i] {
|
||
}
|
||
}
|
||
wg.Wait()
|
||
|
||
if r.Err() != nil {
|
||
return errors.Wrap(r.Err(), "read records")
|
||
}
|
||
|
||
if err := allStones.Iter(func(ref uint64, dranges tombstones.Intervals) error {
|
||
return h.chunkRewrite(ref, dranges)
|
||
}); err != nil {
|
||
return errors.Wrap(r.Err(), "deleting samples from tombstones")
|
||
}
|
||
|
||
if unknownRefs > 0 {
|
||
level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// Init loads data from the write ahead log and prepares the head for writes.
|
||
// It should be called before using an appender so that
|
||
// limits the ingested samples to the head min valid time.
|
||
func (h *Head) Init(minValidTime int64) error {
|
||
h.minValidTime = minValidTime
|
||
defer h.postings.EnsureOrder()
|
||
defer h.gc() // After loading the wal remove the obsolete data from the head.
|
||
|
||
if h.wal == nil {
|
||
return nil
|
||
}
|
||
|
||
level.Info(h.logger).Log("msg", "replaying WAL, this may take awhile")
|
||
// Backfill the checkpoint first if it exists.
|
||
dir, startFrom, err := wal.LastCheckpoint(h.wal.Dir())
|
||
if err != nil && err != record.ErrNotFound {
|
||
return errors.Wrap(err, "find last checkpoint")
|
||
}
|
||
multiRef := map[uint64]uint64{}
|
||
if err == nil {
|
||
sr, err := wal.NewSegmentsReader(dir)
|
||
if err != nil {
|
||
return errors.Wrap(err, "open checkpoint")
|
||
}
|
||
defer func() {
|
||
if err := sr.Close(); err != nil {
|
||
level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err)
|
||
}
|
||
}()
|
||
|
||
// A corrupted checkpoint is a hard error for now and requires user
|
||
// intervention. There's likely little data that can be recovered anyway.
|
||
if err := h.loadWAL(wal.NewReader(sr), multiRef); err != nil {
|
||
return errors.Wrap(err, "backfill checkpoint")
|
||
}
|
||
startFrom++
|
||
level.Info(h.logger).Log("msg", "WAL checkpoint loaded")
|
||
}
|
||
|
||
// Find the last segment.
|
||
_, last, err := h.wal.Segments()
|
||
if err != nil {
|
||
return errors.Wrap(err, "finding WAL segments")
|
||
}
|
||
|
||
// Backfill segments from the most recent checkpoint onwards.
|
||
for i := startFrom; i <= last; i++ {
|
||
s, err := wal.OpenReadSegment(wal.SegmentName(h.wal.Dir(), i))
|
||
if err != nil {
|
||
return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
|
||
}
|
||
|
||
sr := wal.NewSegmentBufReader(s)
|
||
err = h.loadWAL(wal.NewReader(sr), multiRef)
|
||
if err := sr.Close(); err != nil {
|
||
level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err)
|
||
}
|
||
if err != nil {
|
||
return err
|
||
}
|
||
level.Info(h.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", last)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// Truncate removes old data before mint from the head.
|
||
func (h *Head) Truncate(mint int64) (err error) {
|
||
defer func() {
|
||
if err != nil {
|
||
h.metrics.headTruncateFail.Inc()
|
||
}
|
||
}()
|
||
initialize := h.MinTime() == math.MaxInt64
|
||
|
||
if h.MinTime() >= mint && !initialize {
|
||
return nil
|
||
}
|
||
atomic.StoreInt64(&h.minTime, mint)
|
||
atomic.StoreInt64(&h.minValidTime, mint)
|
||
|
||
// Ensure that max time is at least as high as min time.
|
||
for h.MaxTime() < mint {
|
||
atomic.CompareAndSwapInt64(&h.maxTime, h.MaxTime(), mint)
|
||
}
|
||
|
||
// This was an initial call to Truncate after loading blocks on startup.
|
||
// We haven't read back the WAL yet, so do not attempt to truncate it.
|
||
if initialize {
|
||
return nil
|
||
}
|
||
|
||
h.metrics.headTruncateTotal.Inc()
|
||
start := time.Now()
|
||
|
||
h.gc()
|
||
level.Info(h.logger).Log("msg", "head GC completed", "duration", time.Since(start))
|
||
h.metrics.gcDuration.Observe(time.Since(start).Seconds())
|
||
|
||
if h.wal == nil {
|
||
return nil
|
||
}
|
||
start = time.Now()
|
||
|
||
first, last, err := h.wal.Segments()
|
||
if err != nil {
|
||
return errors.Wrap(err, "get segment range")
|
||
}
|
||
// Start a new segment, so low ingestion volume TSDB don't have more WAL than
|
||
// needed.
|
||
err = h.wal.NextSegment()
|
||
if err != nil {
|
||
return errors.Wrap(err, "next segment")
|
||
}
|
||
last-- // Never consider last segment for checkpoint.
|
||
if last < 0 {
|
||
return nil // no segments yet.
|
||
}
|
||
// The lower third of segments should contain mostly obsolete samples.
|
||
// If we have less than three segments, it's not worth checkpointing yet.
|
||
last = first + (last-first)/3
|
||
if last <= first {
|
||
return nil
|
||
}
|
||
|
||
keep := func(id uint64) bool {
|
||
if h.series.getByID(id) != nil {
|
||
return true
|
||
}
|
||
h.deletedMtx.Lock()
|
||
_, ok := h.deleted[id]
|
||
h.deletedMtx.Unlock()
|
||
return ok
|
||
}
|
||
h.metrics.checkpointCreationTotal.Inc()
|
||
if _, err = wal.Checkpoint(h.wal, first, last, keep, mint); err != nil {
|
||
h.metrics.checkpointCreationFail.Inc()
|
||
return errors.Wrap(err, "create checkpoint")
|
||
}
|
||
if err := h.wal.Truncate(last + 1); err != nil {
|
||
// If truncating fails, we'll just try again at the next checkpoint.
|
||
// Leftover segments will just be ignored in the future if there's a checkpoint
|
||
// that supersedes them.
|
||
level.Error(h.logger).Log("msg", "truncating segments failed", "err", err)
|
||
}
|
||
|
||
// The checkpoint is written and segments before it is truncated, so we no
|
||
// longer need to track deleted series that are before it.
|
||
h.deletedMtx.Lock()
|
||
for ref, segment := range h.deleted {
|
||
if segment < first {
|
||
delete(h.deleted, ref)
|
||
}
|
||
}
|
||
h.deletedMtx.Unlock()
|
||
|
||
h.metrics.checkpointDeleteTotal.Inc()
|
||
if err := wal.DeleteCheckpoints(h.wal.Dir(), last); err != nil {
|
||
// Leftover old checkpoints do not cause problems down the line beyond
|
||
// occupying disk space.
|
||
// They will just be ignored since a higher checkpoint exists.
|
||
level.Error(h.logger).Log("msg", "delete old checkpoints", "err", err)
|
||
h.metrics.checkpointDeleteFail.Inc()
|
||
}
|
||
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
|
||
|
||
level.Info(h.logger).Log("msg", "WAL checkpoint complete",
|
||
"first", first, "last", last, "duration", time.Since(start))
|
||
|
||
return nil
|
||
}
|
||
|
||
// initTime initializes a head with the first timestamp. This only needs to be called
|
||
// for a completely fresh head with an empty WAL.
|
||
// Returns true if the initialization took an effect.
|
||
func (h *Head) initTime(t int64) (initialized bool) {
|
||
if !atomic.CompareAndSwapInt64(&h.minTime, math.MaxInt64, t) {
|
||
return false
|
||
}
|
||
// Ensure that max time is initialized to at least the min time we just set.
|
||
// Concurrent appenders may already have set it to a higher value.
|
||
atomic.CompareAndSwapInt64(&h.maxTime, math.MinInt64, t)
|
||
|
||
return true
|
||
}
|
||
|
||
type rangeHead struct {
|
||
head *Head
|
||
mint, maxt int64
|
||
}
|
||
|
||
func (h *rangeHead) Index() (IndexReader, error) {
|
||
return h.head.indexRange(h.mint, h.maxt), nil
|
||
}
|
||
|
||
func (h *rangeHead) Chunks() (ChunkReader, error) {
|
||
return h.head.chunksRange(h.mint, h.maxt), nil
|
||
}
|
||
|
||
func (h *rangeHead) Tombstones() (tombstones.Reader, error) {
|
||
return emptyTombstoneReader, nil
|
||
}
|
||
|
||
func (h *rangeHead) MinTime() int64 {
|
||
return h.mint
|
||
}
|
||
|
||
func (h *rangeHead) MaxTime() int64 {
|
||
return h.maxt
|
||
}
|
||
|
||
func (h *rangeHead) NumSeries() uint64 {
|
||
return h.head.NumSeries()
|
||
}
|
||
|
||
func (h *rangeHead) Meta() BlockMeta {
|
||
return BlockMeta{
|
||
MinTime: h.MinTime(),
|
||
MaxTime: h.MaxTime(),
|
||
ULID: h.head.Meta().ULID,
|
||
Stats: BlockStats{
|
||
NumSeries: h.NumSeries(),
|
||
},
|
||
}
|
||
}
|
||
|
||
// initAppender is a helper to initialize the time bounds of the head
|
||
// upon the first sample it receives.
|
||
type initAppender struct {
|
||
app Appender
|
||
head *Head
|
||
}
|
||
|
||
func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
||
if a.app != nil {
|
||
return a.app.Add(lset, t, v)
|
||
}
|
||
a.head.initTime(t)
|
||
a.app = a.head.appender()
|
||
|
||
return a.app.Add(lset, t, v)
|
||
}
|
||
|
||
func (a *initAppender) AddFast(ref uint64, t int64, v float64) error {
|
||
if a.app == nil {
|
||
return ErrNotFound
|
||
}
|
||
return a.app.AddFast(ref, t, v)
|
||
}
|
||
|
||
func (a *initAppender) Commit() error {
|
||
if a.app == nil {
|
||
return nil
|
||
}
|
||
return a.app.Commit()
|
||
}
|
||
|
||
func (a *initAppender) Rollback() error {
|
||
if a.app == nil {
|
||
return nil
|
||
}
|
||
return a.app.Rollback()
|
||
}
|
||
|
||
// Appender returns a new Appender on the database.
|
||
func (h *Head) Appender() Appender {
|
||
h.metrics.activeAppenders.Inc()
|
||
|
||
// The head cache might not have a starting point yet. The init appender
|
||
// picks up the first appended timestamp as the base.
|
||
if h.MinTime() == math.MaxInt64 {
|
||
return &initAppender{head: h}
|
||
}
|
||
return h.appender()
|
||
}
|
||
|
||
func (h *Head) appender() *headAppender {
|
||
return &headAppender{
|
||
head: h,
|
||
// Set the minimum valid time to whichever is greater the head min valid time or the compaciton window.
|
||
// This ensures that no samples will be added within the compaction window to avoid races.
|
||
minValidTime: max(atomic.LoadInt64(&h.minValidTime), h.MaxTime()-h.chunkRange/2),
|
||
mint: math.MaxInt64,
|
||
maxt: math.MinInt64,
|
||
samples: h.getAppendBuffer(),
|
||
sampleSeries: h.getSeriesBuffer(),
|
||
}
|
||
}
|
||
|
||
func max(a, b int64) int64 {
|
||
if a > b {
|
||
return a
|
||
}
|
||
return b
|
||
}
|
||
|
||
func (h *Head) getAppendBuffer() []record.RefSample {
|
||
b := h.appendPool.Get()
|
||
if b == nil {
|
||
return make([]record.RefSample, 0, 512)
|
||
}
|
||
return b.([]record.RefSample)
|
||
}
|
||
|
||
func (h *Head) putAppendBuffer(b []record.RefSample) {
|
||
//lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
|
||
h.appendPool.Put(b[:0])
|
||
}
|
||
|
||
func (h *Head) getSeriesBuffer() []*memSeries {
|
||
b := h.seriesPool.Get()
|
||
if b == nil {
|
||
return make([]*memSeries, 0, 512)
|
||
}
|
||
return b.([]*memSeries)
|
||
}
|
||
|
||
func (h *Head) putSeriesBuffer(b []*memSeries) {
|
||
//lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
|
||
h.seriesPool.Put(b[:0])
|
||
}
|
||
|
||
func (h *Head) getBytesBuffer() []byte {
|
||
b := h.bytesPool.Get()
|
||
if b == nil {
|
||
return make([]byte, 0, 1024)
|
||
}
|
||
return b.([]byte)
|
||
}
|
||
|
||
func (h *Head) putBytesBuffer(b []byte) {
|
||
//lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
|
||
h.bytesPool.Put(b[:0])
|
||
}
|
||
|
||
type headAppender struct {
|
||
head *Head
|
||
minValidTime int64 // No samples below this timestamp are allowed.
|
||
mint, maxt int64
|
||
|
||
series []record.RefSeries
|
||
samples []record.RefSample
|
||
sampleSeries []*memSeries
|
||
}
|
||
|
||
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
||
if t < a.minValidTime {
|
||
return 0, ErrOutOfBounds
|
||
}
|
||
|
||
// Ensure no empty labels have gotten through.
|
||
lset = lset.WithoutEmpty()
|
||
|
||
s, created := a.head.getOrCreate(lset.Hash(), lset)
|
||
if created {
|
||
a.series = append(a.series, record.RefSeries{
|
||
Ref: s.ref,
|
||
Labels: lset,
|
||
})
|
||
}
|
||
return s.ref, a.AddFast(s.ref, t, v)
|
||
}
|
||
|
||
func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
|
||
if t < a.minValidTime {
|
||
return ErrOutOfBounds
|
||
}
|
||
|
||
s := a.head.series.getByID(ref)
|
||
if s == nil {
|
||
return errors.Wrap(ErrNotFound, "unknown series")
|
||
}
|
||
s.Lock()
|
||
if err := s.appendable(t, v); err != nil {
|
||
s.Unlock()
|
||
return err
|
||
}
|
||
s.pendingCommit = true
|
||
s.Unlock()
|
||
|
||
if t < a.mint {
|
||
a.mint = t
|
||
}
|
||
if t > a.maxt {
|
||
a.maxt = t
|
||
}
|
||
|
||
a.samples = append(a.samples, record.RefSample{
|
||
Ref: ref,
|
||
T: t,
|
||
V: v,
|
||
})
|
||
a.sampleSeries = append(a.sampleSeries, s)
|
||
return nil
|
||
}
|
||
|
||
func (a *headAppender) log() error {
|
||
if a.head.wal == nil {
|
||
return nil
|
||
}
|
||
|
||
buf := a.head.getBytesBuffer()
|
||
defer func() { a.head.putBytesBuffer(buf) }()
|
||
|
||
var rec []byte
|
||
var enc record.Encoder
|
||
|
||
if len(a.series) > 0 {
|
||
rec = enc.Series(a.series, buf)
|
||
buf = rec[:0]
|
||
|
||
if err := a.head.wal.Log(rec); err != nil {
|
||
return errors.Wrap(err, "log series")
|
||
}
|
||
}
|
||
if len(a.samples) > 0 {
|
||
rec = enc.Samples(a.samples, buf)
|
||
buf = rec[:0]
|
||
|
||
if err := a.head.wal.Log(rec); err != nil {
|
||
return errors.Wrap(err, "log samples")
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (a *headAppender) Commit() error {
|
||
defer a.head.metrics.activeAppenders.Dec()
|
||
defer a.head.putAppendBuffer(a.samples)
|
||
defer a.head.putSeriesBuffer(a.sampleSeries)
|
||
|
||
if err := a.log(); err != nil {
|
||
return errors.Wrap(err, "write to WAL")
|
||
}
|
||
|
||
total := len(a.samples)
|
||
var series *memSeries
|
||
for i, s := range a.samples {
|
||
series = a.sampleSeries[i]
|
||
series.Lock()
|
||
ok, chunkCreated := series.append(s.T, s.V)
|
||
series.pendingCommit = false
|
||
series.Unlock()
|
||
|
||
if !ok {
|
||
total--
|
||
}
|
||
if chunkCreated {
|
||
a.head.metrics.chunks.Inc()
|
||
a.head.metrics.chunksCreated.Inc()
|
||
}
|
||
}
|
||
|
||
a.head.metrics.samplesAppended.Add(float64(total))
|
||
a.head.updateMinMaxTime(a.mint, a.maxt)
|
||
|
||
return nil
|
||
}
|
||
|
||
func (a *headAppender) Rollback() error {
|
||
a.head.metrics.activeAppenders.Dec()
|
||
var series *memSeries
|
||
for i := range a.samples {
|
||
series = a.sampleSeries[i]
|
||
series.Lock()
|
||
series.pendingCommit = false
|
||
series.Unlock()
|
||
}
|
||
a.head.putAppendBuffer(a.samples)
|
||
|
||
// Series are created in the head memory regardless of rollback. Thus we have
|
||
// to log them to the WAL in any case.
|
||
a.samples = nil
|
||
return a.log()
|
||
}
|
||
|
||
// Delete all samples in the range of [mint, maxt] for series that satisfy the given
|
||
// label matchers.
|
||
func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
|
||
// Do not delete anything beyond the currently valid range.
|
||
mint, maxt = clampInterval(mint, maxt, h.MinTime(), h.MaxTime())
|
||
|
||
ir := h.indexRange(mint, maxt)
|
||
|
||
p, err := PostingsForMatchers(ir, ms...)
|
||
if err != nil {
|
||
return errors.Wrap(err, "select series")
|
||
}
|
||
|
||
var stones []tombstones.Stone
|
||
dirty := false
|
||
for p.Next() {
|
||
series := h.series.getByID(p.At())
|
||
|
||
t0, t1 := series.minTime(), series.maxTime()
|
||
if t0 == math.MinInt64 || t1 == math.MinInt64 {
|
||
continue
|
||
}
|
||
// Delete only until the current values and not beyond.
|
||
t0, t1 = clampInterval(mint, maxt, t0, t1)
|
||
if h.wal != nil {
|
||
stones = append(stones, tombstones.Stone{Ref: p.At(), Intervals: tombstones.Intervals{{Mint: t0, Maxt: t1}}})
|
||
}
|
||
if err := h.chunkRewrite(p.At(), tombstones.Intervals{{Mint: t0, Maxt: t1}}); err != nil {
|
||
return errors.Wrap(err, "delete samples")
|
||
}
|
||
dirty = true
|
||
}
|
||
if p.Err() != nil {
|
||
return p.Err()
|
||
}
|
||
var enc record.Encoder
|
||
if h.wal != nil {
|
||
// Although we don't store the stones in the head
|
||
// we need to write them to the WAL to mark these as deleted
|
||
// after a restart while loading the WAL.
|
||
if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
if dirty {
|
||
h.gc()
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// chunkRewrite re-writes the chunks which overlaps with deleted ranges
|
||
// and removes the samples in the deleted ranges.
|
||
// Chunks is deleted if no samples are left at the end.
|
||
func (h *Head) chunkRewrite(ref uint64, dranges tombstones.Intervals) (err error) {
|
||
if len(dranges) == 0 {
|
||
return nil
|
||
}
|
||
|
||
ms := h.series.getByID(ref)
|
||
ms.Lock()
|
||
defer ms.Unlock()
|
||
if len(ms.chunks) == 0 {
|
||
return nil
|
||
}
|
||
|
||
metas := ms.chunksMetas()
|
||
mint, maxt := metas[0].MinTime, metas[len(metas)-1].MaxTime
|
||
it := newChunkSeriesIterator(metas, dranges, mint, maxt)
|
||
|
||
ms.reset()
|
||
for it.Next() {
|
||
t, v := it.At()
|
||
ok, _ := ms.append(t, v)
|
||
if !ok {
|
||
level.Warn(h.logger).Log("msg", "failed to add sample during delete")
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// gc removes data before the minimum timestamp from the head.
|
||
func (h *Head) gc() {
|
||
// Only data strictly lower than this timestamp must be deleted.
|
||
mint := h.MinTime()
|
||
|
||
// Drop old chunks and remember series IDs and hashes if they can be
|
||
// deleted entirely.
|
||
deleted, chunksRemoved := h.series.gc(mint)
|
||
seriesRemoved := len(deleted)
|
||
|
||
h.metrics.seriesRemoved.Add(float64(seriesRemoved))
|
||
h.metrics.chunksRemoved.Add(float64(chunksRemoved))
|
||
h.metrics.chunks.Sub(float64(chunksRemoved))
|
||
// Using AddUint64 to subtract series removed.
|
||
// See: https://golang.org/pkg/sync/atomic/#AddUint64.
|
||
atomic.AddUint64(&h.numSeries, ^uint64(seriesRemoved-1))
|
||
|
||
// Remove deleted series IDs from the postings lists.
|
||
h.postings.Delete(deleted)
|
||
|
||
if h.wal != nil {
|
||
_, last, _ := h.wal.Segments()
|
||
h.deletedMtx.Lock()
|
||
// Keep series records until we're past segment 'last'
|
||
// because the WAL will still have samples records with
|
||
// this ref ID. If we didn't keep these series records then
|
||
// on start up when we replay the WAL, or any other code
|
||
// that reads the WAL, wouldn't be able to use those
|
||
// samples since we would have no labels for that ref ID.
|
||
for ref := range deleted {
|
||
h.deleted[ref] = last
|
||
}
|
||
h.deletedMtx.Unlock()
|
||
}
|
||
|
||
// Rebuild symbols and label value indices from what is left in the postings terms.
|
||
symbols := make(map[string]struct{}, len(h.symbols))
|
||
values := make(map[string]stringset, len(h.values))
|
||
|
||
if err := h.postings.Iter(func(t labels.Label, _ index.Postings) error {
|
||
symbols[t.Name] = struct{}{}
|
||
symbols[t.Value] = struct{}{}
|
||
|
||
ss, ok := values[t.Name]
|
||
if !ok {
|
||
ss = stringset{}
|
||
values[t.Name] = ss
|
||
}
|
||
ss.set(t.Value)
|
||
return nil
|
||
}); err != nil {
|
||
// This should never happen, as the iteration function only returns nil.
|
||
panic(err)
|
||
}
|
||
|
||
h.symMtx.Lock()
|
||
|
||
h.symbols = symbols
|
||
h.values = values
|
||
|
||
h.symMtx.Unlock()
|
||
}
|
||
|
||
// Tombstones returns a new reader over the head's tombstones
|
||
func (h *Head) Tombstones() (tombstones.Reader, error) {
|
||
return emptyTombstoneReader, nil
|
||
}
|
||
|
||
// Index returns an IndexReader against the block.
|
||
func (h *Head) Index() (IndexReader, error) {
|
||
return h.indexRange(math.MinInt64, math.MaxInt64), nil
|
||
}
|
||
|
||
func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
|
||
if hmin := h.MinTime(); hmin > mint {
|
||
mint = hmin
|
||
}
|
||
return &headIndexReader{head: h, mint: mint, maxt: maxt}
|
||
}
|
||
|
||
// Chunks returns a ChunkReader against the block.
|
||
func (h *Head) Chunks() (ChunkReader, error) {
|
||
return h.chunksRange(math.MinInt64, math.MaxInt64), nil
|
||
}
|
||
|
||
func (h *Head) chunksRange(mint, maxt int64) *headChunkReader {
|
||
if hmin := h.MinTime(); hmin > mint {
|
||
mint = hmin
|
||
}
|
||
return &headChunkReader{head: h, mint: mint, maxt: maxt}
|
||
}
|
||
|
||
// NumSeries returns the number of active series in the head.
|
||
func (h *Head) NumSeries() uint64 {
|
||
return atomic.LoadUint64(&h.numSeries)
|
||
}
|
||
|
||
// Meta returns meta information about the head.
|
||
// The head is dynamic so will return dynamic results.
|
||
func (h *Head) Meta() BlockMeta {
|
||
var id [16]byte
|
||
copy(id[:], "______head______")
|
||
return BlockMeta{
|
||
MinTime: h.MinTime(),
|
||
MaxTime: h.MaxTime(),
|
||
ULID: ulid.ULID(id),
|
||
Stats: BlockStats{
|
||
NumSeries: h.NumSeries(),
|
||
},
|
||
}
|
||
}
|
||
|
||
// MinTime returns the lowest time bound on visible data in the head.
|
||
func (h *Head) MinTime() int64 {
|
||
return atomic.LoadInt64(&h.minTime)
|
||
}
|
||
|
||
// MaxTime returns the highest timestamp seen in data of the head.
|
||
func (h *Head) MaxTime() int64 {
|
||
return atomic.LoadInt64(&h.maxTime)
|
||
}
|
||
|
||
// compactable returns whether the head has a compactable range.
|
||
// The head has a compactable range when the head time range is 1.5 times the chunk range.
|
||
// The 0.5 acts as a buffer of the appendable window.
|
||
func (h *Head) compactable() bool {
|
||
return h.MaxTime()-h.MinTime() > h.chunkRange/2*3
|
||
}
|
||
|
||
// Close flushes the WAL and closes the head.
|
||
func (h *Head) Close() error {
|
||
if h.wal == nil {
|
||
return nil
|
||
}
|
||
return h.wal.Close()
|
||
}
|
||
|
||
type headChunkReader struct {
|
||
head *Head
|
||
mint, maxt int64
|
||
}
|
||
|
||
func (h *headChunkReader) Close() error {
|
||
return nil
|
||
}
|
||
|
||
// packChunkID packs a seriesID and a chunkID within it into a global 8 byte ID.
|
||
// It panicks if the seriesID exceeds 5 bytes or the chunk ID 3 bytes.
|
||
func packChunkID(seriesID, chunkID uint64) uint64 {
|
||
if seriesID > (1<<40)-1 {
|
||
panic("series ID exceeds 5 bytes")
|
||
}
|
||
if chunkID > (1<<24)-1 {
|
||
panic("chunk ID exceeds 3 bytes")
|
||
}
|
||
return (seriesID << 24) | chunkID
|
||
}
|
||
|
||
func unpackChunkID(id uint64) (seriesID, chunkID uint64) {
|
||
return id >> 24, (id << 40) >> 40
|
||
}
|
||
|
||
// Chunk returns the chunk for the reference number.
|
||
func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
|
||
sid, cid := unpackChunkID(ref)
|
||
|
||
s := h.head.series.getByID(sid)
|
||
// This means that the series has been garbage collected.
|
||
if s == nil {
|
||
return nil, ErrNotFound
|
||
}
|
||
|
||
s.Lock()
|
||
c := s.chunk(int(cid))
|
||
|
||
// This means that the chunk has been garbage collected or is outside
|
||
// the specified range.
|
||
if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) {
|
||
s.Unlock()
|
||
return nil, ErrNotFound
|
||
}
|
||
s.Unlock()
|
||
|
||
return &safeChunk{
|
||
Chunk: c.chunk,
|
||
s: s,
|
||
cid: int(cid),
|
||
}, nil
|
||
}
|
||
|
||
type safeChunk struct {
|
||
chunkenc.Chunk
|
||
s *memSeries
|
||
cid int
|
||
}
|
||
|
||
func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
|
||
c.s.Lock()
|
||
it := c.s.iterator(c.cid, reuseIter)
|
||
c.s.Unlock()
|
||
return it
|
||
}
|
||
|
||
type headIndexReader struct {
|
||
head *Head
|
||
mint, maxt int64
|
||
}
|
||
|
||
func (h *headIndexReader) Close() error {
|
||
return nil
|
||
}
|
||
|
||
func (h *headIndexReader) Symbols() (map[string]struct{}, error) {
|
||
h.head.symMtx.RLock()
|
||
defer h.head.symMtx.RUnlock()
|
||
|
||
res := make(map[string]struct{}, len(h.head.symbols))
|
||
|
||
for s := range h.head.symbols {
|
||
res[s] = struct{}{}
|
||
}
|
||
return res, nil
|
||
}
|
||
|
||
// LabelValues returns the possible label values
|
||
func (h *headIndexReader) LabelValues(names ...string) (index.StringTuples, error) {
|
||
if len(names) != 1 {
|
||
return nil, encoding.ErrInvalidSize
|
||
}
|
||
|
||
h.head.symMtx.RLock()
|
||
sl := make([]string, 0, len(h.head.values[names[0]]))
|
||
for s := range h.head.values[names[0]] {
|
||
sl = append(sl, s)
|
||
}
|
||
h.head.symMtx.RUnlock()
|
||
sort.Strings(sl)
|
||
|
||
return index.NewStringTuples(sl, len(names))
|
||
}
|
||
|
||
// LabelNames returns all the unique label names present in the head.
|
||
func (h *headIndexReader) LabelNames() ([]string, error) {
|
||
h.head.symMtx.RLock()
|
||
defer h.head.symMtx.RUnlock()
|
||
labelNames := make([]string, 0, len(h.head.values))
|
||
for name := range h.head.values {
|
||
if name == "" {
|
||
continue
|
||
}
|
||
labelNames = append(labelNames, name)
|
||
}
|
||
sort.Strings(labelNames)
|
||
return labelNames, nil
|
||
}
|
||
|
||
// Postings returns the postings list iterator for the label pairs.
|
||
func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) {
|
||
res := make([]index.Postings, 0, len(values))
|
||
for _, value := range values {
|
||
res = append(res, h.head.postings.Get(name, value))
|
||
}
|
||
return index.Merge(res...), nil
|
||
}
|
||
|
||
func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
|
||
series := make([]*memSeries, 0, 128)
|
||
|
||
// Fetch all the series only once.
|
||
for p.Next() {
|
||
s := h.head.series.getByID(p.At())
|
||
if s == nil {
|
||
level.Debug(h.head.logger).Log("msg", "looked up series not found")
|
||
} else {
|
||
series = append(series, s)
|
||
}
|
||
}
|
||
if err := p.Err(); err != nil {
|
||
return index.ErrPostings(errors.Wrap(err, "expand postings"))
|
||
}
|
||
|
||
sort.Slice(series, func(i, j int) bool {
|
||
return labels.Compare(series[i].lset, series[j].lset) < 0
|
||
})
|
||
|
||
// Convert back to list.
|
||
ep := make([]uint64, 0, len(series))
|
||
for _, p := range series {
|
||
ep = append(ep, p.ref)
|
||
}
|
||
return index.NewListPostings(ep)
|
||
}
|
||
|
||
// Series returns the series for the given reference.
|
||
func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
|
||
s := h.head.series.getByID(ref)
|
||
|
||
if s == nil {
|
||
h.head.metrics.seriesNotFound.Inc()
|
||
return ErrNotFound
|
||
}
|
||
*lbls = append((*lbls)[:0], s.lset...)
|
||
|
||
s.Lock()
|
||
defer s.Unlock()
|
||
|
||
*chks = (*chks)[:0]
|
||
|
||
for i, c := range s.chunks {
|
||
// Do not expose chunks that are outside of the specified range.
|
||
if !c.OverlapsClosedInterval(h.mint, h.maxt) {
|
||
continue
|
||
}
|
||
// Set the head chunks as open (being appended to).
|
||
maxTime := c.maxTime
|
||
if s.headChunk == c {
|
||
maxTime = math.MaxInt64
|
||
}
|
||
|
||
*chks = append(*chks, chunks.Meta{
|
||
MinTime: c.minTime,
|
||
MaxTime: maxTime,
|
||
Ref: packChunkID(s.ref, uint64(s.chunkID(i))),
|
||
})
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (h *headIndexReader) LabelIndices() ([][]string, error) {
|
||
h.head.symMtx.RLock()
|
||
defer h.head.symMtx.RUnlock()
|
||
res := [][]string{}
|
||
for s := range h.head.values {
|
||
res = append(res, []string{s})
|
||
}
|
||
return res, nil
|
||
}
|
||
|
||
func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool) {
|
||
// Just using `getOrSet` below would be semantically sufficient, but we'd create
|
||
// a new series on every sample inserted via Add(), which causes allocations
|
||
// and makes our series IDs rather random and harder to compress in postings.
|
||
s := h.series.getByHash(hash, lset)
|
||
if s != nil {
|
||
return s, false
|
||
}
|
||
|
||
// Optimistically assume that we are the first one to create the series.
|
||
id := atomic.AddUint64(&h.lastSeriesID, 1)
|
||
|
||
return h.getOrCreateWithID(id, hash, lset)
|
||
}
|
||
|
||
func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool) {
|
||
s := newMemSeries(lset, id, h.chunkRange)
|
||
|
||
s, created := h.series.getOrSet(hash, s)
|
||
if !created {
|
||
return s, false
|
||
}
|
||
|
||
h.metrics.seriesCreated.Inc()
|
||
atomic.AddUint64(&h.numSeries, 1)
|
||
|
||
h.postings.Add(id, lset)
|
||
|
||
h.symMtx.Lock()
|
||
defer h.symMtx.Unlock()
|
||
|
||
for _, l := range lset {
|
||
valset, ok := h.values[l.Name]
|
||
if !ok {
|
||
valset = stringset{}
|
||
h.values[l.Name] = valset
|
||
}
|
||
valset.set(l.Value)
|
||
|
||
h.symbols[l.Name] = struct{}{}
|
||
h.symbols[l.Value] = struct{}{}
|
||
}
|
||
|
||
return s, true
|
||
}
|
||
|
||
// seriesHashmap is a simple hashmap for memSeries by their label set. It is built
|
||
// on top of a regular hashmap and holds a slice of series to resolve hash collisions.
|
||
// Its methods require the hash to be submitted with it to avoid re-computations throughout
|
||
// the code.
|
||
type seriesHashmap map[uint64][]*memSeries
|
||
|
||
func (m seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
|
||
for _, s := range m[hash] {
|
||
if labels.Equal(s.lset, lset) {
|
||
return s
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (m seriesHashmap) set(hash uint64, s *memSeries) {
|
||
l := m[hash]
|
||
for i, prev := range l {
|
||
if labels.Equal(prev.lset, s.lset) {
|
||
l[i] = s
|
||
return
|
||
}
|
||
}
|
||
m[hash] = append(l, s)
|
||
}
|
||
|
||
func (m seriesHashmap) del(hash uint64, lset labels.Labels) {
|
||
var rem []*memSeries
|
||
for _, s := range m[hash] {
|
||
if !labels.Equal(s.lset, lset) {
|
||
rem = append(rem, s)
|
||
}
|
||
}
|
||
if len(rem) == 0 {
|
||
delete(m, hash)
|
||
} else {
|
||
m[hash] = rem
|
||
}
|
||
}
|
||
|
||
// stripeSeries locks modulo ranges of IDs and hashes to reduce lock contention.
|
||
// The locks are padded to not be on the same cache line. Filling the padded space
|
||
// with the maps was profiled to be slower – likely due to the additional pointer
|
||
// dereferences.
|
||
type stripeSeries struct {
|
||
series [stripeSize]map[uint64]*memSeries
|
||
hashes [stripeSize]seriesHashmap
|
||
locks [stripeSize]stripeLock
|
||
}
|
||
|
||
const (
|
||
stripeSize = 1 << 14
|
||
stripeMask = stripeSize - 1
|
||
)
|
||
|
||
type stripeLock struct {
|
||
sync.RWMutex
|
||
// Padding to avoid multiple locks being on the same cache line.
|
||
_ [40]byte
|
||
}
|
||
|
||
func newStripeSeries() *stripeSeries {
|
||
s := &stripeSeries{}
|
||
|
||
for i := range s.series {
|
||
s.series[i] = map[uint64]*memSeries{}
|
||
}
|
||
for i := range s.hashes {
|
||
s.hashes[i] = seriesHashmap{}
|
||
}
|
||
return s
|
||
}
|
||
|
||
// gc garbage collects old chunks that are strictly before mint and removes
|
||
// series entirely that have no chunks left.
|
||
func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
|
||
var (
|
||
deleted = map[uint64]struct{}{}
|
||
rmChunks = 0
|
||
)
|
||
// Run through all series and truncate old chunks. Mark those with no
|
||
// chunks left as deleted and store their ID.
|
||
for i := 0; i < stripeSize; i++ {
|
||
s.locks[i].Lock()
|
||
|
||
for hash, all := range s.hashes[i] {
|
||
for _, series := range all {
|
||
series.Lock()
|
||
rmChunks += series.truncateChunksBefore(mint)
|
||
|
||
if len(series.chunks) > 0 || series.pendingCommit {
|
||
series.Unlock()
|
||
continue
|
||
}
|
||
|
||
// The series is gone entirely. We need to keep the series lock
|
||
// and make sure we have acquired the stripe locks for hash and ID of the
|
||
// series alike.
|
||
// If we don't hold them all, there's a very small chance that a series receives
|
||
// samples again while we are half-way into deleting it.
|
||
j := int(series.ref & stripeMask)
|
||
|
||
if i != j {
|
||
s.locks[j].Lock()
|
||
}
|
||
|
||
deleted[series.ref] = struct{}{}
|
||
s.hashes[i].del(hash, series.lset)
|
||
delete(s.series[j], series.ref)
|
||
|
||
if i != j {
|
||
s.locks[j].Unlock()
|
||
}
|
||
|
||
series.Unlock()
|
||
}
|
||
}
|
||
|
||
s.locks[i].Unlock()
|
||
}
|
||
|
||
return deleted, rmChunks
|
||
}
|
||
|
||
func (s *stripeSeries) getByID(id uint64) *memSeries {
|
||
i := id & stripeMask
|
||
|
||
s.locks[i].RLock()
|
||
series := s.series[i][id]
|
||
s.locks[i].RUnlock()
|
||
|
||
return series
|
||
}
|
||
|
||
func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries {
|
||
i := hash & stripeMask
|
||
|
||
s.locks[i].RLock()
|
||
series := s.hashes[i].get(hash, lset)
|
||
s.locks[i].RUnlock()
|
||
|
||
return series
|
||
}
|
||
|
||
func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, bool) {
|
||
i := hash & stripeMask
|
||
|
||
s.locks[i].Lock()
|
||
|
||
if prev := s.hashes[i].get(hash, series.lset); prev != nil {
|
||
s.locks[i].Unlock()
|
||
return prev, false
|
||
}
|
||
s.hashes[i].set(hash, series)
|
||
s.locks[i].Unlock()
|
||
|
||
i = series.ref & stripeMask
|
||
|
||
s.locks[i].Lock()
|
||
s.series[i][series.ref] = series
|
||
s.locks[i].Unlock()
|
||
|
||
return series, true
|
||
}
|
||
|
||
type sample struct {
|
||
t int64
|
||
v float64
|
||
}
|
||
|
||
func (s sample) T() int64 {
|
||
return s.t
|
||
}
|
||
|
||
func (s sample) V() float64 {
|
||
return s.v
|
||
}
|
||
|
||
// memSeries is the in-memory representation of a series. None of its methods
|
||
// are goroutine safe and it is the caller's responsibility to lock it.
|
||
type memSeries struct {
|
||
sync.Mutex
|
||
|
||
ref uint64
|
||
lset labels.Labels
|
||
chunks []*memChunk
|
||
headChunk *memChunk
|
||
chunkRange int64
|
||
firstChunkID int
|
||
|
||
nextAt int64 // Timestamp at which to cut the next chunk.
|
||
sampleBuf [4]sample
|
||
pendingCommit bool // Whether there are samples waiting to be committed to this series.
|
||
|
||
app chunkenc.Appender // Current appender for the chunk.
|
||
}
|
||
|
||
func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries {
|
||
s := &memSeries{
|
||
lset: lset,
|
||
ref: id,
|
||
chunkRange: chunkRange,
|
||
nextAt: math.MinInt64,
|
||
}
|
||
return s
|
||
}
|
||
|
||
func (s *memSeries) minTime() int64 {
|
||
if len(s.chunks) == 0 {
|
||
return math.MinInt64
|
||
}
|
||
return s.chunks[0].minTime
|
||
}
|
||
|
||
func (s *memSeries) maxTime() int64 {
|
||
c := s.head()
|
||
if c == nil {
|
||
return math.MinInt64
|
||
}
|
||
return c.maxTime
|
||
}
|
||
|
||
func (s *memSeries) cut(mint int64) *memChunk {
|
||
c := &memChunk{
|
||
chunk: chunkenc.NewXORChunk(),
|
||
minTime: mint,
|
||
maxTime: math.MinInt64,
|
||
}
|
||
s.chunks = append(s.chunks, c)
|
||
s.headChunk = c
|
||
|
||
// Set upper bound on when the next chunk must be started. An earlier timestamp
|
||
// may be chosen dynamically at a later point.
|
||
s.nextAt = rangeForTimestamp(mint, s.chunkRange)
|
||
|
||
app, err := c.chunk.Appender()
|
||
if err != nil {
|
||
panic(err)
|
||
}
|
||
s.app = app
|
||
return c
|
||
}
|
||
|
||
func (s *memSeries) chunksMetas() []chunks.Meta {
|
||
metas := make([]chunks.Meta, 0, len(s.chunks))
|
||
for _, chk := range s.chunks {
|
||
metas = append(metas, chunks.Meta{Chunk: chk.chunk, MinTime: chk.minTime, MaxTime: chk.maxTime})
|
||
}
|
||
return metas
|
||
}
|
||
|
||
// reset re-initialises all the variable in the memSeries except 'lset', 'ref',
|
||
// and 'chunkRange', like how it would appear after 'newMemSeries(...)'.
|
||
func (s *memSeries) reset() {
|
||
s.chunks = nil
|
||
s.headChunk = nil
|
||
s.firstChunkID = 0
|
||
s.nextAt = math.MinInt64
|
||
s.sampleBuf = [4]sample{}
|
||
s.pendingCommit = false
|
||
s.app = nil
|
||
}
|
||
|
||
// appendable checks whether the given sample is valid for appending to the series.
|
||
func (s *memSeries) appendable(t int64, v float64) error {
|
||
c := s.head()
|
||
if c == nil {
|
||
return nil
|
||
}
|
||
|
||
if t > c.maxTime {
|
||
return nil
|
||
}
|
||
if t < c.maxTime {
|
||
return ErrOutOfOrderSample
|
||
}
|
||
// We are allowing exact duplicates as we can encounter them in valid cases
|
||
// like federation and erroring out at that time would be extremely noisy.
|
||
if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) {
|
||
return ErrAmendSample
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (s *memSeries) chunk(id int) *memChunk {
|
||
ix := id - s.firstChunkID
|
||
if ix < 0 || ix >= len(s.chunks) {
|
||
return nil
|
||
}
|
||
return s.chunks[ix]
|
||
}
|
||
|
||
func (s *memSeries) chunkID(pos int) int {
|
||
return pos + s.firstChunkID
|
||
}
|
||
|
||
// truncateChunksBefore removes all chunks from the series that have not timestamp
|
||
// at or after mint. Chunk IDs remain unchanged.
|
||
func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
|
||
var k int
|
||
for i, c := range s.chunks {
|
||
if c.maxTime >= mint {
|
||
break
|
||
}
|
||
k = i + 1
|
||
}
|
||
s.chunks = append(s.chunks[:0], s.chunks[k:]...)
|
||
s.firstChunkID += k
|
||
if len(s.chunks) == 0 {
|
||
s.headChunk = nil
|
||
} else {
|
||
s.headChunk = s.chunks[len(s.chunks)-1]
|
||
}
|
||
|
||
return k
|
||
}
|
||
|
||
// append adds the sample (t, v) to the series.
|
||
func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
|
||
// Based on Gorilla white papers this offers near-optimal compression ratio
|
||
// so anything bigger that this has diminishing returns and increases
|
||
// the time range within which we have to decompress all samples.
|
||
const samplesPerChunk = 120
|
||
|
||
c := s.head()
|
||
|
||
if c == nil {
|
||
c = s.cut(t)
|
||
chunkCreated = true
|
||
}
|
||
numSamples := c.chunk.NumSamples()
|
||
|
||
// Out of order sample.
|
||
if c.maxTime >= t {
|
||
return false, chunkCreated
|
||
}
|
||
// If we reach 25% of a chunk's desired sample count, set a definitive time
|
||
// at which to start the next chunk.
|
||
// At latest it must happen at the timestamp set when the chunk was cut.
|
||
if numSamples == samplesPerChunk/4 {
|
||
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt)
|
||
}
|
||
if t >= s.nextAt {
|
||
c = s.cut(t)
|
||
chunkCreated = true
|
||
}
|
||
s.app.Append(t, v)
|
||
|
||
c.maxTime = t
|
||
|
||
s.sampleBuf[0] = s.sampleBuf[1]
|
||
s.sampleBuf[1] = s.sampleBuf[2]
|
||
s.sampleBuf[2] = s.sampleBuf[3]
|
||
s.sampleBuf[3] = sample{t: t, v: v}
|
||
|
||
return true, chunkCreated
|
||
}
|
||
|
||
// computeChunkEndTime estimates the end timestamp based the beginning of a chunk,
|
||
// its current timestamp and the upper bound up to which we insert data.
|
||
// It assumes that the time range is 1/4 full.
|
||
func computeChunkEndTime(start, cur, max int64) int64 {
|
||
a := (max - start) / ((cur - start + 1) * 4)
|
||
if a == 0 {
|
||
return max
|
||
}
|
||
return start + (max-start)/a
|
||
}
|
||
|
||
func (s *memSeries) iterator(id int, it chunkenc.Iterator) chunkenc.Iterator {
|
||
c := s.chunk(id)
|
||
// TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk,
|
||
// which got then garbage collected before it got accessed.
|
||
// We must ensure to not garbage collect as long as any readers still hold a reference.
|
||
if c == nil {
|
||
return chunkenc.NewNopIterator()
|
||
}
|
||
|
||
if id-s.firstChunkID < len(s.chunks)-1 {
|
||
return c.chunk.Iterator(it)
|
||
}
|
||
// Serve the last 4 samples for the last chunk from the sample buffer
|
||
// as their compressed bytes may be mutated by added samples.
|
||
if msIter, ok := it.(*memSafeIterator); ok {
|
||
msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
|
||
msIter.i = -1
|
||
msIter.total = c.chunk.NumSamples()
|
||
msIter.buf = s.sampleBuf
|
||
return msIter
|
||
}
|
||
return &memSafeIterator{
|
||
Iterator: c.chunk.Iterator(it),
|
||
i: -1,
|
||
total: c.chunk.NumSamples(),
|
||
buf: s.sampleBuf,
|
||
}
|
||
}
|
||
|
||
func (s *memSeries) head() *memChunk {
|
||
return s.headChunk
|
||
}
|
||
|
||
type memChunk struct {
|
||
chunk chunkenc.Chunk
|
||
minTime, maxTime int64
|
||
}
|
||
|
||
// Returns true if the chunk overlaps [mint, maxt].
|
||
func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool {
|
||
return mc.minTime <= maxt && mint <= mc.maxTime
|
||
}
|
||
|
||
type memSafeIterator struct {
|
||
chunkenc.Iterator
|
||
|
||
i int
|
||
total int
|
||
buf [4]sample
|
||
}
|
||
|
||
func (it *memSafeIterator) Next() bool {
|
||
if it.i+1 >= it.total {
|
||
return false
|
||
}
|
||
it.i++
|
||
if it.total-it.i > 4 {
|
||
return it.Iterator.Next()
|
||
}
|
||
return true
|
||
}
|
||
|
||
func (it *memSafeIterator) At() (int64, float64) {
|
||
if it.total-it.i > 4 {
|
||
return it.Iterator.At()
|
||
}
|
||
s := it.buf[4-(it.total-it.i)]
|
||
return s.t, s.v
|
||
}
|
||
|
||
type stringset map[string]struct{}
|
||
|
||
func (ss stringset) set(s string) {
|
||
ss[s] = struct{}{}
|
||
}
|
||
|
||
func (ss stringset) String() string {
|
||
return strings.Join(ss.slice(), ",")
|
||
}
|
||
|
||
func (ss stringset) slice() []string {
|
||
slice := make([]string, 0, len(ss))
|
||
for k := range ss {
|
||
slice = append(slice, k)
|
||
}
|
||
sort.Strings(slice)
|
||
return slice
|
||
}
|