diff --git a/db.go b/db.go index 4a629c1f3e..d7c5649fb9 100644 --- a/db.go +++ b/db.go @@ -492,11 +492,8 @@ func (db *DB) reload() (err error) { if len(blocks) == 0 { return nil } - maxt := blocks[len(db.blocks)-1].Meta().MaxTime - db.head.Truncate(maxt) - - return nil + return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") } func validateBlockSequence(bs []DiskBlock) error { diff --git a/head.go b/head.go index d49cea535c..3094891193 100644 --- a/head.go +++ b/head.go @@ -235,9 +235,12 @@ func (h *Head) String() string { } // Truncate removes all data before mint from the head block and truncates its WAL. -func (h *Head) Truncate(mint int64) { +func (h *Head) Truncate(mint int64) error { + if mint%h.chunkRange != 0 { + return errors.Errorf("truncating at %d not aligned", mint) + } if h.minTime >= mint { - return + return nil } atomic.StoreInt64(&h.minTime, mint) @@ -255,6 +258,8 @@ func (h *Head) Truncate(mint int64) { h.logger.Log("msg", "WAL truncation failed", "err", err, "duration", time.Since(start)) } h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds()) + + return nil } // initTime initializes a head with the first timestamp. This only needs to be called @@ -764,10 +769,9 @@ func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { s.mtx.RUnlock() // Do not expose chunks that are outside of the specified range. - if !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) { + if c == nil || !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) { return nil, ErrNotFound } - return &safeChunk{ Chunk: c.chunk, s: s, @@ -1023,10 +1027,10 @@ func newMemSeries(lset labels.Labels, id uint32, chunkRange int64) *memSeries { // appendable checks whether the given sample is valid for appending to the series. func (s *memSeries) appendable(t int64, v float64) error { - if len(s.chunks) == 0 { + c := s.head() + if c == nil { return nil } - c := s.head() if t > c.maxTime { return nil @@ -1043,7 +1047,11 @@ func (s *memSeries) appendable(t int64, v float64) error { } func (s *memSeries) chunk(id int) *memChunk { - return s.chunks[id-s.firstChunkID] + ix := id - s.firstChunkID + if ix < 0 || ix >= len(s.chunks) { + return nil + } + return s.chunks[ix] } func (s *memSeries) chunkID(pos int) int { @@ -1072,27 +1080,25 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { s.mtx.Lock() - var c *memChunk + c := s.head() - if len(s.chunks) == 0 { + if c == nil { c = s.cut(t) chunkCreated = true } - c = s.head() if c.maxTime >= t { s.mtx.Unlock() return false, chunkCreated } - if c.samples > samplesPerChunk/4 && t >= s.nextAt { + if c.chunk.NumSamples() > samplesPerChunk/4 && t >= s.nextAt { c = s.cut(t) chunkCreated = true } s.app.Append(t, v) c.maxTime = t - c.samples++ - if c.samples == samplesPerChunk/4 { + if c.chunk.NumSamples() == samplesPerChunk/4 { _, maxt := rangeForTimestamp(c.minTime, s.chunkRange) s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt) } @@ -1123,7 +1129,6 @@ func computeChunkEndTime(start, cur, max int64) int64 { func (s *memSeries) iterator(id int) chunks.Iterator { c := s.chunk(id) - // TODO(fabxc): !!! Test this and everything around chunk ID != list pos. if id-s.firstChunkID < len(s.chunks)-1 { return c.chunk.Iterator() } @@ -1132,20 +1137,22 @@ func (s *memSeries) iterator(id int) chunks.Iterator { it := &memSafeIterator{ Iterator: c.chunk.Iterator(), i: -1, - total: c.samples, + total: c.chunk.NumSamples(), buf: s.sampleBuf, } return it } func (s *memSeries) head() *memChunk { + if len(s.chunks) == 0 { + return nil + } return s.chunks[len(s.chunks)-1] } type memChunk struct { chunk chunks.Chunk minTime, maxTime int64 - samples int } type memSafeIterator struct { diff --git a/head_test.go b/head_test.go index 5b9574ebd7..225a840ccb 100644 --- a/head_test.go +++ b/head_test.go @@ -82,6 +82,116 @@ func readPrometheusLabels(fn string, n int) ([]labels.Labels, error) { return mets, nil } +func TestHead_Truncate(t *testing.T) { + h, err := NewHead(nil, nil, nil, 1000) + require.NoError(t, err) + + h.initTime(0) + + s1 := h.create(1, labels.FromStrings("a", "1", "b", "1")) + s2 := h.create(2, labels.FromStrings("a", "2", "b", "1")) + s3 := h.create(3, labels.FromStrings("a", "1", "b", "2")) + s4 := h.create(4, labels.FromStrings("a", "2", "b", "2", "c", "1")) + + s1.chunks = []*memChunk{ + {minTime: 0, maxTime: 999}, + {minTime: 1000, maxTime: 1999}, + {minTime: 2000, maxTime: 2999}, + } + s2.chunks = []*memChunk{ + {minTime: 1000, maxTime: 1999}, + {minTime: 2000, maxTime: 2999}, + {minTime: 3000, maxTime: 3999}, + } + s3.chunks = []*memChunk{ + {minTime: 0, maxTime: 999}, + {minTime: 1000, maxTime: 1999}, + } + s4.chunks = []*memChunk{} + + // Truncation must be aligned. + require.Error(t, h.Truncate(1)) + + h.Truncate(2000) + + require.Equal(t, []*memChunk{ + {minTime: 2000, maxTime: 2999}, + }, h.series[s1.ref].chunks) + + require.Equal(t, []*memChunk{ + {minTime: 2000, maxTime: 2999}, + {minTime: 3000, maxTime: 3999}, + }, h.series[s2.ref].chunks) + + require.Nil(t, h.series[s3.ref]) + require.Nil(t, h.series[s4.ref]) + + postingsA1, _ := expandPostings(h.postings.get(term{"a", "1"})) + postingsA2, _ := expandPostings(h.postings.get(term{"a", "2"})) + postingsB1, _ := expandPostings(h.postings.get(term{"b", "1"})) + postingsB2, _ := expandPostings(h.postings.get(term{"b", "2"})) + postingsC1, _ := expandPostings(h.postings.get(term{"c", "1"})) + postingsAll, _ := expandPostings(h.postings.get(term{"", ""})) + + require.Equal(t, []uint32{s1.ref}, postingsA1) + require.Equal(t, []uint32{s2.ref}, postingsA2) + require.Equal(t, []uint32{s1.ref, s2.ref}, postingsB1) + require.Equal(t, []uint32{s1.ref, s2.ref}, postingsAll) + require.Nil(t, postingsB2) + require.Nil(t, postingsC1) + + require.Equal(t, map[string]struct{}{ + "": struct{}{}, // from 'all' postings list + "a": struct{}{}, + "b": struct{}{}, + "1": struct{}{}, + "2": struct{}{}, + }, h.symbols) + + require.Equal(t, map[string]stringset{ + "a": stringset{"1": struct{}{}, "2": struct{}{}}, + "b": stringset{"1": struct{}{}}, + "": stringset{"": struct{}{}}, + }, h.values) +} + +// Validate various behaviors brought on by firstChunkID accounting for +// garbage collected chunks. +func TestMemSeries_truncateChunks(t *testing.T) { + s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000) + + for i := 0; i < 4000; i += 5 { + ok, _ := s.append(int64(i), float64(i)) + require.True(t, ok, "sample appen failed") + } + + // Check that truncate removes half of the chunks and afterwards + // that the ID of the last chunk still gives us the same chunk afterwards. + countBefore := len(s.chunks) + lastID := s.chunkID(countBefore - 1) + lastChunk := s.chunk(lastID) + + require.NotNil(t, s.chunk(0)) + require.NotNil(t, lastChunk) + + s.truncateChunksBefore(2000) + + require.Equal(t, int64(2000), s.chunks[0].minTime, "unexpected start time of first chunks") + require.Nil(t, s.chunk(0), "first chunk not gone") + require.Equal(t, countBefore/2, len(s.chunks), "chunks not truncated correctly") + require.Equal(t, lastChunk, s.chunk(lastID), "last chunk does not match") + + // Validate that the series' sample buffer is applied correctly to the last chunk + // after truncation. + it1 := s.iterator(s.chunkID(len(s.chunks) - 1)) + _, ok := it1.(*memSafeIterator) + require.True(t, ok, "last chunk not wrapped with sample buffer") + + it2 := s.iterator(s.chunkID(len(s.chunks) - 2)) + _, ok = it2.(*memSafeIterator) + require.False(t, ok, "non-last chunk incorrectly wrapped with sample buffer") +} + // func TestHBDeleteSimple(t *testing.T) { // numSamples := int64(10) diff --git a/wal.go b/wal.go index fc0109aefa..dcf3fc9e9f 100644 --- a/wal.go +++ b/wal.go @@ -320,7 +320,7 @@ func (w *SegmentWAL) Sync() error { return } tail = w.tail() - } () + }() if err != nil { return err