From 475ca2ecd076c139f90fea3c60e7b4ffe58aa8f6 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Wed, 3 Jul 2019 07:19:47 -0600 Subject: [PATCH] Update to tsdb 0.9.1 Signed-off-by: Chris Marchbanks --- go.mod | 2 +- go.sum | 5 +- storage/remote/wal_watcher.go | 5 +- storage/remote/wal_watcher_test.go | 14 +- .../github.com/prometheus/tsdb/CHANGELOG.md | 29 +++- .../prometheus/tsdb/Makefile.common | 11 +- vendor/github.com/prometheus/tsdb/block.go | 125 +++++++++--------- .../github.com/prometheus/tsdb/checkpoint.go | 2 +- .../prometheus/tsdb/chunks/chunks.go | 6 +- vendor/github.com/prometheus/tsdb/compact.go | 49 ++++--- vendor/github.com/prometheus/tsdb/db.go | 43 +++++- .../prometheus/tsdb/fileutil/fileutil.go | 14 +- vendor/github.com/prometheus/tsdb/go.mod | 22 +-- vendor/github.com/prometheus/tsdb/go.sum | 39 +++++- vendor/github.com/prometheus/tsdb/head.go | 114 +++++++++++----- .../prometheus/tsdb/index/postings.go | 64 ++++----- .../prometheus/tsdb/labels/selector.go | 13 +- vendor/github.com/prometheus/tsdb/querier.go | 77 +++++++++++ vendor/github.com/prometheus/tsdb/repair.go | 2 +- .../github.com/prometheus/tsdb/tombstones.go | 62 ++++----- vendor/github.com/prometheus/tsdb/wal.go | 7 +- .../prometheus/tsdb/wal/live_reader.go | 66 +++++++-- .../github.com/prometheus/tsdb/wal/reader.go | 23 +++- vendor/github.com/prometheus/tsdb/wal/wal.go | 110 +++++++++++---- vendor/modules.txt | 2 +- 25 files changed, 628 insertions(+), 278 deletions(-) diff --git a/go.mod b/go.mod index 15f409ed5..1ba6e76ed 100644 --- a/go.mod +++ b/go.mod @@ -70,7 +70,7 @@ require ( github.com/prometheus/client_golang v1.0.0 github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 github.com/prometheus/common v0.4.1 - github.com/prometheus/tsdb v0.8.0 + github.com/prometheus/tsdb v0.9.1 github.com/rlmcpherson/s3gof3r v0.5.0 // indirect github.com/rubyist/circuitbreaker v2.2.1+incompatible // indirect github.com/samuel/go-zookeeper v0.0.0-20161028232340-1d7be4effb13 diff --git a/go.sum b/go.sum index fa239d531..9bc286f11 100644 --- a/go.sum +++ b/go.sum @@ -333,7 +333,6 @@ github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f h1:BVwpUVJ github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.1 h1:K0MGApIoQvMw27RTdJkPbr3JZ7DNbtxQNyi5STVM6Kw= @@ -345,8 +344,8 @@ github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/prometheus v0.0.0-20180315085919-58e2a31db8de/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s= -github.com/prometheus/tsdb v0.8.0 h1:w1tAGxsBMLkuGrFMhqgcCeBkM5d1YI24udArs+aASuQ= -github.com/prometheus/tsdb v0.8.0/go.mod h1:fSI0j+IUQrDd7+ZtR9WKIGtoYAYAJUKcKhYLG25tN4g= +github.com/prometheus/tsdb v0.9.1 h1:IWaAmWkYlgG7/S4iw4IpAQt5Y35QaZM6/GsZ7GsjAuk= +github.com/prometheus/tsdb v0.9.1/go.mod h1:oi49uRhEe9dPUTlS3JRZOwJuVi6tmh10QSgwXEyGCt4= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rlmcpherson/s3gof3r v0.5.0 h1:1izOJpTiohSibfOHuNyEA/yQnAirh05enzEdmhez43k= diff --git a/storage/remote/wal_watcher.go b/storage/remote/wal_watcher.go index 39f73ba76..48fba02d6 100644 --- a/storage/remote/wal_watcher.go +++ b/storage/remote/wal_watcher.go @@ -78,6 +78,7 @@ var ( }, []string{queue}, ) + liveReaderMetrics = wal.NewLiveReaderMetrics(prometheus.DefaultRegisterer) ) func init() { @@ -293,7 +294,7 @@ func (w *WALWatcher) watch(segmentNum int, tail bool) error { } defer segment.Close() - reader := wal.NewLiveReader(w.logger, segment) + reader := wal.NewLiveReader(w.logger, liveReaderMetrics, segment) readTicker := time.NewTicker(readPeriod) defer readTicker.Stop() @@ -509,7 +510,7 @@ func (w *WALWatcher) readCheckpoint(checkpointDir string) error { } defer sr.Close() - r := wal.NewLiveReader(w.logger, sr) + r := wal.NewLiveReader(w.logger, liveReaderMetrics, sr) if err := w.readSegment(r, index, false); err != io.EOF && err != nil { return errors.Wrap(err, "readSegment") } diff --git a/storage/remote/wal_watcher_test.go b/storage/remote/wal_watcher_test.go index 2c584f6d7..34c0b9f36 100644 --- a/storage/remote/wal_watcher_test.go +++ b/storage/remote/wal_watcher_test.go @@ -103,7 +103,7 @@ func TestTailSamples(t *testing.T) { testutil.Ok(t, err) enc := tsdb.RecordEncoder{} - w, err := wal.NewSize(nil, nil, wdir, 128*pageSize) + w, err := wal.NewSize(nil, nil, wdir, 128*pageSize, false) testutil.Ok(t, err) // Write to the initial segment then checkpoint. @@ -145,7 +145,7 @@ func TestTailSamples(t *testing.T) { testutil.Ok(t, err) defer segment.Close() - reader := wal.NewLiveReader(nil, segment) + reader := wal.NewLiveReader(nil, liveReaderMetrics, segment) // Use tail true so we can ensure we got the right number of samples. watcher.readSegment(reader, i, true) } @@ -171,7 +171,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) { err = os.Mkdir(wdir, 0777) testutil.Ok(t, err) - w, err := wal.NewSize(nil, nil, wdir, 128*pageSize) + w, err := wal.NewSize(nil, nil, wdir, 128*pageSize, false) testutil.Ok(t, err) var recs [][]byte @@ -237,7 +237,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { testutil.Ok(t, err) enc := tsdb.RecordEncoder{} - w, err := wal.NewSize(nil, nil, wdir, segmentSize) + w, err := wal.NewSize(nil, nil, wdir, segmentSize, false) testutil.Ok(t, err) // Write to the initial segment then checkpoint. @@ -319,7 +319,7 @@ func TestReadCheckpoint(t *testing.T) { os.Create(wal.SegmentName(wdir, 30)) enc := tsdb.RecordEncoder{} - w, err := wal.NewSize(nil, nil, wdir, 128*pageSize) + w, err := wal.NewSize(nil, nil, wdir, 128*pageSize, false) testutil.Ok(t, err) // Write to the initial segment then checkpoint. @@ -381,7 +381,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) { testutil.Ok(t, err) enc := tsdb.RecordEncoder{} - w, err := wal.NewSize(nil, nil, wdir, pageSize) + w, err := wal.NewSize(nil, nil, wdir, pageSize, false) testutil.Ok(t, err) // Write a bunch of data. @@ -449,7 +449,7 @@ func TestCheckpointSeriesReset(t *testing.T) { testutil.Ok(t, err) enc := tsdb.RecordEncoder{} - w, err := wal.NewSize(nil, nil, wdir, segmentSize) + w, err := wal.NewSize(nil, nil, wdir, segmentSize, false) testutil.Ok(t, err) // Write to the initial segment, then checkpoint later. diff --git a/vendor/github.com/prometheus/tsdb/CHANGELOG.md b/vendor/github.com/prometheus/tsdb/CHANGELOG.md index 610899d72..9d057a3bc 100644 --- a/vendor/github.com/prometheus/tsdb/CHANGELOG.md +++ b/vendor/github.com/prometheus/tsdb/CHANGELOG.md @@ -1,7 +1,26 @@ -## master / unreleased +## Master / unreleased +## 0.9.1 + + - [CHANGE] LiveReader metrics are now injected rather than global. + +## 0.9.0 + + - [FEATURE] Provide option to compress WAL records using Snappy. [#609](https://github.com/prometheus/tsdb/pull/609) + - [BUGFIX] Re-calculate block size when calling `block.Delete`. + - [BUGFIX] Re-encode all head chunks at compaction that are open (being appended to) or outside the Maxt block range. This avoids writing out corrupt data. It happens when snapshotting with the head included. + - [BUGFIX] Improved handling of multiple refs for the same series in WAL reading. + - [BUGFIX] `prometheus_tsdb_compactions_failed_total` is now incremented on any compaction failure. + - [CHANGE] The meta file `BlockStats` no longer holds size information. This is now dynamically calculated and kept in memory. It also includes the meta file size which was not included before. + - [CHANGE] Create new clean segment when starting the WAL. + - [CHANGE] Renamed metric from `prometheus_tsdb_wal_reader_corruption_errors` to `prometheus_tsdb_wal_reader_corruption_errors_total`. + - [ENHANCEMENT] Improved atomicity of .tmp block replacement during compaction for usual case. + - [ENHANCEMENT] Improved postings intersection matching. + - [ENHANCEMENT] Reduced disk usage for WAL for small setups. + - [ENHANCEMENT] Optimize queries using regexp for set lookups. ## 0.8.0 + - [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic. - [BUGFIX] Don't panic and recover nicely when running out of disk space. - [BUGFIX] Correctly handle empty labels. @@ -11,9 +30,11 @@ - [FEATURE] Added `currentSegment` metric for the current WAL segment it is being written to. ## 0.7.1 + - [ENHANCEMENT] Reduce memory usage in mergedPostings.Seek ## 0.7.0 + - [CHANGE] tsdb now requires golang 1.12 or higher. - [REMOVED] `chunks.NewReader` is removed as it wasn't used anywhere. - [REMOVED] `FromData` is considered unused so was removed. @@ -29,12 +50,15 @@ - [ENHANCEMENT] PostListings and NotMatcher now public. ## 0.6.1 + - [BUGFIX] Update `last` after appending a non-overlapping chunk in `chunks.MergeOverlappingChunks`. [#539](https://github.com/prometheus/tsdb/pull/539) ## 0.6.0 + - [CHANGE] `AllowOverlappingBlock` is now `AllowOverlappingBlocks`. ## 0.5.0 + - [FEATURE] Time-ovelapping blocks are now allowed. [#370](https://github.com/prometheus/tsdb/pull/370) - Disabled by default and can be enabled via `AllowOverlappingBlock` option. - Added `MergeChunks` function in `chunkenc/xor.go` to merge 2 time-overlapping chunks. @@ -50,6 +74,7 @@ - [BUGFIX] LiveReader can get into an infinite loop on corrupt WALs. ## 0.4.0 + - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed. - [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374) - [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change: @@ -61,9 +86,11 @@ - [FEATURE] Add new `LiveReader` to WAL pacakge. Added to allow live tailing of a WAL segment, used by Prometheus Remote Write after refactor. The main difference between the new reader and the existing `Reader` is that for `LiveReader` a call to `Next()` that returns false does not mean that there will never be more data to read. ## 0.3.1 + - [BUGFIX] Fixed most windows test and some actual bugs for unclosed file readers. ## 0.3.0 + - [CHANGE] `LastCheckpoint()` used to return just the segment name and now it returns the full relative path. - [CHANGE] `NewSegmentsRangeReader()` can now read over miltiple wal ranges by using the new `SegmentRange{}` struct. - [CHANGE] `CorruptionErr{}` now also exposes the Segment `Dir` which is added when displaying any errors. diff --git a/vendor/github.com/prometheus/tsdb/Makefile.common b/vendor/github.com/prometheus/tsdb/Makefile.common index c7f9ea64f..48d2ff84e 100644 --- a/vendor/github.com/prometheus/tsdb/Makefile.common +++ b/vendor/github.com/prometheus/tsdb/Makefile.common @@ -69,7 +69,7 @@ else GO_BUILD_PLATFORM ?= $(GOHOSTOS)-$(GOHOSTARCH) endif -PROMU_VERSION ?= 0.4.0 +PROMU_VERSION ?= 0.5.0 PROMU_URL := https://github.com/prometheus/promu/releases/download/v$(PROMU_VERSION)/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM).tar.gz GOLANGCI_LINT := @@ -86,6 +86,8 @@ endif PREFIX ?= $(shell pwd) BIN_DIR ?= $(shell pwd) DOCKER_IMAGE_TAG ?= $(subst /,-,$(shell git rev-parse --abbrev-ref HEAD)) +DOCKERFILE_PATH ?= ./Dockerfile +DOCKERBUILD_CONTEXT ?= ./ DOCKER_REPO ?= prom DOCKER_ARCHS ?= amd64 @@ -210,9 +212,10 @@ common-tarball: promu common-docker: $(BUILD_DOCKER_ARCHS) $(BUILD_DOCKER_ARCHS): common-docker-%: docker build -t "$(DOCKER_REPO)/$(DOCKER_IMAGE_NAME)-linux-$*:$(DOCKER_IMAGE_TAG)" \ + -f $(DOCKERFILE_PATH) \ --build-arg ARCH="$*" \ --build-arg OS="linux" \ - . + $(DOCKERBUILD_CONTEXT) .PHONY: common-docker-publish $(PUBLISH_DOCKER_ARCHS) common-docker-publish: $(PUBLISH_DOCKER_ARCHS) @@ -247,7 +250,9 @@ proto: ifdef GOLANGCI_LINT $(GOLANGCI_LINT): mkdir -p $(FIRST_GOPATH)/bin - curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(FIRST_GOPATH)/bin $(GOLANGCI_LINT_VERSION) + curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/$(GOLANGCI_LINT_VERSION)/install.sh \ + | sed -e '/install -d/d' \ + | sh -s -- -b $(FIRST_GOPATH)/bin $(GOLANGCI_LINT_VERSION) endif ifdef GOVENDOR diff --git a/vendor/github.com/prometheus/tsdb/block.go b/vendor/github.com/prometheus/tsdb/block.go index 1b6e79d9d..6a8237f1f 100644 --- a/vendor/github.com/prometheus/tsdb/block.go +++ b/vendor/github.com/prometheus/tsdb/block.go @@ -151,12 +151,6 @@ type Appendable interface { Appender() Appender } -// SizeReader returns the size of the object in bytes. -type SizeReader interface { - // Size returns the size in bytes. - Size() int64 -} - // BlockMeta provides meta information about a block. type BlockMeta struct { // Unique identifier for the block and its contents. Changes on compaction. @@ -183,7 +177,6 @@ type BlockStats struct { NumSeries uint64 `json:"numSeries,omitempty"` NumChunks uint64 `json:"numChunks,omitempty"` NumTombstones uint64 `json:"numTombstones,omitempty"` - NumBytes int64 `json:"numBytes,omitempty"` } // BlockDesc describes a block by ULID and time range. @@ -214,24 +207,24 @@ const metaFilename = "meta.json" func chunkDir(dir string) string { return filepath.Join(dir, "chunks") } -func readMetaFile(dir string) (*BlockMeta, error) { +func readMetaFile(dir string) (*BlockMeta, int64, error) { b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename)) if err != nil { - return nil, err + return nil, 0, err } var m BlockMeta if err := json.Unmarshal(b, &m); err != nil { - return nil, err + return nil, 0, err } if m.Version != 1 { - return nil, errors.Errorf("unexpected meta file version %d", m.Version) + return nil, 0, errors.Errorf("unexpected meta file version %d", m.Version) } - return &m, nil + return &m, int64(len(b)), nil } -func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) error { +func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) (int64, error) { meta.Version = 1 // Make any changes to the file appear atomic. @@ -245,26 +238,32 @@ func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) error { f, err := os.Create(tmp) if err != nil { - return err + return 0, err } - enc := json.NewEncoder(f) - enc.SetIndent("", "\t") + jsonMeta, err := json.MarshalIndent(meta, "", "\t") + if err != nil { + return 0, err + } var merr tsdb_errors.MultiError - if merr.Add(enc.Encode(meta)); merr.Err() != nil { + n, err := f.Write(jsonMeta) + if err != nil { + merr.Add(err) merr.Add(f.Close()) - return merr.Err() + return 0, merr.Err() } + // Force the kernel to persist the file on disk to avoid data loss if the host crashes. - if merr.Add(f.Sync()); merr.Err() != nil { + if err := f.Sync(); err != nil { + merr.Add(err) merr.Add(f.Close()) - return merr.Err() + return 0, merr.Err() } if err := f.Close(); err != nil { - return err + return 0, err } - return fileutil.Replace(tmp, path) + return int64(n), fileutil.Replace(tmp, path) } // Block represents a directory of time series data covering a continuous time range. @@ -285,6 +284,11 @@ type Block struct { tombstones TombstoneReader logger log.Logger + + numBytesChunks int64 + numBytesIndex int64 + numBytesTombstone int64 + numBytesMeta int64 } // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used @@ -302,7 +306,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er err = merr.Err() } }() - meta, err := readMetaFile(dir) + meta, sizeMeta, err := readMetaFile(dir) if err != nil { return nil, err } @@ -319,43 +323,28 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er } closers = append(closers, ir) - tr, tsr, err := readTombstones(dir) + tr, sizeTomb, err := readTombstones(dir) if err != nil { return nil, err } closers = append(closers, tr) - // TODO refactor to set this at block creation time as - // that would be the logical place for a block size to be calculated. - bs := blockSize(cr, ir, tsr) - meta.Stats.NumBytes = bs - err = writeMetaFile(logger, dir, meta) - if err != nil { - level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err) - } - pb = &Block{ - dir: dir, - meta: *meta, - chunkr: cr, - indexr: ir, - tombstones: tr, - symbolTableSize: ir.SymbolTableSize(), - logger: logger, + dir: dir, + meta: *meta, + chunkr: cr, + indexr: ir, + tombstones: tr, + symbolTableSize: ir.SymbolTableSize(), + logger: logger, + numBytesChunks: cr.Size(), + numBytesIndex: ir.Size(), + numBytesTombstone: sizeTomb, + numBytesMeta: sizeMeta, } return pb, nil } -func blockSize(rr ...SizeReader) int64 { - var total int64 - for _, r := range rr { - if r != nil { - total += r.Size() - } - } - return total -} - // Close closes the on-disk block. It blocks as long as there are readers reading from the block. func (pb *Block) Close() error { pb.mtx.Lock() @@ -390,7 +379,9 @@ func (pb *Block) MinTime() int64 { return pb.meta.MinTime } func (pb *Block) MaxTime() int64 { return pb.meta.MaxTime } // Size returns the number of bytes that the block takes up. -func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes } +func (pb *Block) Size() int64 { + return pb.numBytesChunks + pb.numBytesIndex + pb.numBytesTombstone + pb.numBytesMeta +} // ErrClosing is returned when a block is in the process of being closed. var ErrClosing = errors.New("block is closing") @@ -437,7 +428,12 @@ func (pb *Block) GetSymbolTableSize() uint64 { func (pb *Block) setCompactionFailed() error { pb.meta.Compaction.Failed = true - return writeMetaFile(pb.logger, pb.dir, &pb.meta) + n, err := writeMetaFile(pb.logger, pb.dir, &pb.meta) + if err != nil { + return err + } + pb.numBytesMeta = n + return nil } type blockIndexReader struct { @@ -457,7 +453,10 @@ func (r blockIndexReader) LabelValues(names ...string) (index.StringTuples, erro func (r blockIndexReader) Postings(name, value string) (index.Postings, error) { p, err := r.ir.Postings(name, value) - return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID) + if err != nil { + return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID) + } + return p, nil } func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings { @@ -465,11 +464,10 @@ func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings { } func (r blockIndexReader) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error { - return errors.Wrapf( - r.ir.Series(ref, lset, chks), - "block: %s", - r.b.Meta().ULID, - ) + if err := r.ir.Series(ref, lset, chks); err != nil { + return errors.Wrapf(err, "block: %s", r.b.Meta().ULID) + } + return nil } func (r blockIndexReader) LabelIndices() ([][]string, error) { @@ -561,10 +559,17 @@ Outer: pb.tombstones = stones pb.meta.Stats.NumTombstones = pb.tombstones.Total() - if err := writeTombstoneFile(pb.logger, pb.dir, pb.tombstones); err != nil { + n, err := writeTombstoneFile(pb.logger, pb.dir, pb.tombstones) + if err != nil { return err } - return writeMetaFile(pb.logger, pb.dir, &pb.meta) + pb.numBytesTombstone = n + n, err = writeMetaFile(pb.logger, pb.dir, &pb.meta) + if err != nil { + return err + } + pb.numBytesMeta = n + return nil } // CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones). diff --git a/vendor/github.com/prometheus/tsdb/checkpoint.go b/vendor/github.com/prometheus/tsdb/checkpoint.go index d8dee28aa..eccfa62be 100644 --- a/vendor/github.com/prometheus/tsdb/checkpoint.go +++ b/vendor/github.com/prometheus/tsdb/checkpoint.go @@ -135,7 +135,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) if err := os.MkdirAll(cpdirtmp, 0777); err != nil { return nil, errors.Wrap(err, "create checkpoint dir") } - cp, err := wal.New(nil, nil, cpdirtmp) + cp, err := wal.New(nil, nil, cpdirtmp, w.CompressionEnabled()) if err != nil { return nil, errors.Wrap(err, "open checkpoint") } diff --git a/vendor/github.com/prometheus/tsdb/chunks/chunks.go b/vendor/github.com/prometheus/tsdb/chunks/chunks.go index 70cb119c5..9ce8c57da 100644 --- a/vendor/github.com/prometheus/tsdb/chunks/chunks.go +++ b/vendor/github.com/prometheus/tsdb/chunks/chunks.go @@ -51,7 +51,9 @@ type Meta struct { Ref uint64 Chunk chunkenc.Chunk - MinTime, MaxTime int64 // time range the data covers + // Time range the data covers. + // When MaxTime == math.MaxInt64 the chunk is still open and being appended to. + MinTime, MaxTime int64 } // writeHash writes the chunk encoding and raw data into the provided hash. @@ -218,7 +220,7 @@ func MergeOverlappingChunks(chks []Meta) ([]Meta, error) { // So never overlaps with newChks[last-1] or anything before that. if c.MinTime > newChks[last].MaxTime { newChks = append(newChks, c) - last += 1 + last++ continue } nc := &newChks[last] diff --git a/vendor/github.com/prometheus/tsdb/compact.go b/vendor/github.com/prometheus/tsdb/compact.go index c0948bbf3..e19b7ed76 100644 --- a/vendor/github.com/prometheus/tsdb/compact.go +++ b/vendor/github.com/prometheus/tsdb/compact.go @@ -84,7 +84,6 @@ type LeveledCompactor struct { type compactorMetrics struct { ran prometheus.Counter populatingBlocks prometheus.Gauge - failed prometheus.Counter overlappingBlocks prometheus.Counter duration prometheus.Histogram chunkSize prometheus.Histogram @@ -103,10 +102,6 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { Name: "prometheus_tsdb_compaction_populating_block", Help: "Set to 1 when a block is currently being written to the disk.", }) - m.failed = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_compactions_failed_total", - Help: "Total number of compactions that failed for the partition.", - }) m.overlappingBlocks = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_vertical_compactions_total", Help: "Total number of compactions done on overlapping blocks.", @@ -136,7 +131,6 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { r.MustRegister( m.ran, m.populatingBlocks, - m.failed, m.overlappingBlocks, m.duration, m.chunkRange, @@ -184,7 +178,7 @@ func (c *LeveledCompactor) Plan(dir string) ([]string, error) { var dms []dirMeta for _, dir := range dirs { - meta, err := readMetaFile(dir) + meta, _, err := readMetaFile(dir) if err != nil { return nil, err } @@ -386,7 +380,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u start := time.Now() for _, d := range dirs { - meta, err := readMetaFile(d) + meta, _, err := readMetaFile(d) if err != nil { return uid, err } @@ -426,12 +420,14 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u if meta.Stats.NumSamples == 0 { for _, b := range bs { b.meta.Compaction.Deletable = true - if err = writeMetaFile(c.logger, b.dir, &b.meta); err != nil { + n, err := writeMetaFile(c.logger, b.dir, &b.meta) + if err != nil { level.Error(c.logger).Log( "msg", "Failed to write 'Deletable' to meta file after compaction", "ulid", b.meta.ULID, ) } + b.numBytesMeta = n } uid = ulid.ULID{} level.Info(c.logger).Log( @@ -541,9 +537,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err := os.RemoveAll(tmp); err != nil { level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error()) } - if err != nil { - c.metrics.failed.Inc() - } c.metrics.ran.Inc() c.metrics.duration.Observe(time.Since(t).Seconds()) }(time.Now()) @@ -609,12 +602,12 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe return nil } - if err = writeMetaFile(c.logger, tmp, meta); err != nil { + if _, err = writeMetaFile(c.logger, tmp, meta); err != nil { return errors.Wrap(err, "write merged meta") } // Create an empty tombstones file. - if err := writeTombstoneFile(c.logger, tmp, newMemTombstones()); err != nil { + if _, err := writeTombstoneFile(c.logger, tmp, newMemTombstones()); err != nil { return errors.Wrap(err, "write new tombstones file") } @@ -764,6 +757,21 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } for i, chk := range chks { + // Re-encode head chunks that are still open (being appended to) or + // outside the compacted MaxTime range. + // The chunk.Bytes() method is not safe for open chunks hence the re-encoding. + // This happens when snapshotting the head block. + // + // Block time range is half-open: [meta.MinTime, meta.MaxTime) and + // chunks are closed hence the chk.MaxTime >= meta.MaxTime check. + // + // TODO think how to avoid the typecasting to verify when it is head block. + if _, isHeadChunk := chk.Chunk.(*safeChunk); isHeadChunk && chk.MaxTime >= meta.MaxTime { + dranges = append(dranges, Interval{Mint: meta.MaxTime, Maxt: math.MaxInt64}) + + } else + // Sanity check for disk blocks. + // chk.MaxTime == meta.MaxTime shouldn't happen as well, but will brake many users so not checking for that. if chk.MinTime < meta.MinTime || chk.MaxTime > meta.MaxTime { return errors.Errorf("found chunk with minTime: %d maxTime: %d outside of compacted minTime: %d maxTime: %d", chk.MinTime, chk.MaxTime, meta.MinTime, meta.MaxTime) @@ -781,12 +789,21 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges} + + var ( + t int64 + v float64 + ) for it.Next() { - ts, v := it.At() - app.Append(ts, v) + t, v = it.At() + app.Append(t, v) + } + if err := it.Err(); err != nil { + return errors.Wrap(err, "iterate chunk while re-encoding") } chks[i].Chunk = newChunk + chks[i].MaxTime = t } } diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go index 52b21c2fd..e07f7d3e7 100644 --- a/vendor/github.com/prometheus/tsdb/db.go +++ b/vendor/github.com/prometheus/tsdb/db.go @@ -51,6 +51,7 @@ var DefaultOptions = &Options{ BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5), NoLockfile: false, AllowOverlappingBlocks: false, + WALCompression: false, } // Options of the DB storage. @@ -80,6 +81,9 @@ type Options struct { // Overlapping blocks are allowed if AllowOverlappingBlocks is true. // This in-turn enables vertical compaction and vertical query merge. AllowOverlappingBlocks bool + + // WALCompression will turn on Snappy compression for records on the WAL. + WALCompression bool } // Appender allows appending a batch of data. It must be completed with a @@ -147,6 +151,7 @@ type dbMetrics struct { reloads prometheus.Counter reloadsFailed prometheus.Counter compactionsTriggered prometheus.Counter + compactionsFailed prometheus.Counter timeRetentionCount prometheus.Counter compactionsSkipped prometheus.Counter startTime prometheus.GaugeFunc @@ -191,6 +196,10 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Name: "prometheus_tsdb_compactions_triggered_total", Help: "Total number of triggered compactions for the partition.", }) + m.compactionsFailed = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_compactions_failed_total", + Help: "Total number of compactions that failed for the partition.", + }) m.timeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_time_retentions_total", Help: "The number of times that blocks were deleted because the maximum time limit was exceeded.", @@ -231,6 +240,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { m.reloadsFailed, m.timeRetentionCount, m.compactionsTriggered, + m.compactionsFailed, m.startTime, m.tombCleanTimer, m.blocksBytes, @@ -300,7 +310,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db if opts.WALSegmentSize > 0 { segmentSize = opts.WALSegmentSize } - wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize) + wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression) if err != nil { return nil, err } @@ -322,8 +332,12 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db minValidTime = blocks[len(blocks)-1].Meta().MaxTime } - if err := db.head.Init(minValidTime); err != nil { - return nil, errors.Wrap(err, "read WAL") + if initErr := db.head.Init(minValidTime); initErr != nil { + db.head.metrics.walCorruptionsTotal.Inc() + level.Warn(db.logger).Log("msg", "encountered WAL read error, attempting repair", "err", err) + if err := wlog.Repair(initErr); err != nil { + return nil, errors.Wrap(err, "repair corrupted WAL") + } } go db.run() @@ -411,6 +425,11 @@ func (a dbAppender) Commit() error { func (db *DB) compact() (err error) { db.cmtx.Lock() defer db.cmtx.Unlock() + defer func() { + if err != nil { + db.metrics.compactionsFailed.Inc() + } + }() // Check whether we have pending head blocks that are ready to be persisted. // They have the highest priority. for { @@ -610,7 +629,7 @@ func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err corrupted = make(map[ulid.ULID]error) for _, dir := range dirs { - meta, err := readMetaFile(dir) + meta, _, err := readMetaFile(dir) if err != nil { level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) continue @@ -924,8 +943,20 @@ func (db *DB) Snapshot(dir string, withHead bool) error { if !withHead { return nil } - _, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime(), nil) - return errors.Wrap(err, "snapshot head block") + + mint := db.head.MinTime() + maxt := db.head.MaxTime() + head := &rangeHead{ + head: db.head, + mint: mint, + maxt: maxt, + } + // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). + // Because of this block intervals are always +1 than the total samples it includes. + if _, err := db.compactor.Write(dir, head, mint, maxt+1, nil); err != nil { + return errors.Wrap(err, "snapshot head block") + } + return nil } // Querier returns a new querier over the data partition for the given time range. diff --git a/vendor/github.com/prometheus/tsdb/fileutil/fileutil.go b/vendor/github.com/prometheus/tsdb/fileutil/fileutil.go index c55a2b81d..4088f522a 100644 --- a/vendor/github.com/prometheus/tsdb/fileutil/fileutil.go +++ b/vendor/github.com/prometheus/tsdb/fileutil/fileutil.go @@ -128,9 +128,19 @@ func Rename(from, to string) error { // Replace moves a file or directory to a new location and deletes any previous data. // It is not atomic. func Replace(from, to string) error { - if err := os.RemoveAll(to); err != nil { - return err + // Remove destination only if it is a dir otherwise leave it to os.Rename + // as it replaces the destination file and is atomic. + { + f, err := os.Stat(to) + if !os.IsNotExist(err) { + if err == nil && f.IsDir() { + if err := os.RemoveAll(to); err != nil { + return err + } + } + } } + if err := os.Rename(from, to); err != nil { return err } diff --git a/vendor/github.com/prometheus/tsdb/go.mod b/vendor/github.com/prometheus/tsdb/go.mod index 02f3cf9e7..ccdd43724 100644 --- a/vendor/github.com/prometheus/tsdb/go.mod +++ b/vendor/github.com/prometheus/tsdb/go.mod @@ -1,28 +1,14 @@ module github.com/prometheus/tsdb require ( - github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect - github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect - github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect github.com/cespare/xxhash v1.1.0 - github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954 github.com/go-kit/kit v0.8.0 - github.com/go-logfmt/logfmt v0.3.0 // indirect - github.com/go-stack/stack v1.8.0 // indirect - github.com/gogo/protobuf v1.1.1 // indirect - github.com/golang/protobuf v1.2.0 // indirect - github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 // indirect - github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/golang/snappy v0.0.1 github.com/oklog/ulid v1.3.1 github.com/pkg/errors v0.8.0 - github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_golang v0.9.1 - github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 // indirect - github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce // indirect - github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d // indirect - github.com/stretchr/testify v1.2.2 // indirect - golang.org/x/sync v0.0.0-20181108010431-42b317875d0f - golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8 + github.com/prometheus/client_golang v1.0.0 + golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 + golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5 gopkg.in/alecthomas/kingpin.v2 v2.2.6 ) diff --git a/vendor/github.com/prometheus/tsdb/go.sum b/vendor/github.com/prometheus/tsdb/go.sum index 266fbe96b..365fa5ecf 100644 --- a/vendor/github.com/prometheus/tsdb/go.sum +++ b/vendor/github.com/prometheus/tsdb/go.sum @@ -6,8 +6,11 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZq github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= +github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954 h1:RMLoZVzv4GliuWafOuPuQDKSm1SJph7uCRnnS61JAn4= @@ -22,10 +25,20 @@ github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= @@ -34,19 +47,37 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.9.1 h1:K47Rk0v/fkEfwfQet2KWhscE0cJzjgCCDBG2KHZoVno= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM= +github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= -github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce h1:X0jFYGnHemYDIW6jlc+fSI8f9Cg+jqCnClYP2WgZT/A= -github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/common v0.4.1 h1:K0MGApIoQvMw27RTdJkPbr3JZ7DNbtxQNyi5STVM6Kw= +github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d h1:GoAlyOgbOEIFdaDqxJVlbOQ1DtGmZWs/Qau0hIlk+WQ= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs= +github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8 h1:YoY1wS6JYVRpIfFngRf2HHo9R9dAne3xbkGOQ5rJXjU= -golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5 h1:mzjBh+S5frKOsOBobWIMAbXavqjmgO17k/2puhcFR94= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go index 92619a640..5e2eae858 100644 --- a/vendor/github.com/prometheus/tsdb/head.go +++ b/vendor/github.com/prometheus/tsdb/head.go @@ -14,6 +14,7 @@ package tsdb import ( + "fmt" "math" "runtime" "sort" @@ -140,8 +141,9 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Help: "Total number of chunks removed in the head", }) m.gcDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "prometheus_tsdb_head_gc_duration_seconds", - Help: "Runtime of garbage collection in the head block.", + Name: "prometheus_tsdb_head_gc_duration_seconds", + Help: "Runtime of garbage collection in the head block.", + Objectives: map[float64]float64{}, }) m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "prometheus_tsdb_head_max_time", @@ -156,8 +158,9 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { return float64(h.MinTime()) }) m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "prometheus_tsdb_wal_truncate_duration_seconds", - Help: "Duration of WAL truncation.", + Name: "prometheus_tsdb_wal_truncate_duration_seconds", + Help: "Duration of WAL truncation.", + Objectives: map[float64]float64{}, }) m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_wal_corruptions_total", @@ -312,7 +315,7 @@ func (h *Head) updateMinMaxTime(mint, maxt int64) { } } -func (h *Head) loadWAL(r *wal.Reader) error { +func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { // Track number of samples that referenced a series we don't know about // for error reporting. var unknownRefs uint64 @@ -321,13 +324,26 @@ func (h *Head) loadWAL(r *wal.Reader) error { // They are connected through a ring of channels which ensures that all sample batches // read from the WAL are processed in order. var ( - wg sync.WaitGroup - n = runtime.GOMAXPROCS(0) - inputs = make([]chan []RefSample, n) - outputs = make([]chan []RefSample, n) + wg sync.WaitGroup + multiRefLock sync.Mutex + n = runtime.GOMAXPROCS(0) + inputs = make([]chan []RefSample, n) + outputs = make([]chan []RefSample, n) ) wg.Add(n) + defer func() { + // For CorruptionErr ensure to terminate all workers before exiting. + if _, ok := err.(*wal.CorruptionErr); ok { + for i := 0; i < n; i++ { + close(inputs[i]) + for range outputs[i] { + } + } + wg.Wait() + } + }() + for i := 0; i < n; i++ { outputs[i] = make(chan []RefSample, 300) inputs[i] = make(chan []RefSample, 300) @@ -345,9 +361,12 @@ func (h *Head) loadWAL(r *wal.Reader) error { samples []RefSample tstones []Stone allStones = newMemTombstones() - err error ) - defer allStones.Close() + defer func() { + if err := allStones.Close(); err != nil { + level.Warn(h.logger).Log("msg", "closing memTombstones during wal read", "err", err) + } + }() for r.Next() { series, samples, tstones = series[:0], samples[:0], tstones[:0] rec := r.Record() @@ -363,7 +382,14 @@ func (h *Head) loadWAL(r *wal.Reader) error { } } for _, s := range series { - h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) + series, created := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) + + if !created { + // There's already a different ref for this series. + multiRefLock.Lock() + multiRef[s.Ref] = series.ref + multiRefLock.Unlock() + } if h.lastSeriesID < s.Ref { h.lastSeriesID = s.Ref @@ -398,6 +424,9 @@ func (h *Head) loadWAL(r *wal.Reader) error { shards[i] = buf[:0] } for _, sam := range samples[:m] { + if r, ok := multiRef[sam.Ref]; ok { + sam.Ref = r + } mod := sam.Ref % uint64(n) shards[mod] = append(shards[mod], sam) } @@ -436,9 +465,6 @@ func (h *Head) loadWAL(r *wal.Reader) error { } } } - if r.Err() != nil { - return errors.Wrap(r.Err(), "read records") - } // Signal termination to each worker and wait for it to close its output channel. for i := 0; i < n; i++ { @@ -448,6 +474,10 @@ func (h *Head) loadWAL(r *wal.Reader) error { } wg.Wait() + if r.Err() != nil { + return errors.Wrap(r.Err(), "read records") + } + if err := allStones.Iter(func(ref uint64, dranges Intervals) error { return h.chunkRewrite(ref, dranges) }); err != nil { @@ -477,37 +507,49 @@ func (h *Head) Init(minValidTime int64) error { if err != nil && err != ErrNotFound { return errors.Wrap(err, "find last checkpoint") } + multiRef := map[uint64]uint64{} if err == nil { sr, err := wal.NewSegmentsReader(dir) if err != nil { return errors.Wrap(err, "open checkpoint") } - defer sr.Close() + defer func() { + if err := sr.Close(); err != nil { + level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err) + } + }() // A corrupted checkpoint is a hard error for now and requires user // intervention. There's likely little data that can be recovered anyway. - if err := h.loadWAL(wal.NewReader(sr)); err != nil { + if err := h.loadWAL(wal.NewReader(sr), multiRef); err != nil { return errors.Wrap(err, "backfill checkpoint") } startFrom++ } - // Backfill segments from the last checkpoint onwards - sr, err := wal.NewSegmentsRangeReader(wal.SegmentRange{Dir: h.wal.Dir(), First: startFrom, Last: -1}) + // Find the last segment. + _, last, err := h.wal.Segments() if err != nil { - return errors.Wrap(err, "open WAL segments") + return errors.Wrap(err, "finding WAL segments") } - err = h.loadWAL(wal.NewReader(sr)) - sr.Close() // Close the reader so that if there was an error the repair can remove the corrupted file under Windows. - if err == nil { - return nil - } - level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err) - h.metrics.walCorruptionsTotal.Inc() - if err := h.wal.Repair(err); err != nil { - return errors.Wrap(err, "repair corrupted WAL") + // Backfill segments from the most recent checkpoint onwards. + for i := startFrom; i <= last; i++ { + s, err := wal.OpenReadSegment(wal.SegmentName(h.wal.Dir(), i)) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i)) + } + + sr := wal.NewSegmentBufReader(s) + err = h.loadWAL(wal.NewReader(sr), multiRef) + if err := sr.Close(); err != nil { + level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err) + } + if err != nil { + return err + } } + return nil } @@ -553,6 +595,12 @@ func (h *Head) Truncate(mint int64) (err error) { if err != nil { return errors.Wrap(err, "get segment range") } + // Start a new segment, so low ingestion volume TSDB don't have more WAL than + // needed. + err = h.wal.NextSegment() + if err != nil { + return errors.Wrap(err, "next segment") + } last-- // Never consider last segment for checkpoint. if last < 0 { return nil // no segments yet. @@ -1250,9 +1298,15 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks if !c.OverlapsClosedInterval(h.mint, h.maxt) { continue } + // Set the head chunks as open (being appended to). + maxTime := c.maxTime + if s.headChunk == c { + maxTime = math.MaxInt64 + } + *chks = append(*chks, chunks.Meta{ MinTime: c.minTime, - MaxTime: c.maxTime, + MaxTime: maxTime, Ref: packChunkID(s.ref, uint64(s.chunkID(i))), }) } diff --git a/vendor/github.com/prometheus/tsdb/index/postings.go b/vendor/github.com/prometheus/tsdb/index/postings.go index bb7b5837a..cef2d886e 100644 --- a/vendor/github.com/prometheus/tsdb/index/postings.go +++ b/vendor/github.com/prometheus/tsdb/index/postings.go @@ -303,68 +303,68 @@ func Intersect(its ...Postings) Postings { if len(its) == 1 { return its[0] } - - l := len(its) / 2 - a := Intersect(its[:l]...) - b := Intersect(its[l:]...) - - if a == EmptyPostings() || b == EmptyPostings() { - return EmptyPostings() + for _, p := range its { + if p == EmptyPostings() { + return EmptyPostings() + } } - return newIntersectPostings(a, b) + + return newIntersectPostings(its...) } type intersectPostings struct { - a, b Postings - cur uint64 + arr []Postings + cur uint64 } -func newIntersectPostings(a, b Postings) *intersectPostings { - return &intersectPostings{a: a, b: b} +func newIntersectPostings(its ...Postings) *intersectPostings { + return &intersectPostings{arr: its} } func (it *intersectPostings) At() uint64 { return it.cur } -func (it *intersectPostings) doNext(id uint64) bool { +func (it *intersectPostings) doNext() bool { +Loop: for { - if !it.b.Seek(id) { - return false - } - if vb := it.b.At(); vb != id { - if !it.a.Seek(vb) { + for _, p := range it.arr { + if !p.Seek(it.cur) { return false } - id = it.a.At() - if vb != id { - continue + if p.At() > it.cur { + it.cur = p.At() + continue Loop } } - it.cur = id return true } } func (it *intersectPostings) Next() bool { - if !it.a.Next() { - return false + for _, p := range it.arr { + if !p.Next() { + return false + } + if p.At() > it.cur { + it.cur = p.At() + } } - return it.doNext(it.a.At()) + return it.doNext() } func (it *intersectPostings) Seek(id uint64) bool { - if !it.a.Seek(id) { - return false - } - return it.doNext(it.a.At()) + it.cur = id + return it.doNext() } func (it *intersectPostings) Err() error { - if it.a.Err() != nil { - return it.a.Err() + for _, p := range it.arr { + if p.Err() != nil { + return p.Err() + } } - return it.b.Err() + return nil } // Merge returns a new iterator over the union of the input iterators. diff --git a/vendor/github.com/prometheus/tsdb/labels/selector.go b/vendor/github.com/prometheus/tsdb/labels/selector.go index a0565f57e..c94ebb332 100644 --- a/vendor/github.com/prometheus/tsdb/labels/selector.go +++ b/vendor/github.com/prometheus/tsdb/labels/selector.go @@ -63,14 +63,15 @@ func NewEqualMatcher(name, value string) Matcher { return &EqualMatcher{name: name, value: value} } -type regexpMatcher struct { +type RegexpMatcher struct { name string re *regexp.Regexp } -func (m regexpMatcher) Name() string { return m.name } -func (m regexpMatcher) Matches(v string) bool { return m.re.MatchString(v) } -func (m regexpMatcher) String() string { return fmt.Sprintf("%s=~%q", m.name, m.re.String()) } +func (m RegexpMatcher) Name() string { return m.name } +func (m RegexpMatcher) Matches(v string) bool { return m.re.MatchString(v) } +func (m RegexpMatcher) String() string { return fmt.Sprintf("%s=~%q", m.name, m.re.String()) } +func (m RegexpMatcher) Value() string { return m.re.String() } // NewRegexpMatcher returns a new matcher verifying that a value matches // the regular expression pattern. @@ -79,7 +80,7 @@ func NewRegexpMatcher(name, pattern string) (Matcher, error) { if err != nil { return nil, err } - return ®expMatcher{name: name, re: re}, nil + return &RegexpMatcher{name: name, re: re}, nil } // NewMustRegexpMatcher returns a new matcher verifying that a value matches @@ -90,7 +91,7 @@ func NewMustRegexpMatcher(name, pattern string) Matcher { if err != nil { panic(err) } - return ®expMatcher{name: name, re: re} + return &RegexpMatcher{name: name, re: re} } diff --git a/vendor/github.com/prometheus/tsdb/querier.go b/vendor/github.com/prometheus/tsdb/querier.go index 9d99de083..253102b0e 100644 --- a/vendor/github.com/prometheus/tsdb/querier.go +++ b/vendor/github.com/prometheus/tsdb/querier.go @@ -17,6 +17,7 @@ import ( "fmt" "sort" "strings" + "unicode/utf8" "github.com/pkg/errors" "github.com/prometheus/tsdb/chunkenc" @@ -266,6 +267,62 @@ func (q *blockQuerier) Close() error { return merr.Err() } +// Bitmap used by func isRegexMetaCharacter to check whether a character needs to be escaped. +var regexMetaCharacterBytes [16]byte + +// isRegexMetaCharacter reports whether byte b needs to be escaped. +func isRegexMetaCharacter(b byte) bool { + return b < utf8.RuneSelf && regexMetaCharacterBytes[b%16]&(1<<(b/16)) != 0 +} + +func init() { + for _, b := range []byte(`.+*?()|[]{}^$`) { + regexMetaCharacterBytes[b%16] |= 1 << (b / 16) + } +} + +func findSetMatches(pattern string) []string { + // Return empty matches if the wrapper from Prometheus is missing. + if len(pattern) < 6 || pattern[:4] != "^(?:" || pattern[len(pattern)-2:] != ")$" { + return nil + } + escaped := false + sets := []*strings.Builder{&strings.Builder{}} + for i := 4; i < len(pattern)-2; i++ { + if escaped { + switch { + case isRegexMetaCharacter(pattern[i]): + sets[len(sets)-1].WriteByte(pattern[i]) + case pattern[i] == '\\': + sets[len(sets)-1].WriteByte('\\') + default: + return nil + } + escaped = false + } else { + switch { + case isRegexMetaCharacter(pattern[i]): + if pattern[i] == '|' { + sets = append(sets, &strings.Builder{}) + } else { + return nil + } + case pattern[i] == '\\': + escaped = true + default: + sets[len(sets)-1].WriteByte(pattern[i]) + } + } + } + matches := make([]string, 0, len(sets)) + for _, s := range sets { + if s.Len() > 0 { + matches = append(matches, s.String()) + } + } + return matches +} + // PostingsForMatchers assembles a single postings iterator against the index reader // based on the given matchers. func PostingsForMatchers(ix IndexReader, ms ...labels.Matcher) (index.Postings, error) { @@ -346,6 +403,14 @@ func postingsForMatcher(ix IndexReader, m labels.Matcher) (index.Postings, error return ix.Postings(em.Name(), em.Value()) } + // Fast-path for set matching. + if em, ok := m.(*labels.RegexpMatcher); ok { + setMatches := findSetMatches(em.Value()) + if len(setMatches) > 0 { + return postingsForSetMatcher(ix, em.Name(), setMatches) + } + } + tpls, err := ix.LabelValues(m.Name()) if err != nil { return nil, err @@ -411,6 +476,18 @@ func inversePostingsForMatcher(ix IndexReader, m labels.Matcher) (index.Postings return index.Merge(rit...), nil } +func postingsForSetMatcher(ix IndexReader, name string, matches []string) (index.Postings, error) { + var its []index.Postings + for _, match := range matches { + if it, err := ix.Postings(name, match); err == nil { + its = append(its, it) + } else { + return nil, err + } + } + return index.Merge(its...), nil +} + func mergeStrings(a, b []string) []string { maxl := len(a) if len(b) > len(a) { diff --git a/vendor/github.com/prometheus/tsdb/repair.go b/vendor/github.com/prometheus/tsdb/repair.go index 38138b12a..1d299047a 100644 --- a/vendor/github.com/prometheus/tsdb/repair.go +++ b/vendor/github.com/prometheus/tsdb/repair.go @@ -109,7 +109,7 @@ func repairBadIndexVersion(logger log.Logger, dir string) error { } // Reset version of meta.json to 1. meta.Version = 1 - if err := writeMetaFile(logger, d, meta); err != nil { + if _, err := writeMetaFile(logger, d, meta); err != nil { return wrapErr(err, d) } } diff --git a/vendor/github.com/prometheus/tsdb/tombstones.go b/vendor/github.com/prometheus/tsdb/tombstones.go index 220af4900..d7b76230c 100644 --- a/vendor/github.com/prometheus/tsdb/tombstones.go +++ b/vendor/github.com/prometheus/tsdb/tombstones.go @@ -54,14 +54,15 @@ type TombstoneReader interface { Close() error } -func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) error { +func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) (int64, error) { path := filepath.Join(dir, tombstoneFilename) tmp := path + ".tmp" hash := newCRC32() + var size int f, err := os.Create(tmp) if err != nil { - return err + return 0, err } defer func() { if f != nil { @@ -79,10 +80,11 @@ func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) error // Write the meta. buf.PutBE32(MagicTombstone) buf.PutByte(tombstoneFormatV1) - _, err = f.Write(buf.Get()) + n, err := f.Write(buf.Get()) if err != nil { - return err + return 0, err } + size += n mw := io.MultiWriter(f, hash) @@ -94,32 +96,34 @@ func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) error buf.PutVarint64(iv.Mint) buf.PutVarint64(iv.Maxt) - _, err = mw.Write(buf.Get()) + n, err = mw.Write(buf.Get()) if err != nil { return err } + size += n } return nil }); err != nil { - return fmt.Errorf("error writing tombstones: %v", err) + return 0, fmt.Errorf("error writing tombstones: %v", err) } - _, err = f.Write(hash.Sum(nil)) + n, err = f.Write(hash.Sum(nil)) if err != nil { - return err + return 0, err } + size += n var merr tsdb_errors.MultiError if merr.Add(f.Sync()); merr.Err() != nil { merr.Add(f.Close()) - return merr.Err() + return 0, merr.Err() } if err = f.Close(); err != nil { - return err + return 0, err } f = nil - return fileutil.Replace(tmp, path) + return int64(size), fileutil.Replace(tmp, path) } // Stone holds the information on the posting and time-range @@ -129,41 +133,37 @@ type Stone struct { intervals Intervals } -func readTombstones(dir string) (TombstoneReader, SizeReader, error) { +func readTombstones(dir string) (TombstoneReader, int64, error) { b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) if os.IsNotExist(err) { - return newMemTombstones(), nil, nil + return newMemTombstones(), 0, nil } else if err != nil { - return nil, nil, err - } - - sr := &TombstoneFile{ - size: int64(len(b)), + return nil, 0, err } if len(b) < 5 { - return nil, sr, errors.Wrap(encoding.ErrInvalidSize, "tombstones header") + return nil, 0, errors.Wrap(encoding.ErrInvalidSize, "tombstones header") } d := &encoding.Decbuf{B: b[:len(b)-4]} // 4 for the checksum. if mg := d.Be32(); mg != MagicTombstone { - return nil, sr, fmt.Errorf("invalid magic number %x", mg) + return nil, 0, fmt.Errorf("invalid magic number %x", mg) } if flag := d.Byte(); flag != tombstoneFormatV1 { - return nil, sr, fmt.Errorf("invalid tombstone format %x", flag) + return nil, 0, fmt.Errorf("invalid tombstone format %x", flag) } if d.Err() != nil { - return nil, sr, d.Err() + return nil, 0, d.Err() } // Verify checksum. hash := newCRC32() if _, err := hash.Write(d.Get()); err != nil { - return nil, sr, errors.Wrap(err, "write to hash") + return nil, 0, errors.Wrap(err, "write to hash") } if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() { - return nil, sr, errors.New("checksum did not match") + return nil, 0, errors.New("checksum did not match") } stonesMap := newMemTombstones() @@ -173,13 +173,13 @@ func readTombstones(dir string) (TombstoneReader, SizeReader, error) { mint := d.Varint64() maxt := d.Varint64() if d.Err() != nil { - return nil, sr, d.Err() + return nil, 0, d.Err() } stonesMap.addInterval(k, Interval{mint, maxt}) } - return stonesMap, sr, nil + return stonesMap, int64(len(b)), nil } type memTombstones struct { @@ -230,16 +230,6 @@ func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) { } } -// TombstoneFile holds information about the tombstone file. -type TombstoneFile struct { - size int64 -} - -// Size returns the tombstone file size. -func (t *TombstoneFile) Size() int64 { - return t.size -} - func (*memTombstones) Close() error { return nil } diff --git a/vendor/github.com/prometheus/tsdb/wal.go b/vendor/github.com/prometheus/tsdb/wal.go index 86b3bf79c..49f55fe40 100644 --- a/vendor/github.com/prometheus/tsdb/wal.go +++ b/vendor/github.com/prometheus/tsdb/wal.go @@ -65,8 +65,9 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics { m := &walMetrics{} m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "prometheus_tsdb_wal_fsync_duration_seconds", - Help: "Duration of WAL fsync.", + Name: "prometheus_tsdb_wal_fsync_duration_seconds", + Help: "Duration of WAL fsync.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }) m.corruptions = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_wal_corruptions_total", @@ -1245,7 +1246,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) { if err := os.RemoveAll(tmpdir); err != nil { return errors.Wrap(err, "cleanup replacement dir") } - repl, err := wal.New(logger, nil, tmpdir) + repl, err := wal.New(logger, nil, tmpdir, false) if err != nil { return errors.Wrap(err, "open new WAL") } diff --git a/vendor/github.com/prometheus/tsdb/wal/live_reader.go b/vendor/github.com/prometheus/tsdb/wal/live_reader.go index 8394bfd08..94175e791 100644 --- a/vendor/github.com/prometheus/tsdb/wal/live_reader.go +++ b/vendor/github.com/prometheus/tsdb/wal/live_reader.go @@ -22,28 +22,46 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/golang/snappy" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" ) -var ( - readerCorruptionErrors = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "prometheus_tsdb_wal_reader_corruption_errors", - Help: "Errors encountered when reading the WAL.", - }, []string{"error"}) -) +// liveReaderMetrics holds all metrics exposed by the LiveReader. +type liveReaderMetrics struct { + readerCorruptionErrors *prometheus.CounterVec +} + +// LiveReaderMetrics instatiates, registers and returns metrics to be injected +// at LiveReader instantiation. +func NewLiveReaderMetrics(reg prometheus.Registerer) *liveReaderMetrics { + m := &liveReaderMetrics{ + readerCorruptionErrors: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "prometheus_tsdb_wal_reader_corruption_errors_total", + Help: "Errors encountered when reading the WAL.", + }, []string{"error"}), + } + + if reg != nil { + reg.Register(m.readerCorruptionErrors) + } + + return m +} // NewLiveReader returns a new live reader. -func NewLiveReader(logger log.Logger, r io.Reader) *LiveReader { - return &LiveReader{ - logger: logger, - rdr: r, +func NewLiveReader(logger log.Logger, metrics *liveReaderMetrics, r io.Reader) *LiveReader { + lr := &LiveReader{ + logger: logger, + rdr: r, + metrics: metrics, // Until we understand how they come about, make readers permissive // to records spanning pages. permissive: true, } + + return lr } // LiveReader reads WAL records from an io.Reader. It allows reading of WALs @@ -54,6 +72,7 @@ type LiveReader struct { rdr io.Reader err error rec []byte + snappyBuf []byte hdr [recordHeaderSize]byte buf [pageSize]byte readIndex int // Index in buf to start at for next read. @@ -68,6 +87,8 @@ type LiveReader struct { // does. Until we track down why, set permissive to true to tolerate it. // NB the non-ive Reader implementation allows for this. permissive bool + + metrics *liveReaderMetrics } // Err returns any errors encountered reading the WAL. io.EOFs are not terminal @@ -166,11 +187,18 @@ func (r *LiveReader) buildRecord() (bool, error) { return false, nil } - rt := recType(r.hdr[0]) + rt := recTypeFromHeader(r.hdr[0]) if rt == recFirst || rt == recFull { r.rec = r.rec[:0] + r.snappyBuf = r.snappyBuf[:0] + } + + compressed := r.hdr[0]&snappyMask != 0 + if compressed { + r.snappyBuf = append(r.snappyBuf, temp...) + } else { + r.rec = append(r.rec, temp...) } - r.rec = append(r.rec, temp...) if err := validateRecord(rt, r.index); err != nil { r.index = 0 @@ -178,6 +206,16 @@ func (r *LiveReader) buildRecord() (bool, error) { } if rt == recLast || rt == recFull { r.index = 0 + if compressed && len(r.snappyBuf) > 0 { + // The snappy library uses `len` to calculate if we need a new buffer. + // In order to allocate as few buffers as possible make the length + // equal to the capacity. + r.rec = r.rec[:cap(r.rec)] + r.rec, err = snappy.Decode(r.rec, r.snappyBuf) + if err != nil { + return false, err + } + } return true, nil } // Only increment i for non-zero records since we use it @@ -258,7 +296,7 @@ func (r *LiveReader) readRecord() ([]byte, int, error) { if !r.permissive { return nil, 0, fmt.Errorf("record would overflow current page: %d > %d", r.readIndex+recordHeaderSize+length, pageSize) } - readerCorruptionErrors.WithLabelValues("record_span_page").Inc() + r.metrics.readerCorruptionErrors.WithLabelValues("record_span_page").Inc() level.Warn(r.logger).Log("msg", "record spans page boundaries", "start", r.readIndex, "end", recordHeaderSize+length, "pageSize", pageSize) } if recordHeaderSize+length > pageSize { diff --git a/vendor/github.com/prometheus/tsdb/wal/reader.go b/vendor/github.com/prometheus/tsdb/wal/reader.go index 297463b00..7612f8775 100644 --- a/vendor/github.com/prometheus/tsdb/wal/reader.go +++ b/vendor/github.com/prometheus/tsdb/wal/reader.go @@ -19,6 +19,7 @@ import ( "hash/crc32" "io" + "github.com/golang/snappy" "github.com/pkg/errors" ) @@ -27,6 +28,7 @@ type Reader struct { rdr io.Reader err error rec []byte + snappyBuf []byte buf [pageSize]byte total int64 // Total bytes processed. curRecTyp recType // Used for checking that the last record is not torn. @@ -45,7 +47,7 @@ func (r *Reader) Next() bool { // The last WAL segment record shouldn't be torn(should be full or last). // The last record would be torn after a crash just before // the last record part could be persisted to disk. - if recType(r.curRecTyp) == recFirst || recType(r.curRecTyp) == recMiddle { + if r.curRecTyp == recFirst || r.curRecTyp == recMiddle { r.err = errors.New("last record is torn") } return false @@ -61,6 +63,7 @@ func (r *Reader) next() (err error) { buf := r.buf[recordHeaderSize:] r.rec = r.rec[:0] + r.snappyBuf = r.snappyBuf[:0] i := 0 for { @@ -68,7 +71,8 @@ func (r *Reader) next() (err error) { return errors.Wrap(err, "read first header byte") } r.total++ - r.curRecTyp = recType(hdr[0]) + r.curRecTyp = recTypeFromHeader(hdr[0]) + compressed := hdr[0]&snappyMask != 0 // Gobble up zero bytes. if r.curRecTyp == recPageTerm { @@ -123,12 +127,25 @@ func (r *Reader) next() (err error) { if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc { return errors.Errorf("unexpected checksum %x, expected %x", c, crc) } - r.rec = append(r.rec, buf[:length]...) + + if compressed { + r.snappyBuf = append(r.snappyBuf, buf[:length]...) + } else { + r.rec = append(r.rec, buf[:length]...) + } if err := validateRecord(r.curRecTyp, i); err != nil { return err } if r.curRecTyp == recLast || r.curRecTyp == recFull { + if compressed && len(r.snappyBuf) > 0 { + // The snappy library uses `len` to calculate if we need a new buffer. + // In order to allocate as few buffers as possible make the length + // equal to the capacity. + r.rec = r.rec[:cap(r.rec)] + r.rec, err = snappy.Decode(r.rec, r.snappyBuf) + return err + } return nil } diff --git a/vendor/github.com/prometheus/tsdb/wal/wal.go b/vendor/github.com/prometheus/tsdb/wal/wal.go index 46504f0d9..39daba975 100644 --- a/vendor/github.com/prometheus/tsdb/wal/wal.go +++ b/vendor/github.com/prometheus/tsdb/wal/wal.go @@ -29,6 +29,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/golang/snappy" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/fileutil" @@ -165,6 +166,8 @@ type WAL struct { stopc chan chan struct{} actorc chan func() closed bool // To allow calling Close() more than once without blocking. + compress bool + snappyBuf []byte fsyncDuration prometheus.Summary pageFlushes prometheus.Counter @@ -175,13 +178,13 @@ type WAL struct { } // New returns a new WAL over the given directory. -func New(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) { - return NewSize(logger, reg, dir, DefaultSegmentSize) +func New(logger log.Logger, reg prometheus.Registerer, dir string, compress bool) (*WAL, error) { + return NewSize(logger, reg, dir, DefaultSegmentSize, compress) } // NewSize returns a new WAL over the given directory. // New segments are created with the specified size. -func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int) (*WAL, error) { +func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress bool) (*WAL, error) { if segmentSize%pageSize != 0 { return nil, errors.New("invalid segment size") } @@ -198,10 +201,12 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi page: &page{}, actorc: make(chan func(), 100), stopc: make(chan chan struct{}), + compress: compress, } w.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "prometheus_tsdb_wal_fsync_duration_seconds", - Help: "Duration of WAL fsync.", + Name: "prometheus_tsdb_wal_fsync_duration_seconds", + Help: "Duration of WAL fsync.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }) w.pageFlushes = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_wal_page_flushes_total", @@ -228,34 +233,35 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi } _, j, err := w.Segments() + // Index of the Segment we want to open and write to. + writeSegmentIndex := 0 if err != nil { return nil, errors.Wrap(err, "get segment range") } - // Fresh dir, no segments yet. - if j == -1 { - segment, err := CreateSegment(w.dir, 0) - if err != nil { - return nil, err - } - - if err := w.setSegment(segment); err != nil { - return nil, err - } - } else { - segment, err := OpenWriteSegment(logger, w.dir, j) - if err != nil { - return nil, err - } - - if err := w.setSegment(segment); err != nil { - return nil, err - } + // If some segments already exist create one with a higher index than the last segment. + if j != -1 { + writeSegmentIndex = j + 1 } + + segment, err := CreateSegment(w.dir, writeSegmentIndex) + if err != nil { + return nil, err + } + + if err := w.setSegment(segment); err != nil { + return nil, err + } + go w.run() return w, nil } +// CompressionEnabled returns if compression is enabled on this WAL. +func (w *WAL) CompressionEnabled() bool { + return w.compress +} + // Dir returns the directory of the WAL. func (w *WAL) Dir() string { return w.dir @@ -363,6 +369,9 @@ func (w *WAL) Repair(origErr error) error { } // We expect an error here from r.Err(), so nothing to handle. + // We need to pad to the end of the last page in the repaired segment + w.flushPage(true) + // We explicitly close even when there is a defer for Windows to be // able to delete it. The defer is in place to close it in-case there // are errors above. @@ -372,6 +381,20 @@ func (w *WAL) Repair(origErr error) error { if err := os.Remove(tmpfn); err != nil { return errors.Wrap(err, "delete corrupted segment") } + + // Explicitly close the the segment we just repaired to avoid issues with Windows. + s.Close() + + // We always want to start writing to a new Segment rather than an existing + // Segment, which is handled by NewSize, but earlier in Repair we're deleting + // all segments that come after the corrupted Segment. Recreate a new Segment here. + s, err = CreateSegment(w.dir, cerr.Segment+1) + if err != nil { + return err + } + if err := w.setSegment(s); err != nil { + return err + } return nil } @@ -380,6 +403,13 @@ func SegmentName(dir string, i int) string { return filepath.Join(dir, fmt.Sprintf("%08d", i)) } +// NextSegment creates the next segment and closes the previous one. +func (w *WAL) NextSegment() error { + w.mtx.Lock() + defer w.mtx.Unlock() + return w.nextSegment() +} + // nextSegment creates the next segment and closes the previous one. func (w *WAL) nextSegment() error { // Only flush the current page if it actually holds data. @@ -455,6 +485,14 @@ func (w *WAL) flushPage(clear bool) error { return nil } +// First Byte of header format: +// [ 4 bits unallocated] [1 bit snappy compression flag] [ 3 bit record type ] + +const ( + snappyMask = 1 << 3 + recTypeMask = snappyMask - 1 +) + type recType uint8 const ( @@ -465,6 +503,10 @@ const ( recLast recType = 4 // Final fragment of a record. ) +func recTypeFromHeader(header byte) recType { + return recType(header & recTypeMask) +} + func (t recType) String() string { switch t { case recPageTerm: @@ -525,6 +567,19 @@ func (w *WAL) log(rec []byte, final bool) error { } } + compressed := false + if w.compress && len(rec) > 0 { + // The snappy library uses `len` to calculate if we need a new buffer. + // In order to allocate as few buffers as possible make the length + // equal to the capacity. + w.snappyBuf = w.snappyBuf[:cap(w.snappyBuf)] + w.snappyBuf = snappy.Encode(w.snappyBuf, rec) + if len(w.snappyBuf) < len(rec) { + rec = w.snappyBuf + compressed = true + } + } + // Populate as many pages as necessary to fit the record. // Be careful to always do one pass to ensure we write zero-length records. for i := 0; i == 0 || len(rec) > 0; i++ { @@ -548,6 +603,9 @@ func (w *WAL) log(rec []byte, final bool) error { default: typ = recMiddle } + if compressed { + typ |= snappyMask + } buf[0] = byte(typ) crc := crc32.Checksum(part, castagnoliTable) @@ -710,7 +768,7 @@ func NewSegmentsRangeReader(sr ...SegmentRange) (io.ReadCloser, error) { segs = append(segs, s) } } - return newSegmentBufReader(segs...), nil + return NewSegmentBufReader(segs...), nil } // segmentBufReader is a buffered reader that reads in multiples of pages. @@ -725,7 +783,7 @@ type segmentBufReader struct { off int // Offset of read data into current segment. } -func newSegmentBufReader(segs ...*Segment) *segmentBufReader { +func NewSegmentBufReader(segs ...*Segment) *segmentBufReader { return &segmentBufReader{ buf: bufio.NewReaderSize(segs[0], 16*pageSize), segs: segs, diff --git a/vendor/modules.txt b/vendor/modules.txt index b078abb52..018c43eb4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -278,7 +278,7 @@ github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg # github.com/prometheus/procfs v0.0.2 github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs -# github.com/prometheus/tsdb v0.8.0 +# github.com/prometheus/tsdb v0.9.1 github.com/prometheus/tsdb github.com/prometheus/tsdb/fileutil github.com/prometheus/tsdb/labels