Merge pull request #440 from prometheus/wal-reading

Improve WAL reading
Brian Brazil, 2018-11-14 13:59:41 +00:00, committed by GitHub
commit 10632217ce

head.go (94 changed lines)

@@ -237,22 +237,28 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
 // Samples before the mint timestamp are discarded.
 func (h *Head) processWALSamples(
 	minValidTime int64,
-	partition, total uint64,
 	input <-chan []RefSample, output chan<- []RefSample,
 ) (unknownRefs uint64) {
 	defer close(output)
 
+	// Mitigate lock contention in getByID.
+	refSeries := map[uint64]*memSeries{}
+
 	mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
 
 	for samples := range input {
 		for _, s := range samples {
-			if s.T < minValidTime || s.Ref%total != partition {
+			if s.T < minValidTime {
 				continue
 			}
-			ms := h.series.getByID(s.Ref)
+			ms := refSeries[s.Ref]
 			if ms == nil {
-				unknownRefs++
-				continue
+				ms = h.series.getByID(s.Ref)
+				if ms == nil {
+					unknownRefs++
+					continue
+				}
+				refSeries[s.Ref] = ms
 			}
 			_, chunkCreated := ms.append(s.T, s.V)
 			if chunkCreated {
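The refSeries map added above acts as a per-worker cache in front of h.series.getByID, so the shared, lock-protected series index is consulted at most once per distinct series reference. A minimal self-contained sketch of that pattern follows; seriesIndex and resolve are hypothetical stand-ins, not the real tsdb types.

package main

import (
	"fmt"
	"sync"
)

type memSeries struct{ ref uint64 }

// seriesIndex stands in for the shared, lock-protected index behind getByID.
type seriesIndex struct {
	mtx  sync.RWMutex
	byID map[uint64]*memSeries
}

func (ix *seriesIndex) getByID(id uint64) *memSeries {
	ix.mtx.RLock()
	defer ix.mtx.RUnlock()
	return ix.byID[id]
}

// resolve checks a worker-local, unsynchronized cache first; only a miss
// pays for the shared lock, and the result is remembered for later samples.
func resolve(cache map[uint64]*memSeries, ix *seriesIndex, ref uint64) *memSeries {
	if ms := cache[ref]; ms != nil {
		return ms
	}
	ms := ix.getByID(ref)
	if ms != nil {
		cache[ref] = ms
	}
	return ms
}

func main() {
	ix := &seriesIndex{byID: map[uint64]*memSeries{7: {ref: 7}}}
	cache := map[uint64]*memSeries{}
	for i := 0; i < 3; i++ {
		fmt.Println(resolve(cache, ix, 7).ref) // Only the first call touches the lock.
	}
}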
@@ -310,25 +316,22 @@ func (h *Head) loadWAL(r *wal.Reader) error {
 	// They are connected through a ring of channels which ensures that all sample batches
 	// read from the WAL are processed in order.
 	var (
-		wg         sync.WaitGroup
-		n          = runtime.GOMAXPROCS(0)
-		firstInput = make(chan []RefSample, 300)
-		input      = firstInput
+		wg      sync.WaitGroup
+		n       = runtime.GOMAXPROCS(0)
+		inputs  = make([]chan []RefSample, n)
+		outputs = make([]chan []RefSample, n)
 	)
 	wg.Add(n)
 
 	for i := 0; i < n; i++ {
-		output := make(chan []RefSample, 300)
+		outputs[i] = make(chan []RefSample, 300)
+		inputs[i] = make(chan []RefSample, 300)
 
-		go func(i int, input <-chan []RefSample, output chan<- []RefSample) {
-			unknown := h.processWALSamples(minValidTime, uint64(i), uint64(n), input, output)
+		go func(input <-chan []RefSample, output chan<- []RefSample) {
+			unknown := h.processWALSamples(minValidTime, input, output)
 			atomic.AddUint64(&unknownRefs, unknown)
 			wg.Done()
-		}(i, input, output)
-
-		// The output feeds the next worker goroutine. For the last worker,
-		// it feeds the initial input again to reuse the RefSample slices.
-		input = output
+		}(inputs[i], outputs[i])
 	}
 
 	var (
@@ -336,6 +339,7 @@ func (h *Head) loadWAL(r *wal.Reader) error {
 		series  []RefSeries
 		samples []RefSample
 		tstones []Stone
+		err     error
 	)
 	for r.Next() {
 		series, samples, tstones = series[:0], samples[:0], tstones[:0]
@@ -343,7 +347,7 @@ func (h *Head) loadWAL(r *wal.Reader) error {
 
 		switch dec.Type(rec) {
 		case RecordSeries:
-			series, err := dec.Series(rec, series)
+			series, err = dec.Series(rec, series)
 			if err != nil {
 				return errors.Wrap(err, "decode series")
 			}
@@ -355,7 +359,8 @@ func (h *Head) loadWAL(r *wal.Reader) error {
 				}
 			}
 		case RecordSamples:
-			samples, err := dec.Samples(rec, samples)
+			samples, err = dec.Samples(rec, samples)
+			s := samples
 			if err != nil {
 				return errors.Wrap(err, "decode samples")
 			}
@@ -364,20 +369,31 @@ func (h *Head) loadWAL(r *wal.Reader) error {
 			// cause thousands of very large in flight buffers occupying large amounts
 			// of unused memory.
 			for len(samples) > 0 {
-				n := 5000
-				if len(samples) < n {
-					n = len(samples)
+				m := 5000
+				if len(samples) < m {
+					m = len(samples)
 				}
-				var buf []RefSample
-				select {
-				case buf = <-input:
-				default:
+				shards := make([][]RefSample, n)
+				for i := 0; i < n; i++ {
+					var buf []RefSample
+					select {
+					case buf = <-outputs[i]:
+					default:
+					}
+					shards[i] = buf[:0]
 				}
-				firstInput <- append(buf[:0], samples[:n]...)
-				samples = samples[n:]
+				for _, sam := range samples[:m] {
+					mod := sam.Ref % uint64(n)
+					shards[mod] = append(shards[mod], sam)
+				}
+				for i := 0; i < n; i++ {
+					inputs[i] <- shards[i]
+				}
+				samples = samples[m:]
 			}
+			samples = s // Keep whole slice for reuse.
 		case RecordTombstones:
-			tstones, err := dec.Tombstones(rec, tstones)
+			tstones, err = dec.Tombstones(rec, tstones)
 			if err != nil {
 				return errors.Wrap(err, "decode tombstones")
 			}
@@ -397,9 +413,11 @@ func (h *Head) loadWAL(r *wal.Reader) error {
 		return errors.Wrap(r.Err(), "read records")
 	}
 
-	// Signal termination to first worker and wait for last one to close its output channel.
-	close(firstInput)
-	for range input {
+	// Signal termination to each worker and wait for it to close its output channel.
+	for i := 0; i < n; i++ {
+		close(inputs[i])
+		for range outputs[i] {
+		}
 	}
 	wg.Wait()
 
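Taken together, the loadWAL changes above replace the single chain of channels with one input and one output channel per worker: the reader shards every decoded batch by Ref modulo the worker count, so a given series is always appended to by the same goroutine, and shutdown becomes a per-worker close-and-drain. Below is a self-contained sketch of that fan-out pattern under simplified types; sample and the worker body are illustrative, not the real Head code.

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// sample is a stand-in for RefSample.
type sample struct {
	ref uint64
	t   int64
	v   float64
}

func main() {
	n := runtime.GOMAXPROCS(0)
	inputs := make([]chan []sample, n)
	outputs := make([]chan []sample, n)

	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		inputs[i] = make(chan []sample, 300)
		outputs[i] = make(chan []sample, 300)
		go func(in <-chan []sample, out chan<- []sample) {
			defer wg.Done()
			defer close(out)
			for batch := range in {
				// Append each sample to its series here; every ref seen by
				// this worker is owned by it exclusively.
				out <- batch[:0] // Hand the slice back for reuse.
			}
		}(inputs[i], outputs[i])
	}

	// The reading side shards one decoded batch across the workers.
	batch := []sample{{ref: 1, t: 1, v: 1}, {ref: 2, t: 1, v: 2}, {ref: 3, t: 1, v: 3}}
	shards := make([][]sample, n)
	for i := range shards {
		// Reuse a returned buffer when one is available.
		select {
		case buf := <-outputs[i]:
			shards[i] = buf[:0]
		default:
		}
	}
	for _, s := range batch {
		i := int(s.ref % uint64(n))
		shards[i] = append(shards[i], s)
	}
	for i := 0; i < n; i++ {
		inputs[i] <- shards[i]
	}

	// Shutdown: close every input, drain every output, then wait.
	for i := 0; i < n; i++ {
		close(inputs[i])
		for range outputs[i] {
		}
	}
	wg.Wait()
	fmt.Println("all samples processed")
}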
@@ -1341,6 +1359,7 @@ type memSeries struct {
 	ref          uint64
 	lset         labels.Labels
 	chunks       []*memChunk
+	headChunk    *memChunk
 	chunkRange   int64
 	firstChunkID int
 
@@ -1374,6 +1393,7 @@ func (s *memSeries) cut(mint int64) *memChunk {
 		maxTime: math.MinInt64,
 	}
 	s.chunks = append(s.chunks, c)
+	s.headChunk = c
 
 	// Set upper bound on when the next chunk must be started. An earlier timestamp
 	// may be chosen dynamically at a later point.
@@ -1442,6 +1462,11 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
 	}
 	s.chunks = append(s.chunks[:0], s.chunks[k:]...)
 	s.firstChunkID += k
+	if len(s.chunks) == 0 {
+		s.headChunk = nil
+	} else {
+		s.headChunk = s.chunks[len(s.chunks)-1]
+	}
 
 	return k
 }
@@ -1524,10 +1549,7 @@ func (s *memSeries) iterator(id int) chunkenc.Iterator {
 }
 
 func (s *memSeries) head() *memChunk {
-	if len(s.chunks) == 0 {
-		return nil
-	}
-	return s.chunks[len(s.chunks)-1]
+	return s.headChunk
 }
 
 type memChunk struct {
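The new headChunk field turns head() into a plain pointer read; since cut() and truncateChunksBefore() are the only places that modify s.chunks, they are also the only places that have to keep the cached pointer in sync. A small sketch of that invariant with a pared-down series type (not the real memSeries):

package main

import "fmt"

type memChunk struct{ minTime, maxTime int64 }

// series is a pared-down stand-in for memSeries that keeps a cached
// pointer to its last (head) chunk.
type series struct {
	chunks    []*memChunk
	headChunk *memChunk
}

// cut appends a fresh chunk and records it as the head chunk.
func (s *series) cut(mint int64) *memChunk {
	c := &memChunk{minTime: mint}
	s.chunks = append(s.chunks, c)
	s.headChunk = c
	return c
}

// truncateBefore drops the first k chunks and re-derives the head pointer,
// mirroring truncateChunksBefore in the diff above.
func (s *series) truncateBefore(k int) {
	s.chunks = append(s.chunks[:0], s.chunks[k:]...)
	if len(s.chunks) == 0 {
		s.headChunk = nil
	} else {
		s.headChunk = s.chunks[len(s.chunks)-1]
	}
}

// head is now a field read rather than a length check plus index.
func (s *series) head() *memChunk { return s.headChunk }

func main() {
	s := &series{}
	s.cut(0)
	s.cut(100)
	fmt.Println(s.head().minTime) // 100
	s.truncateBefore(2)
	fmt.Println(s.head() == nil) // true
}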