mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-25 13:44:05 -08:00
Fix offset errors, fix persisted postings order
This commit is contained in:
parent
1b23d62e3f
commit
d9ca4b47f5
3
block.go
3
block.go
|
@ -38,10 +38,8 @@ type persistedBlock struct {
|
|||
}
|
||||
|
||||
func newPersistedBlock(path string) (*persistedBlock, error) {
|
||||
// The directory must be named after the base timestamp for the block.
|
||||
// TODO(fabxc): validate match of name and stats time, validate magic.
|
||||
|
||||
fmt.Println("new persisted block", path)
|
||||
// mmap files belonging to the block.
|
||||
chunksf, err := openMmapFile(chunksFileName(path))
|
||||
if err != nil {
|
||||
|
@ -65,7 +63,6 @@ func newPersistedBlock(path string) (*persistedBlock, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fmt.Println("initialized new persisted block with", stats)
|
||||
|
||||
pb := &persistedBlock{
|
||||
chunksf: chunksf,
|
||||
|
|
2
db.go
2
db.go
|
@ -42,7 +42,7 @@ type DB struct {
|
|||
|
||||
// TODO(fabxc): make configurable
|
||||
const (
|
||||
shardShift = 2
|
||||
shardShift = 0
|
||||
numShards = 1 << shardShift
|
||||
maxChunkSize = 1024
|
||||
)
|
||||
|
|
44
querier.go
44
querier.go
|
@ -183,6 +183,22 @@ func (q *shardQuerier) LabelValuesFor(string, Label) ([]string, error) {
|
|||
return nil, fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
func (q *shardQuerier) Select(ms ...Matcher) SeriesSet {
|
||||
// Sets from different blocks have no time overlap. The reference numbers
|
||||
// they emit point to series sorted in lexicographic order.
|
||||
// We can fully connect partial series by simply comparing with the previous
|
||||
// label set.
|
||||
if len(q.blocks) == 0 {
|
||||
return nopSeriesSet{}
|
||||
}
|
||||
r := q.blocks[0].Select(ms...)
|
||||
|
||||
for _, s := range q.blocks[1:] {
|
||||
r = newShardSeriesSet(r, s.Select(ms...))
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
func (q *shardQuerier) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
@ -210,8 +226,6 @@ func (q *blockQuerier) Select(ms ...Matcher) SeriesSet {
|
|||
its = append(its, q.selectSingle(m))
|
||||
}
|
||||
|
||||
// TODO(fabxc): pass down time range so the series iterator
|
||||
// can be instantiated with it?
|
||||
return &blockSeriesSet{
|
||||
index: q.index,
|
||||
it: Intersect(its...),
|
||||
|
@ -256,6 +270,13 @@ func (q *blockQuerier) selectSingle(m Matcher) Postings {
|
|||
return Intersect(rit...)
|
||||
}
|
||||
|
||||
func expandPostings(p Postings) (res []uint32, err error) {
|
||||
for p.Next() {
|
||||
res = append(res, p.Value())
|
||||
}
|
||||
return res, p.Err()
|
||||
}
|
||||
|
||||
func (q *blockQuerier) LabelValues(name string) ([]string, error) {
|
||||
tpls, err := q.index.LabelValues(name)
|
||||
if err != nil {
|
||||
|
@ -319,23 +340,6 @@ func (s *mergedSeriesSet) Next() bool {
|
|||
return s.Next()
|
||||
}
|
||||
|
||||
func (q *shardQuerier) Select(ms ...Matcher) SeriesSet {
|
||||
// Sets from different blocks have no time overlap. The reference numbers
|
||||
// they emit point to series sorted in lexicographic order.
|
||||
// We can fully connect partial series by simply comparing with the previous
|
||||
// label set.
|
||||
if len(q.blocks) == 0 {
|
||||
return nopSeriesSet{}
|
||||
}
|
||||
r := q.blocks[0].Select(ms...)
|
||||
|
||||
for _, s := range q.blocks[1:] {
|
||||
r = &shardSeriesSet{a: r, b: s.Select(ms...)}
|
||||
}
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
type shardSeriesSet struct {
|
||||
a, b SeriesSet
|
||||
|
||||
|
@ -408,7 +412,7 @@ func (s *shardSeriesSet) advanceB() {
|
|||
}
|
||||
|
||||
func (s *shardSeriesSet) Next() bool {
|
||||
if s.as == nil && s.bs == nil {
|
||||
if s.as == nil && s.bs == nil || s.Err() != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
@ -31,6 +31,9 @@ func newSeriesReader(b []byte) (*seriesReader, error) {
|
|||
}
|
||||
|
||||
func (s *seriesReader) Chunk(offset uint32) (chunks.Chunk, error) {
|
||||
if int(offset) > len(s.b) {
|
||||
return nil, errors.Errorf("offset %d beyond data size %d", offset, len(s.b))
|
||||
}
|
||||
b := s.b[offset:]
|
||||
|
||||
l, n := binary.Uvarint(b)
|
||||
|
@ -427,14 +430,14 @@ func (t *serializedStringTuples) At(i int) ([]string, error) {
|
|||
if len(t.b) < (i+t.l)*4 {
|
||||
return nil, errInvalidSize
|
||||
}
|
||||
res := make([]string, t.l)
|
||||
res := make([]string, 0, t.l)
|
||||
|
||||
for k := 0; k < t.l; k++ {
|
||||
offset := binary.BigEndian.Uint32(t.b[i*4:])
|
||||
offset := binary.BigEndian.Uint32(t.b[(i+k)*4:])
|
||||
|
||||
b, err := t.lookup(offset)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("lookup: %s", err)
|
||||
return nil, errors.Wrap(err, "symbol lookup")
|
||||
}
|
||||
res = append(res, string(b))
|
||||
}
|
||||
|
|
56
writer.go
56
writer.go
|
@ -8,6 +8,9 @@ import (
|
|||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/bradfitz/slice"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -216,13 +219,16 @@ func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) er
|
|||
binary.BigEndian.PutUint32(b[1:], l)
|
||||
|
||||
if err := w.write(wr, b[:]); err != nil {
|
||||
return err
|
||||
return errors.Wrap(err, "writing header")
|
||||
}
|
||||
|
||||
if err := f(wr); err != nil {
|
||||
return err
|
||||
return errors.Wrap(err, "contents write func")
|
||||
}
|
||||
return w.write(w.w, h.Sum(nil))
|
||||
if err := w.write(w.w, h.Sum(nil)); err != nil {
|
||||
return errors.Wrap(err, "writing checksum")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *indexWriter) writeMeta() error {
|
||||
|
@ -288,11 +294,15 @@ func (w *indexWriter) writeSymbols() error {
|
|||
}
|
||||
sort.Strings(symbols)
|
||||
|
||||
// The start of the section plus a 5 byte section header are our base.
|
||||
// TODO(fabxc): switch to relative offsets and hold sections in a TOC.
|
||||
base := uint32(w.n) + 5
|
||||
|
||||
buf := [binary.MaxVarintLen32]byte{}
|
||||
b := append(make([]byte, 0, 4096), flagStd)
|
||||
|
||||
for _, s := range symbols {
|
||||
w.symbols[s] = uint32(w.n) + uint32(len(b))
|
||||
w.symbols[s] = base + uint32(len(b))
|
||||
|
||||
n := binary.PutUvarint(buf[:], uint64(len(s)))
|
||||
b = append(b, buf[:n]...)
|
||||
|
@ -307,12 +317,26 @@ func (w *indexWriter) writeSymbols() error {
|
|||
}
|
||||
|
||||
func (w *indexWriter) writeSeries() error {
|
||||
b := make([]byte, 0, 4096)
|
||||
buf := make([]byte, binary.MaxVarintLen64)
|
||||
// Series must be stored sorted along their labels.
|
||||
series := make([]*indexWriterSeries, 0, len(w.series))
|
||||
|
||||
for _, s := range w.series {
|
||||
series = append(series, s)
|
||||
}
|
||||
slice.Sort(series, func(i, j int) bool {
|
||||
return compareLabels(series[i].labels, series[j].labels) < 0
|
||||
})
|
||||
|
||||
// Current end of file plus 5 bytes for section header.
|
||||
// TODO(fabxc): switch to relative offsets.
|
||||
base := uint32(w.n) + 5
|
||||
|
||||
b := make([]byte, 0, 1<<20) // 1MiB
|
||||
buf := make([]byte, binary.MaxVarintLen64)
|
||||
|
||||
for _, s := range series {
|
||||
// Write label set symbol references.
|
||||
s.offset = uint32(w.n) + uint32(len(b))
|
||||
s.offset = base + uint32(len(b))
|
||||
|
||||
n := binary.PutUvarint(buf, uint64(len(s.labels)))
|
||||
b = append(b, buf[:n]...)
|
||||
|
@ -391,10 +415,22 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error {
|
|||
b := make([]byte, 0, 4096)
|
||||
buf := [4]byte{}
|
||||
|
||||
for it.Next() {
|
||||
v := w.series[it.Value()].offset
|
||||
binary.BigEndian.PutUint32(buf[:], v)
|
||||
// Order of the references in the postings list does not imply order
|
||||
// of the series references within the persisted block they are mapped to.
|
||||
// We have to sort the new references again.
|
||||
var refs []uint32
|
||||
|
||||
for it.Next() {
|
||||
refs = append(refs, w.series[it.Value()].offset)
|
||||
}
|
||||
if err := it.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
slice.Sort(refs, func(i, j int) bool { return refs[i] < refs[j] })
|
||||
|
||||
for _, r := range refs {
|
||||
binary.BigEndian.PutUint32(buf[:], r)
|
||||
b = append(b, buf[:]...)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue