Use ChunkMeta references for clarity

Passing ChunkMetas by value has been a common source of hard-to-debug
issues. It's a premature and unbenchmarked optimization, and semantically
we want ChunkMetas to be references in all changed cases.
Fabian Reinartz 2017-03-14 15:40:16 +01:00
parent d6fb6aaaa8
commit a8e8903350
6 changed files with 34 additions and 40 deletions
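
The heart of the change is Go's range semantics: iterating over a []ChunkMeta yields copies, so code that wants to set shared fields such as Ref has to reach back with &chks[i], which is easy to forget, while with []*ChunkMeta every holder of the slice sees the same objects. A minimal sketch of the difference, using a stub ChunkMeta rather than the real tsdb type:

    package main

    import "fmt"

    // ChunkMeta is a stub for illustration; the real tsdb type also carries
    // the chunk data and min/max timestamps.
    type ChunkMeta struct {
        Ref uint64
    }

    func main() {
        // Value slice: range yields copies, so this write is silently lost.
        byValue := []ChunkMeta{{}, {}}
        for _, c := range byValue {
            c.Ref = 42 // mutates the copy only
        }
        fmt.Println(byValue[0].Ref) // 0

        // The pre-change code therefore had to index back into the slice.
        for i := range byValue {
            byValue[i].Ref = 42
        }
        fmt.Println(byValue[0].Ref) // 42

        // Pointer slice: every holder of the slice sees the same objects,
        // which is the sharing the WriteChunks contract below describes.
        byRef := []*ChunkMeta{{}, {}}
        for _, c := range byRef {
            c.Ref = 42
        }
        fmt.Println(byRef[0].Ref) // 42
    }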


@@ -36,7 +36,7 @@ type ChunkWriter interface {
     // must be populated.
     // After returning successfully, the Ref fields in the ChunkMetas
     // is set and can be used to retrieve the chunks from the written data.
-    WriteChunks(chunks ...ChunkMeta) error
+    WriteChunks(chunks ...*ChunkMeta) error
     // Close writes any required finalization and closes the resources
     // associated with the underlying writer.
@@ -156,7 +156,7 @@ func (w *chunkWriter) write(wr io.Writer, b []byte) error {
     return err
 }

-func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error {
+func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
     // Calculate maximum space we need and cut a new segment in case
     // we don't fit into the current one.
     maxLen := int64(binary.MaxVarintLen32)
@@ -184,9 +184,7 @@ func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error {
     }
     seq := uint64(w.seq()) << 32
-    for i := range chks {
-        chk := &chks[i]
-
+    for _, chk := range chks {
         chk.Ref = seq | uint64(w.n)
         n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes())))
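
For context on the loop above: seq is the segment sequence number shifted into the upper 32 bits and w.n is the byte offset in the current segment, so a chunk reference is a packed (segment, offset) pair. A small sketch of that packing and the corresponding unpacking; the helper names are mine, not part of tsdb:

    package main

    import "fmt"

    // packChunkRef mirrors what WriteChunks does above: segment sequence
    // number in the upper 32 bits, byte offset within that segment file
    // in the lower 32 bits.
    func packChunkRef(seq, offset uint32) uint64 {
        return uint64(seq)<<32 | uint64(offset)
    }

    // unpackChunkRef splits a reference back into its two halves, which is
    // how a reader can locate the chunk the reference points at.
    func unpackChunkRef(ref uint64) (seq, offset uint32) {
        return uint32(ref >> 32), uint32(ref)
    }

    func main() {
        ref := packChunkRef(3, 1024)
        seq, off := unpackChunkRef(ref)
        fmt.Println(seq, off) // 3 1024
    }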


@@ -334,7 +334,7 @@ func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWri
 type compactionSet interface {
     Next() bool
-    At() (labels.Labels, []ChunkMeta)
+    At() (labels.Labels, []*ChunkMeta)
     Err() error
 }
@@ -344,7 +344,7 @@ type compactionSeriesSet struct {
     chunks ChunkReader
     l   labels.Labels
-    c   []ChunkMeta
+    c   []*ChunkMeta
     err error
 }
@@ -365,9 +365,7 @@ func (c *compactionSeriesSet) Next() bool {
     if c.err != nil {
         return false
     }
-    for i := range c.c {
-        chk := &c.c[i]
-
+    for _, chk := range c.c {
         chk.Chunk, c.err = c.chunks.Chunk(chk.Ref)
         if c.err != nil {
             return false
@@ -384,7 +382,7 @@ func (c *compactionSeriesSet) Err() error {
     return c.p.Err()
 }

-func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta) {
+func (c *compactionSeriesSet) At() (labels.Labels, []*ChunkMeta) {
     return c.l, c.c
 }
@@ -393,12 +391,12 @@ type compactionMerger struct {
     aok, bok bool
     l labels.Labels
-    c []ChunkMeta
+    c []*ChunkMeta
 }

 type compactionSeries struct {
     labels labels.Labels
-    chunks []ChunkMeta
+    chunks []*ChunkMeta
 }

 func newCompactionMerger(a, b compactionSet) (*compactionMerger, error) {
@@ -459,7 +457,7 @@ func (c *compactionMerger) Err() error {
     return c.b.Err()
 }

-func (c *compactionMerger) At() (labels.Labels, []ChunkMeta) {
+func (c *compactionMerger) At() (labels.Labels, []*ChunkMeta) {
     return c.l, c.c
 }


@@ -480,7 +480,7 @@ func (h *headIndexReader) Postings(name, value string) (Postings, error) {
 }

 // Series returns the series for the given reference.
-func (h *headIndexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
+func (h *headIndexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
     h.mtx.RLock()
     defer h.mtx.RUnlock()
@@ -488,13 +488,13 @@ func (h *headIndexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error)
         return nil, nil, ErrNotFound
     }
     s := h.series[ref]
-    metas := make([]ChunkMeta, 0, len(s.chunks))
+    metas := make([]*ChunkMeta, 0, len(s.chunks))
     s.mtx.RLock()
     defer s.mtx.RUnlock()
     for i, c := range s.chunks {
-        metas = append(metas, ChunkMeta{
+        metas = append(metas, &ChunkMeta{
             MinTime: c.minTime,
             MaxTime: c.maxTime,
             Ref:     (uint64(ref) << 32) | uint64(i),
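
The head block uses the same 64-bit reference shape with a different meaning: the upper 32 bits are the series reference and the lower 32 bits are the chunk's position in that series' in-memory chunk list, not a file offset. A hypothetical decoder, just to make that layout explicit; this helper does not exist in tsdb:

    package main

    import "fmt"

    // splitHeadChunkRef undoes the encoding built above with
    // (uint64(ref) << 32) | uint64(i). Illustrative only.
    func splitHeadChunkRef(ref uint64) (seriesRef uint32, chunkIndex int) {
        return uint32(ref >> 32), int(ref & 0xffffffff)
    }

    func main() {
        ref := uint64(7)<<32 | uint64(2) // series 7, third chunk
        s, i := splitHeadChunkRef(ref)
        fmt.Println(s, i) // 7 2
    }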


@@ -33,7 +33,7 @@ type IndexWriter interface {
     // of chunks that the index can reference.
     // The reference number is used to resolve a series against the postings
     // list iterator. It only has to be available during the write processing.
-    AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) error
+    AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) error
     // WriteLabelIndex serializes an index from label names to values.
     // The passed in values chained tuples of strings of the length of names.
@@ -49,7 +49,7 @@ type IndexWriter interface {
 type indexWriterSeries struct {
     labels labels.Labels
-    chunks []ChunkMeta // series file offset of chunks
+    chunks []*ChunkMeta // series file offset of chunks
     offset uint32 // index file offset of series reference
 }
@@ -142,7 +142,7 @@ func (w *indexWriter) writeMeta() error {
     return w.write(w.bufw, b[:])
 }

-func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkMeta) error {
+func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error {
     if _, ok := w.series[ref]; ok {
         return errors.Errorf("series with reference %d already added", ref)
     }
@@ -419,7 +419,7 @@ type IndexReader interface {
     Postings(name, value string) (Postings, error)
     // Series returns the series for the given reference.
-    Series(ref uint32) (labels.Labels, []ChunkMeta, error)
+    Series(ref uint32) (labels.Labels, []*ChunkMeta, error)
     // LabelIndices returns the label pairs for which indices exist.
     LabelIndices() ([][]string, error)
@@ -599,7 +599,7 @@ func (r *indexReader) LabelIndices() ([][]string, error) {
     return res, nil
 }

-func (r *indexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
+func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
     k, n := binary.Uvarint(r.b[ref:])
     if n < 1 {
         return nil, nil, errors.Wrap(errInvalidSize, "number of labels")
@@ -642,7 +642,7 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
     }
     b = b[n:]
-    chunks := make([]ChunkMeta, 0, k)
+    chunks := make([]*ChunkMeta, 0, k)
     for i := 0; i < int(k); i++ {
         firstTime, n := binary.Varint(b)
@@ -663,7 +663,7 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
         }
         b = b[n:]
-        chunks = append(chunks, ChunkMeta{
+        chunks = append(chunks, &ChunkMeta{
             Ref:     o,
             MinTime: firstTime,
             MaxTime: lastTime,
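
The reader above walks a series entry with the encoding/binary varint helpers, and the writer side uses PutUvarint for lengths (see WriteChunks earlier). A standalone round trip with those standard-library functions, independent of the actual index layout:

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    func main() {
        buf := make([]byte, 2*binary.MaxVarintLen64)

        // Unsigned varints are used for lengths and counts.
        n := binary.PutUvarint(buf, 1024)
        // Signed (zig-zag) varints suit values like timestamps that may be negative.
        n += binary.PutVarint(buf[n:], -42)

        length, read := binary.Uvarint(buf)
        ts, _ := binary.Varint(buf[read:])
        fmt.Println(length, ts, n) // 1024 -42 3
    }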


@@ -121,10 +121,10 @@ func TestPersistence_index_e2e(t *testing.T) {
     // Generate ChunkMetas for every label set.
     for i, lset := range lbls {
-        var metas []ChunkMeta
+        var metas []*ChunkMeta
         for j := 0; j <= (i % 20); j++ {
-            metas = append(metas, ChunkMeta{
+            metas = append(metas, &ChunkMeta{
                 MinTime: int64(j * 10000),
                 MaxTime: int64((j + 1) * 10000),
                 Ref:     rand.Uint64(),


@@ -345,7 +345,7 @@ func (s *mergedSeriesSet) Next() bool {
 type chunkSeriesSet interface {
     Next() bool
-    At() (labels.Labels, []ChunkMeta)
+    At() (labels.Labels, []*ChunkMeta)
     Err() error
 }
@@ -357,11 +357,11 @@ type baseChunkSeries struct {
     absent []string // labels that must be unset in results.
     lset labels.Labels
-    chks []ChunkMeta
+    chks []*ChunkMeta
     err  error
 }

-func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta) { return s.lset, s.chks }
+func (s *baseChunkSeries) At() (labels.Labels, []*ChunkMeta) { return s.lset, s.chks }
 func (s *baseChunkSeries) Err() error { return s.err }

 func (s *baseChunkSeries) Next() bool {
@@ -400,20 +400,18 @@ type populatedChunkSeries struct {
     mint, maxt int64
     err  error
-    chks []ChunkMeta
+    chks []*ChunkMeta
     lset labels.Labels
 }

-func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta) { return s.lset, s.chks }
+func (s *populatedChunkSeries) At() (labels.Labels, []*ChunkMeta) { return s.lset, s.chks }
 func (s *populatedChunkSeries) Err() error { return s.err }

 func (s *populatedChunkSeries) Next() bool {
     for s.set.Next() {
         lset, chks := s.set.At()
-        for i := range chks {
-            c := &chks[i]
-
+        for i, c := range chks {
             if c.MaxTime < s.mint {
                 chks = chks[1:]
                 continue
@@ -468,7 +466,7 @@ func (s *blockSeriesSet) Err() error { return s.err }
 // time series data.
 type chunkSeries struct {
     labels labels.Labels
-    chunks []ChunkMeta // in-order chunk refs
+    chunks []*ChunkMeta // in-order chunk refs
 }

 func (s *chunkSeries) Labels() labels.Labels {
@@ -562,13 +560,13 @@ func (it *chainedSeriesIterator) Err() error {
 // chunkSeriesIterator implements a series iterator on top
 // of a list of time-sorted, non-overlapping chunks.
 type chunkSeriesIterator struct {
-    chunks []ChunkMeta
+    chunks []*ChunkMeta
     i      int
     cur chunks.Iterator
 }

-func newChunkSeriesIterator(cs []ChunkMeta) *chunkSeriesIterator {
+func newChunkSeriesIterator(cs []*ChunkMeta) *chunkSeriesIterator {
     return &chunkSeriesIterator{
         chunks: cs,
         i:      0,