Merge remote-tracking branch 'upstream/master' into delete-compact-block-on-reload-error

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
Krasi Georgiev 2019-02-08 13:26:28 +02:00
commit e138c7ed7e
22 changed files with 372 additions and 326 deletions


@ -29,12 +29,15 @@ GO ?= go
GOFMT ?= $(GO)fmt
FIRST_GOPATH := $(firstword $(subst :, ,$(shell $(GO) env GOPATH)))
GOOPTS ?=
GOHOSTOS ?= $(shell $(GO) env GOHOSTOS)
GOHOSTARCH ?= $(shell $(GO) env GOHOSTARCH)
GO_VERSION ?= $(shell $(GO) version)
GO_VERSION_NUMBER ?= $(word 3, $(GO_VERSION))
PRE_GO_111 ?= $(shell echo $(GO_VERSION_NUMBER) | grep -E 'go1\.(10|[0-9])\.')
unexport GOVENDOR
GOVENDOR :=
GO111MODULE :=
ifeq (, $(PRE_GO_111))
ifneq (,$(wildcard go.mod))
# Enforce Go modules support just in case the directory is inside GOPATH (and for Travis CI).
@ -55,24 +58,35 @@ $(warning Some recipes may not work as expected as the current Go runtime is '$(
# This repository isn't using Go modules (yet).
GOVENDOR := $(FIRST_GOPATH)/bin/govendor
endif
unexport GO111MODULE
endif
PROMU := $(FIRST_GOPATH)/bin/promu
STATICCHECK := $(FIRST_GOPATH)/bin/staticcheck
pkgs = ./...
GO_VERSION ?= $(shell $(GO) version)
GO_BUILD_PLATFORM ?= $(subst /,-,$(lastword $(GO_VERSION)))
ifeq (arm, $(GOHOSTARCH))
GOHOSTARM ?= $(shell GOARM= $(GO) env GOARM)
GO_BUILD_PLATFORM ?= $(GOHOSTOS)-$(GOHOSTARCH)v$(GOHOSTARM)
else
GO_BUILD_PLATFORM ?= $(GOHOSTOS)-$(GOHOSTARCH)
endif
PROMU_VERSION ?= 0.2.0
PROMU_URL := https://github.com/prometheus/promu/releases/download/v$(PROMU_VERSION)/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM).tar.gz
STATICCHECK_VERSION ?= 2019.1
STATICCHECK_URL := https://github.com/dominikh/go-tools/releases/download/$(STATICCHECK_VERSION)/staticcheck_$(GOHOSTOS)_$(GOHOSTARCH)
PREFIX ?= $(shell pwd)
BIN_DIR ?= $(shell pwd)
DOCKER_IMAGE_TAG ?= $(subst /,-,$(shell git rev-parse --abbrev-ref HEAD))
DOCKER_REPO ?= prom
ifeq ($(GOHOSTARCH),amd64)
ifeq ($(GOHOSTOS),$(filter $(GOHOSTOS),linux freebsd darwin windows))
# Only supported on amd64
test-flags := -race
endif
endif
.PHONY: all
all: precheck style staticcheck unused build test
@ -110,12 +124,12 @@ common-test-short:
.PHONY: common-test
common-test:
@echo ">> running all tests"
GO111MODULE=$(GO111MODULE) $(GO) test -race $(GOOPTS) $(pkgs)
GO111MODULE=$(GO111MODULE) $(GO) test $(test-flags) $(GOOPTS) $(pkgs)
.PHONY: common-format
common-format:
@echo ">> formatting code"
GO111MODULE=$(GO111MODULE) $(GO) fmt $(GOOPTS) $(pkgs)
GO111MODULE=$(GO111MODULE) $(GO) fmt $(pkgs)
.PHONY: common-vet
common-vet:
@ -125,8 +139,12 @@ common-vet:
.PHONY: common-staticcheck
common-staticcheck: $(STATICCHECK)
@echo ">> running staticcheck"
chmod +x $(STATICCHECK)
ifdef GO111MODULE
GO111MODULE=$(GO111MODULE) $(STATICCHECK) -ignore "$(STATICCHECK_IGNORE)" -checks "SA*" $(pkgs)
# 'go list' needs to be executed before staticcheck to prepopulate the modules cache.
# Otherwise staticcheck might fail randomly for some reason not yet explained.
GO111MODULE=$(GO111MODULE) $(GO) list -e -compiled -test=true -export=false -deps=true -find=false -tags= -- ./... > /dev/null
GO111MODULE=$(GO111MODULE) $(STATICCHECK) -ignore "$(STATICCHECK_IGNORE)" $(pkgs)
else
$(STATICCHECK) -ignore "$(STATICCHECK_IGNORE)" $(pkgs)
endif
@ -140,8 +158,9 @@ else
ifdef GO111MODULE
@echo ">> running check for unused/missing packages in go.mod"
GO111MODULE=$(GO111MODULE) $(GO) mod tidy
ifeq (,$(wildcard vendor))
@git diff --exit-code -- go.sum go.mod
ifneq (,$(wildcard vendor))
else
@echo ">> running check for unused packages in vendor/"
GO111MODULE=$(GO111MODULE) $(GO) mod vendor
@git diff --exit-code -- go.sum go.mod vendor/
@ -175,30 +194,20 @@ common-docker-tag-latest:
promu: $(PROMU)
$(PROMU):
curl -s -L $(PROMU_URL) | tar -xvz -C /tmp
mkdir -v -p $(FIRST_GOPATH)/bin
cp -v /tmp/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM)/promu $(PROMU)
$(eval PROMU_TMP := $(shell mktemp -d))
curl -s -L $(PROMU_URL) | tar -xvzf - -C $(PROMU_TMP)
mkdir -p $(FIRST_GOPATH)/bin
cp $(PROMU_TMP)/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM)/promu $(FIRST_GOPATH)/bin/promu
rm -r $(PROMU_TMP)
.PHONY: proto
proto:
@echo ">> generating code from proto files"
@./scripts/genproto.sh
.PHONY: $(STATICCHECK)
$(STATICCHECK):
ifdef GO111MODULE
# Get staticcheck from a temporary directory to avoid modifying the local go.{mod,sum}.
# See https://github.com/golang/go/issues/27643.
# For now, we are using the next branch of staticcheck because master isn't compatible yet with Go modules.
tmpModule=$$(mktemp -d 2>&1) && \
mkdir -p $${tmpModule}/staticcheck && \
cd "$${tmpModule}"/staticcheck && \
GO111MODULE=on $(GO) mod init example.com/staticcheck && \
GO111MODULE=on GOOS= GOARCH= $(GO) get -u honnef.co/go/tools/cmd/staticcheck@next && \
rm -rf $${tmpModule};
else
GOOS= GOARCH= GO111MODULE=off $(GO) get -u honnef.co/go/tools/cmd/staticcheck
endif
mkdir -p $(FIRST_GOPATH)/bin
curl -s -L $(STATICCHECK_URL) > $(STATICCHECK)
ifdef GOVENDOR
.PHONY: $(GOVENDOR)


@ -53,16 +53,6 @@ func newBReader(b []byte) bstream {
return bstream{stream: b, count: 8}
}
func newBWriter(size int) *bstream {
return &bstream{stream: make([]byte, 0, size), count: 0}
}
func (b *bstream) clone() *bstream {
d := make([]byte, len(b.stream))
copy(d, b.stream)
return &bstream{stream: d, count: b.count}
}
func (b *bstream) bytes() []byte {
return b.stream
}


@ -32,7 +32,7 @@ func TestChunk(t *testing.T) {
for enc, nc := range map[Encoding]func() Chunk{
EncXOR: func() Chunk { return NewXORChunk() },
} {
t.Run(fmt.Sprintf("%s", enc), func(t *testing.T) {
t.Run(fmt.Sprintf("%v", enc), func(t *testing.T) {
for range make([]struct{}, 1) {
c := nc()
if err := testChunk(c); err != nil {


@ -65,8 +65,6 @@ func (cm *Meta) OverlapsClosedInterval(mint, maxt int64) bool {
var (
errInvalidSize = fmt.Errorf("invalid size")
errInvalidFlag = fmt.Errorf("invalid flag")
errInvalidChecksum = fmt.Errorf("invalid checksum")
)
var castagnoliTable *crc32.Table


@ -87,7 +87,7 @@ func main() {
block = blocks[len(blocks)-1]
}
if block == nil {
exitWithError(fmt.Errorf("Block not found"))
exitWithError(fmt.Errorf("block not found"))
}
analyzeBlock(block, *analyzeLimit)
}
@ -340,12 +340,6 @@ func measureTime(stage string, f func()) time.Duration {
return time.Since(start)
}
func mapToLabels(m map[string]interface{}, l *labels.Labels) {
for k, v := range m {
*l = append(*l, labels.Label{Name: k, Value: v.(string)})
}
}
func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
scanner := bufio.NewScanner(r)


@ -71,7 +71,6 @@ type Compactor interface {
// LeveledCompactor implements the Compactor interface.
type LeveledCompactor struct {
dir string
metrics *compactorMetrics
logger log.Logger
ranges []int64


@ -223,6 +223,34 @@ func TestDBAppenderAddRef(t *testing.T) {
func TestDeleteSimple(t *testing.T) {
numSamples := int64(10)
cases := []struct {
intervals Intervals
remaint []int64
}{
{
intervals: Intervals{{0, 3}},
remaint: []int64{4, 5, 6, 7, 8, 9},
},
{
intervals: Intervals{{1, 3}},
remaint: []int64{0, 4, 5, 6, 7, 8, 9},
},
{
intervals: Intervals{{1, 3}, {4, 7}},
remaint: []int64{0, 8, 9},
},
{
intervals: Intervals{{1, 3}, {4, 700}},
remaint: []int64{0},
},
{ // This case is to ensure that labels and symbols are deleted.
intervals: Intervals{{0, 9}},
remaint: []int64{},
},
}
Outer:
for _, c := range cases {
db, delete := openTestDB(t, nil)
defer func() {
testutil.Ok(t, db.Close())
@ -238,18 +266,7 @@ func TestDeleteSimple(t *testing.T) {
}
testutil.Ok(t, app.Commit())
cases := []struct {
intervals Intervals
remaint []int64
}{
{
intervals: Intervals{{1, 3}, {4, 7}},
remaint: []int64{0, 8, 9},
},
}
Outer:
for _, c := range cases {
// TODO(gouthamve): Reset the tombstones somehow.
// Delete the ranges.
for _, r := range c.intervals {
@ -272,9 +289,20 @@ Outer:
newSeries(map[string]string{"a": "b"}, expSamples),
})
lns, err := q.LabelNames()
testutil.Ok(t, err)
lvs, err := q.LabelValues("a")
testutil.Ok(t, err)
if len(expSamples) == 0 {
testutil.Equals(t, 0, len(lns))
testutil.Equals(t, 0, len(lvs))
testutil.Assert(t, res.Next() == false, "")
continue
} else {
testutil.Equals(t, 1, len(lns))
testutil.Equals(t, 1, len(lvs))
testutil.Equals(t, "a", lns[0])
testutil.Equals(t, "b", lvs[0])
}
for {
@ -537,8 +565,6 @@ func TestDB_e2e(t *testing.T) {
numDatapoints = 1000
numRanges = 1000
timeInterval = int64(3)
maxTime = int64(2 * 1000)
minTime = int64(200)
)
// Create 8 series with 1000 data-points of different ranges and run queries.
lbls := [][]labels.Label{
@ -691,8 +717,6 @@ func TestDB_e2e(t *testing.T) {
q.Close()
}
}
return
}
func TestWALFlushedOnDBClose(t *testing.T) {


@ -85,34 +85,40 @@ After the labels, the number of indexed chunks is encoded, followed by a sequenc
The `mint` of the first chunk is stored, its `maxt` is stored as a delta, and the `mint` and `maxt` are encoded as deltas to the previous time for subsequent chunks. Similarly, the reference of the first chunk is stored and the next ref is stored as a delta to the previous one.
```
┌─────────────────────────────────────────────────────────────────────────┐
┌─────────────────────────────────────────────────────────────────────────
│ len <uvarint>
├─────────────────────────────────────────────────────────────────────────┤
│ ┌──────────────────┬──────────────────────────────────────────────────┐ │
│ │ │ ┌──────────────────────────────────────────┐ │ │
│ │ │ │ ref(l_i.name) <uvarint> │ │ │
│ │ #labels │ ├──────────────────────────────────────────┤ ... │ │
│ │ <uvarint> │ │ ref(l_i.value) <uvarint> │ │ │
│ │ │ └──────────────────────────────────────────┘ │ │
│ ├──────────────────┼──────────────────────────────────────────────────┤ │
│ │ │ ┌──────────────────────────────────────────┐ │ │
│ │ │ │ c_0.mint <varint> │ │ │
│ │ │ ├──────────────────────────────────────────┤ │ │
│ │ │ │ c_0.maxt - c_0.mint <uvarint> │ │ │
│ │ │ ├──────────────────────────────────────────┤ │ │
│ │ │ │ ref(c_0.data) <uvarint> │ │ │
│ │ #chunks │ └──────────────────────────────────────────┘ │ │
│ │ <uvarint> │ ┌──────────────────────────────────────────┐ │ │
│ │ │ │ c_i.mint - c_i-1.maxt <uvarint> │ │ │
│ │ │ ├──────────────────────────────────────────┤ │ │
│ │ │ │ c_i.maxt - c_i.mint <uvarint> │ │ │
│ │ │ ├──────────────────────────────────────────┤ ... │ │
│ │ │ │ ref(c_i.data) - ref(c_i-1.data) <varint> │ │ │
│ │ │ └──────────────────────────────────────────┘ │ │
│ └──────────────────┴──────────────────────────────────────────────────┘ │
├─────────────────────────────────────────────────────────────────────────┤
├──────────────────────────────────────────────────────────────────────────┤
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ labels count <uvarint64> │ │
│ ├──────────────────────────────────────────────────────────────────────┤ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ ref(l_i.name) <uvarint32> │ │ │
│ │ ├────────────────────────────────────────────┤ │ │
│ │ │ ref(l_i.value) <uvarint32> │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ │ ... │ │
│ ├──────────────────────────────────────────────────────────────────────┤ │
│ │ chunks count <uvarint64> │ │
│ ├──────────────────────────────────────────────────────────────────────┤ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ c_0.mint <varint64> │ │ │
│ │ ├────────────────────────────────────────────┤ │ │
│ │ │ c_0.maxt - c_0.mint <uvarint64> │ │ │
│ │ ├────────────────────────────────────────────┤ │ │
│ │ │ ref(c_0.data) <uvarint64> │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ c_i.mint - c_i-1.maxt <uvarint64> │ │ │
│ │ ├────────────────────────────────────────────┤ │ │
│ │ │ c_i.maxt - c_i.mint <uvarint64> │ │ │
│ │ ├────────────────────────────────────────────┤ │ │
│ │ │ ref(c_i.data) - ref(c_i-1.data) <varint64> │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ │ ... │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
├──────────────────────────────────────────────────────────────────────────┤
│ CRC32 <4b>
└─────────────────────────────────────────────────────────────────────────┘
└─────────────────────────────────────────────────────────────────────────
```
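
For illustration, here is a rough Go sketch of the delta scheme described above. It is hypothetical, not the actual index writer; `chunkMeta` and `encodeChunkMetas` are made-up names, and the real code uses the package's own encoding buffer rather than the standard library helpers directly.

```
// Hypothetical sketch of the chunk metadata delta encoding; not the real
// index writer. Times and refs of chunk i are stored relative to chunk i-1.
package main

import (
	"encoding/binary"
	"fmt"
)

type chunkMeta struct {
	MinTime, MaxTime int64  // inclusive time range covered by the chunk
	Ref              uint64 // position of the chunk in the chunks file
}

func encodeChunkMetas(metas []chunkMeta) []byte {
	var out []byte
	tmp := make([]byte, binary.MaxVarintLen64)
	putUvarint := func(x uint64) { out = append(out, tmp[:binary.PutUvarint(tmp, x)]...) }
	putVarint := func(x int64) { out = append(out, tmp[:binary.PutVarint(tmp, x)]...) }

	putUvarint(uint64(len(metas))) // chunks count

	// First chunk: absolute mint, maxt as a delta to mint, absolute ref.
	putVarint(metas[0].MinTime)
	putUvarint(uint64(metas[0].MaxTime - metas[0].MinTime))
	putUvarint(metas[0].Ref)

	// Subsequent chunks: everything is stored as a delta to the previous chunk.
	for i := 1; i < len(metas); i++ {
		putUvarint(uint64(metas[i].MinTime - metas[i-1].MaxTime))
		putUvarint(uint64(metas[i].MaxTime - metas[i].MinTime))
		putVarint(int64(metas[i].Ref) - int64(metas[i-1].Ref))
	}
	return out
}

func main() {
	b := encodeChunkMetas([]chunkMeta{{MinTime: 0, MaxTime: 999, Ref: 8}, {MinTime: 1000, MaxTime: 1999, Ref: 172}})
	fmt.Printf("encoded 2 chunk metas into %d bytes\n", len(b))
}
```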
@ -176,24 +182,24 @@ The sequence of postings sections is finalized by an [offset table](#offset-tabl
An offset table stores a sequence of entries that maps a list of strings to an offset. They are used to track label index and postings sections. They are read into memory when an index file is loaded.
```
┌─────────────────────┬────────────────────┐
┌─────────────────────┬──────────────────────
│ len <4b>#entries <4b>
├─────────────────────┴────────────────────┤
│ ┌──────────────────────────────────────┐ │
├─────────────────────┴──────────────────────
│ ┌────────────────────────────────────────┐ │
│ │ n = #strs <uvarint> │ │
│ ├──────────────────────┬───────────────┤ │
│ ├──────────────────────┬─────────────────┤ │
│ │ len(str_1) <uvarint> │ str_1 <bytes> │ │
│ ├──────────────────────┴───────────────┤ │
│ ├──────────────────────┴─────────────────┤ │
│ │ ... │ │
│ ├──────────────────────┬───────────────┤ │
│ ├──────────────────────┬─────────────────┤ │
│ │ len(str_n) <uvarint> │ str_n <bytes> │ │
│ ├──────────────────────┴───────────────┤ │
│ │ offset <uvarint> │ │
│ └──────────────────────────────────────┘ │
│ ├──────────────────────┴─────────────────┤ │
│ │ offset <uvarint64> │ │
│ └────────────────────────────────────────┘ │
│ . . . │
├──────────────────────────────────────────┤
├────────────────────────────────────────────
│ CRC32 <4b>
└──────────────────────────────────────────┘
└────────────────────────────────────────────
```
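
As a rough illustration of that layout, the sketch below decodes a single offset table entry. It is hypothetical: `readEntry` is a made-up helper, and the actual index reader uses its own bounds-checked decoding buffer instead.

```
// Hypothetical sketch of decoding one offset table entry as laid out above.
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

var errInvalid = errors.New("invalid offset table entry")

// readEntry consumes one entry from b and returns the key strings, the
// offset they point to, and the remaining bytes.
func readEntry(b []byte) (keys []string, offset uint64, rest []byte, err error) {
	uvarint := func() (uint64, error) {
		v, n := binary.Uvarint(b)
		if n <= 0 {
			return 0, errInvalid
		}
		b = b[n:]
		return v, nil
	}

	n, err := uvarint() // number of strings forming the key
	if err != nil {
		return nil, 0, nil, err
	}
	for i := uint64(0); i < n; i++ {
		l, err := uvarint() // length of the next string
		if err != nil {
			return nil, 0, nil, err
		}
		if uint64(len(b)) < l {
			return nil, 0, nil, errInvalid
		}
		keys = append(keys, string(b[:l]))
		b = b[l:]
	}
	offset, err = uvarint() // offset the key points to
	if err != nil {
		return nil, 0, nil, err
	}
	return keys, offset, b, nil
}

func main() {
	// Entry for the single key "instance" pointing at offset 1234.
	entry := []byte{0x01, 0x08}
	entry = append(entry, []byte("instance")...)
	entry = append(entry, 0xd2, 0x09) // uvarint(1234)

	keys, off, _, err := readEntry(entry)
	fmt.Println(keys, off, err) // [instance] 1234 <nil>
}
```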


@ -25,7 +25,7 @@ The stones section is 0 padded to a multiple of 4 for fast scans.
# Tombstone
```
┌─────────────┬───────────────┬──────────────┐
│ref <varint> │ mint <varint> │ maxt <varint>
└─────────────┴───────────────┴──────────────┘
┌─────────────────────────────────┬────────────────┐
│ref <uvarint64> │ mint <varint64> │ maxt <varint64>
└─────────────────────────────────┴────────────────┘
```
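
For illustration, a minimal Go sketch of producing one tombstone record in this layout follows. It is hypothetical (`encodeTombstone` is a made-up name); the actual tombstones writer also maintains the file header and the trailing CRC32.

```
// Hypothetical sketch of encoding a single tombstone record.
package main

import (
	"encoding/binary"
	"fmt"
)

func encodeTombstone(ref uint64, mint, maxt int64) []byte {
	buf := make([]byte, 3*binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, ref)     // series reference
	n += binary.PutVarint(buf[n:], mint) // start of the deleted range
	n += binary.PutVarint(buf[n:], maxt) // end of the deleted range
	return buf[:n]
}

func main() {
	rec := encodeTombstone(42, 1000, 2000)
	fmt.Printf("tombstone record is %d bytes\n", len(rec))
}
```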


@ -15,8 +15,6 @@ package tsdb
import (
"encoding/binary"
"hash"
"hash/crc32"
"unsafe"
"github.com/pkg/errors"
@ -32,16 +30,11 @@ type encbuf struct {
func (e *encbuf) reset() { e.b = e.b[:0] }
func (e *encbuf) get() []byte { return e.b }
func (e *encbuf) len() int { return len(e.b) }
func (e *encbuf) putString(s string) { e.b = append(e.b, s...) }
func (e *encbuf) putBytes(b []byte) { e.b = append(e.b, b...) }
func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) }
func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) }
func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) }
func (e *encbuf) putBE64int64(x int64) { e.putBE64(uint64(x)) }
func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) }
func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) }
func (e *encbuf) putBE32(x uint32) {
@ -71,16 +64,6 @@ func (e *encbuf) putUvarintStr(s string) {
e.putString(s)
}
// putHash appends a hash over the buffer's current contents to the buffer.
func (e *encbuf) putHash(h hash.Hash) {
h.Reset()
_, err := h.Write(e.b)
if err != nil {
panic(err) // The CRC32 implementation does not error
}
e.b = h.Sum(e.b)
}
// decbuf provides safe methods to extract data from a byte slice. It does all
// necessary bounds checking and advancing of the byte slice.
// Several datums can be extracted without checking for errors. However, before using
@ -91,15 +74,8 @@ type decbuf struct {
}
func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
func (d *decbuf) be32int() int { return int(d.be32()) }
func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
// crc32 returns a CRC32 checksum over the remaining bytes.
func (d *decbuf) crc32() uint32 {
return crc32.Checksum(d.b, castagnoliTable)
}
func (d *decbuf) uvarintStr() string {
l := d.uvarint64()
if d.e != nil {
@ -179,18 +155,6 @@ func (d *decbuf) byte() byte {
return x
}
func (d *decbuf) decbuf(l int) decbuf {
if d.e != nil {
return decbuf{e: d.e}
}
if l > len(d.b) {
return decbuf{e: errInvalidSize}
}
r := decbuf{b: d.b[:l]}
d.b = d.b[l:]
return r
}
func (d *decbuf) err() error { return d.e }
func (d *decbuf) len() int { return len(d.b) }
func (d *decbuf) get() []byte { return d.b }


@ -77,9 +77,8 @@ func copyFile(src, dest string) error {
// returns relative paths to all files and empty directories.
func readDirs(src string) ([]string, error) {
var files []string
var err error
err = filepath.Walk(src, func(path string, f os.FileInfo, err error) error {
err := filepath.Walk(src, func(path string, f os.FileInfo, err error) error {
relativePath := strings.TrimPrefix(path, src)
if len(relativePath) > 0 {
files = append(files, relativePath)

head.go

@ -48,6 +48,10 @@ var (
// ErrOutOfBounds is returned if an appended sample is out of the
// writable time range.
ErrOutOfBounds = errors.New("out of bounds")
// emptyTombstoneReader is a no-op Tombstone Reader.
// This is used by head to satisfy the Tombstones() function call.
emptyTombstoneReader = newMemTombstones()
)
// Head handles reads and writes of time series data within a time window.
@ -71,8 +75,6 @@ type Head struct {
values map[string]stringset // label names to possible values
postings *index.MemPostings // postings lists for terms
tombstones *memTombstones
}
type headMetrics struct {
@ -231,7 +233,6 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
values: map[string]stringset{},
symbols: map[string]struct{}{},
postings: index.NewUnorderedMemPostings(),
tombstones: newMemTombstones(),
}
h.metrics = newHeadMetrics(h, r)
@ -338,8 +339,10 @@ func (h *Head) loadWAL(r *wal.Reader) error {
series []RefSeries
samples []RefSample
tstones []Stone
allStones = newMemTombstones()
err error
)
defer allStones.Close()
for r.Next() {
series, samples, tstones = series[:0], samples[:0], tstones[:0]
rec := r.Record()
@ -413,7 +416,7 @@ func (h *Head) loadWAL(r *wal.Reader) error {
if itv.Maxt < h.minValidTime {
continue
}
h.tombstones.addInterval(s.ref, itv)
allStones.addInterval(s.ref, itv)
}
}
default:
@ -436,6 +439,12 @@ func (h *Head) loadWAL(r *wal.Reader) error {
}
wg.Wait()
if err := allStones.Iter(func(ref uint64, dranges Intervals) error {
return h.chunkRewrite(ref, dranges)
}); err != nil {
return errors.Wrap(r.Err(), "deleting samples from tombstones")
}
if unknownRefs > 0 {
level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs)
}
@ -604,7 +613,7 @@ func (h *rangeHead) Chunks() (ChunkReader, error) {
}
func (h *rangeHead) Tombstones() (TombstoneReader, error) {
return h.head.tombstones, nil
return emptyTombstoneReader, nil
}
// initAppender is a helper to initialize the time bounds of the head
@ -849,7 +858,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
}
var stones []Stone
dirty := false
for p.Next() {
series := h.series.getByID(p.At())
@ -859,22 +868,61 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
}
// Delete only until the current values and not beyond.
t0, t1 = clampInterval(mint, maxt, t0, t1)
if h.wal != nil {
stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}})
}
if err := h.chunkRewrite(p.At(), Intervals{{t0, t1}}); err != nil {
return errors.Wrap(err, "delete samples")
}
dirty = true
}
if p.Err() != nil {
return p.Err()
}
var enc RecordEncoder
if h.wal != nil {
// Although we don't store the stones in the head,
// we need to write them to the WAL so that, after a restart,
// these deletions are re-applied when loading the WAL.
if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil {
return err
}
}
for _, s := range stones {
h.tombstones.addInterval(s.ref, s.intervals[0])
if dirty {
h.gc()
}
return nil
}
// chunkRewrite re-writes the chunks that overlap with the deleted ranges
// and removes the samples in the deleted ranges.
// A chunk is deleted if no samples are left at the end.
func (h *Head) chunkRewrite(ref uint64, dranges Intervals) (err error) {
if len(dranges) == 0 {
return nil
}
ms := h.series.getByID(ref)
ms.Lock()
defer ms.Unlock()
if len(ms.chunks) == 0 {
return nil
}
metas := ms.chunksMetas()
mint, maxt := metas[0].MinTime, metas[len(metas)-1].MaxTime
it := newChunkSeriesIterator(metas, dranges, mint, maxt)
ms.reset()
for it.Next() {
t, v := it.At()
ok, _ := ms.append(t, v)
if !ok {
level.Warn(h.logger).Log("msg", "failed to add sample during delete")
}
}
return nil
}
@ -926,7 +974,7 @@ func (h *Head) gc() {
// Tombstones returns a new reader over the head's tombstones
func (h *Head) Tombstones() (TombstoneReader, error) {
return h.tombstones, nil
return emptyTombstoneReader, nil
}
// Index returns an IndexReader against the block.
@ -1406,6 +1454,16 @@ type memSeries struct {
app chunkenc.Appender // Current appender for the chunk.
}
func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries {
s := &memSeries{
lset: lset,
ref: id,
chunkRange: chunkRange,
nextAt: math.MinInt64,
}
return s
}
func (s *memSeries) minTime() int64 {
if len(s.chunks) == 0 {
return math.MinInt64
@ -1442,14 +1500,24 @@ func (s *memSeries) cut(mint int64) *memChunk {
return c
}
func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries {
s := &memSeries{
lset: lset,
ref: id,
chunkRange: chunkRange,
nextAt: math.MinInt64,
func (s *memSeries) chunksMetas() []chunks.Meta {
metas := make([]chunks.Meta, 0, len(s.chunks))
for _, chk := range s.chunks {
metas = append(metas, chunks.Meta{Chunk: chk.chunk, MinTime: chk.minTime, MaxTime: chk.maxTime})
}
return s
return metas
}
// reset re-initialises all the variables in the memSeries except 'lset', 'ref',
// and 'chunkRange', so that it appears as it would after 'newMemSeries(...)'.
func (s *memSeries) reset() {
s.chunks = nil
s.headChunk = nil
s.firstChunkID = 0
s.nextAt = math.MinInt64
s.sampleBuf = [4]sample{}
s.pendingCommit = false
s.app = nil
}
// appendable checks whether the given sample is valid for appending to the series.
@ -1628,11 +1696,6 @@ func (ss stringset) set(s string) {
ss[s] = struct{}{}
}
func (ss stringset) has(s string) bool {
_, ok := ss[s]
return ok
}
func (ss stringset) String() string {
return strings.Join(ss.slice(), ",")
}


@ -18,6 +18,7 @@ import (
"math"
"math/rand"
"os"
"path"
"path/filepath"
"sort"
"testing"
@ -296,97 +297,161 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
}
func TestHeadDeleteSimple(t *testing.T) {
numSamples := int64(10)
head, err := NewHead(nil, nil, nil, 1000)
testutil.Ok(t, err)
defer head.Close()
app := head.Appender()
smpls := make([]float64, numSamples)
for i := int64(0); i < numSamples; i++ {
smpls[i] = rand.Float64()
app.Add(labels.Labels{{"a", "b"}}, i, smpls[i])
buildSmpls := func(s []int64) []sample {
ss := make([]sample, 0, len(s))
for _, t := range s {
ss = append(ss, sample{t: t, v: float64(t)})
}
return ss
}
smplsAll := buildSmpls([]int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
lblDefault := labels.Label{"a", "b"}
testutil.Ok(t, app.Commit())
cases := []struct {
intervals Intervals
remaint []int64
dranges Intervals
smplsExp []sample
}{
{
intervals: Intervals{{0, 3}},
remaint: []int64{4, 5, 6, 7, 8, 9},
dranges: Intervals{{0, 3}},
smplsExp: buildSmpls([]int64{4, 5, 6, 7, 8, 9}),
},
{
intervals: Intervals{{1, 3}},
remaint: []int64{0, 4, 5, 6, 7, 8, 9},
dranges: Intervals{{1, 3}},
smplsExp: buildSmpls([]int64{0, 4, 5, 6, 7, 8, 9}),
},
{
intervals: Intervals{{1, 3}, {4, 7}},
remaint: []int64{0, 8, 9},
dranges: Intervals{{1, 3}, {4, 7}},
smplsExp: buildSmpls([]int64{0, 8, 9}),
},
{
intervals: Intervals{{1, 3}, {4, 700}},
remaint: []int64{0},
dranges: Intervals{{1, 3}, {4, 700}},
smplsExp: buildSmpls([]int64{0}),
},
{
intervals: Intervals{{0, 9}},
remaint: []int64{},
{ // This case is to ensure that labels and symbols are deleted.
dranges: Intervals{{0, 9}},
smplsExp: buildSmpls([]int64{}),
},
}
Outer:
for _, c := range cases {
// Reset the tombstones.
head.tombstones = newMemTombstones()
dir, err := ioutil.TempDir("", "test_wal_reload")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
testutil.Ok(t, err)
head, err := NewHead(nil, nil, w, 1000)
testutil.Ok(t, err)
app := head.Appender()
for _, smpl := range smplsAll {
_, err = app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v)
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())
// Delete the ranges.
for _, r := range c.intervals {
testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b")))
for _, r := range c.dranges {
testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value)))
}
// Compare the result.
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
// Compare the samples for both heads - before and after the reload.
reloadedW, err := wal.New(nil, nil, w.Dir()) // Use a new wal to ensure deleted samples are gone even after a reload.
testutil.Ok(t, err)
res, err := q.Select(labels.NewEqualMatcher("a", "b"))
reloadedHead, err := NewHead(nil, nil, reloadedW, 1000)
testutil.Ok(t, err)
testutil.Ok(t, reloadedHead.Init(0))
for _, h := range []*Head{head, reloadedHead} {
indexr, err := h.Index()
testutil.Ok(t, err)
// Use an emptyTombstoneReader explicitly to get all the samples.
css, err := LookupChunkSeries(indexr, emptyTombstoneReader, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value))
testutil.Ok(t, err)
expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts]})
// Getting the actual samples.
actSamples := make([]sample, 0)
for css.Next() {
lblsAct, chkMetas, intv := css.At()
testutil.Equals(t, labels.Labels{lblDefault}, lblsAct)
testutil.Equals(t, 0, len(intv))
chunkr, err := h.Chunks()
testutil.Ok(t, err)
for _, meta := range chkMetas {
chk, err := chunkr.Chunk(meta.Ref)
testutil.Ok(t, err)
ii := chk.Iterator()
for ii.Next() {
t, v := ii.At()
actSamples = append(actSamples, sample{t: t, v: v})
}
}
}
expss := newMockSeriesSet([]Series{
newSeries(map[string]string{"a": "b"}, expSamples),
testutil.Ok(t, css.Err())
testutil.Equals(t, c.smplsExp, actSamples)
}
// Compare the query results for both heads - before and after the reload.
expSeriesSet := newMockSeriesSet([]Series{
newSeries(map[string]string{lblDefault.Name: lblDefault.Value}, func() []tsdbutil.Sample {
ss := make([]tsdbutil.Sample, 0, len(c.smplsExp))
for _, s := range c.smplsExp {
ss = append(ss, s)
}
return ss
}(),
),
})
for _, h := range []*Head{head, reloadedHead} {
q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime())
testutil.Ok(t, err)
actSeriesSet, err := q.Select(labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value))
testutil.Ok(t, err)
if len(expSamples) == 0 {
testutil.Assert(t, res.Next() == false, "")
lns, err := q.LabelNames()
testutil.Ok(t, err)
lvs, err := q.LabelValues(lblDefault.Name)
testutil.Ok(t, err)
// When all samples are deleted we expect that no labels should exist either.
if len(c.smplsExp) == 0 {
testutil.Equals(t, 0, len(lns))
testutil.Equals(t, 0, len(lvs))
testutil.Assert(t, actSeriesSet.Next() == false, "")
testutil.Ok(t, h.Close())
continue
} else {
testutil.Equals(t, 1, len(lns))
testutil.Equals(t, 1, len(lvs))
testutil.Equals(t, lblDefault.Name, lns[0])
testutil.Equals(t, lblDefault.Value, lvs[0])
}
for {
eok, rok := expss.Next(), res.Next()
eok, rok := expSeriesSet.Next(), actSeriesSet.Next()
testutil.Equals(t, eok, rok)
if !eok {
testutil.Ok(t, h.Close())
continue Outer
}
sexp := expss.At()
sres := res.At()
expSeries := expSeriesSet.At()
actSeries := actSeriesSet.At()
testutil.Equals(t, sexp.Labels(), sres.Labels())
testutil.Equals(t, expSeries.Labels(), actSeries.Labels())
smplExp, errExp := expandSeriesIterator(sexp.Iterator())
smplRes, errRes := expandSeriesIterator(sres.Iterator())
smplExp, errExp := expandSeriesIterator(expSeries.Iterator())
smplRes, errRes := expandSeriesIterator(actSeries.Iterator())
testutil.Equals(t, errExp, errRes)
testutil.Equals(t, smplExp, smplRes)
}
}
}
}
func TestDeleteUntilCurMax(t *testing.T) {
numSamples := int64(10)
@ -524,8 +589,6 @@ func TestDelete_e2e(t *testing.T) {
// TODO: Add Regexp Matchers.
}
for _, del := range dels {
// Reset the deletes every time.
hb.tombstones = newMemTombstones()
for _, r := range del.drange {
testutil.Ok(t, hb.Delete(r.Mint, r.Maxt, del.ms...))
}
@ -585,7 +648,6 @@ func TestDelete_e2e(t *testing.T) {
}
}
}
return
}
func boundedSamples(full []sample, mint, maxt int64) []sample {
@ -946,4 +1008,5 @@ func TestWalRepair(t *testing.T) {
testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records")
})
}
}


@ -33,12 +33,9 @@ func (e *encbuf) get() []byte { return e.b }
func (e *encbuf) len() int { return len(e.b) }
func (e *encbuf) putString(s string) { e.b = append(e.b, s...) }
func (e *encbuf) putBytes(b []byte) { e.b = append(e.b, b...) }
func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) }
func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) }
func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) }
func (e *encbuf) putBE64int64(x int64) { e.putBE64(uint64(x)) }
func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) }
func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) }
@ -143,9 +140,7 @@ func newDecbufUvarintAt(bs ByteSlice, off int) decbuf {
}
func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
func (d *decbuf) be32int() int { return int(d.be32()) }
func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
// crc32 returns a CRC32 checksum over the remaining bytes.
func (d *decbuf) crc32() uint32 {
@ -218,31 +213,6 @@ func (d *decbuf) be32() uint32 {
return x
}
func (d *decbuf) byte() byte {
if d.e != nil {
return 0
}
if len(d.b) < 1 {
d.e = errInvalidSize
return 0
}
x := d.b[0]
d.b = d.b[1:]
return x
}
func (d *decbuf) decbuf(l int) decbuf {
if d.e != nil {
return decbuf{e: d.e}
}
if l > len(d.b) {
return decbuf{e: errInvalidSize}
}
r := decbuf{b: d.b[:l]}
d.b = d.b[l:]
return r
}
func (d *decbuf) err() error { return d.e }
func (d *decbuf) len() int { return len(d.b) }
func (d *decbuf) get() []byte { return d.b }


@ -754,7 +754,7 @@ func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]strin
symbolSlice []string
symbols = map[uint32]string{}
)
if version == 2 {
if version == FormatV2 {
symbolSlice = make([]string, 0, cnt)
}


@ -306,7 +306,6 @@ func Intersect(its ...Postings) Postings {
type intersectPostings struct {
a, b Postings
aok, bok bool
cur uint64
}


@ -61,18 +61,6 @@ func TestMemPostings_ensureOrder(t *testing.T) {
}
}
type mockPostings struct {
next func() bool
seek func(uint64) bool
value func() uint64
err func() error
}
func (m *mockPostings) Next() bool { return m.next() }
func (m *mockPostings) Seek(v uint64) bool { return m.seek(v) }
func (m *mockPostings) Value() uint64 { return m.value() }
func (m *mockPostings) Err() error { return m.err() }
func TestIntersect(t *testing.T) {
var cases = []struct {
a, b []uint64
@ -300,8 +288,6 @@ func TestMergedPostingsSeek(t *testing.T) {
testutil.Equals(t, c.res, lst)
}
}
return
}
func TestRemovedPostings(t *testing.T) {
@ -463,8 +449,6 @@ func TestRemovedPostingsSeek(t *testing.T) {
testutil.Equals(t, c.res, lst)
}
}
return
}
func TestBigEndian(t *testing.T) {


@ -56,18 +56,6 @@ func newMockSeriesSet(list []Series) *mockSeriesSet {
}
}
type mockSeriesIterator struct {
seek func(int64) bool
at func() (int64, float64)
next func() bool
err func() error
}
func (m *mockSeriesIterator) Seek(t int64) bool { return m.seek(t) }
func (m *mockSeriesIterator) At() (int64, float64) { return m.at() }
func (m *mockSeriesIterator) Next() bool { return m.next() }
func (m *mockSeriesIterator) Err() error { return m.err() }
func TestMergedSeriesSet(t *testing.T) {
cases := []struct {
@ -399,8 +387,6 @@ Outer:
testutil.Equals(t, smplExp, smplRes)
}
}
return
}
func TestBlockQuerierDelete(t *testing.T) {
@ -563,8 +549,6 @@ Outer:
testutil.Equals(t, smplExp, smplRes)
}
}
return
}
func TestBaseChunkSeries(t *testing.T) {
@ -661,8 +645,6 @@ func TestBaseChunkSeries(t *testing.T) {
testutil.Equals(t, len(tc.expIdxs), i)
testutil.Ok(t, bcs.Err())
}
return
}
// TODO: Remove after simpleSeries is merged
@ -986,8 +968,6 @@ func TestSeriesIterator(t *testing.T) {
}
})
})
return
}
// Regression for: https://github.com/prometheus/tsdb/pull/97
@ -1088,7 +1068,6 @@ func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) {
maxt: 15,
}
testutil.Assert(t, p.Next() == false, "")
return
}
type mockChunkSeriesSet struct {


@ -1,2 +0,0 @@
# Enable only "legacy" staticcheck verifications.
checks = [ "SA*" ]


@ -66,6 +66,15 @@ func Equals(tb testing.TB, exp, act interface{}, msgAndArgs ...interface{}) {
}
}
// NotEquals fails the test if exp is equal to act.
func NotEquals(tb testing.TB, exp, act interface{}) {
if reflect.DeepEqual(exp, act) {
_, file, line, _ := runtime.Caller(1)
fmt.Printf("\033[31m%s:%d: Expected different exp and got\n\n\texp: %#v\n\n\tgot: %#v\033[39m\n\n", filepath.Base(file), line, exp, act)
tb.FailNow()
}
}
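
A hypothetical usage example (not part of the change; the test name and values are made up, and the usual "testing" and testutil imports are assumed):

// Hypothetical test showing the call shape of the new helper.
func TestMutatedCopyDiffers(t *testing.T) {
	orig := []int{1, 2, 3}
	copied := append([]int(nil), orig...) // independent copy
	copied[0] = 42
	testutil.NotEquals(t, orig, copied) // fails only if the two are still deeply equal
}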
func formatMessage(msgAndArgs []interface{}) string {
if len(msgAndArgs) == 0 {
return ""


@ -120,7 +120,6 @@ func TestAddingNewIntervals(t *testing.T) {
testutil.Equals(t, c.exp, c.exist.add(c.new))
}
return
}
// TestMemTombstonesConcurrency to make sure they are safe to access from different goroutines.


@ -226,7 +226,6 @@ func TestReader_Live(t *testing.T) {
func TestWAL_FuzzWriteRead_Live(t *testing.T) {
const count = 5000
const segmentSize = int64(128 * 1024 * 1204)
var input [][]byte
lock := sync.RWMutex{}
var recs [][]byte