diff --git a/block.go b/block.go index 53f8b6c4fb..06896b8946 100644 --- a/block.go +++ b/block.go @@ -252,7 +252,7 @@ func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error { stones := map[uint32]intervals{} var lset labels.Labels - var chks []*ChunkMeta + var chks []ChunkMeta Outer: for p.Next() { diff --git a/chunks.go b/chunks.go index 075384cd59..477d9588c7 100644 --- a/chunks.go +++ b/chunks.go @@ -100,7 +100,7 @@ type ChunkWriter interface { // must be populated. // After returning successfully, the Ref fields in the ChunkMetas // are set and can be used to retrieve the chunks from the written data. - WriteChunks(chunks ...*ChunkMeta) error + WriteChunks(chunks ...ChunkMeta) error // Close writes any required finalization and closes the resources // associated with the underlying writer. @@ -222,7 +222,7 @@ func (w *chunkWriter) write(b []byte) error { return err } -func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error { +func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error { // Calculate maximum space we need and cut a new segment in case // we don't fit into the current one. maxLen := int64(binary.MaxVarintLen32) // The number of chunks. @@ -239,17 +239,14 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error { } b := make([]byte, binary.MaxVarintLen32) - n := binary.PutUvarint(b, uint64(len(chks))) - - if err := w.write(b[:n]); err != nil { - return err - } seq := uint64(w.seq()) << 32 - for _, chk := range chks { + for i := range chks { + chk := &chks[i] + chk.Ref = seq | uint64(w.n) - n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes()))) + n := binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes()))) if err := w.write(b[:n]); err != nil { return err diff --git a/compact.go b/compact.go index 0027cd50ce..d0d90ea1e2 100644 --- a/compact.go +++ b/compact.go @@ -497,7 +497,9 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo } valset.set(l.Value) - postings.add(i, term{name: l.Name, value: l.Value}) + t := term{name: l.Name, value: l.Value} + + postings.add(i, t) } i++ } @@ -536,7 +538,7 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo type compactionSet interface { Next() bool - At() (labels.Labels, []*ChunkMeta, intervals) + At() (labels.Labels, []ChunkMeta, intervals) Err() error } @@ -548,7 +550,7 @@ type compactionSeriesSet struct { series SeriesSet l labels.Labels - c []*ChunkMeta + c []ChunkMeta intervals intervals err error } @@ -574,7 +576,7 @@ func (c *compactionSeriesSet) Next() bool { // Remove completely deleted chunks. 
if len(c.intervals) > 0 { - chks := make([]*ChunkMeta, 0, len(c.c)) + chks := make([]ChunkMeta, 0, len(c.c)) for _, chk := range c.c { if !(interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) { chks = append(chks, chk) @@ -584,7 +586,9 @@ func (c *compactionSeriesSet) Next() bool { c.c = chks } - for _, chk := range c.c { + for i := range c.c { + chk := &c.c[i] + chk.Chunk, c.err = c.chunks.Chunk(chk.Ref) if c.err != nil { return false @@ -601,7 +605,7 @@ func (c *compactionSeriesSet) Err() error { return c.p.Err() } -func (c *compactionSeriesSet) At() (labels.Labels, []*ChunkMeta, intervals) { +func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta, intervals) { return c.l, c.c, c.intervals } @@ -610,7 +614,7 @@ type compactionMerger struct { aok, bok bool l labels.Labels - c []*ChunkMeta + c []ChunkMeta intervals intervals } @@ -651,7 +655,7 @@ func (c *compactionMerger) Next() bool { // While advancing child iterators the memory used for labels and chunks // may be reused. When picking a series we have to store the result. var lset labels.Labels - var chks []*ChunkMeta + var chks []ChunkMeta d := c.compare() // Both sets contain the current series. Chain them into a single one. @@ -691,7 +695,7 @@ func (c *compactionMerger) Err() error { return c.b.Err() } -func (c *compactionMerger) At() (labels.Labels, []*ChunkMeta, intervals) { +func (c *compactionMerger) At() (labels.Labels, []ChunkMeta, intervals) { return c.l, c.c, c.intervals } diff --git a/head.go b/head.go index 9e99d3777d..4f5ed57557 100644 --- a/head.go +++ b/head.go @@ -702,7 +702,7 @@ func (h *headIndexReader) SortedPostings(p Postings) Postings { } // Series returns the series for the given reference. -func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*ChunkMeta) error { +func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]ChunkMeta) error { h.mtx.RLock() defer h.mtx.RUnlock() @@ -722,7 +722,7 @@ func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*Chunk *chks = (*chks)[:0] for i, c := range s.chunks { - *chks = append(*chks, &ChunkMeta{ + *chks = append(*chks, ChunkMeta{ MinTime: c.minTime, MaxTime: c.maxTime, Ref: (uint64(ref) << 32) | uint64(i), diff --git a/index.go b/index.go index c948ee27c2..e3cce3c00f 100644 --- a/index.go +++ b/index.go @@ -45,8 +45,8 @@ const compactionPageBytes = minSectorSize * 64 type indexWriterSeries struct { labels labels.Labels - chunks []*ChunkMeta // series file offset of chunks - offset uint32 // index file offset of series reference + chunks []ChunkMeta // series file offset of chunks + offset uint32 // index file offset of series reference } type indexWriterSeriesSlice []*indexWriterSeries @@ -100,7 +100,7 @@ type IndexWriter interface { // their labels. // The reference numbers are used to resolve entries in postings lists that // are added later. - AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) error + AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) error // WriteLabelIndex serializes an index from label names to values. // The passed in values chained tuples of strings of the length of names. 
@@ -261,7 +261,7 @@ func (w *indexWriter) writeMeta() error { return w.write(w.buf1.get()) } -func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error { +func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkMeta) error { if err := w.ensureStage(idxStageSeries); err != nil { return err } @@ -471,6 +471,7 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error { for _, r := range refs { w.buf2.putBE32(r) } + w.uint32s = refs w.buf1.reset() w.buf1.putBE32int(w.buf2.len()) @@ -524,7 +525,7 @@ type IndexReader interface { // Series populates the given labels and chunk metas for the series identified // by the reference. - Series(ref uint32, lset *labels.Labels, chks *[]*ChunkMeta) error + Series(ref uint32, lset *labels.Labels, chks *[]ChunkMeta) error // LabelIndices returns the label pairs for which indices exist. LabelIndices() ([][]string, error) @@ -740,7 +741,7 @@ func (r *indexReader) LabelIndices() ([][]string, error) { return res, nil } -func (r *indexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*ChunkMeta) error { +func (r *indexReader) Series(ref uint32, lbls *labels.Labels, chks *[]ChunkMeta) error { d1 := r.decbufAt(int(ref)) d2 := d1.decbuf(int(d1.uvarint())) @@ -781,7 +782,7 @@ func (r *indexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*ChunkMeta return errors.Wrapf(d2.err(), "read meta for chunk %d", i) } - *chks = append(*chks, &ChunkMeta{ + *chks = append(*chks, ChunkMeta{ Ref: off, MinTime: mint, MaxTime: maxt, diff --git a/index_test.go b/index_test.go index 3616daf300..ab45ffccc2 100644 --- a/index_test.go +++ b/index_test.go @@ -29,7 +29,7 @@ import ( type series struct { l labels.Labels - chunks []*ChunkMeta + chunks []ChunkMeta } type mockIndex struct { @@ -52,7 +52,7 @@ func (m mockIndex) Symbols() (map[string]struct{}, error) { return m.symbols, nil } -func (m mockIndex) AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) error { +func (m mockIndex) AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) error { if _, ok := m.series[ref]; ok { return errors.Errorf("series with reference %d already added", ref) } @@ -64,9 +64,8 @@ func (m mockIndex) AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) s := series{l: l} // Actual chunk data is not stored in the index. for _, c := range chunks { - cc := *c - cc.Chunk = nil - s.chunks = append(s.chunks, &cc) + c.Chunk = nil + s.chunks = append(s.chunks, c) } m.series[ref] = s @@ -126,7 +125,7 @@ func (m mockIndex) SortedPostings(p Postings) Postings { return newListPostings(ep) } -func (m mockIndex) Series(ref uint32, lset *labels.Labels, chks *[]*ChunkMeta) error { +func (m mockIndex) Series(ref uint32, lset *labels.Labels, chks *[]ChunkMeta) error { s, ok := m.series[ref] if !ok { return ErrNotFound @@ -215,7 +214,7 @@ func TestIndexRW_Postings(t *testing.T) { require.NoError(t, err) var l labels.Labels - var c []*ChunkMeta + var c []ChunkMeta for i := 0; p.Next(); i++ { err := ir.Series(p.At(), &l, &c) @@ -252,10 +251,10 @@ func TestPersistence_index_e2e(t *testing.T) { // Generate ChunkMetas for every label set. 
for i, lset := range lbls { - var metas []*ChunkMeta + var metas []ChunkMeta for j := 0; j <= (i % 20); j++ { - metas = append(metas, &ChunkMeta{ + metas = append(metas, ChunkMeta{ MinTime: int64(j * 10000), MaxTime: int64((j + 1) * 10000), Ref: rand.Uint64(), @@ -333,7 +332,7 @@ func TestPersistence_index_e2e(t *testing.T) { expp, err := mi.Postings(p.name, p.value) var lset, explset labels.Labels - var chks, expchks []*ChunkMeta + var chks, expchks []ChunkMeta for gotp.Next() { require.True(t, expp.Next()) diff --git a/querier.go b/querier.go index a54acdd5a1..8c2f6cbee2 100644 --- a/querier.go +++ b/querier.go @@ -403,7 +403,7 @@ func (s *mergedSeriesSet) Next() bool { type chunkSeriesSet interface { Next() bool - At() (labels.Labels, []*ChunkMeta, intervals) + At() (labels.Labels, []ChunkMeta, intervals) Err() error } @@ -416,12 +416,12 @@ type baseChunkSeries struct { absent []string // labels that must be unset in results. lset labels.Labels - chks []*ChunkMeta + chks []ChunkMeta intervals intervals err error } -func (s *baseChunkSeries) At() (labels.Labels, []*ChunkMeta, intervals) { +func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta, intervals) { return s.lset, s.chks, s.intervals } @@ -430,7 +430,7 @@ func (s *baseChunkSeries) Err() error { return s.err } func (s *baseChunkSeries) Next() bool { var ( lset labels.Labels - chunks []*ChunkMeta + chunks []ChunkMeta ) Outer: for s.p.Next() { @@ -453,7 +453,7 @@ Outer: if len(s.intervals) > 0 { // Only those chunks that are not entirely deleted. - chks := make([]*ChunkMeta, 0, len(s.chks)) + chks := make([]ChunkMeta, 0, len(s.chks)) for _, chk := range s.chks { if !(interval{chk.MinTime, chk.MaxTime}.isSubrange(s.intervals)) { chks = append(chks, chk) @@ -480,12 +480,12 @@ type populatedChunkSeries struct { mint, maxt int64 err error - chks []*ChunkMeta + chks []ChunkMeta lset labels.Labels intervals intervals } -func (s *populatedChunkSeries) At() (labels.Labels, []*ChunkMeta, intervals) { +func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta, intervals) { return s.lset, s.chks, s.intervals } func (s *populatedChunkSeries) Err() error { return s.err } @@ -501,8 +501,10 @@ func (s *populatedChunkSeries) Next() bool { chks = chks[1:] } - // Break out at the first chunk that has no overlap with mint, maxt. - for i, c := range chks { + for i := range chks { + c := &chks[i] + + // Break out at the first chunk that has no overlap with mint, maxt. if c.MinTime > s.maxt { chks = chks[:i] break @@ -564,7 +566,7 @@ func (s *blockSeriesSet) Err() error { return s.err } // time series data. type chunkSeries struct { labels labels.Labels - chunks []*ChunkMeta // in-order chunk refs + chunks []ChunkMeta // in-order chunk refs mint, maxt int64 @@ -667,7 +669,7 @@ func (it *chainedSeriesIterator) Err() error { // chunkSeriesIterator implements a series iterator on top // of a list of time-sorted, non-overlapping chunks. 
type chunkSeriesIterator struct { - chunks []*ChunkMeta + chunks []ChunkMeta i int cur chunks.Iterator @@ -677,7 +679,7 @@ type chunkSeriesIterator struct { intervals intervals } -func newChunkSeriesIterator(cs []*ChunkMeta, dranges intervals, mint, maxt int64) *chunkSeriesIterator { +func newChunkSeriesIterator(cs []ChunkMeta, dranges intervals, mint, maxt int64) *chunkSeriesIterator { it := cs[0].Chunk.Iterator() if len(dranges) > 0 { it = &deletedIterator{it: it, intervals: dranges} diff --git a/querier_test.go b/querier_test.go index d8e8f76574..7c87f83184 100644 --- a/querier_test.go +++ b/querier_test.go @@ -235,12 +235,12 @@ func createIdxChkReaders(tc []struct { for i, s := range tc { i = i + 1 // 0 is not a valid posting. - metas := make([]*ChunkMeta, 0, len(s.chunks)) + metas := make([]ChunkMeta, 0, len(s.chunks)) for _, chk := range s.chunks { // Collisions can be there, but for tests, its fine. ref := rand.Uint64() - metas = append(metas, &ChunkMeta{ + metas = append(metas, ChunkMeta{ MinTime: chk[0].t, MaxTime: chk[len(chk)-1].t, Ref: ref, @@ -661,7 +661,7 @@ Outer: func TestBaseChunkSeries(t *testing.T) { type refdSeries struct { lset labels.Labels - chunks []*ChunkMeta + chunks []ChunkMeta ref uint32 } @@ -677,7 +677,7 @@ func TestBaseChunkSeries(t *testing.T) { series: []refdSeries{ { lset: labels.New([]labels.Label{{"a", "a"}}...), - chunks: []*ChunkMeta{ + chunks: []ChunkMeta{ {Ref: 29}, {Ref: 45}, {Ref: 245}, {Ref: 123}, {Ref: 4232}, {Ref: 5344}, {Ref: 121}, }, @@ -685,19 +685,19 @@ func TestBaseChunkSeries(t *testing.T) { }, { lset: labels.New([]labels.Label{{"a", "a"}, {"b", "b"}}...), - chunks: []*ChunkMeta{ + chunks: []ChunkMeta{ {Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26}, }, ref: 10, }, { lset: labels.New([]labels.Label{{"b", "c"}}...), - chunks: []*ChunkMeta{{Ref: 8282}}, + chunks: []ChunkMeta{{Ref: 8282}}, ref: 1, }, { lset: labels.New([]labels.Label{{"b", "b"}}...), - chunks: []*ChunkMeta{ + chunks: []ChunkMeta{ {Ref: 829}, {Ref: 239}, {Ref: 2349}, {Ref: 659}, {Ref: 269}, }, ref: 108, @@ -711,14 +711,14 @@ func TestBaseChunkSeries(t *testing.T) { series: []refdSeries{ { lset: labels.New([]labels.Label{{"a", "a"}, {"b", "b"}}...), - chunks: []*ChunkMeta{ + chunks: []ChunkMeta{ {Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26}, }, ref: 10, }, { lset: labels.New([]labels.Label{{"b", "c"}}...), - chunks: []*ChunkMeta{{Ref: 8282}}, + chunks: []ChunkMeta{{Ref: 8282}}, ref: 1, }, }, @@ -766,7 +766,7 @@ type itSeries struct { func (s itSeries) Iterator() SeriesIterator { return s.si } func (s itSeries) Labels() labels.Labels { return labels.Labels{} } -func chunkFromSamples(s []sample) *ChunkMeta { +func chunkFromSamples(s []sample) ChunkMeta { mint, maxt := int64(0), int64(0) if len(s) > 0 { @@ -779,11 +779,10 @@ func chunkFromSamples(s []sample) *ChunkMeta { for _, s := range s { ca.Append(s.t, s.v) } - return &ChunkMeta{ + return ChunkMeta{ MinTime: mint, MaxTime: maxt, - - Chunk: c, + Chunk: c, } } @@ -945,7 +944,7 @@ func TestSeriesIterator(t *testing.T) { t.Run("Chunk", func(t *testing.T) { for _, tc := range itcases { - chkMetas := []*ChunkMeta{ + chkMetas := []ChunkMeta{ chunkFromSamples(tc.a), chunkFromSamples(tc.b), chunkFromSamples(tc.c), @@ -1016,7 +1015,7 @@ func TestSeriesIterator(t *testing.T) { seekcases2 := append(seekcases, extra...) 
for _, tc := range seekcases2 { - chkMetas := []*ChunkMeta{ + chkMetas := []ChunkMeta{ chunkFromSamples(tc.a), chunkFromSamples(tc.b), chunkFromSamples(tc.c), @@ -1103,7 +1102,7 @@ func TestSeriesIterator(t *testing.T) { // Regression for: https://github.com/prometheus/tsdb/pull/97 func TestChunkSeriesIterator_DoubleSeek(t *testing.T) { - chkMetas := []*ChunkMeta{ + chkMetas := []ChunkMeta{ chunkFromSamples([]sample{}), chunkFromSamples([]sample{{1, 1}, {2, 2}, {3, 3}}), chunkFromSamples([]sample{{4, 4}, {5, 5}}), @@ -1120,7 +1119,7 @@ func TestChunkSeriesIterator_DoubleSeek(t *testing.T) { // Regression when seeked chunks were still found via binary search and we always // skipped to the end when seeking a value in the current chunk. func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) { - metas := []*ChunkMeta{ + metas := []ChunkMeta{ chunkFromSamples([]sample{}), chunkFromSamples([]sample{{1, 2}, {3, 4}, {5, 6}, {7, 8}}), chunkFromSamples([]sample{}), @@ -1141,7 +1140,7 @@ func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) { func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) { lbls := []labels.Labels{labels.New(labels.Label{"a", "b"})} - chunkMetas := [][]*ChunkMeta{ + chunkMetas := [][]ChunkMeta{ { {MinTime: 1, MaxTime: 2, Ref: 1}, {MinTime: 3, MaxTime: 4, Ref: 2}, @@ -1173,7 +1172,7 @@ func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) { require.False(t, p.Next()) // Test the case where 1 chunk could cause an unpopulated chunk to be returned. - chunkMetas = [][]*ChunkMeta{ + chunkMetas = [][]ChunkMeta{ { {MinTime: 1, MaxTime: 2, Ref: 1}, }, @@ -1193,7 +1192,7 @@ func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) { type mockChunkSeriesSet struct { l []labels.Labels - cm [][]*ChunkMeta + cm [][]ChunkMeta i int } @@ -1206,7 +1205,7 @@ func (m *mockChunkSeriesSet) Next() bool { return m.i < len(m.l) } -func (m *mockChunkSeriesSet) At() (labels.Labels, []*ChunkMeta, intervals) { +func (m *mockChunkSeriesSet) At() (labels.Labels, []ChunkMeta, intervals) { return m.l[m.i], m.cm[m.i], nil }
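
The bulk of this diff switches ChunkMeta from being handled as []*ChunkMeta to a plain value slice []ChunkMeta. With a value slice, `for _, chk := range chks` iterates over copies, so any mutation of chk would be lost; that is why the writing and compaction loops now index into the slice (`chk := &chks[i]`) before setting Ref or populating Chunk. A minimal sketch of the two iteration patterns — the `meta` type below is a stand-in for illustration, not the tsdb type:

package main

import "fmt"

// meta is a stand-in for a small value struct such as ChunkMeta.
type meta struct {
	Ref uint64
}

func main() {
	ms := make([]meta, 3)

	// Iterating by value yields copies; the assignment below does not
	// modify the slice elements.
	for _, m := range ms {
		m.Ref = 42
	}
	fmt.Println(ms) // [{0} {0} {0}]

	// Indexing (taking the element's address) mutates in place,
	// which is the pattern the diff adopts.
	for i := range ms {
		m := &ms[i]
		m.Ref = uint64(i) + 1
	}
	fmt.Println(ms) // [{1} {2} {3}]
}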
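
The reference assigned in chunkWriter.WriteChunks composes two 32-bit halves: the segment sequence in the upper bits (`seq := uint64(w.seq()) << 32`) and the current write position in the lower bits (`seq | uint64(w.n)`); headIndexReader.Series builds head-chunk refs the same way from the series ref and chunk index. Assuming the offset half always fits in 32 bits, the encoding can be sketched as follows (function names are illustrative, not part of the package):

package main

import "fmt"

// packRef combines a 32-bit segment sequence and a 32-bit offset into a
// single 64-bit chunk reference, mirroring `seq | uint64(w.n)` in the diff.
func packRef(seq uint32, offset uint32) uint64 {
	return uint64(seq)<<32 | uint64(offset)
}

// unpackRef splits a reference back into its two halves.
func unpackRef(ref uint64) (seq uint32, offset uint32) {
	return uint32(ref >> 32), uint32(ref)
}

func main() {
	ref := packRef(7, 0x1234)
	seq, off := unpackRef(ref)
	fmt.Printf("ref=%#x seq=%d offset=%#x\n", ref, seq, off)
}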
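
The added `w.uint32s = refs` line in WritePostings looks like a small allocation optimization: the hunk does not show it, but the function presumably builds `refs` on top of the writer's reusable scratch slice, and storing the grown slice back lets later calls start from its capacity instead of reallocating. A hedged sketch of that reuse pattern, with illustrative type and method names:

package main

import "fmt"

// writer keeps a reusable scratch slice, in the spirit of the
// `w.uint32s = refs` line added in WritePostings.
type writer struct {
	uint32s []uint32
}

func (w *writer) writePostings(in []uint32) {
	// Reuse the previously grown backing array.
	refs := w.uint32s[:0]
	refs = append(refs, in...)

	// ... sort and encode refs ...

	// Hand the (possibly reallocated) slice back so the next call
	// benefits from its capacity.
	w.uint32s = refs
	fmt.Println(len(refs), cap(refs))
}

func main() {
	w := &writer{}
	w.writePostings([]uint32{1, 2, 3})
	w.writePostings([]uint32{4, 5})
}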