mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-10 07:34:04 -08:00
Merge pull request #168 from prometheus/fasterwal
wal: decode and process in separate threads.
This commit is contained in:
commit
c3e502b194
114
head.go
114
head.go
|
@ -15,6 +15,7 @@ package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"math"
|
"math"
|
||||||
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
@ -186,30 +187,19 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
|
||||||
return h, nil
|
return h, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadWAL initializes the head by consuming the write ahead log.
|
// processWALSamples adds a partition of samples it receives to the head and passes
|
||||||
func (h *Head) ReadWAL() error {
|
// them on to other workers.
|
||||||
defer h.postings.ensureOrder()
|
// Samples before the mint timestamp are discarded.
|
||||||
|
func (h *Head) processWALSamples(
|
||||||
|
mint int64,
|
||||||
|
partition, total uint64,
|
||||||
|
input <-chan []RefSample, output chan<- []RefSample,
|
||||||
|
) (unknownRefs uint64) {
|
||||||
|
defer close(output)
|
||||||
|
|
||||||
r := h.wal.Reader()
|
for samples := range input {
|
||||||
mint := h.MinTime()
|
|
||||||
|
|
||||||
// Track number of samples that referenced a series we don't know about
|
|
||||||
// for error reporting.
|
|
||||||
var unknownRefs int
|
|
||||||
|
|
||||||
seriesFunc := func(series []RefSeries) error {
|
|
||||||
for _, s := range series {
|
|
||||||
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
|
|
||||||
|
|
||||||
if h.lastSeriesID < s.Ref {
|
|
||||||
h.lastSeriesID = s.Ref
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
samplesFunc := func(samples []RefSample) error {
|
|
||||||
for _, s := range samples {
|
for _, s := range samples {
|
||||||
if s.T < mint {
|
if s.T < mint || s.Ref%total != partition {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ms := h.series.getByID(s.Ref)
|
ms := h.series.getByID(s.Ref)
|
||||||
|
@ -223,9 +213,65 @@ func (h *Head) ReadWAL() error {
|
||||||
h.metrics.chunks.Inc()
|
h.metrics.chunks.Inc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
output <- samples
|
||||||
}
|
}
|
||||||
deletesFunc := func(stones []Stone) error {
|
return unknownRefs
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadWAL initializes the head by consuming the write ahead log.
|
||||||
|
func (h *Head) ReadWAL() error {
|
||||||
|
defer h.postings.ensureOrder()
|
||||||
|
|
||||||
|
r := h.wal.Reader()
|
||||||
|
mint := h.MinTime()
|
||||||
|
|
||||||
|
// Track number of samples that referenced a series we don't know about
|
||||||
|
// for error reporting.
|
||||||
|
var unknownRefs uint64
|
||||||
|
|
||||||
|
// Start workers that each process samples for a partition of the series ID space.
|
||||||
|
// They are connected through a ring of channels which ensures that all sample batches
|
||||||
|
// read from the WAL are processed in order.
|
||||||
|
var (
|
||||||
|
n = runtime.GOMAXPROCS(0)
|
||||||
|
firstInput = make(chan []RefSample, 300)
|
||||||
|
input = firstInput
|
||||||
|
)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
output := make(chan []RefSample, 300)
|
||||||
|
|
||||||
|
go func(i int, input <-chan []RefSample, output chan<- []RefSample) {
|
||||||
|
unknown := h.processWALSamples(mint, uint64(i), uint64(n), input, output)
|
||||||
|
atomic.AddUint64(&unknownRefs, unknown)
|
||||||
|
}(i, input, output)
|
||||||
|
|
||||||
|
// The output feeds the next worker goroutine. For the last worker,
|
||||||
|
// it feeds the initial input again to reuse the RefSample slices.
|
||||||
|
input = output
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(fabxc): series entries spread between samples can starve the sample workers.
|
||||||
|
// Even with bufferd channels, this can impact startup time with lots of series churn.
|
||||||
|
// We must not pralellize series creation itself but could make the indexing asynchronous.
|
||||||
|
seriesFunc := func(series []RefSeries) {
|
||||||
|
for _, s := range series {
|
||||||
|
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
|
||||||
|
|
||||||
|
if h.lastSeriesID < s.Ref {
|
||||||
|
h.lastSeriesID = s.Ref
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
samplesFunc := func(samples []RefSample) {
|
||||||
|
var buf []RefSample
|
||||||
|
select {
|
||||||
|
case buf = <-input:
|
||||||
|
default:
|
||||||
|
buf = make([]RefSample, 0, len(samples)*11/10)
|
||||||
|
}
|
||||||
|
firstInput <- append(buf[:0], samples...)
|
||||||
|
}
|
||||||
|
deletesFunc := func(stones []Stone) {
|
||||||
for _, s := range stones {
|
for _, s := range stones {
|
||||||
for _, itv := range s.intervals {
|
for _, itv := range s.intervals {
|
||||||
if itv.Maxt < mint {
|
if itv.Maxt < mint {
|
||||||
|
@ -234,16 +280,20 @@ func (h *Head) ReadWAL() error {
|
||||||
h.tombstones.add(s.ref, itv)
|
h.tombstones.add(s.ref, itv)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err := r.Read(seriesFunc, samplesFunc, deletesFunc)
|
||||||
|
|
||||||
|
// Signal termination to first worker and wait for last one to close its output channel.
|
||||||
|
close(firstInput)
|
||||||
|
for range input {
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "consume WAL")
|
||||||
|
}
|
||||||
if unknownRefs > 0 {
|
if unknownRefs > 0 {
|
||||||
level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs)
|
level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil {
|
|
||||||
return errors.Wrap(err, "consume WAL")
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1172,10 +1222,12 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
|
||||||
c = s.cut(t)
|
c = s.cut(t)
|
||||||
chunkCreated = true
|
chunkCreated = true
|
||||||
}
|
}
|
||||||
|
numSamples := c.chunk.NumSamples()
|
||||||
|
|
||||||
if c.maxTime >= t {
|
if c.maxTime >= t {
|
||||||
return false, chunkCreated
|
return false, chunkCreated
|
||||||
}
|
}
|
||||||
if c.chunk.NumSamples() > samplesPerChunk/4 && t >= s.nextAt {
|
if numSamples > samplesPerChunk/4 && t >= s.nextAt {
|
||||||
c = s.cut(t)
|
c = s.cut(t)
|
||||||
chunkCreated = true
|
chunkCreated = true
|
||||||
}
|
}
|
||||||
|
@ -1183,7 +1235,7 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
|
||||||
|
|
||||||
c.maxTime = t
|
c.maxTime = t
|
||||||
|
|
||||||
if c.chunk.NumSamples() == samplesPerChunk/4 {
|
if numSamples == samplesPerChunk/4 {
|
||||||
_, maxt := rangeForTimestamp(c.minTime, s.chunkRange)
|
_, maxt := rangeForTimestamp(c.minTime, s.chunkRange)
|
||||||
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt)
|
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt)
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,7 +93,7 @@ func (w *memoryWAL) Reader() WALReader {
|
||||||
return w
|
return w
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *memoryWAL) Read(series SeriesCB, samples SamplesCB, deletes DeletesCB) error {
|
func (w *memoryWAL) Read(series func([]RefSeries), samples func([]RefSample), deletes func([]Stone)) error {
|
||||||
for _, e := range w.entries {
|
for _, e := range w.entries {
|
||||||
switch v := e.(type) {
|
switch v := e.(type) {
|
||||||
case []RefSeries:
|
case []RefSeries:
|
||||||
|
|
243
wal.go
243
wal.go
|
@ -27,16 +27,16 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/tsdb/fileutil"
|
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/go-kit/kit/log/level"
|
"github.com/go-kit/kit/log/level"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/tsdb/labels"
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/tsdb/fileutil"
|
||||||
|
"github.com/prometheus/tsdb/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
// WALEntryType indicates what data a WAL entry contains.
|
// WALEntryType indicates what data a WAL entry contains.
|
||||||
type WALEntryType byte
|
type WALEntryType uint8
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// WALMagic is a 4 byte number every WAL segment file starts with.
|
// WALMagic is a 4 byte number every WAL segment file starts with.
|
||||||
|
@ -54,18 +54,6 @@ const (
|
||||||
WALEntryDeletes WALEntryType = 4
|
WALEntryDeletes WALEntryType = 4
|
||||||
)
|
)
|
||||||
|
|
||||||
// SamplesCB is the callback after reading samples. The passed slice
|
|
||||||
// is only valid until the call returns.
|
|
||||||
type SamplesCB func([]RefSample) error
|
|
||||||
|
|
||||||
// SeriesCB is the callback after reading series. The passed slice
|
|
||||||
// is only valid until the call returns.
|
|
||||||
type SeriesCB func([]RefSeries) error
|
|
||||||
|
|
||||||
// DeletesCB is the callback after reading deletes. The passed slice
|
|
||||||
// is only valid until the call returns.
|
|
||||||
type DeletesCB func([]Stone) error
|
|
||||||
|
|
||||||
type walMetrics struct {
|
type walMetrics struct {
|
||||||
fsyncDuration prometheus.Summary
|
fsyncDuration prometheus.Summary
|
||||||
corruptions prometheus.Counter
|
corruptions prometheus.Counter
|
||||||
|
@ -110,17 +98,27 @@ func NopWAL() WAL {
|
||||||
|
|
||||||
type nopWAL struct{}
|
type nopWAL struct{}
|
||||||
|
|
||||||
func (nopWAL) Read(SeriesCB, SamplesCB, DeletesCB) error { return nil }
|
func (nopWAL) Read(
|
||||||
func (w nopWAL) Reader() WALReader { return w }
|
seriesf func([]RefSeries),
|
||||||
func (nopWAL) LogSeries([]RefSeries) error { return nil }
|
samplesf func([]RefSample),
|
||||||
func (nopWAL) LogSamples([]RefSample) error { return nil }
|
deletesf func([]Stone),
|
||||||
func (nopWAL) LogDeletes([]Stone) error { return nil }
|
) error {
|
||||||
func (nopWAL) Truncate(int64, func(uint64) bool) error { return nil }
|
return nil
|
||||||
func (nopWAL) Close() error { return nil }
|
}
|
||||||
|
func (w nopWAL) Reader() WALReader { return w }
|
||||||
|
func (nopWAL) LogSeries([]RefSeries) error { return nil }
|
||||||
|
func (nopWAL) LogSamples([]RefSample) error { return nil }
|
||||||
|
func (nopWAL) LogDeletes([]Stone) error { return nil }
|
||||||
|
func (nopWAL) Truncate(int64, func(uint64) bool) error { return nil }
|
||||||
|
func (nopWAL) Close() error { return nil }
|
||||||
|
|
||||||
// WALReader reads entries from a WAL.
|
// WALReader reads entries from a WAL.
|
||||||
type WALReader interface {
|
type WALReader interface {
|
||||||
Read(SeriesCB, SamplesCB, DeletesCB) error
|
Read(
|
||||||
|
seriesf func([]RefSeries),
|
||||||
|
samplesf func([]RefSample),
|
||||||
|
deletesf func([]Stone),
|
||||||
|
) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// RefSeries is the series labels with the series ID.
|
// RefSeries is the series labels with the series ID.
|
||||||
|
@ -176,7 +174,7 @@ func newCRC32() hash.Hash32 {
|
||||||
|
|
||||||
// SegmentWAL is a write ahead log for series data.
|
// SegmentWAL is a write ahead log for series data.
|
||||||
type SegmentWAL struct {
|
type SegmentWAL struct {
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
metrics *walMetrics
|
metrics *walMetrics
|
||||||
|
|
||||||
dirFile *os.File
|
dirFile *os.File
|
||||||
|
@ -244,12 +242,16 @@ type repairingWALReader struct {
|
||||||
r WALReader
|
r WALReader
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *repairingWALReader) Read(series SeriesCB, samples SamplesCB, deletes DeletesCB) error {
|
func (r *repairingWALReader) Read(
|
||||||
err := r.r.Read(series, samples, deletes)
|
seriesf func([]RefSeries),
|
||||||
|
samplesf func([]RefSample),
|
||||||
|
deletesf func([]Stone),
|
||||||
|
) error {
|
||||||
|
err := r.r.Read(seriesf, samplesf, deletesf)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
cerr, ok := err.(walCorruptionErr)
|
cerr, ok := errors.Cause(err).(walCorruptionErr)
|
||||||
if !ok {
|
if !ok {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -343,6 +345,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error {
|
||||||
var (
|
var (
|
||||||
csf = newSegmentFile(f)
|
csf = newSegmentFile(f)
|
||||||
crc32 = newCRC32()
|
crc32 = newCRC32()
|
||||||
|
decSeries = []RefSeries{}
|
||||||
activeSeries = []RefSeries{}
|
activeSeries = []RefSeries{}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -352,13 +355,14 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error {
|
||||||
if rt != WALEntrySeries {
|
if rt != WALEntrySeries {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
series, err := r.decodeSeries(flag, byt)
|
decSeries = decSeries[:0]
|
||||||
|
activeSeries = activeSeries[:0]
|
||||||
|
|
||||||
|
err := r.decodeSeries(flag, byt, &decSeries)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "decode samples while truncating")
|
return errors.Wrap(err, "decode samples while truncating")
|
||||||
}
|
}
|
||||||
activeSeries = activeSeries[:0]
|
for _, s := range decSeries {
|
||||||
|
|
||||||
for _, s := range series {
|
|
||||||
if keep(s.Ref) {
|
if keep(s.Ref) {
|
||||||
activeSeries = append(activeSeries, s)
|
activeSeries = append(activeSeries, s)
|
||||||
}
|
}
|
||||||
|
@ -814,10 +818,6 @@ type walReader struct {
|
||||||
curBuf []byte
|
curBuf []byte
|
||||||
lastOffset int64 // offset after last successfully read entry
|
lastOffset int64 // offset after last successfully read entry
|
||||||
|
|
||||||
seriesBuf []RefSeries
|
|
||||||
sampleBuf []RefSample
|
|
||||||
tombstoneBuf []Stone
|
|
||||||
|
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -838,70 +838,118 @@ func (r *walReader) Err() error {
|
||||||
return r.err
|
return r.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesCB) error {
|
func (r *walReader) Read(
|
||||||
if seriesf == nil {
|
seriesf func([]RefSeries),
|
||||||
seriesf = func([]RefSeries) error { return nil }
|
samplesf func([]RefSample),
|
||||||
}
|
deletesf func([]Stone),
|
||||||
if samplesf == nil {
|
) error {
|
||||||
samplesf = func([]RefSample) error { return nil }
|
// Concurrency for replaying the WAL is very limited. We at least split out decoding and
|
||||||
}
|
// processing into separate threads.
|
||||||
if deletesf == nil {
|
// Historically, the processing is the bottleneck with reading and decoding using only
|
||||||
deletesf = func([]Stone) error { return nil }
|
// 15% of the CPU.
|
||||||
}
|
var (
|
||||||
|
seriesPool sync.Pool
|
||||||
|
samplePool sync.Pool
|
||||||
|
deletePool sync.Pool
|
||||||
|
)
|
||||||
|
donec := make(chan struct{})
|
||||||
|
datac := make(chan interface{}, 100)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(donec)
|
||||||
|
|
||||||
|
for x := range datac {
|
||||||
|
switch v := x.(type) {
|
||||||
|
case []RefSeries:
|
||||||
|
if seriesf != nil {
|
||||||
|
seriesf(v)
|
||||||
|
}
|
||||||
|
seriesPool.Put(v[:0])
|
||||||
|
case []RefSample:
|
||||||
|
if samplesf != nil {
|
||||||
|
samplesf(v)
|
||||||
|
}
|
||||||
|
samplePool.Put(v[:0])
|
||||||
|
case []Stone:
|
||||||
|
if deletesf != nil {
|
||||||
|
deletesf(v)
|
||||||
|
}
|
||||||
|
deletePool.Put(v[:0])
|
||||||
|
default:
|
||||||
|
level.Error(r.logger).Log("msg", "unexpected data type")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
var err error
|
||||||
|
|
||||||
for r.next() {
|
for r.next() {
|
||||||
et, flag, b := r.at()
|
et, flag, b := r.at()
|
||||||
|
|
||||||
// In decoding below we never return a walCorruptionErr for now.
|
// In decoding below we never return a walCorruptionErr for now.
|
||||||
// Those should generally be catched by entry decoding before.
|
// Those should generally be catched by entry decoding before.
|
||||||
switch et {
|
switch et {
|
||||||
case WALEntrySeries:
|
case WALEntrySeries:
|
||||||
series, err := r.decodeSeries(flag, b)
|
var series []RefSeries
|
||||||
|
if v := seriesPool.Get(); v == nil {
|
||||||
|
series = make([]RefSeries, 0, 512)
|
||||||
|
} else {
|
||||||
|
series = v.([]RefSeries)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := r.decodeSeries(flag, b, &series)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "decode series entry")
|
err = errors.Wrap(err, "decode series entry")
|
||||||
}
|
break
|
||||||
if err := seriesf(series); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
datac <- series
|
||||||
|
|
||||||
cf := r.current()
|
cf := r.current()
|
||||||
|
|
||||||
for _, s := range series {
|
for _, s := range series {
|
||||||
if cf.minSeries > s.Ref {
|
if cf.minSeries > s.Ref {
|
||||||
cf.minSeries = s.Ref
|
cf.minSeries = s.Ref
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
case WALEntrySamples:
|
case WALEntrySamples:
|
||||||
samples, err := r.decodeSamples(flag, b)
|
var samples []RefSample
|
||||||
|
if v := samplePool.Get(); v == nil {
|
||||||
|
samples = make([]RefSample, 0, 512)
|
||||||
|
} else {
|
||||||
|
samples = v.([]RefSample)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := r.decodeSamples(flag, b, &samples)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "decode samples entry")
|
err = errors.Wrap(err, "decode samples entry")
|
||||||
}
|
break
|
||||||
if err := samplesf(samples); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
datac <- samples
|
||||||
|
|
||||||
// Update the times for the WAL segment file.
|
// Update the times for the WAL segment file.
|
||||||
cf := r.current()
|
cf := r.current()
|
||||||
|
|
||||||
for _, s := range samples {
|
for _, s := range samples {
|
||||||
if cf.maxTime < s.T {
|
if cf.maxTime < s.T {
|
||||||
cf.maxTime = s.T
|
cf.maxTime = s.T
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
case WALEntryDeletes:
|
case WALEntryDeletes:
|
||||||
stones, err := r.decodeDeletes(flag, b)
|
var deletes []Stone
|
||||||
|
if v := deletePool.Get(); v == nil {
|
||||||
|
deletes = make([]Stone, 0, 512)
|
||||||
|
} else {
|
||||||
|
deletes = v.([]Stone)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := r.decodeDeletes(flag, b, &deletes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "decode delete entry")
|
err = errors.Wrap(err, "decode delete entry")
|
||||||
}
|
break
|
||||||
if err := deletesf(stones); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
datac <- deletes
|
||||||
|
|
||||||
// Update the times for the WAL segment file.
|
// Update the times for the WAL segment file.
|
||||||
|
|
||||||
cf := r.current()
|
cf := r.current()
|
||||||
|
for _, s := range deletes {
|
||||||
for _, s := range stones {
|
|
||||||
for _, iv := range s.intervals {
|
for _, iv := range s.intervals {
|
||||||
if cf.maxTime < iv.Maxt {
|
if cf.maxTime < iv.Maxt {
|
||||||
cf.maxTime = iv.Maxt
|
cf.maxTime = iv.Maxt
|
||||||
|
@ -910,27 +958,16 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
close(datac)
|
||||||
|
<-donec
|
||||||
|
|
||||||
return r.Err()
|
if err != nil {
|
||||||
}
|
return err
|
||||||
|
|
||||||
// nextEntry retrieves the next entry. It is also used as a testing hook.
|
|
||||||
func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) {
|
|
||||||
if r.cur >= len(r.files) {
|
|
||||||
return 0, 0, nil, io.EOF
|
|
||||||
}
|
}
|
||||||
cf := r.current()
|
if r.Err() != nil {
|
||||||
|
return errors.Wrap(r.Err(), "read entry")
|
||||||
et, flag, b, err := r.entry(cf)
|
|
||||||
// If we reached the end of the reader, advance to the next one and close.
|
|
||||||
// Do not close on the last one as it will still be appended to.
|
|
||||||
if err == io.EOF && r.cur < len(r.files)-1 {
|
|
||||||
// Current reader completed. Leave the file open for later reads
|
|
||||||
// for truncating.
|
|
||||||
r.cur++
|
|
||||||
return r.nextEntry()
|
|
||||||
}
|
}
|
||||||
return et, flag, b, err
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *walReader) at() (WALEntryType, byte, []byte) {
|
func (r *walReader) at() (WALEntryType, byte, []byte) {
|
||||||
|
@ -1050,9 +1087,7 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
|
||||||
return etype, flag, buf, nil
|
return etype, flag, buf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) {
|
func (r *walReader) decodeSeries(flag byte, b []byte, res *[]RefSeries) error {
|
||||||
r.seriesBuf = r.seriesBuf[:0]
|
|
||||||
|
|
||||||
dec := decbuf{b: b}
|
dec := decbuf{b: b}
|
||||||
|
|
||||||
for len(dec.b) > 0 && dec.err() == nil {
|
for len(dec.b) > 0 && dec.err() == nil {
|
||||||
|
@ -1066,25 +1101,24 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) {
|
||||||
}
|
}
|
||||||
sort.Sort(lset)
|
sort.Sort(lset)
|
||||||
|
|
||||||
r.seriesBuf = append(r.seriesBuf, RefSeries{
|
*res = append(*res, RefSeries{
|
||||||
Ref: ref,
|
Ref: ref,
|
||||||
Labels: lset,
|
Labels: lset,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
if dec.err() != nil {
|
if dec.err() != nil {
|
||||||
return nil, dec.err()
|
return dec.err()
|
||||||
}
|
}
|
||||||
if len(dec.b) > 0 {
|
if len(dec.b) > 0 {
|
||||||
return r.seriesBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
|
return errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
|
||||||
}
|
}
|
||||||
return r.seriesBuf, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
|
func (r *walReader) decodeSamples(flag byte, b []byte, res *[]RefSample) error {
|
||||||
if len(b) == 0 {
|
if len(b) == 0 {
|
||||||
return nil, nil
|
return nil
|
||||||
}
|
}
|
||||||
r.sampleBuf = r.sampleBuf[:0]
|
|
||||||
dec := decbuf{b: b}
|
dec := decbuf{b: b}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -1097,7 +1131,7 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
|
||||||
dtime := dec.varint64()
|
dtime := dec.varint64()
|
||||||
val := dec.be64()
|
val := dec.be64()
|
||||||
|
|
||||||
r.sampleBuf = append(r.sampleBuf, RefSample{
|
*res = append(*res, RefSample{
|
||||||
Ref: uint64(int64(baseRef) + dref),
|
Ref: uint64(int64(baseRef) + dref),
|
||||||
T: baseTime + dtime,
|
T: baseTime + dtime,
|
||||||
V: math.Float64frombits(val),
|
V: math.Float64frombits(val),
|
||||||
|
@ -1105,20 +1139,19 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if dec.err() != nil {
|
if dec.err() != nil {
|
||||||
return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(r.sampleBuf))
|
return errors.Wrapf(dec.err(), "decode error after %d samples", len(*res))
|
||||||
}
|
}
|
||||||
if len(dec.b) > 0 {
|
if len(dec.b) > 0 {
|
||||||
return r.sampleBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
|
return errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
|
||||||
}
|
}
|
||||||
return r.sampleBuf, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) {
|
func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error {
|
||||||
dec := &decbuf{b: b}
|
dec := &decbuf{b: b}
|
||||||
r.tombstoneBuf = r.tombstoneBuf[:0]
|
|
||||||
|
|
||||||
for dec.len() > 0 && dec.err() == nil {
|
for dec.len() > 0 && dec.err() == nil {
|
||||||
r.tombstoneBuf = append(r.tombstoneBuf, Stone{
|
*res = append(*res, Stone{
|
||||||
ref: dec.be64(),
|
ref: dec.be64(),
|
||||||
intervals: Intervals{
|
intervals: Intervals{
|
||||||
{Mint: dec.varint64(), Maxt: dec.varint64()},
|
{Mint: dec.varint64(), Maxt: dec.varint64()},
|
||||||
|
@ -1126,10 +1159,10 @@ func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
if dec.err() != nil {
|
if dec.err() != nil {
|
||||||
return nil, dec.err()
|
return dec.err()
|
||||||
}
|
}
|
||||||
if len(dec.b) > 0 {
|
if len(dec.b) > 0 {
|
||||||
return r.tombstoneBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
|
return errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
|
||||||
}
|
}
|
||||||
return r.tombstoneBuf, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
27
wal_test.go
27
wal_test.go
|
@ -187,9 +187,8 @@ func TestSegmentWAL_Truncate(t *testing.T) {
|
||||||
var readSeries []RefSeries
|
var readSeries []RefSeries
|
||||||
r := w.Reader()
|
r := w.Reader()
|
||||||
|
|
||||||
r.Read(func(s []RefSeries) error {
|
r.Read(func(s []RefSeries) {
|
||||||
readSeries = append(readSeries, s...)
|
readSeries = append(readSeries, s...)
|
||||||
return nil
|
|
||||||
}, nil, nil)
|
}, nil, nil)
|
||||||
|
|
||||||
require.Equal(t, expected, readSeries)
|
require.Equal(t, expected, readSeries)
|
||||||
|
@ -235,33 +234,27 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
|
||||||
resultDeletes [][]Stone
|
resultDeletes [][]Stone
|
||||||
)
|
)
|
||||||
|
|
||||||
serf := func(series []RefSeries) error {
|
serf := func(series []RefSeries) {
|
||||||
if len(series) > 0 {
|
if len(series) > 0 {
|
||||||
clsets := make([]RefSeries, len(series))
|
clsets := make([]RefSeries, len(series))
|
||||||
copy(clsets, series)
|
copy(clsets, series)
|
||||||
resultSeries = append(resultSeries, clsets)
|
resultSeries = append(resultSeries, clsets)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
smplf := func(smpls []RefSample) error {
|
smplf := func(smpls []RefSample) {
|
||||||
if len(smpls) > 0 {
|
if len(smpls) > 0 {
|
||||||
csmpls := make([]RefSample, len(smpls))
|
csmpls := make([]RefSample, len(smpls))
|
||||||
copy(csmpls, smpls)
|
copy(csmpls, smpls)
|
||||||
resultSamples = append(resultSamples, csmpls)
|
resultSamples = append(resultSamples, csmpls)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
delf := func(stones []Stone) error {
|
delf := func(stones []Stone) {
|
||||||
if len(stones) > 0 {
|
if len(stones) > 0 {
|
||||||
cst := make([]Stone, len(stones))
|
cst := make([]Stone, len(stones))
|
||||||
copy(cst, stones)
|
copy(cst, stones)
|
||||||
resultDeletes = append(resultDeletes, cst)
|
resultDeletes = append(resultDeletes, cst)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, r.Read(serf, smplf, delf))
|
require.NoError(t, r.Read(serf, smplf, delf))
|
||||||
|
@ -420,26 +413,22 @@ func TestWALRestoreCorrupted(t *testing.T) {
|
||||||
|
|
||||||
r := w2.Reader()
|
r := w2.Reader()
|
||||||
|
|
||||||
serf := func(l []RefSeries) error {
|
serf := func(l []RefSeries) {
|
||||||
require.Equal(t, 0, len(l))
|
require.Equal(t, 0, len(l))
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
delf := func([]Stone) error { return nil }
|
|
||||||
|
|
||||||
// Weird hack to check order of reads.
|
// Weird hack to check order of reads.
|
||||||
i := 0
|
i := 0
|
||||||
samplf := func(s []RefSample) error {
|
samplf := func(s []RefSample) {
|
||||||
if i == 0 {
|
if i == 0 {
|
||||||
require.Equal(t, []RefSample{{T: 1, V: 2}}, s)
|
require.Equal(t, []RefSample{{T: 1, V: 2}}, s)
|
||||||
i++
|
i++
|
||||||
} else {
|
} else {
|
||||||
require.Equal(t, []RefSample{{T: 99, V: 100}}, s)
|
require.Equal(t, []RefSample{{T: 99, V: 100}}, s)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, r.Read(serf, samplf, delf))
|
require.NoError(t, r.Read(serf, samplf, nil))
|
||||||
|
|
||||||
require.NoError(t, w2.LogSamples([]RefSample{{T: 99, V: 100}}))
|
require.NoError(t, w2.LogSamples([]RefSample{{T: 99, V: 100}}))
|
||||||
require.NoError(t, w2.Close())
|
require.NoError(t, w2.Close())
|
||||||
|
@ -452,7 +441,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
|
||||||
r = w3.Reader()
|
r = w3.Reader()
|
||||||
|
|
||||||
i = 0
|
i = 0
|
||||||
require.NoError(t, r.Read(serf, samplf, delf))
|
require.NoError(t, r.Read(serf, samplf, nil))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue