mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Handle V1 indexes, some of which have unsorted posting offset tables. (#6564)
Fixes #6535 Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
parent
ada0945b8f
commit
13e88e5a49
|
@ -267,16 +267,20 @@ func TestBlockSize(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReadIndexFormatV1(t *testing.T) {
|
func TestReadIndexFormatV1(t *testing.T) {
|
||||||
/* The block here was produced at commit
|
/* The block here was produced at the commit
|
||||||
07ef80820ef1250db82f9544f3fcf7f0f63ccee0 with:
|
706602daed1487f7849990678b4ece4599745905 used in 2.0.0 with:
|
||||||
db, _ := Open("v1db", nil, nil, nil)
|
db, _ := Open("v1db", nil, nil, nil)
|
||||||
app := db.Appender()
|
app := db.Appender()
|
||||||
app.Add(labels.FromStrings("foo", "bar"), 1, 2)
|
app.Add(labels.FromStrings("foo", "bar"), 1, 2)
|
||||||
app.Add(labels.FromStrings("foo", "baz"), 3, 4)
|
app.Add(labels.FromStrings("foo", "baz"), 3, 4)
|
||||||
app.Add(labels.FromStrings("foo", "meh"), 1000*3600*4, 4) // Not in the block.
|
app.Add(labels.FromStrings("foo", "meh"), 1000*3600*4, 4) // Not in the block.
|
||||||
app.Commit()
|
// Make sure we've enough values for the lack of sorting of postings offsets to show up.
|
||||||
db.compact()
|
for i := 0; i < 100; i++ {
|
||||||
db.Close()
|
app.Add(labels.FromStrings("bar", strconv.FormatInt(int64(i), 10)), 0, 0)
|
||||||
|
}
|
||||||
|
app.Commit()
|
||||||
|
db.compact()
|
||||||
|
db.Close()
|
||||||
*/
|
*/
|
||||||
|
|
||||||
blockDir := filepath.Join("testdata", "index_format_v1")
|
blockDir := filepath.Join("testdata", "index_format_v1")
|
||||||
|
@ -290,7 +294,7 @@ func TestReadIndexFormatV1(t *testing.T) {
|
||||||
|
|
||||||
q, err = NewBlockQuerier(block, 0, 1000)
|
q, err = NewBlockQuerier(block, 0, 1000)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
testutil.Equals(t, query(t, q, labels.MustNewMatcher(labels.MatchNotRegexp, "foo", "^.$")),
|
testutil.Equals(t, query(t, q, labels.MustNewMatcher(labels.MatchNotRegexp, "foo", "^.?$")),
|
||||||
map[string][]tsdbutil.Sample{
|
map[string][]tsdbutil.Sample{
|
||||||
`{foo="bar"}`: []tsdbutil.Sample{sample{t: 1, v: 2}},
|
`{foo="bar"}`: []tsdbutil.Sample{sample{t: 1, v: 2}},
|
||||||
`{foo="baz"}`: []tsdbutil.Sample{sample{t: 3, v: 4}},
|
`{foo="baz"}`: []tsdbutil.Sample{sample{t: 3, v: 4}},
|
||||||
|
|
|
@ -1024,6 +1024,8 @@ type Reader struct {
|
||||||
// Map of LabelName to a list of some LabelValues's position in the offset table.
|
// Map of LabelName to a list of some LabelValues's position in the offset table.
|
||||||
// The first and last values for each name are always present.
|
// The first and last values for each name are always present.
|
||||||
postings map[string][]postingOffset
|
postings map[string][]postingOffset
|
||||||
|
// For the v1 format, labelname -> labelvalue -> offset.
|
||||||
|
postingsV1 map[string]map[string]uint64
|
||||||
|
|
||||||
symbols *Symbols
|
symbols *Symbols
|
||||||
nameSymbols map[uint32]string // Cache of the label name symbol lookups,
|
nameSymbols map[uint32]string // Cache of the label name symbol lookups,
|
||||||
|
@ -1113,45 +1115,64 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
|
||||||
return nil, errors.Wrap(err, "read symbols")
|
return nil, errors.Wrap(err, "read symbols")
|
||||||
}
|
}
|
||||||
|
|
||||||
var lastKey []string
|
if r.version == FormatV1 {
|
||||||
lastOff := 0
|
// Earlier V1 formats don't have a sorted postings offset table, so
|
||||||
valueCount := 0
|
// load the whole offset table into memory.
|
||||||
// For the postings offset table we keep every label name but only every nth
|
r.postingsV1 = map[string]map[string]uint64{}
|
||||||
// label value (plus the first and last one), to save memory.
|
if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, off uint64, _ int) error {
|
||||||
if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, _ uint64, off int) error {
|
if len(key) != 2 {
|
||||||
if len(key) != 2 {
|
return errors.Errorf("unexpected key length for posting table %d", len(key))
|
||||||
return errors.Errorf("unexpected key length for posting table %d", len(key))
|
|
||||||
}
|
|
||||||
if _, ok := r.postings[key[0]]; !ok {
|
|
||||||
// Next label name.
|
|
||||||
r.postings[key[0]] = []postingOffset{}
|
|
||||||
if lastKey != nil {
|
|
||||||
// Always include last value for each label name.
|
|
||||||
r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], off: lastOff})
|
|
||||||
}
|
}
|
||||||
lastKey = nil
|
if _, ok := r.postingsV1[key[0]]; !ok {
|
||||||
valueCount = 0
|
r.postingsV1[key[0]] = map[string]uint64{}
|
||||||
|
r.postings[key[0]] = nil // Used to get a list of labelnames in places.
|
||||||
|
}
|
||||||
|
r.postingsV1[key[0]][key[1]] = off
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "read postings table")
|
||||||
}
|
}
|
||||||
if valueCount%32 == 0 {
|
} else {
|
||||||
r.postings[key[0]] = append(r.postings[key[0]], postingOffset{value: key[1], off: off})
|
var lastKey []string
|
||||||
lastKey = nil
|
lastOff := 0
|
||||||
} else {
|
valueCount := 0
|
||||||
lastKey = key
|
// For the postings offset table we keep every label name but only every nth
|
||||||
lastOff = off
|
// label value (plus the first and last one), to save memory.
|
||||||
|
if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, _ uint64, off int) error {
|
||||||
|
if len(key) != 2 {
|
||||||
|
return errors.Errorf("unexpected key length for posting table %d", len(key))
|
||||||
|
}
|
||||||
|
if _, ok := r.postings[key[0]]; !ok {
|
||||||
|
// Next label name.
|
||||||
|
r.postings[key[0]] = []postingOffset{}
|
||||||
|
if lastKey != nil {
|
||||||
|
// Always include last value for each label name.
|
||||||
|
r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], off: lastOff})
|
||||||
|
}
|
||||||
|
lastKey = nil
|
||||||
|
valueCount = 0
|
||||||
|
}
|
||||||
|
if valueCount%32 == 0 {
|
||||||
|
r.postings[key[0]] = append(r.postings[key[0]], postingOffset{value: key[1], off: off})
|
||||||
|
lastKey = nil
|
||||||
|
} else {
|
||||||
|
lastKey = key
|
||||||
|
lastOff = off
|
||||||
|
}
|
||||||
|
valueCount++
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "read postings table")
|
||||||
|
}
|
||||||
|
if lastKey != nil {
|
||||||
|
r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], off: lastOff})
|
||||||
|
}
|
||||||
|
// Trim any extra space in the slices.
|
||||||
|
for k, v := range r.postings {
|
||||||
|
l := make([]postingOffset, len(v))
|
||||||
|
copy(l, v)
|
||||||
|
r.postings[k] = l
|
||||||
}
|
}
|
||||||
valueCount++
|
|
||||||
return nil
|
|
||||||
}); err != nil {
|
|
||||||
return nil, errors.Wrap(err, "read postings table")
|
|
||||||
}
|
|
||||||
if lastKey != nil {
|
|
||||||
r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], off: lastOff})
|
|
||||||
}
|
|
||||||
// Trim any extra space in the slices.
|
|
||||||
for k, v := range r.postings {
|
|
||||||
l := make([]postingOffset, len(v))
|
|
||||||
copy(l, v)
|
|
||||||
r.postings[k] = l
|
|
||||||
}
|
}
|
||||||
|
|
||||||
r.nameSymbols = make(map[uint32]string, len(r.postings))
|
r.nameSymbols = make(map[uint32]string, len(r.postings))
|
||||||
|
@ -1408,6 +1429,19 @@ func (r *Reader) LabelValues(names ...string) (StringTuples, error) {
|
||||||
if len(names) != 1 {
|
if len(names) != 1 {
|
||||||
return nil, errors.Errorf("only one label name supported")
|
return nil, errors.Errorf("only one label name supported")
|
||||||
}
|
}
|
||||||
|
if r.version == FormatV1 {
|
||||||
|
e, ok := r.postingsV1[names[0]]
|
||||||
|
if !ok {
|
||||||
|
return emptyStringTuples{}, nil
|
||||||
|
}
|
||||||
|
values := make([]string, 0, len(e))
|
||||||
|
for k := range e {
|
||||||
|
values = append(values, k)
|
||||||
|
}
|
||||||
|
sort.Strings(values)
|
||||||
|
return NewStringTuples(values, 1)
|
||||||
|
|
||||||
|
}
|
||||||
e, ok := r.postings[names[0]]
|
e, ok := r.postings[names[0]]
|
||||||
if !ok {
|
if !ok {
|
||||||
return emptyStringTuples{}, nil
|
return emptyStringTuples{}, nil
|
||||||
|
@ -1467,6 +1501,28 @@ func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Reader) Postings(name string, values ...string) (Postings, error) {
|
func (r *Reader) Postings(name string, values ...string) (Postings, error) {
|
||||||
|
if r.version == FormatV1 {
|
||||||
|
e, ok := r.postingsV1[name]
|
||||||
|
if !ok {
|
||||||
|
return EmptyPostings(), nil
|
||||||
|
}
|
||||||
|
res := make([]Postings, 0, len(values))
|
||||||
|
for _, v := range values {
|
||||||
|
postingsOff, ok := e[v]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Read from the postings table.
|
||||||
|
d := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
|
||||||
|
_, p, err := r.dec.Postings(d.Get())
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "decode postings")
|
||||||
|
}
|
||||||
|
res = append(res, p)
|
||||||
|
}
|
||||||
|
return Merge(res...), nil
|
||||||
|
}
|
||||||
|
|
||||||
e, ok := r.postings[name]
|
e, ok := r.postings[name]
|
||||||
if !ok {
|
if !ok {
|
||||||
return EmptyPostings(), nil
|
return EmptyPostings(), nil
|
||||||
|
|
BIN
tsdb/testdata/index_format_v1/chunks/000001
vendored
BIN
tsdb/testdata/index_format_v1/chunks/000001
vendored
Binary file not shown.
BIN
tsdb/testdata/index_format_v1/index
vendored
BIN
tsdb/testdata/index_format_v1/index
vendored
Binary file not shown.
10
tsdb/testdata/index_format_v1/meta.json
vendored
10
tsdb/testdata/index_format_v1/meta.json
vendored
|
@ -1,17 +1,17 @@
|
||||||
{
|
{
|
||||||
"version": 1,
|
"version": 1,
|
||||||
"ulid": "01DVZX4CHY2EGZ6JQVS80AB9CF",
|
"ulid": "01DXXFZDYD1MQW6079WK0K6EDQ",
|
||||||
"minTime": 0,
|
"minTime": 0,
|
||||||
"maxTime": 7200000,
|
"maxTime": 7200000,
|
||||||
"stats": {
|
"stats": {
|
||||||
"numSamples": 2,
|
"numSamples": 102,
|
||||||
"numSeries": 2,
|
"numSeries": 102,
|
||||||
"numChunks": 2
|
"numChunks": 102
|
||||||
},
|
},
|
||||||
"compaction": {
|
"compaction": {
|
||||||
"level": 1,
|
"level": 1,
|
||||||
"sources": [
|
"sources": [
|
||||||
"01DVZX4CHY2EGZ6JQVS80AB9CF"
|
"01DXXFZDYD1MQW6079WK0K6EDQ"
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue