mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
chunks: bring back lead/trail reuse, truncate incomplete sample
This commit is contained in:
parent
8c48dc2ca5
commit
0b6d621471
|
@ -20,7 +20,7 @@ func TestChunk(t *testing.T) {
|
||||||
EncXOR: func(sz int) Chunk { return NewXORChunk(sz) },
|
EncXOR: func(sz int) Chunk { return NewXORChunk(sz) },
|
||||||
} {
|
} {
|
||||||
t.Run(fmt.Sprintf("%s", enc), func(t *testing.T) {
|
t.Run(fmt.Sprintf("%s", enc), func(t *testing.T) {
|
||||||
for range make([]struct{}, 3000) {
|
for range make([]struct{}, 1) {
|
||||||
c := nc(rand.Intn(1024))
|
c := nc(rand.Intn(1024))
|
||||||
if err := testChunk(c); err != nil {
|
if err := testChunk(c); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -42,10 +42,14 @@ func testChunk(c Chunk) error {
|
||||||
v = 1243535.123
|
v = 1243535.123
|
||||||
)
|
)
|
||||||
i := 0
|
i := 0
|
||||||
for i < 3 {
|
for {
|
||||||
ts += int64(rand.Intn(10000) + 1)
|
ts += int64(rand.Intn(10000) + 1)
|
||||||
v = rand.Float64()
|
// v = rand.Float64()
|
||||||
// v += float64(100)
|
if i%2 == 0 {
|
||||||
|
v += float64(rand.Intn(1000000))
|
||||||
|
} else {
|
||||||
|
v -= float64(rand.Intn(1000000))
|
||||||
|
}
|
||||||
|
|
||||||
// Start with a new appender every 10th sample. This emulates starting
|
// Start with a new appender every 10th sample. This emulates starting
|
||||||
// appending to a partially filled chunk.
|
// appending to a partially filled chunk.
|
||||||
|
@ -93,7 +97,7 @@ func benchmarkIterator(b *testing.B, newChunk func(int) Chunk) {
|
||||||
// t += int64(rand.Intn(10000) + 1)
|
// t += int64(rand.Intn(10000) + 1)
|
||||||
t += int64(1000)
|
t += int64(1000)
|
||||||
// v = rand.Float64()
|
// v = rand.Float64()
|
||||||
v += float64(100)
|
// v += float64(100)
|
||||||
exp = append(exp, pair{t: t, v: v})
|
exp = append(exp, pair{t: t, v: v})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -48,7 +48,7 @@ func (c *XORChunk) Appender() (Appender, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &xorAppender{
|
a := &xorAppender{
|
||||||
c: c,
|
c: c,
|
||||||
b: c.b,
|
b: c.b,
|
||||||
t: it.t,
|
t: it.t,
|
||||||
|
@ -56,7 +56,11 @@ func (c *XORChunk) Appender() (Appender, error) {
|
||||||
tDelta: it.tDelta,
|
tDelta: it.tDelta,
|
||||||
leading: it.leading,
|
leading: it.leading,
|
||||||
trailing: it.trailing,
|
trailing: it.trailing,
|
||||||
}, nil
|
}
|
||||||
|
if c.num == 0 {
|
||||||
|
a.leading = 0xff
|
||||||
|
}
|
||||||
|
return a, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *XORChunk) iterator() *xorIterator {
|
func (c *XORChunk) iterator() *xorIterator {
|
||||||
|
@ -88,6 +92,7 @@ type xorAppender struct {
|
||||||
|
|
||||||
func (a *xorAppender) Append(t int64, v float64) error {
|
func (a *xorAppender) Append(t int64, v float64) error {
|
||||||
var tDelta uint64
|
var tDelta uint64
|
||||||
|
l := len(a.b.bytes())
|
||||||
|
|
||||||
if a.c.num == 0 {
|
if a.c.num == 0 {
|
||||||
buf := make([]byte, binary.MaxVarintLen64)
|
buf := make([]byte, binary.MaxVarintLen64)
|
||||||
|
@ -133,6 +138,9 @@ func (a *xorAppender) Append(t int64, v float64) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(a.b.bytes()) > a.c.sz {
|
if len(a.b.bytes()) > a.c.sz {
|
||||||
|
// If the appended data exceeded the size limit, we truncate
|
||||||
|
// the underlying data slice back to the length we started with.
|
||||||
|
a.b.stream = a.b.stream[:l]
|
||||||
return ErrChunkFull
|
return ErrChunkFull
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -156,13 +164,12 @@ func (a *xorAppender) writeVDelta(v float64) {
|
||||||
leading := uint8(bits.Clz(vDelta))
|
leading := uint8(bits.Clz(vDelta))
|
||||||
trailing := uint8(bits.Ctz(vDelta))
|
trailing := uint8(bits.Ctz(vDelta))
|
||||||
|
|
||||||
// clamp number of leading zeros to avoid overflow when encoding
|
// Clamp number of leading zeros to avoid overflow when encoding.
|
||||||
if leading >= 32 {
|
if leading >= 32 {
|
||||||
leading = 31
|
leading = 31
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(dgryski): check if it's 'cheaper' to reset the leading/trailing bits instead
|
if a.leading != 0xff && leading >= a.leading && trailing >= a.trailing {
|
||||||
if a.leading != ^uint8(0) && leading >= a.leading && trailing >= a.trailing {
|
|
||||||
a.b.writeBit(zero)
|
a.b.writeBit(zero)
|
||||||
a.b.writeBits(vDelta>>a.trailing, 64-int(a.leading)-int(a.trailing))
|
a.b.writeBits(vDelta>>a.trailing, 64-int(a.leading)-int(a.trailing))
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in a new issue