mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-13 17:14:05 -08:00
vendor: update TSDB
This commit is contained in:
parent
3bee362727
commit
2037778d14
|
@ -56,11 +56,21 @@ type Options struct {
|
||||||
|
|
||||||
// Open returns a new storage backed by a TSDB database that is configured for Prometheus.
|
// Open returns a new storage backed by a TSDB database that is configured for Prometheus.
|
||||||
func Open(path string, r prometheus.Registerer, opts *Options) (*tsdb.DB, error) {
|
func Open(path string, r prometheus.Registerer, opts *Options) (*tsdb.DB, error) {
|
||||||
|
// Start with smallest block duration and create exponential buckets until the exceed the
|
||||||
|
// configured maximum block duration.
|
||||||
|
rngs := tsdb.ExponentialBlockRanges(int64(time.Duration(opts.MinBlockDuration).Seconds()*1000), 3, 10)
|
||||||
|
|
||||||
|
for i, v := range rngs {
|
||||||
|
if v > int64(time.Duration(opts.MaxBlockDuration).Seconds()*1000) {
|
||||||
|
rngs = rngs[:i]
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
db, err := tsdb.Open(path, nil, r, &tsdb.Options{
|
db, err := tsdb.Open(path, nil, r, &tsdb.Options{
|
||||||
WALFlushInterval: 10 * time.Second,
|
WALFlushInterval: 10 * time.Second,
|
||||||
MinBlockDuration: uint64(time.Duration(opts.MinBlockDuration).Seconds() * 1000),
|
|
||||||
MaxBlockDuration: uint64(time.Duration(opts.MaxBlockDuration).Seconds() * 1000),
|
|
||||||
RetentionDuration: uint64(time.Duration(opts.Retention).Seconds() * 1000),
|
RetentionDuration: uint64(time.Duration(opts.Retention).Seconds() * 1000),
|
||||||
|
BlockRanges: rngs,
|
||||||
NoLockfile: opts.NoLockfile,
|
NoLockfile: opts.NoLockfile,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
9
vendor/github.com/prometheus/tsdb/block.go
generated
vendored
9
vendor/github.com/prometheus/tsdb/block.go
generated
vendored
|
@ -251,9 +251,12 @@ func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
||||||
// Choose only valid postings which have chunks in the time-range.
|
// Choose only valid postings which have chunks in the time-range.
|
||||||
stones := map[uint32]intervals{}
|
stones := map[uint32]intervals{}
|
||||||
|
|
||||||
|
var lset labels.Labels
|
||||||
|
var chks []*ChunkMeta
|
||||||
|
|
||||||
Outer:
|
Outer:
|
||||||
for p.Next() {
|
for p.Next() {
|
||||||
lset, chunks, err := ir.Series(p.At())
|
err := ir.Series(p.At(), &lset, &chks)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -264,10 +267,10 @@ Outer:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, chk := range chunks {
|
for _, chk := range chks {
|
||||||
if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) {
|
if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) {
|
||||||
// Delete only until the current vlaues and not beyond.
|
// Delete only until the current vlaues and not beyond.
|
||||||
tmin, tmax := clampInterval(mint, maxt, chunks[0].MinTime, chunks[len(chunks)-1].MaxTime)
|
tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
|
||||||
stones[p.At()] = intervals{{tmin, tmax}}
|
stones[p.At()] = intervals{{tmin, tmax}}
|
||||||
continue Outer
|
continue Outer
|
||||||
}
|
}
|
||||||
|
|
11
vendor/github.com/prometheus/tsdb/chunks/chunk.go
generated
vendored
11
vendor/github.com/prometheus/tsdb/chunks/chunk.go
generated
vendored
|
@ -13,10 +13,7 @@
|
||||||
|
|
||||||
package chunks
|
package chunks
|
||||||
|
|
||||||
import (
|
import "fmt"
|
||||||
"encoding/binary"
|
|
||||||
"fmt"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Encoding is the identifier for a chunk encoding.
|
// Encoding is the identifier for a chunk encoding.
|
||||||
type Encoding uint8
|
type Encoding uint8
|
||||||
|
@ -43,16 +40,14 @@ type Chunk interface {
|
||||||
Encoding() Encoding
|
Encoding() Encoding
|
||||||
Appender() (Appender, error)
|
Appender() (Appender, error)
|
||||||
Iterator() Iterator
|
Iterator() Iterator
|
||||||
|
NumSamples() int
|
||||||
}
|
}
|
||||||
|
|
||||||
// FromData returns a chunk from a byte slice of chunk data.
|
// FromData returns a chunk from a byte slice of chunk data.
|
||||||
func FromData(e Encoding, d []byte) (Chunk, error) {
|
func FromData(e Encoding, d []byte) (Chunk, error) {
|
||||||
switch e {
|
switch e {
|
||||||
case EncXOR:
|
case EncXOR:
|
||||||
return &XORChunk{
|
return &XORChunk{b: &bstream{count: 0, stream: d}}, nil
|
||||||
b: &bstream{count: 0, stream: d},
|
|
||||||
num: binary.BigEndian.Uint16(d),
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("unknown chunk encoding: %d", e)
|
return nil, fmt.Errorf("unknown chunk encoding: %d", e)
|
||||||
}
|
}
|
||||||
|
|
6
vendor/github.com/prometheus/tsdb/chunks/xor.go
generated
vendored
6
vendor/github.com/prometheus/tsdb/chunks/xor.go
generated
vendored
|
@ -53,7 +53,6 @@ import (
|
||||||
// XORChunk holds XOR encoded sample data.
|
// XORChunk holds XOR encoded sample data.
|
||||||
type XORChunk struct {
|
type XORChunk struct {
|
||||||
b *bstream
|
b *bstream
|
||||||
num uint16
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewXORChunk returns a new chunk with XOR encoding of the given size.
|
// NewXORChunk returns a new chunk with XOR encoding of the given size.
|
||||||
|
@ -72,6 +71,11 @@ func (c *XORChunk) Bytes() []byte {
|
||||||
return c.b.bytes()
|
return c.b.bytes()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NumSamples returns the number of samples in the chunk.
|
||||||
|
func (c *XORChunk) NumSamples() int {
|
||||||
|
return int(binary.BigEndian.Uint16(c.Bytes()))
|
||||||
|
}
|
||||||
|
|
||||||
// Appender implements the Chunk interface.
|
// Appender implements the Chunk interface.
|
||||||
func (c *XORChunk) Appender() (Appender, error) {
|
func (c *XORChunk) Appender() (Appender, error) {
|
||||||
it := c.iterator()
|
it := c.iterator()
|
||||||
|
|
183
vendor/github.com/prometheus/tsdb/compact.go
generated
vendored
183
vendor/github.com/prometheus/tsdb/compact.go
generated
vendored
|
@ -30,6 +30,18 @@ import (
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ExponentialBlockRanges returns the time ranges based on the stepSize
|
||||||
|
func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
|
||||||
|
ranges := make([]int64, 0, steps)
|
||||||
|
curRange := minSize
|
||||||
|
for i := 0; i < steps; i++ {
|
||||||
|
ranges = append(ranges, curRange)
|
||||||
|
curRange = curRange * int64(stepSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ranges
|
||||||
|
}
|
||||||
|
|
||||||
// Compactor provides compaction against an underlying storage
|
// Compactor provides compaction against an underlying storage
|
||||||
// of time series data.
|
// of time series data.
|
||||||
type Compactor interface {
|
type Compactor interface {
|
||||||
|
@ -87,7 +99,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
|
||||||
}
|
}
|
||||||
|
|
||||||
type compactorOptions struct {
|
type compactorOptions struct {
|
||||||
maxBlockRange uint64
|
blockRanges []int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
|
func newCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
|
||||||
|
@ -133,37 +145,113 @@ func (c *compactor) Plan() ([][]string, error) {
|
||||||
return dms[i].meta.MinTime < dms[j].meta.MinTime
|
return dms[i].meta.MinTime < dms[j].meta.MinTime
|
||||||
})
|
})
|
||||||
|
|
||||||
if len(dms) == 0 {
|
if len(dms) <= 1 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
sliceDirs := func(i, j int) [][]string {
|
sliceDirs := func(dms []dirMeta) [][]string {
|
||||||
|
if len(dms) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
var res []string
|
var res []string
|
||||||
for k := i; k < j; k++ {
|
for _, dm := range dms {
|
||||||
res = append(res, dms[k].dir)
|
res = append(res, dm.dir)
|
||||||
}
|
}
|
||||||
return [][]string{res}
|
return [][]string{res}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Then we care about compacting multiple blocks, starting with the oldest.
|
planDirs := sliceDirs(c.selectDirs(dms))
|
||||||
for i := 0; i < len(dms)-compactionBlocksLen+1; i++ {
|
if len(dirs) > 1 {
|
||||||
if c.match(dms[i : i+3]) {
|
return planDirs, nil
|
||||||
return sliceDirs(i, i+compactionBlocksLen), nil
|
}
|
||||||
|
|
||||||
|
// Compact any blocks that have >5% tombstones.
|
||||||
|
for i := len(dms) - 1; i >= 0; i-- {
|
||||||
|
meta := dms[i].meta
|
||||||
|
if meta.MaxTime-meta.MinTime < c.opts.blockRanges[len(c.opts.blockRanges)/2] {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if meta.Stats.NumSeries/meta.Stats.NumTombstones <= 20 { // 5%
|
||||||
|
return [][]string{{dms[i].dir}}, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *compactor) match(dirs []dirMeta) bool {
|
// selectDirs returns the dir metas that should be compacted into a single new block.
|
||||||
g := dirs[0].meta.Compaction.Generation
|
// If only a single block range is configured, the result is always nil.
|
||||||
|
func (c *compactor) selectDirs(ds []dirMeta) []dirMeta {
|
||||||
|
if len(c.opts.blockRanges) < 2 || len(ds) < 1 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
for _, d := range dirs {
|
highTime := ds[len(ds)-1].meta.MinTime
|
||||||
if d.meta.Compaction.Generation != g {
|
|
||||||
return false
|
for _, iv := range c.opts.blockRanges[1:] {
|
||||||
|
parts := splitByRange(ds, iv)
|
||||||
|
if len(parts) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, p := range parts {
|
||||||
|
mint := p[0].meta.MinTime
|
||||||
|
maxt := p[len(p)-1].meta.MaxTime
|
||||||
|
// Pick the range of blocks if it spans the full range (potentially with gaps)
|
||||||
|
// or is before the most recent block.
|
||||||
|
// This ensures we don't compact blocks prematurely when another one of the same
|
||||||
|
// size still fits in the range.
|
||||||
|
if (maxt-mint == iv || maxt <= highTime) && len(p) > 1 {
|
||||||
|
return p
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return uint64(dirs[len(dirs)-1].meta.MaxTime-dirs[0].meta.MinTime) <= c.opts.maxBlockRange
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// splitByRange splits the directories by the time range. The range sequence starts at 0.
|
||||||
|
//
|
||||||
|
// For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30
|
||||||
|
// it returns [0-10, 10-20], [50-60], [90-100].
|
||||||
|
func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
|
||||||
|
var splitDirs [][]dirMeta
|
||||||
|
|
||||||
|
for i := 0; i < len(ds); {
|
||||||
|
var (
|
||||||
|
group []dirMeta
|
||||||
|
t0 int64
|
||||||
|
m = ds[i].meta
|
||||||
|
)
|
||||||
|
// Compute start of aligned time range of size tr closest to the current block's start.
|
||||||
|
if m.MinTime >= 0 {
|
||||||
|
t0 = tr * (m.MinTime / tr)
|
||||||
|
} else {
|
||||||
|
t0 = tr * ((m.MinTime - tr + 1) / tr)
|
||||||
|
}
|
||||||
|
// Skip blocks that don't fall into the range. This can happen via mis-alignment or
|
||||||
|
// by being the multiple of the intended range.
|
||||||
|
if ds[i].meta.MinTime < t0 || ds[i].meta.MaxTime > t0+tr {
|
||||||
|
i++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add all dirs to the current group that are within [t0, t0+tr].
|
||||||
|
for ; i < len(ds); i++ {
|
||||||
|
// Either the block falls into the next range or doesn't fit at all (checked above).
|
||||||
|
if ds[i].meta.MinTime < t0 || ds[i].meta.MaxTime > t0+tr {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
group = append(group, ds[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(group) > 0 {
|
||||||
|
splitDirs = append(splitDirs, group)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return splitDirs
|
||||||
}
|
}
|
||||||
|
|
||||||
func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) {
|
func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) {
|
||||||
|
@ -173,8 +261,6 @@ func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) {
|
||||||
sources := map[ulid.ULID]struct{}{}
|
sources := map[ulid.ULID]struct{}{}
|
||||||
|
|
||||||
for _, b := range blocks {
|
for _, b := range blocks {
|
||||||
res.Stats.NumSamples += b.Stats.NumSamples
|
|
||||||
|
|
||||||
if b.Compaction.Generation > res.Compaction.Generation {
|
if b.Compaction.Generation > res.Compaction.Generation {
|
||||||
res.Compaction.Generation = b.Compaction.Generation
|
res.Compaction.Generation = b.Compaction.Generation
|
||||||
}
|
}
|
||||||
|
@ -312,17 +398,31 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
|
||||||
// populateBlock fills the index and chunk writers with new data gathered as the union
|
// populateBlock fills the index and chunk writers with new data gathered as the union
|
||||||
// of the provided blocks. It returns meta information for the new block.
|
// of the provided blocks. It returns meta information for the new block.
|
||||||
func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) {
|
func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) {
|
||||||
var set compactionSet
|
var (
|
||||||
var metas []BlockMeta
|
set compactionSet
|
||||||
|
metas []BlockMeta
|
||||||
|
allSymbols = make(map[string]struct{}, 1<<16)
|
||||||
|
)
|
||||||
for i, b := range blocks {
|
for i, b := range blocks {
|
||||||
metas = append(metas, b.Meta())
|
metas = append(metas, b.Meta())
|
||||||
|
|
||||||
all, err := b.Index().Postings("", "")
|
symbols, err := b.Index().Symbols()
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "read symbols")
|
||||||
|
}
|
||||||
|
for s := range symbols {
|
||||||
|
allSymbols[s] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
indexr := b.Index()
|
||||||
|
|
||||||
|
all, err := indexr.Postings("", "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
s := newCompactionSeriesSet(b.Index(), b.Chunks(), b.Tombstones(), all)
|
all = indexr.SortedPostings(all)
|
||||||
|
|
||||||
|
s := newCompactionSeriesSet(indexr, b.Chunks(), b.Tombstones(), all)
|
||||||
|
|
||||||
if i == 0 {
|
if i == 0 {
|
||||||
set = s
|
set = s
|
||||||
|
@ -342,9 +442,18 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo
|
||||||
meta = compactBlockMetas(metas...)
|
meta = compactBlockMetas(metas...)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if err := indexw.AddSymbols(allSymbols); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "add symbols")
|
||||||
|
}
|
||||||
|
|
||||||
for set.Next() {
|
for set.Next() {
|
||||||
lset, chks, dranges := set.At() // The chunks here are not fully deleted.
|
lset, chks, dranges := set.At() // The chunks here are not fully deleted.
|
||||||
|
|
||||||
|
// Skip the series with all deleted chunks.
|
||||||
|
if len(chks) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if len(dranges) > 0 {
|
if len(dranges) > 0 {
|
||||||
// Re-encode the chunk to not have deleted values.
|
// Re-encode the chunk to not have deleted values.
|
||||||
for _, chk := range chks {
|
for _, chk := range chks {
|
||||||
|
@ -370,10 +479,15 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
indexw.AddSeries(i, lset, chks...)
|
if err := indexw.AddSeries(i, lset, chks...); err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "add series")
|
||||||
|
}
|
||||||
|
|
||||||
meta.Stats.NumChunks += uint64(len(chks))
|
meta.Stats.NumChunks += uint64(len(chks))
|
||||||
meta.Stats.NumSeries++
|
meta.Stats.NumSeries++
|
||||||
|
for _, chk := range chks {
|
||||||
|
meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
|
||||||
|
}
|
||||||
|
|
||||||
for _, l := range lset {
|
for _, l := range lset {
|
||||||
valset, ok := values[l.Name]
|
valset, ok := values[l.Name]
|
||||||
|
@ -431,6 +545,7 @@ type compactionSeriesSet struct {
|
||||||
index IndexReader
|
index IndexReader
|
||||||
chunks ChunkReader
|
chunks ChunkReader
|
||||||
tombstones TombstoneReader
|
tombstones TombstoneReader
|
||||||
|
series SeriesSet
|
||||||
|
|
||||||
l labels.Labels
|
l labels.Labels
|
||||||
c []*ChunkMeta
|
c []*ChunkMeta
|
||||||
|
@ -451,11 +566,9 @@ func (c *compactionSeriesSet) Next() bool {
|
||||||
if !c.p.Next() {
|
if !c.p.Next() {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
c.intervals = c.tombstones.Get(c.p.At())
|
c.intervals = c.tombstones.Get(c.p.At())
|
||||||
|
|
||||||
c.l, c.c, c.err = c.index.Series(c.p.At())
|
if c.err = c.index.Series(c.p.At(), &c.l, &c.c); c.err != nil {
|
||||||
if c.err != nil {
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -535,14 +648,24 @@ func (c *compactionMerger) Next() bool {
|
||||||
if !c.aok && !c.bok || c.Err() != nil {
|
if !c.aok && !c.bok || c.Err() != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
// While advancing child iterators the memory used for labels and chunks
|
||||||
|
// may be reused. When picking a series we have to store the result.
|
||||||
|
var lset labels.Labels
|
||||||
|
var chks []*ChunkMeta
|
||||||
|
|
||||||
d := c.compare()
|
d := c.compare()
|
||||||
// Both sets contain the current series. Chain them into a single one.
|
// Both sets contain the current series. Chain them into a single one.
|
||||||
if d > 0 {
|
if d > 0 {
|
||||||
c.l, c.c, c.intervals = c.b.At()
|
lset, chks, c.intervals = c.b.At()
|
||||||
|
c.l = append(c.l[:0], lset...)
|
||||||
|
c.c = append(c.c[:0], chks...)
|
||||||
|
|
||||||
c.bok = c.b.Next()
|
c.bok = c.b.Next()
|
||||||
} else if d < 0 {
|
} else if d < 0 {
|
||||||
c.l, c.c, c.intervals = c.a.At()
|
lset, chks, c.intervals = c.a.At()
|
||||||
|
c.l = append(c.l[:0], lset...)
|
||||||
|
c.c = append(c.c[:0], chks...)
|
||||||
|
|
||||||
c.aok = c.a.Next()
|
c.aok = c.a.Next()
|
||||||
} else {
|
} else {
|
||||||
l, ca, ra := c.a.At()
|
l, ca, ra := c.a.At()
|
||||||
|
@ -551,8 +674,8 @@ func (c *compactionMerger) Next() bool {
|
||||||
ra = ra.add(r)
|
ra = ra.add(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.l = l
|
c.l = append(c.l[:0], l...)
|
||||||
c.c = append(ca, cb...)
|
c.c = append(append(c.c[:0], ca...), cb...)
|
||||||
c.intervals = ra
|
c.intervals = ra
|
||||||
|
|
||||||
c.aok = c.a.Next()
|
c.aok = c.a.Next()
|
||||||
|
|
155
vendor/github.com/prometheus/tsdb/db.go
generated
vendored
155
vendor/github.com/prometheus/tsdb/db.go
generated
vendored
|
@ -45,8 +45,7 @@ import (
|
||||||
var DefaultOptions = &Options{
|
var DefaultOptions = &Options{
|
||||||
WALFlushInterval: 5 * time.Second,
|
WALFlushInterval: 5 * time.Second,
|
||||||
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
|
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
|
||||||
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds
|
BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5),
|
||||||
MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds
|
|
||||||
NoLockfile: false,
|
NoLockfile: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -58,12 +57,8 @@ type Options struct {
|
||||||
// Duration of persisted data to keep.
|
// Duration of persisted data to keep.
|
||||||
RetentionDuration uint64
|
RetentionDuration uint64
|
||||||
|
|
||||||
// The timestamp range of head blocks after which they get persisted.
|
// The sizes of the Blocks.
|
||||||
// It's the minimum duration of any persisted block.
|
BlockRanges []int64
|
||||||
MinBlockDuration uint64
|
|
||||||
|
|
||||||
// The maximum timestamp range of compacted blocks.
|
|
||||||
MaxBlockDuration uint64
|
|
||||||
|
|
||||||
// NoLockfile disables creation and consideration of a lock file.
|
// NoLockfile disables creation and consideration of a lock file.
|
||||||
NoLockfile bool
|
NoLockfile bool
|
||||||
|
@ -104,14 +99,13 @@ type DB struct {
|
||||||
metrics *dbMetrics
|
metrics *dbMetrics
|
||||||
opts *Options
|
opts *Options
|
||||||
|
|
||||||
// Mutex for that must be held when modifying the general
|
// Mutex for that must be held when modifying the general block layout.
|
||||||
// block layout.
|
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
blocks []Block
|
blocks []Block
|
||||||
|
|
||||||
// Mutex that must be held when modifying just the head blocks
|
// Mutex that must be held when modifying just the head blocks
|
||||||
// or the general layout.
|
// or the general layout.
|
||||||
// Must never be held when acquiring a blocks's mutex!
|
// mtx must be held before acquiring.
|
||||||
headmtx sync.RWMutex
|
headmtx sync.RWMutex
|
||||||
heads []headBlock
|
heads []headBlock
|
||||||
|
|
||||||
|
@ -123,7 +117,7 @@ type DB struct {
|
||||||
|
|
||||||
// cmtx is used to control compactions and deletions.
|
// cmtx is used to control compactions and deletions.
|
||||||
cmtx sync.Mutex
|
cmtx sync.Mutex
|
||||||
compacting bool
|
compactionsEnabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type dbMetrics struct {
|
type dbMetrics struct {
|
||||||
|
@ -208,7 +202,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
||||||
compactc: make(chan struct{}, 1),
|
compactc: make(chan struct{}, 1),
|
||||||
donec: make(chan struct{}),
|
donec: make(chan struct{}),
|
||||||
stopc: make(chan struct{}),
|
stopc: make(chan struct{}),
|
||||||
compacting: true,
|
compactionsEnabled: true,
|
||||||
}
|
}
|
||||||
db.metrics = newDBMetrics(db, r)
|
db.metrics = newDBMetrics(db, r)
|
||||||
|
|
||||||
|
@ -227,9 +221,24 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
||||||
db.lockf = &lockf
|
db.lockf = &lockf
|
||||||
}
|
}
|
||||||
|
|
||||||
db.compactor = newCompactor(dir, r, l, &compactorOptions{
|
copts := &compactorOptions{
|
||||||
maxBlockRange: opts.MaxBlockDuration,
|
blockRanges: opts.BlockRanges,
|
||||||
})
|
}
|
||||||
|
|
||||||
|
if len(copts.blockRanges) == 0 {
|
||||||
|
return nil, errors.New("at least one block-range must exist")
|
||||||
|
}
|
||||||
|
|
||||||
|
for float64(copts.blockRanges[len(copts.blockRanges)-1])/float64(opts.RetentionDuration) > 0.2 {
|
||||||
|
if len(copts.blockRanges) == 1 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Max overflow is restricted to 20%.
|
||||||
|
copts.blockRanges = copts.blockRanges[:len(copts.blockRanges)-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
db.compactor = newCompactor(dir, r, l, copts)
|
||||||
|
|
||||||
if err := db.reloadBlocks(); err != nil {
|
if err := db.reloadBlocks(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -315,37 +324,62 @@ func headFullness(h headBlock) float64 {
|
||||||
return a / b
|
return a / b
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// appendableHeads returns a copy of a slice of HeadBlocks that can still be appended to.
|
||||||
|
func (db *DB) appendableHeads() (r []headBlock) {
|
||||||
|
switch l := len(db.heads); l {
|
||||||
|
case 0:
|
||||||
|
case 1:
|
||||||
|
r = append(r, db.heads[0])
|
||||||
|
default:
|
||||||
|
if headFullness(db.heads[l-1]) < 0.5 {
|
||||||
|
r = append(r, db.heads[l-2])
|
||||||
|
}
|
||||||
|
r = append(r, db.heads[l-1])
|
||||||
|
}
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) completedHeads() (r []headBlock) {
|
||||||
|
db.mtx.RLock()
|
||||||
|
defer db.mtx.RUnlock()
|
||||||
|
|
||||||
|
db.headmtx.RLock()
|
||||||
|
defer db.headmtx.RUnlock()
|
||||||
|
|
||||||
|
if len(db.heads) < 2 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Select all old heads unless they still have pending appenders.
|
||||||
|
for _, h := range db.heads[:len(db.heads)-2] {
|
||||||
|
if h.ActiveWriters() > 0 {
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
r = append(r, h)
|
||||||
|
}
|
||||||
|
// Add the 2nd last head if the last head is more than 50% filled.
|
||||||
|
// Compacting it early allows us to free its memory before allocating
|
||||||
|
// more for the next block and thus reduces spikes.
|
||||||
|
h0 := db.heads[len(db.heads)-1]
|
||||||
|
h1 := db.heads[len(db.heads)-2]
|
||||||
|
|
||||||
|
if headFullness(h0) >= 0.5 && h1.ActiveWriters() == 0 {
|
||||||
|
r = append(r, h1)
|
||||||
|
}
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
func (db *DB) compact() (changes bool, err error) {
|
func (db *DB) compact() (changes bool, err error) {
|
||||||
db.cmtx.Lock()
|
db.cmtx.Lock()
|
||||||
defer db.cmtx.Unlock()
|
defer db.cmtx.Unlock()
|
||||||
|
|
||||||
db.headmtx.RLock()
|
if !db.compactionsEnabled {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Check whether we have pending head blocks that are ready to be persisted.
|
// Check whether we have pending head blocks that are ready to be persisted.
|
||||||
// They have the highest priority.
|
// They have the highest priority.
|
||||||
var singles []Block
|
for _, h := range db.completedHeads() {
|
||||||
|
|
||||||
// Collect head blocks that are ready for compaction. Write them after
|
|
||||||
// returning the lock to not block Appenders.
|
|
||||||
// Selected blocks are semantically ensured to not be written to afterwards
|
|
||||||
// by appendable().
|
|
||||||
if len(db.heads) > 1 {
|
|
||||||
f := headFullness(db.heads[len(db.heads)-1])
|
|
||||||
|
|
||||||
for _, h := range db.heads[:len(db.heads)-1] {
|
|
||||||
// Blocks that won't be appendable when instantiating a new appender
|
|
||||||
// might still have active appenders on them.
|
|
||||||
// Abort at the first one we encounter.
|
|
||||||
if h.ActiveWriters() > 0 || f < 0.5 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
singles = append(singles, h)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
db.headmtx.RUnlock()
|
|
||||||
|
|
||||||
for _, h := range singles {
|
|
||||||
select {
|
select {
|
||||||
case <-db.stopc:
|
case <-db.stopc:
|
||||||
return changes, nil
|
return changes, nil
|
||||||
|
@ -551,30 +585,30 @@ func (db *DB) Close() error {
|
||||||
|
|
||||||
// DisableCompactions disables compactions.
|
// DisableCompactions disables compactions.
|
||||||
func (db *DB) DisableCompactions() {
|
func (db *DB) DisableCompactions() {
|
||||||
if db.compacting {
|
|
||||||
db.cmtx.Lock()
|
db.cmtx.Lock()
|
||||||
db.compacting = false
|
defer db.cmtx.Unlock()
|
||||||
|
|
||||||
|
db.compactionsEnabled = false
|
||||||
db.logger.Log("msg", "compactions disabled")
|
db.logger.Log("msg", "compactions disabled")
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// EnableCompactions enables compactions.
|
// EnableCompactions enables compactions.
|
||||||
func (db *DB) EnableCompactions() {
|
func (db *DB) EnableCompactions() {
|
||||||
if !db.compacting {
|
db.cmtx.Lock()
|
||||||
db.cmtx.Unlock()
|
defer db.cmtx.Unlock()
|
||||||
db.compacting = true
|
|
||||||
|
db.compactionsEnabled = true
|
||||||
db.logger.Log("msg", "compactions enabled")
|
db.logger.Log("msg", "compactions enabled")
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Snapshot writes the current data to the directory.
|
// Snapshot writes the current data to the directory.
|
||||||
func (db *DB) Snapshot(dir string) error {
|
func (db *DB) Snapshot(dir string) error {
|
||||||
db.mtx.Lock() // To block any appenders.
|
|
||||||
defer db.mtx.Unlock()
|
|
||||||
|
|
||||||
db.cmtx.Lock()
|
db.cmtx.Lock()
|
||||||
defer db.cmtx.Unlock()
|
defer db.cmtx.Unlock()
|
||||||
|
|
||||||
|
db.mtx.Lock() // To block any appenders.
|
||||||
|
defer db.mtx.Unlock()
|
||||||
|
|
||||||
blocks := db.blocks[:]
|
blocks := db.blocks[:]
|
||||||
for _, b := range blocks {
|
for _, b := range blocks {
|
||||||
db.logger.Log("msg", "snapshotting block", "block", b)
|
db.logger.Log("msg", "snapshotting block", "block", b)
|
||||||
|
@ -667,7 +701,7 @@ func (a *dbAppender) appenderAt(t int64) (*metaAppender, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var hb headBlock
|
var hb headBlock
|
||||||
for _, h := range a.db.appendable() {
|
for _, h := range a.db.appendableHeads() {
|
||||||
m := h.Meta()
|
m := h.Meta()
|
||||||
|
|
||||||
if intervalContains(m.MinTime, m.MaxTime-1, t) {
|
if intervalContains(m.MinTime, m.MaxTime-1, t) {
|
||||||
|
@ -699,20 +733,20 @@ func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
|
||||||
// it is within or after the currently appendable window.
|
// it is within or after the currently appendable window.
|
||||||
func (db *DB) ensureHead(t int64) error {
|
func (db *DB) ensureHead(t int64) error {
|
||||||
var (
|
var (
|
||||||
mint, maxt = rangeForTimestamp(t, int64(db.opts.MinBlockDuration))
|
mint, maxt = rangeForTimestamp(t, int64(db.opts.BlockRanges[0]))
|
||||||
addBuffer = len(db.blocks) == 0
|
addBuffer = len(db.blocks) == 0
|
||||||
last BlockMeta
|
last BlockMeta
|
||||||
)
|
)
|
||||||
|
|
||||||
if !addBuffer {
|
if !addBuffer {
|
||||||
last = db.blocks[len(db.blocks)-1].Meta()
|
last = db.blocks[len(db.blocks)-1].Meta()
|
||||||
addBuffer = last.MaxTime <= mint-int64(db.opts.MinBlockDuration)
|
addBuffer = last.MaxTime <= mint-int64(db.opts.BlockRanges[0])
|
||||||
}
|
}
|
||||||
// Create another block of buffer in front if the DB is initialized or retrieving
|
// Create another block of buffer in front if the DB is initialized or retrieving
|
||||||
// new data after a long gap.
|
// new data after a long gap.
|
||||||
// This ensures we always have a full block width of append window.
|
// This ensures we always have a full block width of append window.
|
||||||
if addBuffer {
|
if addBuffer {
|
||||||
if _, err := db.createHeadBlock(mint-int64(db.opts.MinBlockDuration), mint); err != nil {
|
if _, err := db.createHeadBlock(mint-int64(db.opts.BlockRanges[0]), mint); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// If the previous block reaches into our new window, make it smaller.
|
// If the previous block reaches into our new window, make it smaller.
|
||||||
|
@ -779,6 +813,7 @@ func (a *dbAppender) Rollback() error {
|
||||||
func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
||||||
db.cmtx.Lock()
|
db.cmtx.Lock()
|
||||||
defer db.cmtx.Unlock()
|
defer db.cmtx.Unlock()
|
||||||
|
|
||||||
db.mtx.Lock()
|
db.mtx.Lock()
|
||||||
defer db.mtx.Unlock()
|
defer db.mtx.Unlock()
|
||||||
|
|
||||||
|
@ -799,18 +834,6 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// appendable returns a copy of a slice of HeadBlocks that can still be appended to.
|
|
||||||
func (db *DB) appendable() (r []headBlock) {
|
|
||||||
switch len(db.heads) {
|
|
||||||
case 0:
|
|
||||||
case 1:
|
|
||||||
r = append(r, db.heads[0])
|
|
||||||
default:
|
|
||||||
r = append(r, db.heads[len(db.heads)-2:]...)
|
|
||||||
}
|
|
||||||
return r
|
|
||||||
}
|
|
||||||
|
|
||||||
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
|
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
|
||||||
// Checks Overlap: http://stackoverflow.com/questions/3269434/
|
// Checks Overlap: http://stackoverflow.com/questions/3269434/
|
||||||
return amin <= bmax && bmin <= amax
|
return amin <= bmax && bmin <= amax
|
||||||
|
|
87
vendor/github.com/prometheus/tsdb/head.go
generated
vendored
87
vendor/github.com/prometheus/tsdb/head.go
generated
vendored
|
@ -67,6 +67,7 @@ type HeadBlock struct {
|
||||||
// to their chunk descs.
|
// to their chunk descs.
|
||||||
hashes map[uint64][]*memSeries
|
hashes map[uint64][]*memSeries
|
||||||
|
|
||||||
|
symbols map[string]struct{}
|
||||||
values map[string]stringset // label names to possible values
|
values map[string]stringset // label names to possible values
|
||||||
postings *memPostings // postings lists for terms
|
postings *memPostings // postings lists for terms
|
||||||
|
|
||||||
|
@ -117,6 +118,7 @@ func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) {
|
||||||
series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil.
|
series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil.
|
||||||
hashes: map[uint64][]*memSeries{},
|
hashes: map[uint64][]*memSeries{},
|
||||||
values: map[string]stringset{},
|
values: map[string]stringset{},
|
||||||
|
symbols: map[string]struct{}{},
|
||||||
postings: &memPostings{m: make(map[term][]uint32)},
|
postings: &memPostings{m: make(map[term][]uint32)},
|
||||||
meta: *meta,
|
meta: *meta,
|
||||||
tombstones: newEmptyTombstoneReader(),
|
tombstones: newEmptyTombstoneReader(),
|
||||||
|
@ -332,7 +334,12 @@ func (h *HeadBlock) Snapshot(snapshotDir string) error {
|
||||||
func (h *HeadBlock) Dir() string { return h.dir }
|
func (h *HeadBlock) Dir() string { return h.dir }
|
||||||
|
|
||||||
// Index returns an IndexReader against the block.
|
// Index returns an IndexReader against the block.
|
||||||
func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} }
|
func (h *HeadBlock) Index() IndexReader {
|
||||||
|
h.mtx.RLock()
|
||||||
|
defer h.mtx.RUnlock()
|
||||||
|
|
||||||
|
return &headIndexReader{HeadBlock: h, maxSeries: uint32(len(h.series) - 1)}
|
||||||
|
}
|
||||||
|
|
||||||
// Chunks returns a ChunkReader against the block.
|
// Chunks returns a ChunkReader against the block.
|
||||||
func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} }
|
func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} }
|
||||||
|
@ -340,14 +347,10 @@ func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} }
|
||||||
// Querier returns a new Querier against the block for the range [mint, maxt].
|
// Querier returns a new Querier against the block for the range [mint, maxt].
|
||||||
func (h *HeadBlock) Querier(mint, maxt int64) Querier {
|
func (h *HeadBlock) Querier(mint, maxt int64) Querier {
|
||||||
h.mtx.RLock()
|
h.mtx.RLock()
|
||||||
defer h.mtx.RUnlock()
|
|
||||||
|
|
||||||
if h.closed {
|
if h.closed {
|
||||||
panic(fmt.Sprintf("block %s already closed", h.dir))
|
panic(fmt.Sprintf("block %s already closed", h.dir))
|
||||||
}
|
}
|
||||||
|
h.mtx.RUnlock()
|
||||||
// Reference on the original slice to use for postings mapping.
|
|
||||||
series := h.series[:]
|
|
||||||
|
|
||||||
return &blockQuerier{
|
return &blockQuerier{
|
||||||
mint: mint,
|
mint: mint,
|
||||||
|
@ -355,27 +358,6 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier {
|
||||||
index: h.Index(),
|
index: h.Index(),
|
||||||
chunks: h.Chunks(),
|
chunks: h.Chunks(),
|
||||||
tombstones: h.Tombstones(),
|
tombstones: h.Tombstones(),
|
||||||
|
|
||||||
postingsMapper: func(p Postings) Postings {
|
|
||||||
ep := make([]uint32, 0, 64)
|
|
||||||
|
|
||||||
for p.Next() {
|
|
||||||
// Skip posting entries that include series added after we
|
|
||||||
// instantiated the querier.
|
|
||||||
if int(p.At()) >= len(series) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
ep = append(ep, p.At())
|
|
||||||
}
|
|
||||||
if err := p.Err(); err != nil {
|
|
||||||
return errPostings{err: errors.Wrap(err, "expand postings")}
|
|
||||||
}
|
|
||||||
|
|
||||||
sort.Slice(ep, func(i, j int) bool {
|
|
||||||
return labels.Compare(series[ep[i]].lset, series[ep[j]].lset) < 0
|
|
||||||
})
|
|
||||||
return newListPostings(ep)
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -661,6 +643,12 @@ func (c *safeChunk) Iterator() chunks.Iterator {
|
||||||
|
|
||||||
type headIndexReader struct {
|
type headIndexReader struct {
|
||||||
*HeadBlock
|
*HeadBlock
|
||||||
|
// Highest series that existed when the index reader was instantiated.
|
||||||
|
maxSeries uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *headIndexReader) Symbols() (map[string]struct{}, error) {
|
||||||
|
return h.symbols, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// LabelValues returns the possible label values
|
// LabelValues returns the possible label values
|
||||||
|
@ -689,33 +677,59 @@ func (h *headIndexReader) Postings(name, value string) (Postings, error) {
|
||||||
return h.postings.get(term{name: name, value: value}), nil
|
return h.postings.get(term{name: name, value: value}), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Series returns the series for the given reference.
|
func (h *headIndexReader) SortedPostings(p Postings) Postings {
|
||||||
func (h *headIndexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
|
|
||||||
h.mtx.RLock()
|
h.mtx.RLock()
|
||||||
defer h.mtx.RUnlock()
|
defer h.mtx.RUnlock()
|
||||||
|
|
||||||
if int(ref) >= len(h.series) {
|
ep := make([]uint32, 0, 1024)
|
||||||
return nil, nil, ErrNotFound
|
|
||||||
|
for p.Next() {
|
||||||
|
// Skip posting entries that include series added after we
|
||||||
|
// instantiated the index reader.
|
||||||
|
if p.At() > h.maxSeries {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
ep = append(ep, p.At())
|
||||||
|
}
|
||||||
|
if err := p.Err(); err != nil {
|
||||||
|
return errPostings{err: errors.Wrap(err, "expand postings")}
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Slice(ep, func(i, j int) bool {
|
||||||
|
return labels.Compare(h.series[ep[i]].lset, h.series[ep[j]].lset) < 0
|
||||||
|
})
|
||||||
|
return newListPostings(ep)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Series returns the series for the given reference.
|
||||||
|
func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*ChunkMeta) error {
|
||||||
|
h.mtx.RLock()
|
||||||
|
defer h.mtx.RUnlock()
|
||||||
|
|
||||||
|
if ref > h.maxSeries {
|
||||||
|
return ErrNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
s := h.series[ref]
|
s := h.series[ref]
|
||||||
if s == nil {
|
if s == nil {
|
||||||
return nil, nil, ErrNotFound
|
return ErrNotFound
|
||||||
}
|
}
|
||||||
metas := make([]*ChunkMeta, 0, len(s.chunks))
|
*lbls = append((*lbls)[:0], s.lset...)
|
||||||
|
|
||||||
s.mtx.RLock()
|
s.mtx.RLock()
|
||||||
defer s.mtx.RUnlock()
|
defer s.mtx.RUnlock()
|
||||||
|
|
||||||
|
*chks = (*chks)[:0]
|
||||||
|
|
||||||
for i, c := range s.chunks {
|
for i, c := range s.chunks {
|
||||||
metas = append(metas, &ChunkMeta{
|
*chks = append(*chks, &ChunkMeta{
|
||||||
MinTime: c.minTime,
|
MinTime: c.minTime,
|
||||||
MaxTime: c.maxTime,
|
MaxTime: c.maxTime,
|
||||||
Ref: (uint64(ref) << 32) | uint64(i),
|
Ref: (uint64(ref) << 32) | uint64(i),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.lset, metas, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *headIndexReader) LabelIndices() ([][]string, error) {
|
func (h *headIndexReader) LabelIndices() ([][]string, error) {
|
||||||
|
@ -760,6 +774,9 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *memSeries {
|
||||||
valset.set(l.Value)
|
valset.set(l.Value)
|
||||||
|
|
||||||
h.postings.add(s.ref, term{name: l.Name, value: l.Value})
|
h.postings.add(s.ref, term{name: l.Name, value: l.Value})
|
||||||
|
|
||||||
|
h.symbols[l.Name] = struct{}{}
|
||||||
|
h.symbols[l.Value] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
h.postings.add(s.ref, term{})
|
h.postings.add(s.ref, term{})
|
||||||
|
|
253
vendor/github.com/prometheus/tsdb/index.go
generated
vendored
253
vendor/github.com/prometheus/tsdb/index.go
generated
vendored
|
@ -61,7 +61,9 @@ func (s indexWriterSeriesSlice) Less(i, j int) bool {
|
||||||
type indexWriterStage uint8
|
type indexWriterStage uint8
|
||||||
|
|
||||||
const (
|
const (
|
||||||
idxStagePopulate indexWriterStage = iota
|
idxStageNone indexWriterStage = iota
|
||||||
|
idxStageSymbols
|
||||||
|
idxStageSeries
|
||||||
idxStageLabelIndex
|
idxStageLabelIndex
|
||||||
idxStagePostings
|
idxStagePostings
|
||||||
idxStageDone
|
idxStageDone
|
||||||
|
@ -69,8 +71,12 @@ const (
|
||||||
|
|
||||||
func (s indexWriterStage) String() string {
|
func (s indexWriterStage) String() string {
|
||||||
switch s {
|
switch s {
|
||||||
case idxStagePopulate:
|
case idxStageNone:
|
||||||
return "populate"
|
return "none"
|
||||||
|
case idxStageSymbols:
|
||||||
|
return "symbols"
|
||||||
|
case idxStageSeries:
|
||||||
|
return "series"
|
||||||
case idxStageLabelIndex:
|
case idxStageLabelIndex:
|
||||||
return "label index"
|
return "label index"
|
||||||
case idxStagePostings:
|
case idxStagePostings:
|
||||||
|
@ -82,12 +88,18 @@ func (s indexWriterStage) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// IndexWriter serializes the index for a block of series data.
|
// IndexWriter serializes the index for a block of series data.
|
||||||
// The methods must generally be called in the order they are specified in.
|
// The methods must be called in the order they are specified in.
|
||||||
type IndexWriter interface {
|
type IndexWriter interface {
|
||||||
|
// AddSymbols registers all string symbols that are encountered in series
|
||||||
|
// and other indices.
|
||||||
|
AddSymbols(sym map[string]struct{}) error
|
||||||
|
|
||||||
// AddSeries populates the index writer with a series and its offsets
|
// AddSeries populates the index writer with a series and its offsets
|
||||||
// of chunks that the index can reference.
|
// of chunks that the index can reference.
|
||||||
// The reference number is used to resolve a series against the postings
|
// Implementations may require series to be insert in increasing order by
|
||||||
// list iterator. It only has to be available during the write processing.
|
// their labels.
|
||||||
|
// The reference numbers are used to resolve entries in postings lists that
|
||||||
|
// are added later.
|
||||||
AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) error
|
AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) error
|
||||||
|
|
||||||
// WriteLabelIndex serializes an index from label names to values.
|
// WriteLabelIndex serializes an index from label names to values.
|
||||||
|
@ -118,11 +130,14 @@ type indexWriter struct {
|
||||||
buf2 encbuf
|
buf2 encbuf
|
||||||
uint32s []uint32
|
uint32s []uint32
|
||||||
|
|
||||||
series map[uint32]*indexWriterSeries
|
|
||||||
symbols map[string]uint32 // symbol offsets
|
symbols map[string]uint32 // symbol offsets
|
||||||
|
seriesOffsets map[uint32]uint64 // offsets of series
|
||||||
labelIndexes []hashEntry // label index offsets
|
labelIndexes []hashEntry // label index offsets
|
||||||
postings []hashEntry // postings lists offsets
|
postings []hashEntry // postings lists offsets
|
||||||
|
|
||||||
|
// Hold last series to validate that clients insert new series in order.
|
||||||
|
lastSeries labels.Labels
|
||||||
|
|
||||||
crc32 hash.Hash
|
crc32 hash.Hash
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,7 +167,7 @@ func newIndexWriter(dir string) (*indexWriter, error) {
|
||||||
f: f,
|
f: f,
|
||||||
fbuf: bufio.NewWriterSize(f, 1<<22),
|
fbuf: bufio.NewWriterSize(f, 1<<22),
|
||||||
pos: 0,
|
pos: 0,
|
||||||
stage: idxStagePopulate,
|
stage: idxStageNone,
|
||||||
|
|
||||||
// Reusable memory.
|
// Reusable memory.
|
||||||
buf1: encbuf{b: make([]byte, 0, 1<<22)},
|
buf1: encbuf{b: make([]byte, 0, 1<<22)},
|
||||||
|
@ -161,7 +176,7 @@ func newIndexWriter(dir string) (*indexWriter, error) {
|
||||||
|
|
||||||
// Caches.
|
// Caches.
|
||||||
symbols: make(map[string]uint32, 1<<13),
|
symbols: make(map[string]uint32, 1<<13),
|
||||||
series: make(map[uint32]*indexWriterSeries, 1<<16),
|
seriesOffsets: make(map[uint32]uint64, 1<<16),
|
||||||
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
|
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
|
||||||
}
|
}
|
||||||
if err := iw.writeMeta(); err != nil {
|
if err := iw.writeMeta(); err != nil {
|
||||||
|
@ -207,20 +222,13 @@ func (w *indexWriter) ensureStage(s indexWriterStage) error {
|
||||||
return errors.Errorf("invalid stage %q, currently at %q", s, w.stage)
|
return errors.Errorf("invalid stage %q, currently at %q", s, w.stage)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Complete population stage by writing symbols and series.
|
|
||||||
if w.stage == idxStagePopulate {
|
|
||||||
w.toc.symbols = w.pos
|
|
||||||
if err := w.writeSymbols(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
w.toc.series = w.pos
|
|
||||||
if err := w.writeSeries(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Mark start of sections in table of contents.
|
// Mark start of sections in table of contents.
|
||||||
switch s {
|
switch s {
|
||||||
|
case idxStageSymbols:
|
||||||
|
w.toc.symbols = w.pos
|
||||||
|
case idxStageSeries:
|
||||||
|
w.toc.series = w.pos
|
||||||
|
|
||||||
case idxStageLabelIndex:
|
case idxStageLabelIndex:
|
||||||
w.toc.labelIndices = w.pos
|
w.toc.labelIndices = w.pos
|
||||||
|
|
||||||
|
@ -254,84 +262,38 @@ func (w *indexWriter) writeMeta() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error {
|
func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error {
|
||||||
if _, ok := w.series[ref]; ok {
|
if err := w.ensureStage(idxStageSeries); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if labels.Compare(lset, w.lastSeries) <= 0 {
|
||||||
|
return errors.Errorf("out-of-order series added with label set %q", lset)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := w.seriesOffsets[ref]; ok {
|
||||||
return errors.Errorf("series with reference %d already added", ref)
|
return errors.Errorf("series with reference %d already added", ref)
|
||||||
}
|
}
|
||||||
// Populate the symbol table from all label sets we have to reference.
|
w.seriesOffsets[ref] = w.pos
|
||||||
|
|
||||||
|
w.buf2.reset()
|
||||||
|
w.buf2.putUvarint(len(lset))
|
||||||
|
|
||||||
for _, l := range lset {
|
for _, l := range lset {
|
||||||
w.symbols[l.Name] = 0
|
offset, ok := w.symbols[l.Name]
|
||||||
w.symbols[l.Value] = 0
|
if !ok {
|
||||||
|
return errors.Errorf("symbol entry for %q does not exist", l.Name)
|
||||||
|
}
|
||||||
|
w.buf2.putUvarint32(offset)
|
||||||
|
|
||||||
|
offset, ok = w.symbols[l.Value]
|
||||||
|
if !ok {
|
||||||
|
return errors.Errorf("symbol entry for %q does not exist", l.Value)
|
||||||
|
}
|
||||||
|
w.buf2.putUvarint32(offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
w.series[ref] = &indexWriterSeries{
|
w.buf2.putUvarint(len(chunks))
|
||||||
labels: lset,
|
|
||||||
chunks: chunks,
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *indexWriter) writeSymbols() error {
|
for _, c := range chunks {
|
||||||
// Generate sorted list of strings we will store as reference table.
|
|
||||||
symbols := make([]string, 0, len(w.symbols))
|
|
||||||
for s := range w.symbols {
|
|
||||||
symbols = append(symbols, s)
|
|
||||||
}
|
|
||||||
sort.Strings(symbols)
|
|
||||||
|
|
||||||
const headerSize = 4
|
|
||||||
|
|
||||||
w.buf1.reset()
|
|
||||||
w.buf2.reset()
|
|
||||||
|
|
||||||
w.buf2.putBE32int(len(symbols))
|
|
||||||
|
|
||||||
for _, s := range symbols {
|
|
||||||
w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len())
|
|
||||||
|
|
||||||
// NOTE: len(s) gives the number of runes, not the number of bytes.
|
|
||||||
// Therefore the read-back length for strings with unicode characters will
|
|
||||||
// be off when not using putCstr.
|
|
||||||
w.buf2.putUvarintStr(s)
|
|
||||||
}
|
|
||||||
|
|
||||||
w.buf1.putBE32int(w.buf2.len())
|
|
||||||
w.buf2.putHash(w.crc32)
|
|
||||||
|
|
||||||
err := w.write(w.buf1.get(), w.buf2.get())
|
|
||||||
return errors.Wrap(err, "write symbols")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *indexWriter) writeSeries() error {
|
|
||||||
// Series must be stored sorted along their labels.
|
|
||||||
series := make(indexWriterSeriesSlice, 0, len(w.series))
|
|
||||||
|
|
||||||
for _, s := range w.series {
|
|
||||||
series = append(series, s)
|
|
||||||
}
|
|
||||||
sort.Sort(series)
|
|
||||||
|
|
||||||
// Header holds number of series.
|
|
||||||
w.buf1.reset()
|
|
||||||
w.buf1.putBE32int(len(series))
|
|
||||||
|
|
||||||
if err := w.write(w.buf1.get()); err != nil {
|
|
||||||
return errors.Wrap(err, "write series count")
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, s := range series {
|
|
||||||
s.offset = uint32(w.pos)
|
|
||||||
|
|
||||||
w.buf2.reset()
|
|
||||||
w.buf2.putUvarint(len(s.labels))
|
|
||||||
|
|
||||||
for _, l := range s.labels {
|
|
||||||
w.buf2.putUvarint32(w.symbols[l.Name])
|
|
||||||
w.buf2.putUvarint32(w.symbols[l.Value])
|
|
||||||
}
|
|
||||||
|
|
||||||
w.buf2.putUvarint(len(s.chunks))
|
|
||||||
|
|
||||||
for _, c := range s.chunks {
|
|
||||||
w.buf2.putVarint64(c.MinTime)
|
w.buf2.putVarint64(c.MinTime)
|
||||||
w.buf2.putVarint64(c.MaxTime)
|
w.buf2.putVarint64(c.MaxTime)
|
||||||
w.buf2.putUvarint64(c.Ref)
|
w.buf2.putUvarint64(c.Ref)
|
||||||
|
@ -345,11 +307,49 @@ func (w *indexWriter) writeSeries() error {
|
||||||
if err := w.write(w.buf1.get(), w.buf2.get()); err != nil {
|
if err := w.write(w.buf1.get(), w.buf2.get()); err != nil {
|
||||||
return errors.Wrap(err, "write series data")
|
return errors.Wrap(err, "write series data")
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
w.lastSeries = append(w.lastSeries[:0], lset...)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *indexWriter) AddSymbols(sym map[string]struct{}) error {
|
||||||
|
if err := w.ensureStage(idxStageSymbols); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Generate sorted list of strings we will store as reference table.
|
||||||
|
symbols := make([]string, 0, len(sym))
|
||||||
|
|
||||||
|
for s := range sym {
|
||||||
|
symbols = append(symbols, s)
|
||||||
|
}
|
||||||
|
sort.Strings(symbols)
|
||||||
|
|
||||||
|
const headerSize = 4
|
||||||
|
|
||||||
|
w.buf1.reset()
|
||||||
|
w.buf2.reset()
|
||||||
|
|
||||||
|
w.buf2.putBE32int(len(symbols))
|
||||||
|
|
||||||
|
w.symbols = make(map[string]uint32, len(symbols))
|
||||||
|
|
||||||
|
for _, s := range symbols {
|
||||||
|
w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len())
|
||||||
|
|
||||||
|
// NOTE: len(s) gives the number of runes, not the number of bytes.
|
||||||
|
// Therefore the read-back length for strings with unicode characters will
|
||||||
|
// be off when not using putUvarintStr.
|
||||||
|
w.buf2.putUvarintStr(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
w.buf1.putBE32int(w.buf2.len())
|
||||||
|
w.buf2.putHash(w.crc32)
|
||||||
|
|
||||||
|
err := w.write(w.buf1.get(), w.buf2.get())
|
||||||
|
return errors.Wrap(err, "write symbols")
|
||||||
|
}
|
||||||
|
|
||||||
func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
|
func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
|
||||||
if len(values)%len(names) != 0 {
|
if len(values)%len(names) != 0 {
|
||||||
return errors.Errorf("invalid value list length %d for %d names", len(values), len(names))
|
return errors.Errorf("invalid value list length %d for %d names", len(values), len(names))
|
||||||
|
@ -379,7 +379,11 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
|
||||||
w.buf2.putBE32int(valt.Len())
|
w.buf2.putBE32int(valt.Len())
|
||||||
|
|
||||||
for _, v := range valt.s {
|
for _, v := range valt.s {
|
||||||
w.buf2.putBE32(w.symbols[v])
|
offset, ok := w.symbols[v]
|
||||||
|
if !ok {
|
||||||
|
return errors.Errorf("symbol entry for %q does not exist", v)
|
||||||
|
}
|
||||||
|
w.buf2.putBE32(offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
w.buf1.reset()
|
w.buf1.reset()
|
||||||
|
@ -450,11 +454,11 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error {
|
||||||
refs := w.uint32s[:0]
|
refs := w.uint32s[:0]
|
||||||
|
|
||||||
for it.Next() {
|
for it.Next() {
|
||||||
s, ok := w.series[it.At()]
|
offset, ok := w.seriesOffsets[it.At()]
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.Errorf("series for reference %d not found", it.At())
|
return errors.Errorf("%p series for reference %d not found", w, it.At())
|
||||||
}
|
}
|
||||||
refs = append(refs, s.offset)
|
refs = append(refs, uint32(offset)) // XXX(fabxc): get uint64 vs uint32 sorted out.
|
||||||
}
|
}
|
||||||
if err := it.Err(); err != nil {
|
if err := it.Err(); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -503,6 +507,10 @@ func (w *indexWriter) Close() error {
|
||||||
|
|
||||||
// IndexReader provides reading access of serialized index data.
|
// IndexReader provides reading access of serialized index data.
|
||||||
type IndexReader interface {
|
type IndexReader interface {
|
||||||
|
// Symbols returns a set of string symbols that may occur in series' labels
|
||||||
|
// and indices.
|
||||||
|
Symbols() (map[string]struct{}, error)
|
||||||
|
|
||||||
// LabelValues returns the possible label values
|
// LabelValues returns the possible label values
|
||||||
LabelValues(names ...string) (StringTuples, error)
|
LabelValues(names ...string) (StringTuples, error)
|
||||||
|
|
||||||
|
@ -510,8 +518,13 @@ type IndexReader interface {
|
||||||
// The Postings here contain the offsets to the series inside the index.
|
// The Postings here contain the offsets to the series inside the index.
|
||||||
Postings(name, value string) (Postings, error)
|
Postings(name, value string) (Postings, error)
|
||||||
|
|
||||||
// Series returns the series for the given reference.
|
// SortedPostings returns a postings list that is reordered to be sorted
|
||||||
Series(ref uint32) (labels.Labels, []*ChunkMeta, error)
|
// by the label set of the underlying series.
|
||||||
|
SortedPostings(Postings) Postings
|
||||||
|
|
||||||
|
// Series populates the given labels and chunk metas for the series identified
|
||||||
|
// by the reference.
|
||||||
|
Series(ref uint32, lset *labels.Labels, chks *[]*ChunkMeta) error
|
||||||
|
|
||||||
// LabelIndices returns the label pairs for which indices exist.
|
// LabelIndices returns the label pairs for which indices exist.
|
||||||
LabelIndices() ([][]string, error)
|
LabelIndices() ([][]string, error)
|
||||||
|
@ -664,6 +677,21 @@ func (r *indexReader) lookupSymbol(o uint32) (string, error) {
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *indexReader) Symbols() (map[string]struct{}, error) {
|
||||||
|
d1 := r.decbufAt(int(r.toc.symbols))
|
||||||
|
d2 := d1.decbuf(d1.be32int())
|
||||||
|
|
||||||
|
count := d2.be32int()
|
||||||
|
sym := make(map[string]struct{}, count)
|
||||||
|
|
||||||
|
for ; count > 0; count-- {
|
||||||
|
s := d2.uvarintStr()
|
||||||
|
sym[s] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
return sym, d2.err()
|
||||||
|
}
|
||||||
|
|
||||||
func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
|
func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
|
||||||
const sep = "\xff"
|
const sep = "\xff"
|
||||||
|
|
||||||
|
@ -712,36 +740,37 @@ func (r *indexReader) LabelIndices() ([][]string, error) {
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
|
func (r *indexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*ChunkMeta) error {
|
||||||
d1 := r.decbufAt(int(ref))
|
d1 := r.decbufAt(int(ref))
|
||||||
d2 := d1.decbuf(int(d1.uvarint()))
|
d2 := d1.decbuf(int(d1.uvarint()))
|
||||||
|
|
||||||
|
*lbls = (*lbls)[:0]
|
||||||
|
*chks = (*chks)[:0]
|
||||||
|
|
||||||
k := int(d2.uvarint())
|
k := int(d2.uvarint())
|
||||||
lbls := make(labels.Labels, 0, k)
|
|
||||||
|
|
||||||
for i := 0; i < k; i++ {
|
for i := 0; i < k; i++ {
|
||||||
lno := uint32(d2.uvarint())
|
lno := uint32(d2.uvarint())
|
||||||
lvo := uint32(d2.uvarint())
|
lvo := uint32(d2.uvarint())
|
||||||
|
|
||||||
if d2.err() != nil {
|
if d2.err() != nil {
|
||||||
return nil, nil, errors.Wrap(d2.err(), "read series label offsets")
|
return errors.Wrap(d2.err(), "read series label offsets")
|
||||||
}
|
}
|
||||||
|
|
||||||
ln, err := r.lookupSymbol(lno)
|
ln, err := r.lookupSymbol(lno)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, errors.Wrap(err, "lookup label name")
|
return errors.Wrap(err, "lookup label name")
|
||||||
}
|
}
|
||||||
lv, err := r.lookupSymbol(lvo)
|
lv, err := r.lookupSymbol(lvo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, errors.Wrap(err, "lookup label value")
|
return errors.Wrap(err, "lookup label value")
|
||||||
}
|
}
|
||||||
|
|
||||||
lbls = append(lbls, labels.Label{Name: ln, Value: lv})
|
*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read the chunks meta data.
|
// Read the chunks meta data.
|
||||||
k = int(d2.uvarint())
|
k = int(d2.uvarint())
|
||||||
chunks := make([]*ChunkMeta, 0, k)
|
|
||||||
|
|
||||||
for i := 0; i < k; i++ {
|
for i := 0; i < k; i++ {
|
||||||
mint := d2.varint64()
|
mint := d2.varint64()
|
||||||
|
@ -749,10 +778,10 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
|
||||||
off := d2.uvarint64()
|
off := d2.uvarint64()
|
||||||
|
|
||||||
if d2.err() != nil {
|
if d2.err() != nil {
|
||||||
return nil, nil, errors.Wrapf(d2.err(), "read meta for chunk %d", i)
|
return errors.Wrapf(d2.err(), "read meta for chunk %d", i)
|
||||||
}
|
}
|
||||||
|
|
||||||
chunks = append(chunks, &ChunkMeta{
|
*chks = append(*chks, &ChunkMeta{
|
||||||
Ref: off,
|
Ref: off,
|
||||||
MinTime: mint,
|
MinTime: mint,
|
||||||
MaxTime: maxt,
|
MaxTime: maxt,
|
||||||
|
@ -761,7 +790,7 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
|
||||||
|
|
||||||
// TODO(fabxc): verify CRC32.
|
// TODO(fabxc): verify CRC32.
|
||||||
|
|
||||||
return lbls, chunks, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *indexReader) Postings(name, value string) (Postings, error) {
|
func (r *indexReader) Postings(name, value string) (Postings, error) {
|
||||||
|
@ -787,6 +816,10 @@ func (r *indexReader) Postings(name, value string) (Postings, error) {
|
||||||
return newBigEndianPostings(d2.get()), nil
|
return newBigEndianPostings(d2.get()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *indexReader) SortedPostings(p Postings) Postings {
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
type stringTuples struct {
|
type stringTuples struct {
|
||||||
l int // tuple length
|
l int // tuple length
|
||||||
s []string // flattened tuple entries
|
s []string // flattened tuple entries
|
||||||
|
|
24
vendor/github.com/prometheus/tsdb/labels/selector.go
generated
vendored
24
vendor/github.com/prometheus/tsdb/labels/selector.go
generated
vendored
|
@ -13,7 +13,10 @@
|
||||||
|
|
||||||
package labels
|
package labels
|
||||||
|
|
||||||
import "regexp"
|
import (
|
||||||
|
"regexp"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
// Selector holds constraints for matching against a label set.
|
// Selector holds constraints for matching against a label set.
|
||||||
type Selector []Matcher
|
type Selector []Matcher
|
||||||
|
@ -84,3 +87,22 @@ func (m *notMatcher) Matches(v string) bool { return !m.Matcher.Matches(v) }
|
||||||
func Not(m Matcher) Matcher {
|
func Not(m Matcher) Matcher {
|
||||||
return ¬Matcher{m}
|
return ¬Matcher{m}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PrefixMatcher implements Matcher for labels which values matches prefix.
|
||||||
|
type PrefixMatcher struct {
|
||||||
|
name, prefix string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPrefixMatcher returns new Matcher for label name matching prefix.
|
||||||
|
func NewPrefixMatcher(name, prefix string) Matcher {
|
||||||
|
return &PrefixMatcher{name: name, prefix: prefix}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Name implements Matcher interface.
|
||||||
|
func (m *PrefixMatcher) Name() string { return m.name }
|
||||||
|
|
||||||
|
// Prefix returns matching prefix.
|
||||||
|
func (m *PrefixMatcher) Prefix() string { return m.prefix }
|
||||||
|
|
||||||
|
// Matches implements Matcher interface.
|
||||||
|
func (m *PrefixMatcher) Matches(v string) bool { return strings.HasPrefix(v, m.prefix) }
|
||||||
|
|
60
vendor/github.com/prometheus/tsdb/querier.go
generated
vendored
60
vendor/github.com/prometheus/tsdb/querier.go
generated
vendored
|
@ -15,6 +15,7 @@ package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/prometheus/tsdb/chunks"
|
"github.com/prometheus/tsdb/chunks"
|
||||||
|
@ -53,8 +54,8 @@ type querier struct {
|
||||||
blocks []Querier
|
blocks []Querier
|
||||||
}
|
}
|
||||||
|
|
||||||
// Querier returns a new querier over the data partition for the given
|
// Querier returns a new querier over the data partition for the given time range.
|
||||||
// time range.
|
// A goroutine must not handle more than one open Querier.
|
||||||
func (s *DB) Querier(mint, maxt int64) Querier {
|
func (s *DB) Querier(mint, maxt int64) Querier {
|
||||||
s.mtx.RLock()
|
s.mtx.RLock()
|
||||||
|
|
||||||
|
@ -133,8 +134,6 @@ type blockQuerier struct {
|
||||||
chunks ChunkReader
|
chunks ChunkReader
|
||||||
tombstones TombstoneReader
|
tombstones TombstoneReader
|
||||||
|
|
||||||
postingsMapper func(Postings) Postings
|
|
||||||
|
|
||||||
mint, maxt int64
|
mint, maxt int64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -143,10 +142,6 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
|
||||||
|
|
||||||
p, absent := pr.Select(ms...)
|
p, absent := pr.Select(ms...)
|
||||||
|
|
||||||
if q.postingsMapper != nil {
|
|
||||||
p = q.postingsMapper(p)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &blockSeriesSet{
|
return &blockSeriesSet{
|
||||||
set: &populatedChunkSeries{
|
set: &populatedChunkSeries{
|
||||||
set: &baseChunkSeries{
|
set: &baseChunkSeries{
|
||||||
|
@ -217,7 +212,38 @@ func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) {
|
||||||
|
|
||||||
p := Intersect(its...)
|
p := Intersect(its...)
|
||||||
|
|
||||||
return p, absent
|
return r.index.SortedPostings(p), absent
|
||||||
|
}
|
||||||
|
|
||||||
|
// tuplesByPrefix uses binary search to find prefix matches within ts.
|
||||||
|
func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error) {
|
||||||
|
var outErr error
|
||||||
|
tslen := ts.Len()
|
||||||
|
i := sort.Search(tslen, func(i int) bool {
|
||||||
|
vs, err := ts.At(i)
|
||||||
|
if err != nil {
|
||||||
|
outErr = fmt.Errorf("Failed to read tuple %d/%d: %v", i, tslen, err)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
val := vs[0]
|
||||||
|
l := len(m.Prefix())
|
||||||
|
if l > len(vs) {
|
||||||
|
l = len(val)
|
||||||
|
}
|
||||||
|
return val[:l] >= m.Prefix()
|
||||||
|
})
|
||||||
|
if outErr != nil {
|
||||||
|
return nil, outErr
|
||||||
|
}
|
||||||
|
var matches []string
|
||||||
|
for ; i < tslen; i++ {
|
||||||
|
vs, err := ts.At(i)
|
||||||
|
if err != nil || !m.Matches(vs[0]) {
|
||||||
|
return matches, err
|
||||||
|
}
|
||||||
|
matches = append(matches, vs[0])
|
||||||
|
}
|
||||||
|
return matches, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
|
func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
|
||||||
|
@ -230,15 +256,19 @@ func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
|
||||||
return it
|
return it
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(fabxc): use interface upgrading to provide fast solution
|
|
||||||
// for prefix matches. Tuples are lexicographically sorted.
|
|
||||||
tpls, err := r.index.LabelValues(m.Name())
|
tpls, err := r.index.LabelValues(m.Name())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errPostings{err: err}
|
return errPostings{err: err}
|
||||||
}
|
}
|
||||||
|
|
||||||
var res []string
|
var res []string
|
||||||
|
if pm, ok := m.(*labels.PrefixMatcher); ok {
|
||||||
|
res, err = tuplesByPrefix(pm, tpls)
|
||||||
|
if err != nil {
|
||||||
|
return errPostings{err: err}
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
for i := 0; i < tpls.Len(); i++ {
|
for i := 0; i < tpls.Len(); i++ {
|
||||||
vals, err := tpls.At(i)
|
vals, err := tpls.At(i)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -248,6 +278,7 @@ func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
|
||||||
res = append(res, vals[0])
|
res = append(res, vals[0])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if len(res) == 0 {
|
if len(res) == 0 {
|
||||||
return emptyPostings
|
return emptyPostings
|
||||||
|
@ -397,11 +428,14 @@ func (s *baseChunkSeries) At() (labels.Labels, []*ChunkMeta, intervals) {
|
||||||
func (s *baseChunkSeries) Err() error { return s.err }
|
func (s *baseChunkSeries) Err() error { return s.err }
|
||||||
|
|
||||||
func (s *baseChunkSeries) Next() bool {
|
func (s *baseChunkSeries) Next() bool {
|
||||||
|
var (
|
||||||
|
lset labels.Labels
|
||||||
|
chunks []*ChunkMeta
|
||||||
|
)
|
||||||
Outer:
|
Outer:
|
||||||
for s.p.Next() {
|
for s.p.Next() {
|
||||||
ref := s.p.At()
|
ref := s.p.At()
|
||||||
lset, chunks, err := s.index.Series(ref)
|
if err := s.index.Series(ref, &lset, &chunks); err != nil {
|
||||||
if err != nil {
|
|
||||||
s.err = err
|
s.err = err
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
18
vendor/vendor.json
vendored
18
vendor/vendor.json
vendored
|
@ -853,22 +853,22 @@
|
||||||
"revisionTime": "2016-04-11T19:08:41Z"
|
"revisionTime": "2016-04-11T19:08:41Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "GgHaU/6pJjJ7I8aTfaZXnV/OWxA=",
|
"checksumSHA1": "beuXFIZLYTpeK3uRpnWxgm0dPvg=",
|
||||||
"path": "github.com/prometheus/tsdb",
|
"path": "github.com/prometheus/tsdb",
|
||||||
"revision": "969c407335d68cbd8154dcd1bca6259786b27f53",
|
"revision": "912302877bfc98f632e8df61938e4e9600cef945",
|
||||||
"revisionTime": "2017-07-12T11:54:31Z"
|
"revisionTime": "2017-08-10T08:08:49Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "9EH3v+JdbikCUJAgD4VEOPIaWfs=",
|
"checksumSHA1": "bHZjrxtacFkQRNI8/yj3gOOd9aA=",
|
||||||
"path": "github.com/prometheus/tsdb/chunks",
|
"path": "github.com/prometheus/tsdb/chunks",
|
||||||
"revision": "d492bfd973c24026ab784c1c1821af426bc80e90",
|
"revision": "912302877bfc98f632e8df61938e4e9600cef945",
|
||||||
"revisionTime": "2017-06-30T13:17:34Z"
|
"revisionTime": "2017-08-10T08:08:49Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "3RHZcB/ZvIae9K0tJxNlajJg0jA=",
|
"checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
|
||||||
"path": "github.com/prometheus/tsdb/labels",
|
"path": "github.com/prometheus/tsdb/labels",
|
||||||
"revision": "d492bfd973c24026ab784c1c1821af426bc80e90",
|
"revision": "912302877bfc98f632e8df61938e4e9600cef945",
|
||||||
"revisionTime": "2017-06-30T13:17:34Z"
|
"revisionTime": "2017-08-10T08:08:49Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",
|
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",
|
||||||
|
|
Loading…
Reference in a new issue