diff --git a/Documentation/format/tombstones.md b/Documentation/format/tombstones.md index e8da95aec..059d1ace5 100644 --- a/Documentation/format/tombstones.md +++ b/Documentation/format/tombstones.md @@ -10,46 +10,21 @@ The stones section is 0 padded to a multiple of 4 for fast scans. │ magic(0x130BA30) <4b> │ version(1) <1 byte> │ ├────────────────────────────┴─────────────────────┤ │ ┌──────────────────────────────────────────────┐ │ -│ │ Ranges 1 │ │ +│ │ Tombstone 1 │ │ │ ├──────────────────────────────────────────────┤ │ │ │ ... │ │ │ ├──────────────────────────────────────────────┤ │ -│ │ Ranges N │ │ +│ │ Tombstone N │ │ │ ├──────────────────────────────────────────────┤ │ -│ │ Stones │ │ -│ ├──────────────────────────────────────────────┤ │ -│ │ Ref(stones start)<8b> │ │ +│ │ CRC<4b> │ │ │ └──────────────────────────────────────────────┘ │ └──────────────────────────────────────────────────┘ ``` -# Ranges +# Tombstone ``` -┌──────────────────────────────────────────┐ -│ ┌──────────────────────────────────────┐ │ -│ │ #ranges │ │ -│ ├───────────────────┬──────────────────┤ │ -│ │ mint │ maxt │ │ -│ ├───────────────────┴──────────────────┤ │ -│ │ . . . │ │ -│ ├──────────────────────────────────────┤ │ -│ │ CRC32 <4b> │ │ -│ └──────────────────────────────────────┘ │ -└──────────────────────────────────────────┘ +┌─────────────┬───────────────┬──────────────┐ +│ref │ mint │ maxt │ +└─────────────┴───────────────┴──────────────┘ ``` - -# Stones -``` -┌──────────────────────────────────────────┐ -│ ┌──────────────────────────────────────┐ │ -│ │ #stones <4b> │ │ -│ ├───────────────────┬──────────────────┤ │ -│ │ ref <4b> │ offset <8b> │ │ -│ ├───────────────────┴──────────────────┤ │ -│ │ . . . │ │ -│ └──────────────────────────────────────┘ │ -└──────────────────────────────────────────┘ -``` - -The offset here is the offset to the relevant ranges. diff --git a/encoding_helpers.go b/encoding_helpers.go index 50189e0bb..c1ea902a7 100644 --- a/encoding_helpers.go +++ b/encoding_helpers.go @@ -72,9 +72,10 @@ type decbuf struct { e error } -func (d *decbuf) uvarint() int { return int(d.uvarint64()) } -func (d *decbuf) be32int() int { return int(d.be32()) } -func (d *decbuf) be64int64() int64 { return int64(d.be64()) } +func (d *decbuf) uvarint() int { return int(d.uvarint64()) } +func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) } +func (d *decbuf) be32int() int { return int(d.be32()) } +func (d *decbuf) be64int64() int64 { return int64(d.be64()) } func (d *decbuf) uvarintStr() string { l := d.uvarint64() @@ -142,6 +143,20 @@ func (d *decbuf) be32() uint32 { return x } +func (d *decbuf) byte() byte { + if d.e != nil { + return 0 + } + if len(d.b) < 1 { + d.e = errInvalidSize + return 0 + } + x := d.b[0] + d.b = d.b[1:] + return x + +} + func (d *decbuf) decbuf(l int) decbuf { if d.e != nil { return decbuf{e: d.e} diff --git a/tombstones.go b/tombstones.go index 9e6c39931..7abb025a1 100644 --- a/tombstones.go +++ b/tombstones.go @@ -4,9 +4,12 @@ import ( "encoding/binary" "fmt" "hash/crc32" + "io" "io/ioutil" "os" "path/filepath" + + "github.com/pkg/errors" ) const tombstoneFilename = "tombstones" @@ -27,84 +30,36 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error { if err != nil { return err } + defer f.Close() - stoneOff := make(map[uint32]int64) // The map that holds the ref to offset vals. - refs := []uint32{} // Sorted refs. - - pos := int64(0) - buf := encbuf{b: make([]byte, 2*binary.MaxVarintLen64)} + buf := encbuf{b: make([]byte, 3*binary.MaxVarintLen64)} buf.reset() // Write the meta. buf.putBE32(MagicTombstone) buf.putByte(tombstoneFormatV1) - n, err := f.Write(buf.get()) - if err != nil { - return err - } - pos += int64(n) - - for k, v := range tr { - refs = append(refs, k) - stoneOff[k] = pos - - // Write the ranges. - buf.reset() - buf.putUvarint(len(v)) - n, err := f.Write(buf.get()) - if err != nil { - return err - } - pos += int64(n) - - buf.reset() - for _, r := range v { - buf.putVarint64(r.mint) - buf.putVarint64(r.maxt) - } - buf.putHash(hash) - - n, err = f.Write(buf.get()) - if err != nil { - return err - } - pos += int64(n) - } - - // Write the offset table. - // Pad first. - if p := 4 - (int(pos) % 4); p != 0 { - if _, err := f.Write(make([]byte, p)); err != nil { - return err - } - - pos += int64(p) - } - - buf.reset() - buf.putBE32int(len(refs)) - if _, err := f.Write(buf.get()); err != nil { - return err - } - - for _, ref := range refs { - buf.reset() - buf.putBE32(ref) - buf.putBE64int64(stoneOff[ref]) - _, err = f.Write(buf.get()) - if err != nil { - return err - } - } - - // Write the offset to the offset table. - buf.reset() - buf.putBE64int64(pos) _, err = f.Write(buf.get()) if err != nil { return err } - if err := f.Close(); err != nil { + mw := io.MultiWriter(f, hash) + + for k, v := range tr { + for _, itv := range v { + buf.reset() + buf.putUvarint32(k) + buf.putVarint64(itv.mint) + buf.putVarint64(itv.maxt) + + _, err = mw.Write(buf.get()) + if err != nil { + return err + } + } + } + + _, err = f.Write(hash.Sum(nil)) + if err != nil { return err } @@ -129,52 +84,33 @@ func readTombstones(dir string) (tombstoneReader, error) { return nil, err } - d := &decbuf{b: b} + d := &decbuf{b: b[:len(b)-4]} // 4 for the checksum. if mg := d.be32(); mg != MagicTombstone { return nil, fmt.Errorf("invalid magic number %x", mg) } - - offsetBytes := b[len(b)-8:] - d = &decbuf{b: offsetBytes} - off := d.be64int64() - if err := d.err(); err != nil { - return nil, err + if flag := d.byte(); flag != tombstoneFormatV1 { + return nil, fmt.Errorf("invalid tombstone format %x", flag) } - d = &decbuf{b: b[off:]} - numStones := d.be32int() - if err := d.err(); err != nil { - return nil, err + // Verify checksum + hash := crc32.New(crc32.MakeTable(crc32.Castagnoli)) + if _, err := hash.Write(d.get()); err != nil { + return nil, errors.Wrap(err, "write to hash") + } + if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() { + return nil, errors.New("checksum did not match") } - off += 4 // For the numStones which has been read. - stones := b[off : off+int64(numStones*12)] stonesMap := make(map[uint32]intervals) - for len(stones) >= 12 { - d := &decbuf{b: stones[:12]} - ref := d.be32() - off := d.be64int64() - - d = &decbuf{b: b[off:]} - numRanges := d.uvarint() - if err := d.err(); err != nil { - return nil, err + for d.len() > 0 { + k := d.uvarint32() + mint := d.varint64() + maxt := d.varint64() + if d.err() != nil { + return nil, d.err() } - dranges := make(intervals, 0, numRanges) - for i := 0; i < int(numRanges); i++ { - mint := d.varint64() - maxt := d.varint64() - if err := d.err(); err != nil { - return nil, err - } - - dranges = append(dranges, interval{mint, maxt}) - } - - // TODO(gouthamve): Verify checksum. - stones = stones[12:] - stonesMap[ref] = dranges + stonesMap[k] = stonesMap[k].add(interval{mint, maxt}) } return newTombstoneReader(stonesMap), nil diff --git a/tombstones_test.go b/tombstones_test.go index 525d825f7..6469d0fbe 100644 --- a/tombstones_test.go +++ b/tombstones_test.go @@ -20,11 +20,11 @@ func TestWriteAndReadbackTombStones(t *testing.T) { // Generate the tombstones. for i := 0; i < 100; i++ { ref += uint32(rand.Int31n(10)) + 1 - numRanges := rand.Intn(5) - dranges := make(intervals, numRanges) + numRanges := rand.Intn(5) + 1 + dranges := make(intervals, 0, numRanges) mint := rand.Int63n(time.Now().UnixNano()) for j := 0; j < numRanges; j++ { - dranges[j] = interval{mint, mint + rand.Int63n(1000)} + dranges = dranges.add(interval{mint, mint + rand.Int63n(1000)}) mint += rand.Int63n(1000) + 1 } stones[ref] = dranges @@ -36,7 +36,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) { require.NoError(t, err) exptr := newTombstoneReader(stones) // Compare the two readers. - require.Equal(t, restr, exptr) + require.Equal(t, exptr, restr) } func TestAddingNewIntervals(t *testing.T) { diff --git a/wal.go b/wal.go index 831a6f7e5..bf0b1af87 100644 --- a/wal.go +++ b/wal.go @@ -488,12 +488,9 @@ func (w *SegmentWAL) encodeDeletes(tr tombstoneReader) error { eb := &encbuf{b: b} buf := getWALBuffer() for k, v := range tr { - eb.reset() - eb.putUvarint32(k) - eb.putUvarint(len(v)) - buf = append(buf, eb.get()...) for _, itv := range v { eb.reset() + eb.putUvarint32(k) eb.putVarint64(itv.mint) eb.putVarint64(itv.maxt) buf = append(buf, eb.get()...) @@ -787,19 +784,12 @@ func (r *walReader) decodeDeletes(flag byte, b []byte) error { for db.len() > 0 { var s stone - s.ref = uint32(db.uvarint()) - l := db.uvarint() + s.ref = db.uvarint32() + s.intervals = intervals{{db.varint64(), db.varint64()}} if db.err() != nil { return db.err() } - for i := 0; i < l; i++ { - s.intervals = append(s.intervals, interval{db.varint64(), db.varint64()}) - if db.err() != nil { - return db.err() - } - } - r.stones = append(r.stones, s) } diff --git a/wal_test.go b/wal_test.go index 605f3d8e6..f706849d4 100644 --- a/wal_test.go +++ b/wal_test.go @@ -149,7 +149,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { var ( recordedSeries [][]labels.Labels recordedSamples [][]RefSample - recordedDeletes [][]stone + recordedDeletes []tombstoneReader ) var totalSamples int @@ -167,7 +167,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { var ( resultSeries [][]labels.Labels resultSamples [][]RefSample - resultDeletes [][]stone + resultDeletes []tombstoneReader ) serf := func(lsets []labels.Labels) error { @@ -191,9 +191,11 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { delf := func(stones []stone) error { if len(stones) > 0 { - cstones := make([]stone, len(stones)) - copy(cstones, stones) - resultDeletes = append(resultDeletes, cstones) + dels := make(map[uint32]intervals) + for _, s := range stones { + dels[s.ref] = s.intervals + } + resultDeletes = append(resultDeletes, newTombstoneReader(dels)) } return nil @@ -240,12 +242,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { } if len(stones) > 0 { tr := newTombstoneReader(stones) - newdels := []stone{} - for k, v := range tr { - newdels = append(newdels, stone{k, v}) - } - - recordedDeletes = append(recordedDeletes, newdels) + recordedDeletes = append(recordedDeletes, tr) } }