Merge pull request #5793 from codesome/tsdb-vendor

Vendor tsdb v0.10.0
Ganesh Vernekar 2019-07-24 16:42:39 +05:30 committed by GitHub
commit 87a0fe0c75
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 441 additions and 131 deletions

go.mod

@@ -38,7 +38,7 @@ require (
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
github.com/prometheus/common v0.4.1
github.com/prometheus/tsdb v0.9.1
github.com/prometheus/tsdb v0.10.0
github.com/samuel/go-zookeeper v0.0.0-20161028232340-1d7be4effb13
github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371
github.com/shurcooL/vfsgen v0.0.0-20180825020608-02ddb050ef6b

go.sum

@@ -286,8 +286,8 @@ github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/prometheus v0.0.0-20180315085919-58e2a31db8de/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s=
github.com/prometheus/tsdb v0.9.1 h1:IWaAmWkYlgG7/S4iw4IpAQt5Y35QaZM6/GsZ7GsjAuk=
github.com/prometheus/tsdb v0.9.1/go.mod h1:oi49uRhEe9dPUTlS3JRZOwJuVi6tmh10QSgwXEyGCt4=
github.com/prometheus/tsdb v0.10.0 h1:If5rVCMTp6W2SiRAQFlbpJNgVlgMEd+U2GZckwK38ic=
github.com/prometheus/tsdb v0.10.0/go.mod h1:oi49uRhEe9dPUTlS3JRZOwJuVi6tmh10QSgwXEyGCt4=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=

vendor/github.com/prometheus/tsdb/CHANGELOG.md

@@ -1,4 +1,11 @@
## Master / unreleased
## master / unreleased
## 0.10.0
- [FEATURE] Added `DBReadOnly` to allow opening a database in read only mode.
- `DBReadOnly.Blocks()` exposes a slice of `BlockReader`s.
- `BlockReader` interface: removed the MinTime/MaxTime methods; the full block meta is now exposed via `Meta()`.
- [FEATURE] `chunkenc.Chunk.Iterator` method now takes a `chunkenc.Iterator` interface as an argument for reuse.
## 0.9.1
@@ -19,6 +26,7 @@
- [ENHANCEMENT] Reduced disk usage for WAL for small setups.
- [ENHANCEMENT] Optimize queries using regexp for set lookups.
## 0.8.0
- [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic.

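For callers, the `Iterator` signature change is the one breaking API change in this release. A minimal migration sketch, assuming a `chunkenc.Chunk` value obtained elsewhere (the `dumpChunk` helper is hypothetical):

package example

import "github.com/prometheus/tsdb/chunkenc"

// dumpChunk shows the migrated call site. Passing nil keeps the old
// allocate-a-fresh-iterator behaviour of the v0.9.x Iterator() method.
func dumpChunk(c chunkenc.Chunk) error {
	it := c.Iterator(nil)
	for it.Next() {
		t, v := it.At() // timestamp and value of the current sample
		_, _ = t, v
	}
	return it.Err()
}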
vendor/github.com/prometheus/tsdb/Makefile.common

@@ -74,7 +74,7 @@ PROMU_URL := https://github.com/prometheus/promu/releases/download/v$(PROMU_
GOLANGCI_LINT :=
GOLANGCI_LINT_OPTS ?=
GOLANGCI_LINT_VERSION ?= v1.16.0
GOLANGCI_LINT_VERSION ?= v1.17.1
# golangci-lint only supports linux, darwin and windows platforms on i386/amd64.
# windows isn't included here because of the path separator being different.
ifeq ($(GOHOSTOS),$(filter $(GOHOSTOS),linux darwin))

vendor/github.com/prometheus/tsdb/block.go

@@ -138,11 +138,8 @@ type BlockReader interface {
// Tombstones returns a TombstoneReader over the block's deleted data.
Tombstones() (TombstoneReader, error)
// MinTime returns the min time of the block.
MinTime() int64
// MaxTime returns the max time of the block.
MaxTime() int64
// Meta provides meta information about the block reader.
Meta() BlockMeta
}
// Appendable defines an entity to which data can be appended.

vendor/github.com/prometheus/tsdb/chunkenc/chunk.go

@@ -44,7 +44,10 @@ type Chunk interface {
Bytes() []byte
Encoding() Encoding
Appender() (Appender, error)
Iterator() Iterator
// The iterator passed as argument is for re-use.
// Depending on implementation, the iterator can
// be re-used or a new iterator can be allocated.
Iterator(Iterator) Iterator
NumSamples() int
}

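The new parameter exists to amortize allocations when many chunks are read in sequence, which is exactly how the compactor and querier hunks below use it. A sketch of the intended pattern under that assumption (the `sumSamples` helper is illustrative):

package example

import "github.com/prometheus/tsdb/chunkenc"

// sumSamples threads one iterator through every chunk; implementations
// that support in-place reset (such as the XOR iterator below) then
// allocate the iterator only once for the whole loop.
func sumSamples(cs []chunkenc.Chunk) (float64, error) {
	var (
		it  chunkenc.Iterator
		sum float64
	)
	for _, c := range cs {
		it = c.Iterator(it) // nil on the first pass, reused afterwards
		for it.Next() {
			_, v := it.At()
			sum += v
		}
		if err := it.Err(); err != nil {
			return 0, err
		}
	}
	return sum, nil
}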
vendor/github.com/prometheus/tsdb/chunkenc/xor.go

@@ -77,7 +77,7 @@ func (c *XORChunk) NumSamples() int {
// Appender implements the Chunk interface.
func (c *XORChunk) Appender() (Appender, error) {
it := c.iterator()
it := c.iterator(nil)
// To get an appender we must know the state it would have if we had
// appended all existing data from scratch.
@@ -102,19 +102,25 @@ func (c *XORChunk) Appender() (Appender, error) {
return a, nil
}
func (c *XORChunk) iterator() *xorIterator {
func (c *XORChunk) iterator(it Iterator) *xorIterator {
// Should iterators guarantee to act on a copy of the data so it doesn't lock append?
// When using striped locks to guard access to chunks, probably yes.
// Could only copy data if the chunk is not completed yet.
if xorIter, ok := it.(*xorIterator); ok {
xorIter.Reset(c.b.bytes())
return xorIter
}
return &xorIterator{
// The first 2 bytes contain chunk headers.
// We skip that for actual samples.
br: newBReader(c.b.bytes()[2:]),
numTotal: binary.BigEndian.Uint16(c.b.bytes()),
}
}
// Iterator implements the Chunk interface.
func (c *XORChunk) Iterator() Iterator {
return c.iterator()
func (c *XORChunk) Iterator(it Iterator) Iterator {
return c.iterator(it)
}
type xorAppender struct {
@@ -243,6 +249,21 @@ func (it *xorIterator) Err() error {
return it.err
}
func (it *xorIterator) Reset(b []byte) {
// The first 2 bytes contain chunk headers.
// We skip that for actual samples.
it.br = newBReader(b[2:])
it.numTotal = binary.BigEndian.Uint16(b)
it.numRead = 0
it.t = 0
it.val = 0
it.leading = 0
it.trailing = 0
it.tDelta = 0
it.err = nil
}
func (it *xorIterator) Next() bool {
if it.err != nil || it.numRead == it.numTotal {
return false

vendor/github.com/prometheus/tsdb/chunks/chunks.go

@@ -57,8 +57,9 @@ type Meta struct {
}
// writeHash writes the chunk encoding and raw data into the provided hash.
func (cm *Meta) writeHash(h hash.Hash) error {
if _, err := h.Write([]byte{byte(cm.Chunk.Encoding())}); err != nil {
func (cm *Meta) writeHash(h hash.Hash, buf []byte) error {
buf = append(buf[:0], byte(cm.Chunk.Encoding()))
if _, err := h.Write(buf[:1]); err != nil {
return err
}
if _, err := h.Write(cm.Chunk.Bytes()); err != nil {
@@ -97,6 +98,7 @@ type Writer struct {
wbuf *bufio.Writer
n int64
crc32 hash.Hash
buf [binary.MaxVarintLen32]byte
segmentSize int64
}
@@ -245,8 +247,8 @@ func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) {
if err != nil {
return nil, err
}
ait := a.Iterator()
bit := b.Iterator()
ait := a.Iterator(nil)
bit := b.Iterator(nil)
aok, bok := ait.Next(), bit.Next()
for aok && bok {
at, av := ait.At()
@@ -299,22 +301,19 @@ func (w *Writer) WriteChunks(chks ...Meta) error {
}
}
var (
b = [binary.MaxVarintLen32]byte{}
seq = uint64(w.seq()) << 32
)
var seq = uint64(w.seq()) << 32
for i := range chks {
chk := &chks[i]
chk.Ref = seq | uint64(w.n)
n := binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes())))
n := binary.PutUvarint(w.buf[:], uint64(len(chk.Chunk.Bytes())))
if err := w.write(b[:n]); err != nil {
if err := w.write(w.buf[:n]); err != nil {
return err
}
b[0] = byte(chk.Chunk.Encoding())
if err := w.write(b[:1]); err != nil {
w.buf[0] = byte(chk.Chunk.Encoding())
if err := w.write(w.buf[:1]); err != nil {
return err
}
if err := w.write(chk.Chunk.Bytes()); err != nil {
@@ -322,10 +321,10 @@ func (w *Writer) WriteChunks(chks ...Meta) error {
}
w.crc32.Reset()
if err := chk.writeHash(w.crc32); err != nil {
if err := chk.writeHash(w.crc32, w.buf[:]); err != nil {
return err
}
if err := w.write(w.crc32.Sum(b[:0])); err != nil {
if err := w.write(w.crc32.Sum(w.buf[:0])); err != nil {
return err
}
}
@@ -366,7 +365,7 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
return b[start:end]
}
// Reader implements a SeriesReader for a serialized byte stream
// Reader implements a ChunkReader for a serialized byte stream
// of series data.
type Reader struct {
bs []ByteSlice // The underlying bytes holding the encoded series data.
@@ -503,11 +502,11 @@ func sequenceFiles(dir string) ([]string, error) {
return res, nil
}
func closeAll(cs []io.Closer) (err error) {
func closeAll(cs []io.Closer) error {
var merr tsdb_errors.MultiError
for _, c := range cs {
if e := c.Close(); e != nil {
err = e
}
merr.Add(c.Close())
}
return err
return merr.Err()
}

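The new `buf` field and the extra `writeHash` argument serve one purpose: the chunk write path stops allocating per chunk, because `hash.Hash.Sum` appends to the slice it is given rather than returning a fresh one. A standalone illustration of that idiom (names are illustrative):

package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

func main() {
	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

	// One scratch array, reused for varints, encoding bytes and checksums.
	var buf [binary.MaxVarintLen32]byte

	h.Write([]byte("chunk bytes"))
	// Sum appends into buf's backing array instead of allocating a new slice.
	fmt.Printf("%x\n", h.Sum(buf[:0]))
}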
vendor/github.com/prometheus/tsdb/compact.go

@@ -662,7 +662,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}()
c.metrics.populatingBlocks.Set(1)
globalMaxt := blocks[0].MaxTime()
globalMaxt := blocks[0].Meta().MaxTime
for i, b := range blocks {
select {
case <-c.ctx.Done():
@@ -671,13 +671,13 @@
}
if !overlapping {
if i > 0 && b.MinTime() < globalMaxt {
if i > 0 && b.Meta().MinTime < globalMaxt {
c.metrics.overlappingBlocks.Inc()
overlapping = true
level.Warn(c.logger).Log("msg", "found overlapping blocks during compaction", "ulid", meta.ULID)
}
if b.MaxTime() > globalMaxt {
globalMaxt = b.MaxTime()
if b.Meta().MaxTime > globalMaxt {
globalMaxt = b.Meta().MaxTime
}
}
@@ -736,6 +736,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
return errors.Wrap(err, "add symbols")
}
delIter := &deletedIterator{}
for set.Next() {
select {
case <-c.ctx.Done():
@@ -788,17 +789,18 @@
return err
}
it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges}
delIter.it = chk.Chunk.Iterator(delIter.it)
delIter.intervals = dranges
var (
t int64
v float64
)
for it.Next() {
t, v = it.At()
for delIter.Next() {
t, v = delIter.At()
app.Append(t, v)
}
if err := it.Err(); err != nil {
if err := delIter.Err(); err != nil {
return errors.Wrap(err, "iterate chunk while re-encoding")
}

vendor/github.com/prometheus/tsdb/db.go

@@ -250,6 +250,178 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
return m
}
// ErrClosed is returned when the db is closed.
var ErrClosed = errors.New("db already closed")
// DBReadOnly provides APIs for read only operations on a database.
// Current implementation doesn't support concurrency so
// all API calls should happen in the same goroutine.
type DBReadOnly struct {
logger log.Logger
dir string
closers []io.Closer
closed chan struct{}
}
// OpenDBReadOnly opens DB in the given directory for read only operations.
func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) {
if _, err := os.Stat(dir); err != nil {
return nil, errors.Wrap(err, "opening the db dir")
}
if l == nil {
l = log.NewNopLogger()
}
return &DBReadOnly{
logger: l,
dir: dir,
closed: make(chan struct{}),
}, nil
}
// Querier loads the wal and returns a new querier over the data partition for the given time range.
// Current implementation doesn't support multiple Queriers.
func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) {
select {
case <-db.closed:
return nil, ErrClosed
default:
}
blocksReaders, err := db.Blocks()
if err != nil {
return nil, err
}
blocks := make([]*Block, len(blocksReaders))
for i, b := range blocksReaders {
b, ok := b.(*Block)
if !ok {
return nil, errors.New("unable to convert a read only block to a normal block")
}
blocks[i] = b
}
head, err := NewHead(nil, db.logger, nil, 1)
if err != nil {
return nil, err
}
maxBlockTime := int64(math.MinInt64)
if len(blocks) > 0 {
maxBlockTime = blocks[len(blocks)-1].Meta().MaxTime
}
// Also add the WAL if the current blocks don't cover the requested time range.
if maxBlockTime <= maxt {
w, err := wal.Open(db.logger, nil, filepath.Join(db.dir, "wal"))
if err != nil {
return nil, err
}
head, err = NewHead(nil, db.logger, w, 1)
if err != nil {
return nil, err
}
// Set the min valid time for the ingested wal samples
// to be no lower than the maxt of the last block.
if err := head.Init(maxBlockTime); err != nil {
return nil, errors.Wrap(err, "read WAL")
}
// Set the wal to nil to disable all wal operations.
// This is mainly to avoid blocking when closing the head.
head.wal = nil
db.closers = append(db.closers, head)
}
// TODO: Refactor so that it is possible to obtain a Querier without initializing a writable DB instance.
// Option 1: refactor DB to have the Querier implementation use the DBReadOnly.Querier implementation, not the opposite.
// Option 2: refactor Querier to use another independent func which
// can then be used by read-only and writable db instances without any code duplication.
dbWritable := &DB{
dir: db.dir,
logger: db.logger,
blocks: blocks,
head: head,
}
return dbWritable.Querier(mint, maxt)
}
// Blocks returns a slice of block readers for persisted blocks.
func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
select {
case <-db.closed:
return nil, ErrClosed
default:
}
loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil)
if err != nil {
return nil, err
}
// Corrupted blocks that have been superseded by a loadable block can be safely ignored.
for _, block := range loadable {
for _, b := range block.Meta().Compaction.Parents {
delete(corrupted, b.ULID)
}
}
if len(corrupted) > 0 {
for _, b := range loadable {
if err := b.Close(); err != nil {
level.Warn(db.logger).Log("msg", "closing a block", "err", err)
}
}
return nil, errors.Errorf("unexpected corrupted block: %v", corrupted)
}
if len(loadable) == 0 {
return nil, errors.New("no blocks found")
}
sort.Slice(loadable, func(i, j int) bool {
return loadable[i].Meta().MinTime < loadable[j].Meta().MinTime
})
blockMetas := make([]BlockMeta, 0, len(loadable))
for _, b := range loadable {
blockMetas = append(blockMetas, b.Meta())
}
if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 {
level.Warn(db.logger).Log("msg", "overlapping blocks found during opening", "detail", overlaps.String())
}
// Close all previously open readers and add the new ones to the cache.
for _, closer := range db.closers {
closer.Close()
}
blockClosers := make([]io.Closer, len(loadable))
blockReaders := make([]BlockReader, len(loadable))
for i, b := range loadable {
blockClosers[i] = b
blockReaders[i] = b
}
db.closers = blockClosers
return blockReaders, nil
}
// Close all block readers.
func (db *DBReadOnly) Close() error {
select {
case <-db.closed:
return ErrClosed
default:
}
close(db.closed)
var merr tsdb_errors.MultiError
for _, b := range db.closers {
merr.Add(b.Close())
}
return merr.Err()
}
// Open returns a new DB in the given directory.
func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) {
if err := os.MkdirAll(dir, 0777); err != nil {
@@ -514,8 +686,10 @@ func (db *DB) compact() (err error) {
return nil
}
func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
for _, b := range db.blocks {
// getBlock iterates a given block range to find a block by a given id.
// If found, it returns the block itself and a boolean to indicate that it was found.
func getBlock(allBlocks []*Block, id ulid.ULID) (*Block, bool) {
for _, b := range allBlocks {
if b.Meta().ULID == id {
return b, true
}
@@ -533,14 +707,14 @@ func (db *DB) reload() (err error) {
db.metrics.reloads.Inc()
}()
loadable, corrupted, err := db.openBlocks()
loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool)
if err != nil {
return err
}
deletable := db.deletableBlocks(loadable)
// Corrupted blocks that have been replaced by parents can be safely ignored and deleted.
// Corrupted blocks that have been superseded by a loadable block can be safely ignored.
// This makes it resilient against the process crashing towards the end of a compaction.
// Creation of a new block and deletion of its parents cannot happen atomically.
// By creating blocks with their parents, we can pick up the deletion where it left off during a crash.
@@ -553,7 +727,7 @@
if len(corrupted) > 0 {
// Close all new blocks to release the lock for windows.
for _, block := range loadable {
if _, loaded := db.getBlock(block.Meta().ULID); !loaded {
if _, open := getBlock(db.blocks, block.Meta().ULID); !open {
block.Close()
}
}
@@ -621,24 +795,24 @@
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
}
func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
dirs, err := blockDirs(db.dir)
func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
bDirs, err := blockDirs(dir)
if err != nil {
return nil, nil, errors.Wrap(err, "find blocks")
}
corrupted = make(map[ulid.ULID]error)
for _, dir := range dirs {
meta, _, err := readMetaFile(dir)
for _, bDir := range bDirs {
meta, _, err := readMetaFile(bDir)
if err != nil {
level.Error(db.logger).Log("msg", "not a block dir", "dir", dir)
level.Error(l).Log("msg", "not a block dir", "dir", bDir)
continue
}
// See if we already have the block in memory or open it otherwise.
block, ok := db.getBlock(meta.ULID)
if !ok {
block, err = OpenBlock(db.logger, dir, db.chunkPool)
block, open := getBlock(loaded, meta.ULID)
if !open {
block, err = OpenBlock(l, bDir, chunkPool)
if err != nil {
corrupted[meta.ULID] = err
continue

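Putting the new pieces together, a read-only client opens the data directory, asks for a querier over a time range, and iterates the merged result of blocks plus WAL. A usage sketch, assuming tsdb's existing `Querier`/`SeriesSet` interfaces and matcher helpers; the directory path and metric name are illustrative:

package main

import (
	"fmt"
	"log"
	"math"

	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/labels"
)

func main() {
	db, err := tsdb.OpenDBReadOnly("/path/to/data", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Query the whole time range; the WAL is replayed only when the
	// requested maxt goes past the last persisted block.
	q, err := db.Querier(math.MinInt64, math.MaxInt64)
	if err != nil {
		log.Fatal(err)
	}
	defer q.Close()

	set, err := q.Select(labels.NewEqualMatcher("__name__", "up"))
	if err != nil {
		log.Fatal(err)
	}
	for set.Next() {
		fmt.Println(set.At().Labels())
	}
	if err := set.Err(); err != nil {
		log.Fatal(err)
	}
}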
vendor/github.com/prometheus/tsdb/head.go

@@ -25,6 +25,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/chunkenc"
@ -64,6 +65,7 @@ type Head struct {
logger log.Logger
appendPool sync.Pool
bytesPool sync.Pool
numSeries uint64
minTime, maxTime int64 // Current min and max of the samples included in the head.
minValidTime int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
@@ -84,7 +86,7 @@ type Head struct {
type headMetrics struct {
activeAppenders prometheus.Gauge
series prometheus.Gauge
series prometheus.GaugeFunc
seriesCreated prometheus.Counter
seriesRemoved prometheus.Counter
seriesNotFound prometheus.Counter
@@ -112,9 +114,11 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Name: "prometheus_tsdb_head_active_appenders",
Help: "Number of currently active appender transactions",
})
m.series = prometheus.NewGauge(prometheus.GaugeOpts{
m.series = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_series",
Help: "Total number of series in the head block.",
}, func() float64 {
return float64(h.NumSeries())
})
m.seriesCreated = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_series_created_total",
@@ -502,6 +506,7 @@ func (h *Head) Init(minValidTime int64) error {
return nil
}
level.Info(h.logger).Log("msg", "replaying WAL, this may take a while")
// Backfill the checkpoint first if it exists.
dir, startFrom, err := LastCheckpoint(h.wal.Dir())
if err != nil && err != ErrNotFound {
@@ -525,6 +530,7 @@ func (h *Head) Init(minValidTime int64) error {
return errors.Wrap(err, "backfill checkpoint")
}
startFrom++
level.Info(h.logger).Log("msg", "WAL checkpoint loaded")
}
// Find the last segment.
@@ -548,6 +554,7 @@ func (h *Head) Init(minValidTime int64) error {
if err != nil {
return err
}
level.Info(h.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", last)
}
return nil
@@ -698,6 +705,21 @@ func (h *rangeHead) MaxTime() int64 {
return h.maxt
}
func (h *rangeHead) NumSeries() uint64 {
return h.head.NumSeries()
}
func (h *rangeHead) Meta() BlockMeta {
return BlockMeta{
MinTime: h.MinTime(),
MaxTime: h.MaxTime(),
ULID: h.head.Meta().ULID,
Stats: BlockStats{
NumSeries: h.NumSeries(),
},
}
}
// initAppender is a helper to initialize the time bounds of the head
// upon the first sample it receives.
type initAppender struct {
@@ -1022,9 +1044,11 @@ func (h *Head) gc() {
seriesRemoved := len(deleted)
h.metrics.seriesRemoved.Add(float64(seriesRemoved))
h.metrics.series.Sub(float64(seriesRemoved))
h.metrics.chunksRemoved.Add(float64(chunksRemoved))
h.metrics.chunks.Sub(float64(chunksRemoved))
// Using AddUint64 to subtract the number of removed series.
// See: https://golang.org/pkg/sync/atomic/#AddUint64.
atomic.AddUint64(&h.numSeries, ^uint64(seriesRemoved-1))
// Remove deleted series IDs from the postings lists.
h.postings.Delete(deleted)
@@ -1101,6 +1125,26 @@ func (h *Head) chunksRange(mint, maxt int64) *headChunkReader {
return &headChunkReader{head: h, mint: mint, maxt: maxt}
}
// NumSeries returns the number of active series in the head.
func (h *Head) NumSeries() uint64 {
return atomic.LoadUint64(&h.numSeries)
}
// Meta returns meta information about the head.
// The head is dynamic, so the returned meta reflects its state at call time.
func (h *Head) Meta() BlockMeta {
var id [16]byte
copy(id[:], "______head______")
return BlockMeta{
MinTime: h.MinTime(),
MaxTime: h.MaxTime(),
ULID: ulid.ULID(id),
Stats: BlockStats{
NumSeries: h.NumSeries(),
},
}
}
// MinTime returns the lowest time bound on visible data in the head.
func (h *Head) MinTime() int64 {
return atomic.LoadInt64(&h.minTime)
@@ -1185,9 +1229,9 @@ type safeChunk struct {
cid int
}
func (c *safeChunk) Iterator() chunkenc.Iterator {
func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
c.s.Lock()
it := c.s.iterator(c.cid)
it := c.s.iterator(c.cid, reuseIter)
c.s.Unlock()
return it
}
@@ -1347,8 +1391,8 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie
return s, false
}
h.metrics.series.Inc()
h.metrics.seriesCreated.Inc()
atomic.AddUint64(&h.numSeries, 1)
h.postings.Add(id, lset)
@@ -1739,7 +1783,7 @@ func computeChunkEndTime(start, cur, max int64) int64 {
return start + (max-start)/a
}
func (s *memSeries) iterator(id int) chunkenc.Iterator {
func (s *memSeries) iterator(id int, it chunkenc.Iterator) chunkenc.Iterator {
c := s.chunk(id)
// TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk,
// which got then garbage collected before it got accessed.
@@ -1749,17 +1793,23 @@
}
if id-s.firstChunkID < len(s.chunks)-1 {
return c.chunk.Iterator()
return c.chunk.Iterator(it)
}
// Serve the last 4 samples for the last chunk from the sample buffer
// as their compressed bytes may be mutated by added samples.
it := &memSafeIterator{
Iterator: c.chunk.Iterator(),
if msIter, ok := it.(*memSafeIterator); ok {
msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
msIter.i = -1
msIter.total = c.chunk.NumSamples()
msIter.buf = s.sampleBuf
return msIter
}
return &memSafeIterator{
Iterator: c.chunk.Iterator(it),
i: -1,
total: c.chunk.NumSamples(),
buf: s.sampleBuf,
}
return it
}
func (s *memSeries) head() *memChunk {

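The metrics change is a standard client_golang pattern: rather than keeping a `Gauge` in sync with `Inc`/`Sub` at every mutation site, a `GaugeFunc` samples an atomically maintained counter at scrape time, and `gc()` above decrements that counter via the `AddUint64` two's-complement trick. A minimal sketch of the pattern (the metric name is illustrative):

package main

import (
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	var numSeries uint64 // updated with atomic ops wherever series are added or dropped

	series := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "example_head_series",
		Help: "Total number of series in the head block.",
	}, func() float64 {
		// Evaluated on every scrape; no Inc/Sub bookkeeping required.
		return float64(atomic.LoadUint64(&numSeries))
	})
	prometheus.MustRegister(series)

	atomic.AddUint64(&numSeries, 1)            // one series created
	atomic.AddUint64(&numSeries, ^uint64(1-1)) // subtract n by adding ^uint64(n-1)
}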
vendor/github.com/prometheus/tsdb/index/index.go

@@ -123,10 +123,10 @@ type Writer struct {
buf2 encoding.Encbuf
uint32s []uint32
symbols map[string]uint32 // symbol offsets
seriesOffsets map[uint64]uint64 // offsets of series
labelIndexes []hashEntry // label index offsets
postings []hashEntry // postings lists offsets
symbols map[string]uint32 // symbol offsets
seriesOffsets map[uint64]uint64 // offsets of series
labelIndexes []labelIndexHashEntry // label index offsets
postings []postingsHashEntry // postings lists offsets
// Hold last series to validate that clients insert new series in order.
lastSeries labels.Labels
@@ -271,11 +271,11 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
case idxStageDone:
w.toc.LabelIndicesTable = w.pos
if err := w.writeOffsetTable(w.labelIndexes); err != nil {
if err := w.writeLabelIndexesOffsetTable(); err != nil {
return err
}
w.toc.PostingsTable = w.pos
if err := w.writeOffsetTable(w.postings); err != nil {
if err := w.writePostingsOffsetTable(); err != nil {
return err
}
if err := w.writeTOC(); err != nil {
@@ -420,7 +420,7 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error {
return err
}
w.labelIndexes = append(w.labelIndexes, hashEntry{
w.labelIndexes = append(w.labelIndexes, labelIndexHashEntry{
keys: names,
offset: w.pos,
})
@@ -447,12 +447,12 @@
return errors.Wrap(err, "write label index")
}
// writeOffsetTable writes a sequence of readable hash entries.
func (w *Writer) writeOffsetTable(entries []hashEntry) error {
// writeLabelIndexesOffsetTable writes the label indices offset table.
func (w *Writer) writeLabelIndexesOffsetTable() error {
w.buf2.Reset()
w.buf2.PutBE32int(len(entries))
w.buf2.PutBE32int(len(w.labelIndexes))
for _, e := range entries {
for _, e := range w.labelIndexes {
w.buf2.PutUvarint(len(e.keys))
for _, k := range e.keys {
w.buf2.PutUvarintStr(k)
@@ -467,6 +467,25 @@
return w.write(w.buf1.Get(), w.buf2.Get())
}
// writePostingsOffsetTable writes the postings offset table.
func (w *Writer) writePostingsOffsetTable() error {
w.buf2.Reset()
w.buf2.PutBE32int(len(w.postings))
for _, e := range w.postings {
w.buf2.PutUvarint(2)
w.buf2.PutUvarintStr(e.name)
w.buf2.PutUvarintStr(e.value)
w.buf2.PutUvarint64(e.offset)
}
w.buf1.Reset()
w.buf1.PutBE32int(w.buf2.Len())
w.buf2.PutHash(w.crc32)
return w.write(w.buf1.Get(), w.buf2.Get())
}
const indexTOCLen = 6*8 + 4
func (w *Writer) writeTOC() error {
@@ -494,8 +513,9 @@ func (w *Writer) WritePostings(name, value string, it Postings) error {
return err
}
w.postings = append(w.postings, hashEntry{
keys: []string{name, value},
w.postings = append(w.postings, postingsHashEntry{
name: name,
value: value,
offset: w.pos,
})
@@ -542,11 +562,16 @@ func (s uint32slice) Len() int { return len(s) }
func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] }
type hashEntry struct {
type labelIndexHashEntry struct {
keys []string
offset uint64
}
type postingsHashEntry struct {
name, value string
offset uint64
}
func (w *Writer) Close() error {
if err := w.ensureStage(idxStageDone); err != nil {
return err
@@ -781,9 +806,13 @@ func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64) error) e
d := encoding.NewDecbufAt(bs, int(off), castagnoliTable)
cnt := d.Be32()
// The Postings offset table takes only 2 keys per entry (name and value of the label),
// and the LabelIndices offset table takes only 1 key per entry (a label name).
// Hence we size the buffer for the larger of the two, i.e. 2.
keys := make([]string, 0, 2)
for d.Err() == nil && d.Len() > 0 && cnt > 0 {
keyCount := d.Uvarint()
keys := make([]string, 0, keyCount)
keys = keys[:0]
for i := 0; i < keyCount; i++ {
keys = append(keys, d.UvarintStr())
@@ -951,25 +980,30 @@ func (r *Reader) LabelNames() ([]string, error) {
type stringTuples struct {
length int // tuple length
entries []string // flattened tuple entries
swapBuf []string
}
func NewStringTuples(entries []string, length int) (*stringTuples, error) {
if len(entries)%length != 0 {
return nil, errors.Wrap(encoding.ErrInvalidSize, "string tuple list")
}
return &stringTuples{entries: entries, length: length}, nil
return &stringTuples{
entries: entries,
length: length,
}, nil
}
func (t *stringTuples) Len() int { return len(t.entries) / t.length }
func (t *stringTuples) At(i int) ([]string, error) { return t.entries[i : i+t.length], nil }
func (t *stringTuples) Swap(i, j int) {
c := make([]string, t.length)
copy(c, t.entries[i:i+t.length])
if t.swapBuf == nil {
t.swapBuf = make([]string, t.length)
}
copy(t.swapBuf, t.entries[i:i+t.length])
for k := 0; k < t.length; k++ {
t.entries[i+k] = t.entries[j+k]
t.entries[j+k] = c[k]
t.entries[j+k] = t.swapBuf[k]
}
}

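For orientation, the table `writePostingsOffsetTable` emits keeps the generic offset-table layout (which is why `ReadOffsetTable` above can still decode it): a 4-byte big-endian entry count, then per entry a key count of 2, two length-prefixed strings, and a uvarint offset, followed by the length/CRC framing that `buf1` and `PutHash` provide. A standalone sketch of the entry encoding in plain Go, without the Encbuf and CRC framing (all names are illustrative):

package main

import (
	"encoding/binary"
	"fmt"
)

// appendUvarint and appendUvarintStr mirror Encbuf.PutUvarint and PutUvarintStr.
func appendUvarint(b []byte, x uint64) []byte {
	var tmp [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(tmp[:], x)
	return append(b, tmp[:n]...)
}

func appendUvarintStr(b []byte, s string) []byte {
	return append(appendUvarint(b, uint64(len(s))), s...)
}

func main() {
	entries := []struct {
		name, value string
		offset      uint64
	}{{"__name__", "up", 4096}}

	var buf []byte
	var cnt [4]byte
	binary.BigEndian.PutUint32(cnt[:], uint32(len(entries))) // PutBE32int
	buf = append(buf, cnt[:]...)

	for _, e := range entries {
		buf = appendUvarint(buf, 2) // key count: always name + value
		buf = appendUvarintStr(buf, e.name)
		buf = appendUvarintStr(buf, e.value)
		buf = appendUvarint(buf, e.offset)
	}
	fmt.Printf("% x\n", buf)
}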
vendor/github.com/prometheus/tsdb/querier.go

@@ -1060,8 +1060,9 @@ func (it *verticalMergeSeriesIterator) Err() error {
type chunkSeriesIterator struct {
chunks []chunks.Meta
i int
cur chunkenc.Iterator
i int
cur chunkenc.Iterator
bufDelIter *deletedIterator
maxt, mint int64
@@ -1069,21 +1070,32 @@
}
func newChunkSeriesIterator(cs []chunks.Meta, dranges Intervals, mint, maxt int64) *chunkSeriesIterator {
it := cs[0].Chunk.Iterator()
if len(dranges) > 0 {
it = &deletedIterator{it: it, intervals: dranges}
}
return &chunkSeriesIterator{
csi := &chunkSeriesIterator{
chunks: cs,
i: 0,
cur: it,
mint: mint,
maxt: maxt,
intervals: dranges,
}
csi.resetCurIterator()
return csi
}
func (it *chunkSeriesIterator) resetCurIterator() {
if len(it.intervals) == 0 {
it.cur = it.chunks[it.i].Chunk.Iterator(it.cur)
return
}
if it.bufDelIter == nil {
it.bufDelIter = &deletedIterator{
intervals: it.intervals,
}
}
it.bufDelIter.it = it.chunks[it.i].Chunk.Iterator(it.bufDelIter.it)
it.cur = it.bufDelIter
}
func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
@@ -1102,10 +1114,7 @@
}
}
it.cur = it.chunks[it.i].Chunk.Iterator()
if len(it.intervals) > 0 {
it.cur = &deletedIterator{it: it.cur, intervals: it.intervals}
}
it.resetCurIterator()
for it.cur.Next() {
t0, _ := it.cur.At()
@@ -1145,10 +1154,7 @@
}
it.i++
it.cur = it.chunks[it.i].Chunk.Iterator()
if len(it.intervals) > 0 {
it.cur = &deletedIterator{it: it.cur, intervals: it.intervals}
}
it.resetCurIterator()
return it.Next()
}

vendor/github.com/prometheus/tsdb/wal/wal.go

@@ -203,6 +203,48 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
stopc: make(chan chan struct{}),
compress: compress,
}
registerMetrics(reg, w)
_, j, err := w.Segments()
// Index of the Segment we want to open and write to.
writeSegmentIndex := 0
if err != nil {
return nil, errors.Wrap(err, "get segment range")
}
// If some segments already exist, create one with a higher index than the last segment.
if j != -1 {
writeSegmentIndex = j + 1
}
segment, err := CreateSegment(w.dir, writeSegmentIndex)
if err != nil {
return nil, err
}
if err := w.setSegment(segment); err != nil {
return nil, err
}
go w.run()
return w, nil
}
// Open an existing WAL.
func Open(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) {
if logger == nil {
logger = log.NewNopLogger()
}
w := &WAL{
dir: dir,
logger: logger,
}
registerMetrics(reg, w)
return w, nil
}
func registerMetrics(reg prometheus.Registerer, w *WAL) {
w.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "prometheus_tsdb_wal_fsync_duration_seconds",
Help: "Duration of WAL fsync.",
@@ -231,30 +273,6 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
if reg != nil {
reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions, w.truncateFail, w.truncateTotal, w.currentSegment)
}
_, j, err := w.Segments()
// Index of the Segment we want to open and write to.
writeSegmentIndex := 0
if err != nil {
return nil, errors.Wrap(err, "get segment range")
}
// If some segments already exist create one with a higher index than the last segment.
if j != -1 {
writeSegmentIndex = j + 1
}
segment, err := CreateSegment(w.dir, writeSegmentIndex)
if err != nil {
return nil, err
}
if err := w.setSegment(segment); err != nil {
return nil, err
}
go w.run()
return w, nil
}
// CompressionEnabled returns if compression is enabled on this WAL.
@@ -302,7 +320,6 @@ func (w *WAL) Repair(origErr error) error {
if cerr.Segment < 0 {
return errors.New("corruption error does not specify position")
}
level.Warn(w.logger).Log("msg", "starting corruption repair",
"segment", cerr.Segment, "offset", cerr.Offset)
@@ -487,7 +504,6 @@ func (w *WAL) flushPage(clear bool) error {
// First Byte of header format:
// [ 4 bits unallocated] [1 bit snappy compression flag] [ 3 bit record type ]
const (
snappyMask = 1 << 3
recTypeMask = snappyMask - 1

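The WAL refactor separates attaching from creating: `New`/`NewSize` still create a fresh segment and start the background goroutine, while the new `Open` only wires up logging and metrics, which is what `DBReadOnly.Querier` relies on to replay an existing WAL without modifying it. A sketch of the read-only path, mirroring the db.go hunk above (the directory path is illustrative):

package main

import (
	stdlog "log"
	"path/filepath"

	kitlog "github.com/go-kit/kit/log"
	"github.com/prometheus/tsdb/wal"
)

func main() {
	// Open registers metrics but creates no segment and starts no writer
	// goroutine, so the WAL directory is left untouched.
	w, err := wal.Open(kitlog.NewNopLogger(), nil, filepath.Join("/path/to/data", "wal"))
	if err != nil {
		stdlog.Fatal(err)
	}
	_ = w // a caller such as DBReadOnly.Querier now replays w via Head.Init
}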
vendor/modules.txt

@@ -277,7 +277,7 @@ github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg
# github.com/prometheus/procfs v0.0.2
github.com/prometheus/procfs
github.com/prometheus/procfs/internal/fs
# github.com/prometheus/tsdb v0.9.1
# github.com/prometheus/tsdb v0.10.0
github.com/prometheus/tsdb
github.com/prometheus/tsdb/fileutil
github.com/prometheus/tsdb/labels