mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-10 07:34:04 -08:00
Merge branch 'master' into deletes-1
Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
commit
8434019ad9
6
block.go
6
block.go
|
@ -15,7 +15,6 @@ package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -80,9 +79,6 @@ type BlockMeta struct {
|
||||||
// Unique identifier for the block and its contents. Changes on compaction.
|
// Unique identifier for the block and its contents. Changes on compaction.
|
||||||
ULID ulid.ULID `json:"ulid"`
|
ULID ulid.ULID `json:"ulid"`
|
||||||
|
|
||||||
// Sequence number of the block.
|
|
||||||
Sequence int `json:"sequence"`
|
|
||||||
|
|
||||||
// MinTime and MaxTime specify the time range all samples
|
// MinTime and MaxTime specify the time range all samples
|
||||||
// in the block are in.
|
// in the block are in.
|
||||||
MinTime int64 `json:"minTime"`
|
MinTime int64 `json:"minTime"`
|
||||||
|
@ -214,7 +210,7 @@ func (pb *persistedBlock) Close() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pb *persistedBlock) String() string {
|
func (pb *persistedBlock) String() string {
|
||||||
return fmt.Sprintf("(%d, %s)", pb.meta.Sequence, pb.meta.ULID)
|
return pb.meta.ULID.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pb *persistedBlock) Querier(mint, maxt int64) Querier {
|
func (pb *persistedBlock) Querier(mint, maxt int64) Querier {
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
build:
|
build:
|
||||||
@go1.8rc1 build .
|
@go build .
|
||||||
|
|
||||||
bench: build
|
bench: build
|
||||||
@echo ">> running benchmark"
|
@echo ">> running benchmark"
|
||||||
|
@ -8,3 +8,4 @@ bench: build
|
||||||
@go tool pprof --inuse_space -svg ./tsdb benchout/mem.prof > benchout/memprof.inuse.svg
|
@go tool pprof --inuse_space -svg ./tsdb benchout/mem.prof > benchout/memprof.inuse.svg
|
||||||
@go tool pprof --alloc_space -svg ./tsdb benchout/mem.prof > benchout/memprof.alloc.svg
|
@go tool pprof --alloc_space -svg ./tsdb benchout/mem.prof > benchout/memprof.alloc.svg
|
||||||
@go tool pprof -svg ./tsdb benchout/block.prof > benchout/blockprof.svg
|
@go tool pprof -svg ./tsdb benchout/block.prof > benchout/blockprof.svg
|
||||||
|
@go tool pprof -svg ./tsdb benchout/mutex.prof > benchout/mutexprof.svg
|
||||||
|
|
|
@ -28,6 +28,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
promlabels "github.com/prometheus/prometheus/pkg/labels"
|
promlabels "github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/pkg/textparse"
|
"github.com/prometheus/prometheus/pkg/textparse"
|
||||||
"github.com/prometheus/tsdb"
|
"github.com/prometheus/tsdb"
|
||||||
|
@ -73,6 +74,7 @@ type writeBenchmark struct {
|
||||||
cpuprof *os.File
|
cpuprof *os.File
|
||||||
memprof *os.File
|
memprof *os.File
|
||||||
blockprof *os.File
|
blockprof *os.File
|
||||||
|
mtxprof *os.File
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBenchWriteCommand() *cobra.Command {
|
func NewBenchWriteCommand() *cobra.Command {
|
||||||
|
@ -113,7 +115,6 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
|
||||||
RetentionDuration: 2 * 24 * 60 * 60 * 1000, // 1 days in milliseconds
|
RetentionDuration: 2 * 24 * 60 * 60 * 1000, // 1 days in milliseconds
|
||||||
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds
|
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds
|
||||||
MaxBlockDuration: 27 * 60 * 60 * 1000, // 1 days in milliseconds
|
MaxBlockDuration: 27 * 60 * 60 * 1000, // 1 days in milliseconds
|
||||||
AppendableBlocks: 2,
|
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
exitWithError(err)
|
exitWithError(err)
|
||||||
|
@ -196,7 +197,7 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount
|
||||||
type sample struct {
|
type sample struct {
|
||||||
labels labels.Labels
|
labels labels.Labels
|
||||||
value int64
|
value int64
|
||||||
ref *uint64
|
ref *string
|
||||||
}
|
}
|
||||||
|
|
||||||
scrape := make([]*sample, 0, len(metrics))
|
scrape := make([]*sample, 0, len(metrics))
|
||||||
|
@ -224,7 +225,7 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount
|
||||||
s.ref = &ref
|
s.ref = &ref
|
||||||
} else if err := app.AddFast(*s.ref, ts, float64(s.value)); err != nil {
|
} else if err := app.AddFast(*s.ref, ts, float64(s.value)); err != nil {
|
||||||
|
|
||||||
if err.Error() != "not found" {
|
if errors.Cause(err) != tsdb.ErrNotFound {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -259,14 +260,20 @@ func (b *writeBenchmark) startProfiling() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
exitWithError(fmt.Errorf("bench: could not create memory profile: %v", err))
|
exitWithError(fmt.Errorf("bench: could not create memory profile: %v", err))
|
||||||
}
|
}
|
||||||
runtime.MemProfileRate = 4096
|
runtime.MemProfileRate = 64 * 1024
|
||||||
|
|
||||||
// Start fatal profiling.
|
// Start fatal profiling.
|
||||||
b.blockprof, err = os.Create(filepath.Join(b.outPath, "block.prof"))
|
b.blockprof, err = os.Create(filepath.Join(b.outPath, "block.prof"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
exitWithError(fmt.Errorf("bench: could not create block profile: %v", err))
|
exitWithError(fmt.Errorf("bench: could not create block profile: %v", err))
|
||||||
}
|
}
|
||||||
runtime.SetBlockProfileRate(1)
|
runtime.SetBlockProfileRate(20)
|
||||||
|
|
||||||
|
b.mtxprof, err = os.Create(filepath.Join(b.outPath, "mutex.prof"))
|
||||||
|
if err != nil {
|
||||||
|
exitWithError(fmt.Errorf("bench: could not create mutex profile: %v", err))
|
||||||
|
}
|
||||||
|
runtime.SetMutexProfileFraction(20)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *writeBenchmark) stopProfiling() {
|
func (b *writeBenchmark) stopProfiling() {
|
||||||
|
@ -286,6 +293,12 @@ func (b *writeBenchmark) stopProfiling() {
|
||||||
b.blockprof = nil
|
b.blockprof = nil
|
||||||
runtime.SetBlockProfileRate(0)
|
runtime.SetBlockProfileRate(0)
|
||||||
}
|
}
|
||||||
|
if b.mtxprof != nil {
|
||||||
|
pprof.Lookup("mutex").WriteTo(b.mtxprof, 0)
|
||||||
|
b.mtxprof.Close()
|
||||||
|
b.mtxprof = nil
|
||||||
|
runtime.SetMutexProfileFraction(0)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func measureTime(stage string, f func()) time.Duration {
|
func measureTime(stage string, f func()) time.Duration {
|
||||||
|
|
71
compact.go
71
compact.go
|
@ -18,6 +18,7 @@ import (
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/fileutil"
|
"github.com/coreos/etcd/pkg/fileutil"
|
||||||
|
@ -35,10 +36,10 @@ type Compactor interface {
|
||||||
// Plan returns a set of non-overlapping directories that can
|
// Plan returns a set of non-overlapping directories that can
|
||||||
// be compacted concurrently.
|
// be compacted concurrently.
|
||||||
// Results returned when compactions are in progress are undefined.
|
// Results returned when compactions are in progress are undefined.
|
||||||
Plan(dir string) ([][]string, error)
|
Plan() ([][]string, error)
|
||||||
|
|
||||||
// Write persists a Block into a directory.
|
// Write persists a Block into a directory.
|
||||||
Write(dir string, b Block) error
|
Write(b Block) error
|
||||||
|
|
||||||
// Compact runs compaction against the provided directories. Must
|
// Compact runs compaction against the provided directories. Must
|
||||||
// only be called concurrently with results of Plan().
|
// only be called concurrently with results of Plan().
|
||||||
|
@ -47,6 +48,7 @@ type Compactor interface {
|
||||||
|
|
||||||
// compactor implements the Compactor interface.
|
// compactor implements the Compactor interface.
|
||||||
type compactor struct {
|
type compactor struct {
|
||||||
|
dir string
|
||||||
metrics *compactorMetrics
|
metrics *compactorMetrics
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
opts *compactorOptions
|
opts *compactorOptions
|
||||||
|
@ -88,8 +90,9 @@ type compactorOptions struct {
|
||||||
maxBlockRange uint64
|
maxBlockRange uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCompactor(r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
|
func newCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
|
||||||
return &compactor{
|
return &compactor{
|
||||||
|
dir: dir,
|
||||||
opts: opts,
|
opts: opts,
|
||||||
logger: l,
|
logger: l,
|
||||||
metrics: newCompactorMetrics(r),
|
metrics: newCompactorMetrics(r),
|
||||||
|
@ -104,13 +107,18 @@ type compactionInfo struct {
|
||||||
|
|
||||||
const compactionBlocksLen = 3
|
const compactionBlocksLen = 3
|
||||||
|
|
||||||
func (c *compactor) Plan(dir string) ([][]string, error) {
|
type dirMeta struct {
|
||||||
dirs, err := blockDirs(dir)
|
dir string
|
||||||
|
meta *BlockMeta
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *compactor) Plan() ([][]string, error) {
|
||||||
|
dirs, err := blockDirs(c.dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var bs []*BlockMeta
|
var dms []dirMeta
|
||||||
|
|
||||||
for _, dir := range dirs {
|
for _, dir := range dirs {
|
||||||
meta, err := readMetaFile(dir)
|
meta, err := readMetaFile(dir)
|
||||||
|
@ -118,25 +126,28 @@ func (c *compactor) Plan(dir string) ([][]string, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if meta.Compaction.Generation > 0 {
|
if meta.Compaction.Generation > 0 {
|
||||||
bs = append(bs, meta)
|
dms = append(dms, dirMeta{dir, meta})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
sort.Slice(dms, func(i, j int) bool {
|
||||||
|
return dms[i].meta.MinTime < dms[j].meta.MinTime
|
||||||
|
})
|
||||||
|
|
||||||
if len(bs) == 0 {
|
if len(dms) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
sliceDirs := func(i, j int) [][]string {
|
sliceDirs := func(i, j int) [][]string {
|
||||||
var res []string
|
var res []string
|
||||||
for k := i; k < j; k++ {
|
for k := i; k < j; k++ {
|
||||||
res = append(res, dirs[k])
|
res = append(res, dms[k].dir)
|
||||||
}
|
}
|
||||||
return [][]string{res}
|
return [][]string{res}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Then we care about compacting multiple blocks, starting with the oldest.
|
// Then we care about compacting multiple blocks, starting with the oldest.
|
||||||
for i := 0; i < len(bs)-compactionBlocksLen+1; i++ {
|
for i := 0; i < len(dms)-compactionBlocksLen+1; i++ {
|
||||||
if c.match(bs[i : i+3]) {
|
if c.match(dms[i : i+3]) {
|
||||||
return sliceDirs(i, i+compactionBlocksLen), nil
|
return sliceDirs(i, i+compactionBlocksLen), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -144,26 +155,22 @@ func (c *compactor) Plan(dir string) ([][]string, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *compactor) match(bs []*BlockMeta) bool {
|
func (c *compactor) match(dirs []dirMeta) bool {
|
||||||
g := bs[0].Compaction.Generation
|
g := dirs[0].meta.Compaction.Generation
|
||||||
|
|
||||||
for _, b := range bs {
|
for _, d := range dirs {
|
||||||
if b.Compaction.Generation != g {
|
if d.meta.Compaction.Generation != g {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return uint64(bs[len(bs)-1].MaxTime-bs[0].MinTime) <= c.opts.maxBlockRange
|
return uint64(dirs[len(dirs)-1].meta.MaxTime-dirs[0].meta.MinTime) <= c.opts.maxBlockRange
|
||||||
}
|
}
|
||||||
|
|
||||||
func mergeBlockMetas(blocks ...Block) (res BlockMeta) {
|
func mergeBlockMetas(blocks ...Block) (res BlockMeta) {
|
||||||
m0 := blocks[0].Meta()
|
m0 := blocks[0].Meta()
|
||||||
|
|
||||||
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
||||||
|
|
||||||
res.Sequence = m0.Sequence
|
|
||||||
res.MinTime = m0.MinTime
|
res.MinTime = m0.MinTime
|
||||||
res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime
|
res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime
|
||||||
res.ULID = ulid.MustNew(ulid.Now(), entropy)
|
|
||||||
|
|
||||||
res.Compaction.Generation = m0.Compaction.Generation + 1
|
res.Compaction.Generation = m0.Compaction.Generation + 1
|
||||||
|
|
||||||
|
@ -186,16 +193,27 @@ func (c *compactor) Compact(dirs ...string) (err error) {
|
||||||
blocks = append(blocks, b)
|
blocks = append(blocks, b)
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.write(dirs[0], blocks...)
|
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
uid := ulid.MustNew(ulid.Now(), entropy)
|
||||||
|
|
||||||
|
return c.write(uid, blocks...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *compactor) Write(dir string, b Block) error {
|
func (c *compactor) Write(b Block) error {
|
||||||
return c.write(dir, b)
|
// Buffering blocks might have been created that often have no data.
|
||||||
|
if b.Meta().Stats.NumSeries == 0 {
|
||||||
|
return errors.Wrap(os.RemoveAll(b.Dir()), "remove empty block")
|
||||||
|
}
|
||||||
|
|
||||||
|
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
uid := ulid.MustNew(ulid.Now(), entropy)
|
||||||
|
|
||||||
|
return c.write(uid, b)
|
||||||
}
|
}
|
||||||
|
|
||||||
// write creates a new block that is the union of the provided blocks into dir.
|
// write creates a new block that is the union of the provided blocks into dir.
|
||||||
// It cleans up all files of the old blocks after completing successfully.
|
// It cleans up all files of the old blocks after completing successfully.
|
||||||
func (c *compactor) write(dir string, blocks ...Block) (err error) {
|
func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
|
||||||
c.logger.Log("msg", "compact blocks", "blocks", fmt.Sprintf("%v", blocks))
|
c.logger.Log("msg", "compact blocks", "blocks", fmt.Sprintf("%v", blocks))
|
||||||
|
|
||||||
defer func(t time.Time) {
|
defer func(t time.Time) {
|
||||||
|
@ -205,6 +223,7 @@ func (c *compactor) write(dir string, blocks ...Block) (err error) {
|
||||||
c.metrics.duration.Observe(time.Since(t).Seconds())
|
c.metrics.duration.Observe(time.Since(t).Seconds())
|
||||||
}(time.Now())
|
}(time.Now())
|
||||||
|
|
||||||
|
dir := filepath.Join(c.dir, uid.String())
|
||||||
tmp := dir + ".tmp"
|
tmp := dir + ".tmp"
|
||||||
|
|
||||||
if err = os.RemoveAll(tmp); err != nil {
|
if err = os.RemoveAll(tmp); err != nil {
|
||||||
|
@ -230,6 +249,8 @@ func (c *compactor) write(dir string, blocks ...Block) (err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "write compaction")
|
return errors.Wrap(err, "write compaction")
|
||||||
}
|
}
|
||||||
|
meta.ULID = uid
|
||||||
|
|
||||||
if err = writeMetaFile(tmp, meta); err != nil {
|
if err = writeMetaFile(tmp, meta); err != nil {
|
||||||
return errors.Wrap(err, "write merged meta")
|
return errors.Wrap(err, "write merged meta")
|
||||||
}
|
}
|
||||||
|
@ -254,7 +275,7 @@ func (c *compactor) write(dir string, blocks ...Block) (err error) {
|
||||||
if err := renameFile(tmp, dir); err != nil {
|
if err := renameFile(tmp, dir); err != nil {
|
||||||
return errors.Wrap(err, "rename block dir")
|
return errors.Wrap(err, "rename block dir")
|
||||||
}
|
}
|
||||||
for _, b := range blocks[1:] {
|
for _, b := range blocks {
|
||||||
if err := os.RemoveAll(b.Dir()); err != nil {
|
if err := os.RemoveAll(b.Dir()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
358
db.go
358
db.go
|
@ -22,6 +22,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -33,6 +34,7 @@ import (
|
||||||
"github.com/coreos/etcd/pkg/fileutil"
|
"github.com/coreos/etcd/pkg/fileutil"
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/nightlyone/lockfile"
|
"github.com/nightlyone/lockfile"
|
||||||
|
"github.com/oklog/ulid"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
|
@ -45,7 +47,6 @@ var DefaultOptions = &Options{
|
||||||
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
|
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
|
||||||
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds
|
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds
|
||||||
MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds
|
MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds
|
||||||
AppendableBlocks: 2,
|
|
||||||
NoLockfile: false,
|
NoLockfile: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,13 +65,6 @@ type Options struct {
|
||||||
// The maximum timestamp range of compacted blocks.
|
// The maximum timestamp range of compacted blocks.
|
||||||
MaxBlockDuration uint64
|
MaxBlockDuration uint64
|
||||||
|
|
||||||
// Number of head blocks that can be appended to.
|
|
||||||
// Should be two or higher to prevent write errors in general scenarios.
|
|
||||||
//
|
|
||||||
// After a new block is started for timestamp t0 or higher, appends with
|
|
||||||
// timestamps as early as t0 - (n-1) * MinBlockDuration are valid.
|
|
||||||
AppendableBlocks int
|
|
||||||
|
|
||||||
// NoLockfile disables creation and consideration of a lock file.
|
// NoLockfile disables creation and consideration of a lock file.
|
||||||
NoLockfile bool
|
NoLockfile bool
|
||||||
}
|
}
|
||||||
|
@ -86,11 +80,11 @@ type Appender interface {
|
||||||
// Returned reference numbers are ephemeral and may be rejected in calls
|
// Returned reference numbers are ephemeral and may be rejected in calls
|
||||||
// to AddFast() at any point. Adding the sample via Add() returns a new
|
// to AddFast() at any point. Adding the sample via Add() returns a new
|
||||||
// reference number.
|
// reference number.
|
||||||
Add(l labels.Labels, t int64, v float64) (uint64, error)
|
Add(l labels.Labels, t int64, v float64) (string, error)
|
||||||
|
|
||||||
// Add adds a sample pair for the referenced series. It is generally faster
|
// Add adds a sample pair for the referenced series. It is generally faster
|
||||||
// than adding a sample by providing its full label set.
|
// than adding a sample by providing its full label set.
|
||||||
AddFast(ref uint64, t int64, v float64) error
|
AddFast(ref string, t int64, v float64) error
|
||||||
|
|
||||||
// Commit submits the collected samples and purges the batch.
|
// Commit submits the collected samples and purges the batch.
|
||||||
Commit() error
|
Commit() error
|
||||||
|
@ -162,11 +156,6 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
absdir, err := filepath.Abs(dir)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if l == nil {
|
if l == nil {
|
||||||
l = log.NewLogfmtLogger(os.Stdout)
|
l = log.NewLogfmtLogger(os.Stdout)
|
||||||
l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
|
l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
|
||||||
|
@ -175,9 +164,6 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
||||||
if opts == nil {
|
if opts == nil {
|
||||||
opts = DefaultOptions
|
opts = DefaultOptions
|
||||||
}
|
}
|
||||||
if opts.AppendableBlocks < 1 {
|
|
||||||
return nil, errors.Errorf("AppendableBlocks must be greater than 0")
|
|
||||||
}
|
|
||||||
|
|
||||||
db = &DB{
|
db = &DB{
|
||||||
dir: dir,
|
dir: dir,
|
||||||
|
@ -189,6 +175,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
||||||
stopc: make(chan struct{}),
|
stopc: make(chan struct{}),
|
||||||
}
|
}
|
||||||
if !opts.NoLockfile {
|
if !opts.NoLockfile {
|
||||||
|
absdir, err := filepath.Abs(dir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
lockf, err := lockfile.New(filepath.Join(absdir, "lock"))
|
lockf, err := lockfile.New(filepath.Join(absdir, "lock"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -199,7 +189,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
||||||
db.lockf = &lockf
|
db.lockf = &lockf
|
||||||
}
|
}
|
||||||
|
|
||||||
db.compactor = newCompactor(r, l, &compactorOptions{
|
db.compactor = newCompactor(dir, r, l, &compactorOptions{
|
||||||
maxBlockRange: opts.MaxBlockDuration,
|
maxBlockRange: opts.MaxBlockDuration,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -287,8 +277,8 @@ func (db *DB) compact() (changes bool, err error) {
|
||||||
// returning the lock to not block Appenders.
|
// returning the lock to not block Appenders.
|
||||||
// Selected blocks are semantically ensured to not be written to afterwards
|
// Selected blocks are semantically ensured to not be written to afterwards
|
||||||
// by appendable().
|
// by appendable().
|
||||||
if len(db.heads) > db.opts.AppendableBlocks {
|
if len(db.heads) > 2 {
|
||||||
for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] {
|
for _, h := range db.heads[:len(db.heads)-2] {
|
||||||
// Blocks that won't be appendable when instantiating a new appender
|
// Blocks that won't be appendable when instantiating a new appender
|
||||||
// might still have active appenders on them.
|
// might still have active appenders on them.
|
||||||
// Abort at the first one we encounter.
|
// Abort at the first one we encounter.
|
||||||
|
@ -308,7 +298,7 @@ func (db *DB) compact() (changes bool, err error) {
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = db.compactor.Write(h.Dir(), h); err != nil {
|
if err = db.compactor.Write(h); err != nil {
|
||||||
return changes, errors.Wrap(err, "persist head block")
|
return changes, errors.Wrap(err, "persist head block")
|
||||||
}
|
}
|
||||||
changes = true
|
changes = true
|
||||||
|
@ -317,7 +307,7 @@ func (db *DB) compact() (changes bool, err error) {
|
||||||
|
|
||||||
// Check for compactions of multiple blocks.
|
// Check for compactions of multiple blocks.
|
||||||
for {
|
for {
|
||||||
plans, err := db.compactor.Plan(db.dir)
|
plans, err := db.compactor.Plan()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return changes, errors.Wrap(err, "plan compaction")
|
return changes, errors.Wrap(err, "plan compaction")
|
||||||
}
|
}
|
||||||
|
@ -381,9 +371,9 @@ func retentionCutoff(dir string, mint int64) (bool, error) {
|
||||||
return changes, fileutil.Fsync(df)
|
return changes, fileutil.Fsync(df)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) seqBlock(i int) (Block, bool) {
|
func (db *DB) getBlock(id ulid.ULID) (Block, bool) {
|
||||||
for _, b := range db.blocks {
|
for _, b := range db.blocks {
|
||||||
if b.Meta().Sequence == i {
|
if b.Meta().ULID == id {
|
||||||
return b, true
|
return b, true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -405,10 +395,8 @@ func (db *DB) reloadBlocks() error {
|
||||||
return errors.Wrap(err, "find blocks")
|
return errors.Wrap(err, "find blocks")
|
||||||
}
|
}
|
||||||
var (
|
var (
|
||||||
metas []*BlockMeta
|
blocks []Block
|
||||||
blocks []Block
|
exist = map[ulid.ULID]struct{}{}
|
||||||
heads []headBlock
|
|
||||||
seqBlocks = make(map[int]Block, len(dirs))
|
|
||||||
)
|
)
|
||||||
|
|
||||||
for _, dir := range dirs {
|
for _, dir := range dirs {
|
||||||
|
@ -416,47 +404,58 @@ func (db *DB) reloadBlocks() error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "read meta information %s", dir)
|
return errors.Wrapf(err, "read meta information %s", dir)
|
||||||
}
|
}
|
||||||
metas = append(metas, meta)
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, meta := range metas {
|
b, ok := db.getBlock(meta.ULID)
|
||||||
b, ok := db.seqBlock(meta.Sequence)
|
if !ok {
|
||||||
|
if meta.Compaction.Generation == 0 {
|
||||||
if meta.Compaction.Generation == 0 {
|
b, err = db.openHeadBlock(dir)
|
||||||
if !ok {
|
} else {
|
||||||
b, err = db.openHeadBlock(dirs[i])
|
b, err = newPersistedBlock(dir)
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "load head at %s", dirs[i])
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if meta.ULID != b.Meta().ULID {
|
if err != nil {
|
||||||
return errors.Errorf("head block ULID changed unexpectedly")
|
return errors.Wrapf(err, "open block %s", dir)
|
||||||
}
|
|
||||||
heads = append(heads, b.(headBlock))
|
|
||||||
} else {
|
|
||||||
if !ok || meta.ULID != b.Meta().ULID {
|
|
||||||
b, err = newPersistedBlock(dirs[i])
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "open persisted block %s", dirs[i])
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
seqBlocks[meta.Sequence] = b
|
|
||||||
blocks = append(blocks, b)
|
blocks = append(blocks, b)
|
||||||
|
exist[meta.ULID] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close all blocks that we no longer need. They are closed after returning all
|
if err := validateBlockSequence(blocks); err != nil {
|
||||||
// locks to avoid questionable locking order.
|
return errors.Wrap(err, "invalid block sequence")
|
||||||
|
}
|
||||||
|
// Close all opened blocks that no longer exist after we returned all locks.
|
||||||
for _, b := range db.blocks {
|
for _, b := range db.blocks {
|
||||||
if nb, ok := seqBlocks[b.Meta().Sequence]; !ok || nb != b {
|
if _, ok := exist[b.Meta().ULID]; !ok {
|
||||||
cs = append(cs, b)
|
cs = append(cs, b)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
db.blocks = blocks
|
db.blocks = blocks
|
||||||
db.heads = heads
|
db.heads = nil
|
||||||
|
|
||||||
|
for _, b := range blocks {
|
||||||
|
if b.Meta().Compaction.Generation == 0 {
|
||||||
|
db.heads = append(db.heads, b.(*HeadBlock))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateBlockSequence(bs []Block) error {
|
||||||
|
if len(bs) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
sort.Slice(bs, func(i, j int) bool {
|
||||||
|
return bs[i].Meta().MinTime < bs[j].Meta().MinTime
|
||||||
|
})
|
||||||
|
prev := bs[0]
|
||||||
|
for _, b := range bs[1:] {
|
||||||
|
if b.Meta().MinTime < prev.Meta().MaxTime {
|
||||||
|
return errors.Errorf("block time ranges overlap", b.Meta().MinTime, prev.Meta().MaxTime)
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -488,27 +487,7 @@ func (db *DB) Close() error {
|
||||||
// Appender returns a new Appender on the database.
|
// Appender returns a new Appender on the database.
|
||||||
func (db *DB) Appender() Appender {
|
func (db *DB) Appender() Appender {
|
||||||
db.mtx.RLock()
|
db.mtx.RLock()
|
||||||
a := &dbAppender{db: db}
|
return &dbAppender{db: db}
|
||||||
|
|
||||||
// XXX(fabxc): turn off creating initial appender as it will happen on-demand
|
|
||||||
// anyway. For now this, with combination of only having a single timestamp per batch,
|
|
||||||
// prevents opening more than one appender and hitting an unresolved deadlock (#11).
|
|
||||||
//
|
|
||||||
|
|
||||||
// Only instantiate appender after returning the headmtx to avoid
|
|
||||||
// questionable locking order.
|
|
||||||
db.headmtx.RLock()
|
|
||||||
app := db.appendable()
|
|
||||||
db.headmtx.RUnlock()
|
|
||||||
|
|
||||||
for _, b := range app {
|
|
||||||
a.heads = append(a.heads, &metaAppender{
|
|
||||||
meta: b.Meta(),
|
|
||||||
app: b.Appender(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
return a
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type dbAppender struct {
|
type dbAppender struct {
|
||||||
|
@ -523,34 +502,39 @@ type metaAppender struct {
|
||||||
app Appender
|
app Appender
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
|
||||||
h, err := a.appenderFor(t)
|
h, err := a.appenderAt(t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return "", err
|
||||||
}
|
}
|
||||||
ref, err := h.app.Add(lset, t, v)
|
ref, err := h.app.Add(lset, t, v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return "", err
|
||||||
}
|
}
|
||||||
a.samples++
|
a.samples++
|
||||||
// Store last byte of sequence number in 3rd byte of reference.
|
|
||||||
return ref | (uint64(h.meta.Sequence&0xff) << 40), nil
|
return string(append(h.meta.ULID[:], ref...)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error {
|
func (a *dbAppender) AddFast(ref string, t int64, v float64) error {
|
||||||
// Load the head last byte of the head sequence from the 3rd byte of the
|
if len(ref) < 16 {
|
||||||
// reference number.
|
return errors.Wrap(ErrNotFound, "invalid ref length")
|
||||||
gen := (ref << 16) >> 56
|
}
|
||||||
|
// The first 16 bytes a ref hold the ULID of the head block.
|
||||||
h, err := a.appenderFor(t)
|
h, err := a.appenderAt(t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// If the last byte of the sequence does not add up, the reference is not valid.
|
// Validate the ref points to the same block we got for t.
|
||||||
if uint64(h.meta.Sequence&0xff) != gen {
|
if string(h.meta.ULID[:]) != ref[:16] {
|
||||||
return ErrNotFound
|
return ErrNotFound
|
||||||
}
|
}
|
||||||
if err := h.app.AddFast(ref, t, v); err != nil {
|
if err := h.app.AddFast(ref[16:], t, v); err != nil {
|
||||||
|
// The block the ref points to might fit the given timestamp.
|
||||||
|
// We mask the error to stick with our contract.
|
||||||
|
if errors.Cause(err) == ErrOutOfBounds {
|
||||||
|
err = ErrNotFound
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -560,85 +544,84 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error {
|
||||||
|
|
||||||
// appenderFor gets the appender for the head containing timestamp t.
|
// appenderFor gets the appender for the head containing timestamp t.
|
||||||
// If the head block doesn't exist yet, it gets created.
|
// If the head block doesn't exist yet, it gets created.
|
||||||
func (a *dbAppender) appenderFor(t int64) (*metaAppender, error) {
|
func (a *dbAppender) appenderAt(t int64) (*metaAppender, error) {
|
||||||
// If there's no fitting head block for t, ensure it gets created.
|
for _, h := range a.heads {
|
||||||
if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime {
|
if intervalContains(h.meta.MinTime, h.meta.MaxTime-1, t) {
|
||||||
a.db.headmtx.Lock()
|
|
||||||
|
|
||||||
var newHeads []headBlock
|
|
||||||
|
|
||||||
if err := a.db.ensureHead(t); err != nil {
|
|
||||||
a.db.headmtx.Unlock()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if len(a.heads) == 0 {
|
|
||||||
newHeads = append(newHeads, a.db.appendable()...)
|
|
||||||
} else {
|
|
||||||
maxSeq := a.heads[len(a.heads)-1].meta.Sequence
|
|
||||||
for _, b := range a.db.appendable() {
|
|
||||||
if b.Meta().Sequence > maxSeq {
|
|
||||||
newHeads = append(newHeads, b)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
a.db.headmtx.Unlock()
|
|
||||||
|
|
||||||
// XXX(fabxc): temporary workaround. See comment on instantiating DB.Appender.
|
|
||||||
// for _, b := range newHeads {
|
|
||||||
// // Only get appender for the block with the specific timestamp.
|
|
||||||
// if t >= b.Meta().MaxTime {
|
|
||||||
// continue
|
|
||||||
// }
|
|
||||||
// a.heads = append(a.heads, &metaAppender{
|
|
||||||
// app: b.Appender(),
|
|
||||||
// meta: b.Meta(),
|
|
||||||
// })
|
|
||||||
// break
|
|
||||||
// }
|
|
||||||
|
|
||||||
// Instantiate appenders after returning headmtx to avoid questionable
|
|
||||||
// locking order.
|
|
||||||
for _, b := range newHeads {
|
|
||||||
a.heads = append(a.heads, &metaAppender{
|
|
||||||
app: b.Appender(),
|
|
||||||
meta: b.Meta(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for i := len(a.heads) - 1; i >= 0; i-- {
|
|
||||||
if h := a.heads[i]; t >= h.meta.MinTime {
|
|
||||||
return h, nil
|
return h, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Currently opened appenders do not cover t. Ensure the head block is
|
||||||
|
// created and add missing appenders.
|
||||||
|
a.db.headmtx.Lock()
|
||||||
|
|
||||||
return nil, ErrNotFound
|
if err := a.db.ensureHead(t); err != nil {
|
||||||
|
a.db.headmtx.Unlock()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var hb headBlock
|
||||||
|
for _, h := range a.db.appendable() {
|
||||||
|
m := h.Meta()
|
||||||
|
|
||||||
|
if intervalContains(m.MinTime, m.MaxTime-1, t) {
|
||||||
|
hb = h
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
a.db.headmtx.Unlock()
|
||||||
|
|
||||||
|
if hb == nil {
|
||||||
|
return nil, ErrOutOfBounds
|
||||||
|
}
|
||||||
|
// Instantiate appender after returning headmtx!
|
||||||
|
app := &metaAppender{
|
||||||
|
meta: hb.Meta(),
|
||||||
|
app: hb.Appender(),
|
||||||
|
}
|
||||||
|
a.heads = append(a.heads, app)
|
||||||
|
|
||||||
|
return app, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
|
||||||
|
mint = (t / width) * width
|
||||||
|
return mint, mint + width
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensureHead makes sure that there is a head block for the timestamp t if
|
// ensureHead makes sure that there is a head block for the timestamp t if
|
||||||
// it is within or after the currently appendable window.
|
// it is within or after the currently appendable window.
|
||||||
func (db *DB) ensureHead(t int64) error {
|
func (db *DB) ensureHead(t int64) error {
|
||||||
// Initial case for a new database: we must create the first
|
var (
|
||||||
// AppendableBlocks-1 front padding heads.
|
mint, maxt = rangeForTimestamp(t, int64(db.opts.MinBlockDuration))
|
||||||
if len(db.heads) == 0 {
|
addBuffer = len(db.blocks) == 0
|
||||||
for i := int64(db.opts.AppendableBlocks - 1); i >= 0; i-- {
|
last BlockMeta
|
||||||
if _, err := db.cut(t - i*int64(db.opts.MinBlockDuration)); err != nil {
|
)
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
if !addBuffer {
|
||||||
h := db.heads[len(db.heads)-1]
|
last = db.blocks[len(db.blocks)-1].Meta()
|
||||||
m := h.Meta()
|
addBuffer = last.MaxTime <= mint-int64(db.opts.MinBlockDuration)
|
||||||
// If t doesn't exceed the range of heads blocks, there's nothing to do.
|
}
|
||||||
if t < m.MaxTime {
|
// Create another block of buffer in front if the DB is initialized or retrieving
|
||||||
return nil
|
// new data after a long gap.
|
||||||
}
|
// This ensures we always have a full block width if append window.
|
||||||
if _, err := db.cut(m.MaxTime); err != nil {
|
if addBuffer {
|
||||||
|
if _, err := db.createHeadBlock(mint-int64(db.opts.MinBlockDuration), mint); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// If the previous block reaches into our new window, make it smaller.
|
||||||
|
} else if mt := last.MaxTime; mt > mint {
|
||||||
|
mint = mt
|
||||||
}
|
}
|
||||||
|
if mint >= maxt {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// Error if the requested time for a head is before the appendable window.
|
||||||
|
if len(db.heads) > 0 && t < db.heads[0].Meta().MinTime {
|
||||||
|
return ErrOutOfBounds
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := db.createHeadBlock(mint, maxt)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *dbAppender) Commit() error {
|
func (a *dbAppender) Commit() error {
|
||||||
|
@ -646,14 +629,22 @@ func (a *dbAppender) Commit() error {
|
||||||
|
|
||||||
// Commits to partial appenders must be concurrent as concurrent appenders
|
// Commits to partial appenders must be concurrent as concurrent appenders
|
||||||
// may have conflicting locks on head appenders.
|
// may have conflicting locks on head appenders.
|
||||||
// XXX(fabxc): is this a leaky abstraction? Should make an effort to catch a multi-error?
|
// For high-throughput use cases the errgroup causes significant blocking. Typically,
|
||||||
var g errgroup.Group
|
// we just deal with a single appender and special case it.
|
||||||
|
var err error
|
||||||
|
|
||||||
for _, h := range a.heads {
|
switch len(a.heads) {
|
||||||
g.Go(h.app.Commit)
|
case 1:
|
||||||
|
err = a.heads[0].app.Commit()
|
||||||
|
default:
|
||||||
|
var g errgroup.Group
|
||||||
|
for _, h := range a.heads {
|
||||||
|
g.Go(h.app.Commit)
|
||||||
|
}
|
||||||
|
err = g.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := g.Wait(); err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// XXX(fabxc): Push the metric down into head block to account properly
|
// XXX(fabxc): Push the metric down into head block to account properly
|
||||||
|
@ -701,14 +692,15 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// appendable returns a copy of a slice of HeadBlocks that can still be appended to.
|
// appendable returns a copy of a slice of HeadBlocks that can still be appended to.
|
||||||
func (db *DB) appendable() []headBlock {
|
func (db *DB) appendable() (r []headBlock) {
|
||||||
var i int
|
switch len(db.heads) {
|
||||||
app := make([]headBlock, 0, db.opts.AppendableBlocks)
|
case 0:
|
||||||
|
case 1:
|
||||||
if len(db.heads) > db.opts.AppendableBlocks {
|
r = append(r, db.heads[0])
|
||||||
i = len(db.heads) - db.opts.AppendableBlocks
|
default:
|
||||||
|
r = append(r, db.heads[len(db.heads)-2:]...)
|
||||||
}
|
}
|
||||||
return append(app, db.heads[i:]...)
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
|
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
|
||||||
|
@ -741,7 +733,7 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
|
||||||
// openHeadBlock opens the head block at dir.
|
// openHeadBlock opens the head block at dir.
|
||||||
func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) {
|
func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) {
|
||||||
var (
|
var (
|
||||||
wdir = filepath.Join(dir, "wal")
|
wdir = walDir(dir)
|
||||||
l = log.With(db.logger, "wal", wdir)
|
l = log.With(db.logger, "wal", wdir)
|
||||||
)
|
)
|
||||||
wal, err := OpenSegmentWAL(wdir, l, 5*time.Second)
|
wal, err := OpenSegmentWAL(wdir, l, 5*time.Second)
|
||||||
|
@ -756,16 +748,10 @@ func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) {
|
||||||
return h, nil
|
return h, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// cut starts a new head block to append to. The completed head block
|
// createHeadBlock starts a new head block to append to.
|
||||||
// will still be appendable for the configured grace period.
|
func (db *DB) createHeadBlock(mint, maxt int64) (headBlock, error) {
|
||||||
func (db *DB) cut(mint int64) (headBlock, error) {
|
dir, err := TouchHeadBlock(db.dir, mint, maxt)
|
||||||
maxt := mint + int64(db.opts.MinBlockDuration)
|
|
||||||
|
|
||||||
dir, seq, err := nextSequenceFile(db.dir, "b-")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if err := TouchHeadBlock(dir, seq, mint, maxt); err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "touch head block %s", dir)
|
return nil, errors.Wrapf(err, "touch head block %s", dir)
|
||||||
}
|
}
|
||||||
newHead, err := db.openHeadBlock(dir)
|
newHead, err := db.openHeadBlock(dir)
|
||||||
|
@ -788,13 +774,8 @@ func isBlockDir(fi os.FileInfo) bool {
|
||||||
if !fi.IsDir() {
|
if !fi.IsDir() {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if !strings.HasPrefix(fi.Name(), "b-") {
|
_, err := ulid.Parse(fi.Name())
|
||||||
return false
|
return err == nil
|
||||||
}
|
|
||||||
if _, err := strconv.ParseUint(fi.Name()[2:], 10, 32); err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func blockDirs(dir string) ([]string, error) {
|
func blockDirs(dir string) ([]string, error) {
|
||||||
|
@ -899,9 +880,8 @@ func (es MultiError) Err() error {
|
||||||
return es
|
return es
|
||||||
}
|
}
|
||||||
|
|
||||||
func yoloString(b []byte) string {
|
func yoloString(b []byte) string { return *((*string)(unsafe.Pointer(&b))) }
|
||||||
return *((*string)(unsafe.Pointer(&b)))
|
func yoloBytes(s string) []byte { return *((*[]byte)(unsafe.Pointer(&s))) }
|
||||||
}
|
|
||||||
|
|
||||||
func closeAll(cs ...io.Closer) error {
|
func closeAll(cs ...io.Closer) error {
|
||||||
var merr MultiError
|
var merr MultiError
|
||||||
|
|
36
db_test.go
36
db_test.go
|
@ -18,6 +18,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
@ -107,25 +108,36 @@ func TestDBAppenderAddRef(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
|
||||||
app := db.Appender()
|
app1 := db.Appender()
|
||||||
defer app.Rollback()
|
|
||||||
|
|
||||||
ref, err := app.Add(labels.FromStrings("a", "b"), 0, 0)
|
ref, err := app1.Add(labels.FromStrings("a", "b"), 0, 0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Head sequence number should be in 3rd MSB and be greater than 0.
|
// When a series is first created, refs don't work within that transaction.
|
||||||
gen := (ref << 16) >> 56
|
err = app1.AddFast(ref, 1, 1)
|
||||||
require.True(t, gen > 1)
|
require.EqualError(t, errors.Cause(err), ErrNotFound.Error())
|
||||||
|
|
||||||
|
err = app1.Commit()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
app2 := db.Appender()
|
||||||
|
defer app2.Rollback()
|
||||||
|
|
||||||
|
ref, err = app2.Add(labels.FromStrings("a", "b"), 1, 1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Ref must be prefixed with block ULID of the block we wrote to.
|
||||||
|
id := db.blocks[len(db.blocks)-1].Meta().ULID
|
||||||
|
require.Equal(t, string(id[:]), ref[:16])
|
||||||
|
|
||||||
// Reference must be valid to add another sample.
|
// Reference must be valid to add another sample.
|
||||||
err = app.AddFast(ref, 1, 1)
|
err = app2.AddFast(ref, 2, 2)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// AddFast for the same timestamp must fail if the generation in the reference
|
// AddFast for the same timestamp must fail if the generation in the reference
|
||||||
// doesn't add up.
|
// doesn't add up.
|
||||||
refBad := ref | ((gen + 1) << 4)
|
refb := []byte(ref)
|
||||||
err = app.AddFast(refBad, 1, 1)
|
refb[15] ^= refb[15]
|
||||||
require.Error(t, err)
|
err = app2.AddFast(string(refb), 1, 1)
|
||||||
|
require.EqualError(t, errors.Cause(err), ErrNotFound.Error())
|
||||||
require.Equal(t, 2, app.(*dbAppender).samples)
|
|
||||||
}
|
}
|
||||||
|
|
167
head.go
167
head.go
|
@ -18,11 +18,14 @@ import (
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"encoding/binary"
|
||||||
|
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/oklog/ulid"
|
"github.com/oklog/ulid"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
@ -73,36 +76,36 @@ type HeadBlock struct {
|
||||||
|
|
||||||
// TouchHeadBlock atomically touches a new head block in dir for
|
// TouchHeadBlock atomically touches a new head block in dir for
|
||||||
// samples in the range [mint,maxt).
|
// samples in the range [mint,maxt).
|
||||||
func TouchHeadBlock(dir string, seq int, mint, maxt int64) error {
|
func TouchHeadBlock(dir string, mint, maxt int64) (string, error) {
|
||||||
// Make head block creation appear atomic.
|
|
||||||
tmp := dir + ".tmp"
|
|
||||||
|
|
||||||
if err := os.MkdirAll(tmp, 0777); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
|
||||||
ulid, err := ulid.New(ulid.Now(), entropy)
|
ulid, err := ulid.New(ulid.Now(), entropy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make head block creation appear atomic.
|
||||||
|
dir = filepath.Join(dir, ulid.String())
|
||||||
|
tmp := dir + ".tmp"
|
||||||
|
|
||||||
|
if err := os.MkdirAll(tmp, 0777); err != nil {
|
||||||
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := writeMetaFile(tmp, &BlockMeta{
|
if err := writeMetaFile(tmp, &BlockMeta{
|
||||||
ULID: ulid,
|
ULID: ulid,
|
||||||
Sequence: seq,
|
MinTime: mint,
|
||||||
MinTime: mint,
|
MaxTime: maxt,
|
||||||
MaxTime: maxt,
|
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write an empty tombstones file.
|
// Write an empty tombstones file.
|
||||||
if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil {
|
if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil {
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
return renameFile(tmp, dir)
|
return dir, renameFile(tmp, dir)
|
||||||
}
|
}
|
||||||
|
|
||||||
// OpenHeadBlock opens the head block in dir.
|
// OpenHeadBlock opens the head block in dir.
|
||||||
|
@ -171,7 +174,7 @@ func (h *HeadBlock) inBounds(t int64) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *HeadBlock) String() string {
|
func (h *HeadBlock) String() string {
|
||||||
return fmt.Sprintf("(%d, %s)", h.meta.Sequence, h.meta.ULID)
|
return h.meta.ULID.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close syncs all data and closes underlying resources of the head block.
|
// Close syncs all data and closes underlying resources of the head block.
|
||||||
|
@ -199,11 +202,10 @@ func (h *HeadBlock) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Meta implements headBlock
|
// Meta returns a BlockMeta for the head block.
|
||||||
func (h *HeadBlock) Meta() BlockMeta {
|
func (h *HeadBlock) Meta() BlockMeta {
|
||||||
m := BlockMeta{
|
m := BlockMeta{
|
||||||
ULID: h.meta.ULID,
|
ULID: h.meta.ULID,
|
||||||
Sequence: h.meta.Sequence,
|
|
||||||
MinTime: h.meta.MinTime,
|
MinTime: h.meta.MinTime,
|
||||||
MaxTime: h.meta.MaxTime,
|
MaxTime: h.meta.MaxTime,
|
||||||
Compaction: h.meta.Compaction,
|
Compaction: h.meta.Compaction,
|
||||||
|
@ -216,19 +218,7 @@ func (h *HeadBlock) Meta() BlockMeta {
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dir implements headBlock.
|
// Tombstones returns the TombstoneReader against the block.
|
||||||
func (h *HeadBlock) Dir() string { return h.dir }
|
|
||||||
|
|
||||||
// Persisted implements headBlock.
|
|
||||||
func (h *HeadBlock) Persisted() bool { return false }
|
|
||||||
|
|
||||||
// Index implements headBlock.
|
|
||||||
func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} }
|
|
||||||
|
|
||||||
// Chunks implements headBlock.
|
|
||||||
func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} }
|
|
||||||
|
|
||||||
// Tombstones implements headBlock.
|
|
||||||
func (h *HeadBlock) Tombstones() TombstoneReader {
|
func (h *HeadBlock) Tombstones() TombstoneReader {
|
||||||
return h.tombstones.Copy()
|
return h.tombstones.Copy()
|
||||||
}
|
}
|
||||||
|
@ -266,7 +256,16 @@ Outer:
|
||||||
return writeTombstoneFile(h.dir, h.tombstones.Copy())
|
return writeTombstoneFile(h.dir, h.tombstones.Copy())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Querier implements Queryable and headBlock.
|
// Dir returns the directory of the block.
|
||||||
|
func (h *HeadBlock) Dir() string { return h.dir }
|
||||||
|
|
||||||
|
// Index returns an IndexReader against the block.
|
||||||
|
func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} }
|
||||||
|
|
||||||
|
// Chunks returns a ChunkReader against the block.
|
||||||
|
func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} }
|
||||||
|
|
||||||
|
// Querier returns a new Querier against the block for the range [mint, maxt].
|
||||||
func (h *HeadBlock) Querier(mint, maxt int64) Querier {
|
func (h *HeadBlock) Querier(mint, maxt int64) Querier {
|
||||||
h.mtx.RLock()
|
h.mtx.RLock()
|
||||||
defer h.mtx.RUnlock()
|
defer h.mtx.RUnlock()
|
||||||
|
@ -308,7 +307,7 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Appender implements headBlock
|
// Appender returns a new Appender against the head block.
|
||||||
func (h *HeadBlock) Appender() Appender {
|
func (h *HeadBlock) Appender() Appender {
|
||||||
atomic.AddUint64(&h.activeWriters, 1)
|
atomic.AddUint64(&h.activeWriters, 1)
|
||||||
|
|
||||||
|
@ -320,7 +319,7 @@ func (h *HeadBlock) Appender() Appender {
|
||||||
return &headAppender{HeadBlock: h, samples: getHeadAppendBuffer()}
|
return &headAppender{HeadBlock: h, samples: getHeadAppendBuffer()}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Busy implements headBlock
|
// Busy returns true if the block has open write transactions.
|
||||||
func (h *HeadBlock) Busy() bool {
|
func (h *HeadBlock) Busy() bool {
|
||||||
return atomic.LoadUint64(&h.activeWriters) > 0
|
return atomic.LoadUint64(&h.activeWriters) > 0
|
||||||
}
|
}
|
||||||
|
@ -342,74 +341,89 @@ func putHeadAppendBuffer(b []RefSample) {
|
||||||
type headAppender struct {
|
type headAppender struct {
|
||||||
*HeadBlock
|
*HeadBlock
|
||||||
|
|
||||||
newSeries map[uint64]hashedLabels
|
newSeries []*hashedLabels
|
||||||
newHashes map[uint64]uint64
|
|
||||||
refmap map[uint64]uint64
|
|
||||||
newLabels []labels.Labels
|
newLabels []labels.Labels
|
||||||
|
newHashes map[uint64]uint64
|
||||||
|
|
||||||
samples []RefSample
|
samples []RefSample
|
||||||
}
|
}
|
||||||
|
|
||||||
type hashedLabels struct {
|
type hashedLabels struct {
|
||||||
|
ref uint64
|
||||||
hash uint64
|
hash uint64
|
||||||
labels labels.Labels
|
labels labels.Labels
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
|
||||||
if !a.inBounds(t) {
|
if !a.inBounds(t) {
|
||||||
return 0, ErrOutOfBounds
|
return "", ErrOutOfBounds
|
||||||
}
|
}
|
||||||
|
|
||||||
hash := lset.Hash()
|
hash := lset.Hash()
|
||||||
|
refb := make([]byte, 8)
|
||||||
|
|
||||||
|
// Series exists already in the block.
|
||||||
if ms := a.get(hash, lset); ms != nil {
|
if ms := a.get(hash, lset); ms != nil {
|
||||||
return uint64(ms.ref), a.AddFast(uint64(ms.ref), t, v)
|
binary.BigEndian.PutUint64(refb, uint64(ms.ref))
|
||||||
|
return string(refb), a.AddFast(string(refb), t, v)
|
||||||
}
|
}
|
||||||
|
// Series was added in this transaction previously.
|
||||||
if ref, ok := a.newHashes[hash]; ok {
|
if ref, ok := a.newHashes[hash]; ok {
|
||||||
return uint64(ref), a.AddFast(uint64(ref), t, v)
|
binary.BigEndian.PutUint64(refb, ref)
|
||||||
|
// XXX(fabxc): there's no fast path for multiple samples for the same new series
|
||||||
|
// in the same transaction. We always return the invalid empty ref. It's has not
|
||||||
|
// been a relevant use case so far and is not worth the trouble.
|
||||||
|
return nullRef, a.AddFast(string(refb), t, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
// We only know the actual reference after committing. We generate an
|
// The series is completely new.
|
||||||
// intermediate reference only valid for this batch.
|
|
||||||
// It is indicated by the the LSB of the 4th byte being set to 1.
|
|
||||||
// We use a random ID to avoid collisions when new series are created
|
|
||||||
// in two subsequent batches.
|
|
||||||
// TODO(fabxc): Provide method for client to determine whether a ref
|
|
||||||
// is valid beyond the current transaction.
|
|
||||||
ref := uint64(rand.Int31()) | (1 << 32)
|
|
||||||
|
|
||||||
if a.newSeries == nil {
|
if a.newSeries == nil {
|
||||||
a.newSeries = map[uint64]hashedLabels{}
|
|
||||||
a.newHashes = map[uint64]uint64{}
|
a.newHashes = map[uint64]uint64{}
|
||||||
a.refmap = map[uint64]uint64{}
|
|
||||||
}
|
}
|
||||||
a.newSeries[ref] = hashedLabels{hash: hash, labels: lset}
|
// First sample for new series.
|
||||||
a.newHashes[hash] = ref
|
ref := uint64(len(a.newSeries))
|
||||||
|
|
||||||
return ref, a.AddFast(ref, t, v)
|
a.newSeries = append(a.newSeries, &hashedLabels{
|
||||||
|
ref: ref,
|
||||||
|
hash: hash,
|
||||||
|
labels: lset,
|
||||||
|
})
|
||||||
|
// First bit indicates its a series created in this transaction.
|
||||||
|
ref |= (1 << 63)
|
||||||
|
|
||||||
|
a.newHashes[hash] = ref
|
||||||
|
binary.BigEndian.PutUint64(refb, ref)
|
||||||
|
|
||||||
|
return nullRef, a.AddFast(string(refb), t, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
|
var nullRef = string([]byte{0, 0, 0, 0, 0, 0, 0, 0})
|
||||||
// We only own the last 5 bytes of the reference. Anything before is
|
|
||||||
// used by higher-order appenders. We erase it to avoid issues.
|
|
||||||
ref = (ref << 24) >> 24
|
|
||||||
|
|
||||||
|
func (a *headAppender) AddFast(ref string, t int64, v float64) error {
|
||||||
|
if len(ref) != 8 {
|
||||||
|
return errors.Wrap(ErrNotFound, "invalid ref length")
|
||||||
|
}
|
||||||
|
var (
|
||||||
|
refn = binary.BigEndian.Uint64(yoloBytes(ref))
|
||||||
|
id = (refn << 1) >> 1
|
||||||
|
inTx = refn&(1<<63) != 0
|
||||||
|
)
|
||||||
// Distinguish between existing series and series created in
|
// Distinguish between existing series and series created in
|
||||||
// this transaction.
|
// this transaction.
|
||||||
if ref&(1<<32) != 0 {
|
if inTx {
|
||||||
if _, ok := a.newSeries[ref]; !ok {
|
if id > uint64(len(a.newSeries)-1) {
|
||||||
return ErrNotFound
|
return errors.Wrap(ErrNotFound, "transaction series ID too high")
|
||||||
}
|
}
|
||||||
// TODO(fabxc): we also have to validate here that the
|
// TODO(fabxc): we also have to validate here that the
|
||||||
// sample sequence is valid.
|
// sample sequence is valid.
|
||||||
// We also have to revalidate it as we switch locks an create
|
// We also have to revalidate it as we switch locks and create
|
||||||
// the new series.
|
// the new series.
|
||||||
} else if ref > uint64(len(a.series)) {
|
} else if id > uint64(len(a.series)) {
|
||||||
return ErrNotFound
|
return errors.Wrap(ErrNotFound, "transaction series ID too high")
|
||||||
} else {
|
} else {
|
||||||
ms := a.series[int(ref)]
|
ms := a.series[id]
|
||||||
if ms == nil {
|
if ms == nil {
|
||||||
return ErrNotFound
|
return errors.Wrap(ErrNotFound, "nil series")
|
||||||
}
|
}
|
||||||
// TODO(fabxc): memory series should be locked here already.
|
// TODO(fabxc): memory series should be locked here already.
|
||||||
// Only problem is release of locks in case of a rollback.
|
// Only problem is release of locks in case of a rollback.
|
||||||
|
@ -430,7 +444,7 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
a.samples = append(a.samples, RefSample{
|
a.samples = append(a.samples, RefSample{
|
||||||
Ref: ref,
|
Ref: refn,
|
||||||
T: t,
|
T: t,
|
||||||
V: v,
|
V: v,
|
||||||
})
|
})
|
||||||
|
@ -449,18 +463,18 @@ func (a *headAppender) createSeries() {
|
||||||
|
|
||||||
base1 := len(a.series)
|
base1 := len(a.series)
|
||||||
|
|
||||||
for ref, l := range a.newSeries {
|
for _, l := range a.newSeries {
|
||||||
// We switched locks and have to re-validate that the series were not
|
// We switched locks and have to re-validate that the series were not
|
||||||
// created by another goroutine in the meantime.
|
// created by another goroutine in the meantime.
|
||||||
if base1 > base0 {
|
if base1 > base0 {
|
||||||
if ms := a.get(l.hash, l.labels); ms != nil {
|
if ms := a.get(l.hash, l.labels); ms != nil {
|
||||||
a.refmap[ref] = uint64(ms.ref)
|
l.ref = uint64(ms.ref)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Series is still new.
|
// Series is still new.
|
||||||
a.newLabels = append(a.newLabels, l.labels)
|
a.newLabels = append(a.newLabels, l.labels)
|
||||||
a.refmap[ref] = uint64(len(a.series))
|
l.ref = uint64(len(a.series))
|
||||||
|
|
||||||
a.create(l.hash, l.labels)
|
a.create(l.hash, l.labels)
|
||||||
}
|
}
|
||||||
|
@ -475,11 +489,11 @@ func (a *headAppender) Commit() error {
|
||||||
|
|
||||||
a.createSeries()
|
a.createSeries()
|
||||||
|
|
||||||
|
// We have to update the refs of samples for series we just created.
|
||||||
for i := range a.samples {
|
for i := range a.samples {
|
||||||
s := &a.samples[i]
|
s := &a.samples[i]
|
||||||
|
if s.Ref&(1<<63) != 0 {
|
||||||
if s.Ref&(1<<32) > 0 {
|
s.Ref = a.newSeries[(s.Ref<<1)>>1].ref
|
||||||
s.Ref = a.refmap[s.Ref]
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -589,6 +603,9 @@ func (h *headIndexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error
|
||||||
}
|
}
|
||||||
|
|
||||||
s := h.series[ref]
|
s := h.series[ref]
|
||||||
|
if s == nil {
|
||||||
|
return nil, nil, ErrNotFound
|
||||||
|
}
|
||||||
metas := make([]*ChunkMeta, 0, len(s.chunks))
|
metas := make([]*ChunkMeta, 0, len(s.chunks))
|
||||||
|
|
||||||
s.mtx.RLock()
|
s.mtx.RLock()
|
||||||
|
|
|
@ -33,7 +33,7 @@ import (
|
||||||
|
|
||||||
// createTestHeadBlock creates a new head block with a SegmentWAL.
|
// createTestHeadBlock creates a new head block with a SegmentWAL.
|
||||||
func createTestHeadBlock(t testing.TB, dir string, mint, maxt int64) *HeadBlock {
|
func createTestHeadBlock(t testing.TB, dir string, mint, maxt int64) *HeadBlock {
|
||||||
err := TouchHeadBlock(dir, 0, mint, maxt)
|
dir, err := TouchHeadBlock(dir, mint, maxt)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
return openTestHeadBlock(t, dir)
|
return openTestHeadBlock(t, dir)
|
||||||
|
|
4
wal.go
4
wal.go
|
@ -21,7 +21,6 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -91,7 +90,6 @@ type RefSample struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
walDirName = "wal"
|
|
||||||
walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB
|
walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -107,8 +105,6 @@ func init() {
|
||||||
// OpenSegmentWAL opens or creates a write ahead log in the given directory.
|
// OpenSegmentWAL opens or creates a write ahead log in the given directory.
|
||||||
// The WAL must be read completely before new data is written.
|
// The WAL must be read completely before new data is written.
|
||||||
func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) (*SegmentWAL, error) {
|
func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) (*SegmentWAL, error) {
|
||||||
dir = filepath.Join(dir, walDirName)
|
|
||||||
|
|
||||||
if err := os.MkdirAll(dir, 0777); err != nil {
|
if err := os.MkdirAll(dir, 0777); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -320,17 +320,13 @@ func TestWALRestoreCorrupted(t *testing.T) {
|
||||||
require.Equal(t, 0, len(l))
|
require.Equal(t, 0, len(l))
|
||||||
require.Equal(t, []RefSample{{T: 1, V: 2}}, s)
|
require.Equal(t, []RefSample{{T: 1, V: 2}}, s)
|
||||||
|
|
||||||
// Truncation should happen transparently and now cause an error.
|
// Truncation should happen transparently and not cause an error.
|
||||||
require.False(t, r.Next())
|
require.False(t, r.Next())
|
||||||
require.Nil(t, r.Err())
|
require.Nil(t, r.Err())
|
||||||
|
|
||||||
require.NoError(t, w2.Log(nil, []RefSample{{T: 99, V: 100}}))
|
require.NoError(t, w2.Log(nil, []RefSample{{T: 99, V: 100}}))
|
||||||
require.NoError(t, w2.Close())
|
require.NoError(t, w2.Close())
|
||||||
|
|
||||||
files, err := fileutil.ReadDir(dir)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, 1, len(files))
|
|
||||||
|
|
||||||
// We should see the first valid entry and the new one, everything after
|
// We should see the first valid entry and the new one, everything after
|
||||||
// is truncated.
|
// is truncated.
|
||||||
w3, err := OpenSegmentWAL(dir, logger, 0)
|
w3, err := OpenSegmentWAL(dir, logger, 0)
|
||||||
|
|
Loading…
Reference in a new issue