mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Write plain postings list index
This commit is contained in:
parent
4eba874b04
commit
3a528c3078
|
@ -134,7 +134,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
|
||||||
|
|
||||||
measureTime("ingestScrapes", func() {
|
measureTime("ingestScrapes", func() {
|
||||||
b.startProfiling()
|
b.startProfiling()
|
||||||
if err := b.ingestScrapes(metrics, 1000); err != nil {
|
if err := b.ingestScrapes(metrics, 3000); err != nil {
|
||||||
exitWithError(err)
|
exitWithError(err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
13
db.go
13
db.go
|
@ -282,8 +282,8 @@ func (s *SeriesShard) persist() error {
|
||||||
defer sw.Close()
|
defer sw.Close()
|
||||||
defer iw.Close()
|
defer iw.Close()
|
||||||
|
|
||||||
for _, cd := range head.index.forward {
|
for ref, cd := range head.index.forward {
|
||||||
if err := sw.WriteSeries(cd.lset, []*chunkDesc{cd}); err != nil {
|
if err := sw.WriteSeries(ref, cd.lset, []*chunkDesc{cd}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -297,8 +297,15 @@ func (s *SeriesShard) persist() error {
|
||||||
s = append(s, x)
|
s = append(s, x)
|
||||||
}
|
}
|
||||||
|
|
||||||
iw.WriteLabelIndex([]string{n}, s)
|
if err := iw.WriteLabelIndex([]string{n}, s); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for t := range head.index.postings.m {
|
||||||
|
if err := iw.WritePostings(t.name, t.value, head.index.postings.get(t)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sz := fmt.Sprintf("%fMiB", float64(sw.Size()+iw.Size())/1024/1024)
|
sz := fmt.Sprintf("%fMiB", float64(sw.Size()+iw.Size())/1024/1024)
|
||||||
|
|
28
index.go
28
index.go
|
@ -24,7 +24,7 @@ func newMemIndex() *memIndex {
|
||||||
lastID: 0,
|
lastID: 0,
|
||||||
forward: make(map[uint32]*chunkDesc),
|
forward: make(map[uint32]*chunkDesc),
|
||||||
values: make(map[string]stringset),
|
values: make(map[string]stringset),
|
||||||
postings: &memPostings{m: make(map[string][]uint32)},
|
postings: &memPostings{m: make(map[term][]uint32)},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,22 +32,20 @@ func (ix *memIndex) numSeries() int {
|
||||||
return len(ix.forward)
|
return len(ix.forward)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ix *memIndex) Postings(s string) Iterator {
|
func (ix *memIndex) Postings(t term) Iterator {
|
||||||
return ix.postings.get(s)
|
return ix.postings.get(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
type term struct {
|
||||||
|
name, value string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ix *memIndex) add(chkd *chunkDesc) {
|
func (ix *memIndex) add(chkd *chunkDesc) {
|
||||||
// Add each label pair as a term to the inverted index.
|
// Add each label pair as a term to the inverted index.
|
||||||
terms := make([]string, 0, len(chkd.lset))
|
terms := make([]term, 0, len(chkd.lset))
|
||||||
b := make([]byte, 0, 64)
|
|
||||||
|
|
||||||
for _, l := range chkd.lset {
|
for _, l := range chkd.lset {
|
||||||
b = append(b, l.Name...)
|
terms = append(terms, term{name: l.Name, value: l.Value})
|
||||||
b = append(b, sep)
|
|
||||||
b = append(b, l.Value...)
|
|
||||||
|
|
||||||
terms = append(terms, string(b))
|
|
||||||
b = b[:0]
|
|
||||||
|
|
||||||
// Add to label name to values index.
|
// Add to label name to values index.
|
||||||
valset, ok := ix.values[l.Name]
|
valset, ok := ix.values[l.Name]
|
||||||
|
@ -67,17 +65,17 @@ func (ix *memIndex) add(chkd *chunkDesc) {
|
||||||
}
|
}
|
||||||
|
|
||||||
type memPostings struct {
|
type memPostings struct {
|
||||||
m map[string][]uint32
|
m map[term][]uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
// Postings returns an iterator over the postings list for s.
|
// Postings returns an iterator over the postings list for s.
|
||||||
func (p *memPostings) get(s string) Iterator {
|
func (p *memPostings) get(t term) Iterator {
|
||||||
return &listIterator{list: p.m[s], idx: -1}
|
return &listIterator{list: p.m[t], idx: -1}
|
||||||
}
|
}
|
||||||
|
|
||||||
// add adds a document to the index. The caller has to ensure that no
|
// add adds a document to the index. The caller has to ensure that no
|
||||||
// term argument appears twice.
|
// term argument appears twice.
|
||||||
func (p *memPostings) add(id uint32, terms ...string) {
|
func (p *memPostings) add(id uint32, terms ...term) {
|
||||||
for _, t := range terms {
|
for _, t := range terms {
|
||||||
p.m[t] = append(p.m[t], id)
|
p.m[t] = append(p.m[t], id)
|
||||||
}
|
}
|
||||||
|
|
86
writer.go
86
writer.go
|
@ -21,7 +21,9 @@ const (
|
||||||
// SeriesWriter serializes a time block of chunked series data.
|
// SeriesWriter serializes a time block of chunked series data.
|
||||||
type SeriesWriter interface {
|
type SeriesWriter interface {
|
||||||
// WriteSeries writes the time series data chunks for a single series.
|
// WriteSeries writes the time series data chunks for a single series.
|
||||||
WriteSeries(Labels, []*chunkDesc) error
|
// The reference is used to resolve the correct series in the written index.
|
||||||
|
// It only has to be valid for the duration of the write.
|
||||||
|
WriteSeries(ref uint32, l Labels, cds []*chunkDesc) error
|
||||||
|
|
||||||
// Size returns the size of the data written so far.
|
// Size returns the size of the data written so far.
|
||||||
Size() int64
|
Size() int64
|
||||||
|
@ -64,7 +66,7 @@ func (w *seriesWriter) writeMeta() error {
|
||||||
return w.write(w.w, metab)
|
return w.write(w.w, metab)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *seriesWriter) WriteSeries(lset Labels, chks []*chunkDesc) error {
|
func (w *seriesWriter) WriteSeries(ref uint32, lset Labels, chks []*chunkDesc) error {
|
||||||
// Initialize with meta data.
|
// Initialize with meta data.
|
||||||
if w.n == 0 {
|
if w.n == 0 {
|
||||||
if err := w.writeMeta(); err != nil {
|
if err := w.writeMeta(); err != nil {
|
||||||
|
@ -109,7 +111,7 @@ func (w *seriesWriter) WriteSeries(lset Labels, chks []*chunkDesc) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if w.index != nil {
|
if w.index != nil {
|
||||||
w.index.AddOffsets(lset, offsets...)
|
w.index.AddSeries(ref, lset, offsets...)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -142,9 +144,11 @@ type BlockStats struct {
|
||||||
// IndexWriter serialized the index for a block of series data.
|
// IndexWriter serialized the index for a block of series data.
|
||||||
// The methods must generally be called in order they are specified.
|
// The methods must generally be called in order they are specified.
|
||||||
type IndexWriter interface {
|
type IndexWriter interface {
|
||||||
// AddOffsets populates the index writer with offsets of chunks
|
// AddSeries populates the index writer witha series and its offsets
|
||||||
// for a series that the index can reference.
|
// of chunks that the index can reference.
|
||||||
AddOffsets(Labels, ...ChunkOffset)
|
// The reference number is used to resolve a series against the postings
|
||||||
|
// list iterator. It only has to be available during the write processing.
|
||||||
|
AddSeries(ref uint32, l Labels, o ...ChunkOffset)
|
||||||
|
|
||||||
// WriteStats writes final stats for the indexed block.
|
// WriteStats writes final stats for the indexed block.
|
||||||
WriteStats(*BlockStats) error
|
WriteStats(*BlockStats) error
|
||||||
|
@ -164,18 +168,23 @@ type IndexWriter interface {
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type indexWriterSeries struct {
|
||||||
|
labels Labels
|
||||||
|
chunks []ChunkOffset // series file offset of chunks
|
||||||
|
offset uint32 // index file offset of series reference
|
||||||
|
}
|
||||||
|
|
||||||
// indexWriter implements the IndexWriter interface for the standard
|
// indexWriter implements the IndexWriter interface for the standard
|
||||||
// serialization format.
|
// serialization format.
|
||||||
type indexWriter struct {
|
type indexWriter struct {
|
||||||
w io.Writer
|
w io.Writer
|
||||||
n int64
|
n int64
|
||||||
|
|
||||||
series []Labels // series in data section
|
series map[uint32]*indexWriterSeries
|
||||||
offsets [][]ChunkOffset // chunk offsets per series
|
|
||||||
seriesOffsets []uint32 // offset of series references
|
|
||||||
|
|
||||||
symbols map[string]uint32 // symbol offsets
|
symbols map[string]uint32 // symbol offsets
|
||||||
labelIndexes []hashEntry // label index offsets
|
labelIndexes []hashEntry // label index offsets
|
||||||
|
postings []hashEntry // postings lists offsets
|
||||||
}
|
}
|
||||||
|
|
||||||
func newIndexWriter(w io.Writer) *indexWriter {
|
func newIndexWriter(w io.Writer) *indexWriter {
|
||||||
|
@ -183,6 +192,7 @@ func newIndexWriter(w io.Writer) *indexWriter {
|
||||||
w: w,
|
w: w,
|
||||||
n: 0,
|
n: 0,
|
||||||
symbols: make(map[string]uint32, 4096),
|
symbols: make(map[string]uint32, 4096),
|
||||||
|
series: make(map[uint32]*indexWriterSeries, 4096),
|
||||||
labelIndexes: make([]hashEntry, 10),
|
labelIndexes: make([]hashEntry, 10),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -218,15 +228,17 @@ func (w *indexWriter) writeMeta() error {
|
||||||
return w.write(w.w, metab)
|
return w.write(w.w, metab)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *indexWriter) AddOffsets(lset Labels, offsets ...ChunkOffset) {
|
func (w *indexWriter) AddSeries(ref uint32, lset Labels, offsets ...ChunkOffset) {
|
||||||
w.series = append(w.series, lset)
|
|
||||||
w.offsets = append(w.offsets, offsets)
|
|
||||||
|
|
||||||
// Populate the symbol table from all label sets we have to reference.
|
// Populate the symbol table from all label sets we have to reference.
|
||||||
for _, l := range lset {
|
for _, l := range lset {
|
||||||
w.symbols[l.Name] = 0
|
w.symbols[l.Name] = 0
|
||||||
w.symbols[l.Value] = 0
|
w.symbols[l.Value] = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
w.series[ref] = &indexWriterSeries{
|
||||||
|
labels: lset,
|
||||||
|
chunks: offsets,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *indexWriter) WriteStats(*BlockStats) error {
|
func (w *indexWriter) WriteStats(*BlockStats) error {
|
||||||
|
@ -274,11 +286,13 @@ func (w *indexWriter) writeSeries() error {
|
||||||
b := make([]byte, 0, 4096)
|
b := make([]byte, 0, 4096)
|
||||||
buf := [binary.MaxVarintLen32]byte{}
|
buf := [binary.MaxVarintLen32]byte{}
|
||||||
|
|
||||||
for _, lset := range w.series {
|
for _, s := range w.series {
|
||||||
for _, l := range lset {
|
s.offset = uint32(w.n) + uint32(len(b))
|
||||||
n := binary.PutUvarint(buf[:], uint64(len(lset)))
|
|
||||||
b = append(b, buf[:n]...)
|
|
||||||
|
|
||||||
|
n := binary.PutUvarint(buf[:], uint64(len(s.labels)))
|
||||||
|
b = append(b, buf[:n]...)
|
||||||
|
|
||||||
|
for _, l := range s.labels {
|
||||||
n = binary.PutUvarint(buf[:], uint64(w.symbols[l.Name]))
|
n = binary.PutUvarint(buf[:], uint64(w.symbols[l.Name]))
|
||||||
b = append(b, buf[:n]...)
|
b = append(b, buf[:n]...)
|
||||||
n = binary.PutUvarint(buf[:], uint64(w.symbols[l.Value]))
|
n = binary.PutUvarint(buf[:], uint64(w.symbols[l.Value]))
|
||||||
|
@ -320,7 +334,23 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *indexWriter) WritePostings(name, value string, it Iterator) error {
|
func (w *indexWriter) WritePostings(name, value string, it Iterator) error {
|
||||||
return nil
|
key := name + string(sep) + value
|
||||||
|
|
||||||
|
w.postings = append(w.postings, hashEntry{
|
||||||
|
name: key,
|
||||||
|
offset: uint32(w.n),
|
||||||
|
})
|
||||||
|
|
||||||
|
b := make([]byte, 0, 4096)
|
||||||
|
|
||||||
|
for it.Next() {
|
||||||
|
v := w.series[it.Value()].offset
|
||||||
|
b = append(b, ((*[4]byte)(unsafe.Pointer(&v)))[:]...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return w.section(uint32(len(b)), flagStd, func(wr io.Writer) error {
|
||||||
|
return w.write(wr, b)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *indexWriter) Size() int64 {
|
func (w *indexWriter) Size() int64 {
|
||||||
|
@ -337,7 +367,7 @@ const hashEntrySize = uint32(unsafe.Sizeof(hashEntry{}))
|
||||||
func (w *indexWriter) finalize() error {
|
func (w *indexWriter) finalize() error {
|
||||||
l := 1 + uint32(len(w.labelIndexes))*hashEntrySize
|
l := 1 + uint32(len(w.labelIndexes))*hashEntrySize
|
||||||
|
|
||||||
return w.section(l, flagStd, func(wr io.Writer) error {
|
err := w.section(l, flagStd, func(wr io.Writer) error {
|
||||||
for _, e := range w.labelIndexes {
|
for _, e := range w.labelIndexes {
|
||||||
b := ((*[hashEntrySize]byte)(unsafe.Pointer(&e)))[:]
|
b := ((*[hashEntrySize]byte)(unsafe.Pointer(&e)))[:]
|
||||||
|
|
||||||
|
@ -347,6 +377,24 @@ func (w *indexWriter) finalize() error {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = w.section(l, flagStd, func(wr io.Writer) error {
|
||||||
|
for _, e := range w.postings {
|
||||||
|
b := ((*[hashEntrySize]byte)(unsafe.Pointer(&e)))[:]
|
||||||
|
|
||||||
|
if err := w.write(w.w, b); err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
// TODO(fabxc): write hashmap offsets.
|
||||||
|
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *indexWriter) Close() error {
|
func (w *indexWriter) Close() error {
|
||||||
|
|
Loading…
Reference in a new issue