mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 22:07:27 -08:00
Add tests for GC and chunk truncation
This commit is contained in:
parent
4f037da462
commit
893b6ec506
5
db.go
5
db.go
|
@ -492,11 +492,8 @@ func (db *DB) reload() (err error) {
|
|||
if len(blocks) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
maxt := blocks[len(db.blocks)-1].Meta().MaxTime
|
||||
db.head.Truncate(maxt)
|
||||
|
||||
return nil
|
||||
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
|
||||
}
|
||||
|
||||
func validateBlockSequence(bs []DiskBlock) error {
|
||||
|
|
39
head.go
39
head.go
|
@ -235,9 +235,12 @@ func (h *Head) String() string {
|
|||
}
|
||||
|
||||
// Truncate removes all data before mint from the head block and truncates its WAL.
|
||||
func (h *Head) Truncate(mint int64) {
|
||||
func (h *Head) Truncate(mint int64) error {
|
||||
if mint%h.chunkRange != 0 {
|
||||
return errors.Errorf("truncating at %d not aligned", mint)
|
||||
}
|
||||
if h.minTime >= mint {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
atomic.StoreInt64(&h.minTime, mint)
|
||||
|
||||
|
@ -255,6 +258,8 @@ func (h *Head) Truncate(mint int64) {
|
|||
h.logger.Log("msg", "WAL truncation failed", "err", err, "duration", time.Since(start))
|
||||
}
|
||||
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// initTime initializes a head with the first timestamp. This only needs to be called
|
||||
|
@ -764,10 +769,9 @@ func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
|
|||
s.mtx.RUnlock()
|
||||
|
||||
// Do not expose chunks that are outside of the specified range.
|
||||
if !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) {
|
||||
if c == nil || !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
return &safeChunk{
|
||||
Chunk: c.chunk,
|
||||
s: s,
|
||||
|
@ -1023,10 +1027,10 @@ func newMemSeries(lset labels.Labels, id uint32, chunkRange int64) *memSeries {
|
|||
|
||||
// appendable checks whether the given sample is valid for appending to the series.
|
||||
func (s *memSeries) appendable(t int64, v float64) error {
|
||||
if len(s.chunks) == 0 {
|
||||
c := s.head()
|
||||
if c == nil {
|
||||
return nil
|
||||
}
|
||||
c := s.head()
|
||||
|
||||
if t > c.maxTime {
|
||||
return nil
|
||||
|
@ -1043,7 +1047,11 @@ func (s *memSeries) appendable(t int64, v float64) error {
|
|||
}
|
||||
|
||||
func (s *memSeries) chunk(id int) *memChunk {
|
||||
return s.chunks[id-s.firstChunkID]
|
||||
ix := id - s.firstChunkID
|
||||
if ix < 0 || ix >= len(s.chunks) {
|
||||
return nil
|
||||
}
|
||||
return s.chunks[ix]
|
||||
}
|
||||
|
||||
func (s *memSeries) chunkID(pos int) int {
|
||||
|
@ -1072,27 +1080,25 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
|
|||
|
||||
s.mtx.Lock()
|
||||
|
||||
var c *memChunk
|
||||
c := s.head()
|
||||
|
||||
if len(s.chunks) == 0 {
|
||||
if c == nil {
|
||||
c = s.cut(t)
|
||||
chunkCreated = true
|
||||
}
|
||||
c = s.head()
|
||||
if c.maxTime >= t {
|
||||
s.mtx.Unlock()
|
||||
return false, chunkCreated
|
||||
}
|
||||
if c.samples > samplesPerChunk/4 && t >= s.nextAt {
|
||||
if c.chunk.NumSamples() > samplesPerChunk/4 && t >= s.nextAt {
|
||||
c = s.cut(t)
|
||||
chunkCreated = true
|
||||
}
|
||||
s.app.Append(t, v)
|
||||
|
||||
c.maxTime = t
|
||||
c.samples++
|
||||
|
||||
if c.samples == samplesPerChunk/4 {
|
||||
if c.chunk.NumSamples() == samplesPerChunk/4 {
|
||||
_, maxt := rangeForTimestamp(c.minTime, s.chunkRange)
|
||||
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt)
|
||||
}
|
||||
|
@ -1123,7 +1129,6 @@ func computeChunkEndTime(start, cur, max int64) int64 {
|
|||
func (s *memSeries) iterator(id int) chunks.Iterator {
|
||||
c := s.chunk(id)
|
||||
|
||||
// TODO(fabxc): !!! Test this and everything around chunk ID != list pos.
|
||||
if id-s.firstChunkID < len(s.chunks)-1 {
|
||||
return c.chunk.Iterator()
|
||||
}
|
||||
|
@ -1132,20 +1137,22 @@ func (s *memSeries) iterator(id int) chunks.Iterator {
|
|||
it := &memSafeIterator{
|
||||
Iterator: c.chunk.Iterator(),
|
||||
i: -1,
|
||||
total: c.samples,
|
||||
total: c.chunk.NumSamples(),
|
||||
buf: s.sampleBuf,
|
||||
}
|
||||
return it
|
||||
}
|
||||
|
||||
func (s *memSeries) head() *memChunk {
|
||||
if len(s.chunks) == 0 {
|
||||
return nil
|
||||
}
|
||||
return s.chunks[len(s.chunks)-1]
|
||||
}
|
||||
|
||||
type memChunk struct {
|
||||
chunk chunks.Chunk
|
||||
minTime, maxTime int64
|
||||
samples int
|
||||
}
|
||||
|
||||
type memSafeIterator struct {
|
||||
|
|
110
head_test.go
110
head_test.go
|
@ -82,6 +82,116 @@ func readPrometheusLabels(fn string, n int) ([]labels.Labels, error) {
|
|||
return mets, nil
|
||||
}
|
||||
|
||||
func TestHead_Truncate(t *testing.T) {
|
||||
h, err := NewHead(nil, nil, nil, 1000)
|
||||
require.NoError(t, err)
|
||||
|
||||
h.initTime(0)
|
||||
|
||||
s1 := h.create(1, labels.FromStrings("a", "1", "b", "1"))
|
||||
s2 := h.create(2, labels.FromStrings("a", "2", "b", "1"))
|
||||
s3 := h.create(3, labels.FromStrings("a", "1", "b", "2"))
|
||||
s4 := h.create(4, labels.FromStrings("a", "2", "b", "2", "c", "1"))
|
||||
|
||||
s1.chunks = []*memChunk{
|
||||
{minTime: 0, maxTime: 999},
|
||||
{minTime: 1000, maxTime: 1999},
|
||||
{minTime: 2000, maxTime: 2999},
|
||||
}
|
||||
s2.chunks = []*memChunk{
|
||||
{minTime: 1000, maxTime: 1999},
|
||||
{minTime: 2000, maxTime: 2999},
|
||||
{minTime: 3000, maxTime: 3999},
|
||||
}
|
||||
s3.chunks = []*memChunk{
|
||||
{minTime: 0, maxTime: 999},
|
||||
{minTime: 1000, maxTime: 1999},
|
||||
}
|
||||
s4.chunks = []*memChunk{}
|
||||
|
||||
// Truncation must be aligned.
|
||||
require.Error(t, h.Truncate(1))
|
||||
|
||||
h.Truncate(2000)
|
||||
|
||||
require.Equal(t, []*memChunk{
|
||||
{minTime: 2000, maxTime: 2999},
|
||||
}, h.series[s1.ref].chunks)
|
||||
|
||||
require.Equal(t, []*memChunk{
|
||||
{minTime: 2000, maxTime: 2999},
|
||||
{minTime: 3000, maxTime: 3999},
|
||||
}, h.series[s2.ref].chunks)
|
||||
|
||||
require.Nil(t, h.series[s3.ref])
|
||||
require.Nil(t, h.series[s4.ref])
|
||||
|
||||
postingsA1, _ := expandPostings(h.postings.get(term{"a", "1"}))
|
||||
postingsA2, _ := expandPostings(h.postings.get(term{"a", "2"}))
|
||||
postingsB1, _ := expandPostings(h.postings.get(term{"b", "1"}))
|
||||
postingsB2, _ := expandPostings(h.postings.get(term{"b", "2"}))
|
||||
postingsC1, _ := expandPostings(h.postings.get(term{"c", "1"}))
|
||||
postingsAll, _ := expandPostings(h.postings.get(term{"", ""}))
|
||||
|
||||
require.Equal(t, []uint32{s1.ref}, postingsA1)
|
||||
require.Equal(t, []uint32{s2.ref}, postingsA2)
|
||||
require.Equal(t, []uint32{s1.ref, s2.ref}, postingsB1)
|
||||
require.Equal(t, []uint32{s1.ref, s2.ref}, postingsAll)
|
||||
require.Nil(t, postingsB2)
|
||||
require.Nil(t, postingsC1)
|
||||
|
||||
require.Equal(t, map[string]struct{}{
|
||||
"": struct{}{}, // from 'all' postings list
|
||||
"a": struct{}{},
|
||||
"b": struct{}{},
|
||||
"1": struct{}{},
|
||||
"2": struct{}{},
|
||||
}, h.symbols)
|
||||
|
||||
require.Equal(t, map[string]stringset{
|
||||
"a": stringset{"1": struct{}{}, "2": struct{}{}},
|
||||
"b": stringset{"1": struct{}{}},
|
||||
"": stringset{"": struct{}{}},
|
||||
}, h.values)
|
||||
}
|
||||
|
||||
// Validate various behaviors brought on by firstChunkID accounting for
|
||||
// garbage collected chunks.
|
||||
func TestMemSeries_truncateChunks(t *testing.T) {
|
||||
s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000)
|
||||
|
||||
for i := 0; i < 4000; i += 5 {
|
||||
ok, _ := s.append(int64(i), float64(i))
|
||||
require.True(t, ok, "sample appen failed")
|
||||
}
|
||||
|
||||
// Check that truncate removes half of the chunks and afterwards
|
||||
// that the ID of the last chunk still gives us the same chunk afterwards.
|
||||
countBefore := len(s.chunks)
|
||||
lastID := s.chunkID(countBefore - 1)
|
||||
lastChunk := s.chunk(lastID)
|
||||
|
||||
require.NotNil(t, s.chunk(0))
|
||||
require.NotNil(t, lastChunk)
|
||||
|
||||
s.truncateChunksBefore(2000)
|
||||
|
||||
require.Equal(t, int64(2000), s.chunks[0].minTime, "unexpected start time of first chunks")
|
||||
require.Nil(t, s.chunk(0), "first chunk not gone")
|
||||
require.Equal(t, countBefore/2, len(s.chunks), "chunks not truncated correctly")
|
||||
require.Equal(t, lastChunk, s.chunk(lastID), "last chunk does not match")
|
||||
|
||||
// Validate that the series' sample buffer is applied correctly to the last chunk
|
||||
// after truncation.
|
||||
it1 := s.iterator(s.chunkID(len(s.chunks) - 1))
|
||||
_, ok := it1.(*memSafeIterator)
|
||||
require.True(t, ok, "last chunk not wrapped with sample buffer")
|
||||
|
||||
it2 := s.iterator(s.chunkID(len(s.chunks) - 2))
|
||||
_, ok = it2.(*memSafeIterator)
|
||||
require.False(t, ok, "non-last chunk incorrectly wrapped with sample buffer")
|
||||
}
|
||||
|
||||
// func TestHBDeleteSimple(t *testing.T) {
|
||||
// numSamples := int64(10)
|
||||
|
||||
|
|
Loading…
Reference in a new issue