tsdb/index: use ScratchBuilder to create Labels

This necessitates a change to the `tsdb.IndexReader` interface:
`index.Reader` is used from multiple goroutines concurrently, so we
can't have state in it.

We do retain a `ScratchBuilder` in `blockBaseSeriesSet` which is
iterator-like.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2022-06-28 16:03:26 +01:00
parent 927a14b0e9
commit d3d96ec887
10 changed files with 45 additions and 30 deletions

View file

@ -472,8 +472,9 @@ func analyzeBlock(path, blockID string, limit int, runExtended bool) error {
}
lbls := labels.Labels{}
chks := []chunks.Meta{}
builder := labels.ScratchBuilder{}
for p.Next() {
if err = ir.Series(p.At(), &lbls, &chks); err != nil {
if err = ir.Series(p.At(), &builder, &lbls, &chks); err != nil {
return err
}
// Amount of the block time range not covered by this series.
@ -589,10 +590,11 @@ func analyzeCompaction(block tsdb.BlockReader, indexr tsdb.IndexReader) (err err
nBuckets := 10
histogram := make([]int, nBuckets)
totalChunks := 0
var builder labels.ScratchBuilder
for postingsr.Next() {
lbsl := labels.Labels{}
var chks []chunks.Meta
if err := indexr.Series(postingsr.At(), &lbsl, &chks); err != nil {
if err := indexr.Series(postingsr.At(), &builder, &lbsl, &chks); err != nil {
return err
}

View file

@ -82,7 +82,7 @@ type IndexReader interface {
// Series populates the given labels and chunk metas for the series identified
// by the reference.
// Returns storage.ErrNotFound if the ref does not resolve to a known series.
Series(ref storage.SeriesRef, lset *labels.Labels, chks *[]chunks.Meta) error
Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, lset *labels.Labels, chks *[]chunks.Meta) error
// LabelNames returns all the unique label names present in the index in sorted order.
LabelNames(matchers ...*labels.Matcher) ([]string, error)
@ -499,8 +499,8 @@ func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
return r.ir.SortedPostings(p)
}
func (r blockIndexReader) Series(ref storage.SeriesRef, lset *labels.Labels, chks *[]chunks.Meta) error {
if err := r.ir.Series(ref, lset, chks); err != nil {
func (r blockIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, lset *labels.Labels, chks *[]chunks.Meta) error {
if err := r.ir.Series(ref, builder, lset, chks); err != nil {
return errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}
return nil
@ -563,10 +563,11 @@ func (pb *Block) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
var lset labels.Labels
var chks []chunks.Meta
var builder labels.ScratchBuilder
Outer:
for p.Next() {
err := ir.Series(p.At(), &lset, &chks)
err := ir.Series(p.At(), &builder, &lset, &chks)
if err != nil {
return err
}

View file

@ -1830,6 +1830,8 @@ func TestChunkAtBlockBoundary(t *testing.T) {
err = db.Compact()
require.NoError(t, err)
var builder labels.ScratchBuilder
for _, block := range db.Blocks() {
r, err := block.Index()
require.NoError(t, err)
@ -1849,7 +1851,7 @@ func TestChunkAtBlockBoundary(t *testing.T) {
chunkCount := 0
for p.Next() {
err = r.Series(p.At(), &lset, &chks)
err = r.Series(p.At(), &builder, &lset, &chks)
require.NoError(t, err)
for _, c := range chks {
require.True(t, meta.MinTime <= c.MinTime && c.MaxTime <= meta.MaxTime,

View file

@ -148,7 +148,7 @@ func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
}
// Series returns the series for the given reference.
func (h *headIndexReader) Series(ref storage.SeriesRef, lbls *labels.Labels, chks *[]chunks.Meta) error {
func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, lbls *labels.Labels, chks *[]chunks.Meta) error {
s := h.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {

View file

@ -1446,10 +1446,11 @@ func TestGCChunkAccess(t *testing.T) {
idx := h.indexRange(0, 1500)
var (
lset labels.Labels
chunks []chunks.Meta
lset labels.Labels
chunks []chunks.Meta
builder labels.ScratchBuilder
)
require.NoError(t, idx.Series(1, &lset, &chunks))
require.NoError(t, idx.Series(1, &builder, &lset, &chunks))
require.Equal(t, labels.FromStrings("a", "1"), lset)
require.Equal(t, 2, len(chunks))
@ -1499,10 +1500,11 @@ func TestGCSeriesAccess(t *testing.T) {
idx := h.indexRange(0, 2000)
var (
lset labels.Labels
chunks []chunks.Meta
lset labels.Labels
chunks []chunks.Meta
builder labels.ScratchBuilder
)
require.NoError(t, idx.Series(1, &lset, &chunks))
require.NoError(t, idx.Series(1, &builder, &lset, &chunks))
require.Equal(t, labels.FromStrings("a", "1"), lset)
require.Equal(t, 2, len(chunks))

View file

@ -1597,7 +1597,7 @@ func (r *Reader) LabelValueFor(id storage.SeriesRef, label string) (string, erro
}
// Series reads the series with the given ID and writes its labels and chunks into lbls and chks.
func (r *Reader) Series(id storage.SeriesRef, lbls *labels.Labels, chks *[]chunks.Meta) error {
func (r *Reader) Series(id storage.SeriesRef, builder *labels.ScratchBuilder, lbls *labels.Labels, chks *[]chunks.Meta) error {
offset := id
// In version 2 series IDs are no longer exact references but series are 16-byte padded
// and the ID is the multiple of 16 of the actual position.
@ -1608,7 +1608,7 @@ func (r *Reader) Series(id storage.SeriesRef, lbls *labels.Labels, chks *[]chunk
if d.Err() != nil {
return d.Err()
}
return errors.Wrap(r.dec.Series(d.Get(), lbls, chks), "read series")
return errors.Wrap(r.dec.Series(d.Get(), builder, lbls, chks), "read series")
}
func (r *Reader) Postings(name string, values ...string) (Postings, error) {
@ -1836,8 +1836,9 @@ func (dec *Decoder) LabelValueFor(b []byte, label string) (string, error) {
}
// Series decodes a series entry from the given byte slice into lset and chks.
func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error {
*lbls = (*lbls)[:0]
// Previous contents of lbls can be overwritten - make sure you copy before retaining.
func (dec *Decoder) Series(b []byte, builder *labels.ScratchBuilder, lbls *labels.Labels, chks *[]chunks.Meta) error {
builder.Reset()
*chks = (*chks)[:0]
d := encoding.Decbuf{B: b}
@ -1861,8 +1862,9 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) e
return errors.Wrap(err, "lookup label value")
}
*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
builder.Add(ln, lv)
}
builder.Overwrite(lbls)
// Read the chunks meta data.
k = d.Uvarint()

View file

@ -124,7 +124,7 @@ func (m mockIndex) SortedPostings(p Postings) Postings {
return NewListPostings(ep)
}
func (m mockIndex) Series(ref storage.SeriesRef, lset *labels.Labels, chks *[]chunks.Meta) error {
func (m mockIndex) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, lset *labels.Labels, chks *[]chunks.Meta) error {
s, ok := m.series[ref]
if !ok {
return errors.New("not found")
@ -199,9 +199,10 @@ func TestIndexRW_Postings(t *testing.T) {
var l labels.Labels
var c []chunks.Meta
var builder labels.ScratchBuilder
for i := 0; p.Next(); i++ {
err := ir.Series(p.At(), &l, &c)
err := ir.Series(p.At(), &builder, &l, &c)
require.NoError(t, err)
require.Equal(t, 0, len(c))
@ -311,6 +312,7 @@ func TestPostingsMany(t *testing.T) {
{in: []string{"126a", "126b", "127", "127a", "127b", "128", "128a", "128b", "129", "129a", "129b"}},
}
var builder labels.ScratchBuilder
for _, c := range cases {
it, err := ir.Postings("i", c.in...)
require.NoError(t, err)
@ -319,7 +321,7 @@ func TestPostingsMany(t *testing.T) {
var lbls labels.Labels
var metas []chunks.Meta
for it.Next() {
require.NoError(t, ir.Series(it.At(), &lbls, &metas))
require.NoError(t, ir.Series(it.At(), &builder, &lbls, &metas))
got = append(got, lbls.Get("i"))
}
require.NoError(t, it.Err())
@ -421,16 +423,17 @@ func TestPersistence_index_e2e(t *testing.T) {
var lset, explset labels.Labels
var chks, expchks []chunks.Meta
var builder labels.ScratchBuilder
for gotp.Next() {
require.True(t, expp.Next())
ref := gotp.At()
err := ir.Series(ref, &lset, &chks)
err := ir.Series(ref, &builder, &lset, &chks)
require.NoError(t, err)
err = mi.Series(expp.At(), &explset, &expchks)
err = mi.Series(expp.At(), &builder, &explset, &expchks)
require.NoError(t, err)
require.Equal(t, explset, lset)
require.Equal(t, expchks, chks)

View file

@ -452,12 +452,13 @@ type blockBaseSeriesSet struct {
bufChks []chunks.Meta
bufLbls labels.Labels
builder labels.ScratchBuilder
err error
}
func (b *blockBaseSeriesSet) Next() bool {
for b.p.Next() {
if err := b.index.Series(b.p.At(), &b.bufLbls, &b.bufChks); err != nil {
if err := b.index.Series(b.p.At(), &b.builder, &b.bufLbls, &b.bufChks); err != nil {
// Postings may be stale. Skip if no underlying series exists.
if errors.Cause(err) == storage.ErrNotFound {
continue

View file

@ -1270,7 +1270,7 @@ func (m mockIndex) SortedPostings(p index.Postings) index.Postings {
return index.NewListPostings(ep)
}
func (m mockIndex) Series(ref storage.SeriesRef, lset *labels.Labels, chks *[]chunks.Meta) error {
func (m mockIndex) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, lset *labels.Labels, chks *[]chunks.Meta) error {
s, ok := m.series[ref]
if !ok {
return storage.ErrNotFound
@ -1884,9 +1884,10 @@ func TestPostingsForMatchers(t *testing.T) {
p, err := PostingsForMatchers(ir, c.matchers...)
require.NoError(t, err)
var builder labels.ScratchBuilder
for p.Next() {
lbls := labels.Labels{}
require.NoError(t, ir.Series(p.At(), &lbls, &[]chunks.Meta{}))
require.NoError(t, ir.Series(p.At(), &builder, &lbls, &[]chunks.Meta{}))
if _, ok := exp[lbls.String()]; !ok {
t.Errorf("Evaluating %v, unexpected result %s", c.matchers, lbls.String())
} else {
@ -2097,7 +2098,7 @@ func (m mockMatcherIndex) SortedPostings(p index.Postings) index.Postings {
return index.EmptyPostings()
}
func (m mockMatcherIndex) Series(ref storage.SeriesRef, lset *labels.Labels, chks *[]chunks.Meta) error {
func (m mockMatcherIndex) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, lset *labels.Labels, chks *[]chunks.Meta) error {
return nil
}

View file

@ -80,11 +80,12 @@ func TestRepairBadIndexVersion(t *testing.T) {
require.NoError(t, err)
p, err := r.Postings("b", "1")
require.NoError(t, err)
var builder labels.ScratchBuilder
for p.Next() {
t.Logf("next ID %d", p.At())
var lset labels.Labels
require.Error(t, r.Series(p.At(), &lset, nil))
require.Error(t, r.Series(p.At(), &builder, &lset, nil))
}
require.NoError(t, p.Err())
require.NoError(t, r.Close())
@ -106,7 +107,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
var lset labels.Labels
var chks []chunks.Meta
require.NoError(t, r.Series(p.At(), &lset, &chks))
require.NoError(t, r.Series(p.At(), &builder, &lset, &chks))
res = append(res, lset)
}