Merge remote-tracking branch 'upstream/master' into shutdown-during-compaction

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
Author: Krasi Georgiev, 2019-02-12 10:56:45 +02:00
Commit: beee5c58f3
28 changed files with 610 additions and 418 deletions


@ -29,12 +29,15 @@ GO ?= go
 GOFMT ?= $(GO)fmt
 FIRST_GOPATH := $(firstword $(subst :, ,$(shell $(GO) env GOPATH)))
 GOOPTS ?=
+GOHOSTOS ?= $(shell $(GO) env GOHOSTOS)
+GOHOSTARCH ?= $(shell $(GO) env GOHOSTARCH)
 GO_VERSION ?= $(shell $(GO) version)
 GO_VERSION_NUMBER ?= $(word 3, $(GO_VERSION))
 PRE_GO_111 ?= $(shell echo $(GO_VERSION_NUMBER) | grep -E 'go1\.(10|[0-9])\.')
-unexport GOVENDOR
+GOVENDOR :=
+GO111MODULE :=
 ifeq (, $(PRE_GO_111))
 ifneq (,$(wildcard go.mod))
 # Enforce Go modules support just in case the directory is inside GOPATH (and for Travis CI).
@ -55,24 +58,35 @@ $(warning Some recipes may not work as expected as the current Go runtime is '$(
 # This repository isn't using Go modules (yet).
 GOVENDOR := $(FIRST_GOPATH)/bin/govendor
 endif
-unexport GO111MODULE
 endif
 PROMU := $(FIRST_GOPATH)/bin/promu
 STATICCHECK := $(FIRST_GOPATH)/bin/staticcheck
 pkgs = ./...
-GO_VERSION ?= $(shell $(GO) version)
-GO_BUILD_PLATFORM ?= $(subst /,-,$(lastword $(GO_VERSION)))
+ifeq (arm, $(GOHOSTARCH))
+	GOHOSTARM ?= $(shell GOARM= $(GO) env GOARM)
+	GO_BUILD_PLATFORM ?= $(GOHOSTOS)-$(GOHOSTARCH)v$(GOHOSTARM)
+else
+	GO_BUILD_PLATFORM ?= $(GOHOSTOS)-$(GOHOSTARCH)
+endif
 PROMU_VERSION ?= 0.2.0
 PROMU_URL := https://github.com/prometheus/promu/releases/download/v$(PROMU_VERSION)/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM).tar.gz
+STATICCHECK_VERSION ?= 2019.1
+STATICCHECK_URL := https://github.com/dominikh/go-tools/releases/download/$(STATICCHECK_VERSION)/staticcheck_$(GOHOSTOS)_$(GOHOSTARCH)
 PREFIX ?= $(shell pwd)
 BIN_DIR ?= $(shell pwd)
 DOCKER_IMAGE_TAG ?= $(subst /,-,$(shell git rev-parse --abbrev-ref HEAD))
 DOCKER_REPO ?= prom
+ifeq ($(GOHOSTARCH),amd64)
+	ifeq ($(GOHOSTOS),$(filter $(GOHOSTOS),linux freebsd darwin windows))
+		# Only supported on amd64
+		test-flags := -race
+	endif
+endif
 .PHONY: all
 all: precheck style staticcheck unused build test
@ -110,12 +124,12 @@ common-test-short:
 .PHONY: common-test
 common-test:
 	@echo ">> running all tests"
-	GO111MODULE=$(GO111MODULE) $(GO) test -race $(GOOPTS) $(pkgs)
+	GO111MODULE=$(GO111MODULE) $(GO) test $(test-flags) $(GOOPTS) $(pkgs)
 .PHONY: common-format
 common-format:
 	@echo ">> formatting code"
-	GO111MODULE=$(GO111MODULE) $(GO) fmt $(GOOPTS) $(pkgs)
+	GO111MODULE=$(GO111MODULE) $(GO) fmt $(pkgs)
 .PHONY: common-vet
 common-vet:
@ -125,8 +139,12 @@ common-vet:
 .PHONY: common-staticcheck
 common-staticcheck: $(STATICCHECK)
 	@echo ">> running staticcheck"
+	chmod +x $(STATICCHECK)
 ifdef GO111MODULE
-	GO111MODULE=$(GO111MODULE) $(STATICCHECK) -ignore "$(STATICCHECK_IGNORE)" -checks "SA*" $(pkgs)
+# 'go list' needs to be executed before staticcheck to prepopulate the modules cache.
+# Otherwise staticcheck might fail randomly for some reason not yet explained.
+	GO111MODULE=$(GO111MODULE) $(GO) list -e -compiled -test=true -export=false -deps=true -find=false -tags= -- ./... > /dev/null
+	GO111MODULE=$(GO111MODULE) $(STATICCHECK) -ignore "$(STATICCHECK_IGNORE)" $(pkgs)
 else
 	$(STATICCHECK) -ignore "$(STATICCHECK_IGNORE)" $(pkgs)
 endif
@ -140,8 +158,9 @@ else
 ifdef GO111MODULE
 	@echo ">> running check for unused/missing packages in go.mod"
 	GO111MODULE=$(GO111MODULE) $(GO) mod tidy
+ifeq (,$(wildcard vendor))
 	@git diff --exit-code -- go.sum go.mod
-ifneq (,$(wildcard vendor))
+else
 	@echo ">> running check for unused packages in vendor/"
 	GO111MODULE=$(GO111MODULE) $(GO) mod vendor
 	@git diff --exit-code -- go.sum go.mod vendor/
@ -175,30 +194,20 @@ common-docker-tag-latest:
 promu: $(PROMU)
 $(PROMU):
-	curl -s -L $(PROMU_URL) | tar -xvz -C /tmp
-	mkdir -v -p $(FIRST_GOPATH)/bin
-	cp -v /tmp/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM)/promu $(PROMU)
+	$(eval PROMU_TMP := $(shell mktemp -d))
+	curl -s -L $(PROMU_URL) | tar -xvzf - -C $(PROMU_TMP)
+	mkdir -p $(FIRST_GOPATH)/bin
+	cp $(PROMU_TMP)/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM)/promu $(FIRST_GOPATH)/bin/promu
+	rm -r $(PROMU_TMP)
 .PHONY: proto
 proto:
 	@echo ">> generating code from proto files"
 	@./scripts/genproto.sh
-.PHONY: $(STATICCHECK)
 $(STATICCHECK):
-ifdef GO111MODULE
-# Get staticcheck from a temporary directory to avoid modifying the local go.{mod,sum}.
-# See https://github.com/golang/go/issues/27643.
-# For now, we are using the next branch of staticcheck because master isn't compatible yet with Go modules.
-	tmpModule=$$(mktemp -d 2>&1) && \
-	mkdir -p $${tmpModule}/staticcheck && \
-	cd "$${tmpModule}"/staticcheck && \
-	GO111MODULE=on $(GO) mod init example.com/staticcheck && \
-	GO111MODULE=on GOOS= GOARCH= $(GO) get -u honnef.co/go/tools/cmd/staticcheck@next && \
-	rm -rf $${tmpModule};
-else
-	GOOS= GOARCH= GO111MODULE=off $(GO) get -u honnef.co/go/tools/cmd/staticcheck
-endif
+	mkdir -p $(FIRST_GOPATH)/bin
+	curl -s -L $(STATICCHECK_URL) > $(STATICCHECK)
 ifdef GOVENDOR
 .PHONY: $(GOVENDOR)
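The new `GO_BUILD_PLATFORM` value (for example `linux-amd64`, or `linux-armv6` on ARM hosts) has to match the asset names of the promu and staticcheck release downloads. A minimal sketch of the same composition in Go, illustrative only since the Makefile derives the values from `$(GO) env` rather than `runtime`:

```go
package main

import (
	"fmt"
	"runtime"
)

// buildPlatform mirrors the Makefile's GO_BUILD_PLATFORM logic:
// "<GOHOSTOS>-<GOHOSTARCH>" in general, with a "v<GOARM>" suffix on arm.
func buildPlatform(goarm string) string {
	if runtime.GOARCH == "arm" {
		return fmt.Sprintf("%s-%sv%s", runtime.GOOS, runtime.GOARCH, goarm)
	}
	return fmt.Sprintf("%s-%s", runtime.GOOS, runtime.GOARCH)
}

func main() {
	// On amd64 Linux this prints "linux-amd64", which is the suffix of the
	// promu release tarball the Makefile downloads.
	fmt.Println(buildPlatform("6"))
}
```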


@ -16,6 +16,7 @@ package tsdb
 import (
 	"encoding/json"
+	"io"
 	"io/ioutil"
 	"os"
 	"path/filepath"
@ -269,10 +270,19 @@ type Block struct {
 // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
 // to instantiate chunk structs.
-func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error) {
+func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
+	var closers []io.Closer
+	defer func() {
+		if err != nil {
+			var merr MultiError
+			merr.Add(err)
+			merr.Add(closeAll(closers))
+			err = merr.Err()
+		}
+	}()
 	meta, err := readMetaFile(dir)
 	if err != nil {
 		return nil, err
@ -282,15 +292,19 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error
 	if err != nil {
 		return nil, err
 	}
-	ir, err := index.NewFileReader(filepath.Join(dir, "index"))
+	closers = append(closers, cr)
+
+	ir, err := index.NewFileReader(filepath.Join(dir, indexFilename))
 	if err != nil {
 		return nil, err
 	}
+	closers = append(closers, ir)
 	tr, tsr, err := readTombstones(dir)
 	if err != nil {
 		return nil, err
 	}
+	closers = append(closers, tr)
 	// TODO refactor to set this at block creation time as
 	// that would be the logical place for a block size to be calculated.
@ -301,7 +315,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error
 		level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err)
 	}
-	pb := &Block{
+	pb = &Block{
 		dir:    dir,
 		meta:   *meta,
 		chunkr: cr,
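`OpenBlock` now uses a common Go resource-management pattern: accumulate every successfully opened `io.Closer` and close them all from a deferred function keyed off the named `err` return, so a failure halfway through opening leaks nothing. A self-contained sketch of the pattern; `openAll` is a made-up helper, not part of the package:

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// openAll opens the given files and returns them all, or none: if any open
// fails, the deferred cleanup closes whatever was already opened.
func openAll(paths ...string) (cs []io.Closer, err error) {
	defer func() {
		if err != nil {
			for _, c := range cs {
				c.Close() // best-effort cleanup on the error path
			}
			cs = nil
		}
	}()
	for _, p := range paths {
		f, ferr := os.Open(p)
		if ferr != nil {
			return cs, fmt.Errorf("open %s: %v", p, ferr)
		}
		cs = append(cs, f)
	}
	return cs, nil
}

func main() {
	files, err := openAll("a.txt", "b.txt")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, f := range files {
		f.Close()
	}
}
```

The named return is what makes the deferred cleanup see the final error value, which is the same trick `OpenBlock` relies on above.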


@ -53,16 +53,6 @@ func newBReader(b []byte) bstream {
 	return bstream{stream: b, count: 8}
 }
-func newBWriter(size int) *bstream {
-	return &bstream{stream: make([]byte, 0, size), count: 0}
-}
-
-func (b *bstream) clone() *bstream {
-	d := make([]byte, len(b.stream))
-	copy(d, b.stream)
-	return &bstream{stream: d, count: b.count}
-}
-
 func (b *bstream) bytes() []byte {
 	return b.stream
 }


@ -32,7 +32,7 @@ func TestChunk(t *testing.T) {
 	for enc, nc := range map[Encoding]func() Chunk{
 		EncXOR: func() Chunk { return NewXORChunk() },
 	} {
-		t.Run(fmt.Sprintf("%s", enc), func(t *testing.T) {
+		t.Run(fmt.Sprintf("%v", enc), func(t *testing.T) {
 			for range make([]struct{}, 1) {
 				c := nc()
 				if err := testChunk(c); err != nil {


@ -64,9 +64,7 @@ func (cm *Meta) OverlapsClosedInterval(mint, maxt int64) bool {
 }
 var (
 	errInvalidSize = fmt.Errorf("invalid size")
-	errInvalidFlag = fmt.Errorf("invalid flag")
-	errInvalidChecksum = fmt.Errorf("invalid checksum")
 )
 var castagnoliTable *crc32.Table
@ -344,7 +342,7 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
 }
 func (s *Reader) Close() error {
-	return closeAll(s.cs...)
+	return closeAll(s.cs)
 }
 // Size returns the size of the chunks.
@ -412,7 +410,7 @@ func sequenceFiles(dir string) ([]string, error) {
 	return res, nil
 }
-func closeAll(cs ...io.Closer) (err error) {
+func closeAll(cs []io.Closer) (err error) {
 	for _, c := range cs {
 		if e := c.Close(); e != nil {
 			err = e
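`closeAll` now takes a slice and still returns only the last `Close` error. A variant that keeps every failure, in the spirit of the `MultiError` that `OpenBlock` uses above, could look like this sketch (`multiClose` is hypothetical, not from the codebase):

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// multiClose closes every closer and joins all failures into one error,
// instead of keeping only the last one as closeAll does in this diff.
func multiClose(cs []io.Closer) error {
	var msgs []string
	for _, c := range cs {
		if err := c.Close(); err != nil {
			msgs = append(msgs, err.Error())
		}
	}
	if len(msgs) == 0 {
		return nil
	}
	return errors.New(strings.Join(msgs, "; "))
}

type failingCloser struct{ name string }

func (f failingCloser) Close() error { return fmt.Errorf("close %s failed", f.name) }

func main() {
	err := multiClose([]io.Closer{failingCloser{"a"}, failingCloser{"b"}})
	fmt.Println(err) // close a failed; close b failed
}
```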


@ -87,7 +87,7 @@ func main() {
 		block = blocks[len(blocks)-1]
 	}
 	if block == nil {
-		exitWithError(fmt.Errorf("Block not found"))
+		exitWithError(fmt.Errorf("block not found"))
 	}
 	analyzeBlock(block, *analyzeLimit)
 }
@ -340,12 +340,6 @@ func measureTime(stage string, f func()) time.Duration {
 	return time.Since(start)
 }
-func mapToLabels(m map[string]interface{}, l *labels.Labels) {
-	for k, v := range m {
-		*l = append(*l, labels.Label{Name: k, Value: v.(string)})
-	}
-}
-
 func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
 	scanner := bufio.NewScanner(r)


@ -72,7 +72,6 @@ type Compactor interface {
 // LeveledCompactor implements the Compactor interface.
 type LeveledCompactor struct {
-	dir     string
 	metrics *compactorMetrics
 	logger  log.Logger
 	ranges  []int64
@ -621,7 +620,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 		err = merr.Err()
 		c.metrics.populatingBlocks.Set(0)
 	}()
-
 	c.metrics.populatingBlocks.Set(1)
 	for i, b := range blocks {
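`populateBlock` brackets its work with `populatingBlocks.Set(1)` and a deferred `Set(0)`, so the gauge reads 1 exactly while a compaction is populating blocks, on both the success and the error paths. The shape of that idiom with a plain `prometheus` gauge, as a simplified sketch rather than the compactor's actual metrics wiring:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var populating = prometheus.NewGauge(prometheus.GaugeOpts{
	Name: "example_populating_blocks",
	Help: "Set to 1 while blocks are being populated.",
})

// populate flips the gauge on for the duration of the work; the deferred
// reset runs whether the work succeeds or returns an error.
func populate() (err error) {
	populating.Set(1)
	defer populating.Set(0)

	// ... long-running compaction work would go here ...
	return nil
}

func main() {
	prometheus.MustRegister(populating)
	fmt.Println(populate())
}
```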


@ -18,6 +18,7 @@ import (
 	"io/ioutil"
 	"math"
 	"os"
+	"path"
 	"path/filepath"
 	"testing"
 	"time"
@ -694,9 +695,11 @@ func TestCompaction_populateBlock(t *testing.T) {
 // This is needed for unit tests that rely on
 // checking state before and after a compaction.
 func TestDisableAutoCompactions(t *testing.T) {
-	db, close := openTestDB(t, nil)
-	defer close()
-	defer db.Close()
+	db, delete := openTestDB(t, nil)
+	defer func() {
+		testutil.Ok(t, db.Close())
+		delete()
+	}()
 	blockRange := DefaultOptions.BlockRanges[0]
 	label := labels.FromStrings("foo", "bar")
@ -808,3 +811,77 @@ func TestCancelCompactions(t *testing.T) {
 		testutil.Assert(t, actT < expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT)
 	}
 }
+
+// TestDeleteCompactionBlockAfterFailedReload ensures that a failed reload immediately after a compaction
+// deletes the resulting block to avoid creating blocks with the same time range.
+func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
+	tests := map[string]func(*DB) int{
+		"Test Head Compaction": func(db *DB) int {
+			rangeToTriggerCompaction := db.opts.BlockRanges[0]/2*3 - 1
+			defaultLabel := labels.FromStrings("foo", "bar")
+
+			// Add some data to the head that is enough to trigger a compaction.
+			app := db.Appender()
+			_, err := app.Add(defaultLabel, 1, 0)
+			testutil.Ok(t, err)
+			_, err = app.Add(defaultLabel, 2, 0)
+			testutil.Ok(t, err)
+			_, err = app.Add(defaultLabel, 3+rangeToTriggerCompaction, 0)
+			testutil.Ok(t, err)
+			testutil.Ok(t, app.Commit())
+
+			return 0
+		},
+		"Test Block Compaction": func(db *DB) int {
+			blocks := []*BlockMeta{
+				{MinTime: 0, MaxTime: 100},
+				{MinTime: 100, MaxTime: 150},
+				{MinTime: 150, MaxTime: 200},
+			}
+			for _, m := range blocks {
+				createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime))
+			}
+			testutil.Ok(t, db.reload())
+			testutil.Equals(t, len(blocks), len(db.Blocks()), "unexpected block count after a reload")
+
+			return len(blocks)
+		},
+	}
+
+	for title, bootStrap := range tests {
+		t.Run(title, func(t *testing.T) {
+			db, delete := openTestDB(t, &Options{
+				BlockRanges: []int64{1, 100},
+			})
+			defer func() {
+				testutil.Ok(t, db.Close())
+				delete()
+			}()
+			db.DisableCompactions()
+
+			expBlocks := bootStrap(db)
+
+			// Create a block that will trigger the reload to fail.
+			blockPath := createBlock(t, db.Dir(), genSeries(1, 1, 200, 300))
+			lastBlockIndex := path.Join(blockPath, indexFilename)
+			actBlocks, err := blockDirs(db.Dir())
+			testutil.Ok(t, err)
+			testutil.Equals(t, expBlocks, len(actBlocks)-1) // -1 to exclude the corrupted block.
+			testutil.Ok(t, os.RemoveAll(lastBlockIndex))    // Corrupt the block by removing the index file.
+
+			testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "initial 'failed db reload' count metrics mismatch")
+			testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial `compactions` count metric mismatch")
+
+			// Do the compaction and check the metrics.
+			// Compaction should succeed, but the reload should fail and
+			// the new block created from the compaction should be deleted.
+			testutil.NotOk(t, db.compact())
+			testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "'failed db reload' count metrics mismatch")
+			testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "`compaction` count metric mismatch")
+
+			actBlocks, err = blockDirs(db.Dir())
+			testutil.Ok(t, err)
+			testutil.Equals(t, expBlocks, len(actBlocks)-1, "block count should be the same as before the compaction") // -1 to exclude the corrupted block.
+		})
+	}
+}

db.go

@ -432,6 +432,9 @@ func (db *DB) compact() (err error) {
 		runtime.GC()
 		if err := db.reload(); err != nil {
+			if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil {
+				return errors.Wrapf(err, "delete persisted head block after failed db reload:%s", uid)
+			}
 			return errors.Wrap(err, "reload blocks")
 		}
 		if (uid == ulid.ULID{}) {
@ -460,12 +463,17 @@ func (db *DB) compact() (err error) {
 			return nil
 		default:
 		}
-		if _, err := db.compactor.Compact(db.dir, plan, db.blocks); err != nil {
+		uid, err := db.compactor.Compact(db.dir, plan, db.blocks)
+		if err != nil {
 			return errors.Wrapf(err, "compact %s", plan)
 		}
 		runtime.GC()
 		if err := db.reload(); err != nil {
+			if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil {
+				return errors.Wrapf(err, "delete compacted block after failed db reload:%s", uid)
+			}
 			return errors.Wrap(err, "reload blocks")
 		}
 		runtime.GC()
@ -511,7 +519,13 @@ func (db *DB) reload() (err error) {
 		}
 	}
 	if len(corrupted) > 0 {
-		return errors.Wrap(err, "unexpected corrupted block")
+		// Close all new blocks to release the lock for windows.
+		for _, block := range loadable {
+			if _, loaded := db.getBlock(block.Meta().ULID); !loaded {
+				block.Close()
+			}
+		}
+		return fmt.Errorf("unexpected corrupted block:%v", corrupted)
 	}
 	// All deletable blocks should not be loaded.
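Both `compact` call sites now apply the same recovery rule: if the reload after a compaction fails, the block that was just written is removed, so a retry cannot leave two blocks covering the same time range. A compressed sketch of that control flow, with hypothetical helper signatures rather than the DB's real API:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// compactOnce writes a new block, then tries to reload the block set; if the
// reload fails, the freshly written block is deleted before returning, so a
// retry cannot end up with two blocks for the same time range.
func compactOnce(dir string, writeBlock func(string) (string, error), reload func() error) error {
	uid, err := writeBlock(dir) // returns the new block's ULID-like directory name
	if err != nil {
		return fmt.Errorf("compact: %v", err)
	}
	if err := reload(); err != nil {
		if rerr := os.RemoveAll(filepath.Join(dir, uid)); rerr != nil {
			return fmt.Errorf("delete block after failed reload: %v", rerr)
		}
		return fmt.Errorf("reload blocks: %v", err)
	}
	return nil
}

func main() {
	err := compactOnce(os.TempDir(),
		func(string) (string, error) { return "01EXAMPLEULID", nil },
		func() error { return fmt.Errorf("corrupted block") },
	)
	fmt.Println(err) // reload blocks: corrupted block
}
```

This is exactly the invariant that `TestDeleteCompactionBlockAfterFailedReload` above checks via the block count and the `reloadsFailed` metric.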


@ -46,7 +46,9 @@ func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) {
 	testutil.Ok(t, err)
 	// Do not close the test database by default as it will deadlock on test failures.
-	return db, func() { os.RemoveAll(tmpdir) }
+	return db, func() {
+		testutil.Ok(t, os.RemoveAll(tmpdir))
+	}
 }
 // query runs a matcher query against the querier and fully expands its data.
@ -78,9 +80,11 @@ func query(t testing.TB, q Querier, matchers ...labels.Matcher) map[string][]sam
 // Ensure that blocks are held in memory in their time order
 // and not in ULID order as they are read from the directory.
 func TestDB_reloadOrder(t *testing.T) {
-	db, close := openTestDB(t, nil)
-	defer close()
-	defer db.Close()
+	db, delete := openTestDB(t, nil)
+	defer func() {
+		testutil.Ok(t, db.Close())
+		delete()
+	}()
 	metas := []BlockMeta{
 		{MinTime: 90, MaxTime: 100},
@ -106,9 +110,11 @@
 }
 func TestDataAvailableOnlyAfterCommit(t *testing.T) {
-	db, close := openTestDB(t, nil)
-	defer close()
-	defer db.Close()
+	db, delete := openTestDB(t, nil)
+	defer func() {
+		testutil.Ok(t, db.Close())
+		delete()
+	}()
 	app := db.Appender()
@ -135,9 +141,11 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
 }
 func TestDataNotAvailableAfterRollback(t *testing.T) {
-	db, close := openTestDB(t, nil)
-	defer close()
-	defer db.Close()
+	db, delete := openTestDB(t, nil)
+	defer func() {
+		testutil.Ok(t, db.Close())
+		delete()
+	}()
 	app := db.Appender()
 	_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
@ -156,9 +164,11 @@
 }
 func TestDBAppenderAddRef(t *testing.T) {
-	db, close := openTestDB(t, nil)
-	defer close()
-	defer db.Close()
+	db, delete := openTestDB(t, nil)
+	defer func() {
+		testutil.Ok(t, db.Close())
+		delete()
+	}()
 	app1 := db.Appender()
@ -213,31 +223,50 @@ func TestDBAppenderAddRef(t *testing.T) {
 func TestDeleteSimple(t *testing.T) {
 	numSamples := int64(10)
-	db, close := openTestDB(t, nil)
-	defer close()
-	defer db.Close()
-
-	app := db.Appender()
-
-	smpls := make([]float64, numSamples)
-	for i := int64(0); i < numSamples; i++ {
-		smpls[i] = rand.Float64()
-		app.Add(labels.Labels{{"a", "b"}}, i, smpls[i])
-	}
-
-	testutil.Ok(t, app.Commit())
 	cases := []struct {
 		intervals Intervals
 		remaint   []int64
 	}{
+		{
+			intervals: Intervals{{0, 3}},
+			remaint:   []int64{4, 5, 6, 7, 8, 9},
+		},
+		{
+			intervals: Intervals{{1, 3}},
+			remaint:   []int64{0, 4, 5, 6, 7, 8, 9},
+		},
 		{
 			intervals: Intervals{{1, 3}, {4, 7}},
 			remaint:   []int64{0, 8, 9},
 		},
+		{
+			intervals: Intervals{{1, 3}, {4, 700}},
+			remaint:   []int64{0},
+		},
+		{ // This case is to ensure that labels and symbols are deleted.
+			intervals: Intervals{{0, 9}},
+			remaint:   []int64{},
+		},
 	}
 Outer:
 	for _, c := range cases {
+		db, delete := openTestDB(t, nil)
+		defer func() {
+			testutil.Ok(t, db.Close())
+			delete()
+		}()
+
+		app := db.Appender()
+
+		smpls := make([]float64, numSamples)
+		for i := int64(0); i < numSamples; i++ {
+			smpls[i] = rand.Float64()
+			app.Add(labels.Labels{{"a", "b"}}, i, smpls[i])
+		}
+
+		testutil.Ok(t, app.Commit())
+
 		// TODO(gouthamve): Reset the tombstones somehow.
 		// Delete the ranges.
 		for _, r := range c.intervals {
@ -260,9 +289,20 @@ Outer:
 			newSeries(map[string]string{"a": "b"}, expSamples),
 		})
+		lns, err := q.LabelNames()
+		testutil.Ok(t, err)
+		lvs, err := q.LabelValues("a")
+		testutil.Ok(t, err)
 		if len(expSamples) == 0 {
+			testutil.Equals(t, 0, len(lns))
+			testutil.Equals(t, 0, len(lvs))
 			testutil.Assert(t, res.Next() == false, "")
 			continue
+		} else {
+			testutil.Equals(t, 1, len(lns))
+			testutil.Equals(t, 1, len(lvs))
+			testutil.Equals(t, "a", lns[0])
+			testutil.Equals(t, "b", lvs[0])
 		}
 		for {
@ -287,9 +327,11 @@ Outer:
 }
 func TestAmendDatapointCausesError(t *testing.T) {
-	db, close := openTestDB(t, nil)
-	defer close()
-	defer db.Close()
+	db, delete := openTestDB(t, nil)
+	defer func() {
+		testutil.Ok(t, db.Close())
+		delete()
+	}()
 	app := db.Appender()
 	_, err := app.Add(labels.Labels{}, 0, 0)
@ -303,9 +345,11 @@ func TestAmendDatapointCausesError(t *testing.T) {
 }
 func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
-	db, close := openTestDB(t, nil)
-	defer close()
-	defer db.Close()
+	db, delete := openTestDB(t, nil)
+	defer func() {
+		testutil.Ok(t, db.Close())
+		delete()
+	}()
 	app := db.Appender()
 	_, err := app.Add(labels.Labels{}, 0, math.NaN())
@ -318,10 +362,11 @@
 }
 func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
-	db, close := openTestDB(t, nil)
-	defer close()
-	defer db.Close()
+	db, delete := openTestDB(t, nil)
+	defer func() {
+		testutil.Ok(t, db.Close())
+		delete()
+	}()
 	app := db.Appender()
 	_, err := app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001))
 	testutil.Ok(t, err)
@ -333,9 +378,11 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
 }
 func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
-	db, close := openTestDB(t, nil)
-	defer close()
-	defer db.Close()
+	db, delete := openTestDB(t, nil)
+	defer func() {
+		testutil.Ok(t, db.Close())
+		delete()
+	}()
 	// Append AmendedValue.
 	app := db.Appender()
@ -377,8 +424,8 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
 }
 func TestDB_Snapshot(t *testing.T) {
-	db, close := openTestDB(t, nil)
-	defer close()
+	db, delete := openTestDB(t, nil)
+	defer delete()
 	// append data
 	app := db.Appender()
@ -401,11 +448,11 @@ func TestDB_Snapshot(t *testing.T) {
 	// reopen DB from snapshot
 	db, err = Open(snap, nil, nil, nil)
 	testutil.Ok(t, err)
-	defer db.Close()
+	defer func() { testutil.Ok(t, db.Close()) }()
 	querier, err := db.Querier(mint, mint+1000)
 	testutil.Ok(t, err)
-	defer querier.Close()
+	defer func() { testutil.Ok(t, querier.Close()) }()
 	// sum values
 	seriesSet, err := querier.Select(labels.NewEqualMatcher("foo", "bar"))
@ -427,8 +474,8 @@
 func TestDB_SnapshotWithDelete(t *testing.T) {
 	numSamples := int64(10)
-	db, close := openTestDB(t, nil)
-	defer close()
+	db, delete := openTestDB(t, nil)
+	defer delete()
 	app := db.Appender()
@ -468,12 +515,12 @@ Outer:
 	// reopen DB from snapshot
 	db, err = Open(snap, nil, nil, nil)
 	testutil.Ok(t, err)
-	defer db.Close()
+	defer func() { testutil.Ok(t, db.Close()) }()
 	// Compare the result.
 	q, err := db.Querier(0, numSamples)
 	testutil.Ok(t, err)
-	defer q.Close()
+	defer func() { testutil.Ok(t, q.Close()) }()
 	res, err := q.Select(labels.NewEqualMatcher("a", "b"))
 	testutil.Ok(t, err)
@ -518,8 +565,6 @@ func TestDB_e2e(t *testing.T) {
 		numDatapoints = 1000
 		numRanges     = 1000
 		timeInterval  = int64(3)
-		maxTime       = int64(2 * 1000)
-		minTime       = int64(200)
 	)
 	// Create 8 series with 1000 data-points of different ranges and run queries.
 	lbls := [][]labels.Label{
@ -570,9 +615,11 @@
 		seriesMap[labels.New(l...).String()] = []sample{}
 	}
-	db, close := openTestDB(t, nil)
-	defer close()
-	defer db.Close()
+	db, delete := openTestDB(t, nil)
+	defer func() {
+		testutil.Ok(t, db.Close())
+		delete()
+	}()
 	app := db.Appender()
@ -670,13 +717,11 @@ func TestDB_e2e(t *testing.T) {
 			q.Close()
 		}
 	}
-
-	return
 }
 func TestWALFlushedOnDBClose(t *testing.T) {
-	db, close := openTestDB(t, nil)
-	defer close()
+	db, delete := openTestDB(t, nil)
+	defer delete()
 	dirDb := db.Dir()
@ -691,7 +736,7 @@ func TestWALFlushedOnDBClose(t *testing.T) {
 	db, err = Open(dirDb, nil, nil, nil)
 	testutil.Ok(t, err)
-	defer db.Close()
+	defer func() { testutil.Ok(t, db.Close()) }()
 	q, err := db.Querier(0, 1)
 	testutil.Ok(t, err)
@ -704,8 +749,8 @@
 func TestWALSegmentSizeOption(t *testing.T) {
 	options := *DefaultOptions
 	options.WALSegmentSize = 2 * 32 * 1024
-	db, close := openTestDB(t, &options)
-	defer close()
+	db, delete := openTestDB(t, &options)
+	defer delete()
 	app := db.Appender()
 	for i := int64(0); i < 155; i++ {
 		_, err := app.Add(labels.Labels{labels.Label{Name: "wal", Value: "size"}}, i, rand.Float64())
@ -730,8 +775,8 @@
 func TestTombstoneClean(t *testing.T) {
 	numSamples := int64(10)
-	db, close := openTestDB(t, nil)
-	defer close()
+	db, delete := openTestDB(t, nil)
+	defer delete()
 	app := db.Appender()
@ -827,9 +872,11 @@ func TestTombstoneClean(t *testing.T) {
 // if TombstoneClean leaves any blocks behind these will overlap.
 func TestTombstoneCleanFail(t *testing.T) {
-	db, close := openTestDB(t, nil)
-	defer db.Close()
-	defer close()
+	db, delete := openTestDB(t, nil)
+	defer func() {
+		testutil.Ok(t, db.Close())
+		delete()
+	}()
 	var expectedBlockDirs []string
@ -906,11 +953,13 @@ func (*mockCompactorFailing) Compact(dest string, dirs []string, open []*Block)
 }
 func TestTimeRetention(t *testing.T) {
-	db, close := openTestDB(t, &Options{
+	db, delete := openTestDB(t, &Options{
 		BlockRanges: []int64{1000},
 	})
-	defer close()
-	defer db.Close()
+	defer func() {
+		testutil.Ok(t, db.Close())
+		delete()
+	}()
 	blocks := []*BlockMeta{
 		{MinTime: 500, MaxTime: 900}, // Oldest block
@ -938,11 +987,13 @@ func TestTimeRetention(t *testing.T) {
 }
 func TestSizeRetention(t *testing.T) {
-	db, close := openTestDB(t, &Options{
+	db, delete := openTestDB(t, &Options{
 		BlockRanges: []int64{100},
 	})
-	defer close()
-	defer db.Close()
+	defer func() {
+		testutil.Ok(t, db.Close())
+		delete()
+	}()
 	blocks := []*BlockMeta{
 		{MinTime: 100, MaxTime: 200}, // Oldest block
@ -1000,8 +1051,11 @@ func dbDiskSize(dir string) int64 {
 }
 func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
-	db, close := openTestDB(t, nil)
-	defer close()
+	db, delete := openTestDB(t, nil)
+	defer func() {
+		testutil.Ok(t, db.Close())
+		delete()
+	}()
 	labelpairs := []labels.Labels{
 		labels.FromStrings("a", "abcd", "b", "abcde"),
@ -1059,7 +1113,7 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
 	q, err := db.Querier(0, 10)
 	testutil.Ok(t, err)
-	defer q.Close()
+	defer func() { testutil.Ok(t, q.Close()) }()
 	for _, c := range cases {
 		ss, err := q.Select(c.selector...)
@ -1175,9 +1229,11 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
 // Regression test for https://github.com/prometheus/tsdb/issues/347
 func TestChunkAtBlockBoundary(t *testing.T) {
-	db, close := openTestDB(t, nil)
-	defer close()
-	defer db.Close()
+	db, delete := openTestDB(t, nil)
+	defer func() {
+		testutil.Ok(t, db.Close())
+		delete()
+	}()
 	app := db.Appender()
@ -1229,9 +1285,11 @@ func TestChunkAtBlockBoundary(t *testing.T) {
 }
 func TestQuerierWithBoundaryChunks(t *testing.T) {
-	db, close := openTestDB(t, nil)
-	defer close()
-	defer db.Close()
+	db, delete := openTestDB(t, nil)
+	defer func() {
+		testutil.Ok(t, db.Close())
+		delete()
+	}()
 	app := db.Appender()
@ -1366,14 +1424,16 @@ func TestInitializeHeadTimestamp(t *testing.T) {
 }
 func TestNoEmptyBlocks(t *testing.T) {
-	db, close := openTestDB(t, &Options{
+	db, delete := openTestDB(t, &Options{
 		BlockRanges: []int64{100},
 	})
-	defer close()
-	defer db.Close()
+	defer func() {
+		testutil.Ok(t, db.Close())
+		delete()
+	}()
 	db.DisableCompactions()
-	rangeToTriggercompaction := db.opts.BlockRanges[0]/2*3 - 1
+	rangeToTriggerCompaction := db.opts.BlockRanges[0]/2*3 - 1
 	defaultLabel := labels.FromStrings("foo", "bar")
 	defaultMatcher := labels.NewMustRegexpMatcher("", ".*")
@ -1392,7 +1452,7 @@
 	testutil.Ok(t, err)
 	_, err = app.Add(defaultLabel, 2, 0)
 	testutil.Ok(t, err)
-	_, err = app.Add(defaultLabel, 3+rangeToTriggercompaction, 0)
+	_, err = app.Add(defaultLabel, 3+rangeToTriggerCompaction, 0)
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
 	testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
@ -1414,7 +1474,7 @@
 	testutil.Ok(t, err)
 	_, err = app.Add(defaultLabel, currentTime+1, 0)
 	testutil.Ok(t, err)
-	_, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0)
+	_, err = app.Add(defaultLabel, currentTime+rangeToTriggerCompaction, 0)
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
@ -1435,7 +1495,7 @@
 	testutil.Ok(t, err)
 	_, err = app.Add(defaultLabel, currentTime+1, 0)
 	testutil.Ok(t, err)
-	_, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0)
+	_, err = app.Add(defaultLabel, currentTime+rangeToTriggerCompaction, 0)
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
 	testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
@ -1524,9 +1584,11 @@ func TestDB_LabelNames(t *testing.T) {
 		testutil.Ok(t, err)
 	}
 	for _, tst := range tests {
-		db, close := openTestDB(t, nil)
-		defer close()
-		defer db.Close()
+		db, delete := openTestDB(t, nil)
+		defer func() {
+			testutil.Ok(t, db.Close())
+			delete()
+		}()
 		appendSamples(db, 0, 4, tst.sampleLabels1)
@ -1567,9 +1629,11 @@ func TestDB_LabelNames(t *testing.T) {
 }
 func TestCorrectNumTombstones(t *testing.T) {
-	db, close := openTestDB(t, nil)
-	defer close()
-	defer db.Close()
+	db, delete := openTestDB(t, nil)
+	defer func() {
+		testutil.Ok(t, db.Close())
+		delete()
+	}()
 	blockRange := DefaultOptions.BlockRanges[0]
 	defaultLabel := labels.FromStrings("foo", "bar")
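The recurring change in these tests is that `openTestDB` returns a cleanup closure (named `delete`) and the deferred block asserts the `Close` error instead of discarding it. The pattern in isolation, sketched with `t.Fatal` standing in for the repo's `testutil.Ok` helper:

```go
package example

import (
	"io/ioutil"
	"os"
	"testing"
)

// openTestDir returns a temp directory plus a cleanup func, mirroring how
// openTestDB returns the DB plus a delete closure in the tests above.
func openTestDir(t testing.TB) (string, func()) {
	dir, err := ioutil.TempDir("", "test")
	if err != nil {
		t.Fatal(err)
	}
	return dir, func() {
		if err := os.RemoveAll(dir); err != nil {
			t.Fatal(err) // surface cleanup failures instead of ignoring them
		}
	}
}

func TestSomething(t *testing.T) {
	dir, cleanup := openTestDir(t)
	defer cleanup() // runs after any deferred Close, matching the tests above

	_ = dir // ... exercise the database in dir ...
}
```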


@ -85,34 +85,40 @@ After the labels, the number of indexed chunks is encoded, followed by a sequenc
`mint` of the first chunk is stored, its `maxt` is stored as a delta, and the `mint` and `maxt` are encoded as deltas to the previous time for subsequent chunks. Similarly, the reference of the first chunk is stored and the next ref is stored as a delta to the previous one.
```
┌──────────────────────────────────────────────────────────────────────────┐
│ len <uvarint>                                                            │
├──────────────────────────────────────────────────────────────────────────┤
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │                    labels count <uvarint64>                          │ │
│ ├──────────────────────────────────────────────────────────────────────┤ │
│ │              ┌────────────────────────────────────────────┐          │ │
│ │              │ ref(l_i.name) <uvarint32>                  │          │ │
│ │              ├────────────────────────────────────────────┤          │ │
│ │              │ ref(l_i.value) <uvarint32>                 │          │ │
│ │              └────────────────────────────────────────────┘          │ │
│ │                             ...                                      │ │
│ ├──────────────────────────────────────────────────────────────────────┤ │
│ │                    chunks count <uvarint64>                          │ │
│ ├──────────────────────────────────────────────────────────────────────┤ │
│ │              ┌────────────────────────────────────────────┐          │ │
│ │              │ c_0.mint <varint64>                        │          │ │
│ │              ├────────────────────────────────────────────┤          │ │
│ │              │ c_0.maxt - c_0.mint <uvarint64>            │          │ │
│ │              ├────────────────────────────────────────────┤          │ │
│ │              │ ref(c_0.data) <uvarint64>                  │          │ │
│ │              └────────────────────────────────────────────┘          │ │
│ │              ┌────────────────────────────────────────────┐          │ │
│ │              │ c_i.mint - c_i-1.maxt <uvarint64>          │          │ │
│ │              ├────────────────────────────────────────────┤          │ │
│ │              │ c_i.maxt - c_i.mint <uvarint64>            │          │ │
│ │              ├────────────────────────────────────────────┤          │ │
│ │              │ ref(c_i.data) - ref(c_i-1.data) <varint64> │          │ │
│ │              └────────────────────────────────────────────┘          │ │
│ │                             ...                                      │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
├──────────────────────────────────────────────────────────────────────────┤
│ CRC32 <4b>                                                               │
└──────────────────────────────────────────────────────────────────────────┘
```
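The chunk metadata in this entry is delta-encoded: only `c_0.mint` is absolute, widths (`maxt - mint`) and gaps to the previous chunk are uvarints, and data refs after the first are signed deltas. A self-contained sketch of that scheme with `encoding/binary` (illustrative, not the index writer's actual code):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

type chunkMeta struct{ mint, maxt, ref int64 }

// encodeChunkMetas applies the series-entry scheme above: absolute mint for
// the first chunk, then gaps and widths as uvarints, refs as signed deltas.
func encodeChunkMetas(metas []chunkMeta) []byte {
	var b []byte
	buf := make([]byte, binary.MaxVarintLen64)
	putV := func(x int64) { b = append(b, buf[:binary.PutVarint(buf, x)]...) }
	putUv := func(x uint64) { b = append(b, buf[:binary.PutUvarint(buf, x)]...) }

	for i, m := range metas {
		if i == 0 {
			putV(m.mint)                   // c_0.mint <varint64>
			putUv(uint64(m.maxt - m.mint)) // c_0.maxt - c_0.mint <uvarint64>
			putUv(uint64(m.ref))           // ref(c_0.data) <uvarint64>
			continue
		}
		prev := metas[i-1]
		putUv(uint64(m.mint - prev.maxt)) // gap to the previous chunk <uvarint64>
		putUv(uint64(m.maxt - m.mint))    // chunk width <uvarint64>
		putV(m.ref - prev.ref)            // ref delta, may be negative <varint64>
	}
	return b
}

func main() {
	b := encodeChunkMetas([]chunkMeta{{mint: 0, maxt: 999, ref: 8}, {mint: 1000, maxt: 1999, ref: 160}})
	fmt.Printf("%d bytes: %v\n", len(b), b)
}
```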
@ -176,24 +182,24 @@ The sequence of postings sections is finalized by an [offset table](#offset-tabl
An offset table stores a sequence of entries that maps a list of strings to an offset. They are used to track label index and postings sections. They are read into memory when an index file is loaded.
```
┌─────────────────────┬──────────────────────┐
│ len <4b>            │ #entries <4b>        │
├─────────────────────┴──────────────────────┤
│ ┌────────────────────────────────────────┐ │
│ │  n = #strs <uvarint>                   │ │
│ ├──────────────────────┬─────────────────┤ │
│ │ len(str_1) <uvarint> │ str_1 <bytes>   │ │
│ ├──────────────────────┴─────────────────┤ │
│ │  ...                                   │ │
│ ├──────────────────────┬─────────────────┤ │
│ │ len(str_n) <uvarint> │ str_n <bytes>   │ │
│ ├──────────────────────┴─────────────────┤ │
│ │  offset <uvarint64>                    │ │
│ └────────────────────────────────────────┘ │
│                  . . .                     │
├────────────────────────────────────────────┤
│  CRC32 <4b>                                │
└────────────────────────────────────────────┘
```
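These offset-table entries are what the index reader loads into memory when the index is opened. A sketch of decoding a single entry, `n` strings followed by the section offset, using the same uvarint primitives (illustrative only, not the reader's actual code):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// entry is one offset-table row: the key strings and the section's offset.
type entry struct {
	keys   []string
	offset uint64
}

// decodeEntry reads: n <uvarint>, then n (len <uvarint>, bytes) strings,
// then offset <uvarint64>, as in the table layout above.
func decodeEntry(b []byte) (entry, []byte) {
	var e entry
	n, w := binary.Uvarint(b)
	b = b[w:]
	for i := uint64(0); i < n; i++ {
		l, w2 := binary.Uvarint(b)
		b = b[w2:]
		e.keys = append(e.keys, string(b[:l]))
		b = b[l:]
	}
	e.offset, w = binary.Uvarint(b)
	return e, b[w:]
}

func main() {
	// Encode a single entry {"__name__"} -> 1234 by hand.
	var b []byte
	buf := make([]byte, binary.MaxVarintLen64)
	b = append(b, buf[:binary.PutUvarint(buf, 1)]...)
	b = append(b, buf[:binary.PutUvarint(buf, uint64(len("__name__")))]...)
	b = append(b, "__name__"...)
	b = append(b, buf[:binary.PutUvarint(buf, 1234)]...)

	e, _ := decodeEntry(b)
	fmt.Println(e.keys, e.offset) // [__name__] 1234
}
```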


@ -25,7 +25,7 @@ The stones section is 0 padded to a multiple of 4 for fast scans.
# Tombstone

```
┌────────────────────────┬─────────────────┬─────────────────┐
│ ref <uvarint64>        │ mint <varint64> │ maxt <varint64> │
└────────────────────────┴─────────────────┴─────────────────┘
```
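Each tombstone row is just three integers: the series ref as a uvarint64 and the closed interval bounds as varint64s, which allows negative timestamps. A sketch of encoding and decoding one row (illustrative, not the actual tombstone reader):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeStone writes ref <uvarint64>, mint <varint64>, maxt <varint64>,
// matching the tombstone row layout in the table above.
func encodeStone(ref uint64, mint, maxt int64) []byte {
	b := make([]byte, 0, 3*binary.MaxVarintLen64)
	buf := make([]byte, binary.MaxVarintLen64)
	b = append(b, buf[:binary.PutUvarint(buf, ref)]...)
	b = append(b, buf[:binary.PutVarint(buf, mint)]...)
	b = append(b, buf[:binary.PutVarint(buf, maxt)]...)
	return b
}

func decodeStone(b []byte) (ref uint64, mint, maxt int64) {
	ref, n := binary.Uvarint(b)
	b = b[n:]
	mint, n = binary.Varint(b)
	b = b[n:]
	maxt, _ = binary.Varint(b)
	return ref, mint, maxt
}

func main() {
	b := encodeStone(42, -1000, 2000)
	fmt.Println(decodeStone(b)) // 42 -1000 2000
}
```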


@ -15,8 +15,6 @@ package tsdb
 import (
 	"encoding/binary"
-	"hash"
-	"hash/crc32"
 	"unsafe"
 	"github.com/pkg/errors"
@ -32,17 +30,12 @@ type encbuf struct {
 func (e *encbuf) reset()      { e.b = e.b[:0] }
 func (e *encbuf) get() []byte { return e.b }
-func (e *encbuf) len() int    { return len(e.b) }
 func (e *encbuf) putString(s string) { e.b = append(e.b, s...) }
-func (e *encbuf) putBytes(b []byte)  { e.b = append(e.b, b...) }
 func (e *encbuf) putByte(c byte)     { e.b = append(e.b, c) }
-func (e *encbuf) putBE32int(x int)      { e.putBE32(uint32(x)) }
-func (e *encbuf) putBE64int(x int)      { e.putBE64(uint64(x)) }
-func (e *encbuf) putBE64int64(x int64)  { e.putBE64(uint64(x)) }
-func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) }
-func (e *encbuf) putUvarint(x int)      { e.putUvarint64(uint64(x)) }
+func (e *encbuf) putBE64int64(x int64) { e.putBE64(uint64(x)) }
+func (e *encbuf) putUvarint(x int)     { e.putUvarint64(uint64(x)) }
 func (e *encbuf) putBE32(x uint32) {
 	binary.BigEndian.PutUint32(e.c[:], x)
@ -71,16 +64,6 @@ func (e *encbuf) putUvarintStr(s string) {
 	e.putString(s)
 }
-// putHash appends a hash over the buffers current contents to the buffer.
-func (e *encbuf) putHash(h hash.Hash) {
-	h.Reset()
-	_, err := h.Write(e.b)
-	if err != nil {
-		panic(err) // The CRC32 implementation does not error
-	}
-	e.b = h.Sum(e.b)
-}
-
 // decbuf provides safe methods to extract data from a byte slice. It does all
 // necessary bounds checking and advancing of the byte slice.
 // Several datums can be extracted without checking for errors. However, before using
@ -90,15 +73,8 @@ type decbuf struct {
 	e error
 }
 func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
-func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
-func (d *decbuf) be32int() int      { return int(d.be32()) }
 func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
-
-// crc32 returns a CRC32 checksum over the remaining bytes.
-func (d *decbuf) crc32() uint32 {
-	return crc32.Checksum(d.b, castagnoliTable)
-}
 func (d *decbuf) uvarintStr() string {
 	l := d.uvarint64()
@ -179,18 +155,6 @@ func (d *decbuf) byte() byte {
 	return x
 }
-func (d *decbuf) decbuf(l int) decbuf {
-	if d.e != nil {
-		return decbuf{e: d.e}
-	}
-	if l > len(d.b) {
-		return decbuf{e: errInvalidSize}
-	}
-	r := decbuf{b: d.b[:l]}
-	d.b = d.b[l:]
-	return r
-}
-
 func (d *decbuf) err() error  { return d.e }
 func (d *decbuf) len() int    { return len(d.b) }
 func (d *decbuf) get() []byte { return d.b }
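The `decbuf` comment above describes a sticky-error decoder: several datums can be extracted before checking `err()` once, because every accessor becomes a no-op after the first failure. A reduced sketch of that style (the real `decbuf` has many more accessors):

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

var errInvalidSize = errors.New("invalid size")

// decbuf is a sticky-error decoder: after the first failure every accessor
// returns a zero value, so callers chain reads and check err once at the end.
type decbuf struct {
	b []byte
	e error
}

func (d *decbuf) uvarint64() uint64 {
	if d.e != nil {
		return 0
	}
	x, n := binary.Uvarint(d.b)
	if n < 1 {
		d.e = errInvalidSize
		return 0
	}
	d.b = d.b[n:]
	return x
}

func (d *decbuf) err() error { return d.e }

func main() {
	d := &decbuf{b: []byte{0x05, 0x80}} // second uvarint is truncated
	a := d.uvarint64()
	b := d.uvarint64() // flips the sticky error
	fmt.Println(a, b, d.err()) // 5 0 invalid size
}
```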


@ -77,9 +77,8 @@ func copyFile(src, dest string) error {
 // returns relative paths to all files and empty directories.
 func readDirs(src string) ([]string, error) {
 	var files []string
-	var err error
-	err = filepath.Walk(src, func(path string, f os.FileInfo, err error) error {
+	err := filepath.Walk(src, func(path string, f os.FileInfo, err error) error {
 		relativePath := strings.TrimPrefix(path, src)
 		if len(relativePath) > 0 {
 			files = append(files, relativePath)
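For context, the cleaned-up `readDirs` is the usual `filepath.Walk` accumulation idiom. A standalone sketch of the same shape (paralleling, not copying, the fileutil code; the error check on the callback's `err` parameter is an addition here):

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// listRelative returns paths relative to src for everything under it,
// mirroring the readDirs shape shown in the diff.
func listRelative(src string) ([]string, error) {
	var files []string
	err := filepath.Walk(src, func(path string, f os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if rel := strings.TrimPrefix(path, src); len(rel) > 0 {
			files = append(files, rel)
		}
		return nil
	})
	return files, err
}

func main() {
	files, err := listRelative(os.TempDir())
	fmt.Println(len(files), err)
}
```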

head.go

@ -48,6 +48,10 @@ var (
 	// ErrOutOfBounds is returned if an appended sample is out of the
 	// writable time range.
 	ErrOutOfBounds = errors.New("out of bounds")
+
+	// emptyTombstoneReader is a no-op Tombstone Reader.
+	// This is used by head to satisfy the Tombstones() function call.
+	emptyTombstoneReader = newMemTombstones()
 )
 // Head handles reads and writes of time series data within a time window.
@ -71,8 +75,6 @@ type Head struct {
 	values   map[string]stringset // label names to possible values
 	postings *index.MemPostings   // postings lists for terms
-
-	tombstones *memTombstones
 }
 type headMetrics struct {
@ -231,7 +233,6 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
 		values:   map[string]stringset{},
 		symbols:  map[string]struct{}{},
 		postings: index.NewUnorderedMemPostings(),
-		tombstones: newMemTombstones(),
 	}
 	h.metrics = newHeadMetrics(h, r)
@ -334,12 +335,14 @@ func (h *Head) loadWAL(r *wal.Reader) error {
 	}
 	var (
 		dec       RecordDecoder
 		series    []RefSeries
 		samples   []RefSample
 		tstones   []Stone
+		allStones = newMemTombstones()
 		err       error
 	)
+	defer allStones.Close()
 	for r.Next() {
 		series, samples, tstones = series[:0], samples[:0], tstones[:0]
 		rec := r.Record()
@ -413,7 +416,7 @@ func (h *Head) loadWAL(r *wal.Reader) error {
 				if itv.Maxt < h.minValidTime {
 					continue
 				}
-				h.tombstones.addInterval(s.ref, itv)
+				allStones.addInterval(s.ref, itv)
 			}
 		}
 	default:
@ -436,6 +439,12 @@ func (h *Head) loadWAL(r *wal.Reader) error {
 	}
 	wg.Wait()
+	if err := allStones.Iter(func(ref uint64, dranges Intervals) error {
+		return h.chunkRewrite(ref, dranges)
+	}); err != nil {
+		return errors.Wrap(r.Err(), "deleting samples from tombstones")
+	}
+
 	if unknownRefs > 0 {
 		level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs)
 	}
@ -604,7 +613,7 @@ func (h *rangeHead) Chunks() (ChunkReader, error) {
 }
 func (h *rangeHead) Tombstones() (TombstoneReader, error) {
-	return h.head.tombstones, nil
+	return emptyTombstoneReader, nil
 }
 // initAppender is a helper to initialize the time bounds of the head
@ -849,7 +858,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 	}
 	var stones []Stone
-
+	dirty := false
 	for p.Next() {
 		series := h.series.getByID(p.At())
@ -859,22 +868,61 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 		}
 		// Delete only until the current values and not beyond.
 		t0, t1 = clampInterval(mint, maxt, t0, t1)
-		stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}})
+		if h.wal != nil {
+			stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}})
+		}
+		if err := h.chunkRewrite(p.At(), Intervals{{t0, t1}}); err != nil {
+			return errors.Wrap(err, "delete samples")
+		}
+		dirty = true
 	}
 	if p.Err() != nil {
 		return p.Err()
 	}
 	var enc RecordEncoder
 	if h.wal != nil {
+		// Although we don't store the stones in the head,
+		// we need to write them to the WAL to mark these as deleted
+		// after a restart while loading the WAL.
 		if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil {
 			return err
 		}
 	}
-	for _, s := range stones {
-		h.tombstones.addInterval(s.ref, s.intervals[0])
+	if dirty {
+		h.gc()
 	}
+
 	return nil
 }
+
+// chunkRewrite re-writes the chunks that overlap with the deleted ranges
+// and removes the samples in those ranges.
+// The chunk is deleted if no samples are left at the end.
+func (h *Head) chunkRewrite(ref uint64, dranges Intervals) (err error) {
+	if len(dranges) == 0 {
+		return nil
+	}
+
+	ms := h.series.getByID(ref)
+	ms.Lock()
+	defer ms.Unlock()
+	if len(ms.chunks) == 0 {
+		return nil
+	}
+
+	metas := ms.chunksMetas()
+	mint, maxt := metas[0].MinTime, metas[len(metas)-1].MaxTime
+	it := newChunkSeriesIterator(metas, dranges, mint, maxt)
+
+	ms.reset()
+	for it.Next() {
+		t, v := it.At()
+		ok, _ := ms.append(t, v)
+		if !ok {
+			level.Warn(h.logger).Log("msg", "failed to add sample during delete")
+		}
+	}
+
+	return nil
+}
@ -926,7 +974,7 @@ func (h *Head) gc() {
 // Tombstones returns a new reader over the head's tombstones
 func (h *Head) Tombstones() (TombstoneReader, error) {
-	return h.tombstones, nil
+	return emptyTombstoneReader, nil
 }
 // Index returns an IndexReader against the block.
@ -1406,6 +1454,16 @@ type memSeries struct {
 	app chunkenc.Appender // Current appender for the chunk.
 }
+func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries {
+	s := &memSeries{
+		lset:       lset,
+		ref:        id,
+		chunkRange: chunkRange,
+		nextAt:     math.MinInt64,
+	}
+	return s
+}
+
 func (s *memSeries) minTime() int64 {
 	if len(s.chunks) == 0 {
 		return math.MinInt64
@ -1442,14 +1500,24 @@ func (s *memSeries) cut(mint int64) *memChunk {
 	return c
 }
-func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries {
-	s := &memSeries{
-		lset:       lset,
-		ref:        id,
-		chunkRange: chunkRange,
-		nextAt:     math.MinInt64,
+func (s *memSeries) chunksMetas() []chunks.Meta {
+	metas := make([]chunks.Meta, 0, len(s.chunks))
+	for _, chk := range s.chunks {
+		metas = append(metas, chunks.Meta{Chunk: chk.chunk, MinTime: chk.minTime, MaxTime: chk.maxTime})
 	}
-	return s
+	return metas
+}
+
+// reset re-initialises all the variables in the memSeries except 'lset', 'ref',
+// and 'chunkRange', leaving it as it would be right after 'newMemSeries(...)'.
+func (s *memSeries) reset() {
+	s.chunks = nil
+	s.headChunk = nil
+	s.firstChunkID = 0
+	s.nextAt = math.MinInt64
+	s.sampleBuf = [4]sample{}
+	s.pendingCommit = false
+	s.app = nil
 }
 // appendable checks whether the given sample is valid for appending to the series.
@ -1628,11 +1696,6 @@ func (ss stringset) set(s string) {
 	ss[s] = struct{}{}
 }
-func (ss stringset) has(s string) bool {
-	_, ok := ss[s]
-	return ok
-}
-
 func (ss stringset) String() string {
 	return strings.Join(ss.slice(), ",")
 }
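Net effect of the head changes: `Delete` no longer keeps in-memory tombstones; it rewrites the overlapping chunks immediately via `chunkRewrite` and only logs `Stone`s to the WAL, which `loadWAL` collects into `allStones` and replays through the same rewrite path after a restart. A compressed sketch of that order of operations, with toy types in place of the real ones:

```go
package main

import "fmt"

type interval struct{ mint, maxt int64 }

// series stands in for memSeries: a deletion is applied by rewriting samples.
type series struct{ samples map[int64]float64 }

func (s *series) rewrite(dranges []interval) {
	for t := range s.samples {
		for _, r := range dranges {
			if t >= r.mint && t <= r.maxt {
				delete(s.samples, t) // sample dropped immediately, no tombstone kept
			}
		}
	}
}

// deleteAndLog mirrors Head.Delete: log the stone so a restart can replay the
// deletion from the WAL, then rewrite the in-memory chunks right away.
func deleteAndLog(wal *[]interval, s *series, r interval) {
	*wal = append(*wal, r)
	s.rewrite([]interval{r})
}

func main() {
	s := &series{samples: map[int64]float64{1: 1, 5: 5, 9: 9}}
	var wal []interval
	deleteAndLog(&wal, s, interval{mint: 2, maxt: 6})
	fmt.Println(len(s.samples), len(wal)) // 2 1
}
```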


@ -18,6 +18,7 @@ import (
"math" "math"
"math/rand" "math/rand"
"os" "os"
"path"
"path/filepath" "path/filepath"
"sort" "sort"
"testing" "testing"
@ -296,94 +297,158 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
} }
func TestHeadDeleteSimple(t *testing.T) { func TestHeadDeleteSimple(t *testing.T) {
numSamples := int64(10) buildSmpls := func(s []int64) []sample {
ss := make([]sample, 0, len(s))
head, err := NewHead(nil, nil, nil, 1000) for _, t := range s {
testutil.Ok(t, err) ss = append(ss, sample{t: t, v: float64(t)})
defer head.Close() }
return ss
app := head.Appender()
smpls := make([]float64, numSamples)
for i := int64(0); i < numSamples; i++ {
smpls[i] = rand.Float64()
app.Add(labels.Labels{{"a", "b"}}, i, smpls[i])
} }
smplsAll := buildSmpls([]int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
lblDefault := labels.Label{"a", "b"}
testutil.Ok(t, app.Commit())
cases := []struct { cases := []struct {
intervals Intervals dranges Intervals
remaint []int64 smplsExp []sample
}{ }{
{ {
intervals: Intervals{{0, 3}}, dranges: Intervals{{0, 3}},
remaint: []int64{4, 5, 6, 7, 8, 9}, smplsExp: buildSmpls([]int64{4, 5, 6, 7, 8, 9}),
}, },
{ {
intervals: Intervals{{1, 3}}, dranges: Intervals{{1, 3}},
remaint: []int64{0, 4, 5, 6, 7, 8, 9}, smplsExp: buildSmpls([]int64{0, 4, 5, 6, 7, 8, 9}),
}, },
{ {
intervals: Intervals{{1, 3}, {4, 7}}, dranges: Intervals{{1, 3}, {4, 7}},
remaint: []int64{0, 8, 9}, smplsExp: buildSmpls([]int64{0, 8, 9}),
}, },
{ {
intervals: Intervals{{1, 3}, {4, 700}}, dranges: Intervals{{1, 3}, {4, 700}},
remaint: []int64{0}, smplsExp: buildSmpls([]int64{0}),
}, },
{ { // This case is to ensure that labels and symbols are deleted.
intervals: Intervals{{0, 9}}, dranges: Intervals{{0, 9}},
remaint: []int64{}, smplsExp: buildSmpls([]int64{}),
}, },
} }
Outer: Outer:
for _, c := range cases { for _, c := range cases {
// Reset the tombstones. dir, err := ioutil.TempDir("", "test_wal_reload")
head.tombstones = newMemTombstones() testutil.Ok(t, err)
defer os.RemoveAll(dir)
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
testutil.Ok(t, err)
head, err := NewHead(nil, nil, w, 1000)
testutil.Ok(t, err)
app := head.Appender()
for _, smpl := range smplsAll {
_, err = app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v)
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())
// Delete the ranges. // Delete the ranges.
for _, r := range c.intervals { for _, r := range c.dranges {
testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b"))) testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value)))
} }
// Compare the result. // Compare the samples for both heads - before and after the reload.
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) reloadedW, err := wal.New(nil, nil, w.Dir()) // Use a new wal to ensure deleted samples are gone even after a reload.
testutil.Ok(t, err) testutil.Ok(t, err)
res, err := q.Select(labels.NewEqualMatcher("a", "b")) reloadedHead, err := NewHead(nil, nil, reloadedW, 1000)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, reloadedHead.Init(0))
for _, h := range []*Head{head, reloadedHead} {
indexr, err := h.Index()
testutil.Ok(t, err)
// Use an emptyTombstoneReader explicitly to get all the samples.
css, err := LookupChunkSeries(indexr, emptyTombstoneReader, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value))
testutil.Ok(t, err)
expSamples := make([]tsdbutil.Sample, 0, len(c.remaint)) // Getting the actual samples.
for _, ts := range c.remaint { actSamples := make([]sample, 0)
expSamples = append(expSamples, sample{ts, smpls[ts]}) for css.Next() {
} lblsAct, chkMetas, intv := css.At()
testutil.Equals(t, labels.Labels{lblDefault}, lblsAct)
testutil.Equals(t, 0, len(intv))
expss := newMockSeriesSet([]Series{ chunkr, err := h.Chunks()
newSeries(map[string]string{"a": "b"}, expSamples), testutil.Ok(t, err)
}) for _, meta := range chkMetas {
chk, err := chunkr.Chunk(meta.Ref)
if len(expSamples) == 0 { testutil.Ok(t, err)
testutil.Assert(t, res.Next() == false, "") ii := chk.Iterator()
continue for ii.Next() {
} t, v := ii.At()
actSamples = append(actSamples, sample{t: t, v: v})
for { }
eok, rok := expss.Next(), res.Next() }
testutil.Equals(t, eok, rok)
if !eok {
continue Outer
} }
sexp := expss.At()
sres := res.At()
testutil.Equals(t, sexp.Labels(), sres.Labels()) testutil.Ok(t, css.Err())
testutil.Equals(t, c.smplsExp, actSamples)
}
smplExp, errExp := expandSeriesIterator(sexp.Iterator()) // Compare the query results for both heads - before and after the reload.
smplRes, errRes := expandSeriesIterator(sres.Iterator()) expSeriesSet := newMockSeriesSet([]Series{
newSeries(map[string]string{lblDefault.Name: lblDefault.Value}, func() []tsdbutil.Sample {
ss := make([]tsdbutil.Sample, 0, len(c.smplsExp))
for _, s := range c.smplsExp {
ss = append(ss, s)
}
return ss
}(),
),
})
for _, h := range []*Head{head, reloadedHead} {
q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime())
testutil.Ok(t, err)
actSeriesSet, err := q.Select(labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value))
testutil.Ok(t, err)
testutil.Equals(t, errExp, errRes) lns, err := q.LabelNames()
testutil.Equals(t, smplExp, smplRes) testutil.Ok(t, err)
lvs, err := q.LabelValues(lblDefault.Name)
testutil.Ok(t, err)
// When all samples are deleted we expect that no labels should exist either.
if len(c.smplsExp) == 0 {
testutil.Equals(t, 0, len(lns))
testutil.Equals(t, 0, len(lvs))
testutil.Assert(t, actSeriesSet.Next() == false, "")
testutil.Ok(t, h.Close())
continue
} else {
testutil.Equals(t, 1, len(lns))
testutil.Equals(t, 1, len(lvs))
testutil.Equals(t, lblDefault.Name, lns[0])
testutil.Equals(t, lblDefault.Value, lvs[0])
}
for {
eok, rok := expSeriesSet.Next(), actSeriesSet.Next()
testutil.Equals(t, eok, rok)
if !eok {
testutil.Ok(t, h.Close())
continue Outer
}
expSeries := expSeriesSet.At()
actSeries := actSeriesSet.At()
testutil.Equals(t, expSeries.Labels(), actSeries.Labels())
smplExp, errExp := expandSeriesIterator(expSeries.Iterator())
smplRes, errRes := expandSeriesIterator(actSeries.Iterator())
testutil.Equals(t, errExp, errRes)
testutil.Equals(t, smplExp, smplRes)
}
} }
} }
} }
@ -524,8 +589,6 @@ func TestDelete_e2e(t *testing.T) {
// TODO: Add Regexp Matchers. // TODO: Add Regexp Matchers.
} }
for _, del := range dels { for _, del := range dels {
// Reset the deletes everytime.
hb.tombstones = newMemTombstones()
for _, r := range del.drange { for _, r := range del.drange {
testutil.Ok(t, hb.Delete(r.Mint, r.Maxt, del.ms...)) testutil.Ok(t, hb.Delete(r.Mint, r.Maxt, del.ms...))
} }
@ -585,7 +648,6 @@ func TestDelete_e2e(t *testing.T) {
} }
} }
} }
return
} }
func boundedSamples(full []sample, mint, maxt int64) []sample { func boundedSamples(full []sample, mint, maxt int64) []sample {
@ -946,4 +1008,5 @@ func TestWalRepair(t *testing.T) {
testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records") testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records")
}) })
} }
} }
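For reference, the smplsExp cases above rely on tombstone intervals being inclusive on both ends: deleting Intervals{{1, 3}, {4, 7}} from timestamps 0..9 leaves 0, 8 and 9. A minimal standalone sketch of that semantic (interval and deleteRanges are illustrative names, not part of this change):

package main

import "fmt"

// interval mirrors the inclusive [Mint, Maxt] bounds of tsdb's Intervals.
type interval struct{ mint, maxt int64 }

// deleteRanges drops every timestamp covered by any of the given ranges.
func deleteRanges(ts []int64, dranges []interval) []int64 {
	var out []int64
Outer:
	for _, t := range ts {
		for _, r := range dranges {
			if t >= r.mint && t <= r.maxt {
				continue Outer
			}
		}
		out = append(out, t)
	}
	return out
}

func main() {
	ts := []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
	fmt.Println(deleteRanges(ts, []interval{{1, 3}, {4, 7}})) // [0 8 9]
}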

View file

@@ -33,12 +33,9 @@ func (e *encbuf) get() []byte { return e.b }
 func (e *encbuf) len() int    { return len(e.b) }

 func (e *encbuf) putString(s string) { e.b = append(e.b, s...) }
-func (e *encbuf) putBytes(b []byte)  { e.b = append(e.b, b...) }
 func (e *encbuf) putByte(c byte)     { e.b = append(e.b, c) }

 func (e *encbuf) putBE32int(x int)      { e.putBE32(uint32(x)) }
-func (e *encbuf) putBE64int(x int)      { e.putBE64(uint64(x)) }
-func (e *encbuf) putBE64int64(x int64)  { e.putBE64(uint64(x)) }
 func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) }
 func (e *encbuf) putUvarint(x int)      { e.putUvarint64(uint64(x)) }
@@ -142,10 +139,8 @@ func newDecbufUvarintAt(bs ByteSlice, off int) decbuf {
 	return dec
 }

 func (d *decbuf) uvarint() int      { return int(d.uvarint64()) }
-func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
 func (d *decbuf) be32int() int      { return int(d.be32()) }
-func (d *decbuf) be64int64() int64  { return int64(d.be64()) }

 // crc32 returns a CRC32 checksum over the remaining bytes.
 func (d *decbuf) crc32() uint32 {
@@ -218,31 +213,6 @@ func (d *decbuf) be32() uint32 {
 	return x
 }

-func (d *decbuf) byte() byte {
-	if d.e != nil {
-		return 0
-	}
-	if len(d.b) < 1 {
-		d.e = errInvalidSize
-		return 0
-	}
-	x := d.b[0]
-	d.b = d.b[1:]
-	return x
-}
-
-func (d *decbuf) decbuf(l int) decbuf {
-	if d.e != nil {
-		return decbuf{e: d.e}
-	}
-	if l > len(d.b) {
-		return decbuf{e: errInvalidSize}
-	}
-	r := decbuf{b: d.b[:l]}
-	d.b = d.b[l:]
-	return r
-}
-
 func (d *decbuf) err() error  { return d.e }
 func (d *decbuf) len() int    { return len(d.b) }
 func (d *decbuf) get() []byte { return d.b }
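The helpers removed above (putBytes, putBE64int, putBE64int64, uvarint32, be64int64, byte, decbuf) were simply unused. The surviving uvarint path ultimately defers to encoding/binary's varint routines; a standalone round trip for reference (standard library only, not tsdb code):

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Encode 300 as an unsigned varint, then decode it back.
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, 300) // n == 2: varints carry 7 value bits per byte
	v, read := binary.Uvarint(buf[:n])
	fmt.Println(v, n, read) // 300 2 2
}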

View file

@@ -45,6 +45,8 @@ const (
 	FormatV2 = 2

 	labelNameSeperator = "\xff"
+
+	indexFilename = "index"
 )

 type indexWriterSeries struct {
@@ -752,7 +754,7 @@ func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]strin
 		symbolSlice []string
 		symbols     = map[uint32]string{}
 	)
-	if version == 2 {
+	if version == FormatV2 {
 		symbolSlice = make([]string, 0, cnt)
 	}

View file

@@ -151,7 +151,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)

-	fn := filepath.Join(dir, "index")
+	fn := filepath.Join(dir, indexFilename)

 	// An empty index must still result in a readable file.
 	iw, err := NewWriter(fn)
@@ -177,7 +177,7 @@ func TestIndexRW_Postings(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)

-	fn := filepath.Join(dir, "index")
+	fn := filepath.Join(dir, indexFilename)

 	iw, err := NewWriter(fn)
 	testutil.Ok(t, err)
@@ -271,7 +271,7 @@ func TestPersistence_index_e2e(t *testing.T) {
 		})
 	}

-	iw, err := NewWriter(filepath.Join(dir, "index"))
+	iw, err := NewWriter(filepath.Join(dir, indexFilename))
 	testutil.Ok(t, err)

 	testutil.Ok(t, iw.AddSymbols(symbols))
@@ -331,7 +331,7 @@ func TestPersistence_index_e2e(t *testing.T) {
 	err = iw.Close()
 	testutil.Ok(t, err)

-	ir, err := NewFileReader(filepath.Join(dir, "index"))
+	ir, err := NewFileReader(filepath.Join(dir, indexFilename))
 	testutil.Ok(t, err)

 	for p := range mi.postings {

View file

@@ -305,9 +305,8 @@ func Intersect(its ...Postings) Postings {
 }

 type intersectPostings struct {
 	a, b Postings
-	aok, bok bool
-	cur      uint64
+	cur  uint64
 }

 func newIntersectPostings(a, b Postings) *intersectPostings {
View file

@@ -61,18 +61,6 @@ func TestMemPostings_ensureOrder(t *testing.T) {
 	}
 }

-type mockPostings struct {
-	next  func() bool
-	seek  func(uint64) bool
-	value func() uint64
-	err   func() error
-}
-
-func (m *mockPostings) Next() bool         { return m.next() }
-func (m *mockPostings) Seek(v uint64) bool { return m.seek(v) }
-func (m *mockPostings) Value() uint64      { return m.value() }
-func (m *mockPostings) Err() error         { return m.err() }
-
 func TestIntersect(t *testing.T) {
 	var cases = []struct {
 		a, b []uint64
@@ -300,8 +288,6 @@ func TestMergedPostingsSeek(t *testing.T) {
 			testutil.Equals(t, c.res, lst)
 		}
 	}
-
-	return
 }

 func TestRemovedPostings(t *testing.T) {
@@ -463,8 +449,6 @@ func TestRemovedPostingsSeek(t *testing.T) {
 			testutil.Equals(t, c.res, lst)
 		}
 	}
-
-	return
 }

 func TestBigEndian(t *testing.T) {

View file

@@ -56,18 +56,6 @@ func newMockSeriesSet(list []Series) *mockSeriesSet {
 	}
 }

-type mockSeriesIterator struct {
-	seek func(int64) bool
-	at   func() (int64, float64)
-	next func() bool
-	err  func() error
-}
-
-func (m *mockSeriesIterator) Seek(t int64) bool    { return m.seek(t) }
-func (m *mockSeriesIterator) At() (int64, float64) { return m.at() }
-func (m *mockSeriesIterator) Next() bool           { return m.next() }
-func (m *mockSeriesIterator) Err() error           { return m.err() }
-
 func TestMergedSeriesSet(t *testing.T) {
 	cases := []struct {
@@ -399,8 +387,6 @@ Outer:
 			testutil.Equals(t, smplExp, smplRes)
 		}
 	}
-
-	return
 }

 func TestBlockQuerierDelete(t *testing.T) {
@@ -563,8 +549,6 @@ Outer:
 			testutil.Equals(t, smplExp, smplRes)
 		}
 	}
-
-	return
 }

 func TestBaseChunkSeries(t *testing.T) {
@@ -661,8 +645,6 @@ func TestBaseChunkSeries(t *testing.T) {
 		testutil.Equals(t, len(tc.expIdxs), i)
 		testutil.Ok(t, bcs.Err())
 	}
-
-	return
 }

 // TODO: Remove after simpleSeries is merged
@@ -986,8 +968,6 @@ func TestSeriesIterator(t *testing.T) {
 			}
 		})
 	})
-
-	return
 }

 // Regression for: https://github.com/prometheus/tsdb/pull/97
@@ -1088,7 +1068,6 @@ func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) {
 		maxt: 15,
 	}
 	testutil.Assert(t, p.Next() == false, "")
-	return
 }

 type mockChunkSeriesSet struct {

View file

@@ -64,7 +64,7 @@ func repairBadIndexVersion(logger log.Logger, dir string) error {
 		if err != nil {
 			return wrapErr(err, d)
 		}
-		broken, err := os.Open(filepath.Join(d, "index"))
+		broken, err := os.Open(filepath.Join(d, indexFilename))
 		if err != nil {
 			return wrapErr(err, d)
 		}

View file

@@ -30,7 +30,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
 	// at a broken revision.
 	//
 	// func main() {
-	// 	w, err := index.NewWriter("index")
+	// 	w, err := index.NewWriter(indexFilename)
 	// 	if err != nil {
 	// 		panic(err)
 	// 	}
@@ -72,7 +72,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
 	os.MkdirAll(filepath.Join(dbDir, "chunks"), 0777)
 	defer os.RemoveAll(filepath.Join(dbDir, "chunks"))

-	r, err := index.NewFileReader(filepath.Join(dbDir, "index"))
+	r, err := index.NewFileReader(filepath.Join(dbDir, indexFilename))
 	testutil.Ok(t, err)
 	p, err := r.Postings("b", "1")
 	testutil.Ok(t, err)
@@ -95,7 +95,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
 	testutil.Ok(t, err)
 	db.Close()

-	r, err = index.NewFileReader(filepath.Join(tmpDbDir, "index"))
+	r, err = index.NewFileReader(filepath.Join(tmpDbDir, indexFilename))
 	testutil.Ok(t, err)
 	p, err = r.Postings("b", "1")
 	testutil.Ok(t, err)

View file

@@ -1,2 +0,0 @@
-# Enable only "legacy" staticcheck verifications.
-checks = [ "SA*" ]

View file

@@ -66,6 +66,15 @@ func Equals(tb testing.TB, exp, act interface{}, msgAndArgs ...interface{}) {
 	}
 }

+// NotEquals fails the test if exp is equal to act.
+func NotEquals(tb testing.TB, exp, act interface{}) {
+	if reflect.DeepEqual(exp, act) {
+		_, file, line, _ := runtime.Caller(1)
+		fmt.Printf("\033[31m%s:%d: Expected different exp and got\n\n\texp: %#v\n\n\tgot: %#v\033[39m\n\n", filepath.Base(file), line, exp, act)
+		tb.FailNow()
+	}
+}
+
 func formatMessage(msgAndArgs []interface{}) string {
 	if len(msgAndArgs) == 0 {
 		return ""

View file

@@ -120,7 +120,6 @@ func TestAddingNewIntervals(t *testing.T) {
 		testutil.Equals(t, c.exp, c.exist.add(c.new))
 	}
-	return
 }

 // TestMemTombstonesConcurrency to make sure they are safe to access from different goroutines.

View file

@@ -226,7 +226,6 @@ func TestReader_Live(t *testing.T) {

 func TestWAL_FuzzWriteRead_Live(t *testing.T) {
 	const count = 5000
-	const segmentSize = int64(128 * 1024 * 1204)
 	var input [][]byte
 	lock := sync.RWMutex{}
 	var recs [][]byte