Drop position mapper from head block

The position mapper was intended to pre-computed "expensive" ordering
of label sets. It was expensive to update and caused a lot of trouble.
Skipping this optimization entirely actually revelead it was pointless
and even harmful from the e2e perspective.
This commit is contained in:
Fabian Reinartz 2017-03-15 14:44:29 +01:00
parent ad5812d83a
commit 34efe4e2c8
2 changed files with 3 additions and 133 deletions

108
head.go
View file

@ -10,7 +10,6 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/bradfitz/slice"
"github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/chunks"
"github.com/fabxc/tsdb/labels" "github.com/fabxc/tsdb/labels"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
@ -47,9 +46,6 @@ type headBlock struct {
// descs holds all chunk descs for the head block. Each chunk implicitly // descs holds all chunk descs for the head block. Each chunk implicitly
// is assigned the index as its ID. // is assigned the index as its ID.
series []*memSeries series []*memSeries
// mapping maps a series ID to its position in an ordered list
// of all series. The orderDirty flag indicates that it has gone stale.
mapper *positionMapper
// hashes contains a collision map of label set hashes of chunks // hashes contains a collision map of label set hashes of chunks
// to their chunk descs. // to their chunk descs.
hashes map[uint64][]*memSeries hashes map[uint64][]*memSeries
@ -109,11 +105,7 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
hashes: map[uint64][]*memSeries{}, hashes: map[uint64][]*memSeries{},
values: map[string]stringset{}, values: map[string]stringset{},
postings: &memPostings{m: make(map[term][]uint32)}, postings: &memPostings{m: make(map[term][]uint32)},
mapper: newPositionMapper(nil),
meta: *meta, meta: *meta,
updatec: make(chan struct{}, 1),
stopc: make(chan struct{}),
donec: make(chan struct{}),
} }
r := wal.Reader() r := wal.Reader()
@ -138,25 +130,9 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
return nil, errors.Wrap(err, "consume WAL") return nil, errors.Wrap(err, "consume WAL")
} }
go h.run()
h.updatec <- struct{}{}
return h, nil return h, nil
} }
func (h *headBlock) run() {
defer close(h.donec)
for {
select {
case <-h.stopc:
return
case <-h.updatec:
h.updateMapping()
}
}
}
// inBounds returns true if the given timestamp is within the valid // inBounds returns true if the given timestamp is within the valid
// time bounds of the block. // time bounds of the block.
func (h *headBlock) inBounds(t int64) bool { func (h *headBlock) inBounds(t int64) bool {
@ -348,11 +324,6 @@ func (a *headAppender) createSeries() {
a.mtx.Unlock() a.mtx.Unlock()
a.mtx.RLock() a.mtx.RLock()
select {
case a.updatec <- struct{}{}:
default:
}
} }
func (a *headAppender) Commit() error { func (a *headAppender) Commit() error {
@ -556,31 +527,6 @@ func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries {
return s return s
} }
func (h *headBlock) updateMapping() {
h.mtx.RLock()
// No need to rlock the mapper as this method is not run concurrently and
// the only one ever modifying the mapper.
if h.mapper.sortable != nil && h.mapper.Len() == len(h.series) {
h.mtx.RUnlock()
return
}
series := make([]*memSeries, len(h.series))
copy(series, h.series)
h.mtx.RUnlock()
s := slice.SortInterface(series, func(i, j int) bool {
return labels.Compare(series[i].lset, series[j].lset) < 0
})
m := newPositionMapper(s)
h.mtx.Lock()
h.mapper = m
h.mtx.Unlock()
}
// remapPostings changes the order of the postings from their ID to the ordering // remapPostings changes the order of the postings from their ID to the ordering
// of the series they reference. // of the series they reference.
// Returned postings have no longer monotonic IDs and MUST NOT be used for regular // Returned postings have no longer monotonic IDs and MUST NOT be used for regular
@ -590,20 +536,16 @@ func (h *headBlock) remapPostings(p Postings) Postings {
// covers existing metrics. // covers existing metrics.
ep := make([]uint32, 0, 64) ep := make([]uint32, 0, 64)
max := uint32(h.mapper.Len())
for p.Next() { for p.Next() {
if p.At() >= max {
break
}
ep = append(ep, p.At()) ep = append(ep, p.At())
} }
if err := p.Err(); err != nil { if err := p.Err(); err != nil {
return errPostings{err: errors.Wrap(err, "expand postings")} return errPostings{err: errors.Wrap(err, "expand postings")}
} }
h.mapper.Sort(ep) sort.Slice(ep, func(i, j int) bool {
return labels.Compare(h.series[i].lset, h.series[j].lset) < 0
})
return newListPostings(ep) return newListPostings(ep)
} }
@ -716,47 +658,3 @@ func (it *memSafeIterator) At() (int64, float64) {
s := it.buf[4-(it.total-it.i)] s := it.buf[4-(it.total-it.i)]
return s.t, s.v return s.t, s.v
} }
// positionMapper stores a position mapping from unsorted to
// sorted indices of a sortable collection.
type positionMapper struct {
sortable sort.Interface
iv, fw []int
}
func newPositionMapper(s sort.Interface) *positionMapper {
m := &positionMapper{}
if s != nil {
m.update(s)
}
return m
}
func (m *positionMapper) Len() int { return m.sortable.Len() }
func (m *positionMapper) Less(i, j int) bool { return m.sortable.Less(i, j) }
func (m *positionMapper) Swap(i, j int) {
m.sortable.Swap(i, j)
m.iv[i], m.iv[j] = m.iv[j], m.iv[i]
}
func (m *positionMapper) Sort(l []uint32) {
slice.Sort(l, func(i, j int) bool { return m.fw[l[i]] < m.fw[l[j]] })
}
func (m *positionMapper) update(s sort.Interface) {
m.sortable = s
m.iv = make([]int, s.Len())
m.fw = make([]int, s.Len())
for i := range m.iv {
m.iv[i] = i
}
sort.Sort(m)
for i, k := range m.iv {
m.fw[k] = i
}
}

View file

@ -3,7 +3,6 @@ package tsdb
import ( import (
"io/ioutil" "io/ioutil"
"os" "os"
"sort"
"testing" "testing"
"unsafe" "unsafe"
@ -15,33 +14,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestPositionMapper(t *testing.T) {
cases := []struct {
in []int
res []int
}{
{
in: []int{5, 4, 3, 2, 1, 0},
res: []int{5, 4, 3, 2, 1, 0},
},
{
in: []int{1, 2, 0, 3},
res: []int{1, 2, 0, 3},
},
{
in: []int{1, 2, 0, 3, 10, 100, -10},
res: []int{2, 3, 1, 4, 5, 6, 0},
},
}
for _, c := range cases {
m := newPositionMapper(sort.IntSlice(c.in))
require.True(t, sort.IsSorted(m.sortable))
require.Equal(t, c.res, m.fw)
}
}
func BenchmarkCreateSeries(b *testing.B) { func BenchmarkCreateSeries(b *testing.B) {
lbls, err := readPrometheusLabels("cmd/tsdb/testdata.1m", 1e6) lbls, err := readPrometheusLabels("cmd/tsdb/testdata.1m", 1e6)
require.NoError(b, err) require.NoError(b, err)