mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 22:07:27 -08:00
Integrate new WAL and checkpoints
Remove the old WAL and drop in the new one Signed-off-by: Fabian Reinartz <freinartz@google.com>
This commit is contained in:
parent
008399a6e0
commit
def912ce0e
|
@ -15,7 +15,6 @@
|
||||||
package tsdb
|
package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -82,7 +81,7 @@ func TestDeleteCheckpoints(t *testing.T) {
|
||||||
func TestCheckpoint(t *testing.T) {
|
func TestCheckpoint(t *testing.T) {
|
||||||
dir, err := ioutil.TempDir("", "test_checkpoint")
|
dir, err := ioutil.TempDir("", "test_checkpoint")
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
fmt.Println(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
var enc RecordEncoder
|
var enc RecordEncoder
|
||||||
// Create a dummy segment to bump the initial number.
|
// Create a dummy segment to bump the initial number.
|
||||||
|
@ -138,11 +137,10 @@ func TestCheckpoint(t *testing.T) {
|
||||||
}
|
}
|
||||||
testutil.Ok(t, w.Close())
|
testutil.Ok(t, w.Close())
|
||||||
|
|
||||||
stats, err = Checkpoint(nil, w, 100, 106, func(x uint64) bool {
|
_, err = Checkpoint(nil, w, 100, 106, func(x uint64) bool {
|
||||||
return x%2 == 0
|
return x%2 == 0
|
||||||
}, last/2)
|
}, last/2)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
testutil.Equals(t, 106, stats.HighSegment)
|
|
||||||
|
|
||||||
// Only the new checkpoint should be left.
|
// Only the new checkpoint should be left.
|
||||||
files, err := fileutil.ReadDir(dir)
|
files, err := fileutil.ReadDir(dir)
|
||||||
|
|
7
db.go
7
db.go
|
@ -37,6 +37,7 @@ import (
|
||||||
"github.com/prometheus/tsdb/chunkenc"
|
"github.com/prometheus/tsdb/chunkenc"
|
||||||
"github.com/prometheus/tsdb/fileutil"
|
"github.com/prometheus/tsdb/fileutil"
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
|
"github.com/prometheus/tsdb/wal"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -221,18 +222,18 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
||||||
return nil, errors.Wrap(err, "create leveled compactor")
|
return nil, errors.Wrap(err, "create leveled compactor")
|
||||||
}
|
}
|
||||||
|
|
||||||
wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, opts.WALFlushInterval, r)
|
wlog, err := wal.New(l, r, filepath.Join(dir, "wal"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
db.head, err = NewHead(r, l, wal, opts.BlockRanges[0])
|
db.head, err = NewHead(r, l, wlog, opts.BlockRanges[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := db.reload(); err != nil {
|
if err := db.reload(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := db.head.ReadWAL(); err != nil {
|
if err := db.head.Init(); err != nil {
|
||||||
return nil, errors.Wrap(err, "read WAL")
|
return nil, errors.Wrap(err, "read WAL")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
270
head.go
270
head.go
|
@ -15,6 +15,7 @@ package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"math"
|
"math"
|
||||||
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -30,6 +31,7 @@ import (
|
||||||
"github.com/prometheus/tsdb/chunks"
|
"github.com/prometheus/tsdb/chunks"
|
||||||
"github.com/prometheus/tsdb/index"
|
"github.com/prometheus/tsdb/index"
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
|
"github.com/prometheus/tsdb/wal"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -53,9 +55,10 @@ var (
|
||||||
type Head struct {
|
type Head struct {
|
||||||
chunkRange int64
|
chunkRange int64
|
||||||
metrics *headMetrics
|
metrics *headMetrics
|
||||||
wal WAL
|
wal *wal.WAL
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
appendPool sync.Pool
|
appendPool sync.Pool
|
||||||
|
bytesPool sync.Pool
|
||||||
|
|
||||||
minTime, maxTime int64
|
minTime, maxTime int64
|
||||||
lastSeriesID uint64
|
lastSeriesID uint64
|
||||||
|
@ -169,13 +172,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHead opens the head block in dir.
|
// NewHead opens the head block in dir.
|
||||||
func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (*Head, error) {
|
func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64) (*Head, error) {
|
||||||
if l == nil {
|
if l == nil {
|
||||||
l = log.NewNopLogger()
|
l = log.NewNopLogger()
|
||||||
}
|
}
|
||||||
if wal == nil {
|
|
||||||
wal = NopWAL()
|
|
||||||
}
|
|
||||||
if chunkRange < 1 {
|
if chunkRange < 1 {
|
||||||
return nil, errors.Errorf("invalid chunk range %d", chunkRange)
|
return nil, errors.Errorf("invalid chunk range %d", chunkRange)
|
||||||
}
|
}
|
||||||
|
@ -206,6 +206,8 @@ func (h *Head) processWALSamples(
|
||||||
) (unknownRefs uint64) {
|
) (unknownRefs uint64) {
|
||||||
defer close(output)
|
defer close(output)
|
||||||
|
|
||||||
|
maxt := h.MaxTime()
|
||||||
|
|
||||||
for samples := range input {
|
for samples := range input {
|
||||||
for _, s := range samples {
|
for _, s := range samples {
|
||||||
if s.T < mint || s.Ref%total != partition {
|
if s.T < mint || s.Ref%total != partition {
|
||||||
|
@ -221,17 +223,27 @@ func (h *Head) processWALSamples(
|
||||||
h.metrics.chunksCreated.Inc()
|
h.metrics.chunksCreated.Inc()
|
||||||
h.metrics.chunks.Inc()
|
h.metrics.chunks.Inc()
|
||||||
}
|
}
|
||||||
|
if s.T > maxt {
|
||||||
|
maxt = s.T
|
||||||
|
}
|
||||||
}
|
}
|
||||||
output <- samples
|
output <- samples
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
ht := h.MaxTime()
|
||||||
|
if maxt <= ht {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if atomic.CompareAndSwapInt64(&h.maxTime, ht, maxt) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return unknownRefs
|
return unknownRefs
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadWAL initializes the head by consuming the write ahead log.
|
func (h *Head) loadWAL(r *wal.Reader) error {
|
||||||
func (h *Head) ReadWAL() error {
|
|
||||||
defer h.postings.EnsureOrder()
|
|
||||||
|
|
||||||
r := h.wal.Reader()
|
|
||||||
mint := h.MinTime()
|
mint := h.MinTime()
|
||||||
|
|
||||||
// Track number of samples that referenced a series we don't know about
|
// Track number of samples that referenced a series we don't know about
|
||||||
|
@ -263,49 +275,71 @@ func (h *Head) ReadWAL() error {
|
||||||
input = output
|
input = output
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(fabxc): series entries spread between samples can starve the sample workers.
|
var (
|
||||||
// Even with bufferd channels, this can impact startup time with lots of series churn.
|
dec RecordDecoder
|
||||||
// We must not paralellize series creation itself but could make the indexing asynchronous.
|
series []RefSeries
|
||||||
seriesFunc := func(series []RefSeries) {
|
samples []RefSample
|
||||||
for _, s := range series {
|
tstones []Stone
|
||||||
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
|
)
|
||||||
|
for r.Next() {
|
||||||
|
series, samples, tstones = series[:0], samples[:0], tstones[:0]
|
||||||
|
rec := r.Record()
|
||||||
|
|
||||||
if h.lastSeriesID < s.Ref {
|
switch dec.Type(rec) {
|
||||||
h.lastSeriesID = s.Ref
|
case RecordSeries:
|
||||||
|
series, err := dec.Series(rec, series)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "decode series")
|
||||||
}
|
}
|
||||||
}
|
for _, s := range series {
|
||||||
}
|
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
|
||||||
samplesFunc := func(samples []RefSample) {
|
|
||||||
// We split up the samples into chunks of 5000 samples or less.
|
if h.lastSeriesID < s.Ref {
|
||||||
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
|
h.lastSeriesID = s.Ref
|
||||||
// cause thousands of very large in flight buffers occupying large amounts
|
|
||||||
// of unused memory.
|
|
||||||
for len(samples) > 0 {
|
|
||||||
n := 5000
|
|
||||||
if len(samples) < n {
|
|
||||||
n = len(samples)
|
|
||||||
}
|
|
||||||
var buf []RefSample
|
|
||||||
select {
|
|
||||||
case buf = <-input:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
firstInput <- append(buf[:0], samples[:n]...)
|
|
||||||
samples = samples[n:]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
deletesFunc := func(stones []Stone) {
|
|
||||||
for _, s := range stones {
|
|
||||||
for _, itv := range s.intervals {
|
|
||||||
if itv.Maxt < mint {
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
h.tombstones.addInterval(s.ref, itv)
|
|
||||||
}
|
}
|
||||||
|
case RecordSamples:
|
||||||
|
samples, err := dec.Samples(rec, samples)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "decode samples")
|
||||||
|
}
|
||||||
|
// We split up the samples into chunks of 5000 samples or less.
|
||||||
|
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
|
||||||
|
// cause thousands of very large in flight buffers occupying large amounts
|
||||||
|
// of unused memory.
|
||||||
|
for len(samples) > 0 {
|
||||||
|
n := 5000
|
||||||
|
if len(samples) < n {
|
||||||
|
n = len(samples)
|
||||||
|
}
|
||||||
|
var buf []RefSample
|
||||||
|
select {
|
||||||
|
case buf = <-input:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
firstInput <- append(buf[:0], samples[:n]...)
|
||||||
|
samples = samples[n:]
|
||||||
|
}
|
||||||
|
case RecordTombstones:
|
||||||
|
tstones, err := dec.Tombstones(rec, tstones)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "decode tombstones")
|
||||||
|
}
|
||||||
|
for _, s := range tstones {
|
||||||
|
for _, itv := range s.intervals {
|
||||||
|
if itv.Maxt < mint {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
h.tombstones.addInterval(s.ref, itv)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return errors.Errorf("invalid record type %v", dec.Type(rec))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if r.Err() != nil {
|
||||||
err := r.Read(seriesFunc, samplesFunc, deletesFunc)
|
return errors.Wrap(r.Err(), "read records")
|
||||||
|
}
|
||||||
|
|
||||||
// Signal termination to first worker and wait for last one to close its output channel.
|
// Signal termination to first worker and wait for last one to close its output channel.
|
||||||
close(firstInput)
|
close(firstInput)
|
||||||
|
@ -313,16 +347,58 @@ func (h *Head) ReadWAL() error {
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrap(err, "consume WAL")
|
|
||||||
}
|
|
||||||
if unknownRefs > 0 {
|
if unknownRefs > 0 {
|
||||||
level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs)
|
level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Truncate removes all data before mint from the head block and truncates its WAL.
|
// Init loads data from the write ahead log and prepares the head for writes.
|
||||||
|
func (h *Head) Init() error {
|
||||||
|
defer h.postings.EnsureOrder()
|
||||||
|
|
||||||
|
if h.wal == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Backfill the checkpoint first if it exists.
|
||||||
|
cp, n, err := LastCheckpoint(h.wal.Dir())
|
||||||
|
if err != nil && err != ErrNotFound {
|
||||||
|
return errors.Wrap(err, "find last checkpoint")
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
sr, err := wal.NewSegmentsReader(filepath.Join(h.wal.Dir(), cp))
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "open checkpoint")
|
||||||
|
}
|
||||||
|
defer sr.Close()
|
||||||
|
|
||||||
|
// A corrupted checkpoint is a hard error for now and requires user
|
||||||
|
// intervention. There's likely little data that can be recovered anyway.
|
||||||
|
if err := h.loadWAL(wal.NewReader(sr)); err != nil {
|
||||||
|
return errors.Wrap(err, "backfill checkpoint")
|
||||||
|
}
|
||||||
|
n++
|
||||||
|
}
|
||||||
|
|
||||||
|
// Backfill segments from the last checkpoint onwards
|
||||||
|
sr, err := wal.NewSegmentsRangeReader(h.wal.Dir(), n, -1)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "open WAL segments")
|
||||||
|
}
|
||||||
|
defer sr.Close()
|
||||||
|
|
||||||
|
err = h.loadWAL(wal.NewReader(sr))
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := h.wal.Repair(err); err != nil {
|
||||||
|
return errors.Wrap(err, "repair corrupted WAL")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Truncate removes old data before mint from the head.
|
||||||
func (h *Head) Truncate(mint int64) error {
|
func (h *Head) Truncate(mint int64) error {
|
||||||
initialize := h.MinTime() == math.MinInt64
|
initialize := h.MinTime() == math.MinInt64
|
||||||
|
|
||||||
|
@ -348,18 +424,37 @@ func (h *Head) Truncate(mint int64) error {
|
||||||
level.Info(h.logger).Log("msg", "head GC completed", "duration", time.Since(start))
|
level.Info(h.logger).Log("msg", "head GC completed", "duration", time.Since(start))
|
||||||
h.metrics.gcDuration.Observe(time.Since(start).Seconds())
|
h.metrics.gcDuration.Observe(time.Since(start).Seconds())
|
||||||
|
|
||||||
|
if h.wal == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
|
|
||||||
|
m, n, err := h.wal.Segments()
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "get segment range")
|
||||||
|
}
|
||||||
|
n-- // Never consider last segment for checkpoint.
|
||||||
|
if n < 0 {
|
||||||
|
return nil // no segments yet.
|
||||||
|
}
|
||||||
|
// The lower third of segments should contain mostly obsolete samples.
|
||||||
|
// If we have too few segments, it's not worth checkpointing yet.
|
||||||
|
n = m + (n-m)/3
|
||||||
|
if n <= m {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
keep := func(id uint64) bool {
|
keep := func(id uint64) bool {
|
||||||
return h.series.getByID(id) != nil
|
return h.series.getByID(id) != nil
|
||||||
}
|
}
|
||||||
if err := h.wal.Truncate(mint, keep); err == nil {
|
if _, err = Checkpoint(h.logger, h.wal, m, n, keep, mint); err != nil {
|
||||||
level.Info(h.logger).Log("msg", "WAL truncation completed", "duration", time.Since(start))
|
return errors.Wrap(err, "create checkpoint")
|
||||||
} else {
|
|
||||||
level.Error(h.logger).Log("msg", "WAL truncation failed", "err", err, "duration", time.Since(start))
|
|
||||||
}
|
}
|
||||||
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
|
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
|
||||||
|
|
||||||
|
level.Info(h.logger).Log("msg", "WAL checkpoint complete",
|
||||||
|
"low", m, "high", n, "duration", time.Since(start))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -468,6 +563,18 @@ func (h *Head) putAppendBuffer(b []RefSample) {
|
||||||
h.appendPool.Put(b[:0])
|
h.appendPool.Put(b[:0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *Head) getBytesBuffer() []byte {
|
||||||
|
b := h.bytesPool.Get()
|
||||||
|
if b == nil {
|
||||||
|
return make([]byte, 0, 1024)
|
||||||
|
}
|
||||||
|
return b.([]byte)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Head) putBytesBuffer(b []byte) {
|
||||||
|
h.bytesPool.Put(b[:0])
|
||||||
|
}
|
||||||
|
|
||||||
type headAppender struct {
|
type headAppender struct {
|
||||||
head *Head
|
head *Head
|
||||||
mint, maxt int64
|
mint, maxt int64
|
||||||
|
@ -520,15 +627,42 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *headAppender) log() error {
|
||||||
|
if a.head.wal == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := a.head.getBytesBuffer()
|
||||||
|
defer func() { a.head.putBytesBuffer(buf) }()
|
||||||
|
|
||||||
|
var rec []byte
|
||||||
|
var enc RecordEncoder
|
||||||
|
|
||||||
|
if len(a.series) > 0 {
|
||||||
|
rec = enc.Series(a.series, buf)
|
||||||
|
buf = rec[:0]
|
||||||
|
|
||||||
|
if err := a.head.wal.Log(rec); err != nil {
|
||||||
|
return errors.Wrap(err, "log series")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(a.samples) > 0 {
|
||||||
|
rec = enc.Samples(a.samples, buf)
|
||||||
|
buf = rec[:0]
|
||||||
|
|
||||||
|
if err := a.head.wal.Log(rec); err != nil {
|
||||||
|
return errors.Wrap(err, "log samples")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (a *headAppender) Commit() error {
|
func (a *headAppender) Commit() error {
|
||||||
defer a.head.metrics.activeAppenders.Dec()
|
defer a.head.metrics.activeAppenders.Dec()
|
||||||
defer a.head.putAppendBuffer(a.samples)
|
defer a.head.putAppendBuffer(a.samples)
|
||||||
|
|
||||||
if err := a.head.wal.LogSeries(a.series); err != nil {
|
if err := a.log(); err != nil {
|
||||||
return err
|
return errors.Wrap(err, "write to WAL")
|
||||||
}
|
|
||||||
if err := a.head.wal.LogSamples(a.samples); err != nil {
|
|
||||||
return errors.Wrap(err, "WAL log samples")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
total := len(a.samples)
|
total := len(a.samples)
|
||||||
|
@ -568,7 +702,8 @@ func (a *headAppender) Rollback() error {
|
||||||
|
|
||||||
// Series are created in the head memory regardless of rollback. Thus we have
|
// Series are created in the head memory regardless of rollback. Thus we have
|
||||||
// to log them to the WAL in any case.
|
// to log them to the WAL in any case.
|
||||||
return a.head.wal.LogSeries(a.series)
|
a.samples = nil
|
||||||
|
return a.log()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete all samples in the range of [mint, maxt] for series that satisfy the given
|
// Delete all samples in the range of [mint, maxt] for series that satisfy the given
|
||||||
|
@ -601,8 +736,12 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
||||||
if p.Err() != nil {
|
if p.Err() != nil {
|
||||||
return p.Err()
|
return p.Err()
|
||||||
}
|
}
|
||||||
if err := h.wal.LogDeletes(stones); err != nil {
|
var enc RecordEncoder
|
||||||
return err
|
|
||||||
|
if h.wal != nil {
|
||||||
|
if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for _, s := range stones {
|
for _, s := range stones {
|
||||||
h.tombstones.addInterval(s.ref, s.intervals[0])
|
h.tombstones.addInterval(s.ref, s.intervals[0])
|
||||||
|
@ -694,6 +833,9 @@ func (h *Head) MaxTime() int64 {
|
||||||
|
|
||||||
// Close flushes the WAL and closes the head.
|
// Close flushes the WAL and closes the head.
|
||||||
func (h *Head) Close() error {
|
func (h *Head) Close() error {
|
||||||
|
if h.wal == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return h.wal.Close()
|
return h.wal.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
116
head_test.go
116
head_test.go
|
@ -14,7 +14,9 @@
|
||||||
package tsdb
|
package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io/ioutil"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/prometheus/tsdb/chunkenc"
|
"github.com/prometheus/tsdb/chunkenc"
|
||||||
|
@ -22,6 +24,7 @@ import (
|
||||||
"github.com/prometheus/tsdb/index"
|
"github.com/prometheus/tsdb/index"
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
"github.com/prometheus/tsdb/testutil"
|
"github.com/prometheus/tsdb/testutil"
|
||||||
|
"github.com/prometheus/tsdb/wal"
|
||||||
)
|
)
|
||||||
|
|
||||||
func BenchmarkCreateSeries(b *testing.B) {
|
func BenchmarkCreateSeries(b *testing.B) {
|
||||||
|
@ -42,42 +45,50 @@ func BenchmarkCreateSeries(b *testing.B) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type memoryWAL struct {
|
func populateTestWAL(t testing.TB, w *wal.WAL, recs []interface{}) {
|
||||||
nopWAL
|
var enc RecordEncoder
|
||||||
entries []interface{}
|
for _, r := range recs {
|
||||||
}
|
switch v := r.(type) {
|
||||||
|
|
||||||
func (w *memoryWAL) LogSeries(s []RefSeries) error {
|
|
||||||
w.entries = append(w.entries, s)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *memoryWAL) LogSamples(s []RefSample) error {
|
|
||||||
w.entries = append(w.entries, s)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *memoryWAL) LogDeletes(s []Stone) error {
|
|
||||||
w.entries = append(w.entries, s)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *memoryWAL) Reader() WALReader {
|
|
||||||
return w
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *memoryWAL) Read(series func([]RefSeries), samples func([]RefSample), deletes func([]Stone)) error {
|
|
||||||
for _, e := range w.entries {
|
|
||||||
switch v := e.(type) {
|
|
||||||
case []RefSeries:
|
case []RefSeries:
|
||||||
series(v)
|
testutil.Ok(t, w.Log(enc.Series(v, nil)))
|
||||||
case []RefSample:
|
case []RefSample:
|
||||||
samples(v)
|
testutil.Ok(t, w.Log(enc.Samples(v, nil)))
|
||||||
case []Stone:
|
case []Stone:
|
||||||
deletes(v)
|
testutil.Ok(t, w.Log(enc.Tombstones(v, nil)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
}
|
||||||
|
|
||||||
|
func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
|
||||||
|
sr, err := wal.NewSegmentsReader(dir)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer sr.Close()
|
||||||
|
|
||||||
|
var dec RecordDecoder
|
||||||
|
r := wal.NewReader(sr)
|
||||||
|
|
||||||
|
for r.Next() {
|
||||||
|
rec := r.Record()
|
||||||
|
|
||||||
|
switch dec.Type(rec) {
|
||||||
|
case RecordSeries:
|
||||||
|
series, err := dec.Series(rec, nil)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
recs = append(recs, series)
|
||||||
|
case RecordSamples:
|
||||||
|
samples, err := dec.Samples(rec, nil)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
recs = append(recs, samples)
|
||||||
|
case RecordTombstones:
|
||||||
|
tstones, err := dec.Tombstones(rec, nil)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
recs = append(recs, tstones)
|
||||||
|
default:
|
||||||
|
t.Fatalf("unknown record type")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
testutil.Ok(t, r.Err())
|
||||||
|
return recs
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHead_ReadWAL(t *testing.T) {
|
func TestHead_ReadWAL(t *testing.T) {
|
||||||
|
@ -100,13 +111,19 @@ func TestHead_ReadWAL(t *testing.T) {
|
||||||
{Ref: 50, T: 101, V: 6},
|
{Ref: 50, T: 101, V: 6},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
wal := &memoryWAL{entries: entries}
|
dir, err := ioutil.TempDir("", "test_read_wal")
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
head, err := NewHead(nil, nil, wal, 1000)
|
w, err := wal.New(nil, nil, dir)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
populateTestWAL(t, w, entries)
|
||||||
|
|
||||||
|
head, err := NewHead(nil, nil, w, 1000)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer head.Close()
|
defer head.Close()
|
||||||
|
|
||||||
testutil.Ok(t, head.ReadWAL())
|
testutil.Ok(t, head.Init())
|
||||||
testutil.Equals(t, uint64(100), head.lastSeriesID)
|
testutil.Equals(t, uint64(100), head.lastSeriesID)
|
||||||
|
|
||||||
s10 := head.series.getByID(10)
|
s10 := head.series.getByID(10)
|
||||||
|
@ -259,13 +276,19 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
|
||||||
{Ref: 50, T: 90, V: 1},
|
{Ref: 50, T: 90, V: 1},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
wal := &memoryWAL{entries: entries}
|
dir, err := ioutil.TempDir("", "test_delete_series")
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
head, err := NewHead(nil, nil, wal, 1000)
|
w, err := wal.New(nil, nil, dir)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
populateTestWAL(t, w, entries)
|
||||||
|
|
||||||
|
head, err := NewHead(nil, nil, w, 1000)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer head.Close()
|
defer head.Close()
|
||||||
|
|
||||||
testutil.Ok(t, head.ReadWAL())
|
testutil.Ok(t, head.Init())
|
||||||
|
|
||||||
testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1")))
|
testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1")))
|
||||||
}
|
}
|
||||||
|
@ -705,7 +728,7 @@ func TestMemSeries_append(t *testing.T) {
|
||||||
|
|
||||||
func TestGCChunkAccess(t *testing.T) {
|
func TestGCChunkAccess(t *testing.T) {
|
||||||
// Put a chunk, select it. GC it and then access it.
|
// Put a chunk, select it. GC it and then access it.
|
||||||
h, err := NewHead(nil, nil, NopWAL(), 1000)
|
h, err := NewHead(nil, nil, nil, 1000)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer h.Close()
|
defer h.Close()
|
||||||
|
|
||||||
|
@ -745,7 +768,7 @@ func TestGCChunkAccess(t *testing.T) {
|
||||||
|
|
||||||
func TestGCSeriesAccess(t *testing.T) {
|
func TestGCSeriesAccess(t *testing.T) {
|
||||||
// Put a series, select it. GC it and then access it.
|
// Put a series, select it. GC it and then access it.
|
||||||
h, err := NewHead(nil, nil, NopWAL(), 1000)
|
h, err := NewHead(nil, nil, nil, 1000)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer h.Close()
|
defer h.Close()
|
||||||
|
|
||||||
|
@ -786,7 +809,12 @@ func TestGCSeriesAccess(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHead_LogRollback(t *testing.T) {
|
func TestHead_LogRollback(t *testing.T) {
|
||||||
w := &memoryWAL{}
|
dir, err := ioutil.TempDir("", "wal_rollback")
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
|
w, err := wal.New(nil, nil, dir)
|
||||||
|
testutil.Ok(t, err)
|
||||||
h, err := NewHead(nil, nil, w, 1000)
|
h, err := NewHead(nil, nil, w, 1000)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
@ -795,9 +823,11 @@ func TestHead_LogRollback(t *testing.T) {
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
testutil.Ok(t, app.Rollback())
|
testutil.Ok(t, app.Rollback())
|
||||||
testutil.Equals(t, 1, len(w.entries))
|
recs := readTestWAL(t, w.Dir())
|
||||||
|
|
||||||
series, ok := w.entries[0].([]RefSeries)
|
testutil.Equals(t, 1, len(recs))
|
||||||
testutil.Assert(t, ok, "expected series record but got %+v", w.entries[0])
|
|
||||||
|
series, ok := recs[0].([]RefSeries)
|
||||||
|
testutil.Assert(t, ok, "expected series record but got %+v", recs[0])
|
||||||
testutil.Equals(t, series, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}})
|
testutil.Equals(t, series, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue