Don't update head postings mapper on every append

This commit is contained in:
Fabian Reinartz 2017-01-06 16:27:50 +01:00
parent 71efd2e08d
commit 63e12807da
3 changed files with 67 additions and 38 deletions

7
db.go
View file

@ -213,6 +213,13 @@ func (db *DB) compact(blocks []block) error {
} }
tmpdir := blocks[0].dir() + ".tmp" tmpdir := blocks[0].dir() + ".tmp"
// TODO(fabxc): find a better place to do this transparently.
for _, b := range blocks {
if h, ok := b.(*HeadBlock); ok {
h.updateMapping()
}
}
if err := db.compactor.compact(tmpdir, blocks...); err != nil { if err := db.compactor.compact(tmpdir, blocks...); err != nil {
return err return err
} }

97
head.go
View file

@ -50,6 +50,7 @@ func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) {
values: map[string]stringset{}, values: map[string]stringset{},
postings: &memPostings{m: make(map[term][]uint32)}, postings: &memPostings{m: make(map[term][]uint32)},
wal: wal, wal: wal,
mapper: newPositionMapper(nil),
} }
b.bstats.MinTime = math.MaxInt64 b.bstats.MinTime = math.MaxInt64
@ -81,7 +82,7 @@ func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) {
return nil, err return nil, err
} }
b.rewriteMapping() b.updateMapping()
return b, nil return b, nil
} }
@ -134,23 +135,6 @@ func (h *HeadBlock) Postings(name, value string) (Postings, error) {
return h.postings.get(term{name: name, value: value}), nil return h.postings.get(term{name: name, value: value}), nil
} }
// remapPostings changes the order of the postings from their ID to the ordering
// of the series they reference.
// Returned postings have no longer monotonic IDs and MUST NOT be used for regular
// postings set operations, i.e. intersect and merge.
func (h *HeadBlock) remapPostings(p Postings) Postings {
list, err := expandPostings(p)
if err != nil {
return errPostings{err: err}
}
slice.Sort(list, func(i, j int) bool {
return h.mapper.fw[list[i]] < h.mapper.fw[list[j]]
})
return newListPostings(list)
}
// Series returns the series for the given reference. // Series returns the series for the given reference.
func (h *HeadBlock) Series(ref uint32) (labels.Labels, []ChunkMeta, error) { func (h *HeadBlock) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
if int(ref) >= len(h.descs) { if int(ref) >= len(h.descs) {
@ -288,11 +272,6 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
for i, s := range newSeries { for i, s := range newSeries {
h.create(newHashes[i], s) h.create(newHashes[i], s)
} }
// TODO(fabxc): just mark as dirty instead and trigger a remapping
// periodically and upon querying.
if len(newSeries) > 0 {
h.rewriteMapping()
}
for _, s := range samples { for _, s := range samples {
cd := h.descs[s.ref] cd := h.descs[s.ref]
@ -314,7 +293,14 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
return nil return nil
} }
func (h *HeadBlock) rewriteMapping() { func (h *HeadBlock) updateMapping() {
h.mapper.mtx.Lock()
defer h.mapper.mtx.Unlock()
if h.mapper.sortable != nil && h.mapper.Len() == len(h.descs) {
return
}
cds := make([]*chunkDesc, len(h.descs)) cds := make([]*chunkDesc, len(h.descs))
copy(cds, h.descs) copy(cds, h.descs)
@ -322,31 +308,44 @@ func (h *HeadBlock) rewriteMapping() {
return labels.Compare(cds[i].lset, cds[j].lset) < 0 return labels.Compare(cds[i].lset, cds[j].lset) < 0
}) })
h.mapper = newPositionMapper(s) h.mapper.update(s)
}
// remapPostings changes the order of the postings from their ID to the ordering
// of the series they reference.
// Returned postings have no longer monotonic IDs and MUST NOT be used for regular
// postings set operations, i.e. intersect and merge.
func (h *HeadBlock) remapPostings(p Postings) Postings {
list, err := expandPostings(p)
if err != nil {
return errPostings{err: err}
}
h.mapper.mtx.RLock()
defer h.mapper.mtx.RUnlock()
h.mapper.Sort(list)
slice.Sort(list, func(i, j int) bool {
return h.mapper.fw[list[i]] < h.mapper.fw[list[j]]
})
return newListPostings(list)
} }
// positionMapper stores a position mapping from unsorted to // positionMapper stores a position mapping from unsorted to
// sorted indices of a sortable collection. // sorted indices of a sortable collection.
type positionMapper struct { type positionMapper struct {
mtx sync.RWMutex
sortable sort.Interface sortable sort.Interface
iv, fw []int iv, fw []int
} }
func newPositionMapper(s sort.Interface) *positionMapper { func newPositionMapper(s sort.Interface) *positionMapper {
m := &positionMapper{ m := &positionMapper{}
sortable: s, if s != nil {
iv: make([]int, s.Len()), m.update(s)
fw: make([]int, s.Len()),
} }
for i := range m.iv {
m.iv[i] = i
}
sort.Sort(m)
for i, k := range m.iv {
m.fw[k] = i
}
return m return m
} }
@ -358,3 +357,25 @@ func (m *positionMapper) Swap(i, j int) {
m.iv[i], m.iv[j] = m.iv[j], m.iv[i] m.iv[i], m.iv[j] = m.iv[j], m.iv[i]
} }
func (m *positionMapper) Sort(l []uint32) {
slice.Sort(l, func(i, j int) bool {
return m.fw[l[i]] < m.fw[l[j]]
})
}
func (m *positionMapper) update(s sort.Interface) {
m.sortable = s
m.iv = make([]int, s.Len())
m.fw = make([]int, s.Len())
for i := range m.iv {
m.iv[i] = i
}
sort.Sort(m)
for i, k := range m.iv {
m.fw[k] = i
}
}

View file

@ -65,6 +65,7 @@ func (s *DB) Querier(mint, maxt int64) Querier {
// TODO(fabxc): find nicer solution. // TODO(fabxc): find nicer solution.
if hb, ok := b.(*HeadBlock); ok { if hb, ok := b.(*HeadBlock); ok {
hb.updateMapping()
q.postingsMapper = hb.remapPostings q.postingsMapper = hb.remapPostings
} }
} }