mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Merge pull request #79 from prometheus/typerest
Various type improvements
This commit is contained in:
commit
56e6af99f9
4
block.go
4
block.go
|
@ -48,8 +48,8 @@ type Block interface {
|
||||||
Queryable
|
Queryable
|
||||||
}
|
}
|
||||||
|
|
||||||
// HeadBlock is a regular block that can still be appended to.
|
// headBlock is a regular block that can still be appended to.
|
||||||
type HeadBlock interface {
|
type headBlock interface {
|
||||||
Block
|
Block
|
||||||
Appendable
|
Appendable
|
||||||
}
|
}
|
||||||
|
|
39
db.go
39
db.go
|
@ -118,7 +118,7 @@ type DB struct {
|
||||||
// or the general layout.
|
// or the general layout.
|
||||||
// Must never be held when acquiring a blocks's mutex!
|
// Must never be held when acquiring a blocks's mutex!
|
||||||
headmtx sync.RWMutex
|
headmtx sync.RWMutex
|
||||||
heads []HeadBlock
|
heads []headBlock
|
||||||
|
|
||||||
compactor Compactor
|
compactor Compactor
|
||||||
|
|
||||||
|
@ -401,7 +401,7 @@ func (db *DB) reloadBlocks() error {
|
||||||
var (
|
var (
|
||||||
metas []*BlockMeta
|
metas []*BlockMeta
|
||||||
blocks []Block
|
blocks []Block
|
||||||
heads []HeadBlock
|
heads []headBlock
|
||||||
seqBlocks = make(map[int]Block, len(dirs))
|
seqBlocks = make(map[int]Block, len(dirs))
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -418,7 +418,7 @@ func (db *DB) reloadBlocks() error {
|
||||||
|
|
||||||
if meta.Compaction.Generation == 0 {
|
if meta.Compaction.Generation == 0 {
|
||||||
if !ok {
|
if !ok {
|
||||||
b, err = openHeadBlock(dirs[i], db.logger)
|
b, err = db.openHeadBlock(dirs[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "load head at %s", dirs[i])
|
return errors.Wrapf(err, "load head at %s", dirs[i])
|
||||||
}
|
}
|
||||||
|
@ -426,7 +426,7 @@ func (db *DB) reloadBlocks() error {
|
||||||
if meta.ULID != b.Meta().ULID {
|
if meta.ULID != b.Meta().ULID {
|
||||||
return errors.Errorf("head block ULID changed unexpectedly")
|
return errors.Errorf("head block ULID changed unexpectedly")
|
||||||
}
|
}
|
||||||
heads = append(heads, b.(HeadBlock))
|
heads = append(heads, b.(headBlock))
|
||||||
} else {
|
} else {
|
||||||
if !ok || meta.ULID != b.Meta().ULID {
|
if !ok || meta.ULID != b.Meta().ULID {
|
||||||
b, err = newPersistedBlock(dirs[i])
|
b, err = newPersistedBlock(dirs[i])
|
||||||
|
@ -559,7 +559,7 @@ func (a *dbAppender) appenderFor(t int64) (*metaAppender, error) {
|
||||||
if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime {
|
if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime {
|
||||||
a.db.headmtx.Lock()
|
a.db.headmtx.Lock()
|
||||||
|
|
||||||
var newHeads []HeadBlock
|
var newHeads []headBlock
|
||||||
|
|
||||||
if err := a.db.ensureHead(t); err != nil {
|
if err := a.db.ensureHead(t); err != nil {
|
||||||
a.db.headmtx.Unlock()
|
a.db.headmtx.Unlock()
|
||||||
|
@ -670,9 +670,9 @@ func (a *dbAppender) Rollback() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// appendable returns a copy of a slice of HeadBlocks that can still be appended to.
|
// appendable returns a copy of a slice of HeadBlocks that can still be appended to.
|
||||||
func (db *DB) appendable() []HeadBlock {
|
func (db *DB) appendable() []headBlock {
|
||||||
var i int
|
var i int
|
||||||
app := make([]HeadBlock, 0, db.opts.AppendableBlocks)
|
app := make([]headBlock, 0, db.opts.AppendableBlocks)
|
||||||
|
|
||||||
if len(db.heads) > db.opts.AppendableBlocks {
|
if len(db.heads) > db.opts.AppendableBlocks {
|
||||||
i = len(db.heads) - db.opts.AppendableBlocks
|
i = len(db.heads) - db.opts.AppendableBlocks
|
||||||
|
@ -709,16 +709,37 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
|
||||||
return bs
|
return bs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// openHeadBlock opens the head block at dir.
|
||||||
|
func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) {
|
||||||
|
var (
|
||||||
|
wdir = filepath.Join(dir, "wal")
|
||||||
|
l = log.With(db.logger, "wal", wdir)
|
||||||
|
)
|
||||||
|
wal, err := OpenSegmentWAL(wdir, l, 5*time.Second)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "open WAL %s")
|
||||||
|
}
|
||||||
|
|
||||||
|
h, err := OpenHeadBlock(dir, log.With(db.logger, "block", dir), wal)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "open head block %s", dir)
|
||||||
|
}
|
||||||
|
return h, nil
|
||||||
|
}
|
||||||
|
|
||||||
// cut starts a new head block to append to. The completed head block
|
// cut starts a new head block to append to. The completed head block
|
||||||
// will still be appendable for the configured grace period.
|
// will still be appendable for the configured grace period.
|
||||||
func (db *DB) cut(mint int64) (HeadBlock, error) {
|
func (db *DB) cut(mint int64) (headBlock, error) {
|
||||||
maxt := mint + int64(db.opts.MinBlockDuration)
|
maxt := mint + int64(db.opts.MinBlockDuration)
|
||||||
|
|
||||||
dir, seq, err := nextSequenceFile(db.dir, "b-")
|
dir, seq, err := nextSequenceFile(db.dir, "b-")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
newHead, err := createHeadBlock(dir, seq, db.logger, mint, maxt)
|
if err := TouchHeadBlock(dir, seq, mint, maxt); err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "touch head block %s", dir)
|
||||||
|
}
|
||||||
|
newHead, err := db.openHeadBlock(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
118
head.go
118
head.go
|
@ -47,11 +47,11 @@ var (
|
||||||
ErrOutOfBounds = errors.New("out of bounds")
|
ErrOutOfBounds = errors.New("out of bounds")
|
||||||
)
|
)
|
||||||
|
|
||||||
// headBlock handles reads and writes of time series data within a time window.
|
// HeadBlock handles reads and writes of time series data within a time window.
|
||||||
type headBlock struct {
|
type HeadBlock struct {
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
dir string
|
dir string
|
||||||
wal *WAL
|
wal WAL
|
||||||
|
|
||||||
activeWriters uint64
|
activeWriters uint64
|
||||||
closed bool
|
closed bool
|
||||||
|
@ -69,19 +69,21 @@ type headBlock struct {
|
||||||
meta BlockMeta
|
meta BlockMeta
|
||||||
}
|
}
|
||||||
|
|
||||||
func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*headBlock, error) {
|
// TouchHeadBlock atomically touches a new head block in dir for
|
||||||
|
// samples in the range [mint,maxt).
|
||||||
|
func TouchHeadBlock(dir string, seq int, mint, maxt int64) error {
|
||||||
// Make head block creation appear atomic.
|
// Make head block creation appear atomic.
|
||||||
tmp := dir + ".tmp"
|
tmp := dir + ".tmp"
|
||||||
|
|
||||||
if err := os.MkdirAll(tmp, 0777); err != nil {
|
if err := os.MkdirAll(tmp, 0777); err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
|
||||||
ulid, err := ulid.New(ulid.Now(), entropy)
|
ulid, err := ulid.New(ulid.Now(), entropy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := writeMetaFile(tmp, &BlockMeta{
|
if err := writeMetaFile(tmp, &BlockMeta{
|
||||||
|
@ -90,26 +92,19 @@ func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*head
|
||||||
MinTime: mint,
|
MinTime: mint,
|
||||||
MaxTime: maxt,
|
MaxTime: maxt,
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
if err := renameFile(tmp, dir); err != nil {
|
return renameFile(tmp, dir)
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return openHeadBlock(dir, l)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// openHeadBlock creates a new empty head block.
|
// OpenHeadBlock opens the head block in dir.
|
||||||
func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
|
func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) {
|
||||||
wal, err := OpenWAL(dir, log.With(l, "component", "wal"), 5*time.Second)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
meta, err := readMetaFile(dir)
|
meta, err := readMetaFile(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
h := &headBlock{
|
h := &HeadBlock{
|
||||||
dir: dir,
|
dir: dir,
|
||||||
wal: wal,
|
wal: wal,
|
||||||
series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil.
|
series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil.
|
||||||
|
@ -118,10 +113,12 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
|
||||||
postings: &memPostings{m: make(map[term][]uint32)},
|
postings: &memPostings{m: make(map[term][]uint32)},
|
||||||
meta: *meta,
|
meta: *meta,
|
||||||
}
|
}
|
||||||
|
return h, h.init()
|
||||||
|
}
|
||||||
|
|
||||||
r := wal.Reader()
|
func (h *HeadBlock) init() error {
|
||||||
|
r := h.wal.Reader()
|
||||||
|
|
||||||
Outer:
|
|
||||||
for r.Next() {
|
for r.Next() {
|
||||||
series, samples := r.At()
|
series, samples := r.At()
|
||||||
|
|
||||||
|
@ -130,37 +127,32 @@ Outer:
|
||||||
h.meta.Stats.NumSeries++
|
h.meta.Stats.NumSeries++
|
||||||
}
|
}
|
||||||
for _, s := range samples {
|
for _, s := range samples {
|
||||||
if int(s.ref) >= len(h.series) {
|
if int(s.Ref) >= len(h.series) {
|
||||||
l.Log("msg", "unknown series reference, abort WAL restore", "got", s.ref, "max", len(h.series)-1)
|
return errors.Errorf("unknown series reference %d (max %d); abort WAL restore", s.Ref, len(h.series))
|
||||||
break Outer
|
|
||||||
}
|
}
|
||||||
h.series[s.ref].append(s.t, s.v)
|
h.series[s.Ref].append(s.T, s.V)
|
||||||
|
|
||||||
if !h.inBounds(s.t) {
|
if !h.inBounds(s.T) {
|
||||||
return nil, errors.Wrap(ErrOutOfBounds, "consume WAL")
|
return errors.Wrap(ErrOutOfBounds, "consume WAL")
|
||||||
}
|
}
|
||||||
h.meta.Stats.NumSamples++
|
h.meta.Stats.NumSamples++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := r.Err(); err != nil {
|
return errors.Wrap(r.Err(), "consume WAL")
|
||||||
return nil, errors.Wrap(err, "consume WAL")
|
|
||||||
}
|
|
||||||
|
|
||||||
return h, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// inBounds returns true if the given timestamp is within the valid
|
// inBounds returns true if the given timestamp is within the valid
|
||||||
// time bounds of the block.
|
// time bounds of the block.
|
||||||
func (h *headBlock) inBounds(t int64) bool {
|
func (h *HeadBlock) inBounds(t int64) bool {
|
||||||
return t >= h.meta.MinTime && t <= h.meta.MaxTime
|
return t >= h.meta.MinTime && t <= h.meta.MaxTime
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *headBlock) String() string {
|
func (h *HeadBlock) String() string {
|
||||||
return fmt.Sprintf("(%d, %s)", h.meta.Sequence, h.meta.ULID)
|
return fmt.Sprintf("(%d, %s)", h.meta.Sequence, h.meta.ULID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close syncs all data and closes underlying resources of the head block.
|
// Close syncs all data and closes underlying resources of the head block.
|
||||||
func (h *headBlock) Close() error {
|
func (h *HeadBlock) Close() error {
|
||||||
h.mtx.Lock()
|
h.mtx.Lock()
|
||||||
defer h.mtx.Unlock()
|
defer h.mtx.Unlock()
|
||||||
|
|
||||||
|
@ -184,7 +176,7 @@ func (h *headBlock) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *headBlock) Meta() BlockMeta {
|
func (h *HeadBlock) Meta() BlockMeta {
|
||||||
m := BlockMeta{
|
m := BlockMeta{
|
||||||
ULID: h.meta.ULID,
|
ULID: h.meta.ULID,
|
||||||
Sequence: h.meta.Sequence,
|
Sequence: h.meta.Sequence,
|
||||||
|
@ -200,12 +192,12 @@ func (h *headBlock) Meta() BlockMeta {
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *headBlock) Dir() string { return h.dir }
|
func (h *HeadBlock) Dir() string { return h.dir }
|
||||||
func (h *headBlock) Persisted() bool { return false }
|
func (h *HeadBlock) Persisted() bool { return false }
|
||||||
func (h *headBlock) Index() IndexReader { return &headIndexReader{h} }
|
func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} }
|
||||||
func (h *headBlock) Chunks() ChunkReader { return &headChunkReader{h} }
|
func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} }
|
||||||
|
|
||||||
func (h *headBlock) Querier(mint, maxt int64) Querier {
|
func (h *HeadBlock) Querier(mint, maxt int64) Querier {
|
||||||
h.mtx.RLock()
|
h.mtx.RLock()
|
||||||
defer h.mtx.RUnlock()
|
defer h.mtx.RUnlock()
|
||||||
|
|
||||||
|
@ -244,7 +236,7 @@ func (h *headBlock) Querier(mint, maxt int64) Querier {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *headBlock) Appender() Appender {
|
func (h *HeadBlock) Appender() Appender {
|
||||||
atomic.AddUint64(&h.activeWriters, 1)
|
atomic.AddUint64(&h.activeWriters, 1)
|
||||||
|
|
||||||
h.mtx.RLock()
|
h.mtx.RLock()
|
||||||
|
@ -252,36 +244,36 @@ func (h *headBlock) Appender() Appender {
|
||||||
if h.closed {
|
if h.closed {
|
||||||
panic(fmt.Sprintf("block %s already closed", h.dir))
|
panic(fmt.Sprintf("block %s already closed", h.dir))
|
||||||
}
|
}
|
||||||
return &headAppender{headBlock: h, samples: getHeadAppendBuffer()}
|
return &headAppender{HeadBlock: h, samples: getHeadAppendBuffer()}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *headBlock) Busy() bool {
|
func (h *HeadBlock) Busy() bool {
|
||||||
return atomic.LoadUint64(&h.activeWriters) > 0
|
return atomic.LoadUint64(&h.activeWriters) > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
var headPool = sync.Pool{}
|
var headPool = sync.Pool{}
|
||||||
|
|
||||||
func getHeadAppendBuffer() []refdSample {
|
func getHeadAppendBuffer() []RefSample {
|
||||||
b := headPool.Get()
|
b := headPool.Get()
|
||||||
if b == nil {
|
if b == nil {
|
||||||
return make([]refdSample, 0, 512)
|
return make([]RefSample, 0, 512)
|
||||||
}
|
}
|
||||||
return b.([]refdSample)
|
return b.([]RefSample)
|
||||||
}
|
}
|
||||||
|
|
||||||
func putHeadAppendBuffer(b []refdSample) {
|
func putHeadAppendBuffer(b []RefSample) {
|
||||||
headPool.Put(b[:0])
|
headPool.Put(b[:0])
|
||||||
}
|
}
|
||||||
|
|
||||||
type headAppender struct {
|
type headAppender struct {
|
||||||
*headBlock
|
*HeadBlock
|
||||||
|
|
||||||
newSeries map[uint64]hashedLabels
|
newSeries map[uint64]hashedLabels
|
||||||
newHashes map[uint64]uint64
|
newHashes map[uint64]uint64
|
||||||
refmap map[uint64]uint64
|
refmap map[uint64]uint64
|
||||||
newLabels []labels.Labels
|
newLabels []labels.Labels
|
||||||
|
|
||||||
samples []refdSample
|
samples []RefSample
|
||||||
}
|
}
|
||||||
|
|
||||||
type hashedLabels struct {
|
type hashedLabels struct {
|
||||||
|
@ -289,12 +281,6 @@ type hashedLabels struct {
|
||||||
labels labels.Labels
|
labels labels.Labels
|
||||||
}
|
}
|
||||||
|
|
||||||
type refdSample struct {
|
|
||||||
ref uint64
|
|
||||||
t int64
|
|
||||||
v float64
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||||
if !a.inBounds(t) {
|
if !a.inBounds(t) {
|
||||||
return 0, ErrOutOfBounds
|
return 0, ErrOutOfBounds
|
||||||
|
@ -369,10 +355,10 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
a.samples = append(a.samples, refdSample{
|
a.samples = append(a.samples, RefSample{
|
||||||
ref: ref,
|
Ref: ref,
|
||||||
t: t,
|
T: t,
|
||||||
v: v,
|
V: v,
|
||||||
})
|
})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -418,8 +404,8 @@ func (a *headAppender) Commit() error {
|
||||||
for i := range a.samples {
|
for i := range a.samples {
|
||||||
s := &a.samples[i]
|
s := &a.samples[i]
|
||||||
|
|
||||||
if s.ref&(1<<32) > 0 {
|
if s.Ref&(1<<32) > 0 {
|
||||||
s.ref = a.refmap[s.ref]
|
s.Ref = a.refmap[s.Ref]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -433,7 +419,7 @@ func (a *headAppender) Commit() error {
|
||||||
total := uint64(len(a.samples))
|
total := uint64(len(a.samples))
|
||||||
|
|
||||||
for _, s := range a.samples {
|
for _, s := range a.samples {
|
||||||
if !a.series[s.ref].append(s.t, s.v) {
|
if !a.series[s.Ref].append(s.T, s.V) {
|
||||||
total--
|
total--
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -454,7 +440,7 @@ func (a *headAppender) Rollback() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
type headChunkReader struct {
|
type headChunkReader struct {
|
||||||
*headBlock
|
*HeadBlock
|
||||||
}
|
}
|
||||||
|
|
||||||
// Chunk returns the chunk for the reference number.
|
// Chunk returns the chunk for the reference number.
|
||||||
|
@ -490,7 +476,7 @@ func (c *safeChunk) Iterator() chunks.Iterator {
|
||||||
// func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") }
|
// func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") }
|
||||||
|
|
||||||
type headIndexReader struct {
|
type headIndexReader struct {
|
||||||
*headBlock
|
*HeadBlock
|
||||||
}
|
}
|
||||||
|
|
||||||
// LabelValues returns the possible label values
|
// LabelValues returns the possible label values
|
||||||
|
@ -558,7 +544,7 @@ func (h *headIndexReader) LabelIndices() ([][]string, error) {
|
||||||
|
|
||||||
// get retrieves the chunk with the hash and label set and creates
|
// get retrieves the chunk with the hash and label set and creates
|
||||||
// a new one if it doesn't exist yet.
|
// a new one if it doesn't exist yet.
|
||||||
func (h *headBlock) get(hash uint64, lset labels.Labels) *memSeries {
|
func (h *HeadBlock) get(hash uint64, lset labels.Labels) *memSeries {
|
||||||
series := h.hashes[hash]
|
series := h.hashes[hash]
|
||||||
|
|
||||||
for _, s := range series {
|
for _, s := range series {
|
||||||
|
@ -569,7 +555,7 @@ func (h *headBlock) get(hash uint64, lset labels.Labels) *memSeries {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries {
|
func (h *HeadBlock) create(hash uint64, lset labels.Labels) *memSeries {
|
||||||
s := &memSeries{
|
s := &memSeries{
|
||||||
lset: lset,
|
lset: lset,
|
||||||
ref: uint32(len(h.series)),
|
ref: uint32(len(h.series)),
|
||||||
|
|
60
head_test.go
60
head_test.go
|
@ -20,6 +20,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
@ -30,6 +31,19 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// createTestHeadBlock creates a new head block with a SegmentWAL.
|
||||||
|
func createTestHeadBlock(t testing.TB, dir string, mint, maxt int64) *HeadBlock {
|
||||||
|
err := TouchHeadBlock(dir, 0, mint, maxt)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
wal, err := OpenSegmentWAL(dir, nil, 5*time.Second)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
h, err := OpenHeadBlock(dir, nil, wal)
|
||||||
|
require.NoError(t, err)
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkCreateSeries(b *testing.B) {
|
func BenchmarkCreateSeries(b *testing.B) {
|
||||||
lbls, err := readPrometheusLabels("cmd/tsdb/testdata.1m", 1e6)
|
lbls, err := readPrometheusLabels("cmd/tsdb/testdata.1m", 1e6)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
|
@ -39,8 +53,7 @@ func BenchmarkCreateSeries(b *testing.B) {
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
h, err := createHeadBlock(dir, 0, nil, 0, 1)
|
h := createTestHeadBlock(b, dir, 0, 1)
|
||||||
require.NoError(b, err)
|
|
||||||
|
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
@ -90,14 +103,13 @@ func readPrometheusLabels(fn string, n int) ([]labels.Labels, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAmendDatapointCausesError(t *testing.T) {
|
func TestAmendDatapointCausesError(t *testing.T) {
|
||||||
tmpdir, _ := ioutil.TempDir("", "test")
|
dir, _ := ioutil.TempDir("", "test")
|
||||||
defer os.RemoveAll(tmpdir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000)
|
hb := createTestHeadBlock(t, dir, 0, 1000)
|
||||||
require.NoError(t, err, "Error creating head block")
|
|
||||||
|
|
||||||
app := hb.Appender()
|
app := hb.Appender()
|
||||||
_, err = app.Add(labels.Labels{}, 0, 0)
|
_, err := app.Add(labels.Labels{}, 0, 0)
|
||||||
require.NoError(t, err, "Failed to add sample")
|
require.NoError(t, err, "Failed to add sample")
|
||||||
require.NoError(t, app.Commit(), "Unexpected error committing appender")
|
require.NoError(t, app.Commit(), "Unexpected error committing appender")
|
||||||
|
|
||||||
|
@ -107,14 +119,13 @@ func TestAmendDatapointCausesError(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
|
func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
|
||||||
tmpdir, _ := ioutil.TempDir("", "test")
|
dir, _ := ioutil.TempDir("", "test")
|
||||||
defer os.RemoveAll(tmpdir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000)
|
hb := createTestHeadBlock(t, dir, 0, 1000)
|
||||||
require.NoError(t, err, "Error creating head block")
|
|
||||||
|
|
||||||
app := hb.Appender()
|
app := hb.Appender()
|
||||||
_, err = app.Add(labels.Labels{}, 0, math.NaN())
|
_, err := app.Add(labels.Labels{}, 0, math.NaN())
|
||||||
require.NoError(t, err, "Failed to add sample")
|
require.NoError(t, err, "Failed to add sample")
|
||||||
require.NoError(t, app.Commit(), "Unexpected error committing appender")
|
require.NoError(t, app.Commit(), "Unexpected error committing appender")
|
||||||
|
|
||||||
|
@ -124,14 +135,13 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
|
func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
|
||||||
tmpdir, _ := ioutil.TempDir("", "test")
|
dir, _ := ioutil.TempDir("", "test")
|
||||||
defer os.RemoveAll(tmpdir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000)
|
hb := createTestHeadBlock(t, dir, 0, 1000)
|
||||||
require.NoError(t, err, "Error creating head block")
|
|
||||||
|
|
||||||
app := hb.Appender()
|
app := hb.Appender()
|
||||||
_, err = app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001))
|
_, err := app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001))
|
||||||
require.NoError(t, err, "Failed to add sample")
|
require.NoError(t, err, "Failed to add sample")
|
||||||
require.NoError(t, app.Commit(), "Unexpected error committing appender")
|
require.NoError(t, app.Commit(), "Unexpected error committing appender")
|
||||||
|
|
||||||
|
@ -141,15 +151,14 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
|
func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
|
||||||
tmpdir, _ := ioutil.TempDir("", "test")
|
dir, _ := ioutil.TempDir("", "test")
|
||||||
defer os.RemoveAll(tmpdir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000)
|
hb := createTestHeadBlock(t, dir, 0, 1000)
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Append AmendedValue.
|
// Append AmendedValue.
|
||||||
app := hb.Appender()
|
app := hb.Appender()
|
||||||
_, err = app.Add(labels.Labels{{"a", "b"}}, 0, 1)
|
_, err := app.Add(labels.Labels{{"a", "b"}}, 0, 1)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
_, err = app.Add(labels.Labels{{"a", "b"}}, 0, 2)
|
_, err = app.Add(labels.Labels{{"a", "b"}}, 0, 2)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -243,11 +252,10 @@ func TestHeadBlock_e2e(t *testing.T) {
|
||||||
seriesMap[labels.New(l...).String()] = []sample{}
|
seriesMap[labels.New(l...).String()] = []sample{}
|
||||||
}
|
}
|
||||||
|
|
||||||
tmpdir, _ := ioutil.TempDir("", "test")
|
dir, _ := ioutil.TempDir("", "test")
|
||||||
defer os.RemoveAll(tmpdir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, minTime, maxTime)
|
hb := createTestHeadBlock(t, dir, minTime, maxTime)
|
||||||
require.NoError(t, err)
|
|
||||||
app := hb.Appender()
|
app := hb.Appender()
|
||||||
|
|
||||||
for _, l := range lbls {
|
for _, l := range lbls {
|
||||||
|
|
115
wal.go
115
wal.go
|
@ -49,9 +49,8 @@ const (
|
||||||
WALEntrySamples WALEntryType = 3
|
WALEntrySamples WALEntryType = 3
|
||||||
)
|
)
|
||||||
|
|
||||||
// WAL is a write ahead log for series data. It can only be written to.
|
// SegmentWAL is a write ahead log for series data.
|
||||||
// Use WALReader to read back from a write ahead log.
|
type SegmentWAL struct {
|
||||||
type WAL struct {
|
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
|
|
||||||
dirFile *os.File
|
dirFile *os.File
|
||||||
|
@ -69,6 +68,28 @@ type WAL struct {
|
||||||
donec chan struct{}
|
donec chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WAL is a write ahead log that can log new series labels and samples.
|
||||||
|
// It must be completely read before new entries are logged.
|
||||||
|
type WAL interface {
|
||||||
|
Reader() WALReader
|
||||||
|
Log([]labels.Labels, []RefSample) error
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// WALReader reads entries from a WAL.
|
||||||
|
type WALReader interface {
|
||||||
|
At() ([]labels.Labels, []RefSample)
|
||||||
|
Next() bool
|
||||||
|
Err() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// RefSample is a timestamp/value pair associated with a reference to a series.
|
||||||
|
type RefSample struct {
|
||||||
|
Ref uint64
|
||||||
|
T int64
|
||||||
|
V float64
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
walDirName = "wal"
|
walDirName = "wal"
|
||||||
walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB
|
walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB
|
||||||
|
@ -83,9 +104,9 @@ func init() {
|
||||||
castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
|
castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
|
||||||
}
|
}
|
||||||
|
|
||||||
// OpenWAL opens or creates a write ahead log in the given directory.
|
// OpenSegmentWAL opens or creates a write ahead log in the given directory.
|
||||||
// The WAL must be read completely before new data is written.
|
// The WAL must be read completely before new data is written.
|
||||||
func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, error) {
|
func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) (*SegmentWAL, error) {
|
||||||
dir = filepath.Join(dir, walDirName)
|
dir = filepath.Join(dir, walDirName)
|
||||||
|
|
||||||
if err := os.MkdirAll(dir, 0777); err != nil {
|
if err := os.MkdirAll(dir, 0777); err != nil {
|
||||||
|
@ -99,7 +120,7 @@ func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL,
|
||||||
logger = log.NewNopLogger()
|
logger = log.NewNopLogger()
|
||||||
}
|
}
|
||||||
|
|
||||||
w := &WAL{
|
w := &SegmentWAL{
|
||||||
dirFile: df,
|
dirFile: df,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
flushInterval: flushInterval,
|
flushInterval: flushInterval,
|
||||||
|
@ -119,12 +140,12 @@ func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL,
|
||||||
|
|
||||||
// Reader returns a new reader over the the write ahead log data.
|
// Reader returns a new reader over the the write ahead log data.
|
||||||
// It must be completely consumed before writing to the WAL.
|
// It must be completely consumed before writing to the WAL.
|
||||||
func (w *WAL) Reader() *WALReader {
|
func (w *SegmentWAL) Reader() WALReader {
|
||||||
return NewWALReader(w.logger, w)
|
return newWALReader(w, w.logger)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Log writes a batch of new series labels and samples to the log.
|
// Log writes a batch of new series labels and samples to the log.
|
||||||
func (w *WAL) Log(series []labels.Labels, samples []refdSample) error {
|
func (w *SegmentWAL) Log(series []labels.Labels, samples []RefSample) error {
|
||||||
if err := w.encodeSeries(series); err != nil {
|
if err := w.encodeSeries(series); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -139,7 +160,7 @@ func (w *WAL) Log(series []labels.Labels, samples []refdSample) error {
|
||||||
|
|
||||||
// initSegments finds all existing segment files and opens them in the
|
// initSegments finds all existing segment files and opens them in the
|
||||||
// appropriate file modes.
|
// appropriate file modes.
|
||||||
func (w *WAL) initSegments() error {
|
func (w *SegmentWAL) initSegments() error {
|
||||||
fns, err := sequenceFiles(w.dirFile.Name(), "")
|
fns, err := sequenceFiles(w.dirFile.Name(), "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -180,7 +201,7 @@ func (w *WAL) initSegments() error {
|
||||||
|
|
||||||
// cut finishes the currently active segments and opens the next one.
|
// cut finishes the currently active segments and opens the next one.
|
||||||
// The encoder is reset to point to the new segment.
|
// The encoder is reset to point to the new segment.
|
||||||
func (w *WAL) cut() error {
|
func (w *SegmentWAL) cut() error {
|
||||||
// Sync current tail to disk and close.
|
// Sync current tail to disk and close.
|
||||||
if tf := w.tail(); tf != nil {
|
if tf := w.tail(); tf != nil {
|
||||||
if err := w.sync(); err != nil {
|
if err := w.sync(); err != nil {
|
||||||
|
@ -229,7 +250,7 @@ func (w *WAL) cut() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WAL) tail() *os.File {
|
func (w *SegmentWAL) tail() *os.File {
|
||||||
if len(w.files) == 0 {
|
if len(w.files) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -237,14 +258,14 @@ func (w *WAL) tail() *os.File {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync flushes the changes to disk.
|
// Sync flushes the changes to disk.
|
||||||
func (w *WAL) Sync() error {
|
func (w *SegmentWAL) Sync() error {
|
||||||
w.mtx.Lock()
|
w.mtx.Lock()
|
||||||
defer w.mtx.Unlock()
|
defer w.mtx.Unlock()
|
||||||
|
|
||||||
return w.sync()
|
return w.sync()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WAL) sync() error {
|
func (w *SegmentWAL) sync() error {
|
||||||
if w.cur == nil {
|
if w.cur == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -254,7 +275,7 @@ func (w *WAL) sync() error {
|
||||||
return fileutil.Fdatasync(w.tail())
|
return fileutil.Fdatasync(w.tail())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WAL) run(interval time.Duration) {
|
func (w *SegmentWAL) run(interval time.Duration) {
|
||||||
var tick <-chan time.Time
|
var tick <-chan time.Time
|
||||||
|
|
||||||
if interval > 0 {
|
if interval > 0 {
|
||||||
|
@ -277,7 +298,7 @@ func (w *WAL) run(interval time.Duration) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close syncs all data and closes the underlying resources.
|
// Close syncs all data and closes the underlying resources.
|
||||||
func (w *WAL) Close() error {
|
func (w *SegmentWAL) Close() error {
|
||||||
close(w.stopc)
|
close(w.stopc)
|
||||||
<-w.donec
|
<-w.donec
|
||||||
|
|
||||||
|
@ -305,7 +326,7 @@ const (
|
||||||
walPageBytes = 16 * minSectorSize
|
walPageBytes = 16 * minSectorSize
|
||||||
)
|
)
|
||||||
|
|
||||||
func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error {
|
func (w *SegmentWAL) entry(et WALEntryType, flag byte, buf []byte) error {
|
||||||
w.mtx.Lock()
|
w.mtx.Lock()
|
||||||
defer w.mtx.Unlock()
|
defer w.mtx.Unlock()
|
||||||
|
|
||||||
|
@ -369,7 +390,7 @@ func putWALBuffer(b []byte) {
|
||||||
walBuffers.Put(b)
|
walBuffers.Put(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WAL) encodeSeries(series []labels.Labels) error {
|
func (w *SegmentWAL) encodeSeries(series []labels.Labels) error {
|
||||||
if len(series) == 0 {
|
if len(series) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -395,7 +416,7 @@ func (w *WAL) encodeSeries(series []labels.Labels) error {
|
||||||
return w.entry(WALEntrySeries, walSeriesSimple, buf)
|
return w.entry(WALEntrySeries, walSeriesSimple, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WAL) encodeSamples(samples []refdSample) error {
|
func (w *SegmentWAL) encodeSamples(samples []RefSample) error {
|
||||||
if len(samples) == 0 {
|
if len(samples) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -409,67 +430,65 @@ func (w *WAL) encodeSamples(samples []refdSample) error {
|
||||||
// TODO(fabxc): optimize for all samples having the same timestamp.
|
// TODO(fabxc): optimize for all samples having the same timestamp.
|
||||||
first := samples[0]
|
first := samples[0]
|
||||||
|
|
||||||
binary.BigEndian.PutUint64(b, first.ref)
|
binary.BigEndian.PutUint64(b, first.Ref)
|
||||||
buf = append(buf, b[:8]...)
|
buf = append(buf, b[:8]...)
|
||||||
binary.BigEndian.PutUint64(b, uint64(first.t))
|
binary.BigEndian.PutUint64(b, uint64(first.T))
|
||||||
buf = append(buf, b[:8]...)
|
buf = append(buf, b[:8]...)
|
||||||
|
|
||||||
for _, s := range samples {
|
for _, s := range samples {
|
||||||
n := binary.PutVarint(b, int64(s.ref)-int64(first.ref))
|
n := binary.PutVarint(b, int64(s.Ref)-int64(first.Ref))
|
||||||
buf = append(buf, b[:n]...)
|
buf = append(buf, b[:n]...)
|
||||||
|
|
||||||
n = binary.PutVarint(b, s.t-first.t)
|
n = binary.PutVarint(b, s.T-first.T)
|
||||||
buf = append(buf, b[:n]...)
|
buf = append(buf, b[:n]...)
|
||||||
|
|
||||||
binary.BigEndian.PutUint64(b, math.Float64bits(s.v))
|
binary.BigEndian.PutUint64(b, math.Float64bits(s.V))
|
||||||
buf = append(buf, b[:8]...)
|
buf = append(buf, b[:8]...)
|
||||||
}
|
}
|
||||||
|
|
||||||
return w.entry(WALEntrySamples, walSamplesSimple, buf)
|
return w.entry(WALEntrySamples, walSamplesSimple, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WALReader decodes and emits write ahead log entries.
|
// walReader decodes and emits write ahead log entries.
|
||||||
type WALReader struct {
|
type walReader struct {
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
|
|
||||||
wal *WAL
|
wal *SegmentWAL
|
||||||
cur int
|
cur int
|
||||||
buf []byte
|
buf []byte
|
||||||
crc32 hash.Hash32
|
crc32 hash.Hash32
|
||||||
|
|
||||||
err error
|
err error
|
||||||
labels []labels.Labels
|
labels []labels.Labels
|
||||||
samples []refdSample
|
samples []RefSample
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWALReader returns a new WALReader over the sequence of the given ReadClosers.
|
func newWALReader(w *SegmentWAL, l log.Logger) *walReader {
|
||||||
func NewWALReader(logger log.Logger, w *WAL) *WALReader {
|
if l == nil {
|
||||||
if logger == nil {
|
l = log.NewNopLogger()
|
||||||
logger = log.NewNopLogger()
|
|
||||||
}
|
}
|
||||||
r := &WALReader{
|
return &walReader{
|
||||||
logger: logger,
|
logger: l,
|
||||||
wal: w,
|
wal: w,
|
||||||
buf: make([]byte, 0, 128*4096),
|
buf: make([]byte, 0, 128*4096),
|
||||||
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
|
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
|
||||||
}
|
}
|
||||||
return r
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// At returns the last decoded entry of labels or samples.
|
// At returns the last decoded entry of labels or samples.
|
||||||
// The returned slices are only valid until the next call to Next(). Their elements
|
// The returned slices are only valid until the next call to Next(). Their elements
|
||||||
// have to be copied to preserve them.
|
// have to be copied to preserve them.
|
||||||
func (r *WALReader) At() ([]labels.Labels, []refdSample) {
|
func (r *walReader) At() ([]labels.Labels, []RefSample) {
|
||||||
return r.labels, r.samples
|
return r.labels, r.samples
|
||||||
}
|
}
|
||||||
|
|
||||||
// Err returns the last error the reader encountered.
|
// Err returns the last error the reader encountered.
|
||||||
func (r *WALReader) Err() error {
|
func (r *walReader) Err() error {
|
||||||
return r.err
|
return r.err
|
||||||
}
|
}
|
||||||
|
|
||||||
// nextEntry retrieves the next entry. It is also used as a testing hook.
|
// nextEntry retrieves the next entry. It is also used as a testing hook.
|
||||||
func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) {
|
func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) {
|
||||||
if r.cur >= len(r.wal.files) {
|
if r.cur >= len(r.wal.files) {
|
||||||
return 0, 0, nil, io.EOF
|
return 0, 0, nil, io.EOF
|
||||||
}
|
}
|
||||||
|
@ -492,7 +511,7 @@ func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) {
|
||||||
|
|
||||||
// Next returns decodes the next entry pair and returns true
|
// Next returns decodes the next entry pair and returns true
|
||||||
// if it was succesful.
|
// if it was succesful.
|
||||||
func (r *WALReader) Next() bool {
|
func (r *walReader) Next() bool {
|
||||||
r.labels = r.labels[:0]
|
r.labels = r.labels[:0]
|
||||||
r.samples = r.samples[:0]
|
r.samples = r.samples[:0]
|
||||||
|
|
||||||
|
@ -549,12 +568,12 @@ func (r *WALReader) Next() bool {
|
||||||
return r.err == nil
|
return r.err == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *WALReader) current() *os.File {
|
func (r *walReader) current() *os.File {
|
||||||
return r.wal.files[r.cur]
|
return r.wal.files[r.cur]
|
||||||
}
|
}
|
||||||
|
|
||||||
// truncate the WAL after the last valid entry.
|
// truncate the WAL after the last valid entry.
|
||||||
func (r *WALReader) truncate(lastOffset int64) error {
|
func (r *walReader) truncate(lastOffset int64) error {
|
||||||
r.logger.Log("msg", "WAL corruption detected; truncating",
|
r.logger.Log("msg", "WAL corruption detected; truncating",
|
||||||
"err", r.err, "file", r.current().Name(), "pos", lastOffset)
|
"err", r.err, "file", r.current().Name(), "pos", lastOffset)
|
||||||
|
|
||||||
|
@ -582,7 +601,7 @@ func walCorruptionErrf(s string, args ...interface{}) error {
|
||||||
return walCorruptionErr(errors.Errorf(s, args...))
|
return walCorruptionErr(errors.Errorf(s, args...))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
|
func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
|
||||||
r.crc32.Reset()
|
r.crc32.Reset()
|
||||||
tr := io.TeeReader(cr, r.crc32)
|
tr := io.TeeReader(cr, r.crc32)
|
||||||
|
|
||||||
|
@ -629,7 +648,7 @@ func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
|
||||||
return etype, flag, buf, nil
|
return etype, flag, buf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *WALReader) decodeSeries(flag byte, b []byte) error {
|
func (r *walReader) decodeSeries(flag byte, b []byte) error {
|
||||||
for len(b) > 0 {
|
for len(b) > 0 {
|
||||||
l, n := binary.Uvarint(b)
|
l, n := binary.Uvarint(b)
|
||||||
if n < 1 {
|
if n < 1 {
|
||||||
|
@ -659,7 +678,7 @@ func (r *WALReader) decodeSeries(flag byte, b []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *WALReader) decodeSamples(flag byte, b []byte) error {
|
func (r *walReader) decodeSamples(flag byte, b []byte) error {
|
||||||
if len(b) < 16 {
|
if len(b) < 16 {
|
||||||
return errors.Wrap(errInvalidSize, "header length")
|
return errors.Wrap(errInvalidSize, "header length")
|
||||||
}
|
}
|
||||||
|
@ -670,7 +689,7 @@ func (r *WALReader) decodeSamples(flag byte, b []byte) error {
|
||||||
b = b[16:]
|
b = b[16:]
|
||||||
|
|
||||||
for len(b) > 0 {
|
for len(b) > 0 {
|
||||||
var smpl refdSample
|
var smpl RefSample
|
||||||
|
|
||||||
dref, n := binary.Varint(b)
|
dref, n := binary.Varint(b)
|
||||||
if n < 1 {
|
if n < 1 {
|
||||||
|
@ -678,19 +697,19 @@ func (r *WALReader) decodeSamples(flag byte, b []byte) error {
|
||||||
}
|
}
|
||||||
b = b[n:]
|
b = b[n:]
|
||||||
|
|
||||||
smpl.ref = uint64(int64(baseRef) + dref)
|
smpl.Ref = uint64(int64(baseRef) + dref)
|
||||||
|
|
||||||
dtime, n := binary.Varint(b)
|
dtime, n := binary.Varint(b)
|
||||||
if n < 1 {
|
if n < 1 {
|
||||||
return errors.Wrap(errInvalidSize, "sample timestamp delta")
|
return errors.Wrap(errInvalidSize, "sample timestamp delta")
|
||||||
}
|
}
|
||||||
b = b[n:]
|
b = b[n:]
|
||||||
smpl.t = baseTime + dtime
|
smpl.T = baseTime + dtime
|
||||||
|
|
||||||
if len(b) < 8 {
|
if len(b) < 8 {
|
||||||
return errors.Wrapf(errInvalidSize, "sample value bits %d", len(b))
|
return errors.Wrapf(errInvalidSize, "sample value bits %d", len(b))
|
||||||
}
|
}
|
||||||
smpl.v = float64(math.Float64frombits(binary.BigEndian.Uint64(b)))
|
smpl.V = float64(math.Float64frombits(binary.BigEndian.Uint64(b)))
|
||||||
b = b[8:]
|
b = b[8:]
|
||||||
|
|
||||||
r.samples = append(r.samples, smpl)
|
r.samples = append(r.samples, smpl)
|
||||||
|
|
64
wal_test.go
64
wal_test.go
|
@ -27,7 +27,7 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestWAL_initSegments(t *testing.T) {
|
func TestSegmentWAL_initSegments(t *testing.T) {
|
||||||
tmpdir, err := ioutil.TempDir("", "test_wal_open")
|
tmpdir, err := ioutil.TempDir("", "test_wal_open")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer os.RemoveAll(tmpdir)
|
defer os.RemoveAll(tmpdir)
|
||||||
|
@ -35,7 +35,7 @@ func TestWAL_initSegments(t *testing.T) {
|
||||||
df, err := fileutil.OpenDir(tmpdir)
|
df, err := fileutil.OpenDir(tmpdir)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
w := &WAL{dirFile: df}
|
w := &SegmentWAL{dirFile: df}
|
||||||
|
|
||||||
// Create segment files with an appropriate header.
|
// Create segment files with an appropriate header.
|
||||||
for i := 1; i <= 5; i++ {
|
for i := 1; i <= 5; i++ {
|
||||||
|
@ -80,7 +80,7 @@ func TestWAL_initSegments(t *testing.T) {
|
||||||
_, err = f.WriteAt([]byte{0}, 4)
|
_, err = f.WriteAt([]byte{0}, 4)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
w = &WAL{dirFile: df}
|
w = &SegmentWAL{dirFile: df}
|
||||||
require.Error(t, w.initSegments(), "init corrupted segments")
|
require.Error(t, w.initSegments(), "init corrupted segments")
|
||||||
|
|
||||||
for _, f := range w.files {
|
for _, f := range w.files {
|
||||||
|
@ -88,13 +88,13 @@ func TestWAL_initSegments(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWAL_cut(t *testing.T) {
|
func TestSegmentWAL_cut(t *testing.T) {
|
||||||
tmpdir, err := ioutil.TempDir("", "test_wal_cut")
|
tmpdir, err := ioutil.TempDir("", "test_wal_cut")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer os.RemoveAll(tmpdir)
|
defer os.RemoveAll(tmpdir)
|
||||||
|
|
||||||
// This calls cut() implicitly the first time without a previous tail.
|
// This calls cut() implicitly the first time without a previous tail.
|
||||||
w, err := OpenWAL(tmpdir, nil, 0)
|
w, err := OpenSegmentWAL(tmpdir, nil, 0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.NoError(t, w.entry(WALEntrySeries, 1, []byte("Hello World!!")))
|
require.NoError(t, w.entry(WALEntrySeries, 1, []byte("Hello World!!")))
|
||||||
|
@ -122,7 +122,7 @@ func TestWAL_cut(t *testing.T) {
|
||||||
|
|
||||||
// We cannot actually check for correct pre-allocation as it is
|
// We cannot actually check for correct pre-allocation as it is
|
||||||
// optional per filesystem and handled transparently.
|
// optional per filesystem and handled transparently.
|
||||||
et, flag, b, err := NewWALReader(nil, nil).entry(f)
|
et, flag, b, err := newWALReader(nil, nil).entry(f)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, WALEntrySeries, et)
|
require.Equal(t, WALEntrySeries, et)
|
||||||
require.Equal(t, flag, byte(walSeriesSimple))
|
require.Equal(t, flag, byte(walSeriesSimple))
|
||||||
|
@ -131,7 +131,7 @@ func TestWAL_cut(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Symmetrical test of reading and writing to the WAL via its main interface.
|
// Symmetrical test of reading and writing to the WAL via its main interface.
|
||||||
func TestWAL_Log_Restore(t *testing.T) {
|
func TestSegmentWAL_Log_Restore(t *testing.T) {
|
||||||
const (
|
const (
|
||||||
numMetrics = 5000
|
numMetrics = 5000
|
||||||
iterations = 5
|
iterations = 5
|
||||||
|
@ -148,14 +148,14 @@ func TestWAL_Log_Restore(t *testing.T) {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
recordedSeries [][]labels.Labels
|
recordedSeries [][]labels.Labels
|
||||||
recordedSamples [][]refdSample
|
recordedSamples [][]RefSample
|
||||||
)
|
)
|
||||||
var totalSamples int
|
var totalSamples int
|
||||||
|
|
||||||
// Open WAL a bunch of times, validate all previous data can be read,
|
// Open WAL a bunch of times, validate all previous data can be read,
|
||||||
// write more data to it, close it.
|
// write more data to it, close it.
|
||||||
for k := 0; k < numMetrics; k += numMetrics / iterations {
|
for k := 0; k < numMetrics; k += numMetrics / iterations {
|
||||||
w, err := OpenWAL(dir, nil, 0)
|
w, err := OpenSegmentWAL(dir, nil, 0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Set smaller segment size so we can actually write several files.
|
// Set smaller segment size so we can actually write several files.
|
||||||
|
@ -165,7 +165,7 @@ func TestWAL_Log_Restore(t *testing.T) {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
resultSeries [][]labels.Labels
|
resultSeries [][]labels.Labels
|
||||||
resultSamples [][]refdSample
|
resultSamples [][]RefSample
|
||||||
)
|
)
|
||||||
|
|
||||||
for r.Next() {
|
for r.Next() {
|
||||||
|
@ -177,7 +177,7 @@ func TestWAL_Log_Restore(t *testing.T) {
|
||||||
resultSeries = append(resultSeries, clsets)
|
resultSeries = append(resultSeries, clsets)
|
||||||
}
|
}
|
||||||
if len(smpls) > 0 {
|
if len(smpls) > 0 {
|
||||||
csmpls := make([]refdSample, len(smpls))
|
csmpls := make([]RefSample, len(smpls))
|
||||||
copy(csmpls, smpls)
|
copy(csmpls, smpls)
|
||||||
resultSamples = append(resultSamples, csmpls)
|
resultSamples = append(resultSamples, csmpls)
|
||||||
}
|
}
|
||||||
|
@ -191,13 +191,13 @@ func TestWAL_Log_Restore(t *testing.T) {
|
||||||
|
|
||||||
// Insert in batches and generate different amounts of samples for each.
|
// Insert in batches and generate different amounts of samples for each.
|
||||||
for i := 0; i < len(series); i += stepSize {
|
for i := 0; i < len(series); i += stepSize {
|
||||||
var samples []refdSample
|
var samples []RefSample
|
||||||
|
|
||||||
for j := 0; j < i*10; j++ {
|
for j := 0; j < i*10; j++ {
|
||||||
samples = append(samples, refdSample{
|
samples = append(samples, RefSample{
|
||||||
ref: uint64(j % 10000),
|
Ref: uint64(j % 10000),
|
||||||
t: int64(j * 2),
|
T: int64(j * 2),
|
||||||
v: rand.Float64(),
|
V: rand.Float64(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -222,11 +222,11 @@ func TestWAL_Log_Restore(t *testing.T) {
|
||||||
func TestWALRestoreCorrupted(t *testing.T) {
|
func TestWALRestoreCorrupted(t *testing.T) {
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
name string
|
name string
|
||||||
f func(*testing.T, *WAL)
|
f func(*testing.T, *SegmentWAL)
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "truncate_checksum",
|
name: "truncate_checksum",
|
||||||
f: func(t *testing.T, w *WAL) {
|
f: func(t *testing.T, w *SegmentWAL) {
|
||||||
f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666)
|
f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
@ -239,7 +239,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "truncate_body",
|
name: "truncate_body",
|
||||||
f: func(t *testing.T, w *WAL) {
|
f: func(t *testing.T, w *SegmentWAL) {
|
||||||
f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666)
|
f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
@ -252,7 +252,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "body_content",
|
name: "body_content",
|
||||||
f: func(t *testing.T, w *WAL) {
|
f: func(t *testing.T, w *SegmentWAL) {
|
||||||
f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666)
|
f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
@ -267,7 +267,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "checksum",
|
name: "checksum",
|
||||||
f: func(t *testing.T, w *WAL) {
|
f: func(t *testing.T, w *SegmentWAL) {
|
||||||
f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666)
|
f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
@ -289,16 +289,16 @@ func TestWALRestoreCorrupted(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
w, err := OpenWAL(dir, nil, 0)
|
w, err := OpenSegmentWAL(dir, nil, 0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.NoError(t, w.Log(nil, []refdSample{{t: 1, v: 2}}))
|
require.NoError(t, w.Log(nil, []RefSample{{T: 1, V: 2}}))
|
||||||
require.NoError(t, w.Log(nil, []refdSample{{t: 2, v: 3}}))
|
require.NoError(t, w.Log(nil, []RefSample{{T: 2, V: 3}}))
|
||||||
|
|
||||||
require.NoError(t, w.cut())
|
require.NoError(t, w.cut())
|
||||||
|
|
||||||
require.NoError(t, w.Log(nil, []refdSample{{t: 3, v: 4}}))
|
require.NoError(t, w.Log(nil, []RefSample{{T: 3, V: 4}}))
|
||||||
require.NoError(t, w.Log(nil, []refdSample{{t: 5, v: 6}}))
|
require.NoError(t, w.Log(nil, []RefSample{{T: 5, V: 6}}))
|
||||||
|
|
||||||
require.NoError(t, w.Close())
|
require.NoError(t, w.Close())
|
||||||
|
|
||||||
|
@ -310,7 +310,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
|
||||||
|
|
||||||
logger := log.NewLogfmtLogger(os.Stderr)
|
logger := log.NewLogfmtLogger(os.Stderr)
|
||||||
|
|
||||||
w2, err := OpenWAL(dir, logger, 0)
|
w2, err := OpenSegmentWAL(dir, logger, 0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
r := w2.Reader()
|
r := w2.Reader()
|
||||||
|
@ -318,13 +318,13 @@ func TestWALRestoreCorrupted(t *testing.T) {
|
||||||
require.True(t, r.Next())
|
require.True(t, r.Next())
|
||||||
l, s := r.At()
|
l, s := r.At()
|
||||||
require.Equal(t, 0, len(l))
|
require.Equal(t, 0, len(l))
|
||||||
require.Equal(t, []refdSample{{t: 1, v: 2}}, s)
|
require.Equal(t, []RefSample{{T: 1, V: 2}}, s)
|
||||||
|
|
||||||
// Truncation should happen transparently and now cause an error.
|
// Truncation should happen transparently and now cause an error.
|
||||||
require.False(t, r.Next())
|
require.False(t, r.Next())
|
||||||
require.Nil(t, r.Err())
|
require.Nil(t, r.Err())
|
||||||
|
|
||||||
require.NoError(t, w2.Log(nil, []refdSample{{t: 99, v: 100}}))
|
require.NoError(t, w2.Log(nil, []RefSample{{T: 99, V: 100}}))
|
||||||
require.NoError(t, w2.Close())
|
require.NoError(t, w2.Close())
|
||||||
|
|
||||||
files, err := fileutil.ReadDir(dir)
|
files, err := fileutil.ReadDir(dir)
|
||||||
|
@ -333,7 +333,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
|
||||||
|
|
||||||
// We should see the first valid entry and the new one, everything after
|
// We should see the first valid entry and the new one, everything after
|
||||||
// is truncated.
|
// is truncated.
|
||||||
w3, err := OpenWAL(dir, logger, 0)
|
w3, err := OpenSegmentWAL(dir, logger, 0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
r = w3.Reader()
|
r = w3.Reader()
|
||||||
|
@ -341,12 +341,12 @@ func TestWALRestoreCorrupted(t *testing.T) {
|
||||||
require.True(t, r.Next())
|
require.True(t, r.Next())
|
||||||
l, s = r.At()
|
l, s = r.At()
|
||||||
require.Equal(t, 0, len(l))
|
require.Equal(t, 0, len(l))
|
||||||
require.Equal(t, []refdSample{{t: 1, v: 2}}, s)
|
require.Equal(t, []RefSample{{T: 1, V: 2}}, s)
|
||||||
|
|
||||||
require.True(t, r.Next())
|
require.True(t, r.Next())
|
||||||
l, s = r.At()
|
l, s = r.At()
|
||||||
require.Equal(t, 0, len(l))
|
require.Equal(t, 0, len(l))
|
||||||
require.Equal(t, []refdSample{{t: 99, v: 100}}, s)
|
require.Equal(t, []RefSample{{T: 99, V: 100}}, s)
|
||||||
|
|
||||||
require.False(t, r.Next())
|
require.False(t, r.Next())
|
||||||
require.Nil(t, r.Err())
|
require.Nil(t, r.Err())
|
||||||
|
|
Loading…
Reference in a new issue