prometheus/tsdb/head.go

2040 lines
53 KiB
Go
Raw Normal View History

2017-04-10 11:59:45 -07:00
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2016-12-04 04:16:11 -08:00
package tsdb
import (
	"fmt"
	"math"
	"runtime"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/tsdb/index"
	"github.com/prometheus/prometheus/tsdb/record"
	"github.com/prometheus/prometheus/tsdb/tombstones"
	"github.com/prometheus/prometheus/tsdb/wal"
)
// ErrInvalidSample reports that a sample handed to an appender is not valid
// and therefore cannot be ingested.
var ErrInvalidSample = errors.New("invalid sample")
// Head handles reads and writes of time series data within a time window.
type Head struct {
// Keep all 64bit atomically accessed variables at the top of this struct.
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for more info.
chunkRange int64
numSeries uint64
minTime, maxTime int64 // Current min and max of the samples included in the head.
minValidTime int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
lastSeriesID uint64
metrics *headMetrics
wal *wal.WAL
logger log.Logger
appendPool sync.Pool
seriesPool sync.Pool
bytesPool sync.Pool
2017-02-04 02:53:52 -08:00
// All series addressable by their ID or hash.
series *stripeSeries
2016-12-21 16:12:28 -08:00
symMtx sync.RWMutex
symbols map[string]struct{}
values map[string]stringset // Label names to possible values.
deletedMtx sync.Mutex
deleted map[uint64]int // Deleted series, and what WAL segment they must be kept until.
postings *index.MemPostings // Postings lists for terms.
tombstones *tombstones.MemTombstones
iso *isolation
cardinalityMutex sync.Mutex
cardinalityCache *index.PostingsStats // Posting stats cache which will expire after 30sec.
lastPostingsStatsCall time.Duration // Last posting stats call (PostingsCardinalityStats()) time for caching.
2016-12-04 04:16:11 -08:00
}
// headMetrics groups the Prometheus instruments created by newHeadMetrics
// for a single Head.
type headMetrics struct {
	// Appender and series bookkeeping.
	activeAppenders prometheus.Gauge
	series prometheus.GaugeFunc
	seriesCreated prometheus.Counter
	seriesRemoved prometheus.Counter
	seriesNotFound prometheus.Counter
	// Chunk bookkeeping.
	chunks prometheus.Gauge
	chunksCreated prometheus.Counter
	chunksRemoved prometheus.Counter
	// Head garbage collection, ingestion and WAL handling.
	gcDuration prometheus.Summary
	samplesAppended prometheus.Counter
	walTruncateDuration prometheus.Summary
	walCorruptionsTotal prometheus.Counter
	// Head truncation and WAL checkpoint outcomes.
	headTruncateFail prometheus.Counter
	headTruncateTotal prometheus.Counter
	checkpointDeleteFail prometheus.Counter
	checkpointDeleteTotal prometheus.Counter
	checkpointCreationFail prometheus.Counter
	checkpointCreationTotal prometheus.Counter
}
// newHeadMetrics builds every metric tracked for h. When r is non-nil the
// metrics are registered with it, together with a few function-backed gauges
// that are created on the spot because nothing else needs a handle on them.
func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
	// Tiny constructors so each metric below is a single name/help pair.
	gauge := func(name, help string) prometheus.Gauge {
		return prometheus.NewGauge(prometheus.GaugeOpts{Name: name, Help: help})
	}
	gaugeFunc := func(name, help string, value func() float64) prometheus.GaugeFunc {
		return prometheus.NewGaugeFunc(prometheus.GaugeOpts{Name: name, Help: help}, value)
	}
	counter := func(name, help string) prometheus.Counter {
		return prometheus.NewCounter(prometheus.CounterOpts{Name: name, Help: help})
	}
	summary := func(name, help string) prometheus.Summary {
		return prometheus.NewSummary(prometheus.SummaryOpts{Name: name, Help: help})
	}

	m := new(headMetrics)

	m.activeAppenders = gauge(
		"prometheus_tsdb_head_active_appenders",
		"Number of currently active appender transactions",
	)
	m.series = gaugeFunc(
		"prometheus_tsdb_head_series",
		"Total number of series in the head block.",
		func() float64 { return float64(h.NumSeries()) },
	)
	m.seriesCreated = counter(
		"prometheus_tsdb_head_series_created_total",
		"Total number of series created in the head",
	)
	m.seriesRemoved = counter(
		"prometheus_tsdb_head_series_removed_total",
		"Total number of series removed in the head",
	)
	m.seriesNotFound = counter(
		"prometheus_tsdb_head_series_not_found_total",
		"Total number of requests for series that were not found.",
	)
	m.chunks = gauge(
		"prometheus_tsdb_head_chunks",
		"Total number of chunks in the head block.",
	)
	m.chunksCreated = counter(
		"prometheus_tsdb_head_chunks_created_total",
		"Total number of chunks created in the head",
	)
	m.chunksRemoved = counter(
		"prometheus_tsdb_head_chunks_removed_total",
		"Total number of chunks removed in the head",
	)
	m.gcDuration = summary(
		"prometheus_tsdb_head_gc_duration_seconds",
		"Runtime of garbage collection in the head block.",
	)
	m.walTruncateDuration = summary(
		"prometheus_tsdb_wal_truncate_duration_seconds",
		"Duration of WAL truncation.",
	)
	m.walCorruptionsTotal = counter(
		"prometheus_tsdb_wal_corruptions_total",
		"Total number of WAL corruptions.",
	)
	m.samplesAppended = counter(
		"prometheus_tsdb_head_samples_appended_total",
		"Total number of appended samples.",
	)
	m.headTruncateFail = counter(
		"prometheus_tsdb_head_truncations_failed_total",
		"Total number of head truncations that failed.",
	)
	m.headTruncateTotal = counter(
		"prometheus_tsdb_head_truncations_total",
		"Total number of head truncations attempted.",
	)
	m.checkpointDeleteFail = counter(
		"prometheus_tsdb_checkpoint_deletions_failed_total",
		"Total number of checkpoint deletions that failed.",
	)
	m.checkpointDeleteTotal = counter(
		"prometheus_tsdb_checkpoint_deletions_total",
		"Total number of checkpoint deletions attempted.",
	)
	m.checkpointCreationFail = counter(
		"prometheus_tsdb_checkpoint_creations_failed_total",
		"Total number of checkpoint creations that failed.",
	)
	m.checkpointCreationTotal = counter(
		"prometheus_tsdb_checkpoint_creations_total",
		"Total number of checkpoint creations attempted.",
	)

	// Without a registerer there is nothing left to do.
	if r == nil {
		return m
	}

	r.MustRegister(
		m.activeAppenders,
		m.series,
		m.chunks,
		m.chunksCreated,
		m.chunksRemoved,
		m.seriesCreated,
		m.seriesRemoved,
		m.seriesNotFound,
		m.gcDuration,
		m.walTruncateDuration,
		m.walCorruptionsTotal,
		m.samplesAppended,
		m.headTruncateFail,
		m.headTruncateTotal,
		m.checkpointDeleteFail,
		m.checkpointDeleteTotal,
		m.checkpointCreationFail,
		m.checkpointCreationTotal,
		// Function-backed gauges that are only ever read through the
		// registry, so they are not kept on the struct.
		gaugeFunc(
			"prometheus_tsdb_head_max_time",
			"Maximum timestamp of the head block. The unit is decided by the library consumer.",
			func() float64 { return float64(h.MaxTime()) },
		),
		gaugeFunc(
			"prometheus_tsdb_head_min_time",
			"Minimum time bound of the head block. The unit is decided by the library consumer.",
			func() float64 { return float64(h.MinTime()) },
		),
		gaugeFunc(
			"prometheus_tsdb_isolation_low_watermark",
			"The lowest TSDB append ID that is still referenced.",
			func() float64 { return float64(h.iso.lowWatermark()) },
		),
		gaugeFunc(
			"prometheus_tsdb_isolation_high_watermark",
			"The highest TSDB append ID that has been given out.",
			func() float64 {
				// lastAppendID is read under the isolation append mutex.
				h.iso.appendMtx.Lock()
				defer h.iso.appendMtx.Unlock()
				return float64(h.iso.lastAppendID)
			},
		),
	)
	return m
}
// cardinalityCacheExpirationTime is how long a computed postings statistics
// result is served from the cache before it is recomputed.
const cardinalityCacheExpirationTime = 30 * time.Second

// PostingsCardinalityStats returns top 10 highest cardinality stats By label and value names.
//
// Results are cached for cardinalityCacheExpirationTime. The cache is not keyed
// by statsByLabelName: while a cached result is still fresh it is returned
// as-is, whatever label name the current call asks for.
func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.PostingsStats {
	h.cardinalityMutex.Lock()
	defer h.cardinalityMutex.Unlock()

	// Timestamps are kept at whole-second resolution.
	now := time.Duration(time.Now().Unix()) * time.Second
	if age := now - h.lastPostingsStatsCall; age > cardinalityCacheExpirationTime {
		// Too old: drop the cached value so it is recomputed below.
		h.cardinalityCache = nil
	}

	if h.cardinalityCache == nil {
		h.cardinalityCache = h.postings.Stats(statsByLabelName)
		h.lastPostingsStatsCall = time.Duration(time.Now().Unix()) * time.Second
	}
	return h.cardinalityCache
}
// NewHead opens the head block in dir.
// stripeSize sets the number of entries in the hash map, it must be a power of 2.
// A larger stripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
// A smaller stripeSize reduces the memory allocated, but can decrease performance with large number of series.
func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64, stripeSize int) (*Head, error) {
if l == nil {
l = log.NewNopLogger()
}
if chunkRange < 1 {
return nil, errors.Errorf("invalid chunk range %d", chunkRange)
}
h := &Head{
wal: wal,
logger: l,
chunkRange: chunkRange,
minTime: math.MaxInt64,
maxTime: math.MinInt64,
series: newStripeSeries(stripeSize),
values: map[string]stringset{},
symbols: map[string]struct{}{},
postings: index.NewUnorderedMemPostings(),
tombstones: tombstones.NewMemTombstones(),
iso: newIsolation(),
deleted: map[uint64]int{},
2017-01-07 07:20:32 -08:00
}
h.metrics = newHeadMetrics(h, r)
2017-09-06 07:20:37 -07:00
return h, nil
}
2017-10-07 06:55:11 -07:00
// processWALSamples adds a partition of samples it receives to the head and passes
// them on to other workers.
// Samples before the mint timestamp are discarded.
func (h *Head) processWALSamples(
minValidTime int64,
input <-chan []record.RefSample, output chan<- []record.RefSample,
2017-10-07 06:55:11 -07:00
) (unknownRefs uint64) {
defer close(output)
// Mitigate lock contention in getByID.
refSeries := map[uint64]*memSeries{}
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
2017-10-07 06:55:11 -07:00
for samples := range input {
for _, s := range samples {
if s.T < minValidTime {
2017-10-07 06:55:11 -07:00
continue
}
ms := refSeries[s.Ref]
2017-10-07 06:55:11 -07:00
if ms == nil {
ms = h.series.getByID(s.Ref)
if ms == nil {
unknownRefs++
continue
}
refSeries[s.Ref] = ms
2017-10-07 06:55:11 -07:00
}
if _, chunkCreated := ms.append(s.T, s.V, 0); chunkCreated {
2017-10-07 06:55:11 -07:00
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
}
if s.T > maxt {
maxt = s.T
}
if s.T < mint {
mint = s.T
}
2017-10-07 06:55:11 -07:00
}
output <- samples
}
h.updateMinMaxTime(mint, maxt)
return unknownRefs
}
func (h *Head) updateMinMaxTime(mint, maxt int64) {
for {
lt := h.MinTime()
if mint >= lt {
break
}
if atomic.CompareAndSwapInt64(&h.minTime, lt, mint) {
break
}
}
for {
ht := h.MaxTime()
if maxt <= ht {
break
}
if atomic.CompareAndSwapInt64(&h.maxTime, ht, maxt) {
break
}
}
2017-10-07 06:55:11 -07:00
}
func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
// Track number of samples that referenced a series we don't know about
// for error reporting.
2017-10-07 06:55:11 -07:00
var unknownRefs uint64
// Start workers that each process samples for a partition of the series ID space.
// They are connected through a ring of channels which ensures that all sample batches
// read from the WAL are processed in order.
var (
wg sync.WaitGroup
n = runtime.GOMAXPROCS(0)
inputs = make([]chan []record.RefSample, n)
outputs = make([]chan []record.RefSample, n)
2017-10-07 06:55:11 -07:00
)
wg.Add(n)
defer func() {
// For CorruptionErr ensure to terminate all workers before exiting.
if _, ok := err.(*wal.CorruptionErr); ok {
for i := 0; i < n; i++ {
close(inputs[i])
for range outputs[i] {
}
}
wg.Wait()
}
}()
2017-10-07 06:55:11 -07:00
for i := 0; i < n; i++ {
outputs[i] = make(chan []record.RefSample, 300)
inputs[i] = make(chan []record.RefSample, 300)
2017-10-07 06:55:11 -07:00
go func(input <-chan []record.RefSample, output chan<- []record.RefSample) {
unknown := h.processWALSamples(h.minValidTime, input, output)
2017-10-07 06:55:11 -07:00
atomic.AddUint64(&unknownRefs, unknown)
wg.Done()
}(inputs[i], outputs[i])
2017-10-07 06:55:11 -07:00
}
var (
dec record.Decoder
shards = make([][]record.RefSample, n)
)
var (
decoded = make(chan interface{}, 10)
errCh = make(chan error, 1)
seriesPool = sync.Pool{
New: func() interface{} {
return []record.RefSeries{}
},
}
samplesPool = sync.Pool{
New: func() interface{} {
return []record.RefSample{}
},
}
tstonesPool = sync.Pool{
New: func() interface{} {
return []tombstones.Stone{}
},
}
)
go func() {
defer close(decoded)
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
case record.Series:
series := seriesPool.Get().([]record.RefSeries)[:0]
series, err = dec.Series(rec, series)
if err != nil {
errCh <- &wal.CorruptionErr{
Err: errors.Wrap(err, "decode series"),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
}
decoded <- series
case record.Samples:
samples := samplesPool.Get().([]record.RefSample)[:0]
samples, err = dec.Samples(rec, samples)
if err != nil {
errCh <- &wal.CorruptionErr{
Err: errors.Wrap(err, "decode samples"),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
}
decoded <- samples
case record.Tombstones:
tstones := tstonesPool.Get().([]tombstones.Stone)[:0]
tstones, err = dec.Tombstones(rec, tstones)
if err != nil {
errCh <- &wal.CorruptionErr{
Err: errors.Wrap(err, "decode tombstones"),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
}
decoded <- tstones
default:
errCh <- &wal.CorruptionErr{
Err: errors.Errorf("invalid record type %v", dec.Type(rec)),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
}
}
}()
for d := range decoded {
switch v := d.(type) {
case []record.RefSeries:
for _, s := range v {
series, created := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
if !created {
// There's already a different ref for this series.
multiRef[s.Ref] = series.ref
}
if h.lastSeriesID < s.Ref {
h.lastSeriesID = s.Ref
}
}
//lint:ignore SA6002 relax staticcheck verification.
seriesPool.Put(v)
case []record.RefSample:
samples := v
// We split up the samples into chunks of 5000 samples or less.
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
// cause thousands of very large in flight buffers occupying large amounts
// of unused memory.
for len(samples) > 0 {
m := 5000
if len(samples) < m {
m = len(samples)
}
for i := 0; i < n; i++ {
var buf []record.RefSample
select {
case buf = <-outputs[i]:
default:
}
shards[i] = buf[:0]
}
for _, sam := range samples[:m] {
if r, ok := multiRef[sam.Ref]; ok {
sam.Ref = r
}
mod := sam.Ref % uint64(n)
shards[mod] = append(shards[mod], sam)
}
for i := 0; i < n; i++ {
inputs[i] <- shards[i]
}
samples = samples[m:]
}
//lint:ignore SA6002 relax staticcheck verification.
samplesPool.Put(v)
case []tombstones.Stone:
for _, s := range v {
for _, itv := range s.Intervals {
if itv.Maxt < h.minValidTime {
continue
}
if m := h.series.getByID(s.Ref); m == nil {
unknownRefs++
continue
}
h.tombstones.AddInterval(s.Ref, itv)
2017-09-06 07:20:37 -07:00
}
}
//lint:ignore SA6002 relax staticcheck verification.
tstonesPool.Put(v)
default:
panic(fmt.Errorf("unexpected decoded type: %T", d))
}
}
select {
case err := <-errCh:
return err
default:
}
// Signal termination to each worker and wait for it to close its output channel.
for i := 0; i < n; i++ {
close(inputs[i])
for range outputs[i] {
}
2017-10-07 06:55:11 -07:00
}
wg.Wait()
if r.Err() != nil {
return errors.Wrap(r.Err(), "read records")
}
if unknownRefs > 0 {
level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs)
}
return nil
}
// Init loads data from the write ahead log and prepares the head for writes.
// It should be called before using an appender so that it
// limits the ingested samples to the head min valid time.
func (h *Head) Init(minValidTime int64) error {
h.minValidTime = minValidTime
defer h.postings.EnsureOrder()
defer h.gc() // After loading the wal remove the obsolete data from the head.
if h.wal == nil {
return nil
}
level.Info(h.logger).Log("msg", "replaying WAL, this may take awhile")
start := time.Now()
// Backfill the checkpoint first if it exists.
dir, startFrom, err := wal.LastCheckpoint(h.wal.Dir())
if err != nil && err != record.ErrNotFound {
return errors.Wrap(err, "find last checkpoint")
}
multiRef := map[uint64]uint64{}
if err == nil {
sr, err := wal.NewSegmentsReader(dir)
if err != nil {
return errors.Wrap(err, "open checkpoint")
}
defer func() {
if err := sr.Close(); err != nil {
level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err)
}
}()
// A corrupted checkpoint is a hard error for now and requires user
// intervention. There's likely little data that can be recovered anyway.
if err := h.loadWAL(wal.NewReader(sr), multiRef); err != nil {
return errors.Wrap(err, "backfill checkpoint")
}
startFrom++
level.Info(h.logger).Log("msg", "WAL checkpoint loaded")
}
// Find the last segment.
_, last, err := h.wal.Segments()
2017-10-07 06:55:11 -07:00
if err != nil {
return errors.Wrap(err, "finding WAL segments")
}
// Backfill segments from the most recent checkpoint onwards.
for i := startFrom; i <= last; i++ {
s, err := wal.OpenReadSegment(wal.SegmentName(h.wal.Dir(), i))
if err != nil {
return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
}
sr := wal.NewSegmentBufReader(s)
err = h.loadWAL(wal.NewReader(sr), multiRef)
if err := sr.Close(); err != nil {
level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err)
}
if err != nil {
return err
}
level.Info(h.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", last)
2017-10-07 06:55:11 -07:00
}
level.Info(h.logger).Log("msg", "WAL replay completed", "duration", time.Since(start).String())
return nil
2017-05-13 09:14:18 -07:00
}
// Truncate removes old data before mint from the head.
func (h *Head) Truncate(mint int64) (err error) {
defer func() {
if err != nil {
h.metrics.headTruncateFail.Inc()
}
}()
initialize := h.MinTime() == math.MaxInt64
2017-09-06 07:20:37 -07:00
if h.MinTime() >= mint && !initialize {
2017-09-01 05:38:49 -07:00
return nil
}
atomic.StoreInt64(&h.minTime, mint)
atomic.StoreInt64(&h.minValidTime, mint)
// Ensure that max time is at least as high as min time.
for h.MaxTime() < mint {
atomic.CompareAndSwapInt64(&h.maxTime, h.MaxTime(), mint)
}
2017-09-06 07:20:37 -07:00
// This was an initial call to Truncate after loading blocks on startup.
// We haven't read back the WAL yet, so do not attempt to truncate it.
if initialize {
return nil
}
h.metrics.headTruncateTotal.Inc()
start := time.Now()
h.gc()
level.Info(h.logger).Log("msg", "head GC completed", "duration", time.Since(start))
h.metrics.gcDuration.Observe(time.Since(start).Seconds())
if h.wal == nil {
return nil
}
start = time.Now()
first, last, err := h.wal.Segments()
if err != nil {
return errors.Wrap(err, "get segment range")
}
// Start a new segment, so low ingestion volume TSDB don't have more WAL than
// needed.
err = h.wal.NextSegment()
if err != nil {
return errors.Wrap(err, "next segment")
}
last-- // Never consider last segment for checkpoint.
if last < 0 {
return nil // no segments yet.
}
// The lower third of segments should contain mostly obsolete samples.
// If we have less than three segments, it's not worth checkpointing yet.
last = first + (last-first)/3
if last <= first {
return nil
}
keep := func(id uint64) bool {
if h.series.getByID(id) != nil {
return true
}
h.deletedMtx.Lock()
_, ok := h.deleted[id]
h.deletedMtx.Unlock()
return ok
}
h.metrics.checkpointCreationTotal.Inc()
if _, err = wal.Checkpoint(h.wal, first, last, keep, mint); err != nil {
h.metrics.checkpointCreationFail.Inc()
return errors.Wrap(err, "create checkpoint")
}
if err := h.wal.Truncate(last + 1); err != nil {
// If truncating fails, we'll just try again at the next checkpoint.
// Leftover segments will just be ignored in the future if there's a checkpoint
// that supersedes them.
level.Error(h.logger).Log("msg", "truncating segments failed", "err", err)
}
// The checkpoint is written and segments before it is truncated, so we no
// longer need to track deleted series that are before it.
h.deletedMtx.Lock()
for ref, segment := range h.deleted {
if segment < first {
delete(h.deleted, ref)
}
}
h.deletedMtx.Unlock()
h.metrics.checkpointDeleteTotal.Inc()
if err := wal.DeleteCheckpoints(h.wal.Dir(), last); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond
// occupying disk space.
// They will just be ignored since a higher checkpoint exists.
level.Error(h.logger).Log("msg", "delete old checkpoints", "err", err)
h.metrics.checkpointDeleteFail.Inc()
}
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
2017-09-01 05:38:49 -07:00
level.Info(h.logger).Log("msg", "WAL checkpoint complete",
"first", first, "last", last, "duration", time.Since(start))
2017-09-01 05:38:49 -07:00
return nil
}
// initTime sets the head's time bounds from the first timestamp it ever sees.
// It is only needed for a completely fresh head with an empty WAL.
// It reports whether this call was the one that performed the initialization.
func (h *Head) initTime(t int64) (initialized bool) {
	// Only the caller that replaces the "unset" sentinel wins.
	if atomic.CompareAndSwapInt64(&h.minTime, math.MaxInt64, t) {
		// Bring max time up to at least the min time just stored. Concurrent
		// appenders may already have raised it, in which case it is left alone.
		atomic.CompareAndSwapInt64(&h.maxTime, math.MinInt64, t)
		return true
	}
	return false
}
// RangeHead pairs a Head with a fixed [mint, maxt] time range. Its index and
// chunk readers are restricted to that range, and MinTime/MaxTime report the
// range rather than the head's own bounds.
type RangeHead struct {
	head *Head
	mint, maxt int64
}
// NewRangeHead wraps head in a *RangeHead bounded by mint and maxt.
func NewRangeHead(head *Head, mint, maxt int64) *RangeHead {
	rh := new(RangeHead)
	rh.head = head
	rh.mint, rh.maxt = mint, maxt
	return rh
}
// Index returns an index reader over the head limited to the range's
// [mint, maxt]. The error is always nil.
func (h *RangeHead) Index() (IndexReader, error) {
	reader := h.head.indexRange(h.mint, h.maxt)
	return reader, nil
}
// Chunks returns a chunk reader over the head limited to the range's
// [mint, maxt], bound to the isolation state current at the time of the call.
// The error is always nil.
func (h *RangeHead) Chunks() (ChunkReader, error) {
	isoState := h.head.iso.State()
	reader := h.head.chunksRange(h.mint, h.maxt, isoState)
	return reader, nil
}
// Tombstones returns the tombstones of the underlying head; they are not
// narrowed to the range. The error is always nil.
func (h *RangeHead) Tombstones() (tombstones.Reader, error) {
	stones := h.head.tombstones
	return stones, nil
}
func (h *RangeHead) MinTime() int64 {
Vertical query merging and compaction (#370) * Vertical series iterator Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Select overlapped blocks first in compactor Plan() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Added vertical compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Code cleanup and comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix tests Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Add benchmark for compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Perform vertical compaction only when blocks are overlapping. Actions for vertical compaction: * Sorting chunk metas * Calling chunks.MergeOverlappingChunks on the chunks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for vertical compaction * BenchmarkNormalCompaction => BenchmarkCompaction * Moved the benchmark from db_test.go to compact_test.go Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for query iterator and seek for non overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Vertical query merge only for overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Simplify logging in Compact(...) Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG.md Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Calculate overlapping inside populateBlock Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * MinTime and MaxTime for BlockReader. Using this to find overlapping blocks in populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Sort blocks w.r.t. 
MinTime in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping in LeveledCompactor.write() instead of returning bool Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping inside LeveledCompactor.populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor createBlock to take optional []Series Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * review1 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * Updated CHANGELOG and minor nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor iterator and seek benchmarks for Querier. Also has as overlapping blocks. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Additional test case Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Split genSeries into genSeries and populateSeries Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Check error in benchmark Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Warn about overlapping blocks in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
2019-02-14 05:29:41 -08:00
return h.mint
}
func (h *RangeHead) MaxTime() int64 {
Vertical query merging and compaction (#370) * Vertical series iterator Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Select overlapped blocks first in compactor Plan() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Added vertical compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Code cleanup and comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix tests Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Add benchmark for compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Perform vertical compaction only when blocks are overlapping. Actions for vertical compaction: * Sorting chunk metas * Calling chunks.MergeOverlappingChunks on the chunks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for vertical compaction * BenchmarkNormalCompaction => BenchmarkCompaction * Moved the benchmark from db_test.go to compact_test.go Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for query iterator and seek for non overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Vertical query merge only for overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Simplify logging in Compact(...) Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG.md Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Calculate overlapping inside populateBlock Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * MinTime and MaxTime for BlockReader. Using this to find overlapping blocks in populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Sort blocks w.r.t. 
MinTime in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping in LeveledCompactor.write() instead of returning bool Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping inside LeveledCompactor.populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor createBlock to take optional []Series Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * review1 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * Updated CHANGELOG and minor nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor iterator and seek benchmarks for Querier. Also has as overlapping blocks. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Additional test case Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Split genSeries into genSeries and populateSeries Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Check error in benchmark Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Warn about overlapping blocks in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
2019-02-14 05:29:41 -08:00
return h.maxt
}
// NumSeries reports the series count of the underlying head; the count is
// not restricted to the range.
func (h *RangeHead) NumSeries() uint64 {
	count := h.head.NumSeries()
	return count
}
// Meta builds block metadata for the range: the time bounds come from the
// range itself, the ULID and series count from the underlying head.
func (h *RangeHead) Meta() BlockMeta {
	var meta BlockMeta
	meta.MinTime = h.MinTime()
	meta.MaxTime = h.MaxTime()
	meta.ULID = h.head.Meta().ULID
	meta.Stats = BlockStats{NumSeries: h.NumSeries()}
	return meta
}
// initAppender is a helper to initialize the time bounds of the head
// upon the first sample it receives.
type initAppender struct {
	// app is nil until the first Add; from then on every call is delegated to it.
	app storage.Appender
	head *Head
	// IDs handed over to the head appender once it is created.
	appendID, cleanupAppendIDsBelow uint64
}
func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
if a.app != nil {
return a.app.Add(lset, t, v)
}
2017-09-01 03:09:29 -07:00
a.head.initTime(t)
a.app = a.head.appender(a.appendID, a.cleanupAppendIDsBelow)
2017-09-01 03:09:29 -07:00
return a.app.Add(lset, t, v)
}
// AddFast forwards to the underlying appender once it exists. Before the
// first Add there is no appender yet, so storage.ErrNotFound is returned.
func (a *initAppender) AddFast(ref uint64, t int64, v float64) error {
	if a.app != nil {
		return a.app.AddFast(ref, t, v)
	}
	return storage.ErrNotFound
}
// Commit commits through the underlying appender. If no sample was ever
// added there is nothing to commit and nil is returned.
//
// NOTE(review): on the nil path nothing here undoes the activeAppenders
// increment or the append ID taken in Head.Appender; confirm that is intended.
func (a *initAppender) Commit() error {
	if a.app != nil {
		return a.app.Commit()
	}
	return nil
}
// Rollback rolls back through the underlying appender. If no sample was ever
// added there is nothing to roll back and nil is returned.
//
// NOTE(review): on the nil path nothing here undoes the activeAppenders
// increment or the append ID taken in Head.Appender; confirm that is intended.
func (a *initAppender) Rollback() error {
	if a.app != nil {
		return a.app.Rollback()
	}
	return nil
}
// Appender returns a new Appender on the database.
// Every call increments the active-appenders gauge and takes a fresh
// isolation append ID.
func (h *Head) Appender() storage.Appender {
	h.metrics.activeAppenders.Inc()

	id := h.iso.newAppendID()
	lowWatermark := h.iso.lowWatermark()

	// Time bounds already known: hand out a regular head appender.
	if h.MinTime() != math.MaxInt64 {
		return h.appender(id, lowWatermark)
	}

	// The head has no starting point yet. The init appender takes the first
	// appended timestamp as the base.
	return &initAppender{
		head:                  h,
		appendID:              id,
		cleanupAppendIDsBelow: lowWatermark,
	}
}
func (h *Head) appender(appendID, cleanupAppendIDsBelow uint64) *headAppender {
return &headAppender{
head: h,
Spelling (#6517) * spelling: alertmanager Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: attributes Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: autocomplete Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: bootstrap Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: caught Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: chunkenc Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: compaction Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: corrupted Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: deletable Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: expected Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: fine-grained Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: initialized Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: iteration Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: javascript Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: multiple Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: number Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: overlapping Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: possible Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: postings Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: procedure Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: programmatic Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: queuing Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: querier Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: repairing Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * 
spelling: received Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: reproducible Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: retention Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: sample Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: segements Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: semantic Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: software [LICENSE] Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: staging Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: timestamp Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: unfortunately Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: uvarint Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: subsequently Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: ressamples Signed-off-by: Josh Soref <jsoref@users.noreply.github.com>
2020-01-02 06:54:09 -08:00
// Set the minimum valid time to whichever is greater: the head's min valid time or the start of the compaction window.
// This ensures that no samples will be added within the compaction window to avoid races.
minValidTime: max(atomic.LoadInt64(&h.minValidTime), h.MaxTime()-h.chunkRange/2),
mint: math.MaxInt64,
maxt: math.MinInt64,
samples: h.getAppendBuffer(),
sampleSeries: h.getSeriesBuffer(),
appendID: appendID,
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
}
}
// max returns the larger of the two int64 values a and b.
func max(a, b int64) int64 {
	if b > a {
		return b
	}
	return a
}
// getAppendBuffer returns a sample buffer taken from the head's append pool.
// When the pool has nothing to offer, a fresh empty buffer with capacity for
// 512 samples is allocated instead.
func (h *Head) getAppendBuffer() []record.RefSample {
	pooled := h.appendPool.Get()
	if pooled != nil {
		return pooled.([]record.RefSample)
	}
	return make([]record.RefSample, 0, 512)
}
// putAppendBuffer hands b back to the head's append pool, truncated to zero
// length so that only its capacity is retained.
func (h *Head) putAppendBuffer(b []record.RefSample) {
	truncated := b[:0]
	//lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
	h.appendPool.Put(truncated)
}
// getSeriesBuffer returns a series buffer taken from the head's series pool.
// When the pool has nothing to offer, a fresh empty buffer with capacity for
// 512 entries is allocated instead.
func (h *Head) getSeriesBuffer() []*memSeries {
	pooled := h.seriesPool.Get()
	if pooled != nil {
		return pooled.([]*memSeries)
	}
	return make([]*memSeries, 0, 512)
}
// putSeriesBuffer hands b back to the head's series pool, truncated to zero
// length so that only its capacity is retained.
func (h *Head) putSeriesBuffer(b []*memSeries) {
	truncated := b[:0]
	//lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
	h.seriesPool.Put(truncated)
}
// getBytesBuffer returns a byte buffer taken from the head's bytes pool.
// When the pool has nothing to offer, a fresh empty buffer with a capacity of
// 1024 bytes is allocated instead.
func (h *Head) getBytesBuffer() []byte {
	pooled := h.bytesPool.Get()
	if pooled != nil {
		return pooled.([]byte)
	}
	return make([]byte, 0, 1024)
}
// putBytesBuffer hands b back to the head's bytes pool, truncated to zero
// length so that only its capacity is retained.
func (h *Head) putBytesBuffer(b []byte) {
	truncated := b[:0]
	//lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
	h.bytesPool.Put(truncated)
}
// headAppender buffers samples and newly created series for a Head until they
// are either committed or rolled back.
type headAppender struct {
	head *Head

	// No samples below this timestamp are allowed.
	minValidTime int64

	// Lowest and highest timestamps among the samples buffered so far.
	mint int64
	maxt int64

	// Series created through this appender; written to the WAL by log.
	series []record.RefSeries
	// Samples buffered by AddFast, awaiting Commit or Rollback.
	samples []record.RefSample
	// sampleSeries[i] is the in-memory series that samples[i] belongs to.
	sampleSeries []*memSeries

	appendID              uint64
	cleanupAppendIDsBelow uint64
}
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
if t < a.minValidTime {
return 0, storage.ErrOutOfBounds
}
// Ensure no empty labels have gotten through.
lset = lset.WithoutEmpty()
if len(lset) == 0 {
return 0, errors.Wrap(ErrInvalidSample, "empty labelset")
}
if l, dup := lset.HasDuplicateLabelNames(); dup {
return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l))
}
2017-09-18 03:28:56 -07:00
s, created := a.head.getOrCreate(lset.Hash(), lset)
if created {
a.series = append(a.series, record.RefSeries{
Ref: s.ref,
Labels: lset,
})
}
return s.ref, a.AddFast(s.ref, t, v)
}
func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
if t < a.minValidTime {
return storage.ErrOutOfBounds
}
s := a.head.series.getByID(ref)
if s == nil {
return errors.Wrap(storage.ErrNotFound, "unknown series")
}
2017-09-07 23:48:19 -07:00
s.Lock()
if err := s.appendable(t, v); err != nil {
s.Unlock()
return err
}
s.pendingCommit = true
s.Unlock()
if t < a.mint {
a.mint = t
}
if t > a.maxt {
a.maxt = t
}
a.samples = append(a.samples, record.RefSample{
Ref: ref,
T: t,
V: v,
})
a.sampleSeries = append(a.sampleSeries, s)
return nil
}
// log writes the appender's buffered series and samples to the head's WAL,
// series first. It is a no-op when the head has no WAL. Records are encoded
// into a pooled byte buffer that is returned to the pool on exit.
func (a *headAppender) log() error {
	w := a.head.wal
	if w == nil {
		return nil
	}

	buf := a.head.getBytesBuffer()
	// The closure reads buf on exit, so the pool gets whichever backing array
	// the encoder left us with.
	defer func() { a.head.putBytesBuffer(buf) }()

	var enc record.Encoder

	if len(a.series) > 0 {
		rec := enc.Series(a.series, buf)
		buf = rec[:0]
		if err := w.Log(rec); err != nil {
			return errors.Wrap(err, "log series")
		}
	}

	if len(a.samples) > 0 {
		rec := enc.Samples(a.samples, buf)
		buf = rec[:0]
		if err := w.Log(rec); err != nil {
			return errors.Wrap(err, "log samples")
		}
	}

	return nil
}
func (a *headAppender) Commit() error {
if err := a.log(); err != nil {
//nolint: errcheck
a.Rollback() // Most likely the same error will happen again.
return errors.Wrap(err, "write to WAL")
}
defer a.head.metrics.activeAppenders.Dec()
defer a.head.putAppendBuffer(a.samples)
defer a.head.putSeriesBuffer(a.sampleSeries)
defer a.head.iso.closeAppend(a.appendID)
total := len(a.samples)
var series *memSeries
for i, s := range a.samples {
series = a.sampleSeries[i]
series.Lock()
ok, chunkCreated := series.append(s.T, s.V, a.appendID)
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
2017-09-07 23:48:19 -07:00
if !ok {
total--
}
if chunkCreated {
a.head.metrics.chunks.Inc()
a.head.metrics.chunksCreated.Inc()
}
}
a.head.metrics.samplesAppended.Add(float64(total))
a.head.updateMinMaxTime(a.mint, a.maxt)
return nil
}
// Rollback discards all buffered samples without appending them to their
// series, clearing the pending-commit flag on every series touched.
//
// Series created through this appender are still written to the WAL; the
// error of that write, if any, is returned.
func (a *headAppender) Rollback() error {
	defer a.head.metrics.activeAppenders.Dec()
	defer a.head.iso.closeAppend(a.appendID)
	defer a.head.putSeriesBuffer(a.sampleSeries)

	for i := range a.samples {
		ms := a.sampleSeries[i]
		ms.Lock()
		ms.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
		ms.pendingCommit = false
		ms.Unlock()
	}

	// Drop the samples so that the log call below only writes series records.
	a.head.putAppendBuffer(a.samples)
	a.samples = nil

	// Series are created in the head memory regardless of rollback. Thus we have
	// to log them to the WAL in any case.
	return a.log()
}
// Delete all samples in the range of [mint, maxt] for series that satisfy the given
// label matchers.
func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
// Do not delete anything beyond the currently valid range.
mint, maxt = clampInterval(mint, maxt, h.MinTime(), h.MaxTime())
ir := h.indexRange(mint, maxt)
p, err := PostingsForMatchers(ir, ms...)
if err != nil {
return errors.Wrap(err, "select series")
}
var stones []tombstones.Stone
for p.Next() {
series := h.series.getByID(p.At())
series.RLock()
2018-02-07 05:43:21 -08:00
t0, t1 := series.minTime(), series.maxTime()
series.RUnlock()
2018-02-07 05:43:21 -08:00
if t0 == math.MinInt64 || t1 == math.MinInt64 {
continue
}
// Delete only until the current values and not beyond.
2018-02-07 05:43:21 -08:00
t0, t1 = clampInterval(mint, maxt, t0, t1)
stones = append(stones, tombstones.Stone{Ref: p.At(), Intervals: tombstones.Intervals{{Mint: t0, Maxt: t1}}})
}
if p.Err() != nil {
return p.Err()
}
if h.wal != nil {
var enc record.Encoder
if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil {
return err
}
}
for _, s := range stones {
h.tombstones.AddInterval(s.Ref, s.Intervals[0])
}
return nil
}
// gc removes data before the minimum timestamp from the head.
func (h *Head) gc() {
// Only data strictly lower than this timestamp must be deleted.
mint := h.MinTime()
2017-01-19 05:01:38 -08:00
// Drop old chunks and remember series IDs and hashes if they can be
// deleted entirely.
deleted, chunksRemoved := h.series.gc(mint)
seriesRemoved := len(deleted)
h.metrics.seriesRemoved.Add(float64(seriesRemoved))
h.metrics.chunksRemoved.Add(float64(chunksRemoved))
h.metrics.chunks.Sub(float64(chunksRemoved))
// Using AddUint64 to subtract series removed.
// See: https://golang.org/pkg/sync/atomic/#AddUint64.
atomic.AddUint64(&h.numSeries, ^uint64(seriesRemoved-1))
// Remove deleted series IDs from the postings lists.
h.postings.Delete(deleted)
if h.wal != nil {
_, last, _ := h.wal.Segments()
h.deletedMtx.Lock()
// Keep series records until we're past segment 'last'
// because the WAL will still have samples records with
// this ref ID. If we didn't keep these series records then
// on start up when we replay the WAL, or any other code
// that reads the WAL, wouldn't be able to use those
// samples since we would have no labels for that ref ID.
for ref := range deleted {
h.deleted[ref] = last
}
h.deletedMtx.Unlock()
}
// Rebuild symbols and label value indices from what is left in the postings terms.
symbols := make(map[string]struct{}, len(h.symbols))
values := make(map[string]stringset, len(h.values))
if err := h.postings.Iter(func(t labels.Label, _ index.Postings) error {
symbols[t.Name] = struct{}{}
symbols[t.Value] = struct{}{}
ss, ok := values[t.Name]
if !ok {
ss = stringset{}
values[t.Name] = ss
}
ss.set(t.Value)
return nil
}); err != nil {
// This should never happen, as the iteration function only returns nil.
panic(err)
}
h.symMtx.Lock()
h.symbols = symbols
h.values = values
2017-08-30 08:38:25 -07:00
h.symMtx.Unlock()
}
// Tombstones returns the head's tombstones as a tombstones.Reader.
// The returned error is always nil.
func (h *Head) Tombstones() (tombstones.Reader, error) {
return h.tombstones, nil
}
// Index returns an IndexReader over the head, requested for the widest
// possible time range. The returned error is always nil.
func (h *Head) Index() (IndexReader, error) {
	reader := h.indexRange(math.MinInt64, math.MaxInt64)
	return reader, nil
}
func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
if hmin := h.MinTime(); hmin > mint {
mint = hmin
2017-01-12 11:00:36 -08:00
}
return &headIndexReader{head: h, mint: mint, maxt: maxt}
}
// Chunks returns a ChunkReader over the head, requested for the widest
// possible time range and bound to a fresh isolation state. The returned
// error is always nil.
func (h *Head) Chunks() (ChunkReader, error) {
	state := h.iso.State()
	reader := h.chunksRange(math.MinInt64, math.MaxInt64, state)
	return reader, nil
}
// chunksRange returns a chunk reader over the head for [mint, maxt] that uses
// the given isolation state. The lower bound is raised to the head's current
// minimum time if mint lies below it.
func (h *Head) chunksRange(mint, maxt int64, is *isolationState) *headChunkReader {
	reader := &headChunkReader{head: h, mint: mint, maxt: maxt, isoState: is}
	if headMin := h.MinTime(); reader.mint < headMin {
		reader.mint = headMin
	}
	return reader
}
// NumSeries returns the number of active series in the head.
// The counter is read atomically.
func (h *Head) NumSeries() uint64 {
	count := atomic.LoadUint64(&h.numSeries)
	return count
}
// Meta returns meta information about the head.
// The head is dynamic, so the time bounds and series count reflect the
// moment of the call. The ULID is the fixed placeholder "______head______".
func (h *Head) Meta() BlockMeta {
	var rawID [16]byte
	copy(rawID[:], "______head______")

	var meta BlockMeta
	meta.MinTime = h.MinTime()
	meta.MaxTime = h.MaxTime()
	meta.ULID = ulid.ULID(rawID)
	meta.Stats.NumSeries = h.NumSeries()
	return meta
}
// MinTime returns the lowest time bound on visible data in the head.
// The value is read atomically.
func (h *Head) MinTime() int64 {
	mint := atomic.LoadInt64(&h.minTime)
	return mint
}
// MaxTime returns the highest timestamp seen in data of the head.
// The value is read atomically.
func (h *Head) MaxTime() int64 {
	maxt := atomic.LoadInt64(&h.maxTime)
	return maxt
}
// compactable reports whether the head has a compactable range, i.e. whether
// the time span covered by the head exceeds 1.5 times the chunk range.
// The extra 0.5 acts as a buffer for the appendable window.
func (h *Head) compactable() bool {
	span := h.MaxTime() - h.MinTime()
	threshold := h.chunkRange / 2 * 3
	return span > threshold
}
// Close closes the head's WAL and returns the error from doing so.
// It is a no-op returning nil when the head has no WAL.
func (h *Head) Close() error {
	if h.wal != nil {
		return h.wal.Close()
	}
	return nil
}
// headChunkReader reads chunks from a Head, restricted to the closed time
// interval [mint, maxt] and to the given isolation state.
type headChunkReader struct {
	head *Head

	// Time bounds a chunk must overlap to be returned by Chunk.
	mint int64
	maxt int64

	// Closed when the reader is closed.
	isoState *isolationState
}
// Close closes the isolation state held by the reader. It always returns nil.
func (h *headChunkReader) Close() error {
h.isoState.Close()
return nil
}
2017-09-04 07:08:38 -07:00
// packChunkID packs a seriesID and a chunkID within it into a global 8 byte ID.
// The series ID occupies the upper 5 bytes and the chunk ID the lower 3 bytes.
// It panics if the seriesID exceeds 5 bytes or the chunk ID 3 bytes.
func packChunkID(seriesID, chunkID uint64) uint64 {
	const (
		maxSeriesID = 1<<40 - 1
		maxChunkID  = 1<<24 - 1
	)
	switch {
	case seriesID > maxSeriesID:
		panic("series ID exceeds 5 bytes")
	case chunkID > maxChunkID:
		panic("chunk ID exceeds 3 bytes")
	}
	return seriesID<<24 | chunkID
}
// unpackChunkID splits a packed 8 byte ID into its two parts: the series ID
// held in the upper 5 bytes and the chunk ID held in the lower 3 bytes.
func unpackChunkID(id uint64) (seriesID, chunkID uint64) {
	const chunkIDMask = 1<<24 - 1
	return id >> 24, id & chunkIDMask
}
// Chunk returns the chunk for the reference number.
func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
2017-09-04 07:08:38 -07:00
sid, cid := unpackChunkID(ref)
s := h.head.series.getByID(sid)
// This means that the series has been garbage collected.
if s == nil {
return nil, storage.ErrNotFound
}
2017-09-07 23:48:19 -07:00
s.Lock()
2017-09-04 07:08:38 -07:00
c := s.chunk(int(cid))
// This means that the chunk has been garbage collected or is outside
// the specified range.
if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) {
s.Unlock()
return nil, storage.ErrNotFound
}
2017-09-07 23:48:19 -07:00
s.Unlock()
return &safeChunk{
Chunk: c.chunk,
s: s,
cid: int(cid),
isoState: h.isoState,
}, nil
}
// safeChunk wraps a chunk of an in-memory series and overrides Iterator so
// that the iterator is created while holding the series lock.
type safeChunk struct {
chunkenc.Chunk
// s is the series that owns the chunk.
s *memSeries
// cid is the chunk ID passed to the series when building an iterator.
cid int
// isoState is the isolation state passed to the series when building an iterator.
isoState *isolationState
}
func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
2017-09-07 23:48:19 -07:00
c.s.Lock()
it := c.s.iterator(c.cid, c.isoState, reuseIter)
2017-09-07 23:48:19 -07:00
c.s.Unlock()
return it
}
// headIndexReader reads index data from a Head for the time range it was
// created with.
type headIndexReader struct {
	head *Head

	// Time range the reader was created for.
	mint int64
	maxt int64
}
// Close is a no-op and always returns nil.
func (h *headIndexReader) Close() error {
return nil
}
Stream symbols during compaction. (#6468) Rather than buffer up symbols in RAM, do it one by one during compaction. Then use the reader's symbol handling for symbol lookups during the rest of the index write. There is some slowdown in compaction, due to having to look through a file rather than a hash lookup. This is noise to the overall cost of compacting series with thousands of samples though. benchmark old ns/op new ns/op delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 539917175 675341565 +25.08% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 2441815993 2477453524 +1.46% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3978543559 3922909687 -1.40% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 8430219716 8586610007 +1.86% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 1786424591 1909552782 +6.89% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 5328998202 6020839950 +12.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 10085059958 11085278690 +9.92% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 25497010155 27018079806 +5.97% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 2427391406 2817217987 +16.06% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 2592965497 2538805050 -2.09% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 2437388343 2668012858 +9.46% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 2317095324 2787423966 +20.30% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 2600239857 2096973860 -19.35% benchmark old allocs new allocs delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 500851 470794 -6.00% 
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 821527 791451 -3.66% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 1141562 1111508 -2.63% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 2141576 2111504 -1.40% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 871466 841424 -3.45% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 1941428 1911415 -1.55% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3071573 3041510 -0.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 6771648 6741509 -0.45% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 731493 824888 +12.77% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 793918 887311 +11.76% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 811842 905204 +11.50% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 832244 925081 +11.16% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 921553 1019162 +10.59% benchmark old bytes new bytes delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 40532648 35698276 -11.93% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 60340216 53409568 -11.49% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 81087336 72065552 -11.13% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 142485576 120878544 -15.16% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 208661368 203831136 -2.31% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 347345904 340484696 -1.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 585185856 
576244648 -1.53% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 1357641792 1358966528 +0.10% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 126486664 119666744 -5.39% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 122323192 115117224 -5.89% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 126404504 119469864 -5.49% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 119047832 112230408 -5.73% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 136576016 116634800 -14.60% Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
2019-12-17 11:49:54 -08:00
func (h *headIndexReader) Symbols() index.StringIter {
h.head.symMtx.RLock()
Stream symbols during compaction. (#6468) Rather than buffer up symbols in RAM, do it one by one during compaction. Then use the reader's symbol handling for symbol lookups during the rest of the index write. There is some slowdown in compaction, due to having to look through a file rather than a hash lookup. This is noise to the overall cost of compacting series with thousands of samples though. benchmark old ns/op new ns/op delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 539917175 675341565 +25.08% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 2441815993 2477453524 +1.46% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3978543559 3922909687 -1.40% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 8430219716 8586610007 +1.86% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 1786424591 1909552782 +6.89% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 5328998202 6020839950 +12.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 10085059958 11085278690 +9.92% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 25497010155 27018079806 +5.97% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 2427391406 2817217987 +16.06% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 2592965497 2538805050 -2.09% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 2437388343 2668012858 +9.46% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 2317095324 2787423966 +20.30% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 2600239857 2096973860 -19.35% benchmark old allocs new allocs delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 500851 470794 -6.00% 
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 821527 791451 -3.66% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 1141562 1111508 -2.63% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 2141576 2111504 -1.40% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 871466 841424 -3.45% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 1941428 1911415 -1.55% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3071573 3041510 -0.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 6771648 6741509 -0.45% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 731493 824888 +12.77% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 793918 887311 +11.76% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 811842 905204 +11.50% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 832244 925081 +11.16% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 921553 1019162 +10.59% benchmark old bytes new bytes delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 40532648 35698276 -11.93% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 60340216 53409568 -11.49% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 81087336 72065552 -11.13% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 142485576 120878544 -15.16% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 208661368 203831136 -2.31% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 347345904 340484696 -1.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 585185856 
576244648 -1.53% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 1357641792 1358966528 +0.10% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 126486664 119666744 -5.39% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 122323192 115117224 -5.89% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 126404504 119469864 -5.49% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 119047832 112230408 -5.73% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 136576016 116634800 -14.60% Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
2019-12-17 11:49:54 -08:00
res := make([]string, 0, len(h.head.symbols))
for s := range h.head.symbols {
Stream symbols during compaction. (#6468) Rather than buffer up symbols in RAM, do it one by one during compaction. Then use the reader's symbol handling for symbol lookups during the rest of the index write. There is some slowdown in compaction, due to having to look through a file rather than a hash lookup. This is noise to the overall cost of compacting series with thousands of samples though. benchmark old ns/op new ns/op delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 539917175 675341565 +25.08% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 2441815993 2477453524 +1.46% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3978543559 3922909687 -1.40% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 8430219716 8586610007 +1.86% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 1786424591 1909552782 +6.89% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 5328998202 6020839950 +12.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 10085059958 11085278690 +9.92% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 25497010155 27018079806 +5.97% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 2427391406 2817217987 +16.06% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 2592965497 2538805050 -2.09% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 2437388343 2668012858 +9.46% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 2317095324 2787423966 +20.30% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 2600239857 2096973860 -19.35% benchmark old allocs new allocs delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 500851 470794 -6.00% 
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 821527 791451 -3.66% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 1141562 1111508 -2.63% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 2141576 2111504 -1.40% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 871466 841424 -3.45% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 1941428 1911415 -1.55% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3071573 3041510 -0.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 6771648 6741509 -0.45% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 731493 824888 +12.77% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 793918 887311 +11.76% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 811842 905204 +11.50% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 832244 925081 +11.16% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 921553 1019162 +10.59% benchmark old bytes new bytes delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 40532648 35698276 -11.93% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 60340216 53409568 -11.49% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 81087336 72065552 -11.13% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 142485576 120878544 -15.16% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 208661368 203831136 -2.31% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 347345904 340484696 -1.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 585185856 
576244648 -1.53% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 1357641792 1358966528 +0.10% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 126486664 119666744 -5.39% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 122323192 115117224 -5.89% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 126404504 119469864 -5.49% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 119047832 112230408 -5.73% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 136576016 116634800 -14.60% Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
2019-12-17 11:49:54 -08:00
res = append(res, s)
}
Stream symbols during compaction. (#6468) Rather than buffer up symbols in RAM, do it one by one during compaction. Then use the reader's symbol handling for symbol lookups during the rest of the index write. There is some slowdown in compaction, due to having to look through a file rather than a hash lookup. This is noise to the overall cost of compacting series with thousands of samples though. benchmark old ns/op new ns/op delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 539917175 675341565 +25.08% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 2441815993 2477453524 +1.46% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3978543559 3922909687 -1.40% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 8430219716 8586610007 +1.86% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 1786424591 1909552782 +6.89% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 5328998202 6020839950 +12.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 10085059958 11085278690 +9.92% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 25497010155 27018079806 +5.97% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 2427391406 2817217987 +16.06% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 2592965497 2538805050 -2.09% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 2437388343 2668012858 +9.46% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 2317095324 2787423966 +20.30% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 2600239857 2096973860 -19.35% benchmark old allocs new allocs delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 500851 470794 -6.00% 
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 821527 791451 -3.66% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 1141562 1111508 -2.63% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 2141576 2111504 -1.40% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 871466 841424 -3.45% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 1941428 1911415 -1.55% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3071573 3041510 -0.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 6771648 6741509 -0.45% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 731493 824888 +12.77% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 793918 887311 +11.76% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 811842 905204 +11.50% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 832244 925081 +11.16% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 921553 1019162 +10.59% benchmark old bytes new bytes delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 40532648 35698276 -11.93% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 60340216 53409568 -11.49% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 81087336 72065552 -11.13% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 142485576 120878544 -15.16% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 208661368 203831136 -2.31% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 347345904 340484696 -1.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 585185856 
576244648 -1.53% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 1357641792 1358966528 +0.10% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 126486664 119666744 -5.39% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 122323192 115117224 -5.89% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 126404504 119469864 -5.49% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 119047832 112230408 -5.73% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 136576016 116634800 -14.60% Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
2019-12-17 11:49:54 -08:00
h.head.symMtx.RUnlock()
sort.Strings(res)
return index.NewStringListIter(res)
}
// LabelValues returns the possible label values
Replace StringTuples with []string Benchmarks show slight cpu/allocs improvements. benchmark old ns/op new ns/op delta BenchmarkPostingsForMatchers/Head/n="1"-4 269978625 235305110 -12.84% BenchmarkPostingsForMatchers/Head/n="1",j="foo"-4 129739974 121646193 -6.24% BenchmarkPostingsForMatchers/Head/j="foo",n="1"-4 123826274 122056253 -1.43% BenchmarkPostingsForMatchers/Head/n="1",j!="foo"-4 126962188 130038235 +2.42% BenchmarkPostingsForMatchers/Head/i=~".*"-4 6423653989 5991126455 -6.73% BenchmarkPostingsForMatchers/Head/i=~".+"-4 6934647521 7033370634 +1.42% BenchmarkPostingsForMatchers/Head/i=~""-4 1177781285 1121497736 -4.78% BenchmarkPostingsForMatchers/Head/i!=""-4 7033680256 7246094991 +3.02% BenchmarkPostingsForMatchers/Head/n="1",i=~".*",j="foo"-4 293702332 287440212 -2.13% BenchmarkPostingsForMatchers/Head/n="1",i=~".*",i!="2",j="foo"-4 307628268 307039964 -0.19% BenchmarkPostingsForMatchers/Head/n="1",i!=""-4 512247746 480003862 -6.29% BenchmarkPostingsForMatchers/Head/n="1",i!="",j="foo"-4 361199794 367066917 +1.62% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",j="foo"-4 478863761 476037784 -0.59% BenchmarkPostingsForMatchers/Head/n="1",i=~"1.+",j="foo"-4 103394659 102902098 -0.48% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",i!="2",j="foo"-4 482552781 475453903 -1.47% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",i!~"2.*",j="foo"-4 559257389 589297047 +5.37% BenchmarkPostingsForMatchers/Block/n="1"-4 36492 37012 +1.42% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 557788 611903 +9.70% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 554443 573814 +3.49% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 553227 553826 +0.11% BenchmarkPostingsForMatchers/Block/i=~".*"-4 113855090 111707221 -1.89% BenchmarkPostingsForMatchers/Block/i=~".+"-4 133994674 136520728 +1.89% BenchmarkPostingsForMatchers/Block/i=~""-4 38138091 36299898 -4.82% BenchmarkPostingsForMatchers/Block/i!=""-4 28861213 27396723 -5.07% 
BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 112699941 110853868 -1.64% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 113198026 111389742 -1.60% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 28994069 27363804 -5.62% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 29709406 28589223 -3.77% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 134695119 135736971 +0.77% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 26783286 25826928 -3.57% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 134733254 134116739 -0.46% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 160713937 158802768 -1.19% benchmark old allocs new allocs delta BenchmarkPostingsForMatchers/Head/n="1"-4 36 36 +0.00% BenchmarkPostingsForMatchers/Head/n="1",j="foo"-4 38 38 +0.00% BenchmarkPostingsForMatchers/Head/j="foo",n="1"-4 38 38 +0.00% BenchmarkPostingsForMatchers/Head/n="1",j!="foo"-4 42 40 -4.76% BenchmarkPostingsForMatchers/Head/i=~".*"-4 61 59 -3.28% BenchmarkPostingsForMatchers/Head/i=~".+"-4 100088 100087 -0.00% BenchmarkPostingsForMatchers/Head/i=~""-4 100053 100051 -0.00% BenchmarkPostingsForMatchers/Head/i!=""-4 100087 100085 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".*",j="foo"-4 44 42 -4.55% BenchmarkPostingsForMatchers/Head/n="1",i=~".*",i!="2",j="foo"-4 50 48 -4.00% BenchmarkPostingsForMatchers/Head/n="1",i!=""-4 100076 100074 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i!="",j="foo"-4 100077 100075 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",j="foo"-4 100077 100074 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~"1.+",j="foo"-4 11167 11165 -0.02% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",i!="2",j="foo"-4 100082 100080 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",i!~"2.*",j="foo"-4 111265 111261 -0.00% BenchmarkPostingsForMatchers/Block/n="1"-4 6 6 +0.00% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 11 11 +0.00% 
BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 11 11 +0.00% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 15 13 -13.33% BenchmarkPostingsForMatchers/Block/i=~".*"-4 12 10 -16.67% BenchmarkPostingsForMatchers/Block/i=~".+"-4 100040 100038 -0.00% BenchmarkPostingsForMatchers/Block/i=~""-4 100045 100043 -0.00% BenchmarkPostingsForMatchers/Block/i!=""-4 100041 100039 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 17 15 -11.76% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 23 21 -8.70% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 100046 100044 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 100050 100048 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 100049 100047 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 11150 11148 -0.02% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 100055 100053 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 111238 111234 -0.00% benchmark old bytes new bytes delta BenchmarkPostingsForMatchers/Head/n="1"-4 10887816 10887817 +0.00% BenchmarkPostingsForMatchers/Head/n="1",j="foo"-4 5456648 5456648 +0.00% BenchmarkPostingsForMatchers/Head/j="foo",n="1"-4 5456648 5456648 +0.00% BenchmarkPostingsForMatchers/Head/n="1",j!="foo"-4 5456792 5456712 -0.00% BenchmarkPostingsForMatchers/Head/i=~".*"-4 258254408 258254328 -0.00% BenchmarkPostingsForMatchers/Head/i=~".+"-4 273912888 273912904 +0.00% BenchmarkPostingsForMatchers/Head/i=~""-4 17266680 17266600 -0.00% BenchmarkPostingsForMatchers/Head/i!=""-4 273912416 273912336 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".*",j="foo"-4 7062578 7062498 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".*",i!="2",j="foo"-4 7062770 7062690 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i!=""-4 28152346 28152266 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i!="",j="foo"-4 22721178 22721098 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",j="foo"-4 
22721336 22721224 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~"1.+",j="foo"-4 3623804 3623733 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",i!="2",j="foo"-4 22721480 22721400 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",i!~"2.*",j="foo"-4 24816652 24816444 -0.00% BenchmarkPostingsForMatchers/Block/n="1"-4 296 296 +0.00% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 424 424 +0.00% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 424 424 +0.00% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 1544 1464 -5.18% BenchmarkPostingsForMatchers/Block/i=~".*"-4 1606114 1606045 -0.00% BenchmarkPostingsForMatchers/Block/i=~".+"-4 17264709 17264629 -0.00% BenchmarkPostingsForMatchers/Block/i=~""-4 17264780 17264696 -0.00% BenchmarkPostingsForMatchers/Block/i!=""-4 17264680 17264600 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 1606253 1606165 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 1606445 1606348 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 17264808 17264728 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 17264936 17264856 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 17264965 17264885 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 3148262 3148182 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 17265141 17265061 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 20416944 20416784 -0.00% Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
2020-01-01 03:38:01 -08:00
func (h *headIndexReader) LabelValues(name string) ([]string, error) {
h.head.symMtx.RLock()
sl := make([]string, 0, len(h.head.values[name]))
for s := range h.head.values[name] {
sl = append(sl, s)
}
h.head.symMtx.RUnlock()
sort.Strings(sl)
Replace StringTuples with []string Benchmarks show slight cpu/allocs improvements. benchmark old ns/op new ns/op delta BenchmarkPostingsForMatchers/Head/n="1"-4 269978625 235305110 -12.84% BenchmarkPostingsForMatchers/Head/n="1",j="foo"-4 129739974 121646193 -6.24% BenchmarkPostingsForMatchers/Head/j="foo",n="1"-4 123826274 122056253 -1.43% BenchmarkPostingsForMatchers/Head/n="1",j!="foo"-4 126962188 130038235 +2.42% BenchmarkPostingsForMatchers/Head/i=~".*"-4 6423653989 5991126455 -6.73% BenchmarkPostingsForMatchers/Head/i=~".+"-4 6934647521 7033370634 +1.42% BenchmarkPostingsForMatchers/Head/i=~""-4 1177781285 1121497736 -4.78% BenchmarkPostingsForMatchers/Head/i!=""-4 7033680256 7246094991 +3.02% BenchmarkPostingsForMatchers/Head/n="1",i=~".*",j="foo"-4 293702332 287440212 -2.13% BenchmarkPostingsForMatchers/Head/n="1",i=~".*",i!="2",j="foo"-4 307628268 307039964 -0.19% BenchmarkPostingsForMatchers/Head/n="1",i!=""-4 512247746 480003862 -6.29% BenchmarkPostingsForMatchers/Head/n="1",i!="",j="foo"-4 361199794 367066917 +1.62% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",j="foo"-4 478863761 476037784 -0.59% BenchmarkPostingsForMatchers/Head/n="1",i=~"1.+",j="foo"-4 103394659 102902098 -0.48% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",i!="2",j="foo"-4 482552781 475453903 -1.47% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",i!~"2.*",j="foo"-4 559257389 589297047 +5.37% BenchmarkPostingsForMatchers/Block/n="1"-4 36492 37012 +1.42% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 557788 611903 +9.70% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 554443 573814 +3.49% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 553227 553826 +0.11% BenchmarkPostingsForMatchers/Block/i=~".*"-4 113855090 111707221 -1.89% BenchmarkPostingsForMatchers/Block/i=~".+"-4 133994674 136520728 +1.89% BenchmarkPostingsForMatchers/Block/i=~""-4 38138091 36299898 -4.82% BenchmarkPostingsForMatchers/Block/i!=""-4 28861213 27396723 -5.07% 
BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 112699941 110853868 -1.64% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 113198026 111389742 -1.60% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 28994069 27363804 -5.62% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 29709406 28589223 -3.77% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 134695119 135736971 +0.77% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 26783286 25826928 -3.57% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 134733254 134116739 -0.46% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 160713937 158802768 -1.19% benchmark old allocs new allocs delta BenchmarkPostingsForMatchers/Head/n="1"-4 36 36 +0.00% BenchmarkPostingsForMatchers/Head/n="1",j="foo"-4 38 38 +0.00% BenchmarkPostingsForMatchers/Head/j="foo",n="1"-4 38 38 +0.00% BenchmarkPostingsForMatchers/Head/n="1",j!="foo"-4 42 40 -4.76% BenchmarkPostingsForMatchers/Head/i=~".*"-4 61 59 -3.28% BenchmarkPostingsForMatchers/Head/i=~".+"-4 100088 100087 -0.00% BenchmarkPostingsForMatchers/Head/i=~""-4 100053 100051 -0.00% BenchmarkPostingsForMatchers/Head/i!=""-4 100087 100085 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".*",j="foo"-4 44 42 -4.55% BenchmarkPostingsForMatchers/Head/n="1",i=~".*",i!="2",j="foo"-4 50 48 -4.00% BenchmarkPostingsForMatchers/Head/n="1",i!=""-4 100076 100074 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i!="",j="foo"-4 100077 100075 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",j="foo"-4 100077 100074 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~"1.+",j="foo"-4 11167 11165 -0.02% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",i!="2",j="foo"-4 100082 100080 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",i!~"2.*",j="foo"-4 111265 111261 -0.00% BenchmarkPostingsForMatchers/Block/n="1"-4 6 6 +0.00% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 11 11 +0.00% 
BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 11 11 +0.00% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 15 13 -13.33% BenchmarkPostingsForMatchers/Block/i=~".*"-4 12 10 -16.67% BenchmarkPostingsForMatchers/Block/i=~".+"-4 100040 100038 -0.00% BenchmarkPostingsForMatchers/Block/i=~""-4 100045 100043 -0.00% BenchmarkPostingsForMatchers/Block/i!=""-4 100041 100039 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 17 15 -11.76% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 23 21 -8.70% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 100046 100044 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 100050 100048 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 100049 100047 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 11150 11148 -0.02% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 100055 100053 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 111238 111234 -0.00% benchmark old bytes new bytes delta BenchmarkPostingsForMatchers/Head/n="1"-4 10887816 10887817 +0.00% BenchmarkPostingsForMatchers/Head/n="1",j="foo"-4 5456648 5456648 +0.00% BenchmarkPostingsForMatchers/Head/j="foo",n="1"-4 5456648 5456648 +0.00% BenchmarkPostingsForMatchers/Head/n="1",j!="foo"-4 5456792 5456712 -0.00% BenchmarkPostingsForMatchers/Head/i=~".*"-4 258254408 258254328 -0.00% BenchmarkPostingsForMatchers/Head/i=~".+"-4 273912888 273912904 +0.00% BenchmarkPostingsForMatchers/Head/i=~""-4 17266680 17266600 -0.00% BenchmarkPostingsForMatchers/Head/i!=""-4 273912416 273912336 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".*",j="foo"-4 7062578 7062498 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".*",i!="2",j="foo"-4 7062770 7062690 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i!=""-4 28152346 28152266 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i!="",j="foo"-4 22721178 22721098 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",j="foo"-4 
22721336 22721224 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~"1.+",j="foo"-4 3623804 3623733 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",i!="2",j="foo"-4 22721480 22721400 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",i!~"2.*",j="foo"-4 24816652 24816444 -0.00% BenchmarkPostingsForMatchers/Block/n="1"-4 296 296 +0.00% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 424 424 +0.00% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 424 424 +0.00% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 1544 1464 -5.18% BenchmarkPostingsForMatchers/Block/i=~".*"-4 1606114 1606045 -0.00% BenchmarkPostingsForMatchers/Block/i=~".+"-4 17264709 17264629 -0.00% BenchmarkPostingsForMatchers/Block/i=~""-4 17264780 17264696 -0.00% BenchmarkPostingsForMatchers/Block/i!=""-4 17264680 17264600 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 1606253 1606165 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 1606445 1606348 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 17264808 17264728 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 17264936 17264856 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 17264965 17264885 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 3148262 3148182 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 17265141 17265061 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 20416944 20416784 -0.00% Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
2020-01-01 03:38:01 -08:00
return sl, nil
}
// LabelNames returns all the unique label names present in the head, sorted
// in increasing order. The empty label name is skipped. The returned error is
// always nil.
func (h *headIndexReader) LabelNames() ([]string, error) {
	h.head.symMtx.RLock()
	labelNames := make([]string, 0, len(h.head.values))
	for name := range h.head.values {
		if name == "" {
			continue
		}
		labelNames = append(labelNames, name)
	}
	h.head.symMtx.RUnlock()

	// Sorting only touches the local copy, so do it after releasing the lock
	// to keep the time writers are blocked on symMtx as short as possible.
	sort.Strings(labelNames)
	return labelNames, nil
}
Reduce memory used by postings offset table. Rather than keeping the offset of each postings list, instead keep the nth offset of the offset of the posting list. As postings list offsets have always been sorted, we can then get to the closest entry before the one we want an iterate forwards. I haven't done much tuning on the 32 number, it was chosen to try not to read through more than a 4k page of data. Switch to a bulk interface for fetching postings. Use it to avoid having to re-read parts of the posting offset table when querying lots of it. For a index with what BenchmarkHeadPostingForMatchers uses RAM for r.postings drops from 3.79MB to 80.19kB or about 48x. Bytes allocated go down by 30%, and suprisingly CPU usage drops by 4-6% for typical queries too. benchmark old ns/op new ns/op delta BenchmarkPostingsForMatchers/Block/n="1"-4 35231 36673 +4.09% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 563380 540627 -4.04% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 536782 534186 -0.48% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 533990 541550 +1.42% BenchmarkPostingsForMatchers/Block/i=~".*"-4 113374598 117969608 +4.05% BenchmarkPostingsForMatchers/Block/i=~".+"-4 146329884 139651442 -4.56% BenchmarkPostingsForMatchers/Block/i=~""-4 50346510 44961127 -10.70% BenchmarkPostingsForMatchers/Block/i!=""-4 41261550 35356165 -14.31% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 112544418 116904010 +3.87% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 112487086 116864918 +3.89% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 41094758 35457904 -13.72% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 41906372 36151473 -13.73% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 147262414 140424800 -4.64% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 28615629 27872072 -2.60% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 147117177 140462403 -4.52% 
BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 175096826 167902298 -4.11% benchmark old allocs new allocs delta BenchmarkPostingsForMatchers/Block/n="1"-4 4 6 +50.00% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 7 11 +57.14% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 7 11 +57.14% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 15 17 +13.33% BenchmarkPostingsForMatchers/Block/i=~".*"-4 100010 100012 +0.00% BenchmarkPostingsForMatchers/Block/i=~".+"-4 200069 200040 -0.01% BenchmarkPostingsForMatchers/Block/i=~""-4 200072 200045 -0.01% BenchmarkPostingsForMatchers/Block/i!=""-4 200070 200041 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 100013 100017 +0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 100017 100023 +0.01% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 200073 200046 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 200075 200050 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 200074 200049 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 111165 111150 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 200078 200055 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 311282 311238 -0.01% benchmark old bytes new bytes delta BenchmarkPostingsForMatchers/Block/n="1"-4 264 296 +12.12% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 360 424 +17.78% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 360 424 +17.78% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 520 552 +6.15% BenchmarkPostingsForMatchers/Block/i=~".*"-4 1600461 1600482 +0.00% BenchmarkPostingsForMatchers/Block/i=~".+"-4 24900801 17259077 -30.69% BenchmarkPostingsForMatchers/Block/i=~""-4 24900836 17259151 -30.69% BenchmarkPostingsForMatchers/Block/i!=""-4 24900760 17259048 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 1600557 1600621 +0.00% 
BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 1600717 1600813 +0.01% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 24900856 17259176 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 24900952 17259304 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 24900993 17259333 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 3788311 3142630 -17.04% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 24901137 17259509 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 28693086 20405680 -28.88% Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
2019-12-05 10:27:40 -08:00
// Postings returns the postings list iterator for the label pairs.
func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) {
res := make([]index.Postings, 0, len(values))
for _, value := range values {
res = append(res, h.head.postings.Get(name, value))
Reduce memory used by postings offset table. Rather than keeping the offset of each postings list, instead keep the nth offset of the offset of the posting list. As postings list offsets have always been sorted, we can then get to the closest entry before the one we want an iterate forwards. I haven't done much tuning on the 32 number, it was chosen to try not to read through more than a 4k page of data. Switch to a bulk interface for fetching postings. Use it to avoid having to re-read parts of the posting offset table when querying lots of it. For a index with what BenchmarkHeadPostingForMatchers uses RAM for r.postings drops from 3.79MB to 80.19kB or about 48x. Bytes allocated go down by 30%, and suprisingly CPU usage drops by 4-6% for typical queries too. benchmark old ns/op new ns/op delta BenchmarkPostingsForMatchers/Block/n="1"-4 35231 36673 +4.09% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 563380 540627 -4.04% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 536782 534186 -0.48% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 533990 541550 +1.42% BenchmarkPostingsForMatchers/Block/i=~".*"-4 113374598 117969608 +4.05% BenchmarkPostingsForMatchers/Block/i=~".+"-4 146329884 139651442 -4.56% BenchmarkPostingsForMatchers/Block/i=~""-4 50346510 44961127 -10.70% BenchmarkPostingsForMatchers/Block/i!=""-4 41261550 35356165 -14.31% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 112544418 116904010 +3.87% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 112487086 116864918 +3.89% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 41094758 35457904 -13.72% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 41906372 36151473 -13.73% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 147262414 140424800 -4.64% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 28615629 27872072 -2.60% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 147117177 140462403 -4.52% 
BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 175096826 167902298 -4.11% benchmark old allocs new allocs delta BenchmarkPostingsForMatchers/Block/n="1"-4 4 6 +50.00% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 7 11 +57.14% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 7 11 +57.14% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 15 17 +13.33% BenchmarkPostingsForMatchers/Block/i=~".*"-4 100010 100012 +0.00% BenchmarkPostingsForMatchers/Block/i=~".+"-4 200069 200040 -0.01% BenchmarkPostingsForMatchers/Block/i=~""-4 200072 200045 -0.01% BenchmarkPostingsForMatchers/Block/i!=""-4 200070 200041 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 100013 100017 +0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 100017 100023 +0.01% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 200073 200046 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 200075 200050 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 200074 200049 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 111165 111150 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 200078 200055 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 311282 311238 -0.01% benchmark old bytes new bytes delta BenchmarkPostingsForMatchers/Block/n="1"-4 264 296 +12.12% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 360 424 +17.78% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 360 424 +17.78% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 520 552 +6.15% BenchmarkPostingsForMatchers/Block/i=~".*"-4 1600461 1600482 +0.00% BenchmarkPostingsForMatchers/Block/i=~".+"-4 24900801 17259077 -30.69% BenchmarkPostingsForMatchers/Block/i=~""-4 24900836 17259151 -30.69% BenchmarkPostingsForMatchers/Block/i!=""-4 24900760 17259048 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 1600557 1600621 +0.00% 
BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 1600717 1600813 +0.01% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 24900856 17259176 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 24900952 17259304 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 24900993 17259333 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 3788311 3142630 -17.04% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 24901137 17259509 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 28693086 20405680 -28.88% Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
2019-12-05 10:27:40 -08:00
}
return index.Merge(res...), nil
}
// SortedPostings consumes p and returns a list postings of the referenced
// series ordered by labels.Compare on their label sets. References that cannot
// be resolved to a series in the head are logged at debug level and dropped.
// If p reports an error, an error postings wrapping it is returned instead.
func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
	// Resolve every reference exactly once so the comparator below does not
	// have to look series up again.
	found := make([]*memSeries, 0, 128)
	for p.Next() {
		ms := h.head.series.getByID(p.At())
		if ms == nil {
			level.Debug(h.head.logger).Log("msg", "looked up series not found")
			continue
		}
		found = append(found, ms)
	}
	if err := p.Err(); err != nil {
		return index.ErrPostings(errors.Wrap(err, "expand postings"))
	}

	sort.Slice(found, func(a, b int) bool {
		return labels.Compare(found[a].lset, found[b].lset) < 0
	})

	// Convert the sorted series back to a list of references.
	refs := make([]uint64, len(found))
	for i, ms := range found {
		refs[i] = ms.ref
	}
	return index.NewListPostings(refs)
}
// Series returns the series for the given reference.
func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
s := h.head.series.getByID(ref)
2017-05-17 07:43:01 -07:00
if s == nil {
h.head.metrics.seriesNotFound.Inc()
return storage.ErrNotFound
2017-05-17 07:43:01 -07:00
}
*lbls = append((*lbls)[:0], s.lset...)
2017-09-07 23:48:19 -07:00
s.Lock()
defer s.Unlock()
*chks = (*chks)[:0]
for i, c := range s.chunks {
// Do not expose chunks that are outside of the specified range.
if !c.OverlapsClosedInterval(h.mint, h.maxt) {
continue
}
// Set the head chunks as open (being appended to).
maxTime := c.maxTime
if s.headChunk == c {
maxTime = math.MaxInt64
}
*chks = append(*chks, chunks.Meta{
MinTime: c.minTime,
MaxTime: maxTime,
2017-09-04 07:08:38 -07:00
Ref: packChunkID(s.ref, uint64(s.chunkID(i))),
})
2017-01-03 06:43:26 -08:00
}
return nil
}
2017-09-18 03:28:56 -07:00
func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool) {
// Just using `getOrSet` below would be semantically sufficient, but we'd create
// a new series on every sample inserted via Add(), which causes allocations
// and makes our series IDs rather random and harder to compress in postings.
s := h.series.getByHash(hash, lset)
if s != nil {
return s, false
}
// Optimistically assume that we are the first one to create the series.
2017-09-04 07:08:38 -07:00
id := atomic.AddUint64(&h.lastSeriesID, 1)
return h.getOrCreateWithID(id, hash, lset)
}
func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool) {
s := newMemSeries(lset, id, h.chunkRange)
2016-12-21 16:12:28 -08:00
s, created := h.series.getOrSet(hash, s)
if !created {
2017-09-18 03:28:56 -07:00
return s, false
}
2017-09-18 03:28:56 -07:00
h.metrics.seriesCreated.Inc()
atomic.AddUint64(&h.numSeries, 1)
2017-09-18 03:28:56 -07:00
h.postings.Add(id, lset)
h.symMtx.Lock()
defer h.symMtx.Unlock()
2016-12-21 16:12:28 -08:00
for _, l := range lset {
valset, ok := h.values[l.Name]
if !ok {
valset = stringset{}
h.values[l.Name] = valset
}
valset.set(l.Value)
2017-01-03 06:43:26 -08:00
h.symbols[l.Name] = struct{}{}
h.symbols[l.Value] = struct{}{}
2016-12-21 16:12:28 -08:00
}
2017-01-03 06:43:26 -08:00
2017-09-18 03:28:56 -07:00
return s, true
}
// seriesHashmap is a simple hashmap for memSeries by their label set. It is built
// on top of a regular hashmap and holds a slice of series to resolve hash collisions.
// Its methods require the hash to be submitted with it to avoid re-computations throughout
// the code.
// The methods do no locking themselves; within this file they are called
// while the corresponding stripeSeries lock is held.
type seriesHashmap map[uint64][]*memSeries
2016-12-04 04:16:11 -08:00
// get returns the series stored under hash whose label set equals lset, or
// nil if there is no such series.
func (m seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
	bucket := m[hash]
	for i := range bucket {
		if candidate := bucket[i]; labels.Equal(candidate.lset, lset) {
			return candidate
		}
	}
	return nil
}
// set stores s under hash. A series already stored under hash with an equal
// label set is replaced in place; otherwise s is appended to the bucket.
func (m seriesHashmap) set(hash uint64, s *memSeries) {
	bucket := m[hash]
	for idx := range bucket {
		if labels.Equal(bucket[idx].lset, s.lset) {
			bucket[idx] = s
			return
		}
	}
	m[hash] = append(bucket, s)
}
// del removes every series stored under hash whose label set equals lset.
// The map entry for hash is deleted entirely once no series remain in it.
func (m seriesHashmap) del(hash uint64, lset labels.Labels) {
	// The survivors are collected into a fresh slice rather than filtered in
	// place, so a caller still ranging over the old bucket is not disturbed.
	var kept []*memSeries
	for _, candidate := range m[hash] {
		if labels.Equal(candidate.lset, lset) {
			continue
		}
		kept = append(kept, candidate)
	}
	if len(kept) > 0 {
		m[hash] = kept
		return
	}
	delete(m, hash)
}
// DefaultStripeSize is the default number of entries to allocate in the stripeSeries hash map.
const DefaultStripeSize = 1 << 14
// stripeSeries locks modulo ranges of IDs and hashes to reduce lock contention.
// The locks are padded to not be on the same cache line. Filling the padded space
// with the maps was profiled to be slower likely due to the additional pointer
// dereferences.
type stripeSeries struct {
	// size is the number of stripes. Stripe indexes are computed as
	// `x & (size-1)`, which only spreads over all stripes when size is a
	// power of two.
	size int
	// series holds, per stripe, the series keyed by their ID.
	series []map[uint64]*memSeries
	// hashes holds, per stripe, the series keyed by the hash of their label set.
	hashes []seriesHashmap
	// locks holds one lock per stripe, guarding series[i] and hashes[i].
	locks []stripeLock
}
// stripeLock is a read-write mutex followed by padding bytes. It is the
// per-stripe lock type used by stripeSeries.
type stripeLock struct {
	sync.RWMutex
	// Padding to avoid multiple locks being on the same cache line.
	// NOTE(review): presumably 40 bytes tops the struct up to a 64-byte cache
	// line on 64-bit platforms — confirm against the size of sync.RWMutex.
	_ [40]byte
}
func newStripeSeries(stripeSize int) *stripeSeries {
s := &stripeSeries{
size: stripeSize,
series: make([]map[uint64]*memSeries, stripeSize),
hashes: make([]seriesHashmap, stripeSize),
locks: make([]stripeLock, stripeSize),
}
for i := range s.series {
s.series[i] = map[uint64]*memSeries{}
}
for i := range s.hashes {
s.hashes[i] = seriesHashmap{}
}
return s
2016-12-04 04:16:11 -08:00
}
// gc garbage collects old chunks that are strictly before mint and removes
// series entirely that have no chunks left.
func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
var (
deleted = map[uint64]struct{}{}
rmChunks = 0
)
// Run through all series and truncate old chunks. Mark those with no
2017-09-06 07:20:37 -07:00
// chunks left as deleted and store their ID.
for i := 0; i < s.size; i++ {
s.locks[i].Lock()
for hash, all := range s.hashes[i] {
for _, series := range all {
2017-09-07 23:48:19 -07:00
series.Lock()
rmChunks += series.truncateChunksBefore(mint)
if len(series.chunks) > 0 || series.pendingCommit {
2017-09-07 23:48:19 -07:00
series.Unlock()
continue
}
// The series is gone entirely. We need to keep the series lock
// and make sure we have acquired the stripe locks for hash and ID of the
// series alike.
// If we don't hold them all, there's a very small chance that a series receives
// samples again while we are half-way into deleting it.
j := int(series.ref) & (s.size - 1)
if i != j {
s.locks[j].Lock()
}
deleted[series.ref] = struct{}{}
s.hashes[i].del(hash, series.lset)
delete(s.series[j], series.ref)
if i != j {
s.locks[j].Unlock()
}
2017-09-07 23:48:19 -07:00
series.Unlock()
}
}
s.locks[i].Unlock()
}
return deleted, rmChunks
}
// getByID returns the series stored under the given ID, or nil if there is
// none. The lookup is done under the read lock of the ID's stripe.
func (s *stripeSeries) getByID(id uint64) *memSeries {
	idx := id & uint64(s.size-1)
	mtx := &s.locks[idx]

	mtx.RLock()
	found := s.series[idx][id]
	mtx.RUnlock()

	return found
}
// getByHash returns the series stored under hash whose label set equals lset,
// or nil if there is none. The lookup is done under the read lock of the
// hash's stripe.
func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries {
	idx := hash & uint64(s.size-1)
	mtx := &s.locks[idx]

	mtx.RLock()
	found := s.hashes[idx].get(hash, lset)
	mtx.RUnlock()

	return found
}
func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, bool) {
i := hash & uint64(s.size-1)
s.locks[i].Lock()
if prev := s.hashes[i].get(hash, series.lset); prev != nil {
2017-09-18 02:23:22 -07:00
s.locks[i].Unlock()
return prev, false
}
s.hashes[i].set(hash, series)
s.locks[i].Unlock()
i = series.ref & uint64(s.size-1)
s.locks[i].Lock()
s.series[i][series.ref] = series
s.locks[i].Unlock()
return series, true
}
// sample is a single (timestamp, value) pair.
type sample struct {
	t int64   // Timestamp.
	v float64 // Value.
}

// T returns the sample's timestamp.
func (s sample) T() int64 { return s.t }

// V returns the sample's value.
func (s sample) V() float64 { return s.v }
2017-09-07 23:48:19 -07:00
// memSeries is the in-memory representation of a series. None of its methods
// are goroutine safe and it is the caller's responsibility to lock it.
type memSeries struct {
sync.RWMutex
2017-09-04 07:08:38 -07:00
ref uint64
lset labels.Labels
chunks []*memChunk
headChunk *memChunk
chunkRange int64
firstChunkID int
nextAt int64 // Timestamp at which to cut the next chunk.
sampleBuf [4]sample
pendingCommit bool // Whether there are samples waiting to be committed to this series.
app chunkenc.Appender // Current appender for the chunk.
txs *txRing
}
// newMemSeries builds an empty in-memory series for the given label set and
// reference ID. The series starts without any chunk; nextAt is initialised to
// the smallest int64 and the append-ID ring is created with newTxRing(4).
func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries {
	return &memSeries{
		ref:        id,
		lset:       lset,
		chunkRange: chunkRange,
		nextAt:     math.MinInt64,
		txs:        newTxRing(4),
	}
}
func (s *memSeries) minTime() int64 {
2018-02-07 05:43:21 -08:00
if len(s.chunks) == 0 {
return math.MinInt64
}
return s.chunks[0].minTime
}
func (s *memSeries) maxTime() int64 {
2018-02-07 05:43:21 -08:00
c := s.head()
if c == nil {
return math.MinInt64
}
return c.maxTime
}
// cut starts a new XOR chunk beginning at mint, makes it the head chunk of the
// series and installs a fresh appender for it. The new chunk is returned.
// It panics if the appender for the new chunk cannot be created.
func (s *memSeries) cut(mint int64) *memChunk {
	fresh := &memChunk{
		chunk:   chunkenc.NewXORChunk(),
		minTime: mint,
		maxTime: math.MinInt64,
	}
	s.headChunk = fresh
	s.chunks = append(s.chunks, fresh)

	// The chunk that was the head until now will not grow any further, so drop
	// the spare capacity of its byte slice to save memory.
	if n := len(s.chunks); n >= 2 {
		s.chunks[n-2].chunk.Compact()
	}

	// This is only an upper bound on when the next chunk must be started; an
	// earlier timestamp may be chosen dynamically at a later point.
	s.nextAt = rangeForTimestamp(mint, s.chunkRange)

	appender, err := fresh.chunk.Appender()
	if err != nil {
		panic(err)
	}
	s.app = appender

	return fresh
}
// appendable checks whether the given sample is valid for appending to the series.
func (s *memSeries) appendable(t int64, v float64) error {
2017-09-01 05:38:49 -07:00
c := s.head()
if c == nil {
return nil
}
if t > c.maxTime {
return nil
}
if t < c.maxTime {
return storage.ErrOutOfOrderSample
}
// We are allowing exact duplicates as we can encounter them in valid cases
// like federation and erroring out at that time would be extremely noisy.
if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) {
return storage.ErrDuplicateSampleForTimestamp
}
return nil
}
func (s *memSeries) chunk(id int) *memChunk {
2017-09-01 05:38:49 -07:00
ix := id - s.firstChunkID
if ix < 0 || ix >= len(s.chunks) {
return nil
}
return s.chunks[ix]
}
// chunkID translates a position in s.chunks into the corresponding chunk ID.
func (s *memSeries) chunkID(pos int) int {
	return s.firstChunkID + pos
}
// truncateChunksBefore removes all chunks from the series that have not timestamp
// at or after mint. Chunk IDs remain unchanged.
2017-08-30 08:38:25 -07:00
func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
var k int
for i, c := range s.chunks {
if c.maxTime >= mint {
break
}
k = i + 1
}
s.chunks = append(s.chunks[:0], s.chunks[k:]...)
s.firstChunkID += k
if len(s.chunks) == 0 {
s.headChunk = nil
} else {
s.headChunk = s.chunks[len(s.chunks)-1]
}
2017-08-30 08:38:25 -07:00
return k
}
// append adds the sample (t, v) to the series. The caller also has to provide
// the appendID for isolation. (The appendID can be zero, which results in no
// isolation for this append.)
func (s *memSeries) append(t int64, v float64, appendID uint64) (success, chunkCreated bool) {
// Based on Gorilla white papers this offers near-optimal compression ratio
// so anything bigger that this has diminishing returns and increases
// the time range within which we have to decompress all samples.
const samplesPerChunk = 120
2017-09-01 05:38:49 -07:00
c := s.head()
2017-09-01 05:38:49 -07:00
if c == nil {
c = s.cut(t)
2017-08-30 08:38:25 -07:00
chunkCreated = true
}
2017-10-07 06:55:11 -07:00
numSamples := c.chunk.NumSamples()
// Out of order sample.
if c.maxTime >= t {
2017-08-30 08:38:25 -07:00
return false, chunkCreated
}
// If we reach 25% of a chunk's desired sample count, set a definitive time
// at which to start the next chunk.
// At latest it must happen at the timestamp set when the chunk was cut.
if numSamples == samplesPerChunk/4 {
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt)
}
if t >= s.nextAt {
c = s.cut(t)
chunkCreated = true
}
s.app.Append(t, v)
c.maxTime = t
s.sampleBuf[0] = s.sampleBuf[1]
s.sampleBuf[1] = s.sampleBuf[2]
s.sampleBuf[2] = s.sampleBuf[3]
s.sampleBuf[3] = sample{t: t, v: v}
if appendID > 0 {
s.txs.add(appendID)
}
2017-08-30 08:38:25 -07:00
return true, chunkCreated
}
// cleanupAppendIDsBelow cleans up older appendIDs by delegating to the
// series' append-ID ring with the given bound. Has to be called after
// acquiring lock.
func (s *memSeries) cleanupAppendIDsBelow(bound uint64) {
	s.txs.cleanupAppendIDsBelow(bound)
}
// computeChunkEndTime estimates the end timestamp of a chunk based on the
// chunk's start, the timestamp of its latest sample (cur) and the upper bound
// (max) up to which data may be inserted into it.
// The estimate assumes that the chunk is 1/4 full at cur. If the estimated
// number of chunks fitting into [start, max] is zero, max itself is returned.
func computeChunkEndTime(start, cur, max int64) int64 {
	span := max - start

	// Number of chunks of the projected length that fit into the span.
	n := span / (4 * (cur - start + 1))
	if n != 0 {
		return start + span/n
	}
	return max
}
// iterator returns an iterator over the samples of the chunk with the given
// chunk ID.
//
// If isoState is non-nil, the series' recorded append IDs are checked against
// it and the returned iterator stops before the first sample whose append ID
// is above isoState.maxAppendID or is listed in isoState.incompleteAppends.
//
// The passed iterator it is handed to the chunk's Iterator method and, when it
// already is a *stopIterator or *memSafeIterator of the kind needed, it is
// reset and returned instead of allocating a new wrapper.
//
// A no-op iterator is returned when the chunk is not (or no longer) held by
// the series or when no sample of the chunk may be returned.
func (s *memSeries) iterator(id int, isoState *isolationState, it chunkenc.Iterator) chunkenc.Iterator {
	c := s.chunk(id)
	// TODO(fabxc): Work around! A querier may have retrieved a pointer to a
	// series's chunk, which got then garbage collected before it got
	// accessed. We must ensure to not garbage collect as long as any
	// readers still hold a reference.
	if c == nil {
		return chunkenc.NewNopIterator()
	}

	// Position of the requested chunk within s.chunks.
	ix := id - s.firstChunkID

	numSamples := c.chunk.NumSamples()
	// Number of samples of this chunk the iterator may return; lowered below
	// if isolation hides some of them.
	stopAfter := numSamples

	if isoState != nil {
		totalSamples := 0    // Total samples in this series.
		previousSamples := 0 // Samples before this chunk.

		for j, d := range s.chunks {
			totalSamples += d.chunk.NumSamples()
			if j < ix {
				previousSamples += d.chunk.NumSamples()
			}
		}

		// Removing the extra transactionIDs that are relevant for samples that
		// come after this chunk, from the total transactionIDs.
		appendIDsToConsider := s.txs.txIDCount - (totalSamples - (previousSamples + numSamples))

		// Iterate over the appendIDs, find the first one that the isolation state says not
		// to return.
		// Note that this `it` shadows the iterator parameter inside this block only.
		it := s.txs.iterator()
		for index := 0; index < appendIDsToConsider; index++ {
			appendID := it.At()
			if appendID <= isoState.maxAppendID { // Easy check first.
				if _, ok := isoState.incompleteAppends[appendID]; !ok {
					it.Next()
					continue
				}
			}
			// Everything from this append ID onwards must be hidden.
			stopAfter = numSamples - (appendIDsToConsider - index)
			if stopAfter < 0 {
				stopAfter = 0 // Stopped in a previous chunk.
			}
			break
		}
	}

	if stopAfter == 0 {
		return chunkenc.NewNopIterator()
	}

	// Any chunk but the last one in s.chunks.
	if id-s.firstChunkID < len(s.chunks)-1 {
		// All samples are visible: no wrapper is needed.
		if stopAfter == numSamples {
			return c.chunk.Iterator(it)
		}
		// Reuse the caller's stopIterator if one was passed in.
		if msIter, ok := it.(*stopIterator); ok {
			msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
			msIter.i = -1
			msIter.stopAfter = stopAfter
			return msIter
		}
		return &stopIterator{
			Iterator:  c.chunk.Iterator(it),
			i:         -1,
			stopAfter: stopAfter,
		}
	}
	// Serve the last 4 samples for the last chunk from the sample buffer
	// as their compressed bytes may be mutated by added samples.
	// Reuse the caller's memSafeIterator if one was passed in.
	if msIter, ok := it.(*memSafeIterator); ok {
		msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
		msIter.i = -1
		msIter.total = numSamples
		msIter.stopAfter = stopAfter
		msIter.buf = s.sampleBuf
		return msIter
	}
	return &memSafeIterator{
		stopIterator: stopIterator{
			Iterator:  c.chunk.Iterator(it),
			i:         -1,
			stopAfter: stopAfter,
		},
		total: numSamples,
		buf:   s.sampleBuf,
	}
}
// head returns the series' current head chunk, which is nil when the series
// holds no chunks.
func (s *memSeries) head() *memChunk {
	return s.headChunk
}
// memChunk is a chunk held in memory together with the time range it covers.
type memChunk struct {
	chunk   chunkenc.Chunk
	minTime int64
	maxTime int64
}

// OverlapsClosedInterval returns true if the chunk overlaps [mint, maxt].
func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool {
	startsInTime := mc.minTime <= maxt
	endsInTime := mint <= mc.maxTime
	return startsInTime && endsInTime
}
// stopIterator wraps a chunk iterator and stops yielding samples once
// stopAfter samples have been returned.
type stopIterator struct {
	chunkenc.Iterator

	i         int // Index of the current sample; -1 before the first call to Next.
	stopAfter int // Number of samples after which iteration ends.
}

// Next advances to the next sample. It returns false once stopAfter samples
// have been consumed or when the wrapped iterator is exhausted.
func (it *stopIterator) Next() bool {
	next := it.i + 1
	if next >= it.stopAfter {
		return false
	}
	it.i = next
	return it.Iterator.Next()
}
// memSafeIterator is a stopIterator that serves the last four samples of a
// chunk from a copy of the series' sample buffer instead of from the wrapped
// chunk iterator.
type memSafeIterator struct {
	stopIterator

	total int       // Total number of samples in the chunk.
	buf   [4]sample // Copy of the four most recent samples.
}

// Next advances to the next sample. The wrapped iterator is only advanced
// while more than four samples remain from the new position; the final four
// positions are served from buf.
func (it *memSafeIterator) Next() bool {
	next := it.i + 1
	if next >= it.stopAfter {
		return false
	}
	it.i = next

	if remaining := it.total - next; remaining <= 4 {
		return true
	}
	return it.Iterator.Next()
}

// At returns the current sample, read from buf for the last four samples of
// the chunk and from the wrapped iterator otherwise.
func (it *memSafeIterator) At() (int64, float64) {
	remaining := it.total - it.i
	if remaining > 4 {
		return it.Iterator.At()
	}
	buffered := it.buf[4-remaining]
	return buffered.t, buffered.v
}
// stringset is a set of strings.
type stringset map[string]struct{}

// set adds str to the set.
func (ss stringset) set(s string) {
	ss[s] = struct{}{}
}

// String returns the members of the set in sorted order, joined by commas.
func (ss stringset) String() string {
	members := ss.slice()
	return strings.Join(members, ",")
}

// slice returns the members of the set as a sorted, non-nil slice.
func (ss stringset) slice() []string {
	keys := make([]string, len(ss))
	n := 0
	for key := range ss {
		keys[n] = key
		n++
	}
	sort.Strings(keys)
	return keys
}