diff --git a/tsdb/head.go b/tsdb/head.go index ae549d253..fff3c4dbc 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -14,12 +14,9 @@ package tsdb import ( - "context" "fmt" "math" "path/filepath" - "runtime" - "sort" "sync" "time" @@ -56,12 +53,6 @@ var ( ErrAppenderClosed = errors.New("appender closed") ) -type ExemplarStorage interface { - storage.ExemplarQueryable - AddExemplar(labels.Labels, exemplar.Exemplar) error - ValidateExemplar(labels.Labels, exemplar.Exemplar) error -} - // Head handles reads and writes of time series data within a time window. type Head struct { chunkRange atomic.Int64 @@ -115,6 +106,12 @@ type Head struct { memTruncationInProcess atomic.Bool } +type ExemplarStorage interface { + storage.ExemplarQueryable + AddExemplar(labels.Labels, exemplar.Exemplar) error + ValidateExemplar(labels.Labels, exemplar.Exemplar) error +} + // HeadOptions are parameters for the Head block. type HeadOptions struct { ChunkRange int64 @@ -144,6 +141,87 @@ func DefaultHeadOptions() *HeadOptions { } } +// SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series. +// It is always a no-op in Prometheus and mainly meant for external users who import TSDB. +// All the callbacks should be safe to be called concurrently. +// It is up to the user to implement soft or hard consistency by making the callbacks +// atomic or non-atomic. Atomic callbacks can cause degradation performance. +type SeriesLifecycleCallback interface { + // PreCreation is called before creating a series to indicate if the series can be created. + // A non nil error means the series should not be created. + PreCreation(labels.Labels) error + // PostCreation is called after creating a series to indicate a creation of series. + PostCreation(labels.Labels) + // PostDeletion is called after deletion of series. + PostDeletion(...labels.Labels) +} + +// NewHead opens the head block in dir. 
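+// It sets up the exemplar storage, the in-memory series map and the chunk disk
+// mapper for m-mapped head chunks. Callers should call Init afterwards to replay
+// the WAL before appending samples.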
+func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOptions, stats *HeadStats) (*Head, error) { + var err error + if l == nil { + l = log.NewNopLogger() + } + if opts.ChunkRange < 1 { + return nil, errors.Errorf("invalid chunk range %d", opts.ChunkRange) + } + if opts.SeriesCallback == nil { + opts.SeriesCallback = &noopSeriesLifecycleCallback{} + } + + em := NewExemplarMetrics(r) + es, err := NewCircularExemplarStorage(opts.MaxExemplars.Load(), em) + if err != nil { + return nil, err + } + + if stats == nil { + stats = NewHeadStats() + } + + h := &Head{ + wal: wal, + logger: l, + opts: opts, + exemplarMetrics: em, + exemplars: es, + series: newStripeSeries(opts.StripeSize, opts.SeriesCallback), + symbols: map[string]struct{}{}, + postings: index.NewUnorderedMemPostings(), + tombstones: tombstones.NewMemTombstones(), + iso: newIsolation(), + deleted: map[uint64]int{}, + memChunkPool: sync.Pool{ + New: func() interface{} { + return &memChunk{} + }, + }, + stats: stats, + reg: r, + } + h.chunkRange.Store(opts.ChunkRange) + h.minTime.Store(math.MaxInt64) + h.maxTime.Store(math.MinInt64) + h.lastWALTruncationTime.Store(math.MinInt64) + h.lastMemoryTruncationTime.Store(math.MinInt64) + h.metrics = newHeadMetrics(h, r) + + if opts.ChunkPool == nil { + opts.ChunkPool = chunkenc.NewPool() + } + + h.chunkDiskMapper, err = chunks.NewChunkDiskMapper( + mmappedChunksDir(opts.ChunkDirRoot), + opts.ChunkPool, + opts.ChunkWriteBufferSize, + ) + if err != nil { + return nil, err + } + + return h, nil +} + type headMetrics struct { activeAppenders prometheus.Gauge series prometheus.GaugeFunc @@ -318,6 +396,8 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { return m } +func mmappedChunksDir(dir string) string { return filepath.Join(dir, "chunks_head") } + // HeadStats are the statistics for the head component of the DB. type HeadStats struct { WALReplayStatus *WALReplayStatus @@ -352,457 +432,6 @@ func (s *WALReplayStatus) GetWALReplayStatus() WALReplayStatus { const cardinalityCacheExpirationTime = time.Duration(30) * time.Second -// PostingsCardinalityStats returns top 10 highest cardinality stats By label and value names. -func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.PostingsStats { - h.cardinalityMutex.Lock() - defer h.cardinalityMutex.Unlock() - currentTime := time.Duration(time.Now().Unix()) * time.Second - seconds := currentTime - h.lastPostingsStatsCall - if seconds > cardinalityCacheExpirationTime { - h.cardinalityCache = nil - } - if h.cardinalityCache != nil { - return h.cardinalityCache - } - h.cardinalityCache = h.postings.Stats(statsByLabelName) - h.lastPostingsStatsCall = time.Duration(time.Now().Unix()) * time.Second - - return h.cardinalityCache -} - -// NewHead opens the head block in dir. 
-func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOptions, stats *HeadStats) (*Head, error) { - var err error - if l == nil { - l = log.NewNopLogger() - } - if opts.ChunkRange < 1 { - return nil, errors.Errorf("invalid chunk range %d", opts.ChunkRange) - } - if opts.SeriesCallback == nil { - opts.SeriesCallback = &noopSeriesLifecycleCallback{} - } - - em := NewExemplarMetrics(r) - es, err := NewCircularExemplarStorage(opts.MaxExemplars.Load(), em) - if err != nil { - return nil, err - } - - if stats == nil { - stats = NewHeadStats() - } - - h := &Head{ - wal: wal, - logger: l, - opts: opts, - exemplarMetrics: em, - exemplars: es, - series: newStripeSeries(opts.StripeSize, opts.SeriesCallback), - symbols: map[string]struct{}{}, - postings: index.NewUnorderedMemPostings(), - tombstones: tombstones.NewMemTombstones(), - iso: newIsolation(), - deleted: map[uint64]int{}, - memChunkPool: sync.Pool{ - New: func() interface{} { - return &memChunk{} - }, - }, - stats: stats, - reg: r, - } - h.chunkRange.Store(opts.ChunkRange) - h.minTime.Store(math.MaxInt64) - h.maxTime.Store(math.MinInt64) - h.lastWALTruncationTime.Store(math.MinInt64) - h.lastMemoryTruncationTime.Store(math.MinInt64) - h.metrics = newHeadMetrics(h, r) - - if opts.ChunkPool == nil { - opts.ChunkPool = chunkenc.NewPool() - } - - h.chunkDiskMapper, err = chunks.NewChunkDiskMapper( - mmappedChunksDir(opts.ChunkDirRoot), - opts.ChunkPool, - opts.ChunkWriteBufferSize, - ) - if err != nil { - return nil, err - } - - return h, nil -} - -func mmappedChunksDir(dir string) string { return filepath.Join(dir, "chunks_head") } - -func (h *Head) ApplyConfig(cfg *config.Config) error { - if !h.opts.EnableExemplarStorage { - return nil - } - - // Head uses opts.MaxExemplars in combination with opts.EnableExemplarStorage - // to decide if it should pass exemplars along to it's exemplar storage, so we - // need to update opts.MaxExemplars here. - prevSize := h.opts.MaxExemplars.Load() - h.opts.MaxExemplars.Store(cfg.StorageConfig.ExemplarsConfig.MaxExemplars) - - if prevSize == h.opts.MaxExemplars.Load() { - return nil - } - - migrated := h.exemplars.(*CircularExemplarStorage).Resize(h.opts.MaxExemplars.Load()) - level.Info(h.logger).Log("msg", "Exemplar storage resized", "from", prevSize, "to", h.opts.MaxExemplars, "migrated", migrated) - return nil -} - -func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { - return h.exemplars.ExemplarQuerier(ctx) -} - -// processWALSamples adds a partition of samples it receives to the head and passes -// them on to other workers. -// Samples before the mint timestamp are discarded. -func (h *Head) processWALSamples( - minValidTime int64, - input <-chan []record.RefSample, output chan<- []record.RefSample, -) (unknownRefs uint64) { - defer close(output) - - // Mitigate lock contention in getByID. 
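-	// Each worker keeps its own ref->series map so repeated samples for the same
-	// series resolve locally instead of hitting the shared, locked series map.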
- refSeries := map[uint64]*memSeries{} - - mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) - - for samples := range input { - for _, s := range samples { - if s.T < minValidTime { - continue - } - ms := refSeries[s.Ref] - if ms == nil { - ms = h.series.getByID(s.Ref) - if ms == nil { - unknownRefs++ - continue - } - refSeries[s.Ref] = ms - } - if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper); chunkCreated { - h.metrics.chunksCreated.Inc() - h.metrics.chunks.Inc() - } - if s.T > maxt { - maxt = s.T - } - if s.T < mint { - mint = s.T - } - } - output <- samples - } - h.updateMinMaxTime(mint, maxt) - - return unknownRefs -} - -func (h *Head) updateMinMaxTime(mint, maxt int64) { - for { - lt := h.MinTime() - if mint >= lt { - break - } - if h.minTime.CAS(lt, mint) { - break - } - } - for { - ht := h.MaxTime() - if maxt <= ht { - break - } - if h.maxTime.CAS(ht, maxt) { - break - } - } -} - -func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks map[uint64][]*mmappedChunk) (err error) { - // Track number of samples that referenced a series we don't know about - // for error reporting. - var unknownRefs atomic.Uint64 - var unknownExemplarRefs atomic.Uint64 - - // Start workers that each process samples for a partition of the series ID space. - // They are connected through a ring of channels which ensures that all sample batches - // read from the WAL are processed in order. - var ( - wg sync.WaitGroup - n = runtime.GOMAXPROCS(0) - inputs = make([]chan []record.RefSample, n) - outputs = make([]chan []record.RefSample, n) - exemplarsInput chan record.RefExemplar - - dec record.Decoder - shards = make([][]record.RefSample, n) - - decoded = make(chan interface{}, 10) - decodeErr, seriesCreationErr error - seriesPool = sync.Pool{ - New: func() interface{} { - return []record.RefSeries{} - }, - } - samplesPool = sync.Pool{ - New: func() interface{} { - return []record.RefSample{} - }, - } - tstonesPool = sync.Pool{ - New: func() interface{} { - return []tombstones.Stone{} - }, - } - exemplarsPool = sync.Pool{ - New: func() interface{} { - return []record.RefExemplar{} - }, - } - ) - - defer func() { - // For CorruptionErr ensure to terminate all workers before exiting. - _, ok := err.(*wal.CorruptionErr) - if ok || seriesCreationErr != nil { - for i := 0; i < n; i++ { - close(inputs[i]) - for range outputs[i] { - } - } - close(exemplarsInput) - wg.Wait() - } - }() - - wg.Add(n) - for i := 0; i < n; i++ { - outputs[i] = make(chan []record.RefSample, 300) - inputs[i] = make(chan []record.RefSample, 300) - - go func(input <-chan []record.RefSample, output chan<- []record.RefSample) { - unknown := h.processWALSamples(h.minValidTime.Load(), input, output) - unknownRefs.Add(unknown) - wg.Done() - }(inputs[i], outputs[i]) - } - - wg.Add(1) - exemplarsInput = make(chan record.RefExemplar, 300) - go func(input <-chan record.RefExemplar) { - defer wg.Done() - for e := range input { - ms := h.series.getByID(e.Ref) - if ms == nil { - unknownExemplarRefs.Inc() - continue - } - - if e.T < h.minValidTime.Load() { - continue - } - // At the moment the only possible error here is out of order exemplars, which we shouldn't see when - // replaying the WAL, so lets just log the error if it's not that type. 
- err = h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels}) - if err != nil && err == storage.ErrOutOfOrderExemplar { - level.Warn(h.logger).Log("msg", "Unexpected error when replaying WAL on exemplar record", "err", err) - } - } - }(exemplarsInput) - - go func() { - defer close(decoded) - for r.Next() { - rec := r.Record() - switch dec.Type(rec) { - case record.Series: - series := seriesPool.Get().([]record.RefSeries)[:0] - series, err = dec.Series(rec, series) - if err != nil { - decodeErr = &wal.CorruptionErr{ - Err: errors.Wrap(err, "decode series"), - Segment: r.Segment(), - Offset: r.Offset(), - } - return - } - decoded <- series - case record.Samples: - samples := samplesPool.Get().([]record.RefSample)[:0] - samples, err = dec.Samples(rec, samples) - if err != nil { - decodeErr = &wal.CorruptionErr{ - Err: errors.Wrap(err, "decode samples"), - Segment: r.Segment(), - Offset: r.Offset(), - } - return - } - decoded <- samples - case record.Tombstones: - tstones := tstonesPool.Get().([]tombstones.Stone)[:0] - tstones, err = dec.Tombstones(rec, tstones) - if err != nil { - decodeErr = &wal.CorruptionErr{ - Err: errors.Wrap(err, "decode tombstones"), - Segment: r.Segment(), - Offset: r.Offset(), - } - return - } - decoded <- tstones - case record.Exemplars: - exemplars := exemplarsPool.Get().([]record.RefExemplar)[:0] - exemplars, err = dec.Exemplars(rec, exemplars) - if err != nil { - decodeErr = &wal.CorruptionErr{ - Err: errors.Wrap(err, "decode exemplars"), - Segment: r.Segment(), - Offset: r.Offset(), - } - return - } - decoded <- exemplars - default: - // Noop. - } - } - }() - -Outer: - for d := range decoded { - switch v := d.(type) { - case []record.RefSeries: - for _, s := range v { - series, created, err := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) - if err != nil { - seriesCreationErr = err - break Outer - } - - if created { - // If this series gets a duplicate record, we don't restore its mmapped chunks, - // and instead restore everything from WAL records. - series.mmappedChunks = mmappedChunks[series.ref] - - h.metrics.chunks.Add(float64(len(series.mmappedChunks))) - h.metrics.chunksCreated.Add(float64(len(series.mmappedChunks))) - - if len(series.mmappedChunks) > 0 { - h.updateMinMaxTime(series.minTime(), series.maxTime()) - } - } else { - // TODO(codesome) Discard old samples and mmapped chunks and use mmap chunks for the new series ID. - - // There's already a different ref for this series. - multiRef[s.Ref] = series.ref - } - - if h.lastSeriesID.Load() < s.Ref { - h.lastSeriesID.Store(s.Ref) - } - } - //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. - seriesPool.Put(v) - case []record.RefSample: - samples := v - // We split up the samples into chunks of 5000 samples or less. - // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise - // cause thousands of very large in flight buffers occupying large amounts - // of unused memory. - for len(samples) > 0 { - m := 5000 - if len(samples) < m { - m = len(samples) - } - for i := 0; i < n; i++ { - var buf []record.RefSample - select { - case buf = <-outputs[i]: - default: - } - shards[i] = buf[:0] - } - for _, sam := range samples[:m] { - if r, ok := multiRef[sam.Ref]; ok { - sam.Ref = r - } - mod := sam.Ref % uint64(n) - shards[mod] = append(shards[mod], sam) - } - for i := 0; i < n; i++ { - inputs[i] <- shards[i] - } - samples = samples[m:] - } - //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. 
- samplesPool.Put(v) - case []tombstones.Stone: - for _, s := range v { - for _, itv := range s.Intervals { - if itv.Maxt < h.minValidTime.Load() { - continue - } - if m := h.series.getByID(s.Ref); m == nil { - unknownRefs.Inc() - continue - } - h.tombstones.AddInterval(s.Ref, itv) - } - } - //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. - tstonesPool.Put(v) - case []record.RefExemplar: - for _, e := range v { - exemplarsInput <- e - } - //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. - exemplarsPool.Put(v) - default: - panic(fmt.Errorf("unexpected decoded type: %T", d)) - } - } - - if decodeErr != nil { - return decodeErr - } - if seriesCreationErr != nil { - // Drain the channel to unblock the goroutine. - for range decoded { - } - return seriesCreationErr - } - - // Signal termination to each worker and wait for it to close its output channel. - for i := 0; i < n; i++ { - close(inputs[i]) - for range outputs[i] { - } - } - close(exemplarsInput) - wg.Wait() - - if r.Err() != nil { - return errors.Wrap(r.Err(), "read records") - } - - if unknownRefs.Load() > 0 || unknownExemplarRefs.Load() > 0 { - level.Warn(h.logger).Log("msg", "Unknown series references", "samples", unknownRefs.Load(), "exemplars", unknownExemplarRefs.Load()) - } - return nil -} - // Init loads data from the write ahead log and prepares the head for writes. // It should be called before using an appender so that it // limits the ingested samples to the head min valid time. @@ -904,11 +533,6 @@ func (h *Head) Init(minValidTime int64) error { return nil } -// SetMinValidTime sets the minimum timestamp the head can ingest. -func (h *Head) SetMinValidTime(minValidTime int64) { - h.minValidTime.Store(minValidTime) -} - func (h *Head) loadMmappedChunks() (map[uint64][]*mmappedChunk, error) { mmappedChunks := map[uint64][]*mmappedChunk{} if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error { @@ -959,6 +583,70 @@ func (h *Head) removeCorruptedMmappedChunks(err error) map[uint64][]*mmappedChun return mmappedChunks } +func (h *Head) ApplyConfig(cfg *config.Config) error { + if !h.opts.EnableExemplarStorage { + return nil + } + + // Head uses opts.MaxExemplars in combination with opts.EnableExemplarStorage + // to decide if it should pass exemplars along to it's exemplar storage, so we + // need to update opts.MaxExemplars here. + prevSize := h.opts.MaxExemplars.Load() + h.opts.MaxExemplars.Store(cfg.StorageConfig.ExemplarsConfig.MaxExemplars) + + if prevSize == h.opts.MaxExemplars.Load() { + return nil + } + + migrated := h.exemplars.(*CircularExemplarStorage).Resize(h.opts.MaxExemplars.Load()) + level.Info(h.logger).Log("msg", "Exemplar storage resized", "from", prevSize, "to", h.opts.MaxExemplars, "migrated", migrated) + return nil +} + +// PostingsCardinalityStats returns top 10 highest cardinality stats By label and value names. 
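+// The result is cached and recomputed at most once per cardinalityCacheExpirationTime
+// (30 seconds); calls within that window return the cached stats.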
+func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.PostingsStats { + h.cardinalityMutex.Lock() + defer h.cardinalityMutex.Unlock() + currentTime := time.Duration(time.Now().Unix()) * time.Second + seconds := currentTime - h.lastPostingsStatsCall + if seconds > cardinalityCacheExpirationTime { + h.cardinalityCache = nil + } + if h.cardinalityCache != nil { + return h.cardinalityCache + } + h.cardinalityCache = h.postings.Stats(statsByLabelName) + h.lastPostingsStatsCall = time.Duration(time.Now().Unix()) * time.Second + + return h.cardinalityCache +} + +func (h *Head) updateMinMaxTime(mint, maxt int64) { + for { + lt := h.MinTime() + if mint >= lt { + break + } + if h.minTime.CAS(lt, mint) { + break + } + } + for { + ht := h.MaxTime() + if maxt <= ht { + break + } + if h.maxTime.CAS(ht, maxt) { + break + } + } +} + +// SetMinValidTime sets the minimum timestamp the head can ingest. +func (h *Head) SetMinValidTime(minValidTime int64) { + h.minValidTime.Store(minValidTime) +} + // Truncate removes old data before mint from the head and WAL. func (h *Head) Truncate(mint int64) (err error) { initialize := h.MinTime() == math.MaxInt64 @@ -1186,17 +874,6 @@ func (h *Head) truncateWAL(mint int64) error { return nil } -// initTime initializes a head with the first timestamp. This only needs to be called -// for a completely fresh head with an empty WAL. -func (h *Head) initTime(t int64) { - if !h.minTime.CAS(math.MaxInt64, t) { - return - } - // Ensure that max time is initialized to at least the min time we just set. - // Concurrent appenders may already have set it to a higher value. - h.maxTime.CAS(math.MinInt64, t) -} - type Stats struct { NumSeries uint64 MinTime, MaxTime int64 @@ -1279,419 +956,6 @@ func (h *RangeHead) String() string { return fmt.Sprintf("range head (mint: %d, maxt: %d)", h.MinTime(), h.MaxTime()) } -// initAppender is a helper to initialize the time bounds of the head -// upon the first sample it receives. -type initAppender struct { - app storage.Appender - head *Head -} - -func (a *initAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) { - if a.app != nil { - return a.app.Append(ref, lset, t, v) - } - - a.head.initTime(t) - a.app = a.head.appender() - return a.app.Append(ref, lset, t, v) -} - -func (a *initAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) { - // Check if exemplar storage is enabled. - if !a.head.opts.EnableExemplarStorage || a.head.opts.MaxExemplars.Load() <= 0 { - return 0, nil - } - - if a.app != nil { - return a.app.AppendExemplar(ref, l, e) - } - // We should never reach here given we would call Append before AppendExemplar - // and we probably want to always base head/WAL min time on sample times. - a.head.initTime(e.Ts) - a.app = a.head.appender() - - return a.app.AppendExemplar(ref, l, e) -} - -var _ storage.GetRef = &initAppender{} - -func (a *initAppender) GetRef(lset labels.Labels) (uint64, labels.Labels) { - if g, ok := a.app.(storage.GetRef); ok { - return g.GetRef(lset) - } - return 0, nil -} - -func (a *initAppender) Commit() error { - if a.app == nil { - return nil - } - return a.app.Commit() -} - -func (a *initAppender) Rollback() error { - if a.app == nil { - return nil - } - return a.app.Rollback() -} - -// Appender returns a new Appender on the database. -func (h *Head) Appender(_ context.Context) storage.Appender { - h.metrics.activeAppenders.Inc() - - // The head cache might not have a starting point yet. 
The init appender - // picks up the first appended timestamp as the base. - if h.MinTime() == math.MaxInt64 { - return &initAppender{ - head: h, - } - } - return h.appender() -} - -func (h *Head) appender() *headAppender { - appendID, cleanupAppendIDsBelow := h.iso.newAppendID() - - // Allocate the exemplars buffer only if exemplars are enabled. - var exemplarsBuf []exemplarWithSeriesRef - if h.opts.EnableExemplarStorage { - exemplarsBuf = h.getExemplarBuffer() - } - - return &headAppender{ - head: h, - minValidTime: h.appendableMinValidTime(), - mint: math.MaxInt64, - maxt: math.MinInt64, - samples: h.getAppendBuffer(), - sampleSeries: h.getSeriesBuffer(), - exemplars: exemplarsBuf, - appendID: appendID, - cleanupAppendIDsBelow: cleanupAppendIDsBelow, - } -} - -func (h *Head) appendableMinValidTime() int64 { - // Setting the minimum valid time to whichever is greater, the head min valid time or the compaction window, - // ensures that no samples will be added within the compaction window to avoid races. - return max(h.minValidTime.Load(), h.MaxTime()-h.chunkRange.Load()/2) -} - -func max(a, b int64) int64 { - if a > b { - return a - } - return b -} - -func (h *Head) getAppendBuffer() []record.RefSample { - b := h.appendPool.Get() - if b == nil { - return make([]record.RefSample, 0, 512) - } - return b.([]record.RefSample) -} - -func (h *Head) putAppendBuffer(b []record.RefSample) { - //nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty. - h.appendPool.Put(b[:0]) -} - -func (h *Head) getExemplarBuffer() []exemplarWithSeriesRef { - b := h.exemplarsPool.Get() - if b == nil { - return make([]exemplarWithSeriesRef, 0, 512) - } - return b.([]exemplarWithSeriesRef) -} - -func (h *Head) putExemplarBuffer(b []exemplarWithSeriesRef) { - if b == nil { - return - } - - //nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty. - h.exemplarsPool.Put(b[:0]) -} - -func (h *Head) getSeriesBuffer() []*memSeries { - b := h.seriesPool.Get() - if b == nil { - return make([]*memSeries, 0, 512) - } - return b.([]*memSeries) -} - -func (h *Head) putSeriesBuffer(b []*memSeries) { - //nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty. - h.seriesPool.Put(b[:0]) -} - -func (h *Head) getBytesBuffer() []byte { - b := h.bytesPool.Get() - if b == nil { - return make([]byte, 0, 1024) - } - return b.([]byte) -} - -func (h *Head) putBytesBuffer(b []byte) { - //nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty. - h.bytesPool.Put(b[:0]) -} - -type exemplarWithSeriesRef struct { - ref uint64 - exemplar exemplar.Exemplar -} - -type headAppender struct { - head *Head - minValidTime int64 // No samples below this timestamp are allowed. - mint, maxt int64 - - series []record.RefSeries - samples []record.RefSample - exemplars []exemplarWithSeriesRef - sampleSeries []*memSeries - - appendID, cleanupAppendIDsBelow uint64 - closed bool -} - -func (a *headAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) { - if t < a.minValidTime { - a.head.metrics.outOfBoundSamples.Inc() - return 0, storage.ErrOutOfBounds - } - - s := a.head.series.getByID(ref) - if s == nil { - // Ensure no empty labels have gotten through. 
- lset = lset.WithoutEmpty() - if len(lset) == 0 { - return 0, errors.Wrap(ErrInvalidSample, "empty labelset") - } - - if l, dup := lset.HasDuplicateLabelNames(); dup { - return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l)) - } - - var created bool - var err error - s, created, err = a.head.getOrCreate(lset.Hash(), lset) - if err != nil { - return 0, err - } - if created { - a.series = append(a.series, record.RefSeries{ - Ref: s.ref, - Labels: lset, - }) - } - } - - s.Lock() - if err := s.appendable(t, v); err != nil { - s.Unlock() - if err == storage.ErrOutOfOrderSample { - a.head.metrics.outOfOrderSamples.Inc() - } - return 0, err - } - s.pendingCommit = true - s.Unlock() - - if t < a.mint { - a.mint = t - } - if t > a.maxt { - a.maxt = t - } - - a.samples = append(a.samples, record.RefSample{ - Ref: s.ref, - T: t, - V: v, - }) - a.sampleSeries = append(a.sampleSeries, s) - return s.ref, nil -} - -// AppendExemplar for headAppender assumes the series ref already exists, and so it doesn't -// use getOrCreate or make any of the lset sanity checks that Append does. -func (a *headAppender) AppendExemplar(ref uint64, _ labels.Labels, e exemplar.Exemplar) (uint64, error) { - // Check if exemplar storage is enabled. - if !a.head.opts.EnableExemplarStorage || a.head.opts.MaxExemplars.Load() <= 0 { - return 0, nil - } - s := a.head.series.getByID(ref) - if s == nil { - return 0, fmt.Errorf("unknown series ref. when trying to add exemplar: %d", ref) - } - - // Ensure no empty labels have gotten through. - e.Labels = e.Labels.WithoutEmpty() - - err := a.head.exemplars.ValidateExemplar(s.lset, e) - if err != nil { - if err == storage.ErrDuplicateExemplar || err == storage.ErrExemplarsDisabled { - // Duplicate, don't return an error but don't accept the exemplar. 
- return 0, nil - } - return 0, err - } - - a.exemplars = append(a.exemplars, exemplarWithSeriesRef{ref, e}) - - return s.ref, nil -} - -var _ storage.GetRef = &headAppender{} - -func (a *headAppender) GetRef(lset labels.Labels) (uint64, labels.Labels) { - s := a.head.series.getByHash(lset.Hash(), lset) - if s == nil { - return 0, nil - } - // returned labels must be suitable to pass to Append() - return s.ref, s.lset -} - -func (a *headAppender) log() error { - if a.head.wal == nil { - return nil - } - - buf := a.head.getBytesBuffer() - defer func() { a.head.putBytesBuffer(buf) }() - - var rec []byte - var enc record.Encoder - - if len(a.series) > 0 { - rec = enc.Series(a.series, buf) - buf = rec[:0] - - if err := a.head.wal.Log(rec); err != nil { - return errors.Wrap(err, "log series") - } - } - if len(a.samples) > 0 { - rec = enc.Samples(a.samples, buf) - buf = rec[:0] - - if err := a.head.wal.Log(rec); err != nil { - return errors.Wrap(err, "log samples") - } - } - if len(a.exemplars) > 0 { - rec = enc.Exemplars(exemplarsForEncoding(a.exemplars), buf) - buf = rec[:0] - - if err := a.head.wal.Log(rec); err != nil { - return errors.Wrap(err, "log exemplars") - } - } - return nil -} - -func exemplarsForEncoding(es []exemplarWithSeriesRef) []record.RefExemplar { - ret := make([]record.RefExemplar, 0, len(es)) - for _, e := range es { - ret = append(ret, record.RefExemplar{ - Ref: e.ref, - T: e.exemplar.Ts, - V: e.exemplar.Value, - Labels: e.exemplar.Labels, - }) - } - return ret -} - -func (a *headAppender) Commit() (err error) { - if a.closed { - return ErrAppenderClosed - } - defer func() { a.closed = true }() - - if err := a.log(); err != nil { - _ = a.Rollback() // Most likely the same error will happen again. - return errors.Wrap(err, "write to WAL") - } - - // No errors logging to WAL, so pass the exemplars along to the in memory storage. - for _, e := range a.exemplars { - s := a.head.series.getByID(e.ref) - // We don't instrument exemplar appends here, all is instrumented by storage. 
- if err := a.head.exemplars.AddExemplar(s.lset, e.exemplar); err != nil { - if err == storage.ErrOutOfOrderExemplar { - continue - } - level.Debug(a.head.logger).Log("msg", "Unknown error while adding exemplar", "err", err) - } - } - - defer a.head.metrics.activeAppenders.Dec() - defer a.head.putAppendBuffer(a.samples) - defer a.head.putSeriesBuffer(a.sampleSeries) - defer a.head.putExemplarBuffer(a.exemplars) - defer a.head.iso.closeAppend(a.appendID) - - total := len(a.samples) - var series *memSeries - for i, s := range a.samples { - series = a.sampleSeries[i] - series.Lock() - ok, chunkCreated := series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper) - series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) - series.pendingCommit = false - series.Unlock() - - if !ok { - total-- - a.head.metrics.outOfOrderSamples.Inc() - } - if chunkCreated { - a.head.metrics.chunks.Inc() - a.head.metrics.chunksCreated.Inc() - } - } - - a.head.metrics.samplesAppended.Add(float64(total)) - a.head.updateMinMaxTime(a.mint, a.maxt) - - return nil -} - -func (a *headAppender) Rollback() (err error) { - if a.closed { - return ErrAppenderClosed - } - defer func() { a.closed = true }() - defer a.head.metrics.activeAppenders.Dec() - defer a.head.iso.closeAppend(a.appendID) - defer a.head.putSeriesBuffer(a.sampleSeries) - - var series *memSeries - for i := range a.samples { - series = a.sampleSeries[i] - series.Lock() - series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) - series.pendingCommit = false - series.Unlock() - } - a.head.putAppendBuffer(a.samples) - a.head.putExemplarBuffer(a.exemplars) - a.samples = nil - a.exemplars = nil - - // Series are created in the head memory regardless of rollback. Thus we have - // to log them to the WAL in any case. - return a.log() -} - // Delete all samples in the range of [mint, maxt] for series that satisfy the given // label matchers. func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error { @@ -1793,40 +1057,6 @@ func (h *Head) Tombstones() (tombstones.Reader, error) { return h.tombstones, nil } -// Index returns an IndexReader against the block. -func (h *Head) Index() (IndexReader, error) { - return h.indexRange(math.MinInt64, math.MaxInt64), nil -} - -func (h *Head) indexRange(mint, maxt int64) *headIndexReader { - if hmin := h.MinTime(); hmin > mint { - mint = hmin - } - return &headIndexReader{head: h, mint: mint, maxt: maxt} -} - -// Chunks returns a ChunkReader against the block. -func (h *Head) Chunks() (ChunkReader, error) { - return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State(math.MinInt64, math.MaxInt64)) -} - -func (h *Head) chunksRange(mint, maxt int64, is *isolationState) (*headChunkReader, error) { - h.closedMtx.Lock() - defer h.closedMtx.Unlock() - if h.closed { - return nil, errors.New("can't read from a closed head") - } - if hmin := h.MinTime(); hmin > mint { - mint = hmin - } - return &headChunkReader{ - head: h, - mint: mint, - maxt: maxt, - isoState: is, - }, nil -} - // NumSeries returns the number of active series in the head. func (h *Head) NumSeries() uint64 { return h.numSeries.Load() @@ -1883,269 +1113,6 @@ func (h *Head) String() string { return "head" } -type headChunkReader struct { - head *Head - mint, maxt int64 - isoState *isolationState -} - -func (h *headChunkReader) Close() error { - h.isoState.Close() - return nil -} - -// packChunkID packs a seriesID and a chunkID within it into a global 8 byte ID. -// It panicks if the seriesID exceeds 5 bytes or the chunk ID 3 bytes. 
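-// The series ID occupies the upper 40 bits of the packed value and the chunk ID the
-// lower 24 bits, so unpackChunkID can split them again with plain shifts.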
-func packChunkID(seriesID, chunkID uint64) uint64 { - if seriesID > (1<<40)-1 { - panic("series ID exceeds 5 bytes") - } - if chunkID > (1<<24)-1 { - panic("chunk ID exceeds 3 bytes") - } - return (seriesID << 24) | chunkID -} - -func unpackChunkID(id uint64) (seriesID, chunkID uint64) { - return id >> 24, (id << 40) >> 40 -} - -// Chunk returns the chunk for the reference number. -func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { - sid, cid := unpackChunkID(ref) - - s := h.head.series.getByID(sid) - // This means that the series has been garbage collected. - if s == nil { - return nil, storage.ErrNotFound - } - - s.Lock() - c, garbageCollect, err := s.chunk(int(cid), h.head.chunkDiskMapper) - if err != nil { - s.Unlock() - return nil, err - } - defer func() { - if garbageCollect { - // Set this to nil so that Go GC can collect it after it has been used. - c.chunk = nil - s.memChunkPool.Put(c) - } - }() - - // This means that the chunk is outside the specified range. - if !c.OverlapsClosedInterval(h.mint, h.maxt) { - s.Unlock() - return nil, storage.ErrNotFound - } - s.Unlock() - - return &safeChunk{ - Chunk: c.chunk, - s: s, - cid: int(cid), - isoState: h.isoState, - chunkDiskMapper: h.head.chunkDiskMapper, - }, nil -} - -type safeChunk struct { - chunkenc.Chunk - s *memSeries - cid int - isoState *isolationState - chunkDiskMapper *chunks.ChunkDiskMapper -} - -func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator { - c.s.Lock() - it := c.s.iterator(c.cid, c.isoState, c.chunkDiskMapper, reuseIter) - c.s.Unlock() - return it -} - -type headIndexReader struct { - head *Head - mint, maxt int64 -} - -func (h *headIndexReader) Close() error { - return nil -} - -func (h *headIndexReader) Symbols() index.StringIter { - h.head.symMtx.RLock() - res := make([]string, 0, len(h.head.symbols)) - - for s := range h.head.symbols { - res = append(res, s) - } - h.head.symMtx.RUnlock() - - sort.Strings(res) - return index.NewStringListIter(res) -} - -// SortedLabelValues returns label values present in the head for the -// specific label name that are within the time range mint to maxt. -// If matchers are specified the returned result set is reduced -// to label values of metrics matching the matchers. -func (h *headIndexReader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { - values, err := h.LabelValues(name, matchers...) - if err == nil { - sort.Strings(values) - } - return values, err -} - -// LabelValues returns label values present in the head for the -// specific label name that are within the time range mint to maxt. -// If matchers are specified the returned result set is reduced -// to label values of metrics matching the matchers. -func (h *headIndexReader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { - if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() { - return []string{}, nil - } - - if len(matchers) == 0 { - h.head.symMtx.RLock() - defer h.head.symMtx.RUnlock() - return h.head.postings.LabelValues(name), nil - } - - return labelValuesWithMatchers(h, name, matchers...) -} - -// LabelNames returns all the unique label names present in the head -// that are within the time range mint to maxt. 
-func (h *headIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, error) { - if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() { - return []string{}, nil - } - - if len(matchers) == 0 { - h.head.symMtx.RLock() - labelNames := h.head.postings.LabelNames() - h.head.symMtx.RUnlock() - - sort.Strings(labelNames) - return labelNames, nil - } - - return labelNamesWithMatchers(h, matchers...) -} - -// Postings returns the postings list iterator for the label pairs. -func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) { - res := make([]index.Postings, 0, len(values)) - for _, value := range values { - res = append(res, h.head.postings.Get(name, value)) - } - return index.Merge(res...), nil -} - -func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings { - series := make([]*memSeries, 0, 128) - - // Fetch all the series only once. - for p.Next() { - s := h.head.series.getByID(p.At()) - if s == nil { - level.Debug(h.head.logger).Log("msg", "Looked up series not found") - } else { - series = append(series, s) - } - } - if err := p.Err(); err != nil { - return index.ErrPostings(errors.Wrap(err, "expand postings")) - } - - sort.Slice(series, func(i, j int) bool { - return labels.Compare(series[i].lset, series[j].lset) < 0 - }) - - // Convert back to list. - ep := make([]uint64, 0, len(series)) - for _, p := range series { - ep = append(ep, p.ref) - } - return index.NewListPostings(ep) -} - -// Series returns the series for the given reference. -func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks.Meta) error { - s := h.head.series.getByID(ref) - - if s == nil { - h.head.metrics.seriesNotFound.Inc() - return storage.ErrNotFound - } - *lbls = append((*lbls)[:0], s.lset...) - - s.Lock() - defer s.Unlock() - - *chks = (*chks)[:0] - - for i, c := range s.mmappedChunks { - // Do not expose chunks that are outside of the specified range. - if !c.OverlapsClosedInterval(h.mint, h.maxt) { - continue - } - *chks = append(*chks, chunks.Meta{ - MinTime: c.minTime, - MaxTime: c.maxTime, - Ref: packChunkID(s.ref, uint64(s.chunkID(i))), - }) - } - if s.headChunk != nil && s.headChunk.OverlapsClosedInterval(h.mint, h.maxt) { - *chks = append(*chks, chunks.Meta{ - MinTime: s.headChunk.minTime, - MaxTime: math.MaxInt64, // Set the head chunks as open (being appended to). - Ref: packChunkID(s.ref, uint64(s.chunkID(len(s.mmappedChunks)))), - }) - } - - return nil -} - -// LabelValueFor returns label value for the given label name in the series referred to by ID. -func (h *headIndexReader) LabelValueFor(id uint64, label string) (string, error) { - memSeries := h.head.series.getByID(id) - if memSeries == nil { - return "", storage.ErrNotFound - } - - value := memSeries.lset.Get(label) - if value == "" { - return "", storage.ErrNotFound - } - - return value, nil -} - -// LabelNamesFor returns all the label names for the series referred to by IDs. -// The names returned are sorted. 
-func (h *headIndexReader) LabelNamesFor(ids ...uint64) ([]string, error) { - namesMap := make(map[string]struct{}) - for _, id := range ids { - memSeries := h.head.series.getByID(id) - if memSeries == nil { - return nil, storage.ErrNotFound - } - for _, lbl := range memSeries.lset { - namesMap[lbl.Name] = struct{}{} - } - } - names := make([]string, 0, len(namesMap)) - for name := range namesMap { - names = append(names, name) - } - sort.Strings(names) - return names, nil -} - func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, error) { // Just using `getOrCreateWithID` below would be semantically sufficient, but we'd create // a new series on every sample inserted via Add(), which causes allocations @@ -2457,104 +1424,6 @@ func (s *memSeries) maxTime() int64 { return c.maxTime } -func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk { - s.mmapCurrentHeadChunk(chunkDiskMapper) - - s.headChunk = &memChunk{ - chunk: chunkenc.NewXORChunk(), - minTime: mint, - maxTime: math.MinInt64, - } - - // Set upper bound on when the next chunk must be started. An earlier timestamp - // may be chosen dynamically at a later point. - s.nextAt = rangeForTimestamp(mint, s.chunkRange) - - app, err := s.headChunk.chunk.Appender() - if err != nil { - panic(err) - } - s.app = app - return s.headChunk -} - -func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) { - if s.headChunk == nil { - // There is no head chunk, so nothing to m-map here. - return - } - - chunkRef, err := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk) - if err != nil { - if err != chunks.ErrChunkDiskMapperClosed { - panic(err) - } - } - s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{ - ref: chunkRef, - numSamples: uint16(s.headChunk.chunk.NumSamples()), - minTime: s.headChunk.minTime, - maxTime: s.headChunk.maxTime, - }) -} - -// appendable checks whether the given sample is valid for appending to the series. -func (s *memSeries) appendable(t int64, v float64) error { - c := s.head() - if c == nil { - return nil - } - - if t > c.maxTime { - return nil - } - if t < c.maxTime { - return storage.ErrOutOfOrderSample - } - // We are allowing exact duplicates as we can encounter them in valid cases - // like federation and erroring out at that time would be extremely noisy. - if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) { - return storage.ErrDuplicateSampleForTimestamp - } - return nil -} - -// chunk returns the chunk for the chunk id from memory or by m-mapping it from the disk. -// If garbageCollect is true, it means that the returned *memChunk -// (and not the chunkenc.Chunk inside it) can be garbage collected after it's usage. -func (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) { - // ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are - // incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index. - // The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix - // is len(s.mmappedChunks), it represents the next chunk, which is the head chunk. 
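-	// For example, with firstChunkID=5 and 3 m-mapped chunks, ids 5, 6 and 7 map to
-	// the m-mapped chunks and id 8 refers to the current head chunk.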
- ix := id - s.firstChunkID - if ix < 0 || ix > len(s.mmappedChunks) { - return nil, false, storage.ErrNotFound - } - if ix == len(s.mmappedChunks) { - if s.headChunk == nil { - return nil, false, errors.New("invalid head chunk") - } - return s.headChunk, false, nil - } - chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref) - if err != nil { - if _, ok := err.(*chunks.CorruptionErr); ok { - panic(err) - } - return nil, false, err - } - mc := s.memChunkPool.Get().(*memChunk) - mc.chunk = chk - mc.minTime = s.mmappedChunks[ix].minTime - mc.maxTime = s.mmappedChunks[ix].maxTime - return mc, true, nil -} - -func (s *memSeries) chunkID(pos int) int { - return pos + s.firstChunkID -} - // truncateChunksBefore removes all chunks from the series that // have no timestamp at or after mint. // Chunk IDs remain unchanged. @@ -2580,183 +1449,12 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { return removed } -// append adds the sample (t, v) to the series. The caller also has to provide -// the appendID for isolation. (The appendID can be zero, which results in no -// isolation for this append.) -// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. -func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) { - // Based on Gorilla white papers this offers near-optimal compression ratio - // so anything bigger that this has diminishing returns and increases - // the time range within which we have to decompress all samples. - const samplesPerChunk = 120 - - c := s.head() - - if c == nil { - if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t { - // Out of order sample. Sample timestamp is already in the mmaped chunks, so ignore it. - return false, false - } - // There is no chunk in this series yet, create the first chunk for the sample. - c = s.cutNewHeadChunk(t, chunkDiskMapper) - chunkCreated = true - } - numSamples := c.chunk.NumSamples() - - // Out of order sample. - if c.maxTime >= t { - return false, chunkCreated - } - // If we reach 25% of a chunk's desired sample count, predict an end time - // for this chunk that will try to make samples equally distributed within - // the remaining chunks in the current chunk range. - // At latest it must happen at the timestamp set when the chunk was cut. - if numSamples == samplesPerChunk/4 { - s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt) - } - if t >= s.nextAt { - c = s.cutNewHeadChunk(t, chunkDiskMapper) - chunkCreated = true - } - s.app.Append(t, v) - - c.maxTime = t - - s.sampleBuf[0] = s.sampleBuf[1] - s.sampleBuf[1] = s.sampleBuf[2] - s.sampleBuf[2] = s.sampleBuf[3] - s.sampleBuf[3] = sample{t: t, v: v} - - if appendID > 0 { - s.txs.add(appendID) - } - - return true, chunkCreated -} - // cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after // acquiring lock. func (s *memSeries) cleanupAppendIDsBelow(bound uint64) { s.txs.cleanupAppendIDsBelow(bound) } -// computeChunkEndTime estimates the end timestamp based the beginning of a -// chunk, its current timestamp and the upper bound up to which we insert data. -// It assumes that the time range is 1/4 full. -// Assuming that the samples will keep arriving at the same rate, it will make the -// remaining n chunks within this chunk range (before max) equally sized. 
-func computeChunkEndTime(start, cur, max int64) int64 { - n := (max - start) / ((cur - start + 1) * 4) - if n <= 1 { - return max - } - return start + (max-start)/n -} - -// iterator returns a chunk iterator. -// It is unsafe to call this concurrently with s.append(...) without holding the series lock. -func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator { - c, garbageCollect, err := s.chunk(id, chunkDiskMapper) - // TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a - // series's chunk, which got then garbage collected before it got - // accessed. We must ensure to not garbage collect as long as any - // readers still hold a reference. - if err != nil { - return chunkenc.NewNopIterator() - } - defer func() { - if garbageCollect { - // Set this to nil so that Go GC can collect it after it has been used. - // This should be done always at the end. - c.chunk = nil - s.memChunkPool.Put(c) - } - }() - - ix := id - s.firstChunkID - - numSamples := c.chunk.NumSamples() - stopAfter := numSamples - - if isoState != nil { - totalSamples := 0 // Total samples in this series. - previousSamples := 0 // Samples before this chunk. - - for j, d := range s.mmappedChunks { - totalSamples += int(d.numSamples) - if j < ix { - previousSamples += int(d.numSamples) - } - } - - if s.headChunk != nil { - totalSamples += s.headChunk.chunk.NumSamples() - } - - // Removing the extra transactionIDs that are relevant for samples that - // come after this chunk, from the total transactionIDs. - appendIDsToConsider := s.txs.txIDCount - (totalSamples - (previousSamples + numSamples)) - - // Iterate over the appendIDs, find the first one that the isolation state says not - // to return. - it := s.txs.iterator() - for index := 0; index < appendIDsToConsider; index++ { - appendID := it.At() - if appendID <= isoState.maxAppendID { // Easy check first. - if _, ok := isoState.incompleteAppends[appendID]; !ok { - it.Next() - continue - } - } - stopAfter = numSamples - (appendIDsToConsider - index) - if stopAfter < 0 { - stopAfter = 0 // Stopped in a previous chunk. - } - break - } - } - - if stopAfter == 0 { - return chunkenc.NewNopIterator() - } - - if id-s.firstChunkID < len(s.mmappedChunks) { - if stopAfter == numSamples { - return c.chunk.Iterator(it) - } - if msIter, ok := it.(*stopIterator); ok { - msIter.Iterator = c.chunk.Iterator(msIter.Iterator) - msIter.i = -1 - msIter.stopAfter = stopAfter - return msIter - } - return &stopIterator{ - Iterator: c.chunk.Iterator(it), - i: -1, - stopAfter: stopAfter, - } - } - // Serve the last 4 samples for the last chunk from the sample buffer - // as their compressed bytes may be mutated by added samples. 
- if msIter, ok := it.(*memSafeIterator); ok { - msIter.Iterator = c.chunk.Iterator(msIter.Iterator) - msIter.i = -1 - msIter.total = numSamples - msIter.stopAfter = stopAfter - msIter.buf = s.sampleBuf - return msIter - } - return &memSafeIterator{ - stopIterator: stopIterator{ - Iterator: c.chunk.Iterator(it), - i: -1, - stopAfter: stopAfter, - }, - total: numSamples, - buf: s.sampleBuf, - } -} - func (s *memSeries) head() *memChunk { return s.headChunk } @@ -2771,63 +1469,6 @@ func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool { return mc.minTime <= maxt && mint <= mc.maxTime } -type stopIterator struct { - chunkenc.Iterator - - i, stopAfter int -} - -func (it *stopIterator) Next() bool { - if it.i+1 >= it.stopAfter { - return false - } - it.i++ - return it.Iterator.Next() -} - -type memSafeIterator struct { - stopIterator - - total int - buf [4]sample -} - -func (it *memSafeIterator) Seek(t int64) bool { - if it.Err() != nil { - return false - } - - ts, _ := it.At() - - for t > ts || it.i == -1 { - if !it.Next() { - return false - } - ts, _ = it.At() - } - - return true -} - -func (it *memSafeIterator) Next() bool { - if it.i+1 >= it.stopAfter { - return false - } - it.i++ - if it.total-it.i > 4 { - return it.Iterator.Next() - } - return true -} - -func (it *memSafeIterator) At() (int64, float64) { - if it.total-it.i > 4 { - return it.Iterator.At() - } - s := it.buf[4-(it.total-it.i)] - return s.t, s.v -} - type mmappedChunk struct { ref uint64 numSamples uint16 @@ -2839,21 +1480,6 @@ func (mc *mmappedChunk) OverlapsClosedInterval(mint, maxt int64) bool { return mc.minTime <= maxt && mint <= mc.maxTime } -// SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series. -// It is always a no-op in Prometheus and mainly meant for external users who import TSDB. -// All the callbacks should be safe to be called concurrently. -// It is up to the user to implement soft or hard consistency by making the callbacks -// atomic or non-atomic. Atomic callbacks can cause degradation performance. -type SeriesLifecycleCallback interface { - // PreCreation is called before creating a series to indicate if the series can be created. - // A non nil error means the series should not be created. - PreCreation(labels.Labels) error - // PostCreation is called after creating a series to indicate a creation of series. - PostCreation(labels.Labels) - // PostDeletion is called after deletion of series. - PostDeletion(...labels.Labels) -} - type noopSeriesLifecycleCallback struct{} func (noopSeriesLifecycleCallback) PreCreation(labels.Labels) error { return nil } diff --git a/tsdb/head_append.go b/tsdb/head_append.go new file mode 100644 index 000000000..85a3ff14a --- /dev/null +++ b/tsdb/head_append.go @@ -0,0 +1,583 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
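+
+// This file holds the Head appender implementation and the sample and exemplar
+// ingestion path that were split out of head.go.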
+ +package tsdb + +import ( + "context" + "fmt" + "math" + + "github.com/go-kit/log/level" + "github.com/pkg/errors" + + "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/record" +) + +// initAppender is a helper to initialize the time bounds of the head +// upon the first sample it receives. +type initAppender struct { + app storage.Appender + head *Head +} + +var _ storage.GetRef = &initAppender{} + +func (a *initAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) { + if a.app != nil { + return a.app.Append(ref, lset, t, v) + } + + a.head.initTime(t) + a.app = a.head.appender() + return a.app.Append(ref, lset, t, v) +} + +func (a *initAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) { + // Check if exemplar storage is enabled. + if !a.head.opts.EnableExemplarStorage || a.head.opts.MaxExemplars.Load() <= 0 { + return 0, nil + } + + if a.app != nil { + return a.app.AppendExemplar(ref, l, e) + } + // We should never reach here given we would call Append before AppendExemplar + // and we probably want to always base head/WAL min time on sample times. + a.head.initTime(e.Ts) + a.app = a.head.appender() + + return a.app.AppendExemplar(ref, l, e) +} + +// initTime initializes a head with the first timestamp. This only needs to be called +// for a completely fresh head with an empty WAL. +func (h *Head) initTime(t int64) { + if !h.minTime.CAS(math.MaxInt64, t) { + return + } + // Ensure that max time is initialized to at least the min time we just set. + // Concurrent appenders may already have set it to a higher value. + h.maxTime.CAS(math.MinInt64, t) +} + +func (a *initAppender) GetRef(lset labels.Labels) (uint64, labels.Labels) { + if g, ok := a.app.(storage.GetRef); ok { + return g.GetRef(lset) + } + return 0, nil +} + +func (a *initAppender) Commit() error { + if a.app == nil { + return nil + } + return a.app.Commit() +} + +func (a *initAppender) Rollback() error { + if a.app == nil { + return nil + } + return a.app.Rollback() +} + +// Appender returns a new Appender on the database. +func (h *Head) Appender(_ context.Context) storage.Appender { + h.metrics.activeAppenders.Inc() + + // The head cache might not have a starting point yet. The init appender + // picks up the first appended timestamp as the base. + if h.MinTime() == math.MaxInt64 { + return &initAppender{ + head: h, + } + } + return h.appender() +} + +func (h *Head) appender() *headAppender { + appendID, cleanupAppendIDsBelow := h.iso.newAppendID() + + // Allocate the exemplars buffer only if exemplars are enabled. + var exemplarsBuf []exemplarWithSeriesRef + if h.opts.EnableExemplarStorage { + exemplarsBuf = h.getExemplarBuffer() + } + + return &headAppender{ + head: h, + minValidTime: h.appendableMinValidTime(), + mint: math.MaxInt64, + maxt: math.MinInt64, + samples: h.getAppendBuffer(), + sampleSeries: h.getSeriesBuffer(), + exemplars: exemplarsBuf, + appendID: appendID, + cleanupAppendIDsBelow: cleanupAppendIDsBelow, + } +} + +func (h *Head) appendableMinValidTime() int64 { + // Setting the minimum valid time to whichever is greater, the head min valid time or the compaction window, + // ensures that no samples will be added within the compaction window to avoid races. 
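+	// For example, with a 2h chunk range an appender created now will reject samples
+	// older than h.MaxTime()-1h with ErrOutOfBounds, even if h.minValidTime is older.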
+ return max(h.minValidTime.Load(), h.MaxTime()-h.chunkRange.Load()/2) +} + +func max(a, b int64) int64 { + if a > b { + return a + } + return b +} + +func (h *Head) getAppendBuffer() []record.RefSample { + b := h.appendPool.Get() + if b == nil { + return make([]record.RefSample, 0, 512) + } + return b.([]record.RefSample) +} + +func (h *Head) putAppendBuffer(b []record.RefSample) { + //nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty. + h.appendPool.Put(b[:0]) +} + +func (h *Head) getExemplarBuffer() []exemplarWithSeriesRef { + b := h.exemplarsPool.Get() + if b == nil { + return make([]exemplarWithSeriesRef, 0, 512) + } + return b.([]exemplarWithSeriesRef) +} + +func (h *Head) putExemplarBuffer(b []exemplarWithSeriesRef) { + if b == nil { + return + } + + //nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty. + h.exemplarsPool.Put(b[:0]) +} + +func (h *Head) getSeriesBuffer() []*memSeries { + b := h.seriesPool.Get() + if b == nil { + return make([]*memSeries, 0, 512) + } + return b.([]*memSeries) +} + +func (h *Head) putSeriesBuffer(b []*memSeries) { + //nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty. + h.seriesPool.Put(b[:0]) +} + +func (h *Head) getBytesBuffer() []byte { + b := h.bytesPool.Get() + if b == nil { + return make([]byte, 0, 1024) + } + return b.([]byte) +} + +func (h *Head) putBytesBuffer(b []byte) { + //nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty. + h.bytesPool.Put(b[:0]) +} + +type exemplarWithSeriesRef struct { + ref uint64 + exemplar exemplar.Exemplar +} + +type headAppender struct { + head *Head + minValidTime int64 // No samples below this timestamp are allowed. + mint, maxt int64 + + series []record.RefSeries + samples []record.RefSample + exemplars []exemplarWithSeriesRef + sampleSeries []*memSeries + + appendID, cleanupAppendIDsBelow uint64 + closed bool +} + +func (a *headAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) { + if t < a.minValidTime { + a.head.metrics.outOfBoundSamples.Inc() + return 0, storage.ErrOutOfBounds + } + + s := a.head.series.getByID(ref) + if s == nil { + // Ensure no empty labels have gotten through. + lset = lset.WithoutEmpty() + if len(lset) == 0 { + return 0, errors.Wrap(ErrInvalidSample, "empty labelset") + } + + if l, dup := lset.HasDuplicateLabelNames(); dup { + return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l)) + } + + var created bool + var err error + s, created, err = a.head.getOrCreate(lset.Hash(), lset) + if err != nil { + return 0, err + } + if created { + a.series = append(a.series, record.RefSeries{ + Ref: s.ref, + Labels: lset, + }) + } + } + + s.Lock() + if err := s.appendable(t, v); err != nil { + s.Unlock() + if err == storage.ErrOutOfOrderSample { + a.head.metrics.outOfOrderSamples.Inc() + } + return 0, err + } + s.pendingCommit = true + s.Unlock() + + if t < a.mint { + a.mint = t + } + if t > a.maxt { + a.maxt = t + } + + a.samples = append(a.samples, record.RefSample{ + Ref: s.ref, + T: t, + V: v, + }) + a.sampleSeries = append(a.sampleSeries, s) + return s.ref, nil +} + +// appendable checks whether the given sample is valid for appending to the series. 
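+// It returns storage.ErrOutOfOrderSample for timestamps older than the current head
+// chunk's max time and storage.ErrDuplicateSampleForTimestamp when the timestamp
+// equals the latest one but the value differs.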
+func (s *memSeries) appendable(t int64, v float64) error { + c := s.head() + if c == nil { + return nil + } + + if t > c.maxTime { + return nil + } + if t < c.maxTime { + return storage.ErrOutOfOrderSample + } + // We are allowing exact duplicates as we can encounter them in valid cases + // like federation and erroring out at that time would be extremely noisy. + if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) { + return storage.ErrDuplicateSampleForTimestamp + } + return nil +} + +// AppendExemplar for headAppender assumes the series ref already exists, and so it doesn't +// use getOrCreate or make any of the lset sanity checks that Append does. +func (a *headAppender) AppendExemplar(ref uint64, _ labels.Labels, e exemplar.Exemplar) (uint64, error) { + // Check if exemplar storage is enabled. + if !a.head.opts.EnableExemplarStorage || a.head.opts.MaxExemplars.Load() <= 0 { + return 0, nil + } + s := a.head.series.getByID(ref) + if s == nil { + return 0, fmt.Errorf("unknown series ref. when trying to add exemplar: %d", ref) + } + + // Ensure no empty labels have gotten through. + e.Labels = e.Labels.WithoutEmpty() + + err := a.head.exemplars.ValidateExemplar(s.lset, e) + if err != nil { + if err == storage.ErrDuplicateExemplar || err == storage.ErrExemplarsDisabled { + // Duplicate, don't return an error but don't accept the exemplar. + return 0, nil + } + return 0, err + } + + a.exemplars = append(a.exemplars, exemplarWithSeriesRef{ref, e}) + + return s.ref, nil +} + +var _ storage.GetRef = &headAppender{} + +func (a *headAppender) GetRef(lset labels.Labels) (uint64, labels.Labels) { + s := a.head.series.getByHash(lset.Hash(), lset) + if s == nil { + return 0, nil + } + // returned labels must be suitable to pass to Append() + return s.ref, s.lset +} + +func (a *headAppender) log() error { + if a.head.wal == nil { + return nil + } + + buf := a.head.getBytesBuffer() + defer func() { a.head.putBytesBuffer(buf) }() + + var rec []byte + var enc record.Encoder + + if len(a.series) > 0 { + rec = enc.Series(a.series, buf) + buf = rec[:0] + + if err := a.head.wal.Log(rec); err != nil { + return errors.Wrap(err, "log series") + } + } + if len(a.samples) > 0 { + rec = enc.Samples(a.samples, buf) + buf = rec[:0] + + if err := a.head.wal.Log(rec); err != nil { + return errors.Wrap(err, "log samples") + } + } + if len(a.exemplars) > 0 { + rec = enc.Exemplars(exemplarsForEncoding(a.exemplars), buf) + buf = rec[:0] + + if err := a.head.wal.Log(rec); err != nil { + return errors.Wrap(err, "log exemplars") + } + } + return nil +} + +func exemplarsForEncoding(es []exemplarWithSeriesRef) []record.RefExemplar { + ret := make([]record.RefExemplar, 0, len(es)) + for _, e := range es { + ret = append(ret, record.RefExemplar{ + Ref: e.ref, + T: e.exemplar.Ts, + V: e.exemplar.Value, + Labels: e.exemplar.Labels, + }) + } + return ret +} + +func (a *headAppender) Commit() (err error) { + if a.closed { + return ErrAppenderClosed + } + defer func() { a.closed = true }() + + if err := a.log(); err != nil { + _ = a.Rollback() // Most likely the same error will happen again. + return errors.Wrap(err, "write to WAL") + } + + // No errors logging to WAL, so pass the exemplars along to the in memory storage. + for _, e := range a.exemplars { + s := a.head.series.getByID(e.ref) + // We don't instrument exemplar appends here, all is instrumented by storage. 
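+	// Out-of-order exemplars are dropped silently below; any other error is only logged at debug level.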
+ if err := a.head.exemplars.AddExemplar(s.lset, e.exemplar); err != nil { + if err == storage.ErrOutOfOrderExemplar { + continue + } + level.Debug(a.head.logger).Log("msg", "Unknown error while adding exemplar", "err", err) + } + } + + defer a.head.metrics.activeAppenders.Dec() + defer a.head.putAppendBuffer(a.samples) + defer a.head.putSeriesBuffer(a.sampleSeries) + defer a.head.putExemplarBuffer(a.exemplars) + defer a.head.iso.closeAppend(a.appendID) + + total := len(a.samples) + var series *memSeries + for i, s := range a.samples { + series = a.sampleSeries[i] + series.Lock() + ok, chunkCreated := series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper) + series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) + series.pendingCommit = false + series.Unlock() + + if !ok { + total-- + a.head.metrics.outOfOrderSamples.Inc() + } + if chunkCreated { + a.head.metrics.chunks.Inc() + a.head.metrics.chunksCreated.Inc() + } + } + + a.head.metrics.samplesAppended.Add(float64(total)) + a.head.updateMinMaxTime(a.mint, a.maxt) + + return nil +} + +// append adds the sample (t, v) to the series. The caller also has to provide +// the appendID for isolation. (The appendID can be zero, which results in no +// isolation for this append.) +// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. +func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) { + // Based on Gorilla white papers this offers near-optimal compression ratio + // so anything bigger that this has diminishing returns and increases + // the time range within which we have to decompress all samples. + const samplesPerChunk = 120 + + c := s.head() + + if c == nil { + if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t { + // Out of order sample. Sample timestamp is already in the mmaped chunks, so ignore it. + return false, false + } + // There is no chunk in this series yet, create the first chunk for the sample. + c = s.cutNewHeadChunk(t, chunkDiskMapper) + chunkCreated = true + } + numSamples := c.chunk.NumSamples() + + // Out of order sample. + if c.maxTime >= t { + return false, chunkCreated + } + // If we reach 25% of a chunk's desired sample count, predict an end time + // for this chunk that will try to make samples equally distributed within + // the remaining chunks in the current chunk range. + // At latest it must happen at the timestamp set when the chunk was cut. + if numSamples == samplesPerChunk/4 { + s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt) + } + if t >= s.nextAt { + c = s.cutNewHeadChunk(t, chunkDiskMapper) + chunkCreated = true + } + s.app.Append(t, v) + + c.maxTime = t + + s.sampleBuf[0] = s.sampleBuf[1] + s.sampleBuf[1] = s.sampleBuf[2] + s.sampleBuf[2] = s.sampleBuf[3] + s.sampleBuf[3] = sample{t: t, v: v} + + if appendID > 0 { + s.txs.add(appendID) + } + + return true, chunkCreated +} + +// computeChunkEndTime estimates the end timestamp based the beginning of a +// chunk, its current timestamp and the upper bound up to which we insert data. +// It assumes that the time range is 1/4 full. +// Assuming that the samples will keep arriving at the same rate, it will make the +// remaining n chunks within this chunk range (before max) equally sized. 
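+// For example, with start=0, cur=100 and max=1000, n = 1000/((100-0+1)*4) = 2, so the chunk
+// is predicted to end at 0 + (1000-0)/2 = 500.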
+func computeChunkEndTime(start, cur, max int64) int64 { + n := (max - start) / ((cur - start + 1) * 4) + if n <= 1 { + return max + } + return start + (max-start)/n +} + +func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk { + s.mmapCurrentHeadChunk(chunkDiskMapper) + + s.headChunk = &memChunk{ + chunk: chunkenc.NewXORChunk(), + minTime: mint, + maxTime: math.MinInt64, + } + + // Set upper bound on when the next chunk must be started. An earlier timestamp + // may be chosen dynamically at a later point. + s.nextAt = rangeForTimestamp(mint, s.chunkRange) + + app, err := s.headChunk.chunk.Appender() + if err != nil { + panic(err) + } + s.app = app + return s.headChunk +} + +func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) { + if s.headChunk == nil { + // There is no head chunk, so nothing to m-map here. + return + } + + chunkRef, err := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk) + if err != nil { + if err != chunks.ErrChunkDiskMapperClosed { + panic(err) + } + } + s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{ + ref: chunkRef, + numSamples: uint16(s.headChunk.chunk.NumSamples()), + minTime: s.headChunk.minTime, + maxTime: s.headChunk.maxTime, + }) +} + +func (a *headAppender) Rollback() (err error) { + if a.closed { + return ErrAppenderClosed + } + defer func() { a.closed = true }() + defer a.head.metrics.activeAppenders.Dec() + defer a.head.iso.closeAppend(a.appendID) + defer a.head.putSeriesBuffer(a.sampleSeries) + + var series *memSeries + for i := range a.samples { + series = a.sampleSeries[i] + series.Lock() + series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) + series.pendingCommit = false + series.Unlock() + } + a.head.putAppendBuffer(a.samples) + a.head.putExemplarBuffer(a.exemplars) + a.samples = nil + a.exemplars = nil + + // Series are created in the head memory regardless of rollback. Thus we have + // to log them to the WAL in any case. + return a.log() +} diff --git a/tsdb/head_read.go b/tsdb/head_read.go new file mode 100644 index 000000000..9e43265ba --- /dev/null +++ b/tsdb/head_read.go @@ -0,0 +1,527 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb + +import ( + "context" + "math" + "sort" + + "github.com/go-kit/log/level" + "github.com/pkg/errors" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/index" +) + +func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { + return h.exemplars.ExemplarQuerier(ctx) +} + +// Index returns an IndexReader against the block. 
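+// The returned reader covers the head's full time range.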
+func (h *Head) Index() (IndexReader, error) { + return h.indexRange(math.MinInt64, math.MaxInt64), nil +} + +func (h *Head) indexRange(mint, maxt int64) *headIndexReader { + if hmin := h.MinTime(); hmin > mint { + mint = hmin + } + return &headIndexReader{head: h, mint: mint, maxt: maxt} +} + +type headIndexReader struct { + head *Head + mint, maxt int64 +} + +func (h *headIndexReader) Close() error { + return nil +} + +func (h *headIndexReader) Symbols() index.StringIter { + h.head.symMtx.RLock() + res := make([]string, 0, len(h.head.symbols)) + + for s := range h.head.symbols { + res = append(res, s) + } + h.head.symMtx.RUnlock() + + sort.Strings(res) + return index.NewStringListIter(res) +} + +// SortedLabelValues returns label values present in the head for the +// specific label name that are within the time range mint to maxt. +// If matchers are specified the returned result set is reduced +// to label values of metrics matching the matchers. +func (h *headIndexReader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { + values, err := h.LabelValues(name, matchers...) + if err == nil { + sort.Strings(values) + } + return values, err +} + +// LabelValues returns label values present in the head for the +// specific label name that are within the time range mint to maxt. +// If matchers are specified the returned result set is reduced +// to label values of metrics matching the matchers. +func (h *headIndexReader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { + if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() { + return []string{}, nil + } + + if len(matchers) == 0 { + h.head.symMtx.RLock() + defer h.head.symMtx.RUnlock() + return h.head.postings.LabelValues(name), nil + } + + return labelValuesWithMatchers(h, name, matchers...) +} + +// LabelNames returns all the unique label names present in the head +// that are within the time range mint to maxt. +func (h *headIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, error) { + if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() { + return []string{}, nil + } + + if len(matchers) == 0 { + h.head.symMtx.RLock() + labelNames := h.head.postings.LabelNames() + h.head.symMtx.RUnlock() + + sort.Strings(labelNames) + return labelNames, nil + } + + return labelNamesWithMatchers(h, matchers...) +} + +// Postings returns the postings list iterator for the label pairs. +func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) { + res := make([]index.Postings, 0, len(values)) + for _, value := range values { + res = append(res, h.head.postings.Get(name, value)) + } + return index.Merge(res...), nil +} + +func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings { + series := make([]*memSeries, 0, 128) + + // Fetch all the series only once. + for p.Next() { + s := h.head.series.getByID(p.At()) + if s == nil { + level.Debug(h.head.logger).Log("msg", "Looked up series not found") + } else { + series = append(series, s) + } + } + if err := p.Err(); err != nil { + return index.ErrPostings(errors.Wrap(err, "expand postings")) + } + + sort.Slice(series, func(i, j int) bool { + return labels.Compare(series[i].lset, series[j].lset) < 0 + }) + + // Convert back to list. + ep := make([]uint64, 0, len(series)) + for _, p := range series { + ep = append(ep, p.ref) + } + return index.NewListPostings(ep) +} + +// Series returns the series for the given reference. 
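+// It copies the series labels into lbls and appends the metadata of all chunks overlapping
+// the reader's [mint, maxt] window to chks.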
+func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks.Meta) error { + s := h.head.series.getByID(ref) + + if s == nil { + h.head.metrics.seriesNotFound.Inc() + return storage.ErrNotFound + } + *lbls = append((*lbls)[:0], s.lset...) + + s.Lock() + defer s.Unlock() + + *chks = (*chks)[:0] + + for i, c := range s.mmappedChunks { + // Do not expose chunks that are outside of the specified range. + if !c.OverlapsClosedInterval(h.mint, h.maxt) { + continue + } + *chks = append(*chks, chunks.Meta{ + MinTime: c.minTime, + MaxTime: c.maxTime, + Ref: packChunkID(s.ref, uint64(s.chunkID(i))), + }) + } + if s.headChunk != nil && s.headChunk.OverlapsClosedInterval(h.mint, h.maxt) { + *chks = append(*chks, chunks.Meta{ + MinTime: s.headChunk.minTime, + MaxTime: math.MaxInt64, // Set the head chunks as open (being appended to). + Ref: packChunkID(s.ref, uint64(s.chunkID(len(s.mmappedChunks)))), + }) + } + + return nil +} + +func (s *memSeries) chunkID(pos int) int { + return pos + s.firstChunkID +} + +// LabelValueFor returns label value for the given label name in the series referred to by ID. +func (h *headIndexReader) LabelValueFor(id uint64, label string) (string, error) { + memSeries := h.head.series.getByID(id) + if memSeries == nil { + return "", storage.ErrNotFound + } + + value := memSeries.lset.Get(label) + if value == "" { + return "", storage.ErrNotFound + } + + return value, nil +} + +// LabelNamesFor returns all the label names for the series referred to by IDs. +// The names returned are sorted. +func (h *headIndexReader) LabelNamesFor(ids ...uint64) ([]string, error) { + namesMap := make(map[string]struct{}) + for _, id := range ids { + memSeries := h.head.series.getByID(id) + if memSeries == nil { + return nil, storage.ErrNotFound + } + for _, lbl := range memSeries.lset { + namesMap[lbl.Name] = struct{}{} + } + } + names := make([]string, 0, len(namesMap)) + for name := range namesMap { + names = append(names, name) + } + sort.Strings(names) + return names, nil +} + +// Chunks returns a ChunkReader against the block. +func (h *Head) Chunks() (ChunkReader, error) { + return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State(math.MinInt64, math.MaxInt64)) +} + +func (h *Head) chunksRange(mint, maxt int64, is *isolationState) (*headChunkReader, error) { + h.closedMtx.Lock() + defer h.closedMtx.Unlock() + if h.closed { + return nil, errors.New("can't read from a closed head") + } + if hmin := h.MinTime(); hmin > mint { + mint = hmin + } + return &headChunkReader{ + head: h, + mint: mint, + maxt: maxt, + isoState: is, + }, nil +} + +type headChunkReader struct { + head *Head + mint, maxt int64 + isoState *isolationState +} + +func (h *headChunkReader) Close() error { + h.isoState.Close() + return nil +} + +// packChunkID packs a seriesID and a chunkID within it into a global 8 byte ID. +// It panicks if the seriesID exceeds 5 bytes or the chunk ID 3 bytes. +func packChunkID(seriesID, chunkID uint64) uint64 { + if seriesID > (1<<40)-1 { + panic("series ID exceeds 5 bytes") + } + if chunkID > (1<<24)-1 { + panic("chunk ID exceeds 3 bytes") + } + return (seriesID << 24) | chunkID +} + +func unpackChunkID(id uint64) (seriesID, chunkID uint64) { + return id >> 24, (id << 40) >> 40 +} + +// Chunk returns the chunk for the reference number. +func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { + sid, cid := unpackChunkID(ref) + + s := h.head.series.getByID(sid) + // This means that the series has been garbage collected. 
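+	// The series may have been removed by a head truncation while this reader was open.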
+ if s == nil { + return nil, storage.ErrNotFound + } + + s.Lock() + c, garbageCollect, err := s.chunk(int(cid), h.head.chunkDiskMapper) + if err != nil { + s.Unlock() + return nil, err + } + defer func() { + if garbageCollect { + // Set this to nil so that Go GC can collect it after it has been used. + c.chunk = nil + s.memChunkPool.Put(c) + } + }() + + // This means that the chunk is outside the specified range. + if !c.OverlapsClosedInterval(h.mint, h.maxt) { + s.Unlock() + return nil, storage.ErrNotFound + } + s.Unlock() + + return &safeChunk{ + Chunk: c.chunk, + s: s, + cid: int(cid), + isoState: h.isoState, + chunkDiskMapper: h.head.chunkDiskMapper, + }, nil +} + +// chunk returns the chunk for the chunk id from memory or by m-mapping it from the disk. +// If garbageCollect is true, it means that the returned *memChunk +// (and not the chunkenc.Chunk inside it) can be garbage collected after it's usage. +func (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) { + // ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are + // incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index. + // The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix + // is len(s.mmappedChunks), it represents the next chunk, which is the head chunk. + ix := id - s.firstChunkID + if ix < 0 || ix > len(s.mmappedChunks) { + return nil, false, storage.ErrNotFound + } + if ix == len(s.mmappedChunks) { + if s.headChunk == nil { + return nil, false, errors.New("invalid head chunk") + } + return s.headChunk, false, nil + } + chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref) + if err != nil { + if _, ok := err.(*chunks.CorruptionErr); ok { + panic(err) + } + return nil, false, err + } + mc := s.memChunkPool.Get().(*memChunk) + mc.chunk = chk + mc.minTime = s.mmappedChunks[ix].minTime + mc.maxTime = s.mmappedChunks[ix].maxTime + return mc, true, nil +} + +type safeChunk struct { + chunkenc.Chunk + s *memSeries + cid int + isoState *isolationState + chunkDiskMapper *chunks.ChunkDiskMapper +} + +func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator { + c.s.Lock() + it := c.s.iterator(c.cid, c.isoState, c.chunkDiskMapper, reuseIter) + c.s.Unlock() + return it +} + +// iterator returns a chunk iterator. +// It is unsafe to call this concurrently with s.append(...) without holding the series lock. +func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator { + c, garbageCollect, err := s.chunk(id, chunkDiskMapper) + // TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a + // series's chunk, which got then garbage collected before it got + // accessed. We must ensure to not garbage collect as long as any + // readers still hold a reference. + if err != nil { + return chunkenc.NewNopIterator() + } + defer func() { + if garbageCollect { + // Set this to nil so that Go GC can collect it after it has been used. + // This should be done always at the end. + c.chunk = nil + s.memChunkPool.Put(c) + } + }() + + ix := id - s.firstChunkID + + numSamples := c.chunk.NumSamples() + stopAfter := numSamples + + if isoState != nil { + totalSamples := 0 // Total samples in this series. + previousSamples := 0 // Samples before this chunk. 
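+	// These counters are filled in below by walking the series' chunks, so that the appendIDs
+	// tracked in s.txs can be mapped onto sample positions within this chunk.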
+ + for j, d := range s.mmappedChunks { + totalSamples += int(d.numSamples) + if j < ix { + previousSamples += int(d.numSamples) + } + } + + if s.headChunk != nil { + totalSamples += s.headChunk.chunk.NumSamples() + } + + // Removing the extra transactionIDs that are relevant for samples that + // come after this chunk, from the total transactionIDs. + appendIDsToConsider := s.txs.txIDCount - (totalSamples - (previousSamples + numSamples)) + + // Iterate over the appendIDs, find the first one that the isolation state says not + // to return. + it := s.txs.iterator() + for index := 0; index < appendIDsToConsider; index++ { + appendID := it.At() + if appendID <= isoState.maxAppendID { // Easy check first. + if _, ok := isoState.incompleteAppends[appendID]; !ok { + it.Next() + continue + } + } + stopAfter = numSamples - (appendIDsToConsider - index) + if stopAfter < 0 { + stopAfter = 0 // Stopped in a previous chunk. + } + break + } + } + + if stopAfter == 0 { + return chunkenc.NewNopIterator() + } + + if id-s.firstChunkID < len(s.mmappedChunks) { + if stopAfter == numSamples { + return c.chunk.Iterator(it) + } + if msIter, ok := it.(*stopIterator); ok { + msIter.Iterator = c.chunk.Iterator(msIter.Iterator) + msIter.i = -1 + msIter.stopAfter = stopAfter + return msIter + } + return &stopIterator{ + Iterator: c.chunk.Iterator(it), + i: -1, + stopAfter: stopAfter, + } + } + // Serve the last 4 samples for the last chunk from the sample buffer + // as their compressed bytes may be mutated by added samples. + if msIter, ok := it.(*memSafeIterator); ok { + msIter.Iterator = c.chunk.Iterator(msIter.Iterator) + msIter.i = -1 + msIter.total = numSamples + msIter.stopAfter = stopAfter + msIter.buf = s.sampleBuf + return msIter + } + return &memSafeIterator{ + stopIterator: stopIterator{ + Iterator: c.chunk.Iterator(it), + i: -1, + stopAfter: stopAfter, + }, + total: numSamples, + buf: s.sampleBuf, + } +} + +type memSafeIterator struct { + stopIterator + + total int + buf [4]sample +} + +func (it *memSafeIterator) Seek(t int64) bool { + if it.Err() != nil { + return false + } + + ts, _ := it.At() + + for t > ts || it.i == -1 { + if !it.Next() { + return false + } + ts, _ = it.At() + } + + return true +} + +func (it *memSafeIterator) Next() bool { + if it.i+1 >= it.stopAfter { + return false + } + it.i++ + if it.total-it.i > 4 { + return it.Iterator.Next() + } + return true +} + +func (it *memSafeIterator) At() (int64, float64) { + if it.total-it.i > 4 { + return it.Iterator.At() + } + s := it.buf[4-(it.total-it.i)] + return s.t, s.v +} + +type stopIterator struct { + chunkenc.Iterator + + i, stopAfter int +} + +func (it *stopIterator) Next() bool { + if it.i+1 >= it.stopAfter { + return false + } + it.i++ + return it.Iterator.Next() +} diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go new file mode 100644 index 000000000..707bb0fd9 --- /dev/null +++ b/tsdb/head_wal.go @@ -0,0 +1,351 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package tsdb
+
+import (
+	"fmt"
+	"math"
+	"runtime"
+	"sync"
+
+	"github.com/go-kit/log/level"
+	"github.com/pkg/errors"
+	"go.uber.org/atomic"
+
+	"github.com/prometheus/prometheus/pkg/exemplar"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb/record"
+	"github.com/prometheus/prometheus/tsdb/tombstones"
+	"github.com/prometheus/prometheus/tsdb/wal"
+)
+
+func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks map[uint64][]*mmappedChunk) (err error) {
+	// Track number of samples that referenced a series we don't know about
+	// for error reporting.
+	var unknownRefs atomic.Uint64
+	var unknownExemplarRefs atomic.Uint64
+
+	// Start workers that each process samples for a partition of the series ID space.
+	// They are connected through a ring of channels which ensures that all sample batches
+	// read from the WAL are processed in order.
+	var (
+		wg             sync.WaitGroup
+		n              = runtime.GOMAXPROCS(0)
+		inputs         = make([]chan []record.RefSample, n)
+		outputs        = make([]chan []record.RefSample, n)
+		exemplarsInput chan record.RefExemplar
+
+		dec    record.Decoder
+		shards = make([][]record.RefSample, n)
+
+		decoded                      = make(chan interface{}, 10)
+		decodeErr, seriesCreationErr error
+		seriesPool                   = sync.Pool{
+			New: func() interface{} {
+				return []record.RefSeries{}
+			},
+		}
+		samplesPool = sync.Pool{
+			New: func() interface{} {
+				return []record.RefSample{}
+			},
+		}
+		tstonesPool = sync.Pool{
+			New: func() interface{} {
+				return []tombstones.Stone{}
+			},
+		}
+		exemplarsPool = sync.Pool{
+			New: func() interface{} {
+				return []record.RefExemplar{}
+			},
+		}
+	)
+
+	defer func() {
+		// For CorruptionErr ensure to terminate all workers before exiting.
+		_, ok := err.(*wal.CorruptionErr)
+		if ok || seriesCreationErr != nil {
+			for i := 0; i < n; i++ {
+				close(inputs[i])
+				for range outputs[i] {
+				}
+			}
+			close(exemplarsInput)
+			wg.Wait()
+		}
+	}()
+
+	wg.Add(n)
+	for i := 0; i < n; i++ {
+		outputs[i] = make(chan []record.RefSample, 300)
+		inputs[i] = make(chan []record.RefSample, 300)
+
+		go func(input <-chan []record.RefSample, output chan<- []record.RefSample) {
+			unknown := h.processWALSamples(h.minValidTime.Load(), input, output)
+			unknownRefs.Add(unknown)
+			wg.Done()
+		}(inputs[i], outputs[i])
+	}
+
+	wg.Add(1)
+	exemplarsInput = make(chan record.RefExemplar, 300)
+	go func(input <-chan record.RefExemplar) {
+		defer wg.Done()
+		for e := range input {
+			ms := h.series.getByID(e.Ref)
+			if ms == nil {
+				unknownExemplarRefs.Inc()
+				continue
+			}
+
+			if e.T < h.minValidTime.Load() {
+				continue
+			}
+			// At the moment the only possible error here is an out-of-order exemplar, which we shouldn't
+			// see when replaying the WAL, so let's log a warning if one does show up.
+ err = h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels}) + if err != nil && err == storage.ErrOutOfOrderExemplar { + level.Warn(h.logger).Log("msg", "Unexpected error when replaying WAL on exemplar record", "err", err) + } + } + }(exemplarsInput) + + go func() { + defer close(decoded) + for r.Next() { + rec := r.Record() + switch dec.Type(rec) { + case record.Series: + series := seriesPool.Get().([]record.RefSeries)[:0] + series, err = dec.Series(rec, series) + if err != nil { + decodeErr = &wal.CorruptionErr{ + Err: errors.Wrap(err, "decode series"), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decoded <- series + case record.Samples: + samples := samplesPool.Get().([]record.RefSample)[:0] + samples, err = dec.Samples(rec, samples) + if err != nil { + decodeErr = &wal.CorruptionErr{ + Err: errors.Wrap(err, "decode samples"), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decoded <- samples + case record.Tombstones: + tstones := tstonesPool.Get().([]tombstones.Stone)[:0] + tstones, err = dec.Tombstones(rec, tstones) + if err != nil { + decodeErr = &wal.CorruptionErr{ + Err: errors.Wrap(err, "decode tombstones"), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decoded <- tstones + case record.Exemplars: + exemplars := exemplarsPool.Get().([]record.RefExemplar)[:0] + exemplars, err = dec.Exemplars(rec, exemplars) + if err != nil { + decodeErr = &wal.CorruptionErr{ + Err: errors.Wrap(err, "decode exemplars"), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decoded <- exemplars + default: + // Noop. + } + } + }() + +Outer: + for d := range decoded { + switch v := d.(type) { + case []record.RefSeries: + for _, s := range v { + series, created, err := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) + if err != nil { + seriesCreationErr = err + break Outer + } + + if created { + // If this series gets a duplicate record, we don't restore its mmapped chunks, + // and instead restore everything from WAL records. + series.mmappedChunks = mmappedChunks[series.ref] + + h.metrics.chunks.Add(float64(len(series.mmappedChunks))) + h.metrics.chunksCreated.Add(float64(len(series.mmappedChunks))) + + if len(series.mmappedChunks) > 0 { + h.updateMinMaxTime(series.minTime(), series.maxTime()) + } + } else { + // TODO(codesome) Discard old samples and mmapped chunks and use mmap chunks for the new series ID. + + // There's already a different ref for this series. + multiRef[s.Ref] = series.ref + } + + if h.lastSeriesID.Load() < s.Ref { + h.lastSeriesID.Store(s.Ref) + } + } + //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. + seriesPool.Put(v) + case []record.RefSample: + samples := v + // We split up the samples into chunks of 5000 samples or less. + // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise + // cause thousands of very large in flight buffers occupying large amounts + // of unused memory. + for len(samples) > 0 { + m := 5000 + if len(samples) < m { + m = len(samples) + } + for i := 0; i < n; i++ { + var buf []record.RefSample + select { + case buf = <-outputs[i]: + default: + } + shards[i] = buf[:0] + } + for _, sam := range samples[:m] { + if r, ok := multiRef[sam.Ref]; ok { + sam.Ref = r + } + mod := sam.Ref % uint64(n) + shards[mod] = append(shards[mod], sam) + } + for i := 0; i < n; i++ { + inputs[i] <- shards[i] + } + samples = samples[m:] + } + //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. 
+ samplesPool.Put(v) + case []tombstones.Stone: + for _, s := range v { + for _, itv := range s.Intervals { + if itv.Maxt < h.minValidTime.Load() { + continue + } + if m := h.series.getByID(s.Ref); m == nil { + unknownRefs.Inc() + continue + } + h.tombstones.AddInterval(s.Ref, itv) + } + } + //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. + tstonesPool.Put(v) + case []record.RefExemplar: + for _, e := range v { + exemplarsInput <- e + } + //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. + exemplarsPool.Put(v) + default: + panic(fmt.Errorf("unexpected decoded type: %T", d)) + } + } + + if decodeErr != nil { + return decodeErr + } + if seriesCreationErr != nil { + // Drain the channel to unblock the goroutine. + for range decoded { + } + return seriesCreationErr + } + + // Signal termination to each worker and wait for it to close its output channel. + for i := 0; i < n; i++ { + close(inputs[i]) + for range outputs[i] { + } + } + close(exemplarsInput) + wg.Wait() + + if r.Err() != nil { + return errors.Wrap(r.Err(), "read records") + } + + if unknownRefs.Load() > 0 || unknownExemplarRefs.Load() > 0 { + level.Warn(h.logger).Log("msg", "Unknown series references", "samples", unknownRefs.Load(), "exemplars", unknownExemplarRefs.Load()) + } + return nil +} + +// processWALSamples adds a partition of samples it receives to the head and passes +// them on to other workers. +// Samples before the mint timestamp are discarded. +func (h *Head) processWALSamples( + minValidTime int64, + input <-chan []record.RefSample, output chan<- []record.RefSample, +) (unknownRefs uint64) { + defer close(output) + + // Mitigate lock contention in getByID. + refSeries := map[uint64]*memSeries{} + + mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) + + for samples := range input { + for _, s := range samples { + if s.T < minValidTime { + continue + } + ms := refSeries[s.Ref] + if ms == nil { + ms = h.series.getByID(s.Ref) + if ms == nil { + unknownRefs++ + continue + } + refSeries[s.Ref] = ms + } + if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper); chunkCreated { + h.metrics.chunksCreated.Inc() + h.metrics.chunks.Inc() + } + if s.T > maxt { + maxt = s.T + } + if s.T < mint { + mint = s.T + } + } + output <- samples + } + h.updateMinMaxTime(mint, maxt) + + return unknownRefs +}
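The 8-byte chunk references used by the head reader above pack a series ID and a per-series chunk ID into a single integer. A minimal standalone sketch of that scheme, mirroring the unexported packChunkID/unpackChunkID helpers (the pack/unpack/main names below are illustrative, not part of the package):

package main

import "fmt"

// pack mirrors packChunkID: the series ID occupies the upper 5 bytes,
// the chunk ID the lower 3 bytes.
func pack(seriesID, chunkID uint64) uint64 { return (seriesID << 24) | chunkID }

// unpack mirrors unpackChunkID and recovers both parts.
func unpack(id uint64) (seriesID, chunkID uint64) { return id >> 24, (id << 40) >> 40 }

func main() {
	ref := pack(42, 7) // series 42, chunk 7 within that series
	s, c := unpack(ref)
	fmt.Println(ref, s, c) // 704643079 42 7
}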