From 9790aa98acca47158dcffce639064cff7ec2526f Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 5 Jan 2017 15:13:01 +0100 Subject: [PATCH] Add postings wrapper that emits head postings in label set order This adds a position mapper that takes series from a head block in the order they were appended and creates a mapping representing them in order of their label sets. Write-repair of the postings list would cause very expensive writing. Hence, we keep them as they are and only apply the position mapping at the very end, after a postings list has been sufficiently reduced through intersections etc. --- compact.go | 5 +++- head.go | 79 ++++++++++++++++++++++++++++++++++++++++++++++++---- head_test.go | 35 +++++++++++++++++++++++ querier.go | 46 ++++++++++++------------------ 4 files changed, 131 insertions(+), 34 deletions(-) create mode 100644 head_test.go diff --git a/compact.go b/compact.go index 13e29a56f..9c2103d47 100644 --- a/compact.go +++ b/compact.go @@ -249,6 +249,10 @@ func (c *compactor) write(blocks []block, indexw IndexWriter, chunkw SeriesWrite if err != nil { return err } + // TODO(fabxc): find more transparent way of handling this. + if hb, ok := b.(*HeadBlock); ok { + all = hb.remapPostings(all) + } s := newCompactionSeriesSet(b.index(), b.series(), all) if i == 0 { @@ -274,7 +278,6 @@ func (c *compactor) write(blocks []block, indexw IndexWriter, chunkw SeriesWrite if err := chunkw.WriteSeries(i, lset, chunks); err != nil { return err } - fmt.Println("next", lset, chunks) stats.ChunkCount += uint32(len(chunks)) stats.SeriesCount++ diff --git a/head.go b/head.go index f946d9e3d..55a508441 100644 --- a/head.go +++ b/head.go @@ -6,6 +6,7 @@ import ( "sort" "sync" + "github.com/bradfitz/slice" "github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/labels" ) @@ -18,6 +19,9 @@ type HeadBlock struct { // descs holds all chunk descs for the head block. Each chunk implicitly // is assigned the index as its ID. 
descs []*chunkDesc + // mapping maps a series ID to its position in an ordered list + // of all series. The orderDirty flag indicates that it has gone stale. + mapper *positionMapper // hashes contains a collision map of label set hashes of chunks // to their chunk descs. hashes map[uint64][]*chunkDesc @@ -60,6 +64,8 @@ func OpenHeadBlock(dir string, baseTime int64) (*HeadBlock, error) { return nil, err } + b.rewriteMapping() + return b, nil } @@ -103,11 +109,7 @@ func (h *HeadBlock) LabelValues(names ...string) (StringTuples, error) { } sort.Strings(sl) - t := &stringTuples{ - l: len(names), - s: sl, - } - return t, nil + return &stringTuples{l: len(names), s: sl}, nil } // Postings returns the postings list iterator for the label pair. @@ -115,6 +117,23 @@ func (h *HeadBlock) Postings(name, value string) (Postings, error) { return h.postings.get(term{name: name, value: value}), nil } +// remapPostings changes the order of the postings from their ID to the ordering +// of the series they reference. +// Returned postings have no longer monotonic IDs and MUST NOT be used for regular +// postings set operations, i.e. intersect and merge. +func (h *HeadBlock) remapPostings(p Postings) Postings { + list, err := expandPostings(p) + if err != nil { + return errPostings{err: err} + } + + slice.Sort(list, func(i, j int) bool { + return h.mapper.fw[list[i]] < h.mapper.fw[list[j]] + }) + + return newListPostings(list) +} + // Series returns the series for the given reference. func (h *HeadBlock) Series(ref uint32) (labels.Labels, []ChunkMeta, error) { if int(ref) >= len(h.descs) { @@ -253,6 +272,11 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { for i, s := range newSeries { h.create(newHashes[i], s) } + // TODO(fabxc): just mark as dirty instead and trigger a remapping + // periodically and upon querying. 
+ if len(newSeries) > 0 { + h.rewriteMapping() + } for _, s := range samples { cd := h.descs[s.ref] @@ -270,3 +294,48 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { return nil } + +func (h *HeadBlock) rewriteMapping() { + cds := make([]*chunkDesc, len(h.descs)) + copy(cds, h.descs) + + s := slice.SortInterface(cds, func(i, j int) bool { + return labels.Compare(cds[i].lset, cds[j].lset) < 0 + }) + + h.mapper = newPositionMapper(s) +} + +// positionMapper stores a position mapping from unsorted to +// sorted indices of a sortable collection. +type positionMapper struct { + sortable sort.Interface + iv, fw []int +} + +func newPositionMapper(s sort.Interface) *positionMapper { + m := &positionMapper{ + sortable: s, + iv: make([]int, s.Len()), + fw: make([]int, s.Len()), + } + for i := range m.iv { + m.iv[i] = i + } + sort.Sort(m) + + for i, k := range m.iv { + m.fw[k] = i + } + + return m +} + +func (m *positionMapper) Len() int { return m.sortable.Len() } +func (m *positionMapper) Less(i, j int) bool { return m.sortable.Less(i, j) } + +func (m *positionMapper) Swap(i, j int) { + m.sortable.Swap(i, j) + + m.iv[i], m.iv[j] = m.iv[j], m.iv[i] +} diff --git a/head_test.go b/head_test.go new file mode 100644 index 000000000..7cecb235c --- /dev/null +++ b/head_test.go @@ -0,0 +1,35 @@ +package tsdb + +import ( + "sort" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPositionMapper(t *testing.T) { + cases := []struct { + in []int + res []int + }{ + { + in: []int{5, 4, 3, 2, 1, 0}, + res: []int{5, 4, 3, 2, 1, 0}, + }, + { + in: []int{1, 2, 0, 3}, + res: []int{1, 2, 0, 3}, + }, + { + in: []int{1, 2, 0, 3, 10, 100, -10}, + res: []int{2, 3, 1, 4, 5, 6, 0}, + }, + } + + for _, c := range cases { + m := newPositionMapper(sort.IntSlice(c.in)) + + require.True(t, sort.IsSorted(m.sortable)) + require.Equal(t, c.res, m.fw) + } +} diff --git a/querier.go b/querier.go index 9931a2423..587c941cb 100644 --- a/querier.go +++ b/querier.go @@ -6,7 
+6,6 @@ import ( "sort" "strings" - "github.com/bradfitz/slice" "github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/labels" ) @@ -147,19 +146,24 @@ func (s *Shard) Querier(mint, maxt int64) Querier { } for _, b := range blocks { - sq.blocks = append(sq.blocks, &blockQuerier{ + q := &blockQuerier{ mint: mint, maxt: maxt, index: b.index(), series: b.series(), - }) + } + sq.blocks = append(sq.blocks, q) + + // TODO(fabxc): find nicer solution. + if hb, ok := b.(*HeadBlock); ok { + q.postingsMapper = hb.remapPostings + } } return sq } func (q *shardQuerier) LabelValues(n string) ([]string, error) { - // TODO(fabxc): return returned merged result. res, err := q.blocks[0].LabelValues(n) if err != nil { return nil, err @@ -211,6 +215,8 @@ type blockQuerier struct { index IndexReader series SeriesReader + postingsMapper func(Postings) Postings + mint, maxt int64 } @@ -238,36 +244,20 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet { its = append(its, q.selectSingle(m)) } - set := &blockSeriesSet{ + p := Intersect(its...) + + if q.postingsMapper != nil { + p = q.postingsMapper(p) + } + + return &blockSeriesSet{ index: q.index, chunks: q.series, - it: Intersect(its...), + it: p, absent: absent, mint: q.mint, maxt: q.maxt, } - // TODO(fabxc): the head block indexes new series in order they come in. - // SeriesSets are expected to emit labels in order of their label sets. - // We expand the set and sort it for now. This is not a scalable approach - // however, and the head block should re-sort itself eventually. - // This comes with an initial cost as long as new series come in but should - // flatten out quickly after a warump. - // When cutting new head blocks, the index would ideally be transferred to - // the new head. 
- var all []Series - for set.Next() { - all = append(all, set.At()) - } - if set.Err() != nil { - return errSeriesSet{err: set.Err()} - } - slice.Sort(all, func(i, j int) bool { - return labels.Compare(all[i].Labels(), all[j].Labels()) < 0 - }) - - // TODO(fabxc): additionally bad because this static set uses function pointers - // in a mock series set. - return newListSeriesSet(all) } func (q *blockQuerier) selectSingle(m labels.Matcher) Postings {