Fix Example() function in TSDB (#10153)

* Fix Example() function in TSDB

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix tests

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Ganesh Vernekar 2022-01-11 17:24:03 +05:30 committed by GitHub
parent 714bc3ff29
commit 129ed4ec8b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 60 additions and 60 deletions

View file

@ -163,3 +163,22 @@ func (c *chunkWriteQueue) stop() {
c.workerWg.Wait() c.workerWg.Wait()
} }
func (c *chunkWriteQueue) queueIsEmpty() bool {
return c.queueSize() == 0
}
func (c *chunkWriteQueue) queueIsFull() bool {
// When the queue is full and blocked on the writer the chunkRefMap has one more job than the cap of the jobCh
// because one job is currently being processed and blocked in the writer.
return c.queueSize() == cap(c.jobs)+1
}
func (c *chunkWriteQueue) queueSize() int {
c.chunkRefMapMtx.Lock()
defer c.chunkRefMapMtx.Unlock()
// Looking at chunkRefMap instead of jobCh because the job is popped from the chan before it has
// been fully processed, it remains in the chunkRefMap until the processing is complete.
return len(c.chunkRefMap)
}

View file

@ -146,7 +146,7 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) {
} }
// The queue should be full. // The queue should be full.
require.True(t, queueIsFull(q)) require.True(t, q.queueIsFull())
// Adding another job should block as long as no job from the queue gets consumed. // Adding another job should block as long as no job from the queue gets consumed.
addedJob := atomic.NewBool(false) addedJob := atomic.NewBool(false)
@ -166,19 +166,19 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) {
require.Eventually(t, func() bool { return addedJob.Load() }, time.Second, time.Millisecond*10) require.Eventually(t, func() bool { return addedJob.Load() }, time.Second, time.Millisecond*10)
// The queue should be full again. // The queue should be full again.
require.True(t, queueIsFull(q)) require.True(t, q.queueIsFull())
// Consume <sizeLimit>+1 jobs from the queue. // Consume <sizeLimit>+1 jobs from the queue.
// To drain the queue we need to consume <sizeLimit>+1 jobs because 1 job // To drain the queue we need to consume <sizeLimit>+1 jobs because 1 job
// is already in the state of being processed. // is already in the state of being processed.
for job := 0; job < sizeLimit+1; job++ { for job := 0; job < sizeLimit+1; job++ {
require.False(t, queueIsEmpty(q)) require.False(t, q.queueIsEmpty())
unblockChunkWriter() unblockChunkWriter()
} }
// Wait until all jobs have been processed. // Wait until all jobs have been processed.
callbackWg.Wait() callbackWg.Wait()
require.True(t, queueIsEmpty(q)) require.True(t, q.queueIsEmpty())
} }
func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) { func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) {
@ -266,22 +266,3 @@ func BenchmarkChunkWriteQueue_addJob(b *testing.B) {
}) })
} }
} }
func queueIsEmpty(q *chunkWriteQueue) bool {
return queueSize(q) == 0
}
func queueIsFull(q *chunkWriteQueue) bool {
// When the queue is full and blocked on the writer the chunkRefMap has one more job than the cap of the jobCh
// because one job is currently being processed and blocked in the writer.
return queueSize(q) == cap(q.jobs)+1
}
func queueSize(q *chunkWriteQueue) int {
q.chunkRefMapMtx.Lock()
defer q.chunkRefMapMtx.Unlock()
// Looking at chunkRefMap instead of jobCh because the job is popped from the chan before it has
// been fully processed, it remains in the chunkRefMap until the processing is complete.
return len(q.chunkRefMap)
}

View file

@ -473,6 +473,10 @@ func (cdm *ChunkDiskMapper) CutNewFile() {
cdm.evtlPos.cutFileOnNextChunk() cdm.evtlPos.cutFileOnNextChunk()
} }
func (cdm *ChunkDiskMapper) IsQueueEmpty() bool {
return cdm.writeQueue.queueIsEmpty()
}
// cutAndExpectRef creates a new m-mapped file. // cutAndExpectRef creates a new m-mapped file.
// The write lock should be held before calling this. // The write lock should be held before calling this.
// It ensures that the position in the new file matches the given chunk reference, if not then it errors. // It ensures that the position in the new file matches the given chunk reference, if not then it errors.

View file

@ -1461,6 +1461,10 @@ func TestSizeRetention(t *testing.T) {
} }
require.NoError(t, headApp.Commit()) require.NoError(t, headApp.Commit())
require.Eventually(t, func() bool {
return db.Head().chunkDiskMapper.IsQueueEmpty()
}, 2*time.Second, 100*time.Millisecond)
// Test that registered size matches the actual disk size. // Test that registered size matches the actual disk size.
require.NoError(t, db.reloadBlocks()) // Reload the db to register the new db size. require.NoError(t, db.reloadBlocks()) // Reload the db to register the new db size.
require.Equal(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. require.Equal(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered.

View file

@ -16,97 +16,89 @@ package tsdb
import ( import (
"context" "context"
"fmt" "fmt"
"io/ioutil"
"math" "math"
"testing" "os"
"time" "time"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
) )
func TestExample(t *testing.T) { func Example() {
// Create a random dir to work in. Open() doesn't require a pre-existing dir, but // Create a random dir to work in. Open() doesn't require a pre-existing dir, but
// we want to make sure not to make a mess where we shouldn't. // we want to make sure not to make a mess where we shouldn't.
dir := t.TempDir() dir, err := ioutil.TempDir("", "tsdb-test")
noErr(err)
// Open a TSDB for reading and/or writing. // Open a TSDB for reading and/or writing.
db, err := Open(dir, nil, nil, DefaultOptions(), nil) db, err := Open(dir, nil, nil, DefaultOptions(), nil)
require.NoError(t, err) noErr(err)
// Open an appender for writing. // Open an appender for writing.
app := db.Appender(context.Background()) app := db.Appender(context.Background())
lbls := labels.FromStrings("foo", "bar") series := labels.FromStrings("foo", "bar")
var appendedSamples []sample
// Ref is 0 for the first append since we don't know the reference for the series. // Ref is 0 for the first append since we don't know the reference for the series.
ts, v := time.Now().Unix(), 123.0 ref, err := app.Append(0, series, time.Now().Unix(), 123)
ref, err := app.Append(0, lbls, ts, v) noErr(err)
require.NoError(t, err)
appendedSamples = append(appendedSamples, sample{ts, v})
// Another append for a second later. // Another append for a second later.
// Re-using the ref from above since it's the same series, makes append faster. // Re-using the ref from above since it's the same series, makes append faster.
time.Sleep(time.Second) time.Sleep(time.Second)
ts, v = time.Now().Unix(), 124 _, err = app.Append(ref, series, time.Now().Unix(), 124)
_, err = app.Append(ref, lbls, ts, v) noErr(err)
require.NoError(t, err)
appendedSamples = append(appendedSamples, sample{ts, v})
// Commit to storage. // Commit to storage.
err = app.Commit() err = app.Commit()
require.NoError(t, err) noErr(err)
// In case you want to do more appends after app.Commit(), // In case you want to do more appends after app.Commit(),
// you need a new appender. // you need a new appender.
// app = db.Appender(context.Background()) app = db.Appender(context.Background())
//
// ... adding more samples. // ... adding more samples.
//
// Commit to storage.
// err = app.Commit()
// require.NoError(t, err)
// Open a querier for reading. // Open a querier for reading.
querier, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) querier, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
require.NoError(t, err) noErr(err)
ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
var queriedSamples []sample
for ss.Next() { for ss.Next() {
series := ss.At() series := ss.At()
fmt.Println("series:", series.Labels().String()) fmt.Println("series:", series.Labels().String())
it := series.Iterator() it := series.Iterator()
for it.Next() { for it.Next() {
ts, v := it.At() _, v := it.At() // We ignore the timestamp here, only to have a predictable output we can test against (below)
fmt.Println("sample", ts, v) fmt.Println("sample", v)
queriedSamples = append(queriedSamples, sample{ts, v})
} }
require.NoError(t, it.Err())
fmt.Println("it.Err():", it.Err()) fmt.Println("it.Err():", it.Err())
} }
require.NoError(t, ss.Err())
fmt.Println("ss.Err():", ss.Err()) fmt.Println("ss.Err():", ss.Err())
ws := ss.Warnings() ws := ss.Warnings()
if len(ws) > 0 { if len(ws) > 0 {
fmt.Println("warnings:", ws) fmt.Println("warnings:", ws)
} }
err = querier.Close() err = querier.Close()
require.NoError(t, err) noErr(err)
// Clean up any last resources when done. // Clean up any last resources when done.
err = db.Close() err = db.Close()
require.NoError(t, err) noErr(err)
err = os.RemoveAll(dir)
require.Equal(t, appendedSamples, queriedSamples) noErr(err)
// Output: // Output:
// series: {foo="bar"} // series: {foo="bar"}
// sample <ts1> 123 // sample 123
// sample <ts2> 124 // sample 124
// it.Err(): <nil> // it.Err(): <nil>
// ss.Err(): <nil> // ss.Err(): <nil>
} }
func noErr(err error) {
if err != nil {
panic(err)
}
}