mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 06:17:27 -08:00
Add contexts to index writer to fix test races.
With recent speed improvements to populate block, the cancellation test now fails regularly on CI. Use contexts to get the index writer to shut down much faster, and that allows us to make the cancellation test faster too. Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
parent
cf76daed2f
commit
0482d93fe6
|
@ -569,7 +569,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
|
|||
}
|
||||
}
|
||||
|
||||
indexw, err := index.NewWriter(filepath.Join(tmp, indexFilename))
|
||||
indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "open index writer")
|
||||
}
|
||||
|
|
|
@ -956,8 +956,8 @@ func TestCancelCompactions(t *testing.T) {
|
|||
}()
|
||||
|
||||
// Create some blocks to fall within the compaction range.
|
||||
createBlock(t, tmpdir, genSeries(10, 10000, 0, 1000))
|
||||
createBlock(t, tmpdir, genSeries(10, 10000, 1000, 2000))
|
||||
createBlock(t, tmpdir, genSeries(1, 10000, 0, 1000))
|
||||
createBlock(t, tmpdir, genSeries(1, 10000, 1000, 2000))
|
||||
createBlock(t, tmpdir, genSeries(1, 1, 2000, 2001)) // The most recent block is ignored so can be e small one.
|
||||
|
||||
// Copy the db so we have an exact copy to compare compaction times.
|
||||
|
|
|
@ -15,6 +15,7 @@ package index
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"hash"
|
||||
"hash/crc32"
|
||||
|
@ -111,6 +112,7 @@ func newCRC32() hash.Hash32 {
|
|||
// Writer implements the IndexWriter interface for the standard
|
||||
// serialization format.
|
||||
type Writer struct {
|
||||
ctx context.Context
|
||||
f *os.File
|
||||
fbuf *bufio.Writer
|
||||
pos uint64
|
||||
|
@ -176,7 +178,7 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
|
|||
}
|
||||
|
||||
// NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
|
||||
func NewWriter(fn string) (*Writer, error) {
|
||||
func NewWriter(ctx context.Context, fn string) (*Writer, error) {
|
||||
dir := filepath.Dir(fn)
|
||||
|
||||
df, err := fileutil.OpenDir(dir)
|
||||
|
@ -198,6 +200,7 @@ func NewWriter(fn string) (*Writer, error) {
|
|||
}
|
||||
|
||||
iw := &Writer{
|
||||
ctx: ctx,
|
||||
f: f,
|
||||
fbuf: bufio.NewWriterSize(f, 1<<22),
|
||||
pos: 0,
|
||||
|
@ -256,6 +259,12 @@ func (w *Writer) addPadding(size int) error {
|
|||
// ensureStage handles transitions between write stages and ensures that IndexWriter
|
||||
// methods are called in an order valid for the implementation.
|
||||
func (w *Writer) ensureStage(s indexWriterStage) error {
|
||||
select {
|
||||
case <-w.ctx.Done():
|
||||
return w.ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if w.stage == s {
|
||||
return nil
|
||||
}
|
||||
|
@ -699,6 +708,11 @@ func (w *Writer) writePostings() error {
|
|||
}
|
||||
}
|
||||
}
|
||||
select {
|
||||
case <-w.ctx.Done():
|
||||
return w.ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package index
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
|
@ -155,7 +156,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
|
|||
fn := filepath.Join(dir, indexFilename)
|
||||
|
||||
// An empty index must still result in a readable file.
|
||||
iw, err := NewWriter(fn)
|
||||
iw, err := NewWriter(context.Background(), fn)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, iw.Close())
|
||||
|
||||
|
@ -183,7 +184,7 @@ func TestIndexRW_Postings(t *testing.T) {
|
|||
|
||||
fn := filepath.Join(dir, indexFilename)
|
||||
|
||||
iw, err := NewWriter(fn)
|
||||
iw, err := NewWriter(context.Background(), fn)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
series := []labels.Labels{
|
||||
|
@ -247,7 +248,7 @@ func TestPostingsMany(t *testing.T) {
|
|||
|
||||
fn := filepath.Join(dir, indexFilename)
|
||||
|
||||
iw, err := NewWriter(fn)
|
||||
iw, err := NewWriter(context.Background(), fn)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
// Create a label in the index which has 999 values.
|
||||
|
@ -368,7 +369,7 @@ func TestPersistence_index_e2e(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
iw, err := NewWriter(filepath.Join(dir, indexFilename))
|
||||
iw, err := NewWriter(context.Background(), filepath.Join(dir, indexFilename))
|
||||
testutil.Ok(t, err)
|
||||
|
||||
testutil.Ok(t, iw.AddSymbols(symbols))
|
||||
|
|
Loading…
Reference in a new issue