mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-26 13:11:11 -08:00
Simplify tombstone and WAL Delete formats.
Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
parent
f29fb62fba
commit
bacb143b7e
|
@ -10,46 +10,21 @@ The stones section is 0 padded to a multiple of 4 for fast scans.
|
|||
│ magic(0x130BA30) <4b> │ version(1) <1 byte> │
|
||||
├────────────────────────────┴─────────────────────┤
|
||||
│ ┌──────────────────────────────────────────────┐ │
|
||||
│ │ Ranges 1 │ │
|
||||
│ │ Tombstone 1 │ │
|
||||
│ ├──────────────────────────────────────────────┤ │
|
||||
│ │ ... │ │
|
||||
│ ├──────────────────────────────────────────────┤ │
|
||||
│ │ Ranges N │ │
|
||||
│ │ Tombstone N │ │
|
||||
│ ├──────────────────────────────────────────────┤ │
|
||||
│ │ Stones │ │
|
||||
│ ├──────────────────────────────────────────────┤ │
|
||||
│ │ Ref(stones start)<8b> │ │
|
||||
│ │ CRC<4b> │ │
|
||||
│ └──────────────────────────────────────────────┘ │
|
||||
└──────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
# Ranges
|
||||
# Tombstone
|
||||
|
||||
```
|
||||
┌──────────────────────────────────────────┐
|
||||
│ ┌──────────────────────────────────────┐ │
|
||||
│ │ #ranges <uvarint> │ │
|
||||
│ ├───────────────────┬──────────────────┤ │
|
||||
│ │ mint <varint64> │ maxt <varint64> │ │
|
||||
│ ├───────────────────┴──────────────────┤ │
|
||||
│ │ . . . │ │
|
||||
│ ├──────────────────────────────────────┤ │
|
||||
│ │ CRC32 <4b> │ │
|
||||
│ └──────────────────────────────────────┘ │
|
||||
└──────────────────────────────────────────┘
|
||||
┌─────────────┬───────────────┬──────────────┐
|
||||
│ref <varint> │ mint <varint> │ maxt <varint>│
|
||||
└─────────────┴───────────────┴──────────────┘
|
||||
```
|
||||
|
||||
# Stones
|
||||
```
|
||||
┌──────────────────────────────────────────┐
|
||||
│ ┌──────────────────────────────────────┐ │
|
||||
│ │ #stones <4b> │ │
|
||||
│ ├───────────────────┬──────────────────┤ │
|
||||
│ │ ref <4b> │ offset <8b> │ │
|
||||
│ ├───────────────────┴──────────────────┤ │
|
||||
│ │ . . . │ │
|
||||
│ └──────────────────────────────────────┘ │
|
||||
└──────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
The offset here is the offset to the relevant ranges.
|
||||
|
|
|
@ -72,9 +72,10 @@ type decbuf struct {
|
|||
e error
|
||||
}
|
||||
|
||||
func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
|
||||
func (d *decbuf) be32int() int { return int(d.be32()) }
|
||||
func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
|
||||
func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
|
||||
func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
|
||||
func (d *decbuf) be32int() int { return int(d.be32()) }
|
||||
func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
|
||||
|
||||
func (d *decbuf) uvarintStr() string {
|
||||
l := d.uvarint64()
|
||||
|
@ -142,6 +143,20 @@ func (d *decbuf) be32() uint32 {
|
|||
return x
|
||||
}
|
||||
|
||||
func (d *decbuf) byte() byte {
|
||||
if d.e != nil {
|
||||
return 0
|
||||
}
|
||||
if len(d.b) < 1 {
|
||||
d.e = errInvalidSize
|
||||
return 0
|
||||
}
|
||||
x := d.b[0]
|
||||
d.b = d.b[1:]
|
||||
return x
|
||||
|
||||
}
|
||||
|
||||
func (d *decbuf) decbuf(l int) decbuf {
|
||||
if d.e != nil {
|
||||
return decbuf{e: d.e}
|
||||
|
|
144
tombstones.go
144
tombstones.go
|
@ -4,9 +4,12 @@ import (
|
|||
"encoding/binary"
|
||||
"fmt"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const tombstoneFilename = "tombstones"
|
||||
|
@ -27,84 +30,36 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
stoneOff := make(map[uint32]int64) // The map that holds the ref to offset vals.
|
||||
refs := []uint32{} // Sorted refs.
|
||||
|
||||
pos := int64(0)
|
||||
buf := encbuf{b: make([]byte, 2*binary.MaxVarintLen64)}
|
||||
buf := encbuf{b: make([]byte, 3*binary.MaxVarintLen64)}
|
||||
buf.reset()
|
||||
// Write the meta.
|
||||
buf.putBE32(MagicTombstone)
|
||||
buf.putByte(tombstoneFormatV1)
|
||||
n, err := f.Write(buf.get())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pos += int64(n)
|
||||
|
||||
for k, v := range tr {
|
||||
refs = append(refs, k)
|
||||
stoneOff[k] = pos
|
||||
|
||||
// Write the ranges.
|
||||
buf.reset()
|
||||
buf.putUvarint(len(v))
|
||||
n, err := f.Write(buf.get())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pos += int64(n)
|
||||
|
||||
buf.reset()
|
||||
for _, r := range v {
|
||||
buf.putVarint64(r.mint)
|
||||
buf.putVarint64(r.maxt)
|
||||
}
|
||||
buf.putHash(hash)
|
||||
|
||||
n, err = f.Write(buf.get())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pos += int64(n)
|
||||
}
|
||||
|
||||
// Write the offset table.
|
||||
// Pad first.
|
||||
if p := 4 - (int(pos) % 4); p != 0 {
|
||||
if _, err := f.Write(make([]byte, p)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
pos += int64(p)
|
||||
}
|
||||
|
||||
buf.reset()
|
||||
buf.putBE32int(len(refs))
|
||||
if _, err := f.Write(buf.get()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, ref := range refs {
|
||||
buf.reset()
|
||||
buf.putBE32(ref)
|
||||
buf.putBE64int64(stoneOff[ref])
|
||||
_, err = f.Write(buf.get())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Write the offset to the offset table.
|
||||
buf.reset()
|
||||
buf.putBE64int64(pos)
|
||||
_, err = f.Write(buf.get())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := f.Close(); err != nil {
|
||||
mw := io.MultiWriter(f, hash)
|
||||
|
||||
for k, v := range tr {
|
||||
for _, itv := range v {
|
||||
buf.reset()
|
||||
buf.putUvarint32(k)
|
||||
buf.putVarint64(itv.mint)
|
||||
buf.putVarint64(itv.maxt)
|
||||
|
||||
_, err = mw.Write(buf.get())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_, err = f.Write(hash.Sum(nil))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -129,52 +84,33 @@ func readTombstones(dir string) (tombstoneReader, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
d := &decbuf{b: b}
|
||||
d := &decbuf{b: b[:len(b)-4]} // 4 for the checksum.
|
||||
if mg := d.be32(); mg != MagicTombstone {
|
||||
return nil, fmt.Errorf("invalid magic number %x", mg)
|
||||
}
|
||||
|
||||
offsetBytes := b[len(b)-8:]
|
||||
d = &decbuf{b: offsetBytes}
|
||||
off := d.be64int64()
|
||||
if err := d.err(); err != nil {
|
||||
return nil, err
|
||||
if flag := d.byte(); flag != tombstoneFormatV1 {
|
||||
return nil, fmt.Errorf("invalid tombstone format %x", flag)
|
||||
}
|
||||
|
||||
d = &decbuf{b: b[off:]}
|
||||
numStones := d.be32int()
|
||||
if err := d.err(); err != nil {
|
||||
return nil, err
|
||||
// Verify checksum
|
||||
hash := crc32.New(crc32.MakeTable(crc32.Castagnoli))
|
||||
if _, err := hash.Write(d.get()); err != nil {
|
||||
return nil, errors.Wrap(err, "write to hash")
|
||||
}
|
||||
if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() {
|
||||
return nil, errors.New("checksum did not match")
|
||||
}
|
||||
off += 4 // For the numStones which has been read.
|
||||
|
||||
stones := b[off : off+int64(numStones*12)]
|
||||
stonesMap := make(map[uint32]intervals)
|
||||
for len(stones) >= 12 {
|
||||
d := &decbuf{b: stones[:12]}
|
||||
ref := d.be32()
|
||||
off := d.be64int64()
|
||||
|
||||
d = &decbuf{b: b[off:]}
|
||||
numRanges := d.uvarint()
|
||||
if err := d.err(); err != nil {
|
||||
return nil, err
|
||||
for d.len() > 0 {
|
||||
k := d.uvarint32()
|
||||
mint := d.varint64()
|
||||
maxt := d.varint64()
|
||||
if d.err() != nil {
|
||||
return nil, d.err()
|
||||
}
|
||||
|
||||
dranges := make(intervals, 0, numRanges)
|
||||
for i := 0; i < int(numRanges); i++ {
|
||||
mint := d.varint64()
|
||||
maxt := d.varint64()
|
||||
if err := d.err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dranges = append(dranges, interval{mint, maxt})
|
||||
}
|
||||
|
||||
// TODO(gouthamve): Verify checksum.
|
||||
stones = stones[12:]
|
||||
stonesMap[ref] = dranges
|
||||
stonesMap[k] = stonesMap[k].add(interval{mint, maxt})
|
||||
}
|
||||
|
||||
return newTombstoneReader(stonesMap), nil
|
||||
|
|
|
@ -20,11 +20,11 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
|
|||
// Generate the tombstones.
|
||||
for i := 0; i < 100; i++ {
|
||||
ref += uint32(rand.Int31n(10)) + 1
|
||||
numRanges := rand.Intn(5)
|
||||
dranges := make(intervals, numRanges)
|
||||
numRanges := rand.Intn(5) + 1
|
||||
dranges := make(intervals, 0, numRanges)
|
||||
mint := rand.Int63n(time.Now().UnixNano())
|
||||
for j := 0; j < numRanges; j++ {
|
||||
dranges[j] = interval{mint, mint + rand.Int63n(1000)}
|
||||
dranges = dranges.add(interval{mint, mint + rand.Int63n(1000)})
|
||||
mint += rand.Int63n(1000) + 1
|
||||
}
|
||||
stones[ref] = dranges
|
||||
|
@ -36,7 +36,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
exptr := newTombstoneReader(stones)
|
||||
// Compare the two readers.
|
||||
require.Equal(t, restr, exptr)
|
||||
require.Equal(t, exptr, restr)
|
||||
}
|
||||
|
||||
func TestAddingNewIntervals(t *testing.T) {
|
||||
|
|
16
wal.go
16
wal.go
|
@ -488,12 +488,9 @@ func (w *SegmentWAL) encodeDeletes(tr tombstoneReader) error {
|
|||
eb := &encbuf{b: b}
|
||||
buf := getWALBuffer()
|
||||
for k, v := range tr {
|
||||
eb.reset()
|
||||
eb.putUvarint32(k)
|
||||
eb.putUvarint(len(v))
|
||||
buf = append(buf, eb.get()...)
|
||||
for _, itv := range v {
|
||||
eb.reset()
|
||||
eb.putUvarint32(k)
|
||||
eb.putVarint64(itv.mint)
|
||||
eb.putVarint64(itv.maxt)
|
||||
buf = append(buf, eb.get()...)
|
||||
|
@ -787,19 +784,12 @@ func (r *walReader) decodeDeletes(flag byte, b []byte) error {
|
|||
|
||||
for db.len() > 0 {
|
||||
var s stone
|
||||
s.ref = uint32(db.uvarint())
|
||||
l := db.uvarint()
|
||||
s.ref = db.uvarint32()
|
||||
s.intervals = intervals{{db.varint64(), db.varint64()}}
|
||||
if db.err() != nil {
|
||||
return db.err()
|
||||
}
|
||||
|
||||
for i := 0; i < l; i++ {
|
||||
s.intervals = append(s.intervals, interval{db.varint64(), db.varint64()})
|
||||
if db.err() != nil {
|
||||
return db.err()
|
||||
}
|
||||
}
|
||||
|
||||
r.stones = append(r.stones, s)
|
||||
}
|
||||
|
||||
|
|
19
wal_test.go
19
wal_test.go
|
@ -149,7 +149,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
|
|||
var (
|
||||
recordedSeries [][]labels.Labels
|
||||
recordedSamples [][]RefSample
|
||||
recordedDeletes [][]stone
|
||||
recordedDeletes []tombstoneReader
|
||||
)
|
||||
var totalSamples int
|
||||
|
||||
|
@ -167,7 +167,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
|
|||
var (
|
||||
resultSeries [][]labels.Labels
|
||||
resultSamples [][]RefSample
|
||||
resultDeletes [][]stone
|
||||
resultDeletes []tombstoneReader
|
||||
)
|
||||
|
||||
serf := func(lsets []labels.Labels) error {
|
||||
|
@ -191,9 +191,11 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
|
|||
|
||||
delf := func(stones []stone) error {
|
||||
if len(stones) > 0 {
|
||||
cstones := make([]stone, len(stones))
|
||||
copy(cstones, stones)
|
||||
resultDeletes = append(resultDeletes, cstones)
|
||||
dels := make(map[uint32]intervals)
|
||||
for _, s := range stones {
|
||||
dels[s.ref] = s.intervals
|
||||
}
|
||||
resultDeletes = append(resultDeletes, newTombstoneReader(dels))
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -240,12 +242,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
|
|||
}
|
||||
if len(stones) > 0 {
|
||||
tr := newTombstoneReader(stones)
|
||||
newdels := []stone{}
|
||||
for k, v := range tr {
|
||||
newdels = append(newdels, stone{k, v})
|
||||
}
|
||||
|
||||
recordedDeletes = append(recordedDeletes, newdels)
|
||||
recordedDeletes = append(recordedDeletes, tr)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue