Mirror of https://github.com/prometheus/prometheus.git, synced 2025-01-12 06:17:27 -08:00
Avoid WriteAt for Postings.
Flushing buffers and doing a pwrite per posting is expensive time-wise, so go back to the old way for those. This doubles our memory usage, but that's still small as it's only ~8 bytes per time series in the index. This is 30-40% faster.

benchmark                                                        old ns/op     new ns/op     delta
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4    1101429174    724362123     -34.23%
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4    1074466374    720977022     -32.90%
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4    1166510282    677702636     -41.90%
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4    1075013071    696855960     -35.18%
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4    1231673790    829328610     -32.67%

benchmark                                                        old allocs    new allocs    delta
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4    832571        731435        -12.15%
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4    894875        793823        -11.29%
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4    912931        811804        -11.08%
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4    933511        832366        -10.83%
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4    1022791       921554        -9.90%

benchmark                                                        old bytes     new bytes     delta
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4    129063496     126472364     -2.01%
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4    124154888     122300764     -1.49%
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4    128790648     126394856     -1.86%
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4    120570696     118946548     -1.35%
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4    138754288     136317432     -1.76%

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
parent 971dafdfbe
commit cf76daed2f
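The change summarized above replaces a seek-back write pattern with plain in-memory buffering: before, each postings list reserved 4 placeholder bytes, streamed its entries through small flushed writes, and then patched the length in with a positional write (one pwrite per list); after, the whole list is encoded into a buffer first, so the length prefix and CRC32 are known up front and the record goes out as one sequential write. A minimal sketch of the buffered pattern, using hypothetical names rather than the actual tsdb Writer API:

package postings

import (
	"encoding/binary"
	"hash/crc32"
	"io"
)

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// writePostingBuffered encodes one postings list entirely in memory and then
// issues a single sequential write: a 4-byte big-endian length, the entry
// count, the 4-byte series offsets, and a trailing CRC32 of the data portion.
func writePostingBuffered(w io.Writer, offs []uint32) error {
	body := make([]byte, 0, 4+4*len(offs))
	body = appendBE32(body, uint32(len(offs))) // number of entries
	for _, off := range offs {
		body = appendBE32(body, off) // series offsets, 4 bytes each
	}

	rec := make([]byte, 0, 4+len(body)+4)
	rec = appendBE32(rec, uint32(len(body)))                // length prefix
	rec = append(rec, body...)                              // count + offsets
	rec = appendBE32(rec, crc32.Checksum(body, castagnoli)) // trailing CRC32

	_, err := w.Write(rec) // one write per postings list, no WriteAt needed
	return err
}

func appendBE32(b []byte, v uint32) []byte {
	var tmp [4]byte
	binary.BigEndian.PutUint32(tmp[:], v)
	return append(b, tmp[:]...)
}

The extra memory is presumably the encoded copy of each 4-byte offset on top of the 4-byte offset already held in offs, which matches the ~8 bytes per time series figure in the commit message.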
@@ -277,7 +277,6 @@ func (d *Decbuf) ConsumePadding() {
 	}
 	if len(d.B) < 1 {
 		d.E = ErrInvalidSize
-		return
 	}
 }
 
@@ -610,10 +610,10 @@ func (w *Writer) writePostings() error {
 	}
 	defer f.Close()
 
-	// Write out the special all index.
+	// Write out the special all posting.
 	offsets := []uint32{}
 	d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices))
-	d.B = d.B[w.toc.Series:] // dec.Skip not merged yet
+	d.Skip(int(w.toc.Series))
 	for d.Len() > 0 {
 		d.ConsumePadding()
 		startPos := w.toc.LabelIndices - uint64(d.Len())
@@ -622,8 +622,7 @@ func (w *Writer) writePostings() error {
 		}
 		offsets = append(offsets, uint32(startPos/16))
 		// Skip to next series. The 4 is for the CRC32.
-		skip := d.Uvarint() + 4
-		d.B = d.B[skip:]
+		d.Skip(d.Uvarint() + 4)
 		if err := d.Err(); err != nil {
 			return nil
 		}
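The d.Skip(...) calls introduced in the hunks above replace manual re-slicing of the decode buffer (d.B = d.B[skip:]), which keeps the bounds check and error handling inside the buffer type. A rough sketch of what such a helper looks like; the real tsdb Decbuf carries more state, so treat this shape as an assumption:

package encoding

import "errors"

var ErrInvalidSize = errors.New("invalid size")

// Decbuf is a trimmed-down decoding buffer: B holds the remaining bytes and E
// records the first error encountered.
type Decbuf struct {
	B []byte
	E error
}

// Skip advances past l bytes, recording an error (rather than panicking) when
// fewer than l bytes remain; later reads then observe the error via E.
func (d *Decbuf) Skip(l int) {
	if d.E != nil {
		return
	}
	if len(d.B) < l {
		d.E = ErrInvalidSize
		return
	}
	d.B = d.B[l:]
}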
@@ -643,6 +642,7 @@ func (w *Writer) writePostings() error {
 				break
 			}
 			batchNames = append(batchNames, names[0])
+			c += w.labelNames[names[0]]
 			names = names[1:]
 		}
 
@@ -678,8 +678,7 @@ func (w *Writer) writePostings() error {
 			}
 		}
 		// Skip to next series. The 4 is for the CRC32.
-		skip := l - (startLen - d.Len()) + 4
-		d.B = d.B[skip:]
+		d.Skip(l - (startLen - d.Len()) + 4)
 		if err := d.Err(); err != nil {
 			return nil
 		}
@@ -717,42 +716,20 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
 		offset: w.pos,
 	})
 
-	startPos := w.pos
-	// Leave 4 bytes of space for the length, which will be calculated later.
-	if err := w.write([]byte("alen")); err != nil {
-		return err
-	}
-	w.crc32.Reset()
-
 	w.buf1.Reset()
 	w.buf1.PutBE32int(len(offs))
-	w.buf1.WriteToHash(w.crc32)
-	if err := w.write(w.buf1.Get()); err != nil {
-		return err
-	}
 
 	for _, off := range offs {
 		if off > (1<<32)-1 {
 			return errors.Errorf("series offset %d exceeds 4 bytes", off)
 		}
-		w.buf1.Reset()
 		w.buf1.PutBE32(off)
-		w.buf1.WriteToHash(w.crc32)
-		if err := w.write(w.buf1.Get()); err != nil {
-			return err
-		}
 	}
 
-	// Write out the length.
-	w.buf1.Reset()
-	w.buf1.PutBE32int(int(w.pos - startPos - 4))
-	if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
-		return err
-	}
-
-	w.buf1.Reset()
-	w.buf1.PutHashSum(w.crc32)
-	return w.write(w.buf1.Get())
+	w.buf2.Reset()
+	w.buf2.PutBE32int(w.buf1.Len())
+	w.buf1.PutHash(w.crc32)
+	return w.write(w.buf2.Get(), w.buf1.Get())
 }
 
 type uint32slice []uint32
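The c += w.labelNames[names[0]] line added in the writePostings hunk tracks how many postings the current batch of label names will materialize, so names are grouped only while the batch stays under a size cap. A standalone sketch of that batching idea, under assumed names (counts maps a label name to its postings count, maxPostings caps a batch; this is not the exact tsdb loop):

package postings

// batchBySize groups label names so that the summed postings count per batch
// stays at or below maxPostings; a single oversized name still forms its own
// batch so the loop always makes progress.
func batchBySize(names []string, counts map[string]int, maxPostings int) [][]string {
	var batches [][]string
	for len(names) > 0 {
		batch := []string{}
		c := 0
		for len(names) > 0 {
			if len(batch) > 0 && c+counts[names[0]] > maxPostings {
				break
			}
			batch = append(batch, names[0])
			c += counts[names[0]]
			names = names[1:]
		}
		batches = append(batches, batch)
	}
	return batches
}

Bounding each batch by postings count rather than by number of names keeps the in-memory offsets roughly constant regardless of how series are distributed across label names.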