prometheus/tsdb/async_block_writer.go
Marco Pracucci 583e746a82
Fix error reported by asyncBlockWriter.addSeries()
Signed-off-by: Marco Pracucci <marco@pracucci.com>
2022-02-10 15:07:13 +01:00

177 lines
4.4 KiB
Go

package tsdb
import (
"context"
"github.com/pkg/errors"
"go.uber.org/atomic"
"golang.org/x/sync/semaphore"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
)
// errAsyncBlockWriterNotRunning is returned by asyncBlockWriter.addSeries when
// the background writer can no longer accept series.
var errAsyncBlockWriterNotRunning = errors.New("asyncBlockWriter doesn't run anymore")
// asyncBlockWriter writes series and their chunks to a block from a background
// goroutine (see loop), so that the producer calling addSeries does not have to
// wait for each write to complete.
type asyncBlockWriter struct {
	// chunkPool is where chunks are returned once they have been written.
	chunkPool chunkenc.Pool

	// chunkw and indexw are the destinations used by loop: chunks go to chunkw,
	// the series entry goes to indexw. loop closes both after seriesChan is
	// drained without error.
	chunkw ChunkWriter
	indexw IndexWriter

	// closeSemaphore is acquired (weight 1) by loop before closing the writers
	// and released once loop returns.
	closeSemaphore *semaphore.Weighted

	// seriesChan carries series from addSeries to loop. closeAsync closes it.
	seriesChan chan seriesToWrite
	// finishedCh receives the single result produced by loop and is closed
	// right after.
	finishedCh chan asyncBlockWriterResult

	// closed records that closeAsync already closed seriesChan.
	closed bool
	// result caches the value read from finishedCh by addSeries or waitFinished.
	result asyncBlockWriterResult
}
// asyncBlockWriterResult is the outcome of asyncBlockWriter.loop, delivered
// through finishedCh.
type asyncBlockWriterResult struct {
	// stats holds the series, chunk and sample counts accumulated by loop.
	// loop only sets it when it returns without error.
	stats BlockStats
	// err is the error that made loop stop, or nil on success.
	err error
}
// seriesToWrite is one unit of work sent over asyncBlockWriter.seriesChan:
// a series' labels together with the chunks that belong to it.
type seriesToWrite struct {
	// lbls is the label set passed to IndexWriter.AddSeries.
	lbls labels.Labels
	// chks are the chunk metas written to the ChunkWriter and referenced
	// from the index entry.
	chks []chunks.Meta
}
// newAsyncBlockWriter builds an asyncBlockWriter around the given pool, writers
// and semaphore, and immediately starts its background goroutine (loop).
//
// The returned writer accepts series through addSeries; call closeAsync to
// signal that no more series will follow and waitFinished to collect the result.
func newAsyncBlockWriter(chunkPool chunkenc.Pool, chunkw ChunkWriter, indexw IndexWriter, closeSema *semaphore.Weighted) *asyncBlockWriter {
	const (
		// Number of series that can be queued before addSeries blocks.
		seriesQueueSize = 64
		// loop sends exactly one result, so a single slot lets it exit
		// without waiting for a receiver.
		resultQueueSize = 1
	)

	w := new(asyncBlockWriter)
	w.chunkPool = chunkPool
	w.chunkw = chunkw
	w.indexw = indexw
	w.closeSemaphore = closeSema
	w.seriesChan = make(chan seriesToWrite, seriesQueueSize)
	w.finishedCh = make(chan asyncBlockWriterResult, resultQueueSize)

	go w.loop()
	return w
}
// loop is the body of the background goroutine started by newAsyncBlockWriter.
// It drains seriesChan, writing each series' chunks and index entry, and once
// the channel has been closed it closes both writers while holding
// closeSemaphore. If any write fails it returns right away and leaves the
// writers open.
//
// The named result exists only for the deferred function below, which sends it
// to finishedCh and then closes that channel.
func (bw *asyncBlockWriter) loop() (res asyncBlockWriterResult) {
	defer func() {
		bw.finishedCh <- res
		close(bw.finishedCh)
	}()

	var (
		blockStats BlockStats
		nextRef    storage.SeriesRef
	)

	for series := range bw.seriesChan {
		if err := bw.chunkw.WriteChunks(series.chks...); err != nil {
			return asyncBlockWriterResult{err: errors.Wrap(err, "write chunks")}
		}
		if err := bw.indexw.AddSeries(nextRef, series.lbls, series.chks...); err != nil {
			return asyncBlockWriterResult{err: errors.Wrap(err, "add series")}
		}

		blockStats.NumChunks += uint64(len(series.chks))
		blockStats.NumSeries++
		for i := range series.chks {
			blockStats.NumSamples += uint64(series.chks[i].Chunk.NumSamples())
		}

		// Chunks go back to the pool only after all of them have been counted.
		for i := range series.chks {
			if err := bw.chunkPool.Put(series.chks[i].Chunk); err != nil {
				return asyncBlockWriterResult{err: errors.Wrap(err, "put chunk")}
			}
		}

		nextRef++
	}

	if err := bw.closeSemaphore.Acquire(context.Background(), 1); err != nil {
		return asyncBlockWriterResult{err: errors.Wrap(err, "failed to acquire semaphore before closing writers")}
	}
	// Deferred after the finishedCh send above, so it runs first: the semaphore
	// is released before the result is published.
	defer bw.closeSemaphore.Release(1)

	// Every series was written successfully, so the writers can be closed.
	if err := bw.chunkw.Close(); err != nil {
		return asyncBlockWriterResult{err: errors.Wrap(err, "closing chunk writer")}
	}
	if err := bw.indexw.Close(); err != nil {
		return asyncBlockWriterResult{err: errors.Wrap(err, "closing index writer")}
	}

	return asyncBlockWriterResult{stats: blockStats}
}
// addSeries hands a series (its labels and chunk metas) to the background
// goroutine for writing.
//
// It returns nil once the series has been queued on seriesChan; the write
// itself happens later in loop. It returns an error if the series cannot be
// accepted, either because closeAsync was already called or because loop has
// terminated. In that case the error is errAsyncBlockWriterNotRunning, or, when
// a loop error is known, that error wrapped with the
// errAsyncBlockWriterNotRunning message.
//
// addSeries reads and writes unsynchronized fields (closed, result), like
// closeAsync and waitFinished do, so calls to these methods must not run
// concurrently with each other.
func (bw *asyncBlockWriter) addSeries(lbls labels.Labels, chks []chunks.Meta) error {
	// closeAsync closes seriesChan, and a send on a closed channel panics.
	// Skip the send entirely in that case and report the writer as not running.
	if !bw.closed {
		select {
		case bw.seriesChan <- seriesToWrite{lbls: lbls, chks: chks}:
			return nil
		case result, ok := <-bw.finishedCh:
			// ok is false when the result was already consumed and the channel
			// is closed; keep the previously cached result then.
			if ok {
				bw.result = result
			}
		}
	}

	// If the writer isn't running anymore because of an error that occurred in
	// loop(), return that error too; otherwise it may never be reported and
	// the actual root cause would be lost.
	if bw.result.err != nil {
		return errors.Wrap(bw.result.err, errAsyncBlockWriterNotRunning.Error())
	}
	return errAsyncBlockWriterNotRunning
}
// closeAsync signals that no more series will be added by closing seriesChan,
// which lets loop finish once the queued series are written. It does not wait
// for loop to finish; use waitFinished for that.
//
// Calling it more than once is harmless: only the first call closes the channel.
func (bw *asyncBlockWriter) closeAsync() {
	if bw.closed {
		return
	}

	bw.closed = true
	close(bw.seriesChan)
}
// waitFinished blocks until loop has published its result on finishedCh (or the
// channel has been closed) and returns the block stats and error from that
// result.
//
// If the result was already consumed earlier, the receive sees the closed
// channel and the cached bw.result is returned instead.
func (bw *asyncBlockWriter) waitFinished() (BlockStats, error) {
	if received, ok := <-bw.finishedCh; ok {
		bw.result = received
	}

	final := bw.result
	return final.stats, final.err
}
// preventDoubleCloseIndexWriter wraps an IndexWriter so that the underlying
// Close is invoked at most once, no matter how many times Close is called on
// the wrapper.
type preventDoubleCloseIndexWriter struct {
	// IndexWriter is the wrapped writer; all methods except Close are promoted
	// from it unchanged.
	IndexWriter
	// closed flips to true on the first Close call.
	closed atomic.Bool
}
// newPreventDoubleCloseIndexWriter wraps iw in a preventDoubleCloseIndexWriter
// whose closed flag starts out false.
func newPreventDoubleCloseIndexWriter(iw IndexWriter) *preventDoubleCloseIndexWriter {
	wrapped := new(preventDoubleCloseIndexWriter)
	wrapped.IndexWriter = iw
	return wrapped
}
// Close closes the wrapped IndexWriter the first time it is called and returns
// that call's error. Every later call is a no-op returning nil.
func (p *preventDoubleCloseIndexWriter) Close() error {
	if firstClose := p.closed.CAS(false, true); !firstClose {
		// Someone already closed the writer.
		return nil
	}
	return p.IndexWriter.Close()
}
// preventDoubleCloseChunkWriter wraps a ChunkWriter so that the underlying
// Close is invoked at most once, no matter how many times Close is called on
// the wrapper.
type preventDoubleCloseChunkWriter struct {
	// ChunkWriter is the wrapped writer; all methods except Close are promoted
	// from it unchanged.
	ChunkWriter
	// closed flips to true on the first Close call.
	closed atomic.Bool
}
// newPreventDoubleCloseChunkWriter wraps cw in a preventDoubleCloseChunkWriter
// whose closed flag starts out false.
func newPreventDoubleCloseChunkWriter(cw ChunkWriter) *preventDoubleCloseChunkWriter {
	wrapped := new(preventDoubleCloseChunkWriter)
	wrapped.ChunkWriter = cw
	return wrapped
}
// Close closes the wrapped ChunkWriter the first time it is called and returns
// that call's error. Every later call is a no-op returning nil.
func (p *preventDoubleCloseChunkWriter) Close() error {
	if firstClose := p.closed.CAS(false, true); !firstClose {
		// Someone already closed the writer.
		return nil
	}
	return p.ChunkWriter.Close()
}